mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-01-26 06:09:49 +00:00
652 lines
17 KiB
Rust
652 lines
17 KiB
Rust
use futures::channel::{mpsc, oneshot};
|
|
use futures::executor::{block_on, block_on_stream};
|
|
use futures::future::{poll_fn, FutureExt};
|
|
use futures::pin_mut;
|
|
use futures::sink::{Sink, SinkExt};
|
|
use futures::stream::{Stream, StreamExt};
|
|
use futures::task::{Context, Poll};
|
|
use futures_test::task::{new_count_waker, noop_context};
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
use std::sync::{Arc, Mutex};
|
|
use std::thread;
|
|
|
|
trait AssertSend: Send {}
|
|
impl AssertSend for mpsc::Sender<i32> {}
|
|
impl AssertSend for mpsc::Receiver<i32> {}
|
|
|
|
/// Sends a single value through a buffered channel and collects it on the other side.
#[test]
fn send_recv() {
    let (mut sender, receiver) = mpsc::channel::<i32>(16);

    block_on(sender.send(1)).unwrap();
    // The sender is dropped before collecting so that the stream can terminate.
    drop(sender);

    let received = block_on(receiver.collect::<Vec<_>>());
    assert_eq!(received, vec![1]);
}
|
|
|
|
/// Drives a zero-buffer channel by hand through two send/receive rounds, checking the
/// readiness reported by the sender after every step.
#[test]
fn send_recv_no_buffer() {
    // `poll_fn` provides the task context required by the manual polls below.
    let scenario = poll_fn(move |cx| {
        let (sender, receiver) = mpsc::channel::<i32>(0);
        pin_mut!(sender, receiver);

        assert!(sender.as_mut().poll_flush(cx).is_ready());
        assert!(sender.as_mut().poll_ready(cx).is_ready());

        // Round one: the send is accepted, after which the sender reports Pending.
        assert!(sender.as_mut().start_send(1).is_ok());
        assert!(sender.as_mut().poll_ready(cx).is_pending());

        // `poll_ready` returned Pending, so there is no room in the buffer and a
        // further send has to be rejected with `is_full`.
        assert!(sender.as_mut().start_send(0).unwrap_err().is_full());
        assert!(sender.as_mut().poll_ready(cx).is_pending());

        // Receiving the value makes the sender ready again.
        assert_eq!(receiver.as_mut().poll_next(cx), Poll::Ready(Some(1)));
        assert!(sender.as_mut().poll_ready(cx).is_ready());

        // Round two: same sequence with a second message.
        assert!(sender.as_mut().poll_ready(cx).is_ready());
        assert!(sender.as_mut().start_send(2).is_ok());
        assert!(sender.as_mut().poll_ready(cx).is_pending());

        assert_eq!(receiver.as_mut().poll_next(cx), Poll::Ready(Some(2)));
        assert!(sender.as_mut().poll_ready(cx).is_ready());

        Poll::Ready(())
    });

    block_on(scenario);
}
|
|
|
|
/// Two clones of the same sender deliver to one receiver, in send order.
#[test]
fn send_shared_recv() {
    let (mut first_tx, rx) = mpsc::channel::<i32>(16);
    let mut second_tx = first_tx.clone();
    let mut items = block_on_stream(rx);

    // Value sent through the original handle.
    block_on(first_tx.send(1)).unwrap();
    assert_eq!(items.next(), Some(1));

    // Value sent through the clone.
    block_on(second_tx.send(2)).unwrap();
    assert_eq!(items.next(), Some(2));
}
|
|
|
|
/// Sends one value from a spawned thread and receives it on the test thread.
#[test]
fn send_recv_threads() {
    let (mut tx, rx) = mpsc::channel::<i32>(16);

    let producer = thread::spawn(move || block_on(tx.send(1)).unwrap());

    // `take(1)` bounds the collection to the single expected item.
    let received = block_on(rx.take(1).collect::<Vec<_>>());
    assert_eq!(received, vec![1]);

    producer.join().unwrap();
}
|
|
|
|
/// Sends two values from a spawned thread through a zero-buffer channel and checks
/// that both arrive in order.
#[test]
fn send_recv_threads_no_capacity() {
    let (mut tx, rx) = mpsc::channel::<i32>(0);

    let producer = thread::spawn(move || {
        for value in 1..=2 {
            block_on(tx.send(value)).unwrap();
        }
    });

    let received = block_on(rx.collect::<Vec<_>>());
    assert_eq!(received, vec![1, 2]);

    producer.join().unwrap();
}
|
|
|
|
/// After the receiver is closed it yields `None`, and the sender's `poll_ready`
/// reports a disconnected error.
#[test]
fn recv_close_gets_none() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(10);

    // `poll_fn` supplies the task context needed for the manual polls.
    let scenario = poll_fn(move |cx| {
        rx.close();

        assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));

        // Anything other than a ready error fails the test.
        if let Poll::Ready(Err(err)) = tx.poll_ready(cx) {
            assert!(err.is_disconnected());
        } else {
            panic!();
        }

        Poll::Ready(())
    });

    block_on(scenario);
}
|
|
|
|
/// With the sender already gone, polling the receiver immediately yields `None`.
#[test]
fn tx_close_gets_none() {
    // The `_` pattern does not bind the sender, so it is dropped at the end of this statement.
    let (_, mut receiver) = mpsc::channel::<i32>(10);

    // `poll_fn` supplies the task context needed for the manual poll.
    let check = poll_fn(move |cx| {
        assert_eq!(receiver.poll_next_unpin(cx), Poll::Ready(None));
        Poll::Ready(())
    });

    block_on(check);
}
|
|
|
|
// #[test]
|
|
// fn spawn_sends_items() {
|
|
// let core = local_executor::Core::new();
|
|
// let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
|
|
// let rx = mpsc::spawn(stream, &core, 1);
|
|
// assert_eq!(core.run(rx.take(4).collect()).unwrap(),
|
|
// [0, 1, 2, 3]);
|
|
// }
|
|
|
|
// #[test]
|
|
// fn spawn_kill_dead_stream() {
|
|
// use std::thread;
|
|
// use std::time::Duration;
|
|
// use futures::future::Either;
|
|
// use futures::sync::oneshot;
|
|
//
|
|
// // a stream which never returns anything (maybe a remote end isn't
|
|
// // responding), but dropping it leads to observable side effects
|
|
// // (like closing connections, releasing limited resources, ...)
|
|
// #[derive(Debug)]
|
|
// struct Dead {
|
|
// // when dropped you should get Err(oneshot::Canceled) on the
|
|
// // receiving end
|
|
// done: oneshot::Sender<()>,
|
|
// }
|
|
// impl Stream for Dead {
|
|
// type Item = ();
|
|
// type Error = ();
|
|
//
|
|
// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
|
// Ok(Poll::Pending)
|
|
// }
|
|
// }
|
|
//
|
|
// // need to implement a timeout for the test, as it would hang
|
|
// // forever right now
|
|
// let (timeout_tx, timeout_rx) = oneshot::channel();
|
|
// thread::spawn(move || {
|
|
// thread::sleep(Duration::from_millis(1000));
|
|
// let _ = timeout_tx.send(());
|
|
// });
|
|
//
|
|
// let core = local_executor::Core::new();
|
|
// let (done_tx, done_rx) = oneshot::channel();
|
|
// let stream = Dead{done: done_tx};
|
|
// let rx = mpsc::spawn(stream, &core, 1);
|
|
// let res = core.run(
|
|
// Ok::<_, ()>(())
|
|
// .into_future()
|
|
// .then(move |_| {
|
|
// // now drop the spawned stream: maybe some timeout exceeded,
|
|
// // or some connection on this end was closed by the remote
|
|
// // end.
|
|
// drop(rx);
|
|
// // and wait for the spawned stream to release its resources
|
|
// done_rx
|
|
// })
|
|
// .select2(timeout_rx)
|
|
// );
|
|
// match res {
|
|
// Err(Either::A((oneshot::Canceled, _))) => (),
|
|
// _ => {
|
|
// panic!("dead stream wasn't canceled");
|
|
// },
|
|
// }
|
|
// }
|
|
|
|
/// Stress test: `NTHREADS` producer threads each push `AMT` ones into a shared unbounded
/// channel while a consumer thread collects everything and checks count and contents.
#[test]
fn stress_shared_unbounded() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;

    let (tx, rx) = mpsc::unbounded::<i32>();

    let t = thread::spawn(move || {
        let result: Vec<_> = block_on(rx.collect());
        assert_eq!(result.len(), (AMT * NTHREADS) as usize);
        for item in result {
            assert_eq!(item, 1);
        }
    });

    for _ in 0..NTHREADS {
        let tx = tx.clone();

        thread::spawn(move || {
            for _ in 0..AMT {
                tx.unbounded_send(1).unwrap();
            }
        });
    }

    // Drop the original handle so only the producers' clones keep the channel open.
    drop(tx);

    // Unwrap the join result directly (as the sibling tests do) instead of going through
    // `.ok()`, which discarded the consumer's panic payload.
    t.join().unwrap();
}
|
|
|
|
/// Stress test: `NTHREADS` producer threads each send `AMT` ones through a shared
/// zero-buffer channel while a consumer thread collects and verifies them.
#[test]
fn stress_shared_bounded_hard() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;

    let (tx, rx) = mpsc::channel::<i32>(0);

    let consumer = thread::spawn(move || {
        let received = block_on(rx.collect::<Vec<_>>());
        assert_eq!(received.len(), (AMT * NTHREADS) as usize);
        for value in received {
            assert_eq!(value, 1);
        }
    });

    for _ in 0..NTHREADS {
        let mut producer_tx = tx.clone();

        thread::spawn(move || {
            for _ in 0..AMT {
                block_on(producer_tx.send(1)).unwrap();
            }
        });
    }

    // Drop the original handle so only the producers' clones keep the channel open.
    drop(tx);

    consumer.join().unwrap();
}
|
|
|
|
/// Stress test: a single receiver, shared behind a mutex, is driven alternately from
/// `NTHREADS` threads — every fifth loop iteration blocks for an item, the others do a
/// single non-blocking poll — and the total number of received items must equal `AMT`.
#[allow(clippy::same_item_push)]
#[test]
fn stress_receiver_multi_task_bounded_hard() {
    #[cfg(miri)]
    const AMT: usize = 100;
    #[cfg(not(miri))]
    const AMT: usize = 10_000;
    const NTHREADS: u32 = 2;

    let (mut tx, rx) = mpsc::channel::<usize>(0);
    // `None` marks the stream as finished so the other thread can stop as well.
    let rx = Arc::new(Mutex::new(Some(rx)));
    // Number of items received across all threads.
    let n = Arc::new(AtomicUsize::new(0));

    let mut th = vec![];

    for _ in 0..NTHREADS {
        let rx = rx.clone();
        let n = n.clone();

        let t = thread::spawn(move || {
            let mut i = 0;

            loop {
                i += 1;
                let mut rx_opt = rx.lock().unwrap();
                if let Some(rx) = &mut *rx_opt {
                    if i % 5 == 0 {
                        // Blocking receive (the mutex stays locked meanwhile).
                        let item = block_on(rx.next());

                        if item.is_none() {
                            *rx_opt = None;
                            break;
                        }

                        n.fetch_add(1, Ordering::Relaxed);
                    } else {
                        // Just poll. The thread already owns its counter handle, so it is
                        // used directly rather than cloned on every iteration.
                        match rx.poll_next_unpin(&mut noop_context()) {
                            Poll::Ready(Some(_)) => {
                                n.fetch_add(1, Ordering::Relaxed);
                            }
                            Poll::Ready(None) => {
                                *rx_opt = None;
                                break;
                            }
                            Poll::Pending => {}
                        }
                    }
                } else {
                    // Another thread already observed the end of the stream.
                    break;
                }
            }
        });

        th.push(t);
    }

    for i in 0..AMT {
        block_on(tx.send(i)).unwrap();
    }
    drop(tx);

    for t in th {
        t.join().unwrap();
    }

    assert_eq!(AMT, n.load(Ordering::Relaxed));
}
|
|
|
|
/// Stress test that the receiver still gets every message after the sender
/// has been dropped.
#[test]
fn stress_drop_sender() {
    #[cfg(miri)]
    const ITER: usize = 100;
    #[cfg(not(miri))]
    const ITER: usize = 10000;

    /// Spawns a thread that runs `send_one_two_three` on the sending half and
    /// returns the receiving half as a stream.
    fn list() -> impl Stream<Item = i32> {
        let (tx, rx) = mpsc::channel(1);
        thread::spawn(move || block_on(send_one_two_three(tx)));
        rx
    }

    (0..ITER).for_each(|_| {
        let collected = block_on(list().collect::<Vec<_>>());
        assert_eq!(collected, vec![1, 2, 3]);
    });
}
|
|
|
|
async fn send_one_two_three(mut tx: mpsc::Sender<i32>) {
|
|
for i in 1..=3 {
|
|
tx.send(i).await.unwrap();
|
|
}
|
|
}
|
|
|
|
/// Stress test that after receiver dropped,
|
|
/// no messages are lost.
|
|
fn stress_close_receiver_iter() {
|
|
let (tx, rx) = mpsc::unbounded();
|
|
let mut rx = block_on_stream(rx);
|
|
let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
|
|
let th = thread::spawn(move || {
|
|
for i in 1.. {
|
|
if tx.unbounded_send(i).is_err() {
|
|
unwritten_tx.send(i).expect("unwritten_tx");
|
|
return;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Read one message to make sure thread effectively started
|
|
assert_eq!(Some(1), rx.next());
|
|
|
|
rx.close();
|
|
|
|
for i in 2.. {
|
|
match rx.next() {
|
|
Some(r) => assert!(i == r),
|
|
None => {
|
|
let unwritten = unwritten_rx.recv().expect("unwritten_rx");
|
|
assert_eq!(unwritten, i);
|
|
th.join().unwrap();
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Runs `stress_close_receiver_iter` `ITER` times in a row.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_close_receiver() {
    const ITER: usize = 10000;

    (0..ITER).for_each(|_| stress_close_receiver_iter());
}
|
|
|
|
async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
|
|
for i in (1..=count).rev() {
|
|
sender.send(i).await.unwrap();
|
|
}
|
|
}
|
|
|
|
/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
#[allow(clippy::same_item_push)]
#[test]
fn stress_poll_ready() {
    #[cfg(miri)]
    const AMT: u32 = 100;
    #[cfg(not(miri))]
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;

    /// Runs one stress round with the given channel capacity: `NTHREADS` sender
    /// threads each send `AMT` items and the receiver must see all of them.
    fn stress(capacity: usize) {
        let (tx, rx) = mpsc::channel(capacity);

        let mut workers = Vec::new();
        for _ in 0..NTHREADS {
            let worker_tx = tx.clone();
            let handle = thread::spawn(move || block_on(stress_poll_ready_sender(worker_tx, AMT)));
            workers.push(handle);
        }
        // Drop the original handle so only the workers' clones keep the channel open.
        drop(tx);

        let received = block_on(rx.collect::<Vec<_>>());
        assert_eq!(received.len() as u32, AMT * NTHREADS);

        for worker in workers {
            worker.join().unwrap();
        }
    }

    for &capacity in &[0, 1, 8, 16] {
        stress(capacity);
    }
}
|
|
|
|
/// A producer thread pushes `0..N` through a zero-buffer channel with `try_send`,
/// retrying until each value is accepted; the receiver must see all `N` values in order.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn try_send_1() {
    const N: usize = 3000;
    let (mut tx, rx) = mpsc::channel(0);

    let t = thread::spawn(move || {
        for i in 0..N {
            // Spin until the channel accepts the value.
            while tx.try_send(i).is_err() {}
        }
    });

    let result: Vec<_> = block_on(rx.collect());
    // Without this check an empty or truncated result would pass the ordering loop below.
    assert_eq!(result.len(), N);
    for (i, j) in result.into_iter().enumerate() {
        assert_eq!(i, j);
    }

    t.join().unwrap();
}
|
|
|
|
/// After a successful `try_send` on a zero-buffer channel the sender is not ready;
/// a later blocking `send` still goes through once the receiver drains the channel.
#[test]
fn try_send_2() {
    let (mut tx, rx) = mpsc::channel(0);
    let mut received = block_on_stream(rx);

    tx.try_send("hello").unwrap();

    let (ready_tx, ready_rx) = oneshot::channel::<()>();

    let sender_thread = thread::spawn(move || {
        // "hello" has not been received yet, so the sender must report Pending.
        let not_ready = poll_fn(|cx| {
            assert!(tx.poll_ready(cx).is_pending());
            Poll::Ready(())
        });
        block_on(not_ready);

        // Dropping the oneshot sender is the signal the main thread waits for.
        drop(ready_tx);
        block_on(tx.send("goodbye")).unwrap();
    });

    // The result is ignored on purpose; only completion of the wait matters here.
    let _ = block_on(ready_rx);
    assert_eq!(received.next(), Some("hello"));
    assert_eq!(received.next(), Some("goodbye"));
    assert_eq!(received.next(), None);

    sender_thread.join().unwrap();
}
|
|
|
|
/// `try_send` fails while the previous message has not been received and succeeds
/// again once it has been taken.
#[test]
fn try_send_fail() {
    let (mut sender, receiver) = mpsc::channel(0);
    let mut received = block_on_stream(receiver);

    sender.try_send("hello").unwrap();

    // "hello" has not been received yet, so this attempt must fail.
    assert!(sender.try_send("fail").is_err());

    assert_eq!(received.next(), Some("hello"));

    sender.try_send("goodbye").unwrap();
    drop(sender);

    assert_eq!(received.next(), Some("goodbye"));
    assert_eq!(received.next(), None);
}
|
|
|
|
/// Exercises `try_send` / `try_next` on a channel created with buffer size 1: two sends
/// are accepted, the third is rejected, and `try_next` errors once the channel is empty.
#[test]
fn try_send_recv() {
    let (mut tx, mut rx) = mpsc::channel(1);

    for _ in 0..2 {
        tx.try_send("hello").unwrap();
    }
    tx.try_send("hello").unwrap_err(); // should be full

    for _ in 0..2 {
        rx.try_next().unwrap();
    }
    rx.try_next().unwrap_err(); // should be empty

    // One more send/receive round after the channel has been drained.
    tx.try_send("hello").unwrap();
    rx.try_next().unwrap();
    rx.try_next().unwrap_err(); // should be empty
}
|
|
|
|
/// `same_receiver` is true for clones of one sender and false across channels; it
/// becomes false after `disconnect` but stays true after `close_channel`.
#[test]
fn same_receiver() {
    // The receivers are not bound (`_`), so they are dropped immediately.
    let (mut a_first, _) = mpsc::channel::<i32>(1);
    let a_second = a_first.clone();

    let (mut b_first, _) = mpsc::channel::<i32>(1);
    let b_second = b_first.clone();

    assert!(a_first.same_receiver(&a_second));
    assert!(b_first.same_receiver(&b_second));
    assert!(!a_first.same_receiver(&b_first));

    a_first.disconnect();
    b_first.close_channel();

    // Disconnecting changes the answer; closing the channel does not.
    assert!(!a_first.same_receiver(&a_second));
    assert!(b_first.same_receiver(&b_second));
}
|
|
|
|
/// A sender is connected to the receiver of its own channel and to no other.
#[test]
fn is_connected_to() {
    let (sender_a, receiver_a) = mpsc::channel::<i32>(1);
    let (sender_b, receiver_b) = mpsc::channel::<i32>(1);

    // Matching pairs.
    assert!(sender_a.is_connected_to(&receiver_a));
    assert!(sender_b.is_connected_to(&receiver_b));

    // Crossed pairs.
    assert!(!sender_a.is_connected_to(&receiver_b));
    assert!(!sender_b.is_connected_to(&receiver_a));
}
|
|
|
|
/// `hash_receiver` yields equal hashes for clones of one sender and different hashes
/// across channels; `disconnect` changes a sender's hash while `close_channel` does not.
#[test]
fn hash_receiver() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    // Feeds `tx.hash_receiver` into a fresh `DefaultHasher` and returns the digest.
    let receiver_hash = |tx: &mpsc::Sender<i32>| {
        let mut hasher = DefaultHasher::new();
        tx.hash_receiver(&mut hasher);
        hasher.finish()
    };

    // The receivers are not bound (`_`), so they are dropped immediately.
    let (mut txa1, _) = mpsc::channel::<i32>(1);
    let txa2 = txa1.clone();

    let (mut txb1, _) = mpsc::channel::<i32>(1);
    let txb2 = txb1.clone();

    let before_a1 = receiver_hash(&txa1);
    let before_a2 = receiver_hash(&txa2);
    let before_b1 = receiver_hash(&txb1);
    let before_b2 = receiver_hash(&txb2);

    assert_eq!(before_a1, before_a2);
    assert_eq!(before_b1, before_b2);
    assert!(before_a1 != before_b1);

    txa1.disconnect();
    txb1.close_channel();

    let after_a1 = receiver_hash(&txa1);
    let after_a2 = receiver_hash(&txa2);
    let after_b1 = receiver_hash(&txb1);
    let after_b2 = receiver_hash(&txb2);

    assert!(after_a1 != after_a2);
    assert_eq!(after_b1, after_b2);
}
|
|
|
|
/// A send that is pending on a full channel is woken exactly once when the receiver
/// takes an item, and completes when polled again.
#[test]
fn send_backpressure() {
    let (waker, wake_count) = new_count_waker();
    let mut ctx = Context::from_waker(&waker);

    let (mut tx, mut rx) = mpsc::channel(1);
    block_on(tx.send(1)).unwrap();

    // The second send has to wait, and nothing has woken the task yet.
    let mut pending_send = tx.send(2);
    assert_eq!(pending_send.poll_unpin(&mut ctx), Poll::Pending);
    assert_eq!(wake_count, 0);

    // Taking the first item wakes the parked send, which then completes.
    let first = block_on(rx.next()).unwrap();
    assert_eq!(first, 1);
    assert_eq!(wake_count, 1);
    assert_eq!(pending_send.poll_unpin(&mut ctx), Poll::Ready(Ok(())));

    let second = block_on(rx.next()).unwrap();
    assert_eq!(second, 2);
}
|
|
|
|
/// Same as the single-sender backpressure test, but the pending send is issued from
/// a clone of the sender that filled the channel.
#[test]
fn send_backpressure_multi_senders() {
    let (waker, wake_count) = new_count_waker();
    let mut ctx = Context::from_waker(&waker);

    let (mut filling_tx, mut rx) = mpsc::channel(1);
    let mut waiting_tx = filling_tx.clone();
    block_on(filling_tx.send(1)).unwrap();

    // The clone's send has to wait, and nothing has woken the task yet.
    let mut pending_send = waiting_tx.send(2);
    assert_eq!(pending_send.poll_unpin(&mut ctx), Poll::Pending);
    assert_eq!(wake_count, 0);

    // Taking the first item wakes the parked send, which then completes.
    let first = block_on(rx.next()).unwrap();
    assert_eq!(first, 1);
    assert_eq!(wake_count, 1);
    assert_eq!(pending_send.poll_unpin(&mut ctx), Poll::Ready(Ok(())));

    let second = block_on(rx.next()).unwrap();
    assert_eq!(second, 2);
}
|