]> git.lizzy.rs Git - rust.git/commitdiff
Remove the unstable and deprecated mpsc_select
authorJosh Stone <jistone@redhat.com>
Fri, 17 May 2019 18:36:25 +0000 (11:36 -0700)
committerJosh Stone <jistone@redhat.com>
Fri, 17 May 2019 19:16:52 +0000 (12:16 -0700)
This removes macro `select!` and `std::sync::mpsc::{Handle, Select}`,
which were all unstable and have been deprecated since 1.32.

src/libstd/macros.rs
src/libstd/sync/mpsc/mod.rs
src/libstd/sync/mpsc/oneshot.rs
src/libstd/sync/mpsc/select.rs [deleted file]
src/libstd/sync/mpsc/select_tests.rs [deleted file]
src/libstd/sync/mpsc/shared.rs
src/libstd/sync/mpsc/stream.rs
src/libstd/sync/mpsc/sync.rs
src/test/run-pass/issues/issue-13494.rs [deleted file]
src/test/run-pass/macros/macro-comma-support.rs

index 03aebeda47c4851f707c2d2ffb39682e166c4dcf..9af7bba97aa58de06daab2106874f17909a16cd6 100644 (file)
@@ -357,61 +357,6 @@ macro_rules! dbg {
     };
 }
 
-/// Selects the first successful receive event from a number of receivers.
-///
-/// This macro is used to wait for the first event to occur on a number of
-/// receivers. It places no restrictions on the types of receivers given to
-/// this macro, this can be viewed as a heterogeneous select.
-///
-/// # Examples
-///
-/// ```
-/// #![feature(mpsc_select)]
-///
-/// use std::thread;
-/// use std::sync::mpsc;
-///
-/// // two placeholder functions for now
-/// fn long_running_thread() {}
-/// fn calculate_the_answer() -> u32 { 42 }
-///
-/// let (tx1, rx1) = mpsc::channel();
-/// let (tx2, rx2) = mpsc::channel();
-///
-/// thread::spawn(move|| { long_running_thread(); tx1.send(()).unwrap(); });
-/// thread::spawn(move|| { tx2.send(calculate_the_answer()).unwrap(); });
-///
-/// select! {
-///     _ = rx1.recv() => println!("the long running thread finished first"),
-///     answer = rx2.recv() => {
-///         println!("the answer was: {}", answer.unwrap());
-///     }
-/// }
-/// # drop(rx1.recv());
-/// # drop(rx2.recv());
-/// ```
-///
-/// For more information about select, see the `std::sync::mpsc::Select` structure.
-#[macro_export]
-#[unstable(feature = "mpsc_select", issue = "27800")]
-#[rustc_deprecated(since = "1.32.0",
-                   reason = "channel selection will be removed in a future release")]
-macro_rules! select {
-    (
-        $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
-    ) => ({
-        use $crate::sync::mpsc::Select;
-        let sel = Select::new();
-        $( let mut $rx = sel.handle(&$rx); )+
-        unsafe {
-            $( $rx.add(); )+
-        }
-        let ret = sel.wait();
-        $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
-        { unreachable!() }
-    })
-}
-
 #[cfg(test)]
 macro_rules! assert_approx_eq {
     ($a:expr, $b:expr) => ({
index 04353fde1b4d97620d17ad07702446d130dc5073..69ecd201063b034baf0f8d5df0074823a89ecda6 100644 (file)
 //! ```
 
 #![stable(feature = "rust1", since = "1.0.0")]
-#![allow(deprecated)] // for mpsc_select
 
 // A description of how Rust's channel implementation works
 //
 // believe that there is anything fundamental that needs to change about these
 // channels, however, in order to support a more efficient select().
 //
+// FIXME: Select is now removed, so these factors are ready to be cleaned up!
+//
 // # Conclusion
 //
 // And now that you've seen all the races that I found and attempted to fix,
 use crate::cell::UnsafeCell;
 use crate::time::{Duration, Instant};
 
-#[unstable(feature = "mpsc_select", issue = "27800")]
-pub use self::select::{Select, Handle};
-use self::select::StartResult;
-use self::select::StartResult::*;
-use self::blocking::SignalToken;
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod select_tests;
-
 mod blocking;
 mod oneshot;
-mod select;
 mod shared;
 mod stream;
 mod sync;
@@ -1514,78 +1505,6 @@ pub fn try_iter(&self) -> TryIter<'_, T> {
 
 }
 
-impl<T> select::Packet for Receiver<T> {
-    fn can_recv(&self) -> bool {
-        loop {
-            let new_port = match *unsafe { self.inner() } {
-                Flavor::Oneshot(ref p) => {
-                    match p.can_recv() {
-                        Ok(ret) => return ret,
-                        Err(upgrade) => upgrade,
-                    }
-                }
-                Flavor::Stream(ref p) => {
-                    match p.can_recv() {
-                        Ok(ret) => return ret,
-                        Err(upgrade) => upgrade,
-                    }
-                }
-                Flavor::Shared(ref p) => return p.can_recv(),
-                Flavor::Sync(ref p) => return p.can_recv(),
-            };
-            unsafe {
-                mem::swap(self.inner_mut(),
-                          new_port.inner_mut());
-            }
-        }
-    }
-
-    fn start_selection(&self, mut token: SignalToken) -> StartResult {
-        loop {
-            let (t, new_port) = match *unsafe { self.inner() } {
-                Flavor::Oneshot(ref p) => {
-                    match p.start_selection(token) {
-                        oneshot::SelSuccess => return Installed,
-                        oneshot::SelCanceled => return Abort,
-                        oneshot::SelUpgraded(t, rx) => (t, rx),
-                    }
-                }
-                Flavor::Stream(ref p) => {
-                    match p.start_selection(token) {
-                        stream::SelSuccess => return Installed,
-                        stream::SelCanceled => return Abort,
-                        stream::SelUpgraded(t, rx) => (t, rx),
-                    }
-                }
-                Flavor::Shared(ref p) => return p.start_selection(token),
-                Flavor::Sync(ref p) => return p.start_selection(token),
-            };
-            token = t;
-            unsafe {
-                mem::swap(self.inner_mut(), new_port.inner_mut());
-            }
-        }
-    }
-
-    fn abort_selection(&self) -> bool {
-        let mut was_upgrade = false;
-        loop {
-            let result = match *unsafe { self.inner() } {
-                Flavor::Oneshot(ref p) => p.abort_selection(),
-                Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
-                Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
-                Flavor::Sync(ref p) => return p.abort_selection(),
-            };
-            let new_port = match result { Ok(b) => return b, Err(p) => p };
-            was_upgrade = true;
-            unsafe {
-                mem::swap(self.inner_mut(),
-                          new_port.inner_mut());
-            }
-        }
-    }
-}
-
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<'a, T> Iterator for Iter<'a, T> {
     type Item = T;
index 5c516d5de0f176ed735d8e2cc7a6de488d5e83c2..e7a5cc46b31a857912e23bc880fac2c97d4580e2 100644 (file)
@@ -24,7 +24,6 @@
 
 pub use self::Failure::*;
 pub use self::UpgradeResult::*;
-pub use self::SelectionResult::*;
 use self::MyUpgrade::*;
 
 use crate::sync::mpsc::Receiver;
@@ -66,12 +65,6 @@ pub enum UpgradeResult {
     UpWoke(SignalToken),
 }
 
-pub enum SelectionResult<T> {
-    SelCanceled,
-    SelUpgraded(SignalToken, Receiver<T>),
-    SelSuccess,
-}
-
 enum MyUpgrade<T> {
     NothingSent,
     SendUsed,
@@ -264,71 +257,6 @@ pub fn drop_port(&self) {
     // select implementation
     ////////////////////////////////////////////////////////////////////////////
 
-    // If Ok, the value is whether this port has data, if Err, then the upgraded
-    // port needs to be checked instead of this one.
-    pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
-        unsafe {
-            match self.state.load(Ordering::SeqCst) {
-                EMPTY => Ok(false), // Welp, we tried
-                DATA => Ok(true),   // we have some un-acquired data
-                DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
-                DISCONNECTED => {
-                    match ptr::replace(self.upgrade.get(), SendUsed) {
-                        // The other end sent us an upgrade, so we need to
-                        // propagate upwards whether the upgrade can receive
-                        // data
-                        GoUp(upgrade) => Err(upgrade),
-
-                        // If the other end disconnected without sending an
-                        // upgrade, then we have data to receive (the channel is
-                        // disconnected).
-                        up => { ptr::write(self.upgrade.get(), up); Ok(true) }
-                    }
-                }
-                _ => unreachable!(), // we're the "one blocker"
-            }
-        }
-    }
-
-    // Attempts to start selection on this port. This can either succeed, fail
-    // because there is data, or fail because there is an upgrade pending.
-    pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
-        unsafe {
-            let ptr = token.cast_to_usize();
-            match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
-                EMPTY => SelSuccess,
-                DATA => {
-                    drop(SignalToken::cast_from_usize(ptr));
-                    SelCanceled
-                }
-                DISCONNECTED if (*self.data.get()).is_some() => {
-                    drop(SignalToken::cast_from_usize(ptr));
-                    SelCanceled
-                }
-                DISCONNECTED => {
-                    match ptr::replace(self.upgrade.get(), SendUsed) {
-                        // The other end sent us an upgrade, so we need to
-                        // propagate upwards whether the upgrade can receive
-                        // data
-                        GoUp(upgrade) => {
-                            SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
-                        }
-
-                        // If the other end disconnected without sending an
-                        // upgrade, then we have data to receive (the channel is
-                        // disconnected).
-                        up => {
-                            ptr::write(self.upgrade.get(), up);
-                            drop(SignalToken::cast_from_usize(ptr));
-                            SelCanceled
-                        }
-                    }
-                }
-                _ => unreachable!(), // we're the "one blocker"
-            }
-        }
-    }
-
     // Remove a previous selecting thread from this port. This ensures that the
     // blocked thread will no longer be visible to any other threads.
     //
diff --git a/src/libstd/sync/mpsc/select.rs b/src/libstd/sync/mpsc/select.rs
deleted file mode 100644 (file)
index d1b5f2d..0000000
+++ /dev/null
@@ -1,352 +0,0 @@
-//! Selection over an array of receivers
-//!
-//! This module contains the implementation machinery necessary for selecting
-//! over a number of receivers. One large goal of this module is to provide an
-//! efficient interface to selecting over any receiver of any type.
-//!
-//! This is achieved through an architecture of a "receiver set" in which
-//! receivers are added to a set and then the entire set is waited on at once.
-//! The set can be waited on multiple times to prevent re-adding each receiver
-//! to the set.
-//!
-//! Usage of this module is currently encouraged to go through the use of the
-//! `select!` macro. This macro allows naturally binding of variables to the
-//! received values of receivers in a much more natural syntax then usage of the
-//! `Select` structure directly.
-//!
-//! # Examples
-//!
-//! ```rust
-//! #![feature(mpsc_select)]
-//!
-//! use std::sync::mpsc::channel;
-//!
-//! let (tx1, rx1) = channel();
-//! let (tx2, rx2) = channel();
-//!
-//! tx1.send(1).unwrap();
-//! tx2.send(2).unwrap();
-//!
-//! select! {
-//!     val = rx1.recv() => {
-//!         assert_eq!(val.unwrap(), 1);
-//!     },
-//!     val = rx2.recv() => {
-//!         assert_eq!(val.unwrap(), 2);
-//!     }
-//! }
-//! ```
-
-#![allow(dead_code)]
-#![unstable(feature = "mpsc_select",
-            reason = "This implementation, while likely sufficient, is unsafe and \
-                      likely to be error prone. At some point in the future this \
-                      module will be removed.",
-            issue = "27800")]
-#![rustc_deprecated(since = "1.32.0",
-                    reason = "channel selection will be removed in a future release")]
-
-use core::cell::{Cell, UnsafeCell};
-use core::marker;
-use core::ptr;
-use core::usize;
-
-use crate::fmt;
-use crate::sync::mpsc::{Receiver, RecvError};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-
-/// The "receiver set" of the select interface. This structure is used to manage
-/// a set of receivers which are being selected over.
-pub struct Select {
-    inner: UnsafeCell<SelectInner>,
-    next_id: Cell<usize>,
-}
-
-struct SelectInner {
-    head: *mut Handle<'static, ()>,
-    tail: *mut Handle<'static, ()>,
-}
-
-impl !marker::Send for Select {}
-
-/// A handle to a receiver which is currently a member of a `Select` set of
-/// receivers. This handle is used to keep the receiver in the set as well as
-/// interact with the underlying receiver.
-pub struct Handle<'rx, T:Send+'rx> {
-    /// The ID of this handle, used to compare against the return value of
-    /// `Select::wait()`.
-    id: usize,
-    selector: *mut SelectInner,
-    next: *mut Handle<'static, ()>,
-    prev: *mut Handle<'static, ()>,
-    added: bool,
-    packet: &'rx (dyn Packet+'rx),
-
-    // due to our fun transmutes, we be sure to place this at the end. (nothing
-    // previous relies on T)
-    rx: &'rx Receiver<T>,
-}
-
-struct Packets { cur: *mut Handle<'static, ()> }
-
-#[doc(hidden)]
-#[derive(PartialEq, Eq)]
-pub enum StartResult {
-    Installed,
-    Abort,
-}
-
-#[doc(hidden)]
-pub trait Packet {
-    fn can_recv(&self) -> bool;
-    fn start_selection(&self, token: SignalToken) -> StartResult;
-    fn abort_selection(&self) -> bool;
-}
-
-impl Select {
-    /// Creates a new selection structure. This set is initially empty.
-    ///
-    /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
-    /// the `select!` macro.
-    ///
-    /// # Examples
-    ///
-    /// ```
-    /// #![feature(mpsc_select)]
-    ///
-    /// use std::sync::mpsc::Select;
-    ///
-    /// let select = Select::new();
-    /// ```
-    pub fn new() -> Select {
-        Select {
-            inner: UnsafeCell::new(SelectInner {
-                head: ptr::null_mut(),
-                tail: ptr::null_mut(),
-            }),
-            next_id: Cell::new(1),
-        }
-    }
-
-    /// Creates a new handle into this receiver set for a new receiver. Note
-    /// that this does *not* add the receiver to the receiver set, for that you
-    /// must call the `add` method on the handle itself.
-    pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
-        let id = self.next_id.get();
-        self.next_id.set(id + 1);
-        Handle {
-            id,
-            selector: self.inner.get(),
-            next: ptr::null_mut(),
-            prev: ptr::null_mut(),
-            added: false,
-            rx,
-            packet: rx,
-        }
-    }
-
-    /// Waits for an event on this receiver set. The returned value is *not* an
-    /// index, but rather an ID. This ID can be queried against any active
-    /// `Handle` structures (each one has an `id` method). The handle with
-    /// the matching `id` will have some sort of event available on it. The
-    /// event could either be that data is available or the corresponding
-    /// channel has been closed.
-    pub fn wait(&self) -> usize {
-        self.wait2(true)
-    }
-
-    /// Helper method for skipping the preflight checks during testing
-    pub(super) fn wait2(&self, do_preflight_checks: bool) -> usize {
-        // Note that this is currently an inefficient implementation. We in
-        // theory have knowledge about all receivers in the set ahead of time,
-        // so this method shouldn't really have to iterate over all of them yet
-        // again. The idea with this "receiver set" interface is to get the
-        // interface right this time around, and later this implementation can
-        // be optimized.
-        //
-        // This implementation can be summarized by:
-        //
-        //      fn select(receivers) {
-        //          if any receiver ready { return ready index }
-        //          deschedule {
-        //              block on all receivers
-        //          }
-        //          unblock on all receivers
-        //          return ready index
-        //      }
-        //
-        // Most notably, the iterations over all of the receivers shouldn't be
-        // necessary.
-        unsafe {
-            // Stage 1: preflight checks. Look for any packets ready to receive
-            if do_preflight_checks {
-                for handle in self.iter() {
-                    if (*handle).packet.can_recv() {
-                        return (*handle).id();
-                    }
-                }
-            }
-
-            // Stage 2: begin the blocking process
-            //
-            // Create a number of signal tokens, and install each one
-            // sequentially until one fails. If one fails, then abort the
-            // selection on the already-installed tokens.
-            let (wait_token, signal_token) = blocking::tokens();
-            for (i, handle) in self.iter().enumerate() {
-                match (*handle).packet.start_selection(signal_token.clone()) {
-                    StartResult::Installed => {}
-                    StartResult::Abort => {
-                        // Go back and abort the already-begun selections
-                        for handle in self.iter().take(i) {
-                            (*handle).packet.abort_selection();
-                        }
-                        return (*handle).id;
-                    }
-                }
-            }
-
-            // Stage 3: no messages available, actually block
-            wait_token.wait();
-
-            // Stage 4: there *must* be message available; find it.
-            //
-            // Abort the selection process on each receiver. If the abort
-            // process returns `true`, then that means that the receiver is
-            // ready to receive some data. Note that this also means that the
-            // receiver may have yet to have fully read the `to_wake` field and
-            // woken us up (although the wakeup is guaranteed to fail).
-            //
-            // This situation happens in the window of where a sender invokes
-            // increment(), sees -1, and then decides to wake up the thread. After
-            // all this is done, the sending thread will set `selecting` to
-            // `false`. Until this is done, we cannot return. If we were to
-            // return, then a sender could wake up a receiver which has gone
-            // back to sleep after this call to `select`.
-            //
-            // Note that it is a "fairly small window" in which an increment()
-            // views that it should wake a thread up until the `selecting` bit
-            // is set to false. For now, the implementation currently just spins
-            // in a yield loop. This is very distasteful, but this
-            // implementation is already nowhere near what it should ideally be.
-            // A rewrite should focus on avoiding a yield loop, and for now this
-            // implementation is tying us over to a more efficient "don't
-            // iterate over everything every time" implementation.
-            let mut ready_id = usize::MAX;
-            for handle in self.iter() {
-                if (*handle).packet.abort_selection() {
-                    ready_id = (*handle).id;
-                }
-            }
-
-            // We must have found a ready receiver
-            assert!(ready_id != usize::MAX);
-            return ready_id;
-        }
-    }
-
-    fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
-}
-
-impl<'rx, T: Send> Handle<'rx, T> {
-    /// Retrieves the ID of this handle.
-    #[inline]
-    pub fn id(&self) -> usize { self.id }
-
-    /// Blocks to receive a value on the underlying receiver, returning `Some` on
-    /// success or `None` if the channel disconnects. This function has the same
-    /// semantics as `Receiver.recv`
-    pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
-
-    /// Adds this handle to the receiver set that the handle was created from. This
-    /// method can be called multiple times, but it has no effect if `add` was
-    /// called previously.
-    ///
-    /// This method is unsafe because it requires that the `Handle` is not moved
-    /// while it is added to the `Select` set.
-    pub unsafe fn add(&mut self) {
-        if self.added { return }
-        let selector = &mut *self.selector;
-        let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
-
-        if selector.head.is_null() {
-            selector.head = me;
-            selector.tail = me;
-        } else {
-            (*me).prev = selector.tail;
-            assert!((*me).next.is_null());
-            (*selector.tail).next = me;
-            selector.tail = me;
-        }
-        self.added = true;
-    }
-
-    /// Removes this handle from the `Select` set. This method is unsafe because
-    /// it has no guarantee that the `Handle` was not moved since `add` was
-    /// called.
-    pub unsafe fn remove(&mut self) {
-        if !self.added { return }
-
-        let selector = &mut *self.selector;
-        let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
-
-        if self.prev.is_null() {
-            assert_eq!(selector.head, me);
-            selector.head = self.next;
-        } else {
-            (*self.prev).next = self.next;
-        }
-        if self.next.is_null() {
-            assert_eq!(selector.tail, me);
-            selector.tail = self.prev;
-        } else {
-            (*self.next).prev = self.prev;
-        }
-
-        self.next = ptr::null_mut();
-        self.prev = ptr::null_mut();
-
-        self.added = false;
-    }
-}
-
-impl Drop for Select {
-    fn drop(&mut self) {
-        unsafe {
-            assert!((&*self.inner.get()).head.is_null());
-            assert!((&*self.inner.get()).tail.is_null());
-        }
-    }
-}
-
-impl<T: Send> Drop for Handle<'_, T> {
-    fn drop(&mut self) {
-        unsafe { self.remove() }
-    }
-}
-
-impl Iterator for Packets {
-    type Item = *mut Handle<'static, ()>;
-
-    fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
-        if self.cur.is_null() {
-            None
-        } else {
-            let ret = Some(self.cur);
-            unsafe { self.cur = (*self.cur).next; }
-            ret
-        }
-    }
-}
-
-impl fmt::Debug for Select {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Select").finish()
-    }
-}
-
-impl<T: Send> fmt::Debug for Handle<'_, T> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Handle").finish()
-    }
-}
diff --git a/src/libstd/sync/mpsc/select_tests.rs b/src/libstd/sync/mpsc/select_tests.rs
deleted file mode 100644 (file)
index 18d9346..0000000
+++ /dev/null
@@ -1,413 +0,0 @@
-#![allow(unused_imports)]
-
-/// This file exists to hack around https://github.com/rust-lang/rust/issues/47238
-
-use crate::thread;
-use crate::sync::mpsc::*;
-
-// Don't use the libstd version so we can pull in the right Select structure
-// (std::comm points at the wrong one)
-macro_rules! select {
-    (
-        $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
-    ) => ({
-        let sel = Select::new();
-        $( let mut $rx = sel.handle(&$rx); )+
-        unsafe {
-            $( $rx.add(); )+
-        }
-        let ret = sel.wait();
-        $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
-        { unreachable!() }
-    })
-}
-
-#[test]
-fn smoke() {
-    let (tx1, rx1) = channel::<i32>();
-    let (tx2, rx2) = channel::<i32>();
-    tx1.send(1).unwrap();
-    select! {
-        foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
-        _bar = rx2.recv() => { panic!() }
-    }
-    tx2.send(2).unwrap();
-    select! {
-        _foo = rx1.recv() => { panic!() },
-        bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
-    }
-    drop(tx1);
-    select! {
-        foo = rx1.recv() => { assert!(foo.is_err()); },
-        _bar = rx2.recv() => { panic!() }
-    }
-    drop(tx2);
-    select! {
-        bar = rx2.recv() => { assert!(bar.is_err()); }
-    }
-}
-
-#[test]
-fn smoke2() {
-    let (_tx1, rx1) = channel::<i32>();
-    let (_tx2, rx2) = channel::<i32>();
-    let (_tx3, rx3) = channel::<i32>();
-    let (_tx4, rx4) = channel::<i32>();
-    let (tx5, rx5) = channel::<i32>();
-    tx5.send(4).unwrap();
-    select! {
-        _foo = rx1.recv() => { panic!("1") },
-        _foo = rx2.recv() => { panic!("2") },
-        _foo = rx3.recv() => { panic!("3") },
-        _foo = rx4.recv() => { panic!("4") },
-        foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
-    }
-}
-
-#[test]
-fn closed() {
-    let (_tx1, rx1) = channel::<i32>();
-    let (tx2, rx2) = channel::<i32>();
-    drop(tx2);
-
-    select! {
-        _a1 = rx1.recv() => { panic!() },
-        a2 = rx2.recv() => { assert!(a2.is_err()); }
-    }
-}
-
-#[test]
-fn unblocks() {
-    let (tx1, rx1) = channel::<i32>();
-    let (_tx2, rx2) = channel::<i32>();
-    let (tx3, rx3) = channel::<i32>();
-
-    let _t = thread::spawn(move|| {
-        for _ in 0..20 { thread::yield_now(); }
-        tx1.send(1).unwrap();
-        rx3.recv().unwrap();
-        for _ in 0..20 { thread::yield_now(); }
-    });
-
-    select! {
-        a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
-        _b = rx2.recv() => { panic!() }
-    }
-    tx3.send(1).unwrap();
-    select! {
-        a = rx1.recv() => { assert!(a.is_err()) },
-        _b = rx2.recv() => { panic!() }
-    }
-}
-
-#[test]
-fn both_ready() {
-    let (tx1, rx1) = channel::<i32>();
-    let (tx2, rx2) = channel::<i32>();
-    let (tx3, rx3) = channel::<()>();
-
-    let _t = thread::spawn(move|| {
-        for _ in 0..20 { thread::yield_now(); }
-        tx1.send(1).unwrap();
-        tx2.send(2).unwrap();
-        rx3.recv().unwrap();
-    });
-
-    select! {
-        a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
-        a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
-    }
-    select! {
-        a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
-        a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
-    }
-    assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
-    assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
-    tx3.send(()).unwrap();
-}
-
-#[test]
-fn stress() {
-    const AMT: i32 = 10000;
-    let (tx1, rx1) = channel::<i32>();
-    let (tx2, rx2) = channel::<i32>();
-    let (tx3, rx3) = channel::<()>();
-
-    let _t = thread::spawn(move|| {
-        for i in 0..AMT {
-            if i % 2 == 0 {
-                tx1.send(i).unwrap();
-            } else {
-                tx2.send(i).unwrap();
-            }
-            rx3.recv().unwrap();
-        }
-    });
-
-    for i in 0..AMT {
-        select! {
-            i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
-            i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
-        }
-        tx3.send(()).unwrap();
-    }
-}
-
-#[allow(unused_must_use)]
-#[test]
-fn cloning() {
-    let (tx1, rx1) = channel::<i32>();
-    let (_tx2, rx2) = channel::<i32>();
-    let (tx3, rx3) = channel::<()>();
-
-    let _t = thread::spawn(move|| {
-        rx3.recv().unwrap();
-        tx1.clone();
-        assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
-        tx1.send(2).unwrap();
-        rx3.recv().unwrap();
-    });
-
-    tx3.send(()).unwrap();
-    select! {
-        _i1 = rx1.recv() => {},
-        _i2 = rx2.recv() => panic!()
-    }
-    tx3.send(()).unwrap();
-}
-
-#[allow(unused_must_use)]
-#[test]
-fn cloning2() {
-    let (tx1, rx1) = channel::<i32>();
-    let (_tx2, rx2) = channel::<i32>();
-    let (tx3, rx3) = channel::<()>();
-
-    let _t = thread::spawn(move|| {
-        rx3.recv().unwrap();
-        tx1.clone();
-        assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
-        tx1.send(2).unwrap();
-        rx3.recv().unwrap();
-    });
-
-    tx3.send(()).unwrap();
-    select! {
-        _i1 = rx1.recv() => {},
-        _i2 = rx2.recv() => panic!()
-    }
-    tx3.send(()).unwrap();
-}
-
-#[test]
-fn cloning3() {
-    let (tx1, rx1) = channel::<()>();
-    let (tx2, rx2) = channel::<()>();
-    let (tx3, rx3) = channel::<()>();
-    let _t = thread::spawn(move|| {
-        let s = Select::new();
-        let mut h1 = s.handle(&rx1);
-        let mut h2 = s.handle(&rx2);
-        unsafe { h2.add(); }
-        unsafe { h1.add(); }
-        assert_eq!(s.wait(), h2.id());
-        tx3.send(()).unwrap();
-    });
-
-    for _ in 0..1000 { thread::yield_now(); }
-    drop(tx1.clone());
-    tx2.send(()).unwrap();
-    rx3.recv().unwrap();
-}
-
-#[test]
-fn preflight1() {
-    let (tx, rx) = channel();
-    tx.send(()).unwrap();
-    select! {
-        _n = rx.recv() => {}
-    }
-}
-
-#[test]
-fn preflight2() {
-    let (tx, rx) = channel();
-    tx.send(()).unwrap();
-    tx.send(()).unwrap();
-    select! {
-        _n = rx.recv() => {}
-    }
-}
-
-#[test]
-fn preflight3() {
-    let (tx, rx) = channel();
-    drop(tx.clone());
-    tx.send(()).unwrap();
-    select! {
-        _n = rx.recv() => {}
-    }
-}
-
-#[test]
-fn preflight4() {
-    let (tx, rx) = channel();
-    tx.send(()).unwrap();
-    let s = Select::new();
-    let mut h = s.handle(&rx);
-    unsafe { h.add(); }
-    assert_eq!(s.wait2(false), h.id());
-}
-
-#[test]
-fn preflight5() {
-    let (tx, rx) = channel();
-    tx.send(()).unwrap();
-    tx.send(()).unwrap();
-    let s = Select::new();
-    let mut h = s.handle(&rx);
-    unsafe { h.add(); }
-    assert_eq!(s.wait2(false), h.id());
-}
-
-#[test]
-fn preflight6() {
-    let (tx, rx) = channel();
-    drop(tx.clone());
-    tx.send(()).unwrap();
-    let s = Select::new();
-    let mut h = s.handle(&rx);
-    unsafe { h.add(); }
-    assert_eq!(s.wait2(false), h.id());
-}
-
-#[test]
-fn preflight7() {
-    let (tx, rx) = channel::<()>();
-    drop(tx);
-    let s = Select::new();
-    let mut h = s.handle(&rx);
-    unsafe { h.add(); }
-    assert_eq!(s.wait2(false), h.id());
-}
-
-#[test]
-fn preflight8() {
-    let (tx, rx) = channel();
-    tx.send(()).unwrap();
-    drop(tx);
-    rx.recv().unwrap();
-    let s = Select::new();
-    let mut h = s.handle(&rx);
-    unsafe { h.add(); }
-    assert_eq!(s.wait2(false), h.id());
-}
-
-#[test]
-fn preflight9() {
-    let (tx, rx) = channel();
-    drop(tx.clone());
-    tx.send(()).unwrap();
-    drop(tx);
-    rx.recv().unwrap();
-    let s = Select::new();
-    let mut h = s.handle(&rx);
-    unsafe { h.add(); }
-    assert_eq!(s.wait2(false), h.id());
-}
-
-#[test]
-fn oneshot_data_waiting() {
-    let (tx1, rx1) = channel();
-    let (tx2, rx2) = channel();
-    let _t = thread::spawn(move|| {
-        select! {
-            _n = rx1.recv() => {}
-        }
-        tx2.send(()).unwrap();
-    });
-
-    for _ in 0..100 { thread::yield_now() }
-    tx1.send(()).unwrap();
-    rx2.recv().unwrap();
-}
-
-#[test]
-fn stream_data_waiting() {
-    let (tx1, rx1) = channel();
-    let (tx2, rx2) = channel();
-    tx1.send(()).unwrap();
-    tx1.send(()).unwrap();
-    rx1.recv().unwrap();
-    rx1.recv().unwrap();
-    let _t = thread::spawn(move|| {
-        select! {
-            _n = rx1.recv() => {}
-        }
-        tx2.send(()).unwrap();
-    });
-
-    for _ in 0..100 { thread::yield_now() }
-    tx1.send(()).unwrap();
-    rx2.recv().unwrap();
-}
-
-#[test]
-fn shared_data_waiting() {
-    let (tx1, rx1) = channel();
-    let (tx2, rx2) = channel();
-    drop(tx1.clone());
-    tx1.send(()).unwrap();
-    rx1.recv().unwrap();
-    let _t = thread::spawn(move|| {
-        select! {
-            _n = rx1.recv() => {}
-        }
-        tx2.send(()).unwrap();
-    });
-
-    for _ in 0..100 { thread::yield_now() }
-    tx1.send(()).unwrap();
-    rx2.recv().unwrap();
-}
-
-#[test]
-fn sync1() {
-    let (tx, rx) = sync_channel::<i32>(1);
-    tx.send(1).unwrap();
-    select! {
-        n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
-    }
-}
-
-#[test]
-fn sync2() {
-    let (tx, rx) = sync_channel::<i32>(0);
-    let _t = thread::spawn(move|| {
-        for _ in 0..100 { thread::yield_now() }
-        tx.send(1).unwrap();
-    });
-    select! {
-        n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
-    }
-}
-
-#[test]
-fn sync3() {
-    let (tx1, rx1) = sync_channel::<i32>(0);
-    let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
-    let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
-    let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
-    select! {
-        n = rx1.recv() => {
-            let n = n.unwrap();
-            assert_eq!(n, 1);
-            assert_eq!(rx2.recv().unwrap(), 2);
-        },
-        n = rx2.recv() => {
-            let n = n.unwrap();
-            assert_eq!(n, 2);
-            assert_eq!(rx1.recv().unwrap(), 1);
-        }
-    }
-}
index cc70a62036590b4b514fdbd0639dbf93d64e6f74..dbcdcdac9326855330847013db2d442e233e5623 100644 (file)
@@ -9,6 +9,7 @@
 /// channels are quite similar, and this is no coincidence!
 
 pub use self::Failure::*;
+use self::StartResult::*;
 
 use core::cmp;
 use core::intrinsics::abort;
@@ -19,8 +20,6 @@
 use crate::sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering};
 use crate::sync::mpsc::blocking::{self, SignalToken};
 use crate::sync::mpsc::mpsc_queue as mpsc;
-use crate::sync::mpsc::select::StartResult::*;
-use crate::sync::mpsc::select::StartResult;
 use crate::sync::{Mutex, MutexGuard};
 use crate::thread;
 use crate::time::Instant;
@@ -57,6 +56,12 @@ pub enum Failure {
     Disconnected,
 }
 
+#[derive(PartialEq, Eq)]
+enum StartResult {
+    Installed,
+    Abort,
+}
+
 impl<T> Packet<T> {
     // Creation of a packet *must* be followed by a call to postinit_lock
     // and later by inherit_blocker
@@ -394,16 +399,6 @@ fn take_to_wake(&self) -> SignalToken {
     // select implementation
     ////////////////////////////////////////////////////////////////////////////
 
-    // Helper function for select, tests whether this port can receive without
-    // blocking (obviously not an atomic decision).
-    //
-    // This is different than the stream version because there's no need to peek
-    // at the queue, we can just look at the local count.
-    pub fn can_recv(&self) -> bool {
-        let cnt = self.cnt.load(Ordering::SeqCst);
-        cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0
-    }
-
     // increment the count on the channel (used for selection)
     fn bump(&self, amt: isize) -> isize {
         match self.cnt.fetch_add(amt, Ordering::SeqCst) {
@@ -415,22 +410,6 @@ fn bump(&self, amt: isize) -> isize {
         }
     }
 
-    // Inserts the signal token for selection on this port, returning true if
-    // blocking should proceed.
-    //
-    // The code here is the same as in stream.rs, except that it doesn't need to
-    // peek at the channel to see if an upgrade is pending.
-    pub fn start_selection(&self, token: SignalToken) -> StartResult {
-        match self.decrement(token) {
-            Installed => Installed,
-            Abort => {
-                let prev = self.bump(1);
-                assert!(prev == DISCONNECTED || prev >= 0);
-                Abort
-            }
-        }
-    }
-
     // Cancels a previous thread waiting on this port, returning whether there's
     // data on the port.
     //
index 7ae6f68b514595d305bb993f1a73896f622a4f49..40877282761790cd78c989d9eb37fad0cd92c181 100644 (file)
@@ -9,7 +9,6 @@
 
 pub use self::Failure::*;
 pub use self::UpgradeResult::*;
-pub use self::SelectionResult::*;
 use self::Message::*;
 
 use core::cmp;
@@ -60,12 +59,6 @@ pub enum UpgradeResult {
     UpWoke(SignalToken),
 }
 
-pub enum SelectionResult<T> {
-    SelSuccess,
-    SelCanceled,
-    SelUpgraded(SignalToken, Receiver<T>),
-}
-
 // Any message could contain an "upgrade request" to a new shared port, so the
 // internal queue it's a queue of T, but rather Message<T>
 enum Message<T> {
@@ -338,27 +331,6 @@ pub fn drop_port(&self) {
     // select implementation
     ////////////////////////////////////////////////////////////////////////////
 
-    // Tests to see whether this port can receive without blocking. If Ok is
-    // returned, then that's the answer. If Err is returned, then the returned
-    // port needs to be queried instead (an upgrade happened)
-    pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
-        // We peek at the queue to see if there's anything on it, and we use
-        // this return value to determine if we should pop from the queue and
-        // upgrade this channel immediately. If it looks like we've got an
-        // upgrade pending, then go through the whole recv rigamarole to update
-        // the internal state.
-        match self.queue.peek() {
-            Some(&mut GoUp(..)) => {
-                match self.recv(None) {
-                    Err(Upgraded(port)) => Err(port),
-                    _ => unreachable!(),
-                }
-            }
-            Some(..) => Ok(true),
-            None => Ok(false)
-        }
-    }
-
     // increment the count on the channel (used for selection)
     fn bump(&self, amt: isize) -> isize {
         match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
@@ -370,31 +342,6 @@ fn bump(&self, amt: isize) -> isize {
         }
     }
 
-    // Attempts to start selecting on this port. Like a oneshot, this can fail
-    // immediately because of an upgrade.
-    pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
-        match self.decrement(token) {
-            Ok(()) => SelSuccess,
-            Err(token) => {
-                let ret = match self.queue.peek() {
-                    Some(&mut GoUp(..)) => {
-                        match self.queue.pop() {
-                            Some(GoUp(port)) => SelUpgraded(token, port),
-                            _ => unreachable!(),
-                        }
-                    }
-                    Some(..) => SelCanceled,
-                    None => SelCanceled,
-                };
-                // Undo our decrement above, and we should be guaranteed that the
-                // previous value is positive because we're not going to sleep
-                let prev = self.bump(1);
-                assert!(prev == DISCONNECTED || prev >= 0);
-                ret
-            }
-        }
-    }
-
     // Removes a previous thread from being blocked in this port
     pub fn abort_selection(&self,
                            was_upgrade: bool) -> Result<bool, Receiver<T>> {
index b2d9f4c6491e4026c8429cd6ffa11b0d57cdb64c..3c4f8e077c922ee536340b607fc3e2c2b29179cc 100644 (file)
@@ -33,7 +33,6 @@
 
 use crate::sync::atomic::{Ordering, AtomicUsize};
 use crate::sync::mpsc::blocking::{self, WaitToken, SignalToken};
-use crate::sync::mpsc::select::StartResult::{self, Installed, Abort};
 use crate::sync::{Mutex, MutexGuard};
 use crate::time::Instant;
 
@@ -406,42 +405,6 @@ pub fn drop_port(&self) {
         while let Some(token) = queue.dequeue() { token.signal(); }
         waiter.map(|t| t.signal());
     }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // select implementation
-    ////////////////////////////////////////////////////////////////////////////
-
-    // If Ok, the value is whether this port has data, if Err, then the upgraded
-    // port needs to be checked instead of this one.
-    pub fn can_recv(&self) -> bool {
-        let guard = self.lock.lock().unwrap();
-        guard.disconnected || guard.buf.size() > 0
-    }
-
-    // Attempts to start selection on this port. This can either succeed or fail
-    // because there is data waiting.
-    pub fn start_selection(&self, token: SignalToken) -> StartResult {
-        let mut guard = self.lock.lock().unwrap();
-        if guard.disconnected || guard.buf.size() > 0 {
-            Abort
-        } else {
-            match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
-                NoneBlocked => {}
-                BlockedSender(..) => unreachable!(),
-                BlockedReceiver(..) => unreachable!(),
-            }
-            Installed
-        }
-    }
-
-    // Remove a previous selecting thread from this port. This ensures that the
-    // blocked thread will no longer be visible to any other threads.
-    //
-    // The return value indicates whether there's data on this port.
-    pub fn abort_selection(&self) -> bool {
-        let mut guard = self.lock.lock().unwrap();
-        abort_selection(&mut guard)
-    }
 }
 
 impl<T> Drop for Packet<T> {
diff --git a/src/test/run-pass/issues/issue-13494.rs b/src/test/run-pass/issues/issue-13494.rs
deleted file mode 100644 (file)
index 12be977..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-// run-pass
-#![allow(unused_must_use)]
-// ignore-emscripten no threads support
-
-// This test may not always fail, but it can be flaky if the race it used to
-// expose is still present.
-
-#![feature(mpsc_select)]
-#![allow(deprecated)]
-
-use std::sync::mpsc::{channel, Sender, Receiver};
-use std::thread;
-
-fn helper(rx: Receiver<Sender<()>>) {
-    for tx in rx.iter() {
-        let _ = tx.send(());
-    }
-}
-
-fn main() {
-    let (tx, rx) = channel();
-    let t = thread::spawn(move|| { helper(rx) });
-    let (snd, rcv) = channel::<isize>();
-    for _ in 1..100000 {
-        snd.send(1).unwrap();
-        let (tx2, rx2) = channel();
-        tx.send(tx2).unwrap();
-        select! {
-            _ = rx2.recv() => (),
-            _ = rcv.recv() => ()
-        }
-    }
-    drop(tx);
-    t.join();
-}
index 904f2e2c7fd7eff21de27eb8080a4fc4455d7931..12a612c153ad62f085884b66c058e350a9dbeeb0 100644 (file)
@@ -233,8 +233,6 @@ fn println() {
     println!("hello {}", "world",);
 }
 
-// select! is too troublesome and unlikely to be stabilized
-
 // stringify! is N/A
 
 #[cfg(std)]