// And now that you've seen all the races that I found and attempted to fix,
// here's the code for you to find some more!
-use crate::sync::Arc;
+use crate::cell::UnsafeCell;
use crate::error;
use crate::fmt;
use crate::mem;
-use crate::cell::UnsafeCell;
+use crate::sync::Arc;
use crate::time::{Duration, Instant};
mod blocking;
+mod mpsc_queue;
mod oneshot;
mod shared;
+mod spsc_queue;
mod stream;
mod sync;
-mod mpsc_queue;
-mod spsc_queue;
mod cache_aligned;
// The receiver port can be sent from place to place, so long as it
// is not used to receive non-sendable things.
#[stable(feature = "rust1", since = "1.0.0")]
-unsafe impl<T: Send> Send for Receiver<T> { }
+unsafe impl<T: Send> Send for Receiver<T> {}
#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> !Sync for Receiver<T> { }
+impl<T> !Sync for Receiver<T> {}
/// An iterator over messages on a [`Receiver`], created by [`iter`].
///
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(Debug)]
pub struct Iter<'a, T: 'a> {
- rx: &'a Receiver<T>
+ rx: &'a Receiver<T>,
}
/// An iterator that attempts to yield all pending values for a [`Receiver`],
#[stable(feature = "receiver_try_iter", since = "1.15.0")]
#[derive(Debug)]
pub struct TryIter<'a, T: 'a> {
- rx: &'a Receiver<T>
+ rx: &'a Receiver<T>,
}
/// An owning iterator over messages on a [`Receiver`],
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
#[derive(Debug)]
pub struct IntoIter<T> {
- rx: Receiver<T>
+ rx: Receiver<T>,
}
/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
// The send port can be sent from place to place, so long as it
// is not used to send non-sendable things.
#[stable(feature = "rust1", since = "1.0.0")]
-unsafe impl<T: Send> Send for Sender<T> { }
+unsafe impl<T: Send> Send for Sender<T> {}
#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> !Sync for Sender<T> { }
+impl<T> !Sync for Sender<T> {}
/// The sending-half of Rust's synchronous [`sync_channel`] type.
///
impl<T> Sender<T> {
fn new(inner: Flavor<T>) -> Sender<T> {
- Sender {
- inner: UnsafeCell::new(inner),
- }
+ Sender { inner: UnsafeCell::new(inner) }
}
/// Attempts to send a value on this channel, returning it back if it could
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
let sleeper = match p.upgrade(rx) {
- oneshot::UpSuccess |
- oneshot::UpDisconnected => None,
+ oneshot::UpSuccess | oneshot::UpDisconnected => None,
oneshot::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
let sleeper = match p.upgrade(rx) {
- stream::UpSuccess |
- stream::UpDisconnected => None,
+ stream::UpSuccess | stream::UpDisconnected => None,
stream::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
pub fn try_recv(&self) -> Result<T, TryRecvError> {
loop {
let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(oneshot::Empty) => return Err(TryRecvError::Empty),
- Err(oneshot::Disconnected) => {
- return Err(TryRecvError::Disconnected)
- }
- Err(oneshot::Upgraded(rx)) => rx,
- }
- }
- Flavor::Stream(ref p) => {
- match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(stream::Empty) => return Err(TryRecvError::Empty),
- Err(stream::Disconnected) => {
- return Err(TryRecvError::Disconnected)
- }
- Err(stream::Upgraded(rx)) => rx,
- }
- }
- Flavor::Shared(ref p) => {
- match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(shared::Empty) => return Err(TryRecvError::Empty),
- Err(shared::Disconnected) => {
- return Err(TryRecvError::Disconnected)
- }
- }
- }
- Flavor::Sync(ref p) => {
- match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(sync::Empty) => return Err(TryRecvError::Empty),
- Err(sync::Disconnected) => {
- return Err(TryRecvError::Disconnected)
- }
- }
- }
+ Flavor::Oneshot(ref p) => match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(oneshot::Empty) => return Err(TryRecvError::Empty),
+ Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
+ Err(oneshot::Upgraded(rx)) => rx,
+ },
+ Flavor::Stream(ref p) => match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(stream::Empty) => return Err(TryRecvError::Empty),
+ Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
+ Err(stream::Upgraded(rx)) => rx,
+ },
+ Flavor::Shared(ref p) => match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(shared::Empty) => return Err(TryRecvError::Empty),
+ Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
+ },
+ Flavor::Sync(ref p) => match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(sync::Empty) => return Err(TryRecvError::Empty),
+ Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
+ },
};
unsafe {
- mem::swap(self.inner_mut(),
- new_port.inner_mut());
+ mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
}
pub fn recv(&self) -> Result<T, RecvError> {
loop {
let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(RecvError),
- Err(oneshot::Upgraded(rx)) => rx,
- Err(oneshot::Empty) => unreachable!(),
- }
- }
- Flavor::Stream(ref p) => {
- match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(RecvError),
- Err(stream::Upgraded(rx)) => rx,
- Err(stream::Empty) => unreachable!(),
- }
- }
- Flavor::Shared(ref p) => {
- match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(RecvError),
- Err(shared::Empty) => unreachable!(),
- }
- }
+ Flavor::Oneshot(ref p) => match p.recv(None) {
+ Ok(t) => return Ok(t),
+ Err(oneshot::Disconnected) => return Err(RecvError),
+ Err(oneshot::Upgraded(rx)) => rx,
+ Err(oneshot::Empty) => unreachable!(),
+ },
+ Flavor::Stream(ref p) => match p.recv(None) {
+ Ok(t) => return Ok(t),
+ Err(stream::Disconnected) => return Err(RecvError),
+ Err(stream::Upgraded(rx)) => rx,
+ Err(stream::Empty) => unreachable!(),
+ },
+ Flavor::Shared(ref p) => match p.recv(None) {
+ Ok(t) => return Ok(t),
+ Err(shared::Disconnected) => return Err(RecvError),
+ Err(shared::Empty) => unreachable!(),
+ },
Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
};
unsafe {
loop {
let port_or_empty = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(Disconnected),
- Err(oneshot::Upgraded(rx)) => Some(rx),
- Err(oneshot::Empty) => None,
- }
- }
- Flavor::Stream(ref p) => {
- match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(Disconnected),
- Err(stream::Upgraded(rx)) => Some(rx),
- Err(stream::Empty) => None,
- }
- }
- Flavor::Shared(ref p) => {
- match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(Disconnected),
- Err(shared::Empty) => None,
- }
- }
- Flavor::Sync(ref p) => {
- match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(sync::Disconnected) => return Err(Disconnected),
- Err(sync::Empty) => None,
- }
- }
+ Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(oneshot::Disconnected) => return Err(Disconnected),
+ Err(oneshot::Upgraded(rx)) => Some(rx),
+ Err(oneshot::Empty) => None,
+ },
+ Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(stream::Disconnected) => return Err(Disconnected),
+ Err(stream::Upgraded(rx)) => Some(rx),
+ Err(stream::Empty) => None,
+ },
+ Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(shared::Disconnected) => return Err(Disconnected),
+ Err(shared::Empty) => None,
+ },
+ Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(sync::Disconnected) => return Err(Disconnected),
+ Err(sync::Empty) => None,
+ },
};
if let Some(new_port) = port_or_empty {
pub fn try_iter(&self) -> TryIter<'_, T> {
TryIter { rx: self }
}
-
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;
- fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
+ fn next(&mut self) -> Option<T> {
+ self.rx.recv().ok()
+ }
}
#[stable(feature = "receiver_try_iter", since = "1.15.0")]
impl<'a, T> Iterator for TryIter<'a, T> {
type Item = T;
- fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
+ fn next(&mut self) -> Option<T> {
+ self.rx.try_recv().ok()
+ }
}
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
type Item = T;
type IntoIter = Iter<'a, T>;
- fn into_iter(self) -> Iter<'a, T> { self.iter() }
+ fn into_iter(self) -> Iter<'a, T> {
+ self.iter()
+ }
}
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
impl<T> Iterator for IntoIter<T> {
type Item = T;
- fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
+ fn next(&mut self) -> Option<T> {
+ self.rx.recv().ok()
+ }
}
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
-impl <T> IntoIterator for Receiver<T> {
+impl<T> IntoIterator for Receiver<T> {
type Item = T;
type IntoIter = IntoIter<T>;
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Send> error::Error for SendError<T> {
+ #[allow(deprecated)]
fn description(&self) -> &str {
"sending on a closed channel"
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
- TrySendError::Full(..) => {
- "sending on a full channel".fmt(f)
- }
- TrySendError::Disconnected(..) => {
- "sending on a closed channel".fmt(f)
- }
+ TrySendError::Full(..) => "sending on a full channel".fmt(f),
+ TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
}
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Send> error::Error for TrySendError<T> {
-
+ #[allow(deprecated)]
fn description(&self) -> &str {
match *self {
- TrySendError::Full(..) => {
- "sending on a full channel"
- }
- TrySendError::Disconnected(..) => {
- "sending on a closed channel"
- }
+ TrySendError::Full(..) => "sending on a full channel",
+ TrySendError::Disconnected(..) => "sending on a closed channel",
}
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl error::Error for RecvError {
-
+ #[allow(deprecated)]
fn description(&self) -> &str {
"receiving on a closed channel"
}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
- TryRecvError::Empty => {
- "receiving on an empty channel".fmt(f)
- }
- TryRecvError::Disconnected => {
- "receiving on a closed channel".fmt(f)
- }
+ TryRecvError::Empty => "receiving on an empty channel".fmt(f),
+ TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
}
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl error::Error for TryRecvError {
-
+ #[allow(deprecated)]
fn description(&self) -> &str {
match *self {
- TryRecvError::Empty => {
- "receiving on an empty channel"
- }
- TryRecvError::Disconnected => {
- "receiving on a closed channel"
- }
+ TryRecvError::Empty => "receiving on an empty channel",
+ TryRecvError::Disconnected => "receiving on a closed channel",
}
}
}
impl fmt::Display for RecvTimeoutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
- RecvTimeoutError::Timeout => {
- "timed out waiting on channel".fmt(f)
- }
- RecvTimeoutError::Disconnected => {
- "channel is empty and sending half is closed".fmt(f)
- }
+ RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
+ RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
}
}
}
#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
impl error::Error for RecvTimeoutError {
+ #[allow(deprecated)]
fn description(&self) -> &str {
match *self {
- RecvTimeoutError::Timeout => {
- "timed out waiting on channel"
- }
- RecvTimeoutError::Disconnected => {
- "channel is empty and sending half is closed"
- }
+ RecvTimeoutError::Timeout => "timed out waiting on channel",
+ RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
}
}
}
#[test]
fn smoke_threads() {
let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
tx.send(1).unwrap();
});
assert_eq!(rx.recv().unwrap(), 1);
#[test]
fn port_gone_concurrent() {
let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx.recv().unwrap();
});
while tx.send(1).is_ok() {}
fn port_gone_concurrent_shared() {
let (tx, rx) = channel::<i32>();
let tx2 = tx.clone();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx.recv().unwrap();
});
while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
#[test]
fn chan_gone_concurrent() {
let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
tx.send(1).unwrap();
tx.send(1).unwrap();
});
#[test]
fn stress() {
let (tx, rx) = channel::<i32>();
- let t = thread::spawn(move|| {
- for _ in 0..10000 { tx.send(1).unwrap(); }
+ let t = thread::spawn(move || {
+ for _ in 0..10000 {
+ tx.send(1).unwrap();
+ }
});
for _ in 0..10000 {
assert_eq!(rx.recv().unwrap(), 1);
const NTHREADS: u32 = 8;
let (tx, rx) = channel::<i32>();
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
for _ in 0..NTHREADS {
let tx = tx.clone();
- thread::spawn(move|| {
- for _ in 0..AMT { tx.send(1).unwrap(); }
+ thread::spawn(move || {
+ for _ in 0..AMT {
+ tx.send(1).unwrap();
+ }
});
}
drop(tx);
fn send_from_outside_runtime() {
let (tx1, rx1) = channel::<()>();
let (tx2, rx2) = channel::<i32>();
- let t1 = thread::spawn(move|| {
+ let t1 = thread::spawn(move || {
tx1.send(()).unwrap();
for _ in 0..40 {
assert_eq!(rx2.recv().unwrap(), 1);
}
});
rx1.recv().unwrap();
- let t2 = thread::spawn(move|| {
+ let t2 = thread::spawn(move || {
for _ in 0..40 {
tx2.send(1).unwrap();
}
#[test]
fn recv_from_outside_runtime() {
let (tx, rx) = channel::<i32>();
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
for _ in 0..40 {
assert_eq!(rx.recv().unwrap(), 1);
}
fn no_runtime() {
let (tx1, rx1) = channel::<i32>();
let (tx2, rx2) = channel::<i32>();
- let t1 = thread::spawn(move|| {
+ let t1 = thread::spawn(move || {
assert_eq!(rx1.recv().unwrap(), 1);
tx2.send(2).unwrap();
});
- let t2 = thread::spawn(move|| {
+ let t2 = thread::spawn(move || {
tx1.send(1).unwrap();
assert_eq!(rx2.recv().unwrap(), 2);
});
#[test]
fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will panic
- let res = thread::spawn(move|| {
+ let res = thread::spawn(move || {
let (tx, rx) = channel::<i32>();
drop(tx);
rx.recv().unwrap();
- }).join();
+ })
+ .join();
// What is our res?
assert!(res.is_err());
}
#[test]
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = channel::<Box<i32>>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
assert!(*rx.recv().unwrap() == 10);
});
#[test]
fn oneshot_multi_task_recv_then_close() {
let (tx, rx) = channel::<Box<i32>>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
drop(tx);
});
- let res = thread::spawn(move|| {
+ let res = thread::spawn(move || {
assert!(*rx.recv().unwrap() == 10);
- }).join();
+ })
+ .join();
assert!(res.is_err());
}
fn oneshot_multi_thread_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
drop(rx);
});
drop(tx);
fn oneshot_multi_thread_send_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
drop(rx);
});
- let _ = thread::spawn(move|| {
+ let _ = thread::spawn(move || {
tx.send(1).unwrap();
- }).join();
+ })
+ .join();
}
}
fn oneshot_multi_thread_recv_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel::<i32>();
- thread::spawn(move|| {
- let res = thread::spawn(move|| {
+ thread::spawn(move || {
+ let res = thread::spawn(move || {
rx.recv().unwrap();
- }).join();
+ })
+ .join();
assert!(res.is_err());
});
- let _t = thread::spawn(move|| {
- thread::spawn(move|| {
+ let _t = thread::spawn(move || {
+ thread::spawn(move || {
drop(tx);
});
});
fn oneshot_multi_thread_send_recv_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = channel::<Box<isize>>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
tx.send(box 10).unwrap();
});
assert!(*rx.recv().unwrap() == 10);
recv(rx, 0);
fn send(tx: Sender<Box<i32>>, i: i32) {
- if i == 10 { return }
+ if i == 10 {
+ return;
+ }
- thread::spawn(move|| {
+ thread::spawn(move || {
tx.send(box i).unwrap();
send(tx, i + 1);
});
}
fn recv(rx: Receiver<Box<i32>>, i: i32) {
- if i == 10 { return }
+ if i == 10 {
+ return;
+ }
- thread::spawn(move|| {
+ thread::spawn(move || {
assert!(*rx.recv().unwrap() == i);
recv(rx, i + 1);
});
#[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
fn very_long_recv_timeout_wont_panic() {
let (tx, rx) = channel::<()>();
- let join_handle = thread::spawn(move || {
- rx.recv_timeout(Duration::from_secs(u64::max_value()))
- });
+ let join_handle =
+ thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::max_value())));
thread::sleep(Duration::from_secs(1));
assert!(tx.send(()).is_ok());
assert_eq!(join_handle.join().unwrap(), Ok(()));
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
let (tx, rx) = channel();
- for _ in 0..10000 { tx.send(()).unwrap(); }
- for _ in 0..10000 { rx.recv().unwrap(); }
+ for _ in 0..10000 {
+ tx.send(()).unwrap();
+ }
+ for _ in 0..10000 {
+ rx.recv().unwrap();
+ }
}
#[test]
let total = 5;
for _ in 0..total {
let tx = tx.clone();
- thread::spawn(move|| {
+ thread::spawn(move || {
tx.send(()).unwrap();
});
}
- for _ in 0..total { rx.recv().unwrap(); }
+ for _ in 0..total {
+ rx.recv().unwrap();
+ }
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
tx.send(()).unwrap();
let total = stress_factor() + 100;
for _ in 0..total {
let tx = tx.clone();
- thread::spawn(move|| {
+ thread::spawn(move || {
tx.send(()).unwrap();
});
}
let (tx, rx) = channel::<i32>();
let (total_tx, total_rx) = channel::<i32>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
let mut acc = 0;
for x in rx.iter() {
acc += x;
let (tx, rx) = channel::<i32>();
let (count_tx, count_rx) = channel();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
let mut count = 0;
for x in rx.iter() {
if count >= 3 {
let (response_tx, response_rx) = channel();
// Request `x`s until we have `6`.
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
let mut count = 0;
loop {
for x in response_rx.try_iter() {
#[test]
fn test_recv_into_iter_owned() {
let mut iter = {
- let (tx, rx) = channel::<i32>();
- tx.send(1).unwrap();
- tx.send(2).unwrap();
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
- rx.into_iter()
+ rx.into_iter()
};
assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 2);
let (tx1, rx1) = channel::<i32>();
let (tx2, rx2) = channel::<()>();
let (tx3, rx3) = channel::<()>();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx2.recv().unwrap();
tx1.send(1).unwrap();
tx3.send(()).unwrap();
fn destroy_upgraded_shared_port_when_sender_still_active() {
let (tx, rx) = channel();
let (tx2, rx2) = channel();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx.recv().unwrap(); // wait on a oneshot
- drop(rx); // destroy a shared
+ drop(rx); // destroy a shared
tx2.send(()).unwrap();
});
// make sure the other thread has gone to sleep
- for _ in 0..5000 { thread::yield_now(); }
+ for _ in 0..5000 {
+ thread::yield_now();
+ }
// upgrade to a shared chan and send a message
let t = tx.clone();
#[test]
fn smoke_threads() {
let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
tx.send(1).unwrap();
});
assert_eq!(rx.recv().unwrap(), 1);
#[test]
fn port_gone_concurrent() {
let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx.recv().unwrap();
});
while tx.send(1).is_ok() {}
fn port_gone_concurrent_shared() {
let (tx, rx) = sync_channel::<i32>(0);
let tx2 = tx.clone();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx.recv().unwrap();
});
while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
#[test]
fn chan_gone_concurrent() {
let (tx, rx) = sync_channel::<i32>(0);
- thread::spawn(move|| {
+ thread::spawn(move || {
tx.send(1).unwrap();
tx.send(1).unwrap();
});
#[test]
fn stress() {
let (tx, rx) = sync_channel::<i32>(0);
- thread::spawn(move|| {
- for _ in 0..10000 { tx.send(1).unwrap(); }
+ thread::spawn(move || {
+ for _ in 0..10000 {
+ tx.send(1).unwrap();
+ }
});
for _ in 0..10000 {
assert_eq!(rx.recv().unwrap(), 1);
fn stress_recv_timeout_two_threads() {
let (tx, rx) = sync_channel::<i32>(0);
- thread::spawn(move|| {
- for _ in 0..10000 { tx.send(1).unwrap(); }
+ thread::spawn(move || {
+ for _ in 0..10000 {
+ tx.send(1).unwrap();
+ }
});
let mut recv_count = 0;
Ok(v) => {
assert_eq!(v, 1);
recv_count += 1;
- },
+ }
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => break,
}
let (tx, rx) = sync_channel::<i32>(0);
let (dtx, drx) = sync_channel::<()>(0);
- thread::spawn(move|| {
+ thread::spawn(move || {
let mut recv_count = 0;
loop {
match rx.recv_timeout(Duration::from_millis(10)) {
Ok(v) => {
assert_eq!(v, 1);
recv_count += 1;
- },
+ }
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => break,
}
for _ in 0..NTHREADS {
let tx = tx.clone();
- thread::spawn(move|| {
- for _ in 0..AMT { tx.send(1).unwrap(); }
+ thread::spawn(move || {
+ for _ in 0..AMT {
+ tx.send(1).unwrap();
+ }
});
}
let (tx, rx) = sync_channel::<i32>(0);
let (dtx, drx) = sync_channel::<()>(0);
- thread::spawn(move|| {
+ thread::spawn(move || {
for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
for _ in 0..NTHREADS {
let tx = tx.clone();
- thread::spawn(move|| {
- for _ in 0..AMT { tx.send(1).unwrap(); }
+ thread::spawn(move || {
+ for _ in 0..AMT {
+ tx.send(1).unwrap();
+ }
});
}
drop(tx);
#[test]
fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will panic
- let res = thread::spawn(move|| {
+ let res = thread::spawn(move || {
let (tx, rx) = sync_channel::<i32>(0);
drop(tx);
rx.recv().unwrap();
- }).join();
+ })
+ .join();
// What is our res?
assert!(res.is_err());
}
#[test]
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = sync_channel::<Box<i32>>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
assert!(*rx.recv().unwrap() == 10);
});
#[test]
fn oneshot_multi_task_recv_then_close() {
let (tx, rx) = sync_channel::<Box<i32>>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
drop(tx);
});
- let res = thread::spawn(move|| {
+ let res = thread::spawn(move || {
assert!(*rx.recv().unwrap() == 10);
- }).join();
+ })
+ .join();
assert!(res.is_err());
}
fn oneshot_multi_thread_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
drop(rx);
});
drop(tx);
fn oneshot_multi_thread_send_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
drop(rx);
});
let _ = thread::spawn(move || {
tx.send(1).unwrap();
- }).join();
+ })
+ .join();
}
}
fn oneshot_multi_thread_recv_close_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| {
- let res = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
+ let res = thread::spawn(move || {
rx.recv().unwrap();
- }).join();
+ })
+ .join();
assert!(res.is_err());
});
- let _t = thread::spawn(move|| {
- thread::spawn(move|| {
+ let _t = thread::spawn(move || {
+ thread::spawn(move || {
drop(tx);
});
});
fn oneshot_multi_thread_send_recv_stress() {
for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<Box<i32>>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
tx.send(box 10).unwrap();
});
assert!(*rx.recv().unwrap() == 10);
recv(rx, 0);
fn send(tx: SyncSender<Box<i32>>, i: i32) {
- if i == 10 { return }
+ if i == 10 {
+ return;
+ }
- thread::spawn(move|| {
+ thread::spawn(move || {
tx.send(box i).unwrap();
send(tx, i + 1);
});
}
fn recv(rx: Receiver<Box<i32>>, i: i32) {
- if i == 10 { return }
+ if i == 10 {
+ return;
+ }
- thread::spawn(move|| {
+ thread::spawn(move || {
assert!(*rx.recv().unwrap() == i);
recv(rx, i + 1);
});
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
let (tx, rx) = sync_channel(10000);
- for _ in 0..10000 { tx.send(()).unwrap(); }
- for _ in 0..10000 { rx.recv().unwrap(); }
+ for _ in 0..10000 {
+ tx.send(()).unwrap();
+ }
+ for _ in 0..10000 {
+ rx.recv().unwrap();
+ }
}
#[test]
let total = stress_factor() + 100;
for _ in 0..total {
let tx = tx.clone();
- thread::spawn(move|| {
+ thread::spawn(move || {
tx.send(()).unwrap();
});
}
let (tx, rx) = sync_channel::<i32>(0);
let (total_tx, total_rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
let mut acc = 0;
for x in rx.iter() {
acc += x;
let (tx, rx) = sync_channel::<i32>(0);
let (count_tx, count_rx) = sync_channel(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
let mut count = 0;
for x in rx.iter() {
if count >= 3 {
let (tx1, rx1) = sync_channel::<i32>(1);
let (tx2, rx2) = sync_channel::<()>(1);
let (tx3, rx3) = sync_channel::<()>(1);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx2.recv().unwrap();
tx1.send(1).unwrap();
tx3.send(()).unwrap();
fn destroy_upgraded_shared_port_when_sender_still_active() {
let (tx, rx) = sync_channel::<()>(0);
let (tx2, rx2) = sync_channel::<()>(0);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx.recv().unwrap(); // wait on a oneshot
- drop(rx); // destroy a shared
+ drop(rx); // destroy a shared
tx2.send(()).unwrap();
});
// make sure the other thread has gone to sleep
- for _ in 0..5000 { thread::yield_now(); }
+ for _ in 0..5000 {
+ thread::yield_now();
+ }
// upgrade to a shared chan and send a message
let t = tx.clone();
#[test]
fn send1() {
let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| { rx.recv().unwrap(); });
+ let _t = thread::spawn(move || {
+ rx.recv().unwrap();
+ });
assert_eq!(tx.send(1), Ok(()));
}
#[test]
fn send2() {
let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| { drop(rx); });
+ let _t = thread::spawn(move || {
+ drop(rx);
+ });
assert!(tx.send(1).is_err());
}
fn send3() {
let (tx, rx) = sync_channel::<i32>(1);
assert_eq!(tx.send(1), Ok(()));
- let _t =thread::spawn(move|| { drop(rx); });
+ let _t = thread::spawn(move || {
+ drop(rx);
+ });
assert!(tx.send(1).is_err());
}
let tx2 = tx.clone();
let (done, donerx) = channel();
let done2 = done.clone();
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
assert!(tx.send(1).is_err());
done.send(()).unwrap();
});
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
assert!(tx2.send(2).is_err());
done2.send(()).unwrap();
});
let (tx1, rx1) = sync_channel::<()>(3);
let (tx2, rx2) = sync_channel::<()>(3);
- let _t = thread::spawn(move|| {
+ let _t = thread::spawn(move || {
rx1.recv().unwrap();
tx2.try_send(()).unwrap();
});