From: Alex Crichton
Date: Fri, 2 Jan 2015 17:15:54 +0000 (-0800)
Subject: rollup merge of #20273: alexcrichton/second-pass-comm
X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=8b7d032014ccbc9256abdc56e633e7c0a3b8342c;p=rust.git

rollup merge of #20273: alexcrichton/second-pass-comm

Conflicts:
	src/doc/guide.md
	src/libcollections/bit.rs
	src/libcollections/btree/node.rs
	src/libcollections/slice.rs
	src/libcore/ops.rs
	src/libcore/prelude.rs
	src/librand/rand_impls.rs
	src/librustc/middle/check_match.rs
	src/librustc/middle/infer/region_inference/mod.rs
	src/librustc_driver/lib.rs
	src/librustdoc/test.rs
	src/libstd/bitflags.rs
	src/libstd/io/comm_adapters.rs
	src/libstd/io/mem.rs
	src/libstd/io/mod.rs
	src/libstd/io/net/pipe.rs
	src/libstd/io/net/tcp.rs
	src/libstd/io/net/udp.rs
	src/libstd/io/pipe.rs
	src/libstd/io/process.rs
	src/libstd/io/stdio.rs
	src/libstd/io/timer.rs
	src/libstd/io/util.rs
	src/libstd/macros.rs
	src/libstd/os.rs
	src/libstd/path/posix.rs
	src/libstd/path/windows.rs
	src/libstd/prelude/v1.rs
	src/libstd/rand/mod.rs
	src/libstd/rand/os.rs
	src/libstd/sync/barrier.rs
	src/libstd/sync/condvar.rs
	src/libstd/sync/future.rs
	src/libstd/sync/mpsc/mod.rs
	src/libstd/sync/mpsc/mpsc_queue.rs
	src/libstd/sync/mpsc/select.rs
	src/libstd/sync/mpsc/spsc_queue.rs
	src/libstd/sync/mutex.rs
	src/libstd/sync/once.rs
	src/libstd/sync/rwlock.rs
	src/libstd/sync/semaphore.rs
	src/libstd/sync/task_pool.rs
	src/libstd/sys/common/helper_thread.rs
	src/libstd/sys/unix/process.rs
	src/libstd/sys/unix/timer.rs
	src/libstd/sys/windows/c.rs
	src/libstd/sys/windows/timer.rs
	src/libstd/sys/windows/tty.rs
	src/libstd/thread.rs
	src/libstd/thread_local/mod.rs
	src/libstd/thread_local/scoped.rs
	src/libtest/lib.rs
	src/test/auxiliary/cci_capture_clause.rs
	src/test/bench/shootout-reverse-complement.rs
	src/test/bench/shootout-spectralnorm.rs
	src/test/compile-fail/array-old-syntax-2.rs
	src/test/compile-fail/bind-by-move-no-guards.rs
	src/test/compile-fail/builtin-superkinds-self-type.rs
	src/test/compile-fail/comm-not-freeze-receiver.rs
	src/test/compile-fail/comm-not-freeze.rs
	src/test/compile-fail/issue-12041.rs
	src/test/compile-fail/unsendable-class.rs
	src/test/run-pass/builtin-superkinds-capabilities-transitive.rs
	src/test/run-pass/builtin-superkinds-capabilities-xc.rs
	src/test/run-pass/builtin-superkinds-capabilities.rs
	src/test/run-pass/builtin-superkinds-self-type.rs
	src/test/run-pass/capturing-logging.rs
	src/test/run-pass/closure-bounds-can-capture-chan.rs
	src/test/run-pass/comm.rs
	src/test/run-pass/core-run-destroy.rs
	src/test/run-pass/drop-trait-enum.rs
	src/test/run-pass/hashmap-memory.rs
	src/test/run-pass/issue-13494.rs
	src/test/run-pass/issue-3609.rs
	src/test/run-pass/issue-4446.rs
	src/test/run-pass/issue-4448.rs
	src/test/run-pass/issue-8827.rs
	src/test/run-pass/issue-9396.rs
	src/test/run-pass/ivec-tag.rs
	src/test/run-pass/rust-log-filter.rs
	src/test/run-pass/send-resource.rs
	src/test/run-pass/send-type-inference.rs
	src/test/run-pass/sendable-class.rs
	src/test/run-pass/spawn-types.rs
	src/test/run-pass/task-comm-0.rs
	src/test/run-pass/task-comm-10.rs
	src/test/run-pass/task-comm-11.rs
	src/test/run-pass/task-comm-13.rs
	src/test/run-pass/task-comm-14.rs
	src/test/run-pass/task-comm-15.rs
	src/test/run-pass/task-comm-16.rs
	src/test/run-pass/task-comm-3.rs
	src/test/run-pass/task-comm-4.rs
	src/test/run-pass/task-comm-5.rs
	src/test/run-pass/task-comm-6.rs
	src/test/run-pass/task-comm-7.rs
	src/test/run-pass/task-comm-9.rs
	src/test/run-pass/task-comm-chan-nil.rs
	src/test/run-pass/task-spawn-move-and-copy.rs
	src/test/run-pass/task-stderr.rs
	src/test/run-pass/tcp-accept-stress.rs
	src/test/run-pass/tcp-connect-timeouts.rs
	src/test/run-pass/tempfile.rs
	src/test/run-pass/trait-bounds-in-arc.rs
	src/test/run-pass/trivial-message.rs
	src/test/run-pass/unique-send-2.rs
	src/test/run-pass/unique-send.rs
	src/test/run-pass/unwind-resource.rs
---

8b7d032014ccbc9256abdc56e633e7c0a3b8342c
diff --cc src/libstd/io/net/pipe.rs
index 95147c52a69,68f3a8e1836..daefdd28b30
--- a/src/libstd/io/net/pipe.rs
+++ b/src/libstd/io/net/pipe.rs
@@@ -702,10 -702,10 +702,10 @@@ mod tests
          let (tx, rx) = channel::<()>();
          Thread::spawn(move|| {
              let mut s = UnixStream::connect(&addr).unwrap();
-             rx.recv();
+             rx.recv().unwrap();
              let mut amt = 0;
              while amt < 100 * 128 * 1024 {
-                 match s.read(&mut [0, ..128 * 1024]) {
+                 match s.read(&mut [0;128 * 1024]) {
                      Ok(n) => { amt += n; }
                      Err(e) => panic!("{}", e),
                  }
@@@ -718,9 -718,9 +718,9 @@@
          assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
          assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
 
-         tx.send(());
+         tx.send(()).unwrap();
          for _ in range(0u, 100) {
-             assert!(s.write(&[0, ..128 * 1024]).is_ok());
+             assert!(s.write(&[0;128 * 1024]).is_ok());
          }
      }
diff --cc src/libstd/io/net/tcp.rs
index 4492e679cd5,57ffcfaad30..3e59aaa05ef
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@@ -976,15 -977,15 +977,15 @@@ mod test
              cl.write(&[10]).unwrap();
              let mut b = [0];
              cl.read(&mut b).unwrap();
-             tx.send(());
+             tx.send(()).unwrap();
          });
 
-         rx.recv();
+         rx.recv().unwrap();
          let mut c = TcpStream::connect(addr).unwrap();
-         let mut b = [0, ..10];
+         let mut b = [0; 10];
          assert_eq!(c.read(&mut b), Ok(1));
          c.write(&[1]).unwrap();
-         rx.recv();
+         rx.recv().unwrap();
      }
 
      #[test]
@@@ -1279,10 -1280,10 +1280,10 @@@
          let (tx, rx) = channel::<()>();
          Thread::spawn(move|| {
              let mut s = TcpStream::connect(addr).unwrap();
-             rx.recv();
+             rx.recv().unwrap();
              let mut amt = 0;
              while amt < 100 * 128 * 1024 {
-                 match s.read(&mut [0, ..128 * 1024]) {
+                 match s.read(&mut [0;128 * 1024]) {
                      Ok(n) => { amt += n; }
                      Err(e) => panic!("{}", e),
                  }
@@@ -1295,9 -1296,9 +1296,9 @@@
          assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
          assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
 
-         tx.send(());
+         tx.send(()).unwrap();
          for _ in range(0i, 100) {
-             assert!(s.write(&[0, ..128 * 1024]).is_ok());
+             assert!(s.write(&[0;128 * 1024]).is_ok());
          }
      }
diff --cc src/libstd/io/pipe.rs
index ee86eae058d,ee376658283..09dcafb0218
--- a/src/libstd/io/pipe.rs
+++ b/src/libstd/io/pipe.rs
@@@ -129,11 -129,11 +129,11 @@@ mod test
          let _t = Thread::spawn(move|| {
              let mut out = out;
              out.write(&[10]).unwrap();
-             rx.recv(); // don't close the pipe until the other read has finished
+             rx.recv().unwrap(); // don't close the pipe until the other read has finished
          });
-         let mut buf = [0, ..10];
+         let mut buf = [0; 10];
          input.read(&mut buf).unwrap();
-         tx.send(());
+         tx.send(()).unwrap();
      }
  }
diff --cc src/libstd/path/windows.rs
index 107e9d80fc3,9117827ffc2..e047afc8eee
--- a/src/libstd/path/windows.rs
+++ b/src/libstd/path/windows.rs
@@@ -1119,19 -1119,14 +1119,23 @@@ fn prefix_len(p: Option<PathPrefix>) -
  #[cfg(test)]
  mod tests {
-     use super::*;
+     use prelude::v1::Option::{mod, Some, None};
+     use prelude::v1::{Vec, Clone, AsSlice, SliceExt, CloneSliceExt, IteratorExt};
+     use prelude::v1::{DoubleEndedIteratorExt, Str, ToString, GenericPath};
+     use super::PathPrefix::*;
+     use super::parse_prefix;
+     use super::*;
+     use clone::Clone;
+     use iter::{IteratorExt, DoubleEndedIteratorExt};
+     use option::Option::{mod, Some, None};
+     use path::GenericPath;
+     use slice::{AsSlice, SliceExt, CloneSliceExt};
+     use str::Str;
+     use string::ToString;
+     use vec::Vec;
+
      macro_rules! t {
          (s: $path:expr, $exp:expr) => (
              {
diff --cc src/libstd/sync/condvar.rs
index df3f2e5cf62,28960c1574e..d71cdeb25fd
--- a/src/libstd/sync/condvar.rs
+++ b/src/libstd/sync/condvar.rs
@@@ -328,22 -311,22 +328,22 @@@ mod tests
              let tx = tx.clone();
              Thread::spawn(move|| {
                  let &(ref lock, ref cond) = &*data;
-                 let mut cnt = lock.lock();
+                 let mut cnt = lock.lock().unwrap();
                  *cnt += 1;
                  if *cnt == N {
-                     tx.send(());
+                     tx.send(()).unwrap();
                  }
                  while *cnt != 0 {
-                     cond.wait(&cnt);
+                     cnt = cond.wait(cnt).unwrap();
                  }
-                 tx.send(());
+                 tx.send(()).unwrap();
              }).detach();
          }
          drop(tx);
 
          let &(ref lock, ref cond) = &*data;
-         rx.recv();
+         rx.recv().unwrap();
-         let mut cnt = lock.lock();
+         let mut cnt = lock.lock().unwrap();
          *cnt = 0;
          cond.notify_all();
          drop(cnt);
diff --cc src/libstd/sync/mpsc/mod.rs
index 00000000000,e2294906229..413675f26d5
mode 000000,100644..100644
--- a/src/libstd/sync/mpsc/mod.rs
+++ b/src/libstd/sync/mpsc/mod.rs
@@@ -1,0 -1,2079 +1,2079 @@@
+ // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+
+ //! Multi-producer, single-consumer communication primitives between threads
+ //!
+ //! This module provides message-based communication over channels, concretely
+ //! defined among three types:
+ //!
+ //! * `Sender`
+ //! * `SyncSender`
+ //! * `Receiver`
+ //!
+ //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
+ //! senders are clone-able (multi-producer) such that many threads can send
+ //! simultaneously to one receiver (single-consumer).
+ //!
+ //! These channels come in two flavors:
+ //!
+ //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
+ //!    will return a `(Sender, Receiver)` tuple where all sends will be
+ //!    **asynchronous** (they never block). The channel conceptually has an
+ //!    infinite buffer.
+ //!
+ //! 2. A synchronous, bounded channel. The `sync_channel()` function will
+ //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
+ //!    messages is a pre-allocated buffer of a fixed size. All sends will be
+ //!    **synchronous** by blocking until there is buffer space available. Note
+ //!    that a bound of 0 is allowed, causing the channel to become a
+ //!    "rendezvous" channel where each sender atomically hands off a message
+ //!    to a receiver.
+ //!
+ //! ## Disconnection
+ //!
+ //! The send and receive operations on channels will all return a `Result`
+ //! indicating whether the operation succeeded or not. An unsuccessful
+ //! operation is normally indicative of the other half of a channel having
+ //! "hung up" by being dropped in its corresponding thread.
+ //!
+ //! Once half of a channel has been deallocated, most operations can no longer
+ //! continue to make progress, so `Err` will be returned. Many applications
+ //! will continue to `unwrap()` the results returned from this module,
+ //! instigating a propagation of failure among threads if one unexpectedly
+ //! dies.
+ //!
+ //! # Examples
+ //!
+ //! Simple usage:
+ //!
+ //! ```
+ //! use std::thread::Thread;
+ //! use std::sync::mpsc::channel;
+ //!
+ //! // Create a simple streaming channel
+ //! let (tx, rx) = channel();
+ //! Thread::spawn(move|| {
+ //!     tx.send(10i).unwrap();
+ //! }).detach();
+ //! assert_eq!(rx.recv().unwrap(), 10i);
+ //! ```
+ //!
+ //! Shared usage:
+ //!
+ //! ```
+ //! use std::thread::Thread;
+ //! use std::sync::mpsc::channel;
+ //!
+ //! // Create a shared channel that can be sent along from many threads
+ //! // where tx is the sending half (tx for transmission), and rx is the
+ //! // receiving half (rx for receiving).
+ //! let (tx, rx) = channel();
+ //! for i in range(0i, 10i) {
+ //!     let tx = tx.clone();
+ //!     Thread::spawn(move|| {
+ //!         tx.send(i).unwrap();
+ //!     }).detach()
+ //! }
+ //!
+ //! for _ in range(0i, 10i) {
+ //!     let j = rx.recv().unwrap();
+ //!     assert!(0 <= j && j < 10);
+ //! }
+ //! ```
+ //!
+ //! Propagating panics:
+ //!
+ //! ```
+ //! use std::sync::mpsc::channel;
+ //!
+ //! // The call to recv() will return an error because the channel has already
+ //! // hung up (or been deallocated)
+ //! let (tx, rx) = channel::<int>();
+ //! drop(tx);
+ //! assert!(rx.recv().is_err());
+ //! ```
+ //!
+ //! Synchronous channels:
+ //!
+ //! ```
+ //! use std::thread::Thread;
+ //! use std::sync::mpsc::sync_channel;
+ //!
+ //! let (tx, rx) = sync_channel::<int>(0);
+ //! Thread::spawn(move|| {
+ //!     // This will wait for the parent task to start receiving
+ //!     tx.send(53).unwrap();
+ //! }).detach();
+ //! rx.recv().unwrap();
+ //! ```
+ //!
+ //! Reading from a channel with a timeout requires using a Timer together
+ //! with the channel. You can use the select! macro to select either and
+ //! handle the timeout case. This first example will break out of the loop
+ //! after 10 seconds no matter what:
+ //!
+ //! ```no_run
+ //! use std::sync::mpsc::channel;
+ //! use std::io::timer::Timer;
+ //! use std::time::Duration;
+ //!
+ //! let (tx, rx) = channel::<int>();
+ //! let mut timer = Timer::new().unwrap();
+ //! let timeout = timer.oneshot(Duration::seconds(10));
+ //!
+ //! loop {
+ //!     select! {
+ //!         val = rx.recv() => println!("Received {}", val.unwrap()),
+ //!         _ = timeout.recv() => {
+ //!             println!("timed out, total time was more than 10 seconds");
+ //!             break;
+ //!         }
+ //!     }
+ //! }
+ //! ```
+ //!
+ //! This second example is more costly since it allocates a new timer every
+ //! time a message is received, but it allows you to timeout after the channel
+ //! has been inactive for 5 seconds:
+ //!
+ //! ```no_run
+ //! use std::sync::mpsc::channel;
+ //! use std::io::timer::Timer;
+ //! use std::time::Duration;
+ //!
+ //! let (tx, rx) = channel::<int>();
+ //! let mut timer = Timer::new().unwrap();
+ //!
+ //! loop {
+ //!     let timeout = timer.oneshot(Duration::seconds(5));
+ //!
+ //!     select! {
+ //!         val = rx.recv() => println!("Received {}", val.unwrap()),
+ //!         _ = timeout.recv() => {
+ //!             println!("timed out, no message received in 5 seconds");
+ //!             break;
+ //!         }
+ //!     }
+ //! }
+ //! ```
+
+ // A description of how Rust's channel implementation works
+ //
+ // Channels are supposed to be the basic building block for all other
+ // concurrent primitives that are used in Rust. As a result, the channel type
+ // needs to be highly optimized, flexible, and broad enough for use everywhere.
+ //
+ // The choice of implementation of all channels is to be built on lock-free
+ // data structures. The channels themselves are then consequently also
+ // lock-free data structures.
+ // As always with lock-free code, this is very "here be dragons" territory,
+ // especially because I'm unaware of any academic papers that have gone into
+ // great detail about channels of these flavors.
+ //
+ // ## Flavors of channels
+ //
+ // From the perspective of a consumer of this library, there is only one flavor
+ // of channel. This channel can be used as a stream and cloned to allow
+ // multiple senders. Under the hood, however, there are actually three flavors
+ // of channels in play.
+ //
 -// * Oneshots - these channels are highly optimized for the one-send use case.
++// * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
+ //              They contain as few atomics as possible and involve one and
+ //              exactly one allocation.
+ // * Streams - these channels are optimized for the non-shared use case. They
+ //             use a different concurrent queue that is more tailored for this
+ //             use case. The initial allocation of this flavor of channel is
+ //             not optimized.
+ // * Shared - this is the most general form of channel that this module
+ //            offers, a channel with multiple senders. This type is as
+ //            optimized as it can be, but the previous two types mentioned are
+ //            much faster for their use-cases.
+ //
+ // ## Concurrent queues
+ //
+ // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
+ // but recv() obviously blocks. This means that under the hood there must be
+ // some shared and concurrent queue holding all of the actual data.
+ //
+ // With two flavors of channels, two flavors of queues are also used. We have
+ // chosen to use queues from a well-known author that are abbreviated as SPSC
+ // and MPSC (single producer, single consumer and multiple producer, single
+ // consumer). SPSC queues are used for streams while MPSC queues are used for
+ // shared channels.
+ //
+ // ### SPSC optimizations
+ //
+ // The SPSC queue found online is essentially a linked list of nodes where one
+ // half of the nodes are the "queue of data" and the other half of nodes are a
+ // cache of unused nodes. The unused nodes are used such that an allocation is
+ // not required on every push() and a free doesn't need to happen on every
+ // pop().
+ //
+ // As found online, however, the cache of nodes is of an infinite size. This
+ // means that if a channel at one point in its life had 50k items in the
+ // queue, then the queue will always have the capacity for 50k items. I
+ // believed that this was an unnecessary limitation of the implementation, so
+ // I have altered the queue to optionally have a bound on the cache size.
+ //
+ // By default, streams will have an unbounded SPSC queue with a small-ish
+ // cache size. The hope is that the cache is still large enough to have very
+ // fast send() operations while not too large such that millions of channels
+ // can coexist at once.
+ //
+ // ### MPSC optimizations
+ //
+ // Right now the MPSC queue has not been optimized. Like the SPSC queue, it
+ // uses a linked list under the hood to earn its unboundedness, but I have not
+ // put forth much effort into having a cache of nodes similar to the SPSC
+ // queue.
+ //
+ // For now, I believe that this is "ok" because shared channels are not the
+ // most common type, but soon we may wish to revisit this queue choice and
+ // determine another candidate for backend storage of shared channels.
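+ //
+ // As a rough sketch of the bounded node cache described above (the names and
+ // fields here are illustrative only, not the actual spsc_queue internals),
+ // recycling a popped node looks something like:
+ //
+ //     fn reclaim(&mut self, node: Box<Node<T>>) {
+ //         if self.cache_size < self.cache_bound {
+ //             // Recycle the node so a later push() can skip an allocation.
+ //             self.node_cache.push(node);
+ //             self.cache_size += 1;
+ //         }
+ //         // Otherwise the node is simply dropped here, keeping the cache
+ //         // bounded even after a burst of 50k queued items.
+ //     }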
+ //
+ // ## Overview of the Implementation
+ //
+ // Now that there's a little background on the concurrent queues used, it's
+ // worth going into much more detail about the channels themselves. The basic
+ // pseudocode for a send/recv is:
+ //
+ //     send(t)                         recv()
+ //       queue.push(t)                   return if queue.pop()
+ //       if increment() == -1            deschedule {
+ //         wakeup()                        if decrement() > 0
+ //                                           cancel_deschedule()
+ //                                       }
+ //                                       queue.pop()
+ //
+ // As mentioned before, there are no locks in this implementation, only atomic
+ // instructions are used.
+ //
+ // ### The internal atomic counter
+ //
+ // Every channel has a shared counter with each half to keep track of the size
+ // of the queue. This counter is used to abort descheduling by the receiver
+ // and to know when to wake up on the sending side.
+ //
+ // As seen in the pseudocode, senders will increment this count and receivers
+ // will decrement the count. The theory behind this is that if a sender sees
+ // a -1 count, it will wake up the receiver, and if the receiver sees a 1+
+ // count, then it doesn't need to block.
+ //
+ // The recv() method has a beginning call to pop(), and if successful, it
+ // needs to decrement the count. It is a crucial implementation detail that
+ // this decrement does *not* happen to the shared counter. If this were the
+ // case, then it would be possible for the counter to be very negative when
+ // there were no receivers waiting, in which case the senders would have to
+ // determine when it was actually appropriate to wake up a receiver.
+ //
+ // Instead, the "steal count" is kept track of separately (not atomically
+ // because it's only used by receivers), and then the decrement() call when
+ // descheduling will lump in all of the recent steals into one large
+ // decrement.
+ //
+ // The implication of this is that if a sender sees a -1 count, then there's
+ // guaranteed to be a waiter waiting!
+ //
+ // ## Native Implementation
+ //
+ // A major goal of these channels is to work seamlessly on and off the
+ // runtime. All of the previous race conditions have been worded in terms of
+ // scheduler-isms (which is obviously not available without the runtime).
+ //
+ // For now, native usage of channels (off the runtime) will fall back onto
+ // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
+ // is still entirely lock-free, the "deschedule" blocks above are surrounded
+ // by a mutex and the "wakeup" blocks involve grabbing a mutex and signaling
+ // on a condition variable.
+ //
+ // ## Select
+ //
+ // Being able to support selection over channels has greatly influenced this
+ // design, and not only does selection need to work inside the runtime, but
+ // also outside the runtime.
+ //
+ // The implementation is fairly straightforward. The goal of select() is not
+ // to return some data, but only to return which channel can receive data
+ // without blocking. The implementation is essentially the entire blocking
+ // procedure followed by an increment as soon as it's woken up. The
+ // cancellation procedure involves an increment and swapping out of to_wake to
+ // acquire ownership of the task to unblock.
+ //
+ // Sadly this current implementation requires multiple allocations, so I have
+ // seen the throughput of select() be much worse than it should be. I do not
+ // believe that there is anything fundamental that needs to change about these
+ // channels, however, in order to support a more efficient select().
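+ //
+ // To restate the counter handshake above as a Rust-flavored sketch (the
+ // names are illustrative, not the actual packet internals, and the atomic
+ // ops are assumed to return the pre-update value, matching the pseudocode):
+ //
+ //     // sender
+ //     queue.push(t);
+ //     if cnt.fetch_add(1, SeqCst) == -1 {
+ //         wakeup_receiver();              // the receiver is descheduled
+ //     }
+ //
+ //     // receiver, descheduling after a failed pop(); recent unsynchronized
+ //     // "steals" are lumped into this one shared decrement
+ //     let steals = mem::replace(&mut self.steals, 0);
+ //     if cnt.fetch_sub(1 + steals, SeqCst) > 0 {
+ //         cancel_deschedule();            // data arrived while blocking
+ //     }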
+ //
+ // # Conclusion
+ //
+ // And now that you've seen all the races that I found and attempted to fix,
+ // here's the code for you to find some more!
+
+ use prelude::v1::*;
+
+ use sync::Arc;
+ use fmt;
+ use kinds::marker;
+ use mem;
+ use cell::UnsafeCell;
+
+ pub use self::select::{Select, Handle};
+ use self::select::StartResult;
+ use self::select::StartResult::*;
+ use self::blocking::SignalToken;
+
+ mod blocking;
+ mod oneshot;
+ mod select;
+ mod shared;
+ mod stream;
+ mod sync;
+ mod mpsc_queue;
+ mod spsc_queue;
+
+ /// The receiving-half of Rust's channel type. This half can only be owned by
+ /// one task.
+ #[stable]
+ pub struct Receiver<T> {
+     inner: UnsafeCell<Flavor<T>>,
+ }
+
+ // The receiver port can be sent from place to place, so long as it
+ // is not used to receive non-sendable things.
+ unsafe impl<T: Send> Send for Receiver<T> { }
+
+ /// An iterator over messages on a receiver, this iterator will block
+ /// whenever `next` is called, waiting for a new message, and `None` will be
+ /// returned when the corresponding channel has hung up.
+ #[stable]
+ pub struct Iter<'a, T: 'a> {
+     rx: &'a Receiver<T>
+ }
+
+ /// The sending-half of Rust's asynchronous channel type. This half can only
+ /// be owned by one task, but it can be cloned to send to other tasks.
+ #[stable]
+ pub struct Sender<T> {
+     inner: UnsafeCell<Flavor<T>>,
+ }
+
+ // The send port can be sent from place to place, so long as it
+ // is not used to send non-sendable things.
+ unsafe impl<T: Send> Send for Sender<T> { }
+
+ /// The sending-half of Rust's synchronous channel type. This half can only
+ /// be owned by one task, but it can be cloned to send to other tasks.
+ #[stable]
+ pub struct SyncSender<T> {
+     inner: Arc<RacyCell<sync::Packet<T>>>,
+     // can't share in an arc
+     _marker: marker::NoSync,
+ }
+
+ /// An error returned from the `send` function on channels.
+ ///
+ /// A `send` operation can only fail if the receiving end of a channel is
+ /// disconnected, implying that the data could never be received. The error
+ /// contains the data being sent as a payload so it can be recovered.
+ #[deriving(PartialEq, Eq)]
+ #[stable]
+ pub struct SendError<T>(pub T);
+
+ /// An error returned from the `recv` function on a `Receiver`.
+ ///
+ /// The `recv` operation can only fail if the sending half of a channel is
+ /// disconnected, implying that no further messages will ever be received.
+ #[deriving(PartialEq, Eq, Clone, Copy)]
+ #[stable]
+ pub struct RecvError;
+
+ /// This enumeration is the list of the possible reasons that `try_recv`
+ /// could not return data when called.
+ #[deriving(PartialEq, Clone, Copy)]
+ #[stable]
+ pub enum TryRecvError {
+     /// This channel is currently empty, but the sender(s) have not yet
+     /// disconnected, so data may yet become available.
+     #[stable]
+     Empty,
+
+     /// This channel's sending half has become disconnected, and there will
+     /// never be any more data received on this channel.
+     #[stable]
+     Disconnected,
+ }
+
+ /// This enumeration is the list of the possible error outcomes for the
+ /// `SyncSender::try_send` method.
+ #[deriving(PartialEq, Clone)]
+ #[stable]
+ pub enum TrySendError<T> {
+     /// The data could not be sent on the channel because it would require
+     /// that the callee block to send the data.
+     ///
+     /// If this is a buffered channel, then the buffer is full at this time.
+     /// If this is not a buffered channel, then there is no receiver available
+     /// to acquire the data.
+     #[stable]
+     Full(T),
+
+     /// This channel's receiving half has disconnected, so the data could not
+     /// be sent.
The data is returned back to the callee in this case. + #[stable] + Disconnected(T), + } + + enum Flavor { + Oneshot(Arc>>), + Stream(Arc>>), + Shared(Arc>>), + Sync(Arc>>), + } + + #[doc(hidden)] + trait UnsafeFlavor { + fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell>; + unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor { + &mut *self.inner_unsafe().get() + } + unsafe fn inner<'a>(&'a self) -> &'a Flavor { + &*self.inner_unsafe().get() + } + } + impl UnsafeFlavor for Sender { + fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell> { + &self.inner + } + } + impl UnsafeFlavor for Receiver { + fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell> { + &self.inner + } + } + + /// Creates a new asynchronous channel, returning the sender/receiver halves. + /// + /// All data sent on the sender will become available on the receiver, and no + /// send will block the calling task (this channel has an "infinite buffer"). + /// + /// # Example + /// + /// ``` + /// use std::sync::mpsc::channel; + /// use std::thread::Thread; + /// + /// // tx is is the sending half (tx for transmission), and rx is the receiving + /// // half (rx for receiving). + /// let (tx, rx) = channel(); + /// + /// // Spawn off an expensive computation + /// Thread::spawn(move|| { + /// # fn expensive_computation() {} + /// tx.send(expensive_computation()).unwrap(); + /// }).detach(); + /// + /// // Do some useful work for awhile + /// + /// // Let's see what that answer was + /// println!("{}", rx.recv().unwrap()); + /// ``` + #[stable] + pub fn channel() -> (Sender, Receiver) { + let a = Arc::new(RacyCell::new(oneshot::Packet::new())); + (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) + } + + /// Creates a new synchronous, bounded channel. + /// + /// Like asynchronous channels, the `Receiver` will block until a message + /// becomes available. These channels differ greatly in the semantics of the + /// sender from asynchronous channels, however. + /// + /// This channel has an internal buffer on which messages will be queued. When + /// the internal buffer becomes full, future sends will *block* waiting for the + /// buffer to open up. Note that a buffer size of 0 is valid, in which case this + /// becomes "rendezvous channel" where each send will not return until a recv + /// is paired with it. + /// + /// As with asynchronous channels, all senders will panic in `send` if the + /// `Receiver` has been destroyed. + /// + /// # Example + /// + /// ``` + /// use std::sync::mpsc::sync_channel; + /// use std::thread::Thread; + /// + /// let (tx, rx) = sync_channel(1); + /// + /// // this returns immediately + /// tx.send(1i).unwrap(); + /// + /// Thread::spawn(move|| { + /// // this will block until the previous message has been received + /// tx.send(2i).unwrap(); + /// }).detach(); + /// + /// assert_eq!(rx.recv().unwrap(), 1i); + /// assert_eq!(rx.recv().unwrap(), 2i); + /// ``` + #[stable] + pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { + let a = Arc::new(RacyCell::new(sync::Packet::new(bound))); + (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) + } + + //////////////////////////////////////////////////////////////////////////////// + // Sender + //////////////////////////////////////////////////////////////////////////////// + + impl Sender { + fn new(inner: Flavor) -> Sender { + Sender { + inner: UnsafeCell::new(inner), + } + } + + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. 
+ /// + /// A successful send occurs when it is determined that the other end of + /// the channel has not hung up already. An unsuccessful send would be one + /// where the corresponding receiver has already been deallocated. Note + /// that a return value of `Err` means that the data will never be + /// received, but a return value of `Ok` does *not* mean that the data + /// will be received. It is possible for the corresponding receiver to + /// hang up immediately after this function returns `Ok`. + /// + /// This method will never block the current thread. + /// + /// # Example + /// + /// ``` + /// use std::sync::mpsc::channel; + /// + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// tx.send(1i).unwrap(); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert_eq!(tx.send(1i).err().unwrap().0, 1); + /// ``` + pub fn send(&self, t: T) -> Result<(), SendError> { + let (new_inner, ret) = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + unsafe { + let p = p.get(); + if !(*p).sent() { + return (*p).send(t).map_err(SendError); + } else { + let a = + Arc::new(RacyCell::new(stream::Packet::new())); + let rx = Receiver::new(Flavor::Stream(a.clone())); + match (*p).upgrade(rx) { + oneshot::UpSuccess => { + let ret = (*a.get()).send(t); + (a, ret) + } + oneshot::UpDisconnected => (a, Err(t)), + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is + // asleep (we're looking at it), so the receiver + // can't go away. + (*a.get()).send(t).ok().unwrap(); + token.signal(); + (a, Ok(())) + } + } + } + } + } + Flavor::Stream(ref p) => return unsafe { + (*p.get()).send(t).map_err(SendError) + }, + Flavor::Shared(ref p) => return unsafe { + (*p.get()).send(t).map_err(SendError) + }, + Flavor::Sync(..) => unreachable!(), + }; + + unsafe { + let tmp = Sender::new(Flavor::Stream(new_inner)); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + ret.map_err(SendError) + } + } + + #[stable] + impl Clone for Sender { + fn clone(&self) -> Sender { + let (packet, sleeper, guard) = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + let a = Arc::new(RacyCell::new(shared::Packet::new())); + unsafe { + let guard = (*a.get()).postinit_lock(); + let rx = Receiver::new(Flavor::Shared(a.clone())); + match (*p.get()).upgrade(rx) { + oneshot::UpSuccess | + oneshot::UpDisconnected => (a, None, guard), + oneshot::UpWoke(task) => (a, Some(task), guard) + } + } + } + Flavor::Stream(ref p) => { + let a = Arc::new(RacyCell::new(shared::Packet::new())); + unsafe { + let guard = (*a.get()).postinit_lock(); + let rx = Receiver::new(Flavor::Shared(a.clone())); + match (*p.get()).upgrade(rx) { + stream::UpSuccess | + stream::UpDisconnected => (a, None, guard), + stream::UpWoke(task) => (a, Some(task), guard), + } + } + } + Flavor::Shared(ref p) => { + unsafe { (*p.get()).clone_chan(); } + return Sender::new(Flavor::Shared(p.clone())); + } + Flavor::Sync(..) 
=> unreachable!(), + }; + + unsafe { + (*packet.get()).inherit_blocker(sleeper, guard); + + let tmp = Sender::new(Flavor::Shared(packet.clone())); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + Sender::new(Flavor::Shared(packet)) + } + } + + #[unsafe_destructor] + impl Drop for Sender { + fn drop(&mut self) { + match *unsafe { self.inner_mut() } { + Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Flavor::Sync(..) => unreachable!(), + } + } + } + + //////////////////////////////////////////////////////////////////////////////// + // SyncSender + //////////////////////////////////////////////////////////////////////////////// + + impl SyncSender { + fn new(inner: Arc>>) -> SyncSender { + SyncSender { inner: inner, _marker: marker::NoSync } + } + + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + /// + /// Note that a successful send does *not* guarantee that the receiver will + /// ever see the data if there is a buffer on this channel. Items may be + /// enqueued in the internal buffer for the receiver to receive at a later + /// time. If the buffer size is 0, however, it can be guaranteed that the + /// receiver has indeed received the data if this function returns success. + /// + /// This function will never panic, but it may return `Err` if the + /// `Receiver` has disconnected and is no longer able to receive + /// information. + #[stable] + pub fn send(&self, t: T) -> Result<(), SendError> { + unsafe { (*self.inner.get()).send(t).map_err(SendError) } + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from `send` by returning immediately if the + /// channel's buffer is full or no receiver is waiting to acquire some + /// data. Compared with `send`, this function has two failure cases + /// instead of one (one for disconnection, one for a full buffer). + /// + /// See `SyncSender::send` for notes about guarantees of whether the + /// receiver has received the data or not if this function is successful. + #[stable] + pub fn try_send(&self, t: T) -> Result<(), TrySendError> { + unsafe { (*self.inner.get()).try_send(t) } + } + } + + #[stable] + impl Clone for SyncSender { + fn clone(&self) -> SyncSender { + unsafe { (*self.inner.get()).clone_chan(); } + return SyncSender::new(self.inner.clone()); + } + } + + #[unsafe_destructor] + impl Drop for SyncSender { + fn drop(&mut self) { + unsafe { (*self.inner.get()).drop_chan(); } + } + } + + //////////////////////////////////////////////////////////////////////////////// + // Receiver + //////////////////////////////////////////////////////////////////////////////// + + impl Receiver { + fn new(inner: Flavor) -> Receiver { + Receiver { inner: UnsafeCell::new(inner) } + } + + /// Attempts to return a pending value on this receiver without blocking + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with a + /// possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. 
+ #[stable] + pub fn try_recv(&self) -> Result { + loop { + let new_port = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(TryRecvError::Empty), + Err(oneshot::Disconnected) => { + return Err(TryRecvError::Disconnected) + } + Err(oneshot::Upgraded(rx)) => rx, + } + } + Flavor::Stream(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(TryRecvError::Empty), + Err(stream::Disconnected) => { + return Err(TryRecvError::Disconnected) + } + Err(stream::Upgraded(rx)) => rx, + } + } + Flavor::Shared(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(TryRecvError::Empty), + Err(shared::Disconnected) => { + return Err(TryRecvError::Disconnected) + } + } + } + Flavor::Sync(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(TryRecvError::Empty), + Err(sync::Disconnected) => { + return Err(TryRecvError::Disconnected) + } + } + } + }; + unsafe { + mem::swap(self.inner_mut(), + new_port.inner_mut()); + } + } + } + + /// Attempt to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding `Sender`, then this receiver will wake up and + /// return that message. + /// + /// If the corresponding `Sender` has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return `Err` to + /// indicate that no more messages can ever be received on this channel. + #[stable] + pub fn recv(&self) -> Result { + loop { + let new_port = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return unreachable!(), + Err(oneshot::Disconnected) => return Err(RecvError), + Err(oneshot::Upgraded(rx)) => rx, + } + } + Flavor::Stream(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Ok(t), + Err(stream::Empty) => return unreachable!(), + Err(stream::Disconnected) => return Err(RecvError), + Err(stream::Upgraded(rx)) => rx, + } + } + Flavor::Shared(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Ok(t), + Err(shared::Empty) => return unreachable!(), + Err(shared::Disconnected) => return Err(RecvError), + } + } + Flavor::Sync(ref p) => return unsafe { + (*p.get()).recv().map_err(|()| RecvError) + } + }; + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + } + + /// Returns an iterator that will block waiting for messages, but never + /// `panic!`. It will return `None` when the channel has hung up. 
+ #[stable] + pub fn iter(&self) -> Iter { + Iter { rx: self } + } + } + + impl select::Packet for Receiver { + fn can_recv(&self) -> bool { + loop { + let new_port = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Flavor::Stream(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Flavor::Shared(ref p) => { + return unsafe { (*p.get()).can_recv() }; + } + Flavor::Sync(ref p) => { + return unsafe { (*p.get()).can_recv() }; + } + }; + unsafe { + mem::swap(self.inner_mut(), + new_port.inner_mut()); + } + } + } + + fn start_selection(&self, mut token: SignalToken) -> StartResult { + loop { + let (t, new_port) = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + match unsafe { (*p.get()).start_selection(token) } { + oneshot::SelSuccess => return Installed, + oneshot::SelCanceled => return Abort, + oneshot::SelUpgraded(t, rx) => (t, rx), + } + } + Flavor::Stream(ref p) => { + match unsafe { (*p.get()).start_selection(token) } { + stream::SelSuccess => return Installed, + stream::SelCanceled => return Abort, + stream::SelUpgraded(t, rx) => (t, rx), + } + } + Flavor::Shared(ref p) => { + return unsafe { (*p.get()).start_selection(token) }; + } + Flavor::Sync(ref p) => { + return unsafe { (*p.get()).start_selection(token) }; + } + }; + token = t; + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + } + + fn abort_selection(&self) -> bool { + let mut was_upgrade = false; + loop { + let result = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, + Flavor::Stream(ref p) => unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + Flavor::Shared(ref p) => return unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + Flavor::Sync(ref p) => return unsafe { + (*p.get()).abort_selection() + }, + }; + let new_port = match result { Ok(b) => return b, Err(p) => p }; + was_upgrade = true; + unsafe { + mem::swap(self.inner_mut(), + new_port.inner_mut()); + } + } + } + } + + #[unstable] + impl<'a, T: Send> Iterator for Iter<'a, T> { + fn next(&mut self) -> Option { self.rx.recv().ok() } + } + + #[unsafe_destructor] + impl Drop for Receiver { + fn drop(&mut self) { + match *unsafe { self.inner_mut() } { + Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, + } + } + } + + /// A version of `UnsafeCell` intended for use in concurrent data + /// structures (for example, you might put it in an `Arc`). + struct RacyCell(pub UnsafeCell); + + impl RacyCell { + + fn new(value: T) -> RacyCell { + RacyCell(UnsafeCell { value: value }) + } + + unsafe fn get(&self) -> *mut T { + self.0.get() + } + + } + + unsafe impl Send for RacyCell { } + + unsafe impl Sync for RacyCell { } // Oh dear + + impl fmt::Show for SendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + "sending on a closed channel".fmt(f) + } + } + + impl fmt::Show for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + TrySendError::Full(..) => { + "sending on a full channel".fmt(f) + } + TrySendError::Disconnected(..) 
=> { + "sending on a closed channel".fmt(f) + } + } + } + } + + impl fmt::Show for RecvError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + "receiving on a closed channel".fmt(f) + } + } + + impl fmt::Show for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + TryRecvError::Empty => { + "receiving on an empty channel".fmt(f) + } + TryRecvError::Disconnected => { + "receiving on a closed channel".fmt(f) + } + } + } + } + + #[cfg(test)] + mod test { + use prelude::v1::*; + + use os; + use super::*; + use thread::Thread; + + pub fn stress_factor() -> uint { + match os::getenv("RUST_TEST_STRESS") { + Some(val) => val.parse().unwrap(), + None => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = channel(); + tx.send(box 1i).unwrap(); + } + + #[test] + fn drop_full_shared() { + let (tx, _rx) = channel(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(box 1i).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = channel::(); + let _t = Thread::spawn(move|| { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = channel::(); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone() { + let (tx, rx) = channel::(); + drop(rx); + assert!(tx.send(1).is_err()) + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = channel::(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = channel::(); + let _t = Thread::spawn(move|| { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = channel::(); + let tx2 = tx.clone(); + let _t = Thread::spawn(move|| { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = channel::(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = channel::(); + let _t = Thread::spawn(move|| { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + } + + #[test] + fn stress() { + let (tx, rx) = channel::(); + let t = Thread::spawn(move|| { + for _ in range(0u, 10000) { tx.send(1i).unwrap(); } + }); + for _ in range(0u, 10000) { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().unwrap(); + } + + #[test] + fn stress_shared() { + static AMT: uint = 10000; + static NTHREADS: uint = 8; + let (tx, rx) = channel::(); + + let t = Thread::spawn(move|| { + for _ in range(0, AMT * NTHREADS) { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) 
=> panic!(), + _ => {} + } + }); + + for _ in range(0, NTHREADS) { + let tx = tx.clone(); + Thread::spawn(move|| { + for _ in range(0, AMT) { tx.send(1).unwrap(); } + }).detach(); + } + drop(tx); + t.join().ok().unwrap(); + } + + #[test] + fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::(); + let t1 = Thread::spawn(move|| { + tx1.send(()).unwrap(); + for _ in range(0i, 40) { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = Thread::spawn(move|| { + for _ in range(0i, 40) { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn recv_from_outside_runtime() { + let (tx, rx) = channel::(); + let t = Thread::spawn(move|| { + for _ in range(0i, 40) { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in range(0u, 40) { + tx.send(1).unwrap(); + } + t.join().ok().unwrap(); + } + + #[test] + fn no_runtime() { + let (tx1, rx1) = channel::(); + let (tx2, rx2) = channel::(); + let t1 = Thread::spawn(move|| { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = Thread::spawn(move|| { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::(); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::(); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::>(); + drop(rx); + assert!(tx.send(box 0).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = Thread::spawn(move|| { + let (tx, rx) = channel::(); + drop(tx); + rx.recv().unwrap(); + }).join(); + // What is our res? 
+ assert!(res.is_err()); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::>(); + tx.send(box 10).unwrap(); + assert!(rx.recv().unwrap() == box 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::(); + drop(rx); + assert!(tx.send(10).is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::(); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::>(); + let _t = Thread::spawn(move|| { + assert!(rx.recv().unwrap() == box 10); + }); + + tx.send(box 10).unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::>(); + let _t = Thread::spawn(move|| { + drop(tx); + }); + let res = Thread::spawn(move|| { + assert!(rx.recv().unwrap() == box 10); + }).join(); + assert!(res.is_err()); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel::(); + let _t = Thread::spawn(move|| { + drop(rx); + }); + drop(tx); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel::(); + let _t = Thread::spawn(move|| { + drop(rx); + }); + let _ = Thread::spawn(move|| { + tx.send(1).unwrap(); + }).join(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel::(); + Thread::spawn(move|| { + let res = Thread::spawn(move|| { + rx.recv().unwrap(); + }).join(); + assert!(res.is_err()); + }).detach(); + let _t = Thread::spawn(move|| { + Thread::spawn(move|| { + drop(tx); + }).detach(); + }); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + tx.send(box 10i).unwrap(); + }); + assert!(rx.recv().unwrap() == box 10i); + } + } + + #[test] + fn stream_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel(); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: Sender>, i: int) { + if i == 10 { return } + + Thread::spawn(move|| { + tx.send(box i).unwrap(); + send(tx, i + 1); + }).detach(); + } + + fn recv(rx: Receiver>, i: int) { + if i == 10 { return } + + Thread::spawn(move|| { + assert!(rx.recv().unwrap() == box i); + recv(rx, i + 1); + }).detach(); + } + } + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in range(0i, 10000) { tx.send(()).unwrap(); } + for _ in range(0i, 10000) { 
rx.recv().unwrap(); } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + for _ in range(0, total) { + let tx = tx.clone(); + Thread::spawn(move|| { + tx.send(()).unwrap(); + }).detach(); + } + + for _ in range(0, total) { + rx.recv().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = channel::(); + let (total_tx, total_rx) = channel::(); + + let _t = Thread::spawn(move|| { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = channel::(); + let (count_tx, count_rx) = channel(); + + let _t = Thread::spawn(move|| { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = channel::(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let _t = Thread::spawn(move|| { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let _t = Thread::spawn(move|| { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other task has gone to sleep + for _ in range(0u, 5000) { Thread::yield_now(); } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child task to exit before we exit + rx2.recv().unwrap(); + } + } + + #[cfg(test)] + mod sync_tests { + use prelude::v1::*; + + use os; + use thread::Thread; + use super::*; + + pub fn stress_factor() -> uint { + match os::getenv("RUST_TEST_STRESS") { + Some(val) => val.parse().unwrap(), + None => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = sync_channel::(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = sync_channel(1); + tx.send(box 1i).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = sync_channel::(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = sync_channel::(0); + let _t = Thread::spawn(move|| { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = sync_channel::(0); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::(0); + drop(rx); + let tx2 
= tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = sync_channel::(0); + let _t = Thread::spawn(move|| { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::(0); + let tx2 = tx.clone(); + let _t = Thread::spawn(move|| { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = sync_channel::(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::(0); + Thread::spawn(move|| { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }).detach(); + while rx.recv().is_ok() {} + } + + #[test] + fn stress() { + let (tx, rx) = sync_channel::(0); + Thread::spawn(move|| { + for _ in range(0u, 10000) { tx.send(1).unwrap(); } + }).detach(); + for _ in range(0u, 10000) { + assert_eq!(rx.recv().unwrap(), 1); + } + } + + #[test] + fn stress_shared() { + static AMT: uint = 1000; + static NTHREADS: uint = 8; + let (tx, rx) = sync_channel::(0); + let (dtx, drx) = sync_channel::<()>(0); + + Thread::spawn(move|| { + for _ in range(0, AMT * NTHREADS) { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()).unwrap(); + }).detach(); + + for _ in range(0, NTHREADS) { + let tx = tx.clone(); + Thread::spawn(move|| { + for _ in range(0, AMT) { tx.send(1).unwrap(); } + }).detach(); + } + drop(tx); + drx.recv().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::(0); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::(0); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::>(0); + drop(rx); + assert!(tx.send(box 0).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = Thread::spawn(move|| { + let (tx, rx) = sync_channel::(0); + drop(tx); + rx.recv().unwrap(); + }).join(); + // What is our res? 
+ assert!(res.is_err()); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::>(1); + tx.send(box 10).unwrap(); + assert!(rx.recv().unwrap() == box 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); + } + + #[test] + fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::(0); + assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::(1); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::(1); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::(0); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::>(0); + let _t = Thread::spawn(move|| { + assert!(rx.recv().unwrap() == box 10); + }); + + tx.send(box 10).unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::>(0); + let _t = Thread::spawn(move|| { + drop(tx); + }); + let res = Thread::spawn(move|| { + assert!(rx.recv().unwrap() == box 10); + }).join(); + assert!(res.is_err()); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::(0); + let _t = Thread::spawn(move|| { + drop(rx); + }); + drop(tx); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::(0); + let _t = Thread::spawn(move|| { + drop(rx); + }); + let _ = Thread::spawn(move || { + tx.send(1).unwrap(); + }).join(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::(0); + let _t = Thread::spawn(move|| { + let res = Thread::spawn(move|| { + rx.recv().unwrap(); + }).join(); + assert!(res.is_err()); + }); + let _t = Thread::spawn(move|| { + Thread::spawn(move|| { + drop(tx); + }).detach(); + }); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::>(0); + let _t = Thread::spawn(move|| { + tx.send(box 10i).unwrap(); + }); + assert!(rx.recv().unwrap() == box 10i); + } + } + + #[test] + fn stream_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::>(0); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: SyncSender>, i: int) { + if i == 10 { return } + + Thread::spawn(move|| { + tx.send(box i).unwrap(); + send(tx, i + 1); + }).detach(); + } + + fn recv(rx: Receiver>, i: int) { + if i == 10 { return } + + Thread::spawn(move|| { + 
+    #[test]
+    fn stream_send_recv_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<Box<int>>(0);
+
+            send(tx, 0);
+            recv(rx, 0);
+
+            fn send(tx: SyncSender<Box<int>>, i: int) {
+                if i == 10 { return }
+
+                Thread::spawn(move|| {
+                    tx.send(box i).unwrap();
+                    send(tx, i + 1);
+                }).detach();
+            }
+
+            fn recv(rx: Receiver<Box<int>>, i: int) {
+                if i == 10 { return }
+
+                Thread::spawn(move|| {
+                    assert!(rx.recv().unwrap() == box i);
+                    recv(rx, i + 1);
+                }).detach();
+            }
+        }
+    }
+
+    #[test]
+    fn recv_a_lot() {
+        // Regression test that we don't run out of stack in scheduler context
+        let (tx, rx) = sync_channel(10000);
+        for _ in range(0u, 10000) { tx.send(()).unwrap(); }
+        for _ in range(0u, 10000) { rx.recv().unwrap(); }
+    }
+
+    #[test]
+    fn shared_chan_stress() {
+        let (tx, rx) = sync_channel(0);
+        let total = stress_factor() + 100;
+        for _ in range(0, total) {
+            let tx = tx.clone();
+            Thread::spawn(move|| {
+                tx.send(()).unwrap();
+            }).detach();
+        }
+
+        for _ in range(0, total) {
+            rx.recv().unwrap();
+        }
+    }
+
+    #[test]
+    fn test_nested_recv_iter() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let (total_tx, total_rx) = sync_channel::<int>(0);
+
+        let _t = Thread::spawn(move|| {
+            let mut acc = 0;
+            for x in rx.iter() {
+                acc += x;
+            }
+            total_tx.send(acc).unwrap();
+        });
+
+        tx.send(3).unwrap();
+        tx.send(1).unwrap();
+        tx.send(2).unwrap();
+        drop(tx);
+        assert_eq!(total_rx.recv().unwrap(), 6);
+    }
+
+    #[test]
+    fn test_recv_iter_break() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let (count_tx, count_rx) = sync_channel(0);
+
+        let _t = Thread::spawn(move|| {
+            let mut count = 0;
+            for x in rx.iter() {
+                if count >= 3 {
+                    break;
+                } else {
+                    count += x;
+                }
+            }
+            count_tx.send(count).unwrap();
+        });
+
+        tx.send(2).unwrap();
+        tx.send(2).unwrap();
+        tx.send(2).unwrap();
+        let _ = tx.try_send(2);
+        drop(tx);
+        assert_eq!(count_rx.recv().unwrap(), 4);
+    }
+
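The two iterator tests above rely on the blocking-iterator contract: `rx.iter()` yields each received value and terminates only once every sender has been dropped. A minimal consumer sketch built from the same pieces:

    let (tx, rx) = sync_channel::<int>(0);
    let _t = Thread::spawn(move|| {
        let mut sum = 0;
        for x in rx.iter() { sum += x; } // loop ends when all senders are gone
        assert_eq!(sum, 3);
    });
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // terminates the iteration on the other thread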
+    #[test]
+    fn try_recv_states() {
+        let (tx1, rx1) = sync_channel::<int>(1);
+        let (tx2, rx2) = sync_channel::<()>(1);
+        let (tx3, rx3) = sync_channel::<()>(1);
+        let _t = Thread::spawn(move|| {
+            rx2.recv().unwrap();
+            tx1.send(1).unwrap();
+            tx3.send(()).unwrap();
+            rx2.recv().unwrap();
+            drop(tx1);
+            tx3.send(()).unwrap();
+        });
+
+        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+        tx2.send(()).unwrap();
+        rx3.recv().unwrap();
+        assert_eq!(rx1.try_recv(), Ok(1));
+        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+        tx2.send(()).unwrap();
+        rx3.recv().unwrap();
+        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
+    }
+
+    // This bug used to end up in a livelock inside of the Receiver destructor
+    // because the internal state of the Shared packet was corrupted
+    #[test]
+    fn destroy_upgraded_shared_port_when_sender_still_active() {
+        let (tx, rx) = sync_channel::<()>(0);
+        let (tx2, rx2) = sync_channel::<()>(0);
+        let _t = Thread::spawn(move|| {
+            rx.recv().unwrap(); // wait on a oneshot
+            drop(rx);  // destroy a shared
+            tx2.send(()).unwrap();
+        });
+        // make sure the other task has gone to sleep
+        for _ in range(0u, 5000) { Thread::yield_now(); }
+
+        // upgrade to a shared chan and send a message
+        let t = tx.clone();
+        drop(tx);
+        t.send(()).unwrap();
+
+        // wait for the child task to exit before we exit
+        rx2.recv().unwrap();
+    }
+
+    #[test]
+    fn send1() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
+        assert_eq!(tx.send(1), Ok(()));
+    }
+
+    #[test]
+    fn send2() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let _t = Thread::spawn(move|| { drop(rx); });
+        assert!(tx.send(1).is_err());
+    }
+
+    #[test]
+    fn send3() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(tx.send(1), Ok(()));
+        let _t = Thread::spawn(move|| { drop(rx); });
+        assert!(tx.send(1).is_err());
+    }
+
+    #[test]
+    fn send4() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let tx2 = tx.clone();
+        let (done, donerx) = channel();
+        let done2 = done.clone();
+        let _t = Thread::spawn(move|| {
+            assert!(tx.send(1).is_err());
+            done.send(()).unwrap();
+        });
+        let _t = Thread::spawn(move|| {
+            assert!(tx2.send(2).is_err());
+            done2.send(()).unwrap();
+        });
+        drop(rx);
+        donerx.recv().unwrap();
+        donerx.recv().unwrap();
+    }
+
+    #[test]
+    fn try_send1() {
+        let (tx, _rx) = sync_channel::<int>(0);
+        assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
+    }
+
+    #[test]
+    fn try_send2() {
+        let (tx, _rx) = sync_channel::<int>(1);
+        assert_eq!(tx.try_send(1), Ok(()));
+        assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
+    }
+
+    #[test]
+    fn try_send3() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(tx.try_send(1), Ok(()));
+        drop(rx);
+        assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
+    }
+
+    #[test]
+    fn issue_15761() {
+        fn repro() {
+            let (tx1, rx1) = sync_channel::<()>(3);
+            let (tx2, rx2) = sync_channel::<()>(3);
+
+            let _t = Thread::spawn(move|| {
+                rx1.recv().unwrap();
+                tx2.try_send(()).unwrap();
+            });
+
+            tx1.try_send(()).unwrap();
+            rx2.recv().unwrap();
+        }
+
+        for _ in range(0u, 100) {
+            repro()
+        }
+    }
+}
diff --cc src/libstd/sync/mpsc/shared.rs
index 00000000000,e1606cb4317..e15c38cf9a1
mode 000000,100644..100644
--- a/src/libstd/sync/mpsc/shared.rs
+++ b/src/libstd/sync/mpsc/shared.rs
@@@ -1,0 -1,486 +1,486 @@@
+ // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+
+ /// Shared channels
+ ///
+ /// This is the flavor of channels which are not necessarily optimized for any
+ /// particular use case, but are the most general in how they are used. Shared
+ /// channels are cloneable allowing for multiple senders.
+ ///
+ /// High level implementation details can be found in the comment of the parent
+ /// module. You'll also note that the implementation of the shared and stream
+ /// channels are quite similar, and this is no coincidence!
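For orientation, a hedged sketch of the configuration this flavor serves: cloning the sending half is what moves a channel onto the shared implementation, so any number of senders can feed one receiver (the pattern mirrors `shared_chan_stress` in the tests above, with `channel`/`clone`/`detach` all taken from this patch):

    let (tx, rx) = channel();
    for _ in range(0u, 4) {
        let tx = tx.clone();
        Thread::spawn(move|| { tx.send(1i).unwrap(); }).detach();
    }
    for _ in range(0u, 4) { assert_eq!(rx.recv().unwrap(), 1); }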
+
+ pub use self::Failure::*;
+
+ use core::prelude::*;
+
+ use core::cmp;
+ use core::int;
+
+ use sync::{atomic, Mutex, MutexGuard};
+ use sync::mpsc::mpsc_queue as mpsc;
+ use sync::mpsc::blocking::{mod, SignalToken};
+ use sync::mpsc::select::StartResult;
+ use sync::mpsc::select::StartResult::*;
+ use thread::Thread;
+
+ const DISCONNECTED: int = int::MIN;
+ const FUDGE: int = 1024;
+ #[cfg(test)]
+ const MAX_STEALS: int = 5;
+ #[cfg(not(test))]
+ const MAX_STEALS: int = 1 << 20;
+
+ pub struct Packet<T> {
+     queue: mpsc::Queue<T>,
+     cnt: atomic::AtomicInt, // How many items are on this channel
+     steals: int, // How many times has a port received without blocking?
+     to_wake: atomic::AtomicUint, // SignalToken for wake up
+
+     // The number of channels which are currently using this packet.
+     channels: atomic::AtomicInt,
+
+     // See the discussion in Port::drop and the channel send methods for what
+     // these are used for
+     port_dropped: atomic::AtomicBool,
+     sender_drain: atomic::AtomicInt,
+
+     // this lock protects various portions of this implementation during
+     // select()
+     select_lock: Mutex<()>,
+ }
+
+ pub enum Failure {
+     Empty,
+     Disconnected,
+ }
+
+ impl<T: Send> Packet<T> {
+     // Creation of a packet *must* be followed by a call to postinit_lock
+     // and later by inherit_blocker
+     pub fn new() -> Packet<T> {
+         let p = Packet {
+             queue: mpsc::Queue::new(),
+             cnt: atomic::AtomicInt::new(0),
+             steals: 0,
+             to_wake: atomic::AtomicUint::new(0),
+             channels: atomic::AtomicInt::new(2),
+             port_dropped: atomic::AtomicBool::new(false),
+             sender_drain: atomic::AtomicInt::new(0),
+             select_lock: Mutex::new(()),
+         };
+         return p;
+     }
+
+     // This function should be used after newly created Packet
+     // was wrapped with an Arc
+     // In other case mutex data will be duplicated while cloning
+     // and that could cause problems on platforms where it is
+     // represented by opaque data structure
+     pub fn postinit_lock(&self) -> MutexGuard<()> {
-         self.select_lock.lock()
++         self.select_lock.lock().unwrap()
+     }
+
+     // This function is used at the creation of a shared packet to inherit a
+     // previously blocked task. This is done to prevent spurious wakeups of
+     // tasks in select().
+     //
+     // This can only be called at channel-creation time
+     pub fn inherit_blocker(&mut self,
+                            token: Option<SignalToken>,
+                            guard: MutexGuard<()>) {
+         token.map(|token| {
+             assert_eq!(self.cnt.load(atomic::SeqCst), 0);
+             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+             self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
+             self.cnt.store(-1, atomic::SeqCst);
+
+             // This store is a little sketchy. What's happening here is that
+             // we're transferring a blocker from a oneshot or stream channel to
+             // this shared channel. In doing so, we never spuriously wake them
+             // up and rather only wake them up at the appropriate time. This
+             // implementation of shared channels assumes that any blocking
+             // recv() will undo the increment of steals performed in try_recv()
+             // once the recv is complete. This thread that we're inheriting,
+             // however, is not in the middle of recv. Hence, the first time we
+             // wake them up, they're going to wake up from their old port, move
+             // on to the upgraded port, and then call the block recv() function.
+             //
+             // When calling this function, they'll find there's data immediately
+             // available, counting it as a steal. This in fact wasn't a steal
+             // because we appropriately blocked them waiting for data.
+             //
+             // To offset this bad increment, we initially set the steal count to
+             // -1. You'll find some special code in abort_selection() as well to
+             // ensure that this -1 steal count doesn't escape too far.
+             self.steals = -1;
+         });
+
+         // When the shared packet is constructed, we grabbed this lock. The
+         // purpose of this lock is to ensure that abort_selection() doesn't
+         // interfere with this method. After we unlock this lock, we're
+         // signifying that we're done modifying self.cnt and self.to_wake and
+         // the port is ready for the world to continue using it.
+         drop(guard);
+     }
+
+     pub fn send(&mut self, t: T) -> Result<(), T> {
+         // See Port::drop for what's going on
+         if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
+
+         // Note that the multiple sender case is a little trickier
+         // semantically than the single sender case. The logic for
+         // incrementing is "add and if disconnected store disconnected".
+         // This could end up leading some senders to believe that there
+         // wasn't a disconnect if in fact there was a disconnect. This means
+         // that while one thread is attempting to re-store the disconnected
+         // states, other threads could walk through merrily incrementing
+         // this very-negative disconnected count. To prevent senders from
+         // spuriously attempting to send when the channel is actually
+         // disconnected, the count has a ranged check here.
+         //
+         // This is also done for another reason. Remember that the return
+         // value of this function is:
+         //
+         //   `true` == the data *may* be received, this essentially has no
+         //             meaning
+         //   `false` == the data will *never* be received, this has a lot of
+         //              meaning
+         //
+         // In the SPSC case, we have a check of 'queue.is_empty()' to see
+         // whether the data was actually received, but this same condition
+         // means nothing in a multi-producer context. As a result, this
+         // preflight check serves as the definitive "this will never be
+         // received". Once we get beyond this check, we have permanently
+         // entered the realm of "this may be received"
+         if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
+             return Err(t)
+         }
+
+         self.queue.push(t);
+         match self.cnt.fetch_add(1, atomic::SeqCst) {
+             -1 => {
+                 self.take_to_wake().signal();
+             }
+
+             // In this case, we have possibly failed to send our data, and
+             // we need to consider re-popping the data in order to fully
+             // destroy it. We must arbitrate among the multiple senders,
+             // however, because the queues that we're using are
+             // single-consumer queues. In order to do this, all exiting
+             // pushers will use an atomic count in order to count those
+             // flowing through. Pushers who see 0 are required to drain as
+             // much as possible, and then can only exit when they are the
+             // only pusher (otherwise they must try again).
+             n if n < DISCONNECTED + FUDGE => {
+                 // see the comment in 'try' for a shared channel for why this
+                 // window of "not disconnected" is ok.
+                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
+
+                 if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
+                     loop {
+                         // drain the queue, for info on the thread yield see the
+                         // discussion in try_recv
+                         loop {
+                             match self.queue.pop() {
+                                 mpsc::Data(..) => {}
+                                 mpsc::Empty => break,
+                                 mpsc::Inconsistent => Thread::yield_now(),
+                             }
+                         }
+                         // maybe we're done, if we're not the last ones
+                         // here, then we need to go try again.
+                         if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
+                             break
+                         }
+                     }
+
+                     // At this point, there may still be data on the queue,
+                     // but only if the count hasn't been incremented and
+                     // some other sender hasn't finished pushing data just
+                     // yet. That sender in question will drain its own data.
+                 }
+             }
+
+             // Can't make any assumptions about this case like in the SPSC case.
+             _ => {}
+         }
+
+         Ok(())
+     }
+
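The ranged disconnect check described in the long comment above can be summarized: `DISCONNECTED` is `int::MIN`, and any counter value within `FUDGE` of it is treated as disconnected, because up to `FUDGE` racing senders may each increment the sentinel before one of them re-stores `DISCONNECTED`. As a standalone sketch of just that predicate (an illustration, not the actual implementation):

    const DISCONNECTED: int = int::MIN;
    const FUDGE: int = 1024;

    fn effectively_disconnected(cnt: int) -> bool {
        // Racing increments may sit on top of the sentinel before it is
        // re-stored, so test a window rather than strict equality.
        cnt < DISCONNECTED + FUDGE
    }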
+     pub fn recv(&mut self) -> Result<T, Failure> {
+         // This code is essentially the exact same as that found in the stream
+         // case (see stream.rs)
+         match self.try_recv() {
+             Err(Empty) => {}
+             data => return data,
+         }
+
+         let (wait_token, signal_token) = blocking::tokens();
+         if self.decrement(signal_token) == Installed {
+             wait_token.wait()
+         }
+
+         match self.try_recv() {
+             data @ Ok(..) => { self.steals -= 1; data }
+             data => data,
+         }
+     }
+
+     // Essentially the exact same thing as the stream decrement function.
+     // Returns true if blocking should proceed.
+     fn decrement(&mut self, token: SignalToken) -> StartResult {
+         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+         let ptr = unsafe { token.cast_to_uint() };
+         self.to_wake.store(ptr, atomic::SeqCst);
+
+         let steals = self.steals;
+         self.steals = 0;
+
+         match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
+             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
+             // If we factor in our steals and notice that the channel has no
+             // data, we successfully sleep
+             n => {
+                 assert!(n >= 0);
+                 if n - steals <= 0 { return Installed }
+             }
+         }
+
+         self.to_wake.store(0, atomic::SeqCst);
+         drop(unsafe { SignalToken::cast_from_uint(ptr) });
+         Abort
+     }
+
+     pub fn try_recv(&mut self) -> Result<T, Failure> {
+         let ret = match self.queue.pop() {
+             mpsc::Data(t) => Some(t),
+             mpsc::Empty => None,
+
+             // This is a bit of an interesting case. The channel is reported as
+             // having data available, but our pop() has failed due to the queue
+             // being in an inconsistent state. This means that there is some
+             // pusher somewhere which has yet to complete, but we are guaranteed
+             // that a pop will eventually succeed. In this case, we spin in a
+             // yield loop because the remote sender should finish their enqueue
+             // operation "very quickly".
+             //
+             // Avoiding this yield loop would require a different queue
+             // abstraction which provides the guarantee that after M pushes have
+             // succeeded, at least M pops will succeed. The current queues
+             // guarantee that if there are N active pushes, you can pop N times
+             // once all N have finished.
+             mpsc::Inconsistent => {
+                 let data;
+                 loop {
+                     Thread::yield_now();
+                     match self.queue.pop() {
+                         mpsc::Data(t) => { data = t; break }
+                         mpsc::Empty => panic!("inconsistent => empty"),
+                         mpsc::Inconsistent => {}
+                     }
+                 }
+                 Some(data)
+             }
+         };
+         match ret {
+             // See the discussion in the stream implementation for why we
+             // might decrement steals.
+             Some(data) => {
+                 if self.steals > MAX_STEALS {
+                     match self.cnt.swap(0, atomic::SeqCst) {
+                         DISCONNECTED => {
+                             self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                         }
+                         n => {
+                             let m = cmp::min(n, self.steals);
+                             self.steals -= m;
+                             self.bump(n - m);
+                         }
+                     }
+                     assert!(self.steals >= 0);
+                 }
+                 self.steals += 1;
+                 Ok(data)
+             }
+
+             // See the discussion in the stream implementation for why we try
+             // again.
+             None => {
+                 match self.cnt.load(atomic::SeqCst) {
+                     n if n != DISCONNECTED => Err(Empty),
+                     _ => {
+                         match self.queue.pop() {
+                             mpsc::Data(t) => Ok(t),
+                             mpsc::Empty => Err(Disconnected),
+                             // with no senders, an inconsistency is impossible.
+                             mpsc::Inconsistent => unreachable!(),
+                         }
+                     }
+                 }
+             }
+         }
+     }
+
+     // Prepares this shared packet for a channel clone, essentially just bumping
+     // a refcount.
+     pub fn clone_chan(&mut self) {
+         self.channels.fetch_add(1, atomic::SeqCst);
+     }
+
+     // Decrement the reference count on a channel. This is called whenever a
+     // Chan is dropped and may end up waking up a receiver. It's the receiver's
+     // responsibility on the other end to figure out that we've disconnected.
+     pub fn drop_chan(&mut self) {
+         match self.channels.fetch_sub(1, atomic::SeqCst) {
+             1 => {}
+             n if n > 1 => return,
+             n => panic!("bad number of channels left {}", n),
+         }
+
+         match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
+             -1 => { self.take_to_wake().signal(); }
+             DISCONNECTED => {}
+             n => { assert!(n >= 0); }
+         }
+     }
+
+     // See the long discussion inside of stream.rs for why the queue is drained,
+     // and why it is done in this fashion.
+     pub fn drop_port(&mut self) {
+         self.port_dropped.store(true, atomic::SeqCst);
+         let mut steals = self.steals;
+         while {
+             let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
+             cnt != DISCONNECTED && cnt != steals
+         } {
+             // See the discussion in 'try_recv' for why we yield
+             // control of this thread.
+             loop {
+                 match self.queue.pop() {
+                     mpsc::Data(..) => { steals += 1; }
+                     mpsc::Empty | mpsc::Inconsistent => break,
+                 }
+             }
+         }
+     }
+
+     // Consumes ownership of the 'to_wake' field.
+     fn take_to_wake(&mut self) -> SignalToken {
+         let ptr = self.to_wake.load(atomic::SeqCst);
+         self.to_wake.store(0, atomic::SeqCst);
+         assert!(ptr != 0);
+         unsafe { SignalToken::cast_from_uint(ptr) }
+     }
+
+     ////////////////////////////////////////////////////////////////////////////
+     // select implementation
+     ////////////////////////////////////////////////////////////////////////////
+
+     // Helper function for select, tests whether this port can receive without
+     // blocking (obviously not an atomic decision).
+     //
+     // This is different than the stream version because there's no need to peek
+     // at the queue, we can just look at the local count.
+     pub fn can_recv(&mut self) -> bool {
+         let cnt = self.cnt.load(atomic::SeqCst);
+         cnt == DISCONNECTED || cnt - self.steals > 0
+     }
+
+     // increment the count on the channel (used for selection)
+     fn bump(&mut self, amt: int) -> int {
+         match self.cnt.fetch_add(amt, atomic::SeqCst) {
+             DISCONNECTED => {
+                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                 DISCONNECTED
+             }
+             n => n
+         }
+     }
+
+     // Inserts the signal token for selection on this port, returning true if
+     // blocking should proceed.
+     //
+     // The code here is the same as in stream.rs, except that it doesn't need to
+     // peek at the channel to see if an upgrade is pending.
+     pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
+         match self.decrement(token) {
+             Installed => Installed,
+             Abort => {
+                 let prev = self.bump(1);
+                 assert!(prev == DISCONNECTED || prev >= 0);
+                 Abort
+             }
+         }
+     }
+
+     // Cancels a previous task waiting on this port, returning whether there's
+     // data on the port.
+     //
+     // This is similar to the stream implementation (hence fewer comments), but
+     // uses a different value for the "steals" variable.
+     pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
+         // Before we do anything else, we bounce on this lock. The reason for
+         // doing this is to ensure that any upgrade-in-progress is gone and
+         // done with. Without this bounce, we can race with inherit_blocker
+         // about looking at and dealing with to_wake. Once we have acquired the
+         // lock, we are guaranteed that inherit_blocker is done.
+         {
-             let _guard = self.select_lock.lock();
++             let _guard = self.select_lock.lock().unwrap();
+         }
+
+         // Like the stream implementation, we want to make sure that the count
+         // on the channel goes non-negative. We don't know how negative the
+         // stream currently is, so instead of using a steal value of 1, we load
+         // the channel count and figure out what we should do to make it
+         // positive.
+         let steals = {
+             let cnt = self.cnt.load(atomic::SeqCst);
+             if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
+         };
+         let prev = self.bump(steals + 1);
+
+         if prev == DISCONNECTED {
+             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+             true
+         } else {
+             let cur = prev + steals + 1;
+             assert!(cur >= 0);
+             if prev < 0 {
+                 drop(self.take_to_wake());
+             } else {
+                 while self.to_wake.load(atomic::SeqCst) != 0 {
+                     Thread::yield_now();
+                 }
+             }
+             // if the number of steals is -1, it was the pre-emptive -1 steal
+             // count from when we inherited a blocker. This is fine because
+             // we're just going to overwrite it with a real value.
+             assert!(self.steals == 0 || self.steals == -1);
+             self.steals = steals;
+             prev >= 0
+         }
+     }
+ }
+
+ #[unsafe_destructor]
+ impl<T: Send> Drop for Packet<T> {
+     fn drop(&mut self) {
+         // Note that this load is not only an assert for correctness about
+         // disconnection, but also a proper fence before the read of
+         // `to_wake`, so this assert cannot be removed with also removing
+         // the `to_wake` assert.
+         assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
+         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+         assert_eq!(self.channels.load(atomic::SeqCst), 0);
+     }
+ }
diff --cc src/libstd/sync/mpsc/sync.rs
index 00000000000,28005831d4f..98f1c4c46f9
mode 000000,100644..100644
--- a/src/libstd/sync/mpsc/sync.rs
+++ b/src/libstd/sync/mpsc/sync.rs
@@@ -1,0 -1,483 +1,483 @@@
+ // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+
+ /// Synchronous channels/ports
+ ///
+ /// This channel implementation differs significantly from the asynchronous
+ /// implementations found next to it (oneshot/stream/share). This is an
+ /// implementation of a synchronous, bounded buffer channel.
+ ///
+ /// Each channel is created with some amount of backing buffer, and sends will
+ /// *block* until buffer space becomes available. A buffer size of 0 is valid,
+ /// which means that every successful send is paired with a successful recv.
+ ///
+ /// This flavor of channels defines a new `send_opt` method for channels which
+ /// is the method by which a message is sent but the task does not panic if it
+ /// cannot be delivered.
+ ///
+ /// Another major difference is that send() will *always* return back the data
+ /// if it couldn't be sent. This is because it is deterministically known when
+ /// the data is received and when it is not received.
+ ///
+ /// Implementation-wise, it can all be summed up with "use a mutex plus some
+ /// logic". The mutex used here is an OS native mutex, meaning that no user code
+ /// is run inside of the mutex (to prevent context switching). This
+ /// implementation shares almost all code for the buffered and unbuffered cases
+ /// of a synchronous channel. There are a few branches for the unbuffered case,
+ /// but they're mostly just relevant to blocking senders.
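Concretely, the zero-capacity case described above is a rendezvous: a send blocks until the matching receive arrives. A small sketch using the `sync_channel` API exercised by the tests earlier in this patch:

    let (tx, rx) = sync_channel::<int>(0);
    let _t = Thread::spawn(move|| {
        tx.send(1).unwrap(); // blocks until the recv() below runs
    });
    assert_eq!(rx.recv().unwrap(), 1); // completes the rendezvous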
+
+ use core::prelude::*;
+
+ pub use self::Failure::*;
+ use self::Blocker::*;
+
+ use vec::Vec;
+ use core::mem;
+
+ use sync::{atomic, Mutex, MutexGuard};
+ use sync::mpsc::blocking::{mod, WaitToken, SignalToken};
+ use sync::mpsc::select::StartResult::{mod, Installed, Abort};
+
+ pub struct Packet<T> {
+     /// Only field outside of the mutex. Just done for kicks, but mainly because
+     /// the other shared channel already had the code implemented
+     channels: atomic::AtomicUint,
+
+     lock: Mutex<State<T>>,
+ }
+
+ unsafe impl<T: Send> Send for Packet<T> { }
+
+ unsafe impl<T: Send> Sync for Packet<T> { }
+
+ struct State<T> {
+     disconnected: bool, // Is the channel disconnected yet?
+     queue: Queue,       // queue of senders waiting to send data
+     blocker: Blocker,   // currently blocked task on this channel
+     buf: Buffer<T>,     // storage for buffered messages
+     cap: uint,          // capacity of this channel
+
+     /// A curious flag used to indicate whether a sender failed or succeeded in
+     /// blocking. This is used to transmit information back to the task that it
+     /// must dequeue its message from the buffer because it was not received.
+     /// This is only relevant in the 0-buffer case. This obviously cannot be
+     /// safely constructed, but it's guaranteed to always have a valid pointer
+     /// value.
+     canceled: Option<&'static mut bool>,
+ }
+
+ unsafe impl<T: Send> Send for State<T> {}
+
+ /// Possible flavors of threads who can be blocked on this channel.
+ enum Blocker {
+     BlockedSender(SignalToken),
+     BlockedReceiver(SignalToken),
+     NoneBlocked
+ }
+
+ /// Simple queue for threading tasks together. Nodes are stack-allocated, so
+ /// this structure is not safe at all
+ struct Queue {
+     head: *mut Node,
+     tail: *mut Node,
+ }
+
+ struct Node {
+     token: Option<SignalToken>,
+     next: *mut Node,
+ }
+
+ unsafe impl Send for Node {}
+
+ /// A simple ring-buffer
+ struct Buffer<T> {
+     buf: Vec<Option<T>>,
+     start: uint,
+     size: uint,
+ }
+
+ #[deriving(Show)]
+ pub enum Failure {
+     Empty,
+     Disconnected,
+ }
+
+ /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
+ /// in the meantime. This re-locks the mutex upon returning.
+ fn wait<'a, 'b, T: Send>(lock: &'a Mutex<State<T>>,
+                          mut guard: MutexGuard<'b, State<T>>,
+                          f: fn(SignalToken) -> Blocker)
+                          -> MutexGuard<'a, State<T>>
+ {
+     let (wait_token, signal_token) = blocking::tokens();
+     match mem::replace(&mut guard.blocker, f(signal_token)) {
+         NoneBlocked => {}
+         _ => unreachable!(),
+     }
-     drop(guard);         // unlock
-     wait_token.wait();   // block
-     lock.lock()          // relock
++     drop(guard);         // unlock
++     wait_token.wait();   // block
++     lock.lock().unwrap() // relock
+ }
+
+ /// Wakes up a thread, dropping the lock at the correct time
+ fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
+     // We need to be careful to wake up the waiting task *outside* of the mutex
+     // in case it incurs a context switch.
+     drop(guard);
+     token.signal();
+ }
+
+ impl<T: Send> Packet<T> {
+     pub fn new(cap: uint) -> Packet<T> {
+         Packet {
+             channels: atomic::AtomicUint::new(1),
+             lock: Mutex::new(State {
+                 disconnected: false,
+                 blocker: NoneBlocked,
+                 cap: cap,
+                 canceled: None,
+                 queue: Queue {
+                     head: 0 as *mut Node,
+                     tail: 0 as *mut Node,
+                 },
+                 buf: Buffer {
-                     buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None),
++                     buf: range(0, cap + if cap == 0 {1} else {0}).map(|_| None).collect(),
+                     start: 0,
+                     size: 0,
+                 },
+             }),
+         }
+     }
+
+     // wait until a send slot is available, returning locked access to
+     // the channel state.
+     fn acquire_send_slot(&self) -> MutexGuard<State<T>> {
+         let mut node = Node { token: None, next: 0 as *mut Node };
+         loop {
-             let mut guard = self.lock.lock();
++             let mut guard = self.lock.lock().unwrap();
+             // are we ready to go?
+             if guard.disconnected || guard.buf.size() < guard.buf.cap() {
+                 return guard;
+             }
+             // no room; actually block
+             let wait_token = guard.queue.enqueue(&mut node);
+             drop(guard);
+             wait_token.wait();
+         }
+     }
+
+     pub fn send(&self, t: T) -> Result<(), T> {
+         let mut guard = self.acquire_send_slot();
+         if guard.disconnected { return Err(t) }
+         guard.buf.enqueue(t);
+
+         match mem::replace(&mut guard.blocker, NoneBlocked) {
+             // if our capacity is 0, then we need to wait for a receiver to be
+             // available to take our data. After waiting, we check again to make
+             // sure the port didn't go away in the meantime. If it did, we need
+             // to hand back our data.
+             NoneBlocked if guard.cap == 0 => {
+                 let mut canceled = false;
+                 assert!(guard.canceled.is_none());
+                 guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
+                 let mut guard = wait(&self.lock, guard, BlockedSender);
+                 if canceled {Err(guard.buf.dequeue())} else {Ok(())}
+             }
+
+             // success, we buffered some data
+             NoneBlocked => Ok(()),
+
+             // success, someone's about to receive our buffered data.
+             BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
+
+             BlockedSender(..) => panic!("lolwut"),
+         }
+     }
+
+     pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+         if guard.disconnected {
+             Err(super::TrySendError::Disconnected(t))
+         } else if guard.buf.size() == guard.buf.cap() {
+             Err(super::TrySendError::Full(t))
+         } else if guard.cap == 0 {
+             // With capacity 0, even though we have buffer space we can't
+             // transfer the data unless there's a receiver waiting.
+             match mem::replace(&mut guard.blocker, NoneBlocked) {
+                 NoneBlocked => Err(super::TrySendError::Full(t)),
+                 BlockedSender(..) => unreachable!(),
+                 BlockedReceiver(token) => {
+                     guard.buf.enqueue(t);
+                     wakeup(token, guard);
+                     Ok(())
+                 }
+             }
+         } else {
+             // If the buffer has some space and the capacity isn't 0, then we
+             // just enqueue the data for later retrieval, ensuring to wake up
+             // any blocked receiver if there is one.
+             assert!(guard.buf.size() < guard.buf.cap());
+             guard.buf.enqueue(t);
+             match mem::replace(&mut guard.blocker, NoneBlocked) {
+                 BlockedReceiver(token) => wakeup(token, guard),
+                 NoneBlocked => {}
+                 BlockedSender(..) => unreachable!(),
+             }
+             Ok(())
+         }
+     }
+
+     // Receives a message from this channel
+     //
+     // When reading this, remember that there can only ever be one receiver at
+     // a time.
+     pub fn recv(&self) -> Result<T, ()> {
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+
+         // Wait for the buffer to have something in it. No need for a while loop
+         // because we're the only receiver.
+         let mut waited = false;
+         if !guard.disconnected && guard.buf.size() == 0 {
+             guard = wait(&self.lock, guard, BlockedReceiver);
+             waited = true;
+         }
+         if guard.disconnected && guard.buf.size() == 0 { return Err(()) }
+
+         // Pick up the data, wake up our neighbors, and carry on
+         assert!(guard.buf.size() > 0);
+         let ret = guard.buf.dequeue();
+         self.wakeup_senders(waited, guard);
+         return Ok(ret);
+     }
+
+     pub fn try_recv(&self) -> Result<T, Failure> {
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+
+         // Easy cases first
+         if guard.disconnected { return Err(Disconnected) }
+         if guard.buf.size() == 0 { return Err(Empty) }
+
+         // Be sure to wake up neighbors
+         let ret = Ok(guard.buf.dequeue());
+         self.wakeup_senders(false, guard);
+
+         return ret;
+     }
+
+     // Wake up pending senders after some data has been received
+     //
+     // * `waited` - flag if the receiver blocked to receive some data, or if it
+     //              just picked up some data on the way out
+     // * `guard` - the lock guard that is held over this channel's lock
+     fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<State<T>>) {
+         let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
+
+         // If this is a no-buffer channel (cap == 0), then if we didn't wait we
+         // need to ACK the sender. If we waited, then the sender waking us up
+         // was already the ACK.
+         let pending_sender2 = if guard.cap == 0 && !waited {
+             match mem::replace(&mut guard.blocker, NoneBlocked) {
+                 NoneBlocked => None,
+                 BlockedReceiver(..) => unreachable!(),
+                 BlockedSender(token) => {
+                     guard.canceled.take();
+                     Some(token)
+                 }
+             }
+         } else {
+             None
+         };
+         mem::drop(guard);
+
+         // only outside of the lock do we wake up the pending tasks
+         pending_sender1.map(|t| t.signal());
+         pending_sender2.map(|t| t.signal());
+     }
+
+     // Prepares this shared packet for a channel clone, essentially just bumping
+     // a refcount.
+     pub fn clone_chan(&self) {
+         self.channels.fetch_add(1, atomic::SeqCst);
+     }
+
+     pub fn drop_chan(&self) {
+         // Only flag the channel as disconnected if we're the last channel
+         match self.channels.fetch_sub(1, atomic::SeqCst) {
+             1 => {}
+             _ => return
+         }
+
+         // Not much to do other than wake up a receiver if one's there
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+         if guard.disconnected { return }
+         guard.disconnected = true;
+         match mem::replace(&mut guard.blocker, NoneBlocked) {
+             NoneBlocked => {}
+             BlockedSender(..) => unreachable!(),
+             BlockedReceiver(token) => wakeup(token, guard),
+         }
+     }
+
+     pub fn drop_port(&self) {
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+
+         if guard.disconnected { return }
+         guard.disconnected = true;
+
+         // If the capacity is 0, then the sender may want its data back after
+         // we're disconnected. Otherwise it's now our responsibility to destroy
+         // the buffered data. As with many other portions of this code, this
+         // needs to be careful to destroy the data *outside* of the lock to
+         // prevent deadlock.
+         let _data = if guard.cap != 0 {
+             mem::replace(&mut guard.buf.buf, Vec::new())
+         } else {
+             Vec::new()
+         };
+         let mut queue = mem::replace(&mut guard.queue, Queue {
+             head: 0 as *mut Node,
+             tail: 0 as *mut Node,
+         });
+
+         let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
+             NoneBlocked => None,
+             BlockedSender(token) => {
+                 *guard.canceled.take().unwrap() = true;
+                 Some(token)
+             }
+             BlockedReceiver(..) => unreachable!(),
+         };
+         mem::drop(guard);
+
+         loop {
+             match queue.dequeue() {
+                 Some(token) => { token.signal(); }
+                 None => break,
+             }
+         }
+         waiter.map(|t| t.signal());
+     }
+
+     ////////////////////////////////////////////////////////////////////////////
+     // select implementation
+     ////////////////////////////////////////////////////////////////////////////
+
+     // If Ok, the value is whether this port has data, if Err, then the upgraded
+     // port needs to be checked instead of this one.
+     pub fn can_recv(&self) -> bool {
-         let guard = self.lock.lock();
++         let guard = self.lock.lock().unwrap();
+         guard.disconnected || guard.buf.size() > 0
+     }
+
+     // Attempts to start selection on this port. This can either succeed or fail
+     // because there is data waiting.
+     pub fn start_selection(&self, token: SignalToken) -> StartResult {
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+         if guard.disconnected || guard.buf.size() > 0 {
+             Abort
+         } else {
+             match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
+                 NoneBlocked => {}
+                 BlockedSender(..) => unreachable!(),
+                 BlockedReceiver(..) => unreachable!(),
+             }
+             Installed
+         }
+     }
+
+     // Remove a previous selecting task from this port. This ensures that the
+     // blocked task will no longer be visible to any other threads.
+     //
+     // The return value indicates whether there's data on this port.
+     pub fn abort_selection(&self) -> bool {
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+         match mem::replace(&mut guard.blocker, NoneBlocked) {
+             NoneBlocked => true,
+             BlockedSender(token) => {
+                 guard.blocker = BlockedSender(token);
+                 true
+             }
+             BlockedReceiver(token) => { drop(token); false }
+         }
+     }
+ }
+
+ #[unsafe_destructor]
+ impl<T: Send> Drop for Packet<T> {
+     fn drop(&mut self) {
+         assert_eq!(self.channels.load(atomic::SeqCst), 0);
-         let mut guard = self.lock.lock();
++         let mut guard = self.lock.lock().unwrap();
+         assert!(guard.queue.dequeue().is_none());
+         assert!(guard.canceled.is_none());
+     }
+ }
+
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Buffer, a simple ring buffer backed by Vec<T>
+ ////////////////////////////////////////////////////////////////////////////////
+
+ impl<T> Buffer<T> {
+     fn enqueue(&mut self, t: T) {
+         let pos = (self.start + self.size) % self.buf.len();
+         self.size += 1;
+         let prev = mem::replace(&mut self.buf[pos], Some(t));
+         assert!(prev.is_none());
+     }
+
+     fn dequeue(&mut self) -> T {
+         let start = self.start;
+         self.size -= 1;
+         self.start = (self.start + 1) % self.buf.len();
+         self.buf[start].take().unwrap()
+     }
+
+     fn size(&self) -> uint { self.size }
+     fn cap(&self) -> uint { self.buf.len() }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Queue, a simple queue to enqueue tasks with (stack-allocated nodes)
+ ////////////////////////////////////////////////////////////////////////////////
+
+ impl Queue {
+     fn enqueue(&mut self, node: &mut Node) -> WaitToken {
+         let (wait_token, signal_token) = blocking::tokens();
+         node.token = Some(signal_token);
+         node.next = 0 as *mut Node;
+
+         if self.tail.is_null() {
+             self.head = node as *mut Node;
+             self.tail = node as *mut Node;
+         } else {
+             unsafe {
+                 (*self.tail).next = node as *mut Node;
+                 self.tail = node as *mut Node;
+             }
+         }
+
+         wait_token
+     }
+
+     fn dequeue(&mut self) -> Option<SignalToken> {
+         if self.head.is_null() {
+             return None
+         }
+         let node = self.head;
+         self.head = unsafe { (*node).next };
+         if self.head.is_null() {
+             self.tail = 0 as *mut Node;
+         }
+         unsafe {
+             (*node).next = 0 as *mut Node;
+             Some((*node).token.take().unwrap())
+         }
+     }
+ }
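A quick worked example of the `Buffer` ring arithmetic above: with `buf.len() == 4`, `start == 3`, and `size == 2`, the occupied slots are 3 and 0; `enqueue` writes at `(3 + 2) % 4 == 1`, and `dequeue` reads slot 3, then advances `start` to `(3 + 1) % 4 == 0`:

    let (len, start, size) = (4u, 3u, 2u);
    assert_eq!((start + size) % len, 1); // next enqueue position
    assert_eq!((start + 1) % len, 0);    // start after one dequeue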
diff --cc src/libstd/sync/mutex.rs
index 98425f26c1a,1562031499f..3f155b02065
--- a/src/libstd/sync/mutex.rs
+++ b/src/libstd/sync/mutex.rs
@@@ -66,50 -55,17 +66,50 @@@ use sys_common::mutex as sys
 /// //         The shared static can only be accessed once the lock is held.
 /// //         Our non-atomic increment is safe because we're the only thread
 /// //         which can access the shared state when the lock is held.
-///         let mut data = data.lock();
+/// //
+/// //         We unwrap() the return value to assert that we are not expecting
+/// //         tasks to ever fail while holding the lock.
+///         let mut data = data.lock().unwrap();
 ///         *data += 1;
 ///         if *data == N {
-///             tx.send(());
+///             tx.send(()).unwrap();
 ///         }
 ///         // the lock is unlocked here when `data` goes out of scope.
 ///     }).detach();
 /// }
 ///
-/// rx.recv();
+/// rx.recv().unwrap();
 /// ```
+///
+/// To recover from a poisoned mutex:
+///
+/// ```rust
+/// use std::sync::{Arc, Mutex};
+/// use std::thread::Thread;
+///
+/// let lock = Arc::new(Mutex::new(0u));
+/// let lock2 = lock.clone();
+///
+/// let _ = Thread::spawn(move || -> () {
+///     // This thread will acquire the mutex first, unwrapping the result of
+///     // `lock` because the lock has not been poisoned.
+///     let _lock = lock2.lock().unwrap();
+///
+///     // This panic while holding the lock (`_lock` is in scope) will poison
+///     // the mutex.
+///     panic!();
+/// }).join();
+///
+/// // The lock is poisoned by this point, but the returned result can be
+/// // pattern matched on to return the underlying guard on both branches.
+/// let mut guard = match lock.lock() {
+///     Ok(guard) => guard,
+///     Err(poisoned) => poisoned.into_guard(),
+/// };
+///
+/// *guard += 1;
+/// ```
+#[stable]
 pub struct Mutex<T> {
     // Note that this static mutex is in a *box*, not inlined into the struct
     // itself. Once a native mutex has been used once, its address can never
@@@ -398,19 -357,19 +398,19 @@@ mod test
         let (tx, rx) = channel();
         let _t = Thread::spawn(move|| {
             // wait until parent gets in
-            rx.recv();
+            rx.recv().unwrap();
             let &(ref lock, ref cvar) = &*packet2.0;
-            let mut lock = lock.lock();
+            let mut lock = lock.lock().unwrap();
             *lock = true;
             cvar.notify_one();
         });

         let &(ref lock, ref cvar) = &*packet.0;
-        let lock = lock.lock();
+        let mut lock = lock.lock().unwrap();
-        tx.send(());
+        tx.send(()).unwrap();
         assert!(!*lock);
         while !*lock {
-            cvar.wait(&lock);
+            lock = cvar.wait(lock).unwrap();
         }
     }

@@@ -421,25 -381,19 +421,25 @@@
         let (tx, rx) = channel();
         let _t = Thread::spawn(move || -> () {
-            rx.recv();
+            rx.recv().unwrap();
             let &(ref lock, ref cvar) = &*packet2.0;
-            let _g = lock.lock();
+            let _g = lock.lock().unwrap();
             cvar.notify_one();
             // Parent should fail when it wakes up.
             panic!();
         });

         let &(ref lock, ref cvar) = &*packet.0;
-        let lock = lock.lock();
+        let mut lock = lock.lock().unwrap();
-        tx.send(());
+        tx.send(()).unwrap();
         while *lock == 1 {
-            cvar.wait(&lock);
+            match cvar.wait(lock) {
+                Ok(l) => {
+                    lock = l;
+                    assert_eq!(*lock, 1);
+                }
+                Err(..) => break,
+            }
         }
     }
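Note the shape of the condition-variable loop in the tests above: the reworked `Condvar::wait` consumes the guard and returns a `Result` carrying the re-acquired (possibly poisoned) guard, so the loop re-binds `lock` on every iteration instead of borrowing it. The essential pattern, sketched with the same names the tests use:

    let mut lock = lock.lock().unwrap();
    while !*lock {
        lock = cvar.wait(lock).unwrap(); // guard moves in, guard comes back
    }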
@@@ -462,12 -418,12 +462,12 @@@
         let arc2 = Arc::new(Mutex::new(arc));
         let (tx, rx) = channel();
         let _t = Thread::spawn(move|| {
-            let lock = arc2.lock();
-            let lock2 = lock.lock();
+            let lock = arc2.lock().unwrap();
+            let lock2 = lock.lock().unwrap();
             assert_eq!(*lock2, 1);
-            tx.send(());
+            tx.send(()).unwrap();
         });
-        rx.recv();
+        rx.recv().unwrap();
     }

     #[test]
diff --cc src/libstd/sync/rwlock.rs
index efdd894a806,3c4283c72e2..b23fff31c0a
--- a/src/libstd/sync/rwlock.rs
+++ b/src/libstd/sync/rwlock.rs
@@@ -486,8 -494,8 +486,8 @@@ mod tests
         }

         // Wait for writer to finish
-        rx.recv();
+        rx.recv().unwrap();
-        let lock = arc.read();
+        let lock = arc.read().unwrap();
         assert_eq!(*lock, 10);
     }

diff --cc src/libstd/sync/task_pool.rs
index 63c10c18046,b0325998358..c34fa66d12a
--- a/src/libstd/sync/task_pool.rs
+++ b/src/libstd/sync/task_pool.rs
@@@ -114,8 -114,8 +114,8 @@@ fn spawn_in_pool(jobs: Arc
 {
             match dead.iter().position(|&(i, _)| id == i) {
                 Some(i) => {
-                    let (_, i) = dead.remove(i).unwrap();
+                    let (_, i) = dead.remove(i);
-                    ack.send(i);
+                    ack.send(i).unwrap();
                     continue
                 }
                 None => {}
             }
             let i = active.iter().position(|i| i.id == id);
             let i = i.expect("no timer found");
-            let t = active.remove(i).unwrap();
+            let t = active.remove(i);
-            ack.send(t);
+            ack.send(t).unwrap();
         }
         Err(..) => break
     }
diff --cc src/libstd/sys/windows/c.rs
index d28d0fe26b9,d1177776dd8..1ee57434fb9
--- a/src/libstd/sys/windows/c.rs
+++ b/src/libstd/sys/windows/c.rs
@@@ -131,10 -131,9 +131,9 @@@ extern "system"
  pub mod compat {
      use intrinsics::{atomic_store_relaxed, transmute};
-     use iter::IteratorExt;
      use libc::types::os::arch::extra::{LPCWSTR, HMODULE, LPCSTR, LPVOID};
      use prelude::v1::*;
-
+     use c_str::ToCStr;

      extern "system" {
          fn GetModuleHandleW(lpModuleName: LPCWSTR) -> HMODULE;