From f950193d74d13628537a04728eef25cec642e41b Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 17 May 2019 11:36:25 -0700 Subject: [PATCH] Remove the unstable and deprecated mpsc_select This removes macro `select!` and `std::sync::mpsc::{Handle, Select}`, which were all unstable and have been deprecated since 1.32. --- src/libstd/macros.rs | 55 --- src/libstd/sync/mpsc/mod.rs | 85 +--- src/libstd/sync/mpsc/oneshot.rs | 72 --- src/libstd/sync/mpsc/select.rs | 352 --------------- src/libstd/sync/mpsc/select_tests.rs | 413 ------------------ src/libstd/sync/mpsc/shared.rs | 35 +- src/libstd/sync/mpsc/stream.rs | 53 --- src/libstd/sync/mpsc/sync.rs | 37 -- src/test/run-pass/issues/issue-13494.rs | 35 -- .../run-pass/macros/macro-comma-support.rs | 2 - 10 files changed, 9 insertions(+), 1130 deletions(-) delete mode 100644 src/libstd/sync/mpsc/select.rs delete mode 100644 src/libstd/sync/mpsc/select_tests.rs delete mode 100644 src/test/run-pass/issues/issue-13494.rs diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index 03aebeda47c..9af7bba97aa 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -357,61 +357,6 @@ macro_rules! dbg { }; } -/// Selects the first successful receive event from a number of receivers. -/// -/// This macro is used to wait for the first event to occur on a number of -/// receivers. It places no restrictions on the types of receivers given to -/// this macro, this can be viewed as a heterogeneous select. -/// -/// # Examples -/// -/// ``` -/// #![feature(mpsc_select)] -/// -/// use std::thread; -/// use std::sync::mpsc; -/// -/// // two placeholder functions for now -/// fn long_running_thread() {} -/// fn calculate_the_answer() -> u32 { 42 } -/// -/// let (tx1, rx1) = mpsc::channel(); -/// let (tx2, rx2) = mpsc::channel(); -/// -/// thread::spawn(move|| { long_running_thread(); tx1.send(()).unwrap(); }); -/// thread::spawn(move|| { tx2.send(calculate_the_answer()).unwrap(); }); -/// -/// select! { -/// _ = rx1.recv() => println!("the long running thread finished first"), -/// answer = rx2.recv() => { -/// println!("the answer was: {}", answer.unwrap()); -/// } -/// } -/// # drop(rx1.recv()); -/// # drop(rx2.recv()); -/// ``` -/// -/// For more information about select, see the `std::sync::mpsc::Select` structure. -#[macro_export] -#[unstable(feature = "mpsc_select", issue = "27800")] -#[rustc_deprecated(since = "1.32.0", - reason = "channel selection will be removed in a future release")] -macro_rules! select { - ( - $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ - ) => ({ - use $crate::sync::mpsc::Select; - let sel = Select::new(); - $( let mut $rx = sel.handle(&$rx); )+ - unsafe { - $( $rx.add(); )+ - } - let ret = sel.wait(); - $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+ - { unreachable!() } - }) -} - #[cfg(test)] macro_rules! assert_approx_eq { ($a:expr, $b:expr) => ({ diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 04353fde1b4..69ecd201063 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -116,7 +116,6 @@ //! ``` #![stable(feature = "rust1", since = "1.0.0")] -#![allow(deprecated)] // for mpsc_select // A description of how Rust's channel implementation works // @@ -263,6 +262,8 @@ // believe that there is anything fundamental that needs to change about these // channels, however, in order to support a more efficient select(). // +// FIXME: Select is now removed, so these factors are ready to be cleaned up! +// // # Conclusion // // And now that you've seen all the races that I found and attempted to fix, @@ -275,18 +276,8 @@ use crate::cell::UnsafeCell; use crate::time::{Duration, Instant}; -#[unstable(feature = "mpsc_select", issue = "27800")] -pub use self::select::{Select, Handle}; -use self::select::StartResult; -use self::select::StartResult::*; -use self::blocking::SignalToken; - -#[cfg(all(test, not(target_os = "emscripten")))] -mod select_tests; - mod blocking; mod oneshot; -mod select; mod shared; mod stream; mod sync; @@ -1514,78 +1505,6 @@ pub fn try_iter(&self) -> TryIter<'_, T> { } -impl select::Packet for Receiver { - fn can_recv(&self) -> bool { - loop { - let new_port = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.can_recv() { - Ok(ret) => return ret, - Err(upgrade) => upgrade, - } - } - Flavor::Stream(ref p) => { - match p.can_recv() { - Ok(ret) => return ret, - Err(upgrade) => upgrade, - } - } - Flavor::Shared(ref p) => return p.can_recv(), - Flavor::Sync(ref p) => return p.can_recv(), - }; - unsafe { - mem::swap(self.inner_mut(), - new_port.inner_mut()); - } - } - } - - fn start_selection(&self, mut token: SignalToken) -> StartResult { - loop { - let (t, new_port) = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.start_selection(token) { - oneshot::SelSuccess => return Installed, - oneshot::SelCanceled => return Abort, - oneshot::SelUpgraded(t, rx) => (t, rx), - } - } - Flavor::Stream(ref p) => { - match p.start_selection(token) { - stream::SelSuccess => return Installed, - stream::SelCanceled => return Abort, - stream::SelUpgraded(t, rx) => (t, rx), - } - } - Flavor::Shared(ref p) => return p.start_selection(token), - Flavor::Sync(ref p) => return p.start_selection(token), - }; - token = t; - unsafe { - mem::swap(self.inner_mut(), new_port.inner_mut()); - } - } - } - - fn abort_selection(&self) -> bool { - let mut was_upgrade = false; - loop { - let result = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => p.abort_selection(), - Flavor::Stream(ref p) => p.abort_selection(was_upgrade), - Flavor::Shared(ref p) => return p.abort_selection(was_upgrade), - Flavor::Sync(ref p) => return p.abort_selection(), - }; - let new_port = match result { Ok(b) => return b, Err(p) => p }; - was_upgrade = true; - unsafe { - mem::swap(self.inner_mut(), - new_port.inner_mut()); - } - } - } -} - #[stable(feature = "rust1", since = "1.0.0")] impl<'a, T> Iterator for Iter<'a, T> { type Item = T; diff --git a/src/libstd/sync/mpsc/oneshot.rs b/src/libstd/sync/mpsc/oneshot.rs index 5c516d5de0f..e7a5cc46b31 100644 --- a/src/libstd/sync/mpsc/oneshot.rs +++ b/src/libstd/sync/mpsc/oneshot.rs @@ -24,7 +24,6 @@ pub use self::Failure::*; pub use self::UpgradeResult::*; -pub use self::SelectionResult::*; use self::MyUpgrade::*; use crate::sync::mpsc::Receiver; @@ -66,12 +65,6 @@ pub enum UpgradeResult { UpWoke(SignalToken), } -pub enum SelectionResult { - SelCanceled, - SelUpgraded(SignalToken, Receiver), - SelSuccess, -} - enum MyUpgrade { NothingSent, SendUsed, @@ -264,71 +257,6 @@ pub fn drop_port(&self) { // select implementation //////////////////////////////////////////////////////////////////////////// - // If Ok, the value is whether this port has data, if Err, then the upgraded - // port needs to be checked instead of this one. - pub fn can_recv(&self) -> Result> { - unsafe { - match self.state.load(Ordering::SeqCst) { - EMPTY => Ok(false), // Welp, we tried - DATA => Ok(true), // we have some un-acquired data - DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data - DISCONNECTED => { - match ptr::replace(self.upgrade.get(), SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => Err(upgrade), - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { ptr::write(self.upgrade.get(), up); Ok(true) } - } - } - _ => unreachable!(), // we're the "one blocker" - } - } - } - - // Attempts to start selection on this port. This can either succeed, fail - // because there is data, or fail because there is an upgrade pending. - pub fn start_selection(&self, token: SignalToken) -> SelectionResult { - unsafe { - let ptr = token.cast_to_usize(); - match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { - EMPTY => SelSuccess, - DATA => { - drop(SignalToken::cast_from_usize(ptr)); - SelCanceled - } - DISCONNECTED if (*self.data.get()).is_some() => { - drop(SignalToken::cast_from_usize(ptr)); - SelCanceled - } - DISCONNECTED => { - match ptr::replace(self.upgrade.get(), SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => { - SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade) - } - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { - ptr::write(self.upgrade.get(), up); - drop(SignalToken::cast_from_usize(ptr)); - SelCanceled - } - } - } - _ => unreachable!(), // we're the "one blocker" - } - } - } - // Remove a previous selecting thread from this port. This ensures that the // blocked thread will no longer be visible to any other threads. // diff --git a/src/libstd/sync/mpsc/select.rs b/src/libstd/sync/mpsc/select.rs deleted file mode 100644 index d1b5f2deccc..00000000000 --- a/src/libstd/sync/mpsc/select.rs +++ /dev/null @@ -1,352 +0,0 @@ -//! Selection over an array of receivers -//! -//! This module contains the implementation machinery necessary for selecting -//! over a number of receivers. One large goal of this module is to provide an -//! efficient interface to selecting over any receiver of any type. -//! -//! This is achieved through an architecture of a "receiver set" in which -//! receivers are added to a set and then the entire set is waited on at once. -//! The set can be waited on multiple times to prevent re-adding each receiver -//! to the set. -//! -//! Usage of this module is currently encouraged to go through the use of the -//! `select!` macro. This macro allows naturally binding of variables to the -//! received values of receivers in a much more natural syntax then usage of the -//! `Select` structure directly. -//! -//! # Examples -//! -//! ```rust -//! #![feature(mpsc_select)] -//! -//! use std::sync::mpsc::channel; -//! -//! let (tx1, rx1) = channel(); -//! let (tx2, rx2) = channel(); -//! -//! tx1.send(1).unwrap(); -//! tx2.send(2).unwrap(); -//! -//! select! { -//! val = rx1.recv() => { -//! assert_eq!(val.unwrap(), 1); -//! }, -//! val = rx2.recv() => { -//! assert_eq!(val.unwrap(), 2); -//! } -//! } -//! ``` - -#![allow(dead_code)] -#![unstable(feature = "mpsc_select", - reason = "This implementation, while likely sufficient, is unsafe and \ - likely to be error prone. At some point in the future this \ - module will be removed.", - issue = "27800")] -#![rustc_deprecated(since = "1.32.0", - reason = "channel selection will be removed in a future release")] - -use core::cell::{Cell, UnsafeCell}; -use core::marker; -use core::ptr; -use core::usize; - -use crate::fmt; -use crate::sync::mpsc::{Receiver, RecvError}; -use crate::sync::mpsc::blocking::{self, SignalToken}; - -/// The "receiver set" of the select interface. This structure is used to manage -/// a set of receivers which are being selected over. -pub struct Select { - inner: UnsafeCell, - next_id: Cell, -} - -struct SelectInner { - head: *mut Handle<'static, ()>, - tail: *mut Handle<'static, ()>, -} - -impl !marker::Send for Select {} - -/// A handle to a receiver which is currently a member of a `Select` set of -/// receivers. This handle is used to keep the receiver in the set as well as -/// interact with the underlying receiver. -pub struct Handle<'rx, T:Send+'rx> { - /// The ID of this handle, used to compare against the return value of - /// `Select::wait()`. - id: usize, - selector: *mut SelectInner, - next: *mut Handle<'static, ()>, - prev: *mut Handle<'static, ()>, - added: bool, - packet: &'rx (dyn Packet+'rx), - - // due to our fun transmutes, we be sure to place this at the end. (nothing - // previous relies on T) - rx: &'rx Receiver, -} - -struct Packets { cur: *mut Handle<'static, ()> } - -#[doc(hidden)] -#[derive(PartialEq, Eq)] -pub enum StartResult { - Installed, - Abort, -} - -#[doc(hidden)] -pub trait Packet { - fn can_recv(&self) -> bool; - fn start_selection(&self, token: SignalToken) -> StartResult; - fn abort_selection(&self) -> bool; -} - -impl Select { - /// Creates a new selection structure. This set is initially empty. - /// - /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through - /// the `select!` macro. - /// - /// # Examples - /// - /// ``` - /// #![feature(mpsc_select)] - /// - /// use std::sync::mpsc::Select; - /// - /// let select = Select::new(); - /// ``` - pub fn new() -> Select { - Select { - inner: UnsafeCell::new(SelectInner { - head: ptr::null_mut(), - tail: ptr::null_mut(), - }), - next_id: Cell::new(1), - } - } - - /// Creates a new handle into this receiver set for a new receiver. Note - /// that this does *not* add the receiver to the receiver set, for that you - /// must call the `add` method on the handle itself. - pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver) -> Handle<'a, T> { - let id = self.next_id.get(); - self.next_id.set(id + 1); - Handle { - id, - selector: self.inner.get(), - next: ptr::null_mut(), - prev: ptr::null_mut(), - added: false, - rx, - packet: rx, - } - } - - /// Waits for an event on this receiver set. The returned value is *not* an - /// index, but rather an ID. This ID can be queried against any active - /// `Handle` structures (each one has an `id` method). The handle with - /// the matching `id` will have some sort of event available on it. The - /// event could either be that data is available or the corresponding - /// channel has been closed. - pub fn wait(&self) -> usize { - self.wait2(true) - } - - /// Helper method for skipping the preflight checks during testing - pub(super) fn wait2(&self, do_preflight_checks: bool) -> usize { - // Note that this is currently an inefficient implementation. We in - // theory have knowledge about all receivers in the set ahead of time, - // so this method shouldn't really have to iterate over all of them yet - // again. The idea with this "receiver set" interface is to get the - // interface right this time around, and later this implementation can - // be optimized. - // - // This implementation can be summarized by: - // - // fn select(receivers) { - // if any receiver ready { return ready index } - // deschedule { - // block on all receivers - // } - // unblock on all receivers - // return ready index - // } - // - // Most notably, the iterations over all of the receivers shouldn't be - // necessary. - unsafe { - // Stage 1: preflight checks. Look for any packets ready to receive - if do_preflight_checks { - for handle in self.iter() { - if (*handle).packet.can_recv() { - return (*handle).id(); - } - } - } - - // Stage 2: begin the blocking process - // - // Create a number of signal tokens, and install each one - // sequentially until one fails. If one fails, then abort the - // selection on the already-installed tokens. - let (wait_token, signal_token) = blocking::tokens(); - for (i, handle) in self.iter().enumerate() { - match (*handle).packet.start_selection(signal_token.clone()) { - StartResult::Installed => {} - StartResult::Abort => { - // Go back and abort the already-begun selections - for handle in self.iter().take(i) { - (*handle).packet.abort_selection(); - } - return (*handle).id; - } - } - } - - // Stage 3: no messages available, actually block - wait_token.wait(); - - // Stage 4: there *must* be message available; find it. - // - // Abort the selection process on each receiver. If the abort - // process returns `true`, then that means that the receiver is - // ready to receive some data. Note that this also means that the - // receiver may have yet to have fully read the `to_wake` field and - // woken us up (although the wakeup is guaranteed to fail). - // - // This situation happens in the window of where a sender invokes - // increment(), sees -1, and then decides to wake up the thread. After - // all this is done, the sending thread will set `selecting` to - // `false`. Until this is done, we cannot return. If we were to - // return, then a sender could wake up a receiver which has gone - // back to sleep after this call to `select`. - // - // Note that it is a "fairly small window" in which an increment() - // views that it should wake a thread up until the `selecting` bit - // is set to false. For now, the implementation currently just spins - // in a yield loop. This is very distasteful, but this - // implementation is already nowhere near what it should ideally be. - // A rewrite should focus on avoiding a yield loop, and for now this - // implementation is tying us over to a more efficient "don't - // iterate over everything every time" implementation. - let mut ready_id = usize::MAX; - for handle in self.iter() { - if (*handle).packet.abort_selection() { - ready_id = (*handle).id; - } - } - - // We must have found a ready receiver - assert!(ready_id != usize::MAX); - return ready_id; - } - } - - fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } } -} - -impl<'rx, T: Send> Handle<'rx, T> { - /// Retrieves the ID of this handle. - #[inline] - pub fn id(&self) -> usize { self.id } - - /// Blocks to receive a value on the underlying receiver, returning `Some` on - /// success or `None` if the channel disconnects. This function has the same - /// semantics as `Receiver.recv` - pub fn recv(&mut self) -> Result { self.rx.recv() } - - /// Adds this handle to the receiver set that the handle was created from. This - /// method can be called multiple times, but it has no effect if `add` was - /// called previously. - /// - /// This method is unsafe because it requires that the `Handle` is not moved - /// while it is added to the `Select` set. - pub unsafe fn add(&mut self) { - if self.added { return } - let selector = &mut *self.selector; - let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>; - - if selector.head.is_null() { - selector.head = me; - selector.tail = me; - } else { - (*me).prev = selector.tail; - assert!((*me).next.is_null()); - (*selector.tail).next = me; - selector.tail = me; - } - self.added = true; - } - - /// Removes this handle from the `Select` set. This method is unsafe because - /// it has no guarantee that the `Handle` was not moved since `add` was - /// called. - pub unsafe fn remove(&mut self) { - if !self.added { return } - - let selector = &mut *self.selector; - let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>; - - if self.prev.is_null() { - assert_eq!(selector.head, me); - selector.head = self.next; - } else { - (*self.prev).next = self.next; - } - if self.next.is_null() { - assert_eq!(selector.tail, me); - selector.tail = self.prev; - } else { - (*self.next).prev = self.prev; - } - - self.next = ptr::null_mut(); - self.prev = ptr::null_mut(); - - self.added = false; - } -} - -impl Drop for Select { - fn drop(&mut self) { - unsafe { - assert!((&*self.inner.get()).head.is_null()); - assert!((&*self.inner.get()).tail.is_null()); - } - } -} - -impl Drop for Handle<'_, T> { - fn drop(&mut self) { - unsafe { self.remove() } - } -} - -impl Iterator for Packets { - type Item = *mut Handle<'static, ()>; - - fn next(&mut self) -> Option<*mut Handle<'static, ()>> { - if self.cur.is_null() { - None - } else { - let ret = Some(self.cur); - unsafe { self.cur = (*self.cur).next; } - ret - } - } -} - -impl fmt::Debug for Select { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Select").finish() - } -} - -impl fmt::Debug for Handle<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Handle").finish() - } -} diff --git a/src/libstd/sync/mpsc/select_tests.rs b/src/libstd/sync/mpsc/select_tests.rs deleted file mode 100644 index 18d93462c78..00000000000 --- a/src/libstd/sync/mpsc/select_tests.rs +++ /dev/null @@ -1,413 +0,0 @@ -#![allow(unused_imports)] - -/// This file exists to hack around https://github.com/rust-lang/rust/issues/47238 - -use crate::thread; -use crate::sync::mpsc::*; - -// Don't use the libstd version so we can pull in the right Select structure -// (std::comm points at the wrong one) -macro_rules! select { - ( - $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ - ) => ({ - let sel = Select::new(); - $( let mut $rx = sel.handle(&$rx); )+ - unsafe { - $( $rx.add(); )+ - } - let ret = sel.wait(); - $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+ - { unreachable!() } - }) -} - -#[test] -fn smoke() { - let (tx1, rx1) = channel::(); - let (tx2, rx2) = channel::(); - tx1.send(1).unwrap(); - select! { - foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); }, - _bar = rx2.recv() => { panic!() } - } - tx2.send(2).unwrap(); - select! { - _foo = rx1.recv() => { panic!() }, - bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) } - } - drop(tx1); - select! { - foo = rx1.recv() => { assert!(foo.is_err()); }, - _bar = rx2.recv() => { panic!() } - } - drop(tx2); - select! { - bar = rx2.recv() => { assert!(bar.is_err()); } - } -} - -#[test] -fn smoke2() { - let (_tx1, rx1) = channel::(); - let (_tx2, rx2) = channel::(); - let (_tx3, rx3) = channel::(); - let (_tx4, rx4) = channel::(); - let (tx5, rx5) = channel::(); - tx5.send(4).unwrap(); - select! { - _foo = rx1.recv() => { panic!("1") }, - _foo = rx2.recv() => { panic!("2") }, - _foo = rx3.recv() => { panic!("3") }, - _foo = rx4.recv() => { panic!("4") }, - foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); } - } -} - -#[test] -fn closed() { - let (_tx1, rx1) = channel::(); - let (tx2, rx2) = channel::(); - drop(tx2); - - select! { - _a1 = rx1.recv() => { panic!() }, - a2 = rx2.recv() => { assert!(a2.is_err()); } - } -} - -#[test] -fn unblocks() { - let (tx1, rx1) = channel::(); - let (_tx2, rx2) = channel::(); - let (tx3, rx3) = channel::(); - - let _t = thread::spawn(move|| { - for _ in 0..20 { thread::yield_now(); } - tx1.send(1).unwrap(); - rx3.recv().unwrap(); - for _ in 0..20 { thread::yield_now(); } - }); - - select! { - a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, - _b = rx2.recv() => { panic!() } - } - tx3.send(1).unwrap(); - select! { - a = rx1.recv() => { assert!(a.is_err()) }, - _b = rx2.recv() => { panic!() } - } -} - -#[test] -fn both_ready() { - let (tx1, rx1) = channel::(); - let (tx2, rx2) = channel::(); - let (tx3, rx3) = channel::<()>(); - - let _t = thread::spawn(move|| { - for _ in 0..20 { thread::yield_now(); } - tx1.send(1).unwrap(); - tx2.send(2).unwrap(); - rx3.recv().unwrap(); - }); - - select! { - a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, - a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } - } - select! { - a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, - a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } - } - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty)); - tx3.send(()).unwrap(); -} - -#[test] -fn stress() { - const AMT: i32 = 10000; - let (tx1, rx1) = channel::(); - let (tx2, rx2) = channel::(); - let (tx3, rx3) = channel::<()>(); - - let _t = thread::spawn(move|| { - for i in 0..AMT { - if i % 2 == 0 { - tx1.send(i).unwrap(); - } else { - tx2.send(i).unwrap(); - } - rx3.recv().unwrap(); - } - }); - - for i in 0..AMT { - select! { - i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); }, - i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); } - } - tx3.send(()).unwrap(); - } -} - -#[allow(unused_must_use)] -#[test] -fn cloning() { - let (tx1, rx1) = channel::(); - let (_tx2, rx2) = channel::(); - let (tx3, rx3) = channel::<()>(); - - let _t = thread::spawn(move|| { - rx3.recv().unwrap(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); - tx1.send(2).unwrap(); - rx3.recv().unwrap(); - }); - - tx3.send(()).unwrap(); - select! { - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => panic!() - } - tx3.send(()).unwrap(); -} - -#[allow(unused_must_use)] -#[test] -fn cloning2() { - let (tx1, rx1) = channel::(); - let (_tx2, rx2) = channel::(); - let (tx3, rx3) = channel::<()>(); - - let _t = thread::spawn(move|| { - rx3.recv().unwrap(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); - tx1.send(2).unwrap(); - rx3.recv().unwrap(); - }); - - tx3.send(()).unwrap(); - select! { - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => panic!() - } - tx3.send(()).unwrap(); -} - -#[test] -fn cloning3() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let _t = thread::spawn(move|| { - let s = Select::new(); - let mut h1 = s.handle(&rx1); - let mut h2 = s.handle(&rx2); - unsafe { h2.add(); } - unsafe { h1.add(); } - assert_eq!(s.wait(), h2.id()); - tx3.send(()).unwrap(); - }); - - for _ in 0..1000 { thread::yield_now(); } - drop(tx1.clone()); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); -} - -#[test] -fn preflight1() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - select! { - _n = rx.recv() => {} - } -} - -#[test] -fn preflight2() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - tx.send(()).unwrap(); - select! { - _n = rx.recv() => {} - } -} - -#[test] -fn preflight3() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()).unwrap(); - select! { - _n = rx.recv() => {} - } -} - -#[test] -fn preflight4() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id()); -} - -#[test] -fn preflight5() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - tx.send(()).unwrap(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id()); -} - -#[test] -fn preflight6() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()).unwrap(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id()); -} - -#[test] -fn preflight7() { - let (tx, rx) = channel::<()>(); - drop(tx); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id()); -} - -#[test] -fn preflight8() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - drop(tx); - rx.recv().unwrap(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id()); -} - -#[test] -fn preflight9() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()).unwrap(); - drop(tx); - rx.recv().unwrap(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id()); -} - -#[test] -fn oneshot_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - let _t = thread::spawn(move|| { - select! { - _n = rx1.recv() => {} - } - tx2.send(()).unwrap(); - }); - - for _ in 0..100 { thread::yield_now() } - tx1.send(()).unwrap(); - rx2.recv().unwrap(); -} - -#[test] -fn stream_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - tx1.send(()).unwrap(); - tx1.send(()).unwrap(); - rx1.recv().unwrap(); - rx1.recv().unwrap(); - let _t = thread::spawn(move|| { - select! { - _n = rx1.recv() => {} - } - tx2.send(()).unwrap(); - }); - - for _ in 0..100 { thread::yield_now() } - tx1.send(()).unwrap(); - rx2.recv().unwrap(); -} - -#[test] -fn shared_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - drop(tx1.clone()); - tx1.send(()).unwrap(); - rx1.recv().unwrap(); - let _t = thread::spawn(move|| { - select! { - _n = rx1.recv() => {} - } - tx2.send(()).unwrap(); - }); - - for _ in 0..100 { thread::yield_now() } - tx1.send(()).unwrap(); - rx2.recv().unwrap(); -} - -#[test] -fn sync1() { - let (tx, rx) = sync_channel::(1); - tx.send(1).unwrap(); - select! { - n = rx.recv() => { assert_eq!(n.unwrap(), 1); } - } -} - -#[test] -fn sync2() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move|| { - for _ in 0..100 { thread::yield_now() } - tx.send(1).unwrap(); - }); - select! { - n = rx.recv() => { assert_eq!(n.unwrap(), 1); } - } -} - -#[test] -fn sync3() { - let (tx1, rx1) = sync_channel::(0); - let (tx2, rx2): (Sender, Receiver) = channel(); - let _t = thread::spawn(move|| { tx1.send(1).unwrap(); }); - let _t = thread::spawn(move|| { tx2.send(2).unwrap(); }); - select! { - n = rx1.recv() => { - let n = n.unwrap(); - assert_eq!(n, 1); - assert_eq!(rx2.recv().unwrap(), 2); - }, - n = rx2.recv() => { - let n = n.unwrap(); - assert_eq!(n, 2); - assert_eq!(rx1.recv().unwrap(), 1); - } - } -} diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs index cc70a620365..dbcdcdac932 100644 --- a/src/libstd/sync/mpsc/shared.rs +++ b/src/libstd/sync/mpsc/shared.rs @@ -9,6 +9,7 @@ /// channels are quite similar, and this is no coincidence! pub use self::Failure::*; +use self::StartResult::*; use core::cmp; use core::intrinsics::abort; @@ -19,8 +20,6 @@ use crate::sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering}; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::sync::mpsc::mpsc_queue as mpsc; -use crate::sync::mpsc::select::StartResult::*; -use crate::sync::mpsc::select::StartResult; use crate::sync::{Mutex, MutexGuard}; use crate::thread; use crate::time::Instant; @@ -57,6 +56,12 @@ pub enum Failure { Disconnected, } +#[derive(PartialEq, Eq)] +enum StartResult { + Installed, + Abort, +} + impl Packet { // Creation of a packet *must* be followed by a call to postinit_lock // and later by inherit_blocker @@ -394,16 +399,6 @@ fn take_to_wake(&self) -> SignalToken { // select implementation //////////////////////////////////////////////////////////////////////////// - // Helper function for select, tests whether this port can receive without - // blocking (obviously not an atomic decision). - // - // This is different than the stream version because there's no need to peek - // at the queue, we can just look at the local count. - pub fn can_recv(&self) -> bool { - let cnt = self.cnt.load(Ordering::SeqCst); - cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0 - } - // increment the count on the channel (used for selection) fn bump(&self, amt: isize) -> isize { match self.cnt.fetch_add(amt, Ordering::SeqCst) { @@ -415,22 +410,6 @@ fn bump(&self, amt: isize) -> isize { } } - // Inserts the signal token for selection on this port, returning true if - // blocking should proceed. - // - // The code here is the same as in stream.rs, except that it doesn't need to - // peek at the channel to see if an upgrade is pending. - pub fn start_selection(&self, token: SignalToken) -> StartResult { - match self.decrement(token) { - Installed => Installed, - Abort => { - let prev = self.bump(1); - assert!(prev == DISCONNECTED || prev >= 0); - Abort - } - } - } - // Cancels a previous thread waiting on this port, returning whether there's // data on the port. // diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs index 7ae6f68b514..40877282761 100644 --- a/src/libstd/sync/mpsc/stream.rs +++ b/src/libstd/sync/mpsc/stream.rs @@ -9,7 +9,6 @@ pub use self::Failure::*; pub use self::UpgradeResult::*; -pub use self::SelectionResult::*; use self::Message::*; use core::cmp; @@ -60,12 +59,6 @@ pub enum UpgradeResult { UpWoke(SignalToken), } -pub enum SelectionResult { - SelSuccess, - SelCanceled, - SelUpgraded(SignalToken, Receiver), -} - // Any message could contain an "upgrade request" to a new shared port, so the // internal queue it's a queue of T, but rather Message enum Message { @@ -338,27 +331,6 @@ pub fn drop_port(&self) { // select implementation //////////////////////////////////////////////////////////////////////////// - // Tests to see whether this port can receive without blocking. If Ok is - // returned, then that's the answer. If Err is returned, then the returned - // port needs to be queried instead (an upgrade happened) - pub fn can_recv(&self) -> Result> { - // We peek at the queue to see if there's anything on it, and we use - // this return value to determine if we should pop from the queue and - // upgrade this channel immediately. If it looks like we've got an - // upgrade pending, then go through the whole recv rigamarole to update - // the internal state. - match self.queue.peek() { - Some(&mut GoUp(..)) => { - match self.recv(None) { - Err(Upgraded(port)) => Err(port), - _ => unreachable!(), - } - } - Some(..) => Ok(true), - None => Ok(false) - } - } - // increment the count on the channel (used for selection) fn bump(&self, amt: isize) -> isize { match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) { @@ -370,31 +342,6 @@ fn bump(&self, amt: isize) -> isize { } } - // Attempts to start selecting on this port. Like a oneshot, this can fail - // immediately because of an upgrade. - pub fn start_selection(&self, token: SignalToken) -> SelectionResult { - match self.decrement(token) { - Ok(()) => SelSuccess, - Err(token) => { - let ret = match self.queue.peek() { - Some(&mut GoUp(..)) => { - match self.queue.pop() { - Some(GoUp(port)) => SelUpgraded(token, port), - _ => unreachable!(), - } - } - Some(..) => SelCanceled, - None => SelCanceled, - }; - // Undo our decrement above, and we should be guaranteed that the - // previous value is positive because we're not going to sleep - let prev = self.bump(1); - assert!(prev == DISCONNECTED || prev >= 0); - ret - } - } - } - // Removes a previous thread from being blocked in this port pub fn abort_selection(&self, was_upgrade: bool) -> Result> { diff --git a/src/libstd/sync/mpsc/sync.rs b/src/libstd/sync/mpsc/sync.rs index b2d9f4c6491..3c4f8e077c9 100644 --- a/src/libstd/sync/mpsc/sync.rs +++ b/src/libstd/sync/mpsc/sync.rs @@ -33,7 +33,6 @@ use crate::sync::atomic::{Ordering, AtomicUsize}; use crate::sync::mpsc::blocking::{self, WaitToken, SignalToken}; -use crate::sync::mpsc::select::StartResult::{self, Installed, Abort}; use crate::sync::{Mutex, MutexGuard}; use crate::time::Instant; @@ -406,42 +405,6 @@ pub fn drop_port(&self) { while let Some(token) = queue.dequeue() { token.signal(); } waiter.map(|t| t.signal()); } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // If Ok, the value is whether this port has data, if Err, then the upgraded - // port needs to be checked instead of this one. - pub fn can_recv(&self) -> bool { - let guard = self.lock.lock().unwrap(); - guard.disconnected || guard.buf.size() > 0 - } - - // Attempts to start selection on this port. This can either succeed or fail - // because there is data waiting. - pub fn start_selection(&self, token: SignalToken) -> StartResult { - let mut guard = self.lock.lock().unwrap(); - if guard.disconnected || guard.buf.size() > 0 { - Abort - } else { - match mem::replace(&mut guard.blocker, BlockedReceiver(token)) { - NoneBlocked => {} - BlockedSender(..) => unreachable!(), - BlockedReceiver(..) => unreachable!(), - } - Installed - } - } - - // Remove a previous selecting thread from this port. This ensures that the - // blocked thread will no longer be visible to any other threads. - // - // The return value indicates whether there's data on this port. - pub fn abort_selection(&self) -> bool { - let mut guard = self.lock.lock().unwrap(); - abort_selection(&mut guard) - } } impl Drop for Packet { diff --git a/src/test/run-pass/issues/issue-13494.rs b/src/test/run-pass/issues/issue-13494.rs deleted file mode 100644 index 12be97702a9..00000000000 --- a/src/test/run-pass/issues/issue-13494.rs +++ /dev/null @@ -1,35 +0,0 @@ -// run-pass -#![allow(unused_must_use)] -// ignore-emscripten no threads support - -// This test may not always fail, but it can be flaky if the race it used to -// expose is still present. - -#![feature(mpsc_select)] -#![allow(deprecated)] - -use std::sync::mpsc::{channel, Sender, Receiver}; -use std::thread; - -fn helper(rx: Receiver>) { - for tx in rx.iter() { - let _ = tx.send(()); - } -} - -fn main() { - let (tx, rx) = channel(); - let t = thread::spawn(move|| { helper(rx) }); - let (snd, rcv) = channel::(); - for _ in 1..100000 { - snd.send(1).unwrap(); - let (tx2, rx2) = channel(); - tx.send(tx2).unwrap(); - select! { - _ = rx2.recv() => (), - _ = rcv.recv() => () - } - } - drop(tx); - t.join(); -} diff --git a/src/test/run-pass/macros/macro-comma-support.rs b/src/test/run-pass/macros/macro-comma-support.rs index 904f2e2c7fd..12a612c153a 100644 --- a/src/test/run-pass/macros/macro-comma-support.rs +++ b/src/test/run-pass/macros/macro-comma-support.rs @@ -233,8 +233,6 @@ fn println() { println!("hello {}", "world",); } -// select! is too troublesome and unlikely to be stabilized - // stringify! is N/A #[cfg(std)] -- 2.44.0