256 lines
6.0 KiB
Rust
256 lines
6.0 KiB
Rust
|
#[cfg(feature = "async")]
|
||
|
use {
|
||
|
flume::*,
|
||
|
futures::{stream::FuturesUnordered, StreamExt, TryFutureExt},
|
||
|
async_std::prelude::FutureExt,
|
||
|
std::time::Duration,
|
||
|
};
|
||
|
use futures::{stream, Stream};
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[test]
|
||
|
fn stream_recv() {
|
||
|
let (tx, rx) = unbounded();
|
||
|
|
||
|
let t = std::thread::spawn(move || {
|
||
|
std::thread::sleep(std::time::Duration::from_millis(250));
|
||
|
tx.send(42u32).unwrap();
|
||
|
println!("sent");
|
||
|
});
|
||
|
|
||
|
async_std::task::block_on(async {
|
||
|
println!("receiving...");
|
||
|
let x = rx.stream().next().await;
|
||
|
println!("received");
|
||
|
assert_eq!(x, Some(42));
|
||
|
});
|
||
|
|
||
|
t.join().unwrap();
|
||
|
}
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[test]
|
||
|
fn stream_recv_disconnect() {
|
||
|
let (tx, rx) = bounded::<i32>(0);
|
||
|
|
||
|
let t = std::thread::spawn(move || {
|
||
|
tx.send(42);
|
||
|
std::thread::sleep(std::time::Duration::from_millis(250));
|
||
|
drop(tx)
|
||
|
});
|
||
|
|
||
|
async_std::task::block_on(async {
|
||
|
let mut stream = rx.into_stream();
|
||
|
assert_eq!(stream.next().await, Some(42));
|
||
|
assert_eq!(stream.next().await, None);
|
||
|
});
|
||
|
|
||
|
t.join().unwrap();
|
||
|
}
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[test]
|
||
|
fn stream_recv_drop_recv() {
|
||
|
let (tx, rx) = bounded::<i32>(10);
|
||
|
|
||
|
let rx2 = rx.clone();
|
||
|
let mut stream = rx.into_stream();
|
||
|
|
||
|
async_std::task::block_on(async {
|
||
|
let res = async_std::future::timeout(
|
||
|
std::time::Duration::from_millis(500),
|
||
|
stream.next()
|
||
|
).await;
|
||
|
|
||
|
assert!(res.is_err());
|
||
|
});
|
||
|
|
||
|
let t = std::thread::spawn(move || {
|
||
|
async_std::task::block_on(async {
|
||
|
rx2.stream().next().await
|
||
|
})
|
||
|
});
|
||
|
|
||
|
std::thread::sleep(std::time::Duration::from_millis(500));
|
||
|
|
||
|
tx.send(42).unwrap();
|
||
|
|
||
|
drop(stream);
|
||
|
|
||
|
assert_eq!(t.join().unwrap(), Some(42))
|
||
|
}
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[test]
|
||
|
fn r#stream_drop_send_disconnect() {
|
||
|
let (tx, rx) = bounded::<i32>(1);
|
||
|
|
||
|
let t = std::thread::spawn(move || {
|
||
|
std::thread::sleep(std::time::Duration::from_millis(250));
|
||
|
drop(tx);
|
||
|
});
|
||
|
|
||
|
async_std::task::block_on(async {
|
||
|
let mut stream = rx.into_stream();
|
||
|
assert_eq!(stream.next().await, None);
|
||
|
});
|
||
|
|
||
|
t.join().unwrap();
|
||
|
}
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[async_std::test]
|
||
|
async fn stream_send_1_million_no_drop_or_reorder() {
|
||
|
#[derive(Debug)]
|
||
|
enum Message {
|
||
|
Increment {
|
||
|
old: u64,
|
||
|
},
|
||
|
ReturnCount,
|
||
|
}
|
||
|
|
||
|
let (tx, rx) = unbounded();
|
||
|
|
||
|
let t = async_std::task::spawn(async move {
|
||
|
let mut count = 0u64;
|
||
|
let mut stream = rx.into_stream();
|
||
|
|
||
|
while let Some(Message::Increment { old }) = stream.next().await {
|
||
|
assert_eq!(old, count);
|
||
|
count += 1;
|
||
|
}
|
||
|
|
||
|
count
|
||
|
});
|
||
|
|
||
|
for next in 0..1_000_000 {
|
||
|
tx.send(Message::Increment { old: next }).unwrap();
|
||
|
}
|
||
|
|
||
|
tx.send(Message::ReturnCount).unwrap();
|
||
|
|
||
|
let count = t.await;
|
||
|
assert_eq!(count, 1_000_000)
|
||
|
}
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[async_std::test]
|
||
|
async fn parallel_streams_and_async_recv() {
|
||
|
let (tx, rx) = flume::unbounded();
|
||
|
let rx = ℞
|
||
|
let send_fut = async move {
|
||
|
let n_sends: usize = 100000;
|
||
|
for _ in 0..n_sends {
|
||
|
tx.send_async(()).await.unwrap();
|
||
|
}
|
||
|
};
|
||
|
|
||
|
async_std::task::spawn(
|
||
|
send_fut
|
||
|
.timeout(Duration::from_secs(5))
|
||
|
.map_err(|_| panic!("Send timed out!"))
|
||
|
);
|
||
|
|
||
|
let mut futures_unordered = (0..250)
|
||
|
.map(|n| async move {
|
||
|
if n % 2 == 0 {
|
||
|
let mut stream = rx.stream();
|
||
|
while let Some(()) = stream.next().await {}
|
||
|
} else {
|
||
|
while let Ok(()) = rx.recv_async().await {}
|
||
|
}
|
||
|
|
||
|
})
|
||
|
.collect::<FuturesUnordered<_>>();
|
||
|
|
||
|
let recv_fut = async {
|
||
|
while futures_unordered.next().await.is_some() {}
|
||
|
};
|
||
|
|
||
|
recv_fut
|
||
|
.timeout(Duration::from_secs(5))
|
||
|
.map_err(|_| panic!("Receive timed out!"))
|
||
|
.await
|
||
|
.unwrap();
|
||
|
}
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[test]
|
||
|
fn stream_no_double_wake() {
|
||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||
|
use std::sync::Arc;
|
||
|
use std::pin::Pin;
|
||
|
use std::task::Context;
|
||
|
use futures::task::{waker, ArcWake};
|
||
|
use futures::Stream;
|
||
|
|
||
|
let count = Arc::new(AtomicUsize::new(0));
|
||
|
|
||
|
// all this waker does is count how many times it is called
|
||
|
struct CounterWaker {
|
||
|
count: Arc<AtomicUsize>,
|
||
|
}
|
||
|
|
||
|
impl ArcWake for CounterWaker {
|
||
|
fn wake_by_ref(arc_self: &Arc<Self>) {
|
||
|
arc_self.count.fetch_add(1, Ordering::SeqCst);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// create waker and context
|
||
|
let w = CounterWaker {
|
||
|
count: count.clone(),
|
||
|
};
|
||
|
let w = waker(Arc::new(w));
|
||
|
let cx = &mut Context::from_waker(&w);
|
||
|
|
||
|
// create unbounded channel
|
||
|
let (tx, rx) = unbounded::<()>();
|
||
|
let mut stream = rx.stream();
|
||
|
|
||
|
// register waker with stream
|
||
|
let _ = Pin::new(&mut stream).poll_next(cx);
|
||
|
|
||
|
// send multiple items
|
||
|
tx.send(()).unwrap();
|
||
|
tx.send(()).unwrap();
|
||
|
tx.send(()).unwrap();
|
||
|
|
||
|
// verify that stream is only woken up once.
|
||
|
assert_eq!(count.load(Ordering::SeqCst), 1);
|
||
|
}
|
||
|
|
||
|
#[cfg(feature = "async")]
|
||
|
#[async_std::test]
|
||
|
async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55
|
||
|
fn dummy_stream() -> impl Stream<Item = usize> {
|
||
|
stream::unfold(0, |count| async move {
|
||
|
if count < 1000 {
|
||
|
Some((count, count + 1))
|
||
|
} else {
|
||
|
None
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
|
||
|
let (send_task, recv_task) = {
|
||
|
use futures::SinkExt;
|
||
|
let (tx, rx) = flume::bounded(100);
|
||
|
|
||
|
let send_task = dummy_stream()
|
||
|
.map(|i| Ok(i))
|
||
|
.forward(tx.into_sink().sink_map_err(|e| {
|
||
|
panic!("send error:{:#?}", e)
|
||
|
}));
|
||
|
|
||
|
let recv_task = rx
|
||
|
.into_stream()
|
||
|
.for_each(|item| async move {});
|
||
|
(send_task, recv_task)
|
||
|
};
|
||
|
|
||
|
let jh = async_std::task::spawn(send_task);
|
||
|
async_std::task::block_on(recv_task);
|
||
|
jh.await.unwrap();
|
||
|
}
|