~~~
# use std::task::spawn;
-let (port, chan) = SharedChan::new();
+let (port, chan) = Chan::new();
for init_val in range(0u, 3) {
// Create a new channel handle to distribute to the child task
remaining.reverse();
let mut pending = 0;
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
while pending > 0 || !remaining.is_empty() {
while pending < concurrency && !remaining.is_empty() {
pub fn run_test(force_ignore: bool,
test: TestDescAndFn,
- monitor_ch: SharedChan<MonitorMsg>) {
+ monitor_ch: Chan<MonitorMsg>) {
let TestDescAndFn {desc, testfn} = test;
}
fn run_test_inner(desc: TestDesc,
- monitor_ch: SharedChan<MonitorMsg>,
+ monitor_ch: Chan<MonitorMsg>,
testfn: proc()) {
spawn(proc() {
let mut task = task::task();
},
testfn: DynTestFn(proc() f()),
};
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert!(res != TrOk);
},
testfn: DynTestFn(proc() f()),
};
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert_eq!(res, TrIgnored);
},
testfn: DynTestFn(proc() f()),
};
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert_eq!(res, TrOk);
},
testfn: DynTestFn(proc() f()),
};
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert_eq!(res, TrFailed);
#[deriving(Clone)]
struct TaskState {
cnt: UnsafeArc<AtomicUint>,
- done: SharedChan<()>,
+ done: Chan<()>,
}
impl SchedPool {
impl TaskState {
fn new() -> (Port<()>, TaskState) {
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
(p, TaskState {
cnt: UnsafeArc::new(AtomicUint::new(0)),
done: c,
//! that you would find on the respective platform.
use std::c_str::CString;
-use std::comm::SharedChan;
use std::io;
use std::io::IoError;
use std::io::net::ip::SocketAddr;
})
}
}
- fn signal(&mut self, _signal: Signum, _channel: SharedChan<Signum>)
+ fn signal(&mut self, _signal: Signum, _channel: Chan<Signum>)
-> IoResult<~RtioSignal> {
Err(unimpl())
}
// only torn down after everything else has exited. This means that these
// variables are read-only during use (after initialization) and both of which
// are safe to use concurrently.
-static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
+static mut HELPER_CHAN: *mut Chan<Req> = 0 as *mut Chan<Req>;
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
pub fn boot(helper: fn(imp::signal, Port<Req>)) {
unsafe {
LOCK.lock();
if !INITIALIZED {
- let (msgp, msgc) = SharedChan::new();
+ let (msgp, msgc) = Chan::new();
+ // promote this to a shared channel
+ drop(msgc.clone());
HELPER_CHAN = cast::transmute(~msgc);
let (receive, send) = imp::new();
HELPER_SIGNAL = send;
// Clean up after ther helper thread
unsafe {
imp::close(HELPER_SIGNAL);
- let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
- HELPER_CHAN = 0 as *mut SharedChan<Req>;
+ let _chan: ~Chan<Req> = cast::transmute(HELPER_CHAN);
+ HELPER_CHAN = 0 as *mut Chan<Req>;
HELPER_SIGNAL = 0 as imp::signal;
}
}
use std::libc::c_int;
use std::io::signal::Signum;
-use std::comm::SharedChan;
use std::rt::rtio::RtioSignal;
use homing::{HomingIO, HomeHandle};
handle: *uvll::uv_signal_t,
home: HomeHandle,
- channel: SharedChan<Signum>,
+ channel: Chan<Signum>,
signal: Signum,
}
impl SignalWatcher {
pub fn new(io: &mut UvIoFactory, signum: Signum,
- channel: SharedChan<Signum>) -> Result<~SignalWatcher, UvError> {
+ channel: Chan<Signum>) -> Result<~SignalWatcher, UvError> {
let s = ~SignalWatcher {
handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL),
home: io.make_handle(),
#[test]
fn closing_channel_during_drop_doesnt_kill_everything() {
// see issue #10375, relates to timers as well.
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
chan);
use std::c_str::CString;
use std::cast;
-use std::comm::SharedChan;
use std::io::IoError;
use std::io::net::ip::SocketAddr;
use std::io::process::ProcessConfig;
}
}
- fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
+ fn signal(&mut self, signum: Signum, channel: Chan<Signum>)
-> Result<~rtio::RtioSignal, IoError> {
match SignalWatcher::new(self, signum, channel) {
Ok(s) => Ok(s as ~rtio::RtioSignal),
//! communication between concurrent tasks. The primitives defined in this
//! module are the building blocks for synchronization in rust.
//!
-//! This module currently provides three main types:
+//! This module currently provides two types:
//!
//! * `Chan`
//! * `Port`
-//! * `SharedChan`
//!
-//! The `Chan` and `SharedChan` types are used to send data to a `Port`. A
-//! `SharedChan` is clone-able such that many tasks can send simultaneously to
-//! one receiving port. These communication primitives are *task blocking*, not
-//! *thread blocking*. This means that if one task is blocked on a channel,
-//! other tasks can continue to make progress.
+//! `Chan` is used to send data to a `Port`. A `Chan` is clone-able such that
+//! many tasks can send simultaneously to one receiving port. These
+//! communication primitives are *task blocking*, not *thread blocking*. This
+//! means that if one task is blocked on a channel, other tasks can continue to
+//! make progress.
//!
//! Rust channels can be used as if they have an infinite internal buffer. What
//! this means is that the `send` operation will never block. `Port`s, on the
//! next operation `fail!`. The purpose of this is to allow propagation of
//! failure among tasks that are linked to one another via channels.
//!
-//! There are methods on all of `Chan`, `SharedChan`, and `Port` to perform
-//! their respective operations without failing, however.
+//! There are methods on both of `Chan` and `Port` to perform their respective
+//! operations without failing, however.
//!
//! ## Outside the Runtime
//!
//! assert_eq!(port.recv(), 10);
//!
//! // Create a shared channel which can be sent along from many tasks
-//! let (port, chan) = SharedChan::new();
+//! let (port, chan) = Chan::new();
//! for i in range(0, 10) {
//! let chan = chan.clone();
//! spawn(proc() {
//
// ## Flavors of channels
//
-// Rust channels come in two flavors: streams and shared channels. A stream has
-// one sender and one receiver while a shared channel could have multiple
-// senders. This choice heavily influences the design of the protocol set
-// forth for both senders/receivers.
+// From the perspective of a consumer of this library, there is only one flavor
+// of channel. This channel can be used as a stream and cloned to allow multiple
+// senders. Under the hood, however, there are actually three flavors of
+// channels in play.
+//
+// * Oneshots - these channels are highly optimized for the one-send use case.
+// They contain as few atomics as possible and involve one and
+// exactly one allocation.
+// * Streams - these channels are optimized for the non-shared use case. They
+// use a different concurrent queue which is more tailored for this
+// use case. The initial allocation of this flavor of channel is not
+// optimized.
+// * Shared - this is the most general form of channel that this module offers,
+// a channel with multiple senders. This type is as optimized as it
+// can be, but the previous two types mentioned are much faster for
+// their use-cases.
//
// ## Concurrent queues
//
// here's the code for you to find some more!
use cast;
+use cell::Cell;
use clone::Clone;
-use container::Container;
-use int;
use iter::Iterator;
-use kinds::marker;
use kinds::Send;
+use kinds::marker;
use ops::Drop;
-use option::{Option, Some, None};
-use result::{Ok, Err};
+use option::{Some, None, Option};
+use result::{Ok, Err, Result};
use rt::local::Local;
use rt::task::{Task, BlockedTask};
-use rt::thread::Thread;
use sync::arc::UnsafeArc;
-use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
-use vec::OwnedVector;
-
-use spsc = sync::spsc_queue;
-use mpsc = sync::mpsc_queue;
+use util;
-pub use self::select::{Select, Handle};
+pub use comm::select::{Select, Handle};
macro_rules! test (
{ fn $name:ident() $b:block $($a:attr)*} => (
)
mod select;
+mod oneshot;
+mod stream;
+mod shared;
-///////////////////////////////////////////////////////////////////////////////
-// Public structs
-///////////////////////////////////////////////////////////////////////////////
+// Use a power of 2 to allow LLVM to optimize to something that's not a
+// division, this is hit pretty regularly.
+static RESCHED_FREQ: int = 256;
/// The receiving-half of Rust's channel type. This half can only be owned by
/// one task
pub struct Port<T> {
- priv inner: PortInner<T>,
+ priv inner: Flavor<T>,
+ priv receives: Cell<uint>,
// can't share in an arc
priv marker: marker::NoFreeze,
}
/// The sending-half of Rust's channel type. This half can only be owned by one
/// task
pub struct Chan<T> {
- priv inner: UnsafeArc<SingleInner<T>>,
+ priv inner: Flavor<T>,
+ priv sends: Cell<uint>,
// can't share in an arc
priv marker: marker::NoFreeze,
}
-/// The sending-half of Rust's channel type. This half can be shared among many
-/// tasks by creating copies of itself through the `clone` method.
-pub struct SharedChan<T> {
- // can't share in an arc -- technically this implementation is
- // shareable, but it shouldn't be required to be shareable in an
- // arc
- priv marker: marker::NoFreeze,
-}
-
/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
#[deriving(Eq, Clone)]
Data(T),
}
-///////////////////////////////////////////////////////////////////////////////
-// Internal struct definitions
-///////////////////////////////////////////////////////////////////////////////
-
-enum PortInner<T> {
- Single(UnsafeArc<SingleInner<T>>),
- Shared(UnsafeArc<SharedInner<T>>),
-}
-
-struct SingleInner<T> {
- queue: spsc::Queue<T>,
- packet: Packet,
-}
-
-struct SharedInner<T> {
- queue: mpsc::Queue<T>,
- packet: Packet,
-}
-
-struct Packet {
- cnt: AtomicInt, // How many items are on this channel
- steals: int, // How many times has a port received without blocking?
- to_wake: Option<BlockedTask>, // Task to wake up
-
- // The number of channels which are currently using this packet. This is
- // used to reference count shared channels.
- channels: AtomicInt,
-
- selecting: AtomicBool,
- selection_id: uint,
- select_next: *mut Packet,
- select_prev: *mut Packet,
- recv_cnt: int,
-}
-
-///////////////////////////////////////////////////////////////////////////////
-// All implementations -- the fun part
-///////////////////////////////////////////////////////////////////////////////
-
-static DISCONNECTED: int = int::MIN;
-static RESCHED_FREQ: int = 200;
-
-impl<T: Send> PortInner<T> {
- fn packet<'a>(&'a mut self) -> &'a mut Packet {
- match *self {
- Single(ref arc) => unsafe { &mut (*arc.get()).packet },
- Shared(ref arc) => unsafe { &mut (*arc.get()).packet },
- }
- }
-}
-
-impl Packet {
- fn new() -> Packet {
- Packet {
- cnt: AtomicInt::new(0),
- steals: 0,
- to_wake: None,
- channels: AtomicInt::new(1),
-
- selecting: AtomicBool::new(false),
- selection_id: 0,
- select_next: 0 as *mut Packet,
- select_prev: 0 as *mut Packet,
- recv_cnt: 0,
- }
- }
-
- // Increments the channel size count, preserving the disconnected state if
- // the other end has disconnected.
- fn increment(&mut self) -> int {
- match self.cnt.fetch_add(1, SeqCst) {
- DISCONNECTED => {
- // see the comment in 'try' for a shared channel for why this
- // window of "not disconnected" is "ok".
- self.cnt.store(DISCONNECTED, SeqCst);
- DISCONNECTED
- }
- n => n
- }
- }
-
- // Decrements the reference count of the channel, returning whether the task
- // should block or not. This assumes that the task is ready to sleep in that
- // the `to_wake` field has already been filled in. Once this decrement
- // happens, the task could wake up on the other end.
- //
- // From an implementation perspective, this is also when our "steal count"
- // gets merged into the "channel count". Our steal count is reset to 0 after
- // this function completes.
- //
- // As with increment(), this preserves the disconnected state if the
- // channel is disconnected.
- fn decrement(&mut self) -> bool {
- let steals = self.steals;
- self.steals = 0;
- match self.cnt.fetch_sub(1 + steals, SeqCst) {
- DISCONNECTED => {
- self.cnt.store(DISCONNECTED, SeqCst);
- false
- }
- n => {
- assert!(n >= 0);
- n - steals <= 0
- }
- }
- }
-
- // Helper function for select, tests whether this port can receive without
- // blocking (obviously not an atomic decision).
- fn can_recv(&self) -> bool {
- let cnt = self.cnt.load(SeqCst);
- cnt == DISCONNECTED || cnt - self.steals > 0
- }
-
- // This function must have had at least an acquire fence before it to be
- // properly called.
- fn wakeup(&mut self) {
- match self.to_wake.take_unwrap().wake() {
- Some(task) => task.reawaken(),
- None => {}
- }
- self.selecting.store(false, Relaxed);
- }
-
- // Aborts the selection process for a port. This happens as part of select()
- // once the task has reawoken. This will place the channel back into a
- // consistent state which is ready to be received from again.
- //
- // The method of doing this is a little subtle. These channels have the
- // invariant that if -1 is seen, then to_wake is always Some(..) and should
- // be woken up. This aborting process at least needs to add 1 to the
- // reference count, but that is not guaranteed to make the count positive
- // (our steal count subtraction could mean that after the addition the
- // channel count is still negative).
- //
- // In order to get around this, we force our channel count to go above 0 by
- // adding a large number >= 1 to it. This way no sender will see -1 unless
- // we are indeed blocking. This "extra lump" we took out of the channel
- // becomes our steal count (which will get re-factored into the count on the
- // next blocking recv)
- //
- // The return value of this method is whether there is data on this channel
- // to receive or not.
- fn abort_selection(&mut self, take_to_wake: bool) -> bool {
- // make sure steals + 1 makes the count go non-negative
- let steals = {
- let cnt = self.cnt.load(SeqCst);
- if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
- };
- let prev = self.cnt.fetch_add(steals + 1, SeqCst);
-
- // If we were previously disconnected, then we know for sure that there
- // is no task in to_wake, so just keep going
- if prev == DISCONNECTED {
- assert!(self.to_wake.is_none());
- self.cnt.store(DISCONNECTED, SeqCst);
- self.selecting.store(false, SeqCst);
- true // there is data, that data is that we're disconnected
- } else {
- let cur = prev + steals + 1;
- assert!(cur >= 0);
-
- // If the previous count was negative, then we just made things go
- // positive, hence we passed the -1 boundary and we're responsible
- // for removing the to_wake() field and trashing it.
- if prev < 0 {
- if take_to_wake {
- self.to_wake.take_unwrap().trash();
- } else {
- assert!(self.to_wake.is_none());
- }
-
- // We woke ourselves up, we're responsible for cancelling
- assert!(self.selecting.load(Relaxed));
- self.selecting.store(false, Relaxed);
- }
- assert_eq!(self.steals, 0);
- self.steals = steals;
-
- // if we were previously positive, then there's surely data to
- // receive
- prev >= 0
- }
- }
-
- // Decrement the reference count on a channel. This is called whenever a
- // Chan is dropped and may end up waking up a receiver. It's the receiver's
- // responsibility on the other end to figure out that we've disconnected.
- unsafe fn drop_chan(&mut self) {
- match self.channels.fetch_sub(1, SeqCst) {
- 1 => {
- match self.cnt.swap(DISCONNECTED, SeqCst) {
- -1 => { self.wakeup(); }
- DISCONNECTED => {}
- n => { assert!(n >= 0); }
- }
- }
- n if n > 1 => {},
- n => fail!("bad number of channels left {}", n),
- }
- }
-}
-
-impl Drop for Packet {
- fn drop(&mut self) {
- unsafe {
- // Note that this load is not only an assert for correctness about
- // disconnection, but also a proper fence before the read of
- // `to_wake`, so this assert cannot be removed with also removing
- // the `to_wake` assert.
- assert_eq!(self.cnt.load(SeqCst), DISCONNECTED);
- assert!(self.to_wake.is_none());
- assert_eq!(self.channels.load(SeqCst), 0);
- }
- }
+enum Flavor<T> {
+ Oneshot(UnsafeArc<oneshot::Packet<T>>),
+ Stream(UnsafeArc<stream::Packet<T>>),
+ Shared(UnsafeArc<shared::Packet<T>>),
}
impl<T: Send> Chan<T> {
/// will become available on the port as well. See the documentation of
/// `Port` and `Chan` to see what's possible with them.
pub fn new() -> (Port<T>, Chan<T>) {
- // arbitrary 128 size cache -- this is just a max cache size, not a
- // maximum buffer size
- let (a, b) = UnsafeArc::new2(SingleInner {
- queue: spsc::Queue::new(128),
- packet: Packet::new(),
- });
- (Port { inner: Single(a), marker: marker::NoFreeze },
- Chan { inner: b, marker: marker::NoFreeze })
+ let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
+ (Port::my_new(Oneshot(a)), Chan::my_new(Oneshot(b)))
+ }
+
+ fn my_new(inner: Flavor<T>) -> Chan<T> {
+ Chan { inner: inner, sends: Cell::new(0), marker: marker::NoFreeze }
}
/// Sends a value along this channel to be received by the corresponding
/// Like `send`, this method will never block. If the failure of send cannot
/// be tolerated, then this method should be used instead.
pub fn try_send(&self, t: T) -> bool {
- unsafe {
- let inner = self.inner.get();
- (*inner).queue.push(t);
- match (*inner).packet.increment() {
- // As described above, -1 == wakeup
- -1 => { (*inner).packet.wakeup(); true }
- // Also as above, SPSC queues must be >= -2
- -2 => true,
- // We succeeded if we sent data
- DISCONNECTED => (*inner).queue.is_empty(),
- // In order to prevent starvation of other tasks in situations
- // where a task sends repeatedly without ever receiving, we
- // occassionally yield instead of doing a send immediately.
- // Only doing this if we're doing a rescheduling send, otherwise
- // the caller is expecting not to context switch.
- //
- // Note that we don't unconditionally attempt to yield because
- // the TLS overhead can be a bit much.
- n => {
- assert!(n >= 0);
- if n > 0 && n % RESCHED_FREQ == 0 {
- let task: ~Task = Local::take();
- task.maybe_yield();
+ // In order to prevent starvation of other tasks in situations where
+ // a task sends repeatedly without ever receiving, we occassionally
+ // yield instead of doing a send immediately. Only doing this if
+ // we're doing a rescheduling send, otherwise the caller is
+ // expecting not to context switch.
+ //
+ // Note that we don't unconditionally attempt to yield because the
+ // TLS overhead can be a bit much.
+ let cnt = self.sends.get() + 1;
+ self.sends.set(cnt);
+ if cnt % (RESCHED_FREQ as uint) == 0 {
+ let task: ~Task = Local::take();
+ task.maybe_yield();
+ }
+
+ let (new_inner, ret) = match self.inner {
+ Oneshot(ref p) => {
+ let p = p.get();
+ unsafe {
+ if !(*p).sent() {
+ return (*p).send(t);
+ } else {
+ let (a, b) = UnsafeArc::new2(stream::Packet::new());
+ match (*p).upgrade(Port::my_new(Stream(b))) {
+ oneshot::UpSuccess => {
+ (*a.get()).send(t);
+ (a, true)
+ }
+ oneshot::UpDisconnected => (a, false),
+ oneshot::UpWoke(task) => {
+ (*a.get()).send(t);
+ task.wake().map(|t| t.reawaken());
+ (a, true)
+ }
+ }
}
- true
}
}
- }
- }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Chan<T> {
- fn drop(&mut self) {
- unsafe { (*self.inner.get()).packet.drop_chan(); }
- }
-}
-
-impl<T: Send> SharedChan<T> {
- /// Creates a new shared channel and port pair. The purpose of a shared
- /// channel is to be cloneable such that many tasks can send data at the
- /// same time. All data sent on any channel will become available on the
- /// provided port as well.
- pub fn new() -> (Port<T>, SharedChan<T>) {
- let (a, b) = UnsafeArc::new2(SharedInner {
- queue: mpsc::Queue::new(),
- packet: Packet::new(),
- });
- (Port { inner: Shared(a), marker: marker::NoFreeze },
- SharedChan { inner: b, marker: marker::NoFreeze })
- (Port { inner: Shared(a) }, SharedChan { inner: b })
- }
+ Stream(ref p) => return unsafe { (*p.get()).send(t) },
+ Shared(ref p) => return unsafe { (*p.get()).send(t) },
+ };
- /// Equivalent method to `send` on the `Chan` type (using the same
- /// semantics)
- pub fn send(&self, t: T) {
- if !self.try_send(t) {
- fail!("sending on a closed channel");
+ unsafe {
+ let mut tmp = Chan::my_new(Stream(new_inner));
+ util::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
}
+ return ret;
}
+}
- /// Equivalent method to `try_send` on the `Chan` type (using the same
- /// semantics)
- pub fn try_send(&self, t: T) -> bool {
- unsafe {
- // Note that the multiple sender case is a little tricker
- // semantically than the single sender case. The logic for
- // incrementing is "add and if disconnected store disconnected".
- // This could end up leading some senders to believe that there
- // wasn't a disconnect if in fact there was a disconnect. This means
- // that while one thread is attempting to re-store the disconnected
- // states, other threads could walk through merrily incrementing
- // this very-negative disconnected count. To prevent senders from
- // spuriously attempting to send when the channels is actually
- // disconnected, the count has a ranged check here.
- //
- // This is also done for another reason. Remember that the return
- // value of this function is:
- //
- // `true` == the data *may* be received, this essentially has no
- // meaning
- // `false` == the data will *never* be received, this has a lot of
- // meaning
- //
- // In the SPSC case, we have a check of 'queue.is_empty()' to see
- // whether the data was actually received, but this same condition
- // means nothing in a multi-producer context. As a result, this
- // preflight check serves as the definitive "this will never be
- // received". Once we get beyond this check, we have permanently
- // entered the realm of "this may be received"
- let inner = self.inner.get();
- if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 {
- return false
+impl<T: Send> Clone for Chan<T> {
+ fn clone(&self) -> Chan<T> {
+ let (packet, sleeper) = match self.inner {
+ Oneshot(ref p) => {
+ let (a, b) = UnsafeArc::new2(shared::Packet::new());
+ match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } {
+ oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
+ oneshot::UpWoke(task) => (b, Some(task))
+ }
}
-
- (*inner).queue.push(t);
- match (*inner).packet.increment() {
- DISCONNECTED => {} // oh well, we tried
- -1 => { (*inner).packet.wakeup(); }
- n => {
- if n > 0 && n % RESCHED_FREQ == 0 {
- let task: ~Task = Local::take();
- task.maybe_yield();
- }
+ Stream(ref p) => {
+ let (a, b) = UnsafeArc::new2(shared::Packet::new());
+ match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } {
+ stream::UpSuccess | stream::UpDisconnected => (b, None),
+ stream::UpWoke(task) => (b, Some(task)),
}
}
- true
- }
- }
-}
+ Shared(ref p) => {
+ unsafe { (*p.get()).clone_chan(); }
+ return Chan::my_new(Shared(p.clone()));
+ }
+ };
+
+ unsafe {
+ (*packet.get()).inherit_blocker(sleeper);
-impl<T: Send> Clone for SharedChan<T> {
- fn clone(&self) -> SharedChan<T> {
- unsafe { (*self.inner.get()).packet.channels.fetch_add(1, SeqCst); }
- SharedChan { inner: self.inner.clone(), marker: marker::NoFreeze }
+ let mut tmp = Chan::my_new(Shared(packet.clone()));
+ util::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
+ }
+ Chan::my_new(Shared(packet))
}
}
#[unsafe_destructor]
-impl<T: Send> Drop for SharedChan<T> {
+impl<T: Send> Drop for Chan<T> {
fn drop(&mut self) {
- unsafe { (*self.inner.get()).packet.drop_chan(); }
+ match self.inner {
+ Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+ Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+ Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+ }
}
}
impl<T: Send> Port<T> {
+ fn my_new(inner: Flavor<T>) -> Port<T> {
+ Port { inner: inner, receives: Cell::new(0), marker: marker::NoFreeze }
+ }
+
/// Blocks waiting for a value on this port
///
/// This function will block if necessary to wait for a corresponding send
///
/// This function cannot fail.
pub fn try_recv(&self) -> TryRecvResult<T> {
- self.try_recv_inc(true)
- }
-
- fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> {
- // This is a "best effort" situation, so if a queue is inconsistent just
- // don't worry about it.
- let this = unsafe { cast::transmute_mut(self) };
-
- // See the comment about yielding on sends, but the same applies here.
- // If a thread is spinning in try_recv we should try
- {
- let packet = this.inner.packet();
- packet.recv_cnt += 1;
- if packet.recv_cnt % RESCHED_FREQ == 0 {
- let task: ~Task = Local::take();
- task.maybe_yield();
- }
+ // If a thread is spinning in try_recv, we should take the opportunity
+ // to reschedule things occasionally. See notes above in scheduling on
+ // sends for why this doesn't always hit TLS.
+ let cnt = self.receives.get() + 1;
+ self.receives.set(cnt);
+ if cnt % (RESCHED_FREQ as uint) == 0 {
+ let task: ~Task = Local::take();
+ task.maybe_yield();
}
- let ret = match this.inner {
- Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() },
- Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } {
- mpsc::Data(t) => Some(t),
- mpsc::Empty => None,
-
- // This is a bit of an interesting case. The channel is
- // reported as having data available, but our pop() has
- // failed due to the queue being in an inconsistent state.
- // This means that there is some pusher somewhere which has
- // yet to complete, but we are guaranteed that a pop will
- // eventually succeed. In this case, we spin in a yield loop
- // because the remote sender should finish their enqueue
- // operation "very quickly".
- //
- // Note that this yield loop does *not* attempt to do a green
- // yield (regardless of the context), but *always* performs an
- // OS-thread yield. The reasoning for this is that the pusher in
- // question which is causing the inconsistent state is
- // guaranteed to *not* be a blocked task (green tasks can't get
- // pre-empted), so it must be on a different OS thread. Also,
- // `try_recv` is normally a "guaranteed no rescheduling" context
- // in a green-thread situation. By yielding control of the
- // thread, we will hopefully allow time for the remote task on
- // the other OS thread to make progress.
- //
- // Avoiding this yield loop would require a different queue
- // abstraction which provides the guarantee that after M
- // pushes have succeeded, at least M pops will succeed. The
- // current queues guarantee that if there are N active
- // pushes, you can pop N times once all N have finished.
- mpsc::Inconsistent => {
- let data;
- loop {
- Thread::yield_now();
- match unsafe { (*arc.get()).queue.pop() } {
- mpsc::Data(t) => { data = t; break }
- mpsc::Empty => fail!("inconsistent => empty"),
- mpsc::Inconsistent => {}
- }
+ loop {
+ let mut new_port = match self.inner {
+ Oneshot(ref p) => {
+ match unsafe { (*p.get()).try_recv() } {
+ Ok(t) => return Data(t),
+ Err(oneshot::Empty) => return Empty,
+ Err(oneshot::Disconnected) => return Disconnected,
+ Err(oneshot::Upgraded(port)) => port,
}
- Some(data)
}
- }
- };
- if increment && ret.is_some() {
- this.inner.packet().steals += 1;
- }
- match ret {
- Some(t) => Data(t),
- None => {
- // It's possible that between the time that we saw the queue was
- // empty and here the other side disconnected. It's also
- // possible for us to see the disconnection here while there is
- // data in the queue. It's pretty backwards-thinking to return
- // Disconnected when there's actually data on the queue, so if
- // we see a disconnected state be sure to check again to be 100%
- // sure that there's no data in the queue.
- let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) };
- if cnt != DISCONNECTED { return Empty }
-
- let ret = match this.queue {
- SPSC(ref mut queue) => queue.pop(),
- MPSC(ref mut queue) => match queue.pop() {
- mpsc::Data(t) => Some(t),
- mpsc::Empty => None,
- mpsc::Inconsistent => {
- fail!("inconsistent with no senders?!");
- }
+ Stream(ref p) => {
+ match unsafe { (*p.get()).try_recv() } {
+ Ok(t) => return Data(t),
+ Err(stream::Empty) => return Empty,
+ Err(stream::Disconnected) => return Disconnected,
+ Err(stream::Upgraded(port)) => port,
+ }
+ }
+ Shared(ref p) => {
+ match unsafe { (*p.get()).try_recv() } {
+ Ok(t) => return Data(t),
+ Err(shared::Empty) => return Empty,
+ Err(shared::Disconnected) => return Disconnected,
}
- };
- match ret {
- Some(data) => Data(data),
- None => Disconnected,
}
+ };
+ unsafe {
+ util::swap(&mut cast::transmute_mut(self).inner,
+ &mut new_port.inner);
}
}
}
/// If the channel has hung up, then `None` is returned. Otherwise `Some` of
/// the value found on the port is returned.
pub fn recv_opt(&self) -> Option<T> {
- // optimistic preflight check (scheduling is expensive)
- match self.try_recv() {
- Empty => {},
- Disconnected => return None,
- Data(t) => return Some(t),
- }
-
- let packet;
- let this;
- unsafe {
- this = cast::transmute_mut(self);
- packet = this.inner.packet();
- let task: ~Task = Local::take();
- task.deschedule(1, |task| {
- assert!((*packet).to_wake.is_none());
- (*packet).to_wake = Some(task);
- if (*packet).decrement() {
- Ok(())
- } else {
- Err((*packet).to_wake.take_unwrap())
+ loop {
+ let mut new_port = match self.inner {
+ Oneshot(ref p) => {
+ match unsafe { (*p.get()).recv() } {
+ Ok(t) => return Some(t),
+ Err(oneshot::Empty) => return unreachable!(),
+ Err(oneshot::Disconnected) => return None,
+ Err(oneshot::Upgraded(port)) => port,
+ }
}
- });
- }
-
- match self.try_recv_inc(false) {
- Data(t) => Some(t),
- Empty => fail!("bug: woke up too soon"),
- Disconnected => None,
+ Stream(ref p) => {
+ match unsafe { (*p.get()).recv() } {
+ Ok(t) => return Some(t),
+ Err(stream::Empty) => return unreachable!(),
+ Err(stream::Disconnected) => return None,
+ Err(stream::Upgraded(port)) => port,
+ }
+ }
+ Shared(ref p) => {
+ match unsafe { (*p.get()).recv() } {
+ Ok(t) => return Some(t),
+ Err(shared::Empty) => return unreachable!(),
+ Err(shared::Disconnected) => return None,
+ }
+ }
+ };
+ unsafe {
+ util::swap(&mut cast::transmute_mut(self).inner,
+ &mut new_port.inner);
+ }
}
}
}
}
+impl<T: Send> select::Packet for Port<T> {
+ fn can_recv(&self) -> bool {
+ loop {
+ let mut new_port = match self.inner {
+ Oneshot(ref p) => {
+ match unsafe { (*p.get()).can_recv() } {
+ Ok(ret) => return ret,
+ Err(upgrade) => upgrade,
+ }
+ }
+ Stream(ref p) => {
+ match unsafe { (*p.get()).can_recv() } {
+ Ok(ret) => return ret,
+ Err(upgrade) => upgrade,
+ }
+ }
+ Shared(ref p) => {
+ return unsafe { (*p.get()).can_recv() };
+ }
+ };
+ unsafe {
+ util::swap(&mut cast::transmute_mut(self).inner,
+ &mut new_port.inner);
+ }
+ }
+ }
+
+ fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
+ loop {
+ let (t, mut new_port) = match self.inner {
+ Oneshot(ref p) => {
+ match unsafe { (*p.get()).start_selection(task) } {
+ oneshot::SelSuccess => return Ok(()),
+ oneshot::SelCanceled(task) => return Err(task),
+ oneshot::SelUpgraded(t, port) => (t, port),
+ }
+ }
+ Stream(ref p) => {
+ match unsafe { (*p.get()).start_selection(task) } {
+ stream::SelSuccess => return Ok(()),
+ stream::SelCanceled(task) => return Err(task),
+ stream::SelUpgraded(t, port) => (t, port),
+ }
+ }
+ Shared(ref p) => {
+ return unsafe { (*p.get()).start_selection(task) };
+ }
+ };
+ task = t;
+ unsafe {
+ util::swap(&mut cast::transmute_mut(self).inner,
+ &mut new_port.inner);
+ }
+ }
+ }
+
+ fn abort_selection(&self) -> bool {
+ let mut was_upgrade = false;
+ loop {
+ let result = match self.inner {
+ Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
+ Stream(ref p) => unsafe {
+ (*p.get()).abort_selection(was_upgrade)
+ },
+ Shared(ref p) => return unsafe {
+ (*p.get()).abort_selection(was_upgrade)
+ },
+ };
+ let mut new_port = match result { Ok(b) => return b, Err(p) => p };
+ was_upgrade = true;
+ unsafe {
+ util::swap(&mut cast::transmute_mut(self).inner,
+ &mut new_port.inner);
+ }
+ }
+ }
+}
+
impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
fn next(&mut self) -> Option<T> { self.port.recv_opt() }
}
#[unsafe_destructor]
impl<T: Send> Drop for Port<T> {
fn drop(&mut self) {
- // All we need to do is store that we're disconnected. If the channel
- // half has already disconnected, then we'll just deallocate everything
- // when the shared packet is deallocated.
- self.inner.packet().cnt.store(DISCONNECTED, SeqCst);
+ match self.inner {
+ Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
+ Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
+ Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
+ }
}
}
})
test!(fn drop_full_shared() {
- let (_p, c) = SharedChan::new();
+ let (_p, c) = Chan::new();
c.send(~1);
})
test!(fn smoke_shared() {
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
c.send(1);
assert_eq!(p.recv(), 1);
let c = c.clone();
} #[should_fail])
test!(fn smoke_shared_port_gone() {
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
drop(p);
c.send(1);
} #[should_fail])
test!(fn smoke_shared_port_gone2() {
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
drop(p);
let c2 = c.clone();
drop(c);
} #[should_fail])
test!(fn port_gone_concurrent_shared() {
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
let c1 = c.clone();
spawn(proc() {
p.recv();
} #[should_fail])
test!(fn smoke_chan_gone_shared() {
- let (p, c) = SharedChan::<()>::new();
+ let (p, c) = Chan::<()>::new();
let c2 = c.clone();
drop(c);
drop(c2);
test!(fn stress_shared() {
static AMT: uint = 10000;
static NTHREADS: uint = 8;
- let (p, c) = SharedChan::<int>::new();
+ let (p, c) = Chan::<int>::new();
let (p1, c1) = Chan::new();
spawn(proc() {
fn send_from_outside_runtime() {
let (p, c) = Chan::<int>::new();
let (p1, c1) = Chan::new();
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
let chan2 = chan.clone();
spawn(proc() {
c1.send(());
fn no_runtime() {
let (p1, c1) = Chan::<int>::new();
let (p2, c2) = Chan::<int>::new();
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
let chan2 = chan.clone();
native::task::spawn(proc() {
assert_eq!(p1.recv(), 1);
})
test!(fn shared_chan_stress() {
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
let total = stress_factor() + 100;
for _ in range(0, total) {
let chan_clone = chan.clone();
p2.recv();
assert_eq!(p.try_recv(), Disconnected);
})
+
+ // This bug used to end up in a livelock inside of the Port destructor
+ // because the internal state of the Shared port was corrupted
+ test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
+ let (p, c) = Chan::new();
+ let (p1, c2) = Chan::new();
+ spawn(proc() {
+ p.recv(); // wait on a oneshot port
+ drop(p); // destroy a shared port
+ c2.send(());
+ });
+ // make sure the other task has gone to sleep
+ for _ in range(0, 5000) { task::deschedule(); }
+
+ // upgrade to a shared chan and send a message
+ let t = c.clone();
+ drop(c);
+ t.send(());
+
+ // wait for the child task to exit before we exit
+ p1.recv();
+ })
}
--- /dev/null
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Oneshot channels/ports
+///
+/// This is the initial flavor of channels/ports used for comm module. This is
+/// an optimization for the one-use case of a channel. The major optimization of
+/// this type is to have one and exactly one allocation when the chan/port pair
+/// is created.
+///
+/// Another possible optimization would be to not use an UnsafeArc box because
+/// in theory we know when the shared packet can be deallocated (no real need
+/// for the atomic reference counting), but I was having trouble how to destroy
+/// the data early in a drop of a Port.
+///
+/// # Implementation
+///
+/// Oneshots are implemented around one atomic uint variable. This variable
+/// indicates both the state of the port/chan but also contains any tasks
+/// blocked on the port. All atomic operations happen on this one word.
+///
+/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
+/// on behalf of the channel side of things (it can be mentally thought of as
+/// consuming the port). This upgrade is then also stored in the shared packet.
+/// The one caveat to consider is that when a port sees a disconnected channel
+/// it must check for data because there is no "data plus upgrade" state.
+
+use comm::Port;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use result::{Result, Ok, Err};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use sync::atomics;
+use util;
+
+// Various states you can find a port in.
+static EMPTY: uint = 0;
+static DATA: uint = 1;
+static DISCONNECTED: uint = 2;
+
+pub struct Packet<T> {
+ // Internal state of the chan/port pair (stores the blocked task as well)
+ state: atomics::AtomicUint,
+ // One-shot data slot location
+ data: Option<T>,
+ // when used for the second time, a oneshot channel must be upgraded, and
+ // this contains the slot for the upgrade
+ upgrade: MyUpgrade<T>,
+}
+
+pub enum Failure<T> {
+ Empty,
+ Disconnected,
+ Upgraded(Port<T>),
+}
+
+pub enum UpgradeResult {
+ UpSuccess,
+ UpDisconnected,
+ UpWoke(BlockedTask),
+}
+
+pub enum SelectionResult<T> {
+ SelCanceled(BlockedTask),
+ SelUpgraded(BlockedTask, Port<T>),
+ SelSuccess,
+}
+
+enum MyUpgrade<T> {
+ NothingSent,
+ SendUsed,
+ GoUp(Port<T>),
+}
+
+impl<T: Send> Packet<T> {
+ pub fn new() -> Packet<T> {
+ Packet {
+ data: None,
+ upgrade: NothingSent,
+ state: atomics::AtomicUint::new(EMPTY),
+ }
+ }
+
+ pub fn send(&mut self, t: T) -> bool {
+ // Sanity check
+ match self.upgrade {
+ NothingSent => {}
+ _ => fail!("sending on a oneshot that's already sent on "),
+ }
+ assert!(self.data.is_none());
+ self.data = Some(t);
+ self.upgrade = SendUsed;
+
+ // This atomic swap uses a "Release" memory ordering to ensure that all
+ // our previous memory writes are visible to the other thread (notably
+ // the write of data/upgrade)
+ match self.state.swap(DATA, atomics::Release) {
+ // Sent the data, no one was waiting
+ EMPTY => true,
+
+ // Couldn't send the data, the port hung up first. We need to be
+ // sure to deallocate the sent data (to not leave it stuck in the
+ // queue)
+ DISCONNECTED => {
+ self.data.take_unwrap();
+ false
+ }
+
+ // Not possible, these are one-use channels
+ DATA => unreachable!(),
+
+ // Anything else means that there was a task waiting on the other
+ // end. We leave the 'DATA' state inside so it'll pick it up on the
+ // other end.
+ n => unsafe {
+ let t = BlockedTask::cast_from_uint(n);
+ t.wake().map(|t| t.reawaken());
+ true
+ }
+ }
+ }
+
+ // Just tests whether this channel has been sent on or not, this is only
+ // safe to use from the sender.
+ pub fn sent(&self) -> bool {
+ match self.upgrade {
+ NothingSent => false,
+ _ => true,
+ }
+ }
+
+ pub fn recv(&mut self) -> Result<T, Failure<T>> {
+ // Attempt to not block the task (it's a little expensive). If it looks
+ // like we're not empty, then immediately go through to `try_recv`.
+ //
+ // These atomics use an Acquire memory ordering in order to have all the
+ // previous writes of the releasing thread visible to us.
+ if self.state.load(atomics::Acquire) == EMPTY {
+ let t: ~Task = Local::take();
+ t.deschedule(1, |task| {
+ let n = unsafe { task.cast_to_uint() };
+ match self.state.compare_and_swap(EMPTY, n, atomics::Acquire) {
+ // Nothing on the channel, we legitimately block
+ EMPTY => Ok(()),
+
+ // If there's data or it's a disconnected channel, then we
+ // failed the cmpxchg, so we just wake ourselves back up
+ DATA | DISCONNECTED => {
+ unsafe { Err(BlockedTask::cast_from_uint(n)) }
+ }
+
+ // Only one thread is allowed to sleep on this port
+ _ => unreachable!()
+ }
+ });
+ }
+
+ self.try_recv()
+ }
+
+ pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
+ // see above for why Acquire is used.
+ match self.state.load(atomics::Acquire) {
+ EMPTY => Err(Empty),
+
+ // We saw some data on the channel, but the channel can be used
+ // again to send us an upgrade. As a result, we need to re-insert
+ // into the channel that there's no data available (otherwise we'll
+ // just see DATA next time). This is done as a cmpxchg because if
+ // the state changes under our feet we'd rather just see that state
+ // change.
+ DATA => {
+ self.state.compare_and_swap(DATA, EMPTY, atomics::Acquire);
+ match self.data.take() {
+ Some(data) => Ok(data),
+ None => unreachable!(),
+ }
+ }
+
+ // There's no guarantee that we receive before an upgrade happens,
+ // and an upgrade flags the channel as disconnected, so when we see
+ // this we first need to check if there's data available and *then*
+ // we go through and process the upgrade.
+ DISCONNECTED => {
+ match self.data.take() {
+ Some(data) => Ok(data),
+ None => {
+ match util::replace(&mut self.upgrade, SendUsed) {
+ SendUsed | NothingSent => Err(Disconnected),
+ GoUp(upgrade) => Err(Upgraded(upgrade))
+ }
+ }
+ }
+ }
+ _ => unreachable!()
+ }
+ }
+
+ // Returns whether the upgrade was completed. If the upgrade wasn't
+ // completed, then the port couldn't get sent to the other half (it will
+ // never receive it).
+ pub fn upgrade(&mut self, up: Port<T>) -> UpgradeResult {
+ let prev = match self.upgrade {
+ NothingSent => NothingSent,
+ SendUsed => SendUsed,
+ _ => fail!("upgrading again"),
+ };
+ self.upgrade = GoUp(up);
+
+ // Use a Release memory ordering in order to make sure that our write to
+ // `upgrade` is visible to the other thread.
+ match self.state.swap(DISCONNECTED, atomics::Release) {
+ // If the channel is empty or has data on it, then we're good to go.
+ // Senders will check the data before the upgrade (in case we
+ // plastered over the DATA state).
+ DATA | EMPTY => UpSuccess,
+
+ // If the other end is already disconnected, then we failed the
+ // upgrade. Be sure to trash the port we were given.
+ DISCONNECTED => { self.upgrade = prev; UpDisconnected }
+
+ // If someone's waiting, we gotta wake them up
+ n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
+ }
+ }
+
+ pub fn drop_chan(&mut self) {
+ match self.state.swap(DISCONNECTED, atomics::SeqCst) {
+ DATA | DISCONNECTED | EMPTY => {}
+
+ // If someone's waiting, we gotta wake them up
+ n => unsafe {
+ let t = BlockedTask::cast_from_uint(n);
+ t.wake().map(|t| t.reawaken());
+ }
+ }
+ }
+
+ pub fn drop_port(&mut self) {
+ // Use an Acquire memory ordering in order to see the data that the
+ // senders are sending.
+ match self.state.swap(DISCONNECTED, atomics::Acquire) {
+ // An empty channel has nothing to do, and a remotely disconnected
+ // channel also has nothing to do b/c we're about to run the drop
+ // glue
+ DISCONNECTED | EMPTY => {}
+
+ // There's data on the channel, so make sure we destroy it promptly.
+ // This is why not using an arc is a little difficult (need the box
+ // to stay valid while we take the data).
+ DATA => { self.data.take_unwrap(); }
+
+ // We're the only ones that can block on this port
+ _ => unreachable!()
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // If Ok, the value is whether this port has data, if Err, then the upgraded
+ // port needs to be checked instead of this one.
+ pub fn can_recv(&mut self) -> Result<bool, Port<T>> {
+ // Use Acquire so we can see all previous memory writes
+ match self.state.load(atomics::Acquire) {
+ EMPTY => Ok(false), // Welp, we tried
+ DATA => Ok(true), // we have some un-acquired data
+ DISCONNECTED if self.data.is_some() => Ok(true), // we have data
+ DISCONNECTED => {
+ match util::replace(&mut self.upgrade, SendUsed) {
+ // The other end sent us an upgrade, so we need to
+ // propagate upwards whether the upgrade can receive
+ // data
+ GoUp(upgrade) => Err(upgrade),
+
+ // If the other end disconnected without sending an
+ // upgrade, then we have data to receive (the channel is
+ // disconnected).
+ up => { self.upgrade = up; Ok(true) }
+ }
+ }
+ _ => unreachable!(), // we're the "one blocker"
+ }
+ }
+
+ // Attempts to start selection on this port. This can either succeed, fail
+ // because there is data, or fail because there is an upgrade pending.
+ pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
+ let n = unsafe { task.cast_to_uint() };
+ match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) {
+ EMPTY => SelSuccess,
+ DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
+ DISCONNECTED if self.data.is_some() => {
+ SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+ }
+ DISCONNECTED => {
+ match util::replace(&mut self.upgrade, SendUsed) {
+ // The other end sent us an upgrade, so we need to
+ // propagate upwards whether the upgrade can receive
+ // data
+ GoUp(upgrade) => {
+ SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
+ upgrade)
+ }
+
+ // If the other end disconnected without sending an
+ // upgrade, then we have data to receive (the channel is
+ // disconnected).
+ up => {
+ self.upgrade = up;
+ SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+ }
+ }
+ }
+ _ => unreachable!(), // we're the "one blocker"
+ }
+ }
+
+ // Remove a previous selecting task from this port. This ensures that the
+ // blocked task will no longer be visible to any other threads.
+ //
+ // The return value indicates whether there's data on this port.
+ pub fn abort_selection(&mut self) -> Result<bool, Port<T>> {
+ // use Acquire to make sure we see all previous memory writes
+ let state = match self.state.load(atomics::Acquire) {
+ // Each of these states means that no further activity will happen
+ // with regard to abortion selection
+ s @ EMPTY |
+ s @ DATA |
+ s @ DISCONNECTED => s,
+
+ // If we've got a blocked task, then use an atomic to gain ownership
+ // of it (may fail)
+ n => self.state.compare_and_swap(n, EMPTY, atomics::SeqCst)
+ };
+
+ // Now that we've got ownership of our state, figure out what to do
+ // about it.
+ match state {
+ EMPTY => unreachable!(),
+ // our task used for select was stolen
+ DATA => Ok(true),
+
+ // If the other end has hung up, then we have complete ownership
+ // of the port. We need to check to see if there was an upgrade
+ // requested, and if so, the other end needs to have its selection
+ // aborted.
+ DISCONNECTED => {
+ assert!(self.data.is_none());
+ match util::replace(&mut self.upgrade, SendUsed) {
+ GoUp(port) => Err(port),
+ _ => Ok(true),
+ }
+ }
+
+ // We woke ourselves up from select. Assert that the task should be
+ // trashed and returne that we don't have any data.
+ n => {
+ let t = unsafe { BlockedTask::cast_from_uint(n) };
+ t.trash();
+ Ok(false)
+ }
+ }
+ }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+ fn drop(&mut self) {
+ assert_eq!(self.state.load(atomics::SeqCst), DISCONNECTED);
+ }
+}
#[allow(dead_code)];
use cast;
-use comm;
+use cell::Cell;
use iter::Iterator;
use kinds::marker;
use kinds::Send;
use ops::Drop;
use option::{Some, None, Option};
use ptr::RawPtr;
-use result::{Ok, Err};
+use result::{Ok, Err, Result};
use rt::local::Local;
-use rt::task::Task;
-use super::{Packet, Port};
-use sync::atomics::{Relaxed, SeqCst};
-use task;
+use rt::task::{Task, BlockedTask};
+use super::Port;
use uint;
macro_rules! select {
) => ({
use std::comm::Select;
let sel = Select::new();
- let mut $port1 = sel.add(&mut $port1);
- $( let mut $port = sel.add(&mut $port); )*
+ let mut $port1 = sel.handle(&$port1);
+ $( let mut $port = sel.handle(&$port); )*
+ unsafe {
+ $port1.add();
+ $( $port.add(); )*
+ }
let ret = sel.wait();
if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 }
$( else if ret == $port.id { let $name = $port.$meth(); $code } )*
/// The "port set" of the select interface. This structure is used to manage a
/// set of ports which are being selected over.
pub struct Select {
- priv head: *mut Packet,
- priv tail: *mut Packet,
- priv next_id: uint,
+ priv head: *mut Handle<'static, ()>,
+ priv tail: *mut Handle<'static, ()>,
+ priv next_id: Cell<uint>,
priv marker1: marker::NoSend,
priv marker2: marker::NoFreeze,
}
/// This handle is used to keep the port in the set as well as interact with the
/// underlying port.
pub struct Handle<'port, T> {
- /// A unique ID for this Handle.
+ /// The ID of this handle, used to compare against the return value of
+ /// `Select::wait()`
id: uint,
priv selector: &'port Select,
- priv port: &'port mut Port<T>,
+ priv next: *mut Handle<'static, ()>,
+ priv prev: *mut Handle<'static, ()>,
+ priv added: bool,
+ priv packet: &'port Packet,
+
+ // due to our fun transmutes, we be sure to place this at the end. (nothing
+ // previous relies on T)
+ priv port: &'port Port<T>,
}
-struct Packets { cur: *mut Packet }
+struct Packets { cur: *mut Handle<'static, ()> }
+
+#[doc(hidden)]
+pub trait Packet {
+ fn can_recv(&self) -> bool;
+ fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
+ fn abort_selection(&self) -> bool;
+}
impl Select {
/// Creates a new selection structure. This set is initially empty and
/// rather much easier through the `select!` macro.
pub fn new() -> Select {
Select {
- head: 0 as *mut Packet,
- tail: 0 as *mut Packet,
- next_id: 1,
marker1: marker::NoSend,
marker2: marker::NoFreeze,
+ head: 0 as *mut Handle<'static, ()>,
+ tail: 0 as *mut Handle<'static, ()>,
+ next_id: Cell::new(1),
}
}
- /// Adds a new port to this set, returning a handle which is then used to
- /// receive on the port.
- ///
- /// Note that this port parameter takes `&mut Port` instead of `&Port`. None
- /// of the methods of receiving on a port require `&mut self`, but `&mut` is
- /// used here in order to have the compiler guarantee that the same port is
- /// not added to this set more than once.
- ///
- /// When the returned handle falls out of scope, the port will be removed
- /// from this set. While the handle is in this set, usage of the port can be
- /// done through the `Handle`'s receiving methods.
- pub fn add<'a, T: Send>(&'a self, port: &'a mut Port<T>) -> Handle<'a, T> {
- let this = unsafe { cast::transmute_mut(self) };
- let id = this.next_id;
- this.next_id += 1;
- unsafe {
- let packet = port.inner.packet();
- assert!(!(*packet).selecting.load(Relaxed));
- assert_eq!((*packet).selection_id, 0);
- (*packet).selection_id = id;
- if this.head.is_null() {
- this.head = packet as *mut Packet;
- this.tail = packet as *mut Packet;
- } else {
- (*packet).select_prev = this.tail;
- assert!((*packet).select_next.is_null());
- (*this.tail).select_next = packet as *mut Packet;
- this.tail = packet as *mut Packet;
- }
+ /// Creates a new handle into this port set for a new port. Note that this
+ /// does *not* add the port to the port set, for that you must call the
+ /// `add` method on the handle itself.
+ pub fn handle<'a, T: Send>(&'a self, port: &'a Port<T>) -> Handle<'a, T> {
+ let id = self.next_id.get();
+ self.next_id.set(id + 1);
+ Handle {
+ id: id,
+ selector: self,
+ next: 0 as *mut Handle<'static, ()>,
+ prev: 0 as *mut Handle<'static, ()>,
+ added: false,
+ port: port,
+ packet: port,
}
- Handle { id: id, selector: this, port: port }
}
/// Waits for an event on this port set. The returned valus is *not* and
unsafe {
let mut amt = 0;
for p in self.iter() {
- assert!(!(*p).selecting.load(Relaxed));
amt += 1;
- if (*p).can_recv() {
- return (*p).selection_id;
+ if (*p).packet.can_recv() {
+ return (*p).id;
}
}
assert!(amt > 0);
let task: ~Task = Local::take();
task.deschedule(amt, |task| {
// Prepare for the block
- let (i, packet) = iter.next().unwrap();
- assert!((*packet).to_wake.is_none());
- (*packet).to_wake = Some(task);
- (*packet).selecting.store(true, SeqCst);
-
- if (*packet).decrement() {
- Ok(())
- } else {
- // Empty to_wake first to avoid tripping an assertion in
- // abort_selection in the disconnected case.
- let task = (*packet).to_wake.take_unwrap();
- (*packet).abort_selection(false);
- (*packet).selecting.store(false, SeqCst);
- ready_index = i;
- ready_id = (*packet).selection_id;
- Err(task)
+ let (i, handle) = iter.next().unwrap();
+ match (*handle).packet.start_selection(task) {
+ Ok(()) => Ok(()),
+ Err(task) => {
+ ready_index = i;
+ ready_id = (*handle).id;
+ Err(task)
+ }
}
});
// A rewrite should focus on avoiding a yield loop, and for now this
// implementation is tying us over to a more efficient "don't
// iterate over everything every time" implementation.
- for packet in self.iter().take(ready_index) {
- if (*packet).abort_selection(true) {
- ready_id = (*packet).selection_id;
- while (*packet).selecting.load(Relaxed) {
- task::deschedule();
- }
+ for handle in self.iter().take(ready_index) {
+ if (*handle).packet.abort_selection() {
+ ready_id = (*handle).id;
}
}
- // Sanity check for now to make sure that everyone is turned off.
- for packet in self.iter() {
- assert!(!(*packet).selecting.load(Relaxed));
- }
-
assert!(ready_id != uint::MAX);
return ready_id;
}
}
- unsafe fn remove(&self, packet: *mut Packet) {
- let this = cast::transmute_mut(self);
- assert!(!(*packet).selecting.load(Relaxed));
- if (*packet).select_prev.is_null() {
- assert_eq!(packet, this.head);
- this.head = (*packet).select_next;
- } else {
- (*(*packet).select_prev).select_next = (*packet).select_next;
- }
- if (*packet).select_next.is_null() {
- assert_eq!(packet, this.tail);
- this.tail = (*packet).select_prev;
- } else {
- (*(*packet).select_next).select_prev = (*packet).select_prev;
- }
- (*packet).select_next = 0 as *mut Packet;
- (*packet).select_prev = 0 as *mut Packet;
- (*packet).selection_id = 0;
- }
-
fn iter(&self) -> Packets { Packets { cur: self.head } }
}
/// success or `None` if the channel disconnects. This function has the same
/// semantics as `Port.recv_opt`
pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
- /// Immediately attempt to receive a value on a port, this function will
- /// never block. Has the same semantics as `Port.try_recv`.
- pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
- self.port.try_recv()
+
+ /// Adds this handle to the port set that the handle was created from. This
+ /// method can be called multiple times, but it has no effect if `add` was
+ /// called previously.
+ ///
+ /// This method is unsafe because it requires that the `Handle` is not moved
+ /// while it is added to the `Select` set.
+ pub unsafe fn add(&mut self) {
+ if self.added { return }
+ let selector: &mut Select = cast::transmute(&*self.selector);
+ let me: *mut Handle<'static, ()> = cast::transmute(&*self);
+
+ if selector.head.is_null() {
+ selector.head = me;
+ selector.tail = me;
+ } else {
+ (*me).prev = selector.tail;
+ assert!((*me).next.is_null());
+ (*selector.tail).next = me;
+ selector.tail = me;
+ }
+ self.added = true;
+ }
+
+ /// Removes this handle from the `Select` set. This method is unsafe because
+ /// it has no guarantee that the `Handle` was not moved since `add` was
+ /// called.
+ pub unsafe fn remove(&mut self) {
+ if !self.added { return }
+
+ let selector: &mut Select = cast::transmute(&*self.selector);
+ let me: *mut Handle<'static, ()> = cast::transmute(&*self);
+
+ if self.prev.is_null() {
+ assert_eq!(selector.head, me);
+ selector.head = self.next;
+ } else {
+ (*self.prev).next = self.next;
+ }
+ if self.next.is_null() {
+ assert_eq!(selector.tail, me);
+ selector.tail = self.prev;
+ } else {
+ (*self.next).prev = self.prev;
+ }
+
+ self.next = 0 as *mut Handle<'static, ()>;
+ self.prev = 0 as *mut Handle<'static, ()>;
+
+ self.added = false;
}
}
#[unsafe_destructor]
impl<'port, T: Send> Drop for Handle<'port, T> {
fn drop(&mut self) {
- unsafe { self.selector.remove(self.port.inner.packet()) }
+ unsafe { self.remove() }
}
}
-impl Iterator<*mut Packet> for Packets {
- fn next(&mut self) -> Option<*mut Packet> {
+impl Iterator<*mut Handle<'static, ()>> for Packets {
+ fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
if self.cur.is_null() {
None
} else {
let ret = Some(self.cur);
- unsafe { self.cur = (*self.cur).select_next; }
+ unsafe { self.cur = (*self.cur).next; }
ret
}
}
use prelude::*;
test!(fn smoke() {
- let (mut p1, c1) = Chan::<int>::new();
- let (mut p2, c2) = Chan::<int>::new();
+ let (p1, c1) = Chan::<int>::new();
+ let (p2, c2) = Chan::<int>::new();
c1.send(1);
select! (
foo = p1.recv() => { assert_eq!(foo, 1); },
})
test!(fn smoke2() {
- let (mut p1, _c1) = Chan::<int>::new();
- let (mut p2, _c2) = Chan::<int>::new();
- let (mut p3, _c3) = Chan::<int>::new();
- let (mut p4, _c4) = Chan::<int>::new();
- let (mut p5, c5) = Chan::<int>::new();
+ let (p1, _c1) = Chan::<int>::new();
+ let (p2, _c2) = Chan::<int>::new();
+ let (p3, _c3) = Chan::<int>::new();
+ let (p4, _c4) = Chan::<int>::new();
+ let (p5, c5) = Chan::<int>::new();
c5.send(4);
select! (
_foo = p1.recv() => { fail!("1") },
})
test!(fn closed() {
- let (mut p1, _c1) = Chan::<int>::new();
- let (mut p2, c2) = Chan::<int>::new();
+ let (p1, _c1) = Chan::<int>::new();
+ let (p2, c2) = Chan::<int>::new();
drop(c2);
select! (
})
test!(fn unblocks() {
- let (mut p1, c1) = Chan::<int>::new();
- let (mut p2, _c2) = Chan::<int>::new();
+ let (p1, c1) = Chan::<int>::new();
+ let (p2, _c2) = Chan::<int>::new();
let (p3, c3) = Chan::<int>::new();
spawn(proc() {
})
test!(fn both_ready() {
- let (mut p1, c1) = Chan::<int>::new();
- let (mut p2, c2) = Chan::<int>::new();
+ let (p1, c1) = Chan::<int>::new();
+ let (p2, c2) = Chan::<int>::new();
let (p3, c3) = Chan::<()>::new();
spawn(proc() {
test!(fn stress() {
static AMT: int = 10000;
- let (mut p1, c1) = Chan::<int>::new();
- let (mut p2, c2) = Chan::<int>::new();
+ let (p1, c1) = Chan::<int>::new();
+ let (p2, c2) = Chan::<int>::new();
let (p3, c3) = Chan::<()>::new();
spawn(proc() {
c3.send(());
}
})
+
+ test!(fn cloning() {
+ let (p1, c1) = Chan::<int>::new();
+ let (p2, _c2) = Chan::<int>::new();
+ let (p3, c3) = Chan::<()>::new();
+
+ spawn(proc() {
+ p3.recv();
+ c1.clone();
+ assert_eq!(p3.try_recv(), Empty);
+ c1.send(2);
+ p3.recv();
+ });
+
+ c3.send(());
+ select!(
+ _i1 = p1.recv() => {},
+ _i2 = p2.recv() => fail!()
+ )
+ c3.send(());
+ })
+
+ test!(fn cloning2() {
+ let (p1, c1) = Chan::<int>::new();
+ let (p2, _c2) = Chan::<int>::new();
+ let (p3, c3) = Chan::<()>::new();
+
+ spawn(proc() {
+ p3.recv();
+ c1.clone();
+ assert_eq!(p3.try_recv(), Empty);
+ c1.send(2);
+ p3.recv();
+ });
+
+ c3.send(());
+ select!(
+ _i1 = p1.recv() => {},
+ _i2 = p2.recv() => fail!()
+ )
+ c3.send(());
+ })
+
+ test!(fn cloning3() {
+ let (p1, c1) = Chan::<()>::new();
+ let (p2, c2) = Chan::<()>::new();
+ let (p, c) = Chan::new();
+ spawn(proc() {
+ let mut s = Select::new();
+ let mut h1 = s.handle(&p1);
+ let mut h2 = s.handle(&p2);
+ unsafe { h2.add(); }
+ unsafe { h1.add(); }
+ assert_eq!(s.wait(), h2.id);
+ c.send(());
+ });
+
+ for _ in range(0, 1000) { task::deschedule(); }
+ drop(c1.clone());
+ c2.send(());
+ p.recv();
+ })
}
--- /dev/null
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Shared channels
+///
+/// This is the flavor of channels which are not necessarily optimized for any
+/// particular use case, but are the most general in how they are used. Shared
+/// channels are cloneable allowing for multiple senders.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module. You'll also note that the implementation of the shared and stream
+/// channels are quite similar, and this is no coincidence!
+
+use int;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use result::{Ok, Err, Result};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use rt::thread::Thread;
+use sync::atomics;
+use unstable::mutex::Mutex;
+use vec::OwnedVector;
+
+use mpsc = sync::mpsc_queue;
+
+static DISCONNECTED: int = int::MIN;
+static FUDGE: int = 1024;
+static MAX_STEALS: int = 1 << 20;
+
+pub struct Packet<T> {
+ queue: mpsc::Queue<T>,
+ cnt: atomics::AtomicInt, // How many items are on this channel
+ steals: int, // How many times has a port received without blocking?
+ to_wake: atomics::AtomicUint, // Task to wake up
+
+ // The number of channels which are currently using this packet.
+ channels: atomics::AtomicInt,
+
+ // See the discussion in Port::drop and the channel send methods for what
+ // these are used for
+ port_dropped: atomics::AtomicBool,
+ sender_drain: atomics::AtomicInt,
+
+ // this lock protects various portions of this implementation during
+ // select()
+ select_lock: Mutex,
+}
+
+pub enum Failure {
+ Empty,
+ Disconnected,
+}
+
+impl<T: Send> Packet<T> {
+ // Creation of a packet *must* be followed by a call to inherit_blocker
+ pub fn new() -> Packet<T> {
+ let mut p = Packet {
+ queue: mpsc::Queue::new(),
+ cnt: atomics::AtomicInt::new(0),
+ steals: 0,
+ to_wake: atomics::AtomicUint::new(0),
+ channels: atomics::AtomicInt::new(2),
+ port_dropped: atomics::AtomicBool::new(false),
+ sender_drain: atomics::AtomicInt::new(0),
+ select_lock: unsafe { Mutex::new() },
+ };
+ // see comments in inherit_blocker about why we grab this lock
+ unsafe { p.select_lock.lock() }
+ return p;
+ }
+
+ // This function is used at the creation of a shared packet to inherit a
+ // previously blocked task. This is done to prevent spurious wakeups of
+ // tasks in select().
+ //
+ // This can only be called at channel-creation time
+ pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
+ match task {
+ Some(task) => {
+ assert_eq!(self.cnt.load(atomics::SeqCst), 0);
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ self.to_wake.store(unsafe { task.cast_to_uint() },
+ atomics::SeqCst);
+ self.cnt.store(-1, atomics::SeqCst);
+
+ // This store is a little sketchy. What's happening here is
+ // that we're transferring a blocker from a oneshot or stream
+ // channel to this shared channel. In doing so, we never
+ // spuriously wake them up and rather only wake them up at the
+ // appropriate time. This implementation of shared channels
+ // assumes that any blocking recv() will undo the increment of
+ // steals performed in try_recv() once the recv is complete.
+ // This thread that we're inheriting, however, is not in the
+ // middle of recv. Hence, the first time we wake them up,
+ // they're going to wake up from their old port, move on to the
+ // upgraded port, and then call the block recv() function.
+ //
+ // When calling this function, they'll find there's data
+ // immediately available, counting it as a steal. This in fact
+ // wasn't a steal because we appropriately blocked them waiting
+ // for data.
+ //
+ // To offset this bad increment, we initially set the steal
+ // count to -1. You'll find some special code in
+ // abort_selection() as well to ensure that this -1 steal count
+ // doesn't escape too far.
+ self.steals = -1;
+ }
+ None => {}
+ }
+
+ // When the shared packet is constructed, we grabbed this lock. The
+ // purpose of this lock is to ensure that abort_selection() doesn't
+ // interfere with this method. After we unlock this lock, we're
+ // signifying that we're done modifying self.cnt and self.to_wake and
+ // the port is ready for the world to continue using it.
+ unsafe { self.select_lock.unlock() }
+ }
+
+ pub fn send(&mut self, t: T) -> bool {
+ // See Port::drop for what's going on
+ if self.port_dropped.load(atomics::SeqCst) { return false }
+
+ // Note that the multiple sender case is a little tricker
+ // semantically than the single sender case. The logic for
+ // incrementing is "add and if disconnected store disconnected".
+ // This could end up leading some senders to believe that there
+ // wasn't a disconnect if in fact there was a disconnect. This means
+ // that while one thread is attempting to re-store the disconnected
+ // states, other threads could walk through merrily incrementing
+ // this very-negative disconnected count. To prevent senders from
+ // spuriously attempting to send when the channels is actually
+ // disconnected, the count has a ranged check here.
+ //
+ // This is also done for another reason. Remember that the return
+ // value of this function is:
+ //
+ // `true` == the data *may* be received, this essentially has no
+ // meaning
+ // `false` == the data will *never* be received, this has a lot of
+ // meaning
+ //
+ // In the SPSC case, we have a check of 'queue.is_empty()' to see
+ // whether the data was actually received, but this same condition
+ // means nothing in a multi-producer context. As a result, this
+ // preflight check serves as the definitive "this will never be
+ // received". Once we get beyond this check, we have permanently
+ // entered the realm of "this may be received"
+ if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
+ return false
+ }
+
+ self.queue.push(t);
+ match self.cnt.fetch_add(1, atomics::SeqCst) {
+ -1 => {
+ self.take_to_wake().wake().map(|t| t.reawaken());
+ }
+
+ // In this case, we have possibly failed to send our data, and
+ // we need to consider re-popping the data in order to fully
+ // destroy it. We must arbitrate among the multiple senders,
+ // however, because the queues that we're using are
+ // single-consumer queues. In order to do this, all exiting
+ // pushers will use an atomic count in order to count those
+ // flowing through. Pushers who see 0 are required to drain as
+ // much as possible, and then can only exit when they are the
+ // only pusher (otherwise they must try again).
+ n if n < DISCONNECTED + FUDGE => {
+ // see the comment in 'try' for a shared channel for why this
+ // window of "not disconnected" is ok.
+ self.cnt.store(DISCONNECTED, atomics::SeqCst);
+
+ if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 {
+ loop {
+ // drain the queue, for info on the thread yield see the
+ // discussion in try_recv
+ loop {
+ match self.queue.pop() {
+ mpsc::Data(..) => {}
+ mpsc::Empty => break,
+ mpsc::Inconsistent => Thread::yield_now(),
+ }
+ }
+ // maybe we're done, if we're not the last ones
+ // here, then we need to go try again.
+ if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 {
+ break
+ }
+ }
+
+ // At this point, there may still be data on the queue,
+ // but only if the count hasn't been incremented and
+ // some other sender hasn't finished pushing data just
+ // yet. That sender in question will drain its own data.
+ }
+ }
+
+ // Can't make any assumptions about this case like in the SPSC case.
+ _ => {}
+ }
+
+ true
+ }
+
+ pub fn recv(&mut self) -> Result<T, Failure> {
+ // This code is essentially the exact same as that found in the stream
+ // case (see stream.rs)
+ match self.try_recv() {
+ Err(Empty) => {}
+ data => return data,
+ }
+
+ let task: ~Task = Local::take();
+ task.deschedule(1, |task| {
+ self.decrement(task)
+ });
+
+ match self.try_recv() {
+ data @ Ok(..) => { self.steals -= 1; data }
+ data => data,
+ }
+ }
+
+ // Essentially the exact same thing as the stream decrement function.
+ fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ let n = unsafe { task.cast_to_uint() };
+ self.to_wake.store(n, atomics::SeqCst);
+
+ let steals = self.steals;
+ self.steals = 0;
+
+ match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
+ DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
+ // If we factor in our steals and notice that the channel has no
+ // data, we successfully sleep
+ n => {
+ assert!(n >= 0);
+ if n - steals <= 0 { return Ok(()) }
+ }
+ }
+
+ self.to_wake.store(0, atomics::SeqCst);
+ Err(unsafe { BlockedTask::cast_from_uint(n) })
+ }
+
+ pub fn try_recv(&mut self) -> Result<T, Failure> {
+ let ret = match self.queue.pop() {
+ mpsc::Data(t) => Some(t),
+ mpsc::Empty => None,
+
+ // This is a bit of an interesting case. The channel is
+ // reported as having data available, but our pop() has
+ // failed due to the queue being in an inconsistent state.
+ // This means that there is some pusher somewhere which has
+ // yet to complete, but we are guaranteed that a pop will
+ // eventually succeed. In this case, we spin in a yield loop
+ // because the remote sender should finish their enqueue
+ // operation "very quickly".
+ //
+ // Note that this yield loop does *not* attempt to do a green
+ // yield (regardless of the context), but *always* performs an
+ // OS-thread yield. The reasoning for this is that the pusher in
+ // question which is causing the inconsistent state is
+ // guaranteed to *not* be a blocked task (green tasks can't get
+ // pre-empted), so it must be on a different OS thread. Also,
+ // `try_recv` is normally a "guaranteed no rescheduling" context
+ // in a green-thread situation. By yielding control of the
+ // thread, we will hopefully allow time for the remote task on
+ // the other OS thread to make progress.
+ //
+ // Avoiding this yield loop would require a different queue
+ // abstraction which provides the guarantee that after M
+ // pushes have succeeded, at least M pops will succeed. The
+ // current queues guarantee that if there are N active
+ // pushes, you can pop N times once all N have finished.
+ mpsc::Inconsistent => {
+ let data;
+ loop {
+ Thread::yield_now();
+ match self.queue.pop() {
+ mpsc::Data(t) => { data = t; break }
+ mpsc::Empty => fail!("inconsistent => empty"),
+ mpsc::Inconsistent => {}
+ }
+ }
+ Some(data)
+ }
+ };
+ match ret {
+ // See the discussion in the stream implementation for why we we
+ // might decrement steals.
+ Some(data) => {
+ self.steals += 1;
+ if self.steals > MAX_STEALS {
+ match self.cnt.swap(0, atomics::SeqCst) {
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, atomics::SeqCst);
+ }
+ n => { self.steals -= n; }
+ }
+ assert!(self.steals >= 0);
+ }
+ Ok(data)
+ }
+
+ // See the discussion in the stream implementation for why we try
+ // again.
+ None => {
+ match self.cnt.load(atomics::SeqCst) {
+ n if n != DISCONNECTED => Err(Empty),
+ _ => {
+ match self.queue.pop() {
+ mpsc::Data(t) => Ok(t),
+ mpsc::Empty => Err(Disconnected),
+ // with no senders, an inconsistency is impossible.
+ mpsc::Inconsistent => unreachable!(),
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Prepares this shared packet for a channel clone, essentially just bumping
+ // a refcount.
+ pub fn clone_chan(&mut self) {
+ self.channels.fetch_add(1, atomics::SeqCst);
+ }
+
+ // Decrement the reference count on a channel. This is called whenever a
+ // Chan is dropped and may end up waking up a receiver. It's the receiver's
+ // responsibility on the other end to figure out that we've disconnected.
+ pub fn drop_chan(&mut self) {
+ match self.channels.fetch_sub(1, atomics::SeqCst) {
+ 1 => {}
+ n if n > 1 => return,
+ n => fail!("bad number of channels left {}", n),
+ }
+
+ match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
+ -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+ DISCONNECTED => {}
+ n => { assert!(n >= 0); }
+ }
+ }
+
+ // See the long discussion inside of stream.rs for why the queue is drained,
+ // and why it is done in this fashion.
+ pub fn drop_port(&mut self) {
+ self.port_dropped.store(true, atomics::SeqCst);
+ let mut steals = self.steals;
+ while {
+ let cnt = self.cnt.compare_and_swap(
+ steals, DISCONNECTED, atomics::SeqCst);
+ cnt != DISCONNECTED && cnt != steals
+ } {
+ // See the discussion in 'try_recv' for why we yield
+ // control of this thread.
+ loop {
+ match self.queue.pop() {
+ mpsc::Data(..) => { steals += 1; }
+ mpsc::Empty | mpsc::Inconsistent => break,
+ }
+ }
+ }
+ }
+
+ // Consumes ownership of the 'to_wake' field.
+ fn take_to_wake(&mut self) -> BlockedTask {
+ let task = self.to_wake.load(atomics::SeqCst);
+ self.to_wake.store(0, atomics::SeqCst);
+ assert!(task != 0);
+ unsafe { BlockedTask::cast_from_uint(task) }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // Helper function for select, tests whether this port can receive without
+ // blocking (obviously not an atomic decision).
+ //
+ // This is different than the stream version because there's no need to peek
+ // at the queue, we can just look at the local count.
+ pub fn can_recv(&mut self) -> bool {
+ let cnt = self.cnt.load(atomics::SeqCst);
+ cnt == DISCONNECTED || cnt - self.steals > 0
+ }
+
+ // Inserts the blocked task for selection on this port, returning it back if
+ // the port already has data on it.
+ //
+ // The code here is the same as in stream.rs, except that it doesn't need to
+ // peek at the channel to see if an upgrade is pending.
+ pub fn start_selection(&mut self,
+ task: BlockedTask) -> Result<(), BlockedTask> {
+ match self.decrement(task) {
+ Ok(()) => Ok(()),
+ Err(task) => {
+ let prev = self.cnt.fetch_add(1, atomics::SeqCst);
+ assert!(prev >= 0);
+ return Err(task);
+ }
+ }
+ }
+
+ // Cancels a previous task waiting on this port, returning whether there's
+ // data on the port.
+ //
+ // This is similar to the stream implementation (hence fewer comments), but
+ // uses a different value for the "steals" variable.
+ pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
+ // Before we do anything else, we bounce on this lock. The reason for
+ // doing this is to ensure that any upgrade-in-progress is gone and
+ // done with. Without this bounce, we can race with inherit_blocker
+ // about looking at and dealing with to_wake. Once we have acquired the
+ // lock, we are guaranteed that inherit_blocker is done.
+ unsafe {
+ self.select_lock.lock();
+ self.select_lock.unlock();
+ }
+
+ // Like the stream implementation, we want to make sure that the count
+ // on the channel goes non-negative. We don't know how negative the
+ // stream currently is, so instead of using a steal value of 1, we load
+ // the channel count and figure out what we should do to make it
+ // positive.
+ let steals = {
+ let cnt = self.cnt.load(atomics::SeqCst);
+ if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
+ };
+ let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
+
+ if prev == DISCONNECTED {
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ self.cnt.store(DISCONNECTED, atomics::SeqCst);
+ true
+ } else {
+ let cur = prev + steals + 1;
+ assert!(cur >= 0);
+ if prev < 0 {
+ self.take_to_wake().trash();
+ } else {
+ while self.to_wake.load(atomics::SeqCst) != 0 {
+ Thread::yield_now();
+ }
+ }
+ // if the number of steals is -1, it was the pre-emptive -1 steal
+ // count from when we inherited a blocker. This is fine because
+ // we're just going to overwrite it with a real value.
+ assert!(self.steals == 0 || self.steals == -1);
+ self.steals = steals;
+ prev >= 0
+ }
+ }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+ fn drop(&mut self) {
+ unsafe {
+ // Note that this load is not only an assert for correctness about
+ // disconnection, but also a proper fence before the read of
+ // `to_wake`, so this assert cannot be removed with also removing
+ // the `to_wake` assert.
+ assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ assert_eq!(self.channels.load(atomics::SeqCst), 0);
+ self.select_lock.destroy();
+ }
+ }
+}
--- /dev/null
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Stream channels
+///
+/// This is the flavor of channels which are optimized for one sender and one
+/// receiver. The sender will be upgraded to a shared channel if the channel is
+/// cloned.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module.
+
+use comm::Port;
+use int;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None};
+use result::{Ok, Err, Result};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use rt::thread::Thread;
+use spsc = sync::spsc_queue;
+use sync::atomics;
+use vec::OwnedVector;
+
+static DISCONNECTED: int = int::MIN;
+static MAX_STEALS: int = 1 << 20;
+
+pub struct Packet<T> {
+ queue: spsc::Queue<Message<T>>, // internal queue for all message
+
+ cnt: atomics::AtomicInt, // How many items are on this channel
+ steals: int, // How many times has a port received without blocking?
+ to_wake: atomics::AtomicUint, // Task to wake up
+
+ port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed.
+}
+
+pub enum Failure<T> {
+ Empty,
+ Disconnected,
+ Upgraded(Port<T>),
+}
+
+pub enum UpgradeResult {
+ UpSuccess,
+ UpDisconnected,
+ UpWoke(BlockedTask),
+}
+
+pub enum SelectionResult<T> {
+ SelSuccess,
+ SelCanceled(BlockedTask),
+ SelUpgraded(BlockedTask, Port<T>),
+}
+
+// Any message could contain an "upgrade request" to a new shared port, so the
+// internal queue it's a queue of T, but rather Message<T>
+enum Message<T> {
+ Data(T),
+ GoUp(Port<T>),
+}
+
+impl<T: Send> Packet<T> {
+ pub fn new() -> Packet<T> {
+ Packet {
+ queue: spsc::Queue::new(128),
+
+ cnt: atomics::AtomicInt::new(0),
+ steals: 0,
+ to_wake: atomics::AtomicUint::new(0),
+
+ port_dropped: atomics::AtomicBool::new(false),
+ }
+ }
+
+
+ pub fn send(&mut self, t: T) -> bool {
+ match self.do_send(Data(t)) {
+ UpSuccess => true,
+ UpDisconnected => false,
+ UpWoke(task) => {
+ task.wake().map(|t| t.reawaken());
+ true
+ }
+ }
+ }
+ pub fn upgrade(&mut self, up: Port<T>) -> UpgradeResult {
+ self.do_send(GoUp(up))
+ }
+
+ fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
+ // Use an acquire/release ordering to maintain the same position with
+ // respect to the atomic loads below
+ if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
+
+ self.queue.push(t);
+ match self.cnt.fetch_add(1, atomics::SeqCst) {
+ // As described in the mod's doc comment, -1 == wakeup
+ -1 => UpWoke(self.take_to_wake()),
+ // As as described before, SPSC queues must be >= -2
+ -2 => UpSuccess,
+
+ // Be sure to preserve the disconnected state, and the return value
+ // in this case is going to be whether our data was received or not.
+ // This manifests itself on whether we have an empty queue or not.
+ //
+ // Primarily, are required to drain the queue here because the port
+ // will never remove this data. We can only have at most one item to
+ // drain (the port drains the rest).
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, atomics::SeqCst);
+ let first = self.queue.pop();
+ let second = self.queue.pop();
+ assert!(second.is_none());
+
+ match first {
+ Some(..) => UpSuccess, // we failed to send the data
+ None => UpDisconnected, // we successfully sent data
+ }
+ }
+
+ // Otherwise we just sent some data on a non-waiting queue, so just
+ // make sure the world is sane and carry on!
+ n => { assert!(n >= 0); UpSuccess }
+ }
+ }
+
+ // Consumes ownership of the 'to_wake' field.
+ fn take_to_wake(&mut self) -> BlockedTask {
+ let task = self.to_wake.load(atomics::SeqCst);
+ self.to_wake.store(0, atomics::SeqCst);
+ assert!(task != 0);
+ unsafe { BlockedTask::cast_from_uint(task) }
+ }
+
+ // Decrements the count on the channel for a sleeper, returning the sleeper
+ // back if it shouldn't sleep. Note that this is the location where we take
+ // steals into account.
+ fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ let n = unsafe { task.cast_to_uint() };
+ self.to_wake.store(n, atomics::SeqCst);
+
+ let steals = self.steals;
+ self.steals = 0;
+
+ match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
+ DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
+ // If we factor in our steals and notice that the channel has no
+ // data, we successfully sleep
+ n => {
+ assert!(n >= 0);
+ if n - steals <= 0 { return Ok(()) }
+ }
+ }
+
+ self.to_wake.store(0, atomics::SeqCst);
+ Err(unsafe { BlockedTask::cast_from_uint(n) })
+ }
+
+ pub fn recv(&mut self) -> Result<T, Failure<T>> {
+ // Optimistic preflight check (scheduling is expensive).
+ match self.try_recv() {
+ Err(Empty) => {}
+ data => return data,
+ }
+
+ // Welp, our channel has no data. Deschedule the current task and
+ // initiate the blocking protocol.
+ let task: ~Task = Local::take();
+ task.deschedule(1, |task| {
+ self.decrement(task)
+ });
+
+ match self.try_recv() {
+ // Messages which actually popped from the queue shouldn't count as
+ // a steal, so offset the decrement here (we already have our
+ // "steal" factored into the channel count above).
+ data @ Ok(..) |
+ data @ Err(Upgraded(..)) => {
+ self.steals -= 1;
+ data
+ }
+
+ data => data,
+ }
+ }
+
+ pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
+ match self.queue.pop() {
+ // If we stole some data, record to that effect (this will be
+ // factored into cnt later on). Note that we don't allow steals to
+ // grow without bound in order to prevent eventual overflow of
+ // either steals or cnt as an overflow would have catastrophic
+ // results. Also note that we don't unconditionally set steals to 0
+ // because it can be true that steals > cnt.
+ Some(data) => {
+ self.steals += 1;
+ if self.steals > MAX_STEALS {
+ match self.cnt.swap(0, atomics::SeqCst) {
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, atomics::SeqCst);
+ }
+ n => { self.steals -= n; }
+ }
+ assert!(self.steals >= 0);
+ }
+ match data {
+ Data(t) => Ok(t),
+ GoUp(up) => Err(Upgraded(up)),
+ }
+ }
+
+ None => {
+ match self.cnt.load(atomics::SeqCst) {
+ n if n != DISCONNECTED => Err(Empty),
+
+ // This is a little bit of a tricky case. We failed to pop
+ // data above, and then we have viewed that the channel is
+ // disconnected. In this window more data could have been
+ // sent on the channel. It doesn't really make sense to
+ // return that the channel is disconnected when there's
+ // actually data on it, so be extra sure there's no data by
+ // popping one more time.
+ //
+ // We can ignore steals because the other end is
+ // disconnected and we'll never need to really factor in our
+ // steals again.
+ _ => {
+ match self.queue.pop() {
+ Some(Data(t)) => Ok(t),
+ Some(GoUp(up)) => Err(Upgraded(up)),
+ None => Err(Disconnected),
+ }
+ }
+ }
+ }
+ }
+ }
+
+ pub fn drop_chan(&mut self) {
+ // Dropping a channel is pretty simple, we just flag it as disconnected
+ // and then wakeup a blocker if there is one.
+ match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
+ -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+ DISCONNECTED => {}
+ n => { assert!(n >= 0); }
+ }
+ }
+
+ pub fn drop_port(&mut self) {
+ // Dropping a port seems like a fairly trivial thing. In theory all we
+ // need to do is flag that we're disconnected and then everything else
+ // can take over (we don't have anyone to wake up).
+ //
+ // The catch for Ports is that we want to drop the entire contents of
+ // the queue. There are multiple reasons for having this property, the
+ // largest of which is that if another chan is waiting in this channel
+ // (but not received yet), then waiting on that port will cause a
+ // deadlock.
+ //
+ // So if we accept that we must now destroy the entire contents of the
+ // queue, this code may make a bit more sense. The tricky part is that
+ // we can't let any in-flight sends go un-dropped, we have to make sure
+ // *everything* is dropped and nothing new will come onto the channel.
+
+ // The first thing we do is set a flag saying that we're done for. All
+ // sends are gated on this flag, so we're immediately guaranteed that
+ // there are a bounded number of active sends that we'll have to deal
+ // with.
+ self.port_dropped.store(true, atomics::SeqCst);
+
+ // Now that we're guaranteed to deal with a bounded number of senders,
+ // we need to drain the queue. This draining process happens atomically
+ // with respect to the "count" of the channel. If the count is nonzero
+ // (with steals taken into account), then there must be data on the
+ // channel. In this case we drain everything and then try again. We will
+ // continue to fail while active senders send data while we're dropping
+ // data, but eventually we're guaranteed to break out of this loop
+ // (because there is a bounded number of senders).
+ let mut steals = self.steals;
+ while {
+ let cnt = self.cnt.compare_and_swap(
+ steals, DISCONNECTED, atomics::SeqCst);
+ cnt != DISCONNECTED && cnt != steals
+ } {
+ loop {
+ match self.queue.pop() {
+ Some(..) => { steals += 1; }
+ None => break
+ }
+ }
+ }
+
+ // At this point in time, we have gated all future senders from sending,
+ // and we have flagged the channel as being disconnected. The senders
+ // still have some responsibility, however, because some sends may not
+ // complete until after we flag the disconnection. There are more
+ // details in the sending methods that see DISCONNECTED
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // Tests to see whether this port can receive without blocking. If Ok is
+ // returned, then that's the answer. If Err is returned, then the returned
+ // port needs to be queried instead (an upgrade happened)
+ pub fn can_recv(&mut self) -> Result<bool, Port<T>> {
+ // We peek at the queue to see if there's anything on it, and we use
+ // this return value to determine if we should pop from the queue and
+ // upgrade this channel immediately. If it looks like we've got an
+ // upgrade pending, then go through the whole recv rigamarole to update
+ // the internal state.
+ match self.queue.peek() {
+ Some(&GoUp(..)) => {
+ match self.recv() {
+ Err(Upgraded(port)) => Err(port),
+ _ => unreachable!(),
+ }
+ }
+ Some(..) => Ok(true),
+ None => Ok(false)
+ }
+ }
+
+ // Attempts to start selecting on this port. Like a oneshot, this can fail
+ // immediately because of an upgrade.
+ pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
+ match self.decrement(task) {
+ Ok(()) => SelSuccess,
+ Err(task) => {
+ let ret = match self.queue.peek() {
+ Some(&GoUp(..)) => {
+ match self.queue.pop() {
+ Some(GoUp(port)) => SelUpgraded(task, port),
+ _ => unreachable!(),
+ }
+ }
+ Some(..) => SelCanceled(task),
+ None => SelCanceled(task),
+ };
+ // Undo our decrement above, and we should be guaranteed that the
+ // previous value is positive because we're not going to sleep
+ let prev = self.cnt.fetch_add(1, atomics::SeqCst);
+ assert!(prev >= 0);
+ return ret;
+ }
+ }
+ }
+
+ // Removes a previous task from being blocked in this port
+ pub fn abort_selection(&mut self,
+ was_upgrade: bool) -> Result<bool, Port<T>> {
+ // If we're aborting selection after upgrading from a oneshot, then
+ // we're guarantee that no one is waiting. The only way that we could
+ // have seen the upgrade is if data was actually sent on the channel
+ // half again. For us, this means that there is guaranteed to be data on
+ // this channel. Furthermore, we're guaranteed that there was no
+ // start_selection previously, so there's no need to modify `self.cnt`
+ // at all.
+ //
+ // Hence, because of these invariants, we immediately return `Ok(true)`.
+ // Note that the data may not actually be sent on the channel just yet.
+ // The other end could have flagged the upgrade but not sent data to
+ // this end. This is fine because we know it's a small bounded windows
+ // of time until the data is actually sent.
+ if was_upgrade {
+ assert_eq!(self.steals, 0);
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ return Ok(true)
+ }
+
+ // We want to make sure that the count on the channel goes non-negative,
+ // and in the stream case we can have at most one steal, so just assume
+ // that we had one steal.
+ let steals = 1;
+ let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
+
+ // If we were previously disconnected, then we know for sure that there
+ // is no task in to_wake, so just keep going
+ let has_data = if prev == DISCONNECTED {
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ self.cnt.store(DISCONNECTED, atomics::SeqCst);
+ true // there is data, that data is that we're disconnected
+ } else {
+ let cur = prev + steals + 1;
+ assert!(cur >= 0);
+
+ // If the previous count was negative, then we just made things go
+ // positive, hence we passed the -1 boundary and we're responsible
+ // for removing the to_wake() field and trashing it.
+ //
+ // If the previous count was positive then we're in a tougher
+ // situation. A possible race is that a sender just incremented
+ // through -1 (meaning it's going to try to wake a task up), but it
+ // hasn't yet read the to_wake. In order to prevent a future recv()
+ // from waking up too early (this sender picking up the plastered
+ // over to_wake), we spin loop here waiting for to_wake to be 0.
+ // Note that this entire select() implementation needs an overhaul,
+ // and this is *not* the worst part of it, so this is not done as a
+ // final solution but rather out of necessity for now to get
+ // something working.
+ if prev < 0 {
+ self.take_to_wake().trash();
+ } else {
+ while self.to_wake.load(atomics::SeqCst) != 0 {
+ Thread::yield_now();
+ }
+ }
+ assert_eq!(self.steals, 0);
+ self.steals = steals;
+
+ // if we were previously positive, then there's surely data to
+ // receive
+ prev >= 0
+ };
+
+ // Now that we've determined that this queue "has data", we peek at the
+ // queue to see if the data is an upgrade or not. If it's an upgrade,
+ // then we need to destroy this port and abort selection on the
+ // upgraded port.
+ if has_data {
+ match self.queue.peek() {
+ Some(&GoUp(..)) => {
+ match self.queue.pop() {
+ Some(GoUp(port)) => Err(port),
+ _ => unreachable!(),
+ }
+ }
+ _ => Ok(true),
+ }
+ } else {
+ Ok(false)
+ }
+ }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+ fn drop(&mut self) {
+ unsafe {
+ // Note that this load is not only an assert for correctness about
+ // disconnection, but also a proper fence before the read of
+ // `to_wake`, so this assert cannot be removed with also removing
+ // the `to_wake` assert.
+ assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
+ assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+ }
+ }
+}
use clone::Clone;
use result::{Ok, Err};
-use comm::{Port, SharedChan};
+use comm::{Port, Chan};
use container::{Map, MutableMap};
use hashmap;
use io;
priv handles: hashmap::HashMap<Signum, ~RtioSignal>,
/// chan is where all the handles send signums, which are received by
/// the clients from port.
- priv chan: SharedChan<Signum>,
+ priv chan: Chan<Signum>,
/// Clients of Listener can `recv()` from this port. This is exposed to
/// allow selection over this port as well as manipulation of the port
/// Creates a new listener for signals. Once created, signals are bound via
/// the `register` method (otherwise nothing will ever be received)
pub fn new() -> Listener {
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
Listener {
chan: chan,
port: port,
pub use vec::{Vector, VectorVector, CloneableVector, ImmutableVector};
// Reexported runtime types
-pub use comm::{Port, Chan, SharedChan};
+pub use comm::{Port, Chan};
pub use task::spawn;
// Reexported statics
// FIXME: these probably shouldn't be public...
#[doc(hidden)]
pub mod shouldnt_be_public {
+ #[cfg(not(test))]
pub use super::local_ptr::native::maybe_tls_key;
#[cfg(not(windows), not(target_os = "android"))]
pub use super::local_ptr::compiled::RT_TLS_PTR;
use c_str::CString;
use cast;
-use comm::{SharedChan, Port};
+use comm::{Chan, Port};
use libc::c_int;
use libc;
use ops::Drop;
fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>;
fn tty_open(&mut self, fd: c_int, readable: bool)
-> Result<~RtioTTY, IoError>;
- fn signal(&mut self, signal: Signum, channel: SharedChan<Signum>)
+ fn signal(&mut self, signal: Signum, channel: Chan<Signum>)
-> Result<~RtioSignal, IoError>;
}
#[test]
fn comm_shared_chan() {
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
chan.send(10);
assert!(port.recv() == 10);
}
use uw = self::libunwind;
+#[allow(dead_code)]
mod libunwind {
//! Unwind library interface
#[allow(missing_doc)];
#[deny(unused_must_use)];
-use comm::SharedChan;
+use comm::Chan;
use io::Reader;
use io::process::ProcessExit;
use io::process;
// in parallel so we don't deadlock while blocking on one
// or the other. FIXME (#2625): Surely there's a much more
// clever way to do this.
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
let ch_clone = ch.clone();
spawn(proc() {
let nmsgs = 1000u;
let mut q = Queue::with_capacity(nthreads*nmsgs);
assert_eq!(None, q.pop());
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
for _ in range(0, nthreads) {
let q = q.clone();
mod tests {
use prelude::*;
- use super::{Queue, Data, Empty, Inconsistent};
use native;
+ use super::{Queue, Data, Empty, Inconsistent};
+ use sync::arc::UnsafeArc;
#[test]
fn test_full() {
let mut q = Queue::new();
- p.push(~1);
- p.push(~2);
+ q.push(~1);
+ q.push(~2);
}
#[test]
let nthreads = 8u;
let nmsgs = 1000u;
let mut q = Queue::new();
- match c.pop() {
+ match q.pop() {
Empty => {}
Inconsistent | Data(..) => fail!()
}
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
let q = UnsafeArc::new(q);
for _ in range(0, nthreads) {
}
}
- /// Tests whether this queue is empty or not. Remember that there can only
- /// be one tester/popper, and also keep in mind that the answer returned
- /// from this is likely to change if it is `false`.
- pub fn is_empty(&self) -> bool {
+ /// Attempts to peek at the head of the queue, returning `None` if the queue
+ /// has no data currently
+ pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
+ // This is essentially the same as above with all the popping bits
+ // stripped out.
unsafe {
let tail = self.tail;
let next = (*tail).next.load(Acquire);
- return next.is_null();
+ if next.is_null() { return None }
+ return (*next).value.as_mut();
}
}
}
#[cfg(test)]
mod test {
use prelude::*;
- use super::Queue;
use native;
+ use super::Queue;
+ use sync::arc::UnsafeArc;
#[test]
fn smoke() {
let (a, b) = UnsafeArc::new2(Queue::new(bound));
let (port, chan) = Chan::new();
native::task::spawn(proc() {
- let mut c = c;
for _ in range(0, 100000) {
loop {
match unsafe { (*b.get()).pop() } {
use str::{Str, SendStr, IntoMaybeOwned};
#[cfg(test)] use any::{AnyOwnExt, AnyRefExt};
-#[cfg(test)] use comm::SharedChan;
#[cfg(test)] use ptr;
#[cfg(test)] use result;
fn test_spawn_sched() {
use clone::Clone;
- let (po, ch) = SharedChan::new();
+ let (po, ch) = Chan::new();
- fn f(i: int, ch: SharedChan<()>) {
+ fn f(i: int, ch: Chan<()>) {
let ch = ch.clone();
spawn(proc() {
if i == 0 {
use std::cast;
use std::result;
use std::task;
- use std::comm::{SharedChan, Empty};
+ use std::comm::Empty;
/************************************************************************
* Semaphore tests
#[test]
fn test_barrier() {
let barrier = Barrier::new(10);
- let (port, chan) = SharedChan::new();
+ let (port, chan) = Chan::new();
for _ in range(0, 9) {
let c = barrier.clone();
}
}
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
for _ in range(0, N) {
let c2 = c.clone();
native::task::spawn(proc() { inc(); c2.send(()); });
static mut o: Once = ONCE_INIT;
static mut run: bool = false;
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
for _ in range(0, 10) {
let c = c.clone();
spawn(proc() {
fn run(args: &[~str]) {
let (from_child, to_parent) = Chan::new();
- let (from_parent, to_child) = SharedChan::new();
+ let (from_parent, to_child) = Chan::new();
let size = from_str::<uint>(args[1]).unwrap();
let workers = from_str::<uint>(args[2]).unwrap();
});
from_parent
} else {
- let (from_parent, to_child) = SharedChan::new();
+ let (from_parent, to_child) = Chan::new();
for _ in range(0u, workers) {
let to_child = to_child.clone();
let mut builder = task::task();
name: uint,
color: color,
from_rendezvous: Port<Option<CreatureInfo>>,
- to_rendezvous: SharedChan<CreatureInfo>,
- to_rendezvous_log: SharedChan<~str>
+ to_rendezvous: Chan<CreatureInfo>,
+ to_rendezvous_log: Chan<~str>
) {
let mut color = color;
let mut creatures_met = 0;
fn rendezvous(nn: uint, set: ~[color]) {
// these ports will allow us to hear from the creatures
- let (from_creatures, to_rendezvous) = SharedChan::<CreatureInfo>::new();
- let (from_creatures_log, to_rendezvous_log) = SharedChan::<~str>::new();
+ let (from_creatures, to_rendezvous) = Chan::<CreatureInfo>::new();
+ let (from_creatures_log, to_rendezvous_log) = Chan::<~str>::new();
// these channels will be passed to the creatures so they can talk to us
use std::uint;
fn fib(n: int) -> int {
- fn pfib(c: &SharedChan<int>, n: int) {
+ fn pfib(c: &Chan<int>, n: int) {
if n == 0 {
c.send(0);
} else if n <= 2 {
c.send(1);
} else {
- let (pp, cc) = SharedChan::new();
+ let (pp, cc) = Chan::new();
let ch = cc.clone();
task::spawn(proc() pfib(&ch, n - 1));
let ch = cc.clone();
}
}
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
let _t = task::spawn(proc() pfib(&ch, n) );
p.recv()
}
// Creates in the background 'num_tasks' tasks, all blocked forever.
// Doesn't return until all such tasks are ready, but doesn't block forever itself.
-use std::comm::{stream, SharedChan};
+use std::comm::{stream, Chan};
use std::os;
use std::result;
use std::task;
fn grandchild_group(num_tasks: uint) {
let (po, ch) = stream();
- let ch = SharedChan::new(ch);
+ let ch = Chan::new(ch);
for _ in range(0, num_tasks) {
let ch = ch.clone();
fn main() {
test::<Chan<int>>(); //~ ERROR: does not fulfill `Freeze`
test::<Port<int>>(); //~ ERROR: does not fulfill `Freeze`
- test::<SharedChan<int>>(); //~ ERROR: does not fulfill `Freeze`
+ test::<Chan<int>>(); //~ ERROR: does not fulfill `Freeze`
}
enum ctrl_proto { find_reducer(~[u8], Chan<int>), mapper_done, }
- fn start_mappers(ctrl: SharedChan<ctrl_proto>, inputs: ~[~str]) {
+ fn start_mappers(ctrl: Chan<ctrl_proto>, inputs: ~[~str]) {
for i in inputs.iter() {
let ctrl = ctrl.clone();
let i = i.clone();
}
}
- fn map_task(ctrl: SharedChan<ctrl_proto>, input: ~str) {
+ fn map_task(ctrl: Chan<ctrl_proto>, input: ~str) {
let mut intermediates = HashMap::new();
fn emit(im: &mut HashMap<~str, int>,
- ctrl: SharedChan<ctrl_proto>, key: ~str,
+ ctrl: Chan<ctrl_proto>, key: ~str,
_val: ~str) {
if im.contains_key(&key) {
return;
}
pub fn map_reduce(inputs: ~[~str]) {
- let (ctrl_port, ctrl_chan) = SharedChan::new();
+ let (ctrl_port, ctrl_chan) = Chan::new();
// This task becomes the master control task. It spawns others
// to do the rest.
use std::task;
pub fn main() {
- let (po, ch) = SharedChan::new();
+ let (po, ch) = Chan::new();
// Spawn 10 tasks each sending us back one int.
let mut i = 10;
info!("main thread exiting");
}
-fn child(x: int, ch: &SharedChan<int>) {
+fn child(x: int, ch: &Chan<int>) {
info!("{}", x);
ch.send(x);
}
pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); }
-fn test00_start(ch: &SharedChan<int>, message: int, count: int) {
+fn test00_start(ch: &Chan<int>, message: int, count: int) {
info!("Starting test00_start");
let mut i: int = 0;
while i < count {
info!("Creating tasks");
- let (po, ch) = SharedChan::new();
+ let (po, ch) = Chan::new();
let mut i: int = 0;
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
let mut c0 = ch.clone();
let mut c1 = ch.clone();
let mut c2 = ch.clone();
pub fn main() { test00(); }
-fn test00_start(c: &SharedChan<int>, start: int,
+fn test00_start(c: &Chan<int>, start: int,
number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(start + i); i += 1; }
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
let number_of_messages: int = 10;
let c = ch.clone();
use std::task;
-fn child(c: &SharedChan<~uint>, i: uint) {
+fn child(c: &Chan<~uint>, i: uint) {
c.send(~i);
}
pub fn main() {
- let (p, ch) = SharedChan::new();
+ let (p, ch) = Chan::new();
let n = 100u;
let mut expected = 0u;
for i in range(0u, n) {
use std::task;
struct complainer {
- c: SharedChan<bool>,
+ c: Chan<bool>,
}
impl Drop for complainer {
}
}
-fn complainer(c: SharedChan<bool>) -> complainer {
+fn complainer(c: Chan<bool>) -> complainer {
error!("Hello!");
complainer {
c: c
}
}
-fn f(c: SharedChan<bool>) {
+fn f(c: Chan<bool>) {
let _c = complainer(c);
fail!();
}
pub fn main() {
- let (p, c) = SharedChan::new();
+ let (p, c) = Chan::new();
task::spawn(proc() f(c.clone()));
error!("hiiiiiiiii");
assert!(p.recv());