--- /dev/null
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Generic support for building blocking abstractions.
+
+use thread::Thread;
+use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Ordering};
+use sync::Arc;
+use kinds::marker::NoSend;
+use mem;
+use clone::Clone;
+
+// Shared state backing a WaitToken/SignalToken pair: the thread that
+// intends to block, plus a flag recording whether a signal has already
+// been delivered (checked by `wait`, set by `signal`).
+struct Inner {
+ thread: Thread,
+ woken: AtomicBool,
+}
+
+// The signalling half of the pair. It is Clone, so several prospective
+// wakers may hold one; `signal` uses a compare-and-swap on `woken` so
+// only the first successful signaller unparks the thread.
+#[deriving(Clone)]
+pub struct SignalToken {
+ inner: Arc<Inner>,
+}
+
+// The waiting half of the pair, consumed by the blocking thread itself.
+// The NoSend marker pins it to the thread that created it.
+pub struct WaitToken {
+ inner: Arc<Inner>,
+ no_send: NoSend,
+}
+
+/// Create a connected wait/signal token pair referring to the same
+/// shared `Inner` state. The `WaitToken` stays with the thread that
+/// will block; the `SignalToken` is handed to whoever will wake it.
+///
+/// Must be public and named `tokens`: the channel flavors elsewhere in
+/// this patch call `blocking::tokens()`.
+pub fn tokens() -> (WaitToken, SignalToken) {
+ let inner = Arc::new(Inner {
+ thread: Thread::current(),
+ woken: INIT_ATOMIC_BOOL,
+ });
+ let wait_token = WaitToken {
+ inner: inner.clone(),
+ no_send: NoSend,
+ };
+ let signal_token = SignalToken {
+ inner: inner
+ };
+ (wait_token, signal_token)
+}
+
+impl SignalToken {
+ /// Wake up the paired blocked thread, if no other clone of this
+ /// token has done so already. Returns true if this call performed
+ /// the wakeup. Must be public: callers in the oneshot/stream/
+ /// shared/sync modules invoke `token.signal()` directly.
+ pub fn signal(&self) -> bool {
+ // The CAS ensures at most one signaller observes `woken == false`
+ // and therefore at most one `unpark` per token pair.
+ let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
+ if wake {
+ self.inner.thread.unpark();
+ }
+ wake
+ }
+
+ /// Convert to an unsafe uint value. Useful for storing in a pipe's state
+ /// flag. Ownership of the Arc is transferred into the raw value, so the
+ /// caller must eventually reconstitute it with `cast_from_uint`.
+ #[inline]
+ pub unsafe fn cast_to_uint(self) -> uint {
+ mem::transmute(self.inner)
+ }
+
+ /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
+ /// flag. The value must have come from `cast_to_uint`, and each such value
+ /// may be reconstituted only once.
+ #[inline]
+ pub unsafe fn cast_from_uint(signal_ptr: uint) -> SignalToken {
+ SignalToken { inner: mem::transmute(signal_ptr) }
+ }
+}
+
+impl WaitToken {
+ /// Block the current thread until the paired `SignalToken` fires.
+ /// The loop re-checks the `woken` flag because `Thread::park` may
+ /// return before a matching `unpark`. Must be public: callers
+ /// invoke `wait_token.wait()` from the channel flavors.
+ pub fn wait(self) {
+ while !self.inner.woken.load(Ordering::SeqCst) {
+ Thread::park()
+ }
+ }
+}
//! There are methods on both of senders and receivers to perform their
//! respective operations without panicking, however.
//!
-//! ## Runtime Requirements
-//!
-//! The channel types defined in this module generally have very few runtime
-//! requirements in order to operate. The major requirement they have is for a
-//! local rust `Task` to be available if any *blocking* operation is performed.
-//!
-//! If a local `Task` is not available (for example an FFI callback), then the
-//! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
-//! the `try_send` method on a `SyncSender`, but no other operations are
-//! guaranteed to be safe.
-//!
//! # Example
//!
//! Simple usage:
use core::kinds::marker;
use core::mem;
use core::cell::UnsafeCell;
-use rt::task::BlockedTask;
pub use comm::select::{Select, Handle};
+use comm::select::StartResult::*;
macro_rules! test {
{ fn $name:ident() $b:block $(#[$a:meta])*} => (
)
}
+mod blocking;
mod oneshot;
mod select;
mod shared;
}
}
- fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
+ fn start_selection(&self, mut token: SignalToken) -> bool {
loop {
let (t, new_port) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
- match unsafe { (*p.get()).start_selection(task) } {
- oneshot::SelSuccess => return Ok(()),
- oneshot::SelCanceled(task) => return Err(task),
+ match unsafe { (*p.get()).start_selection(token) } {
+ oneshot::SelSuccess => return Installed,
+ oneshot::SelCanceled => return Abort,
oneshot::SelUpgraded(t, rx) => (t, rx),
}
}
Stream(ref p) => {
- match unsafe { (*p.get()).start_selection(task) } {
- stream::SelSuccess => return Ok(()),
- stream::SelCanceled(task) => return Err(task),
+ match unsafe { (*p.get()).start_selection(token) } {
+ stream::SelSuccess => return Installed,
+ stream::SelCanceled => return Abort,
stream::SelUpgraded(t, rx) => (t, rx),
}
}
Shared(ref p) => {
- return unsafe { (*p.get()).start_selection(task) };
+ return unsafe { (*p.get()).start_selection(token) };
}
Sync(ref p) => {
- return unsafe { (*p.get()).start_selection(task) };
+ return unsafe { (*p.get()).start_selection(token) };
}
};
- task = t;
+ token = t;
unsafe {
- mem::swap(self.inner_mut(),
- new_port.inner_mut());
+ mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
}
use core::prelude::*;
use alloc::boxed::Box;
+use comm::Receiver;
+use comm::blocking::{mod, WaitToken, SignalToken};
use core::mem;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
-
use sync::atomic;
-use comm::Receiver;
// Various states you can find a port in.
-const EMPTY: uint = 0;
-const DATA: uint = 1;
-const DISCONNECTED: uint = 2;
+const EMPTY: uint = 0; // initial state: no data, no blocked receiver
+const DATA: uint = 1; // data ready for receiver to take
+const DISCONNECTED: uint = 2; // channel is disconnected OR upgraded
+// Any other value represents a pointer to a SignalToken value. The
+// protocol ensures that when the state moves *to* a pointer,
+// ownership of the token is given to the packet, and when the state
+// moves *from* a pointer, ownership of the token is transferred to
+// whoever changed the state.
pub struct Packet<T> {
// Internal state of the chan/port pair (stores the blocked task as well)
pub enum UpgradeResult {
UpSuccess,
UpDisconnected,
- UpWoke(BlockedTask),
+ UpWoke(SignalToken),
}
pub enum SelectionResult<T> {
- SelCanceled(BlockedTask),
- SelUpgraded(BlockedTask, Receiver<T>),
+ SelCanceled,
+ SelUpgraded(SignalToken, Receiver<T>),
SelSuccess,
}
// Not possible, these are one-use channels
DATA => unreachable!(),
- // Anything else means that there was a task waiting on the other
- // end. We leave the 'DATA' state inside so it'll pick it up on the
- // other end.
- n => unsafe {
- let t = BlockedTask::cast_from_uint(n);
- t.wake().map(|t| t.reawaken());
+ // There is a thread waiting on the other end. We leave the 'DATA'
+ // state inside so it'll pick it up on the other end.
+ ptr => unsafe {
+ SignalToken::cast_from_uint(ptr).signal();
Ok(())
}
}
// Attempt to not block the task (it's a little expensive). If it looks
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(atomic::SeqCst) == EMPTY {
- let t: Box<Task> = Local::take();
- t.deschedule(1, |task| {
- let n = unsafe { task.cast_to_uint() };
- match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
- // Nothing on the channel, we legitimately block
- EMPTY => Ok(()),
-
- // If there's data or it's a disconnected channel, then we
- // failed the cmpxchg, so we just wake ourselves back up
- DATA | DISCONNECTED => {
- unsafe { Err(BlockedTask::cast_from_uint(n)) }
- }
-
- // Only one thread is allowed to sleep on this port
- _ => unreachable!()
- }
- });
+ let (wait_token, signal_token) = blocking::tokens();
+ let ptr = unsafe { signal_token.cast_to_uint() };
+
+ // race with senders to enter the blocking state
+ if self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) == EMPTY {
+ wait_token.wait();
+ debug_assert!(self.state.load(atomic::SeqCst) != EMPTY);
+ } else {
+ // drop the signal token, since we never blocked
+ drop(unsafe { SignalToken::cast_from_uint(ptr) });
+ }
}
self.try_recv()
}
}
}
+
+ // We are the sole receiver; there cannot be a blocking
+ // receiver already.
_ => unreachable!()
}
}
DISCONNECTED => { self.upgrade = prev; UpDisconnected }
// If someone's waiting, we gotta wake them up
- n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
+ ptr => UpWoke(unsafe { SignalToken::cast_from_uint(ptr) })
}
}
DATA | DISCONNECTED | EMPTY => {}
// If someone's waiting, we gotta wake them up
- n => unsafe {
- let t = BlockedTask::cast_from_uint(n);
- t.wake().map(|t| t.reawaken());
+ ptr => unsafe {
+ SignalToken::cast_from_uint(ptr).signal();
}
}
}
// Attempts to start selection on this port. This can either succeed, fail
// because there is data, or fail because there is an upgrade pending.
- pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
- let n = unsafe { task.cast_to_uint() };
- match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
+ pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
+ let ptr = unsafe { token.cast_to_uint() };
+ match self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) {
EMPTY => SelSuccess,
- DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
+ DATA => {
+ drop(unsafe { SignalToken::cast_from_uint(ptr) });
+ SelCanceled
+ }
DISCONNECTED if self.data.is_some() => {
- SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+ drop(unsafe { SignalToken::cast_from_uint(ptr) });
+ SelCanceled
}
DISCONNECTED => {
match mem::replace(&mut self.upgrade, SendUsed) {
// propagate upwards whether the upgrade can receive
// data
GoUp(upgrade) => {
- SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
- upgrade)
+ SelUpgraded(unsafe { SignalToken::cast_from_uint(ptr) }, upgrade)
}
// If the other end disconnected without sending an
// disconnected).
up => {
self.upgrade = up;
- SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+ drop(unsafe { SignalToken::cast_from_uint(ptr) });
+ SelCanceled
}
}
}
// If we've got a blocked task, then use an atomic to gain ownership
// of it (may fail)
- n => self.state.compare_and_swap(n, EMPTY, atomic::SeqCst)
+ ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst)
};
// Now that we've got ownership of our state, figure out what to do
}
}
- // We woke ourselves up from select. Assert that the task should be
- // trashed and returned that we don't have any data.
- n => {
- let t = unsafe { BlockedTask::cast_from_uint(n) };
- t.trash();
+ // We woke ourselves up from select.
+ ptr => unsafe {
+ drop(SignalToken::cast_from_uint(ptr));
Ok(false)
}
}
use core::kinds::marker;
use core::mem;
use core::uint;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
use comm::Receiver;
+use comm::blocking::{mod, SignalToken};
+
+use self::StartResult::*;
/// The "receiver set" of the select interface. This structure is used to manage
/// a set of receivers which are being selected over.
struct Packets { cur: *mut Handle<'static, ()> }
+#[doc(hidden)]
+#[deriving(PartialEq)]
+pub enum StartResult {
+ Installed,
+ Abort,
+}
+
#[doc(hidden)]
pub trait Packet {
fn can_recv(&self) -> bool;
- fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
+ fn start_selection(&self, token: SignalToken) -> StartResult;
fn abort_selection(&self) -> bool;
}
// Most notably, the iterations over all of the receivers shouldn't be
// necessary.
unsafe {
- let mut amt = 0;
- for p in self.iter() {
- amt += 1;
- if do_preflight_checks && (*p).packet.can_recv() {
- return (*p).id;
+ // Stage 1: preflight checks. Look for any packets ready to receive
+ if do_preflight_checks {
+ for handle in self.iter() {
+ if (*handle).packet.can_recv() {
+ return (*handle).id;
+ }
}
}
- assert!(amt > 0);
- let mut ready_index = amt;
- let mut ready_id = uint::MAX;
- let mut iter = self.iter().enumerate();
-
- // Acquire a number of blocking contexts, and block on each one
- // sequentially until one fails. If one fails, then abort
- // immediately so we can go unblock on all the other receivers.
- let task: Box<Task> = Local::take();
- task.deschedule(amt, |task| {
- // Prepare for the block
- let (i, handle) = iter.next().unwrap();
- match (*handle).packet.start_selection(task) {
- Ok(()) => Ok(()),
- Err(task) => {
- ready_index = i;
- ready_id = (*handle).id;
- Err(task)
+ // Stage 2: begin the blocking process
+ //
+ // Create a number of signal tokens, and install each one
+ // sequentially until one fails. If one fails, then abort the
+ // selection on the already-installed tokens.
+ let (wait_token, signal_token) = blocking::tokens();
+ for (i, handle) in self.iter().enumerate() {
+ match (*handle).packet.start_selection(signal_token.clone()) {
+ Installed => {}
+ Abort => {
+ // Go back and abort the already-begun selections
+ for handle in self.iter().take(i) {
+ (*handle).packet.abort_selection();
+ }
+ return (*handle).id;
}
}
- });
+ }
+
+ // Stage 3: no messages available, actually block
+ wait_token.wait();
+ // Stage 4: there *must* be message available; find it.
+ //
// Abort the selection process on each receiver. If the abort
// process returns `true`, then that means that the receiver is
// ready to receive some data. Note that this also means that the
// A rewrite should focus on avoiding a yield loop, and for now this
// implementation is tying us over to a more efficient "don't
// iterate over everything every time" implementation.
- for handle in self.iter().take(ready_index) {
+ let mut ready_id = uint::MAX;
+ for handle in self.iter() {
if (*handle).packet.abort_selection() {
ready_id = (*handle).id;
}
}
+ // We must have found a ready receiver
assert!(ready_id != uint::MAX);
return ready_id;
}
use alloc::boxed::Box;
use core::cmp;
use core::int;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
-use rt::thread::Thread;
use sync::{atomic, Mutex, MutexGuard};
use comm::mpsc_queue as mpsc;
+use comm::blocking::{mod, SignalToken};
+use comm::select::StartResult;
+use comm::select::StartResult::*;
const DISCONNECTED: int = int::MIN;
const FUDGE: int = 1024;
queue: mpsc::Queue<T>,
cnt: atomic::AtomicInt, // How many items are on this channel
steals: int, // How many times has a port received without blocking?
- to_wake: atomic::AtomicUint, // Task to wake up
+ to_wake: atomic::AtomicUint, // SignalToken for wake up
// The number of channels which are currently using this packet.
channels: atomic::AtomicInt,
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self,
- task: Option<BlockedTask>,
+ token: Option<SignalToken>,
guard: MutexGuard<()>) {
- match task {
- Some(task) => {
- assert_eq!(self.cnt.load(atomic::SeqCst), 0);
- assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
- self.to_wake.store(unsafe { task.cast_to_uint() },
- atomic::SeqCst);
- self.cnt.store(-1, atomic::SeqCst);
-
- // This store is a little sketchy. What's happening here is
- // that we're transferring a blocker from a oneshot or stream
- // channel to this shared channel. In doing so, we never
- // spuriously wake them up and rather only wake them up at the
- // appropriate time. This implementation of shared channels
- // assumes that any blocking recv() will undo the increment of
- // steals performed in try_recv() once the recv is complete.
- // This thread that we're inheriting, however, is not in the
- // middle of recv. Hence, the first time we wake them up,
- // they're going to wake up from their old port, move on to the
- // upgraded port, and then call the block recv() function.
- //
- // When calling this function, they'll find there's data
- // immediately available, counting it as a steal. This in fact
- // wasn't a steal because we appropriately blocked them waiting
- // for data.
- //
- // To offset this bad increment, we initially set the steal
- // count to -1. You'll find some special code in
- // abort_selection() as well to ensure that this -1 steal count
- // doesn't escape too far.
- self.steals = -1;
- }
- None => {}
- }
+ token.map(|token| {
+ assert_eq!(self.cnt.load(atomic::SeqCst), 0);
+ assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+ self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
+ self.cnt.store(-1, atomic::SeqCst);
+
+ // This store is a little sketchy. What's happening here is that
+ // we're transferring a blocker from a oneshot or stream channel to
+ // this shared channel. In doing so, we never spuriously wake them
+ // up and rather only wake them up at the appropriate time. This
+ // implementation of shared channels assumes that any blocking
+ // recv() will undo the increment of steals performed in try_recv()
+ // once the recv is complete. This thread that we're inheriting,
+ // however, is not in the middle of recv. Hence, the first time we
+ // wake them up, they're going to wake up from their old port, move
+ // on to the upgraded port, and then call the block recv() function.
+ //
+ // When calling this function, they'll find there's data immediately
+ // available, counting it as a steal. This in fact wasn't a steal
+ // because we appropriately blocked them waiting for data.
+ //
+ // To offset this bad increment, we initially set the steal count to
+ // -1. You'll find some special code in abort_selection() as well to
+ // ensure that this -1 steal count doesn't escape too far.
+ self.steals = -1;
+ });
// When the shared packet is constructed, we grabbed this lock. The
// purpose of this lock is to ensure that abort_selection() doesn't
self.queue.push(t);
match self.cnt.fetch_add(1, atomic::SeqCst) {
-1 => {
- self.take_to_wake().wake().map(|t| t.reawaken());
+ self.take_to_wake().signal();
}
// In this case, we have possibly failed to send our data, and
data => return data,
}
- let task: Box<Task> = Local::take();
- task.deschedule(1, |task| {
- self.decrement(task)
- });
+ let (wait_token, signal_token) = blocking::tokens();
+ if self.decrement(signal_token) == Installed {
+ wait_token.wait()
+ }
match self.try_recv() {
data @ Ok(..) => { self.steals -= 1; data }
}
// Essentially the exact same thing as the stream decrement function.
- fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+ // Returns true if blocking should proceed.
+ fn decrement(&mut self, token: SignalToken) -> StartResult {
assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
- let n = unsafe { task.cast_to_uint() };
- self.to_wake.store(n, atomic::SeqCst);
+ let ptr = unsafe { token.cast_to_uint() };
+ self.to_wake.store(ptr, atomic::SeqCst);
let steals = self.steals;
self.steals = 0;
// data, we successfully sleep
n => {
assert!(n >= 0);
- if n - steals <= 0 { return Ok(()) }
+ if n - steals <= 0 { return Installed }
}
}
self.to_wake.store(0, atomic::SeqCst);
- Err(unsafe { BlockedTask::cast_from_uint(n) })
+ drop(unsafe { SignalToken::cast_from_uint(ptr) });
+ Abort
}
pub fn try_recv(&mut self) -> Result<T, Failure> {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
- // This is a bit of an interesting case. The channel is
- // reported as having data available, but our pop() has
- // failed due to the queue being in an inconsistent state.
- // This means that there is some pusher somewhere which has
- // yet to complete, but we are guaranteed that a pop will
- // eventually succeed. In this case, we spin in a yield loop
- // because the remote sender should finish their enqueue
+ // This is a bit of an interesting case. The channel is reported as
+ // having data available, but our pop() has failed due to the queue
+ // being in an inconsistent state. This means that there is some
+ // pusher somewhere which has yet to complete, but we are guaranteed
+ // that a pop will eventually succeed. In this case, we spin in a
+ // yield loop because the remote sender should finish their enqueue
// operation "very quickly".
//
// Avoiding this yield loop would require a different queue
- // abstraction which provides the guarantee that after M
- // pushes have succeeded, at least M pops will succeed. The
- // current queues guarantee that if there are N active
- // pushes, you can pop N times once all N have finished.
+ // abstraction which provides the guarantee that after M pushes have
+ // succeeded, at least M pops will succeed. The current queues
+ // guarantee that if there are N active pushes, you can pop N times
+ // once all N have finished.
mpsc::Inconsistent => {
let data;
loop {
}
match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
- -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+ -1 => { self.take_to_wake().signal(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
}
self.port_dropped.store(true, atomic::SeqCst);
let mut steals = self.steals;
while {
- let cnt = self.cnt.compare_and_swap(
- steals, DISCONNECTED, atomic::SeqCst);
+ let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
cnt != DISCONNECTED && cnt != steals
} {
// See the discussion in 'try_recv' for why we yield
}
// Consumes ownership of the 'to_wake' field.
- fn take_to_wake(&mut self) -> BlockedTask {
- let task = self.to_wake.load(atomic::SeqCst);
+ fn take_to_wake(&mut self) -> SignalToken {
+ let ptr = self.to_wake.load(atomic::SeqCst);
self.to_wake.store(0, atomic::SeqCst);
- assert!(task != 0);
- unsafe { BlockedTask::cast_from_uint(task) }
+ assert!(ptr != 0);
+ unsafe { SignalToken::cast_from_uint(ptr) }
}
////////////////////////////////////////////////////////////////////////////
}
}
- // Inserts the blocked task for selection on this port, returning it back if
- // the port already has data on it.
+ // Inserts the signal token for selection on this port, returning true if
+ // blocking should proceed.
//
// The code here is the same as in stream.rs, except that it doesn't need to
// peek at the channel to see if an upgrade is pending.
- pub fn start_selection(&mut self,
- task: BlockedTask) -> Result<(), BlockedTask> {
- match self.decrement(task) {
- Ok(()) => Ok(()),
- Err(task) => {
+ pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
+ match self.decrement(token) {
+ Installed => Installed,
+ Abort => {
let prev = self.bump(1);
assert!(prev == DISCONNECTED || prev >= 0);
- return Err(task);
+ Abort
}
}
}
let cur = prev + steals + 1;
assert!(cur >= 0);
if prev < 0 {
- self.take_to_wake().trash();
+ drop(self.take_to_wake());
} else {
while self.to_wake.load(atomic::SeqCst) != 0 {
Thread::yield_now();
use alloc::boxed::Box;
use core::cmp;
use core::int;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
-use rt::thread::Thread;
+use thread::Thread;
use sync::atomic;
use comm::spsc_queue as spsc;
use comm::Receiver;
+use comm::blocking::{mod, WaitToken, SignalToken};
const DISCONNECTED: int = int::MIN;
#[cfg(test)]
cnt: atomic::AtomicInt, // How many items are on this channel
steals: int, // How many times has a port received without blocking?
- to_wake: atomic::AtomicUint, // Task to wake up
+ to_wake: atomic::AtomicUint, // SignalToken for the blocked thread to wake up
port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed.
}
pub enum UpgradeResult {
UpSuccess,
UpDisconnected,
- UpWoke(BlockedTask),
+ UpWoke(SignalToken),
}
pub enum SelectionResult<T> {
SelSuccess,
- SelCanceled(BlockedTask),
- SelUpgraded(BlockedTask, Receiver<T>),
+ SelCanceled,
+ SelUpgraded(SignalToken, Receiver<T>),
}
// Any message could contain an "upgrade request" to a new shared port, so the
}
}
-
pub fn send(&mut self, t: T) -> Result<(), T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
match self.do_send(Data(t)) {
UpSuccess | UpDisconnected => {},
- UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
+ UpWoke(token) => { token.signal(); }
}
Ok(())
}
+
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
}
// Consumes ownership of the 'to_wake' field.
- fn take_to_wake(&mut self) -> BlockedTask {
- let task = self.to_wake.load(atomic::SeqCst);
+ fn take_to_wake(&mut self) -> SignalToken {
+ let ptr = self.to_wake.load(atomic::SeqCst);
self.to_wake.store(0, atomic::SeqCst);
- assert!(task != 0);
- unsafe { BlockedTask::cast_from_uint(task) }
+ assert!(ptr != 0);
+ unsafe { SignalToken::cast_from_uint(ptr) }
}
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
- fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+ fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
- let n = unsafe { task.cast_to_uint() };
- self.to_wake.store(n, atomic::SeqCst);
+ let ptr = unsafe { token.cast_to_uint() };
+ self.to_wake.store(ptr, atomic::SeqCst);
let steals = self.steals;
self.steals = 0;
}
self.to_wake.store(0, atomic::SeqCst);
- Err(unsafe { BlockedTask::cast_from_uint(n) })
+ Err(unsafe { SignalToken::cast_from_uint(ptr) })
}
pub fn recv(&mut self) -> Result<T, Failure<T>> {
// Welp, our channel has no data. Deschedule the current task and
// initiate the blocking protocol.
- let task: Box<Task> = Local::take();
- task.deschedule(1, |task| {
- self.decrement(task)
- });
+ let (wait_token, signal_token) = blocking::tokens();
+ if self.decrement(signal_token).is_ok() {
+ wait_token.wait()
+ }
match self.try_recv() {
// Messages which actually popped from the queue shouldn't count as
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
- -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+ -1 => { self.take_to_wake().signal(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
}
// Attempts to start selecting on this port. Like a oneshot, this can fail
// immediately because of an upgrade.
- pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
- match self.decrement(task) {
+ pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
+ match self.decrement(token) {
Ok(()) => SelSuccess,
- Err(task) => {
+ Err(token) => {
let ret = match self.queue.peek() {
Some(&GoUp(..)) => {
match self.queue.pop() {
- Some(GoUp(port)) => SelUpgraded(task, port),
+ Some(GoUp(port)) => SelUpgraded(token, port),
_ => unreachable!(),
}
}
- Some(..) => SelCanceled(task),
- None => SelCanceled(task),
+ Some(..) => SelCanceled,
+ None => SelCanceled,
};
// Undo our decrement above, and we should be guaranteed that the
// previous value is positive because we're not going to sleep
// final solution but rather out of necessity for now to get
// something working.
if prev < 0 {
- self.take_to_wake().trash();
+ drop(self.take_to_wake());
} else {
while self.to_wake.load(atomic::SeqCst) != 0 {
Thread::yield_now();
use vec::Vec;
use core::mem;
use core::cell::UnsafeCell;
-use rt::local::Local;
-use rt::mutex::{NativeMutex, LockGuard};
-use rt::task::{Task, BlockedTask};
-use sync::atomic;
+use sync::{atomic, Mutex, MutexGuard};
+use comm::blocking::{mod, WaitToken, SignalToken};
+use comm::select::StartResult::{mod, Installed, Abort};
pub struct Packet<T> {
/// Only field outside of the mutex. Just done for kicks, but mainly because
canceled: Option<&'static mut bool>,
}
-/// Possible flavors of tasks who can be blocked on this channel.
+/// Possible flavors of threads who can be blocked on this channel.
enum Blocker {
- BlockedSender(BlockedTask),
- BlockedReceiver(BlockedTask),
+ BlockedSender(SignalToken),
+ BlockedReceiver(SignalToken),
NoneBlocked
}
}
struct Node {
- task: Option<BlockedTask>,
+ token: Option<SignalToken>,
next: *mut Node,
}
Disconnected,
}
-/// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
+/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
/// in the meantime. This re-locks the mutex upon returning.
-fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
- lock: &NativeMutex) {
- let me: Box<Task> = Local::take();
- me.deschedule(1, |task| {
- match mem::replace(slot, f(task)) {
- NoneBlocked => {}
- _ => unreachable!(),
- }
- unsafe { lock.unlock_noguard(); }
- Ok(())
- });
- unsafe { lock.lock_noguard(); }
-}
-/// Wakes up a task, dropping the lock at the correct time
-fn wakeup(task: BlockedTask, guard: LockGuard) {
+/// Wakes up a thread, dropping the lock at the correct time
+fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
// We need to be careful to wake up the waiting task *outside* of the mutex
// in case it incurs a context switch.
- mem::drop(guard);
- task.wake().map(|t| t.reawaken());
+ drop(guard);
+ token.signal();
}
impl<T: Send> Packet<T> {
}
}
- // Locks this channel, returning a guard for the state and the mutable state
- // itself. Care should be taken to ensure that the state does not escape the
- // guard!
- //
- // Note that we're ok promoting an & reference to an &mut reference because
- // the lock ensures that we're the only ones in the world with a pointer to
- // the state.
- fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
- unsafe {
- let guard = self.lock.lock();
- (guard, &mut *self.state.get())
+ // wait until a send slot is available, returning locked access to
+ // the channel state.
+ fn acquire_send_slot(&self) -> MutexGuard<State<T>> {
+ let mut node = Node { token: None, next: 0 as *mut Node };
+ loop {
+ let mut guard = self.lock.lock();
+ // are we ready to go?
+ if guard.disconnected || guard.buf.size() < guard.buf.cap() {
+ return guard;
+ }
+ // no room; actually block
+ let wait_token = guard.queue.enqueue(&mut node);
+ drop(guard);
+ wait_token.wait();
}
}
pub fn send(&self, t: T) -> Result<(), T> {
- let (guard, state) = self.lock();
-
- // wait for a slot to become available, and enqueue the data
- while !state.disconnected && state.buf.size() == state.buf.cap() {
- state.queue.enqueue(&self.lock);
- }
- if state.disconnected { return Err(t) }
- state.buf.enqueue(t);
+ let guard = self.acquire_send_slot();
+ if guard.disconnected { return Err(t) }
+ guard.buf.enqueue(t);
match mem::replace(&mut state.blocker, NoneBlocked) {
// if our capacity is 0, then we need to wait for a receiver to be
NoneBlocked => Ok(()),
// success, someone's about to receive our buffered data.
- BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
+ BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
BlockedSender(..) => panic!("lolwut"),
}
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => Err(super::Full(t)),
BlockedSender(..) => unreachable!(),
- BlockedReceiver(task) => {
- state.buf.enqueue(t);
- wakeup(task, guard);
+ BlockedReceiver(token) => {
+ guard.buf.enqueue(t);
+ wakeup(token, guard);
Ok(())
}
}
// If the buffer has some space and the capacity isn't 0, then we
// just enqueue the data for later retrieval, ensuring to wake up
// any blocked receiver if there is one.
- assert!(state.buf.size() < state.buf.cap());
- state.buf.enqueue(t);
- match mem::replace(&mut state.blocker, NoneBlocked) {
- BlockedReceiver(task) => wakeup(task, guard),
+ assert!(guard.buf.size() < guard.buf.cap());
+ guard.buf.enqueue(t);
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
+ BlockedReceiver(token) => wakeup(token, guard),
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
}
// When reading this, remember that there can only ever be one receiver at
// time.
pub fn recv(&self) -> Result<T, ()> {
- let (guard, state) = self.lock();
+ let guard = self.lock.lock();
// Wait for the buffer to have something in it. No need for a while loop
// because we're the only receiver.
// * `waited` - flag if the receiver blocked to receive some data, or if it
// just picked up some data on the way out
// * `guard` - the lock guard that is held over this channel's lock
- fn wakeup_senders(&self, waited: bool,
- guard: LockGuard,
- state: &mut State<T>) {
- let pending_sender1: Option<BlockedTask> = state.queue.dequeue();
+ fn wakeup_senders(&self, waited: bool, guard: MutexGuard<State<T>>) {
+ let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
// If this is a no-buffer channel (cap == 0), then if we didn't wait we
// need to ACK the sender. If we waited, then the sender waking us up
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedReceiver(..) => unreachable!(),
- BlockedSender(task) => {
- state.canceled.take();
- Some(task)
+ BlockedSender(token) => {
+ guard.canceled.take();
+ Some(token)
}
}
} else {
mem::drop((state, guard));
// only outside of the lock do we wake up the pending tasks
- pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
- pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
+ pending_sender1.map(|t| t.signal());
+ pending_sender2.map(|t| t.signal());
}
// Prepares this shared packet for a channel clone, essentially just bumping
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
- BlockedReceiver(task) => wakeup(task, guard),
+ BlockedReceiver(token) => wakeup(token, guard),
}
}
let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => None,
- BlockedSender(task) => {
- *state.canceled.take().unwrap() = true;
- Some(task)
+ BlockedSender(token) => {
+ *guard.canceled.take().unwrap() = true;
+ Some(token)
}
BlockedReceiver(..) => unreachable!(),
};
loop {
match queue.dequeue() {
- Some(task) => { task.wake().map(|t| t.reawaken()); }
+ Some(token) => { token.signal(); }
None => break,
}
}
- waiter.map(|t| t.wake().map(|t| t.reawaken()));
+ waiter.map(|t| t.signal());
}
////////////////////////////////////////////////////////////////////////////
// Attempts to start selection on this port. This can either succeed or fail
// because there is data waiting.
- pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{
- let (_g, state) = self.lock();
- if state.disconnected || state.buf.size() > 0 {
- Err(task)
+ pub fn start_selection(&self, token: SignalToken) -> StartResult {
+ let mut guard = self.lock.lock();
+ if guard.disconnected || guard.buf.size() > 0 {
+ Abort
} else {
- match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
+ match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(..) => unreachable!(),
}
- Ok(())
+ Installed
}
}
let (_g, state) = self.lock();
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => true,
- BlockedSender(task) => {
- state.blocker = BlockedSender(task);
+ BlockedSender(token) => {
+ guard.blocker = BlockedSender(token);
true
}
- BlockedReceiver(task) => { task.trash(); false }
+ BlockedReceiver(token) => { drop(token); false }
}
}
}
////////////////////////////////////////////////////////////////////////////////
impl Queue {
- fn enqueue(&mut self, lock: &NativeMutex) {
- let task: Box<Task> = Local::take();
- let mut node = Node {
- task: None,
- next: 0 as *mut Node,
- };
- task.deschedule(1, |task| {
- node.task = Some(task);
- if self.tail.is_null() {
- self.head = &mut node as *mut Node;
- self.tail = &mut node as *mut Node;
- } else {
- unsafe {
- (*self.tail).next = &mut node as *mut Node;
- self.tail = &mut node as *mut Node;
- }
+ fn enqueue(&mut self, node: &mut Node) -> WaitToken {
+ let (wait_token, signal_token) = blocking::tokens();
+ node.token = Some(signal_token);
+ node.next = 0 as *mut Node;
+
+ if self.tail.is_null() {
+ self.head = node as *mut Node;
+ self.tail = node as *mut Node;
+ } else {
+ unsafe {
+ (*self.tail).next = node as *mut Node;
+ self.tail = node as *mut Node;
}
- unsafe { lock.unlock_noguard(); }
- Ok(())
- });
- unsafe { lock.lock_noguard(); }
- assert!(node.next.is_null());
+ }
+
+ wait_token
}
- fn dequeue(&mut self) -> Option<BlockedTask> {
+ fn dequeue(&mut self) -> Option<SignalToken> {
if self.head.is_null() {
return None
}
}
unsafe {
(*node).next = 0 as *mut Node;
- Some((*node).task.take().unwrap())
+ Some((*node).token.take().unwrap())
}
}
}