]> git.lizzy.rs Git - rust.git/commitdiff
Add blocking support module for channels
authorAaron Turon <aturon@mozilla.com>
Mon, 1 Dec 2014 16:14:35 +0000 (08:14 -0800)
committerAaron Turon <aturon@mozilla.com>
Fri, 19 Dec 2014 07:31:51 +0000 (23:31 -0800)
src/libstd/comm/blocking.rs [new file with mode: 0644]
src/libstd/comm/mod.rs
src/libstd/comm/oneshot.rs
src/libstd/comm/select.rs
src/libstd/comm/shared.rs
src/libstd/comm/stream.rs
src/libstd/comm/sync.rs

diff --git a/src/libstd/comm/blocking.rs b/src/libstd/comm/blocking.rs
new file mode 100644 (file)
index 0000000..5e9a01d
--- /dev/null
@@ -0,0 +1,81 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Generic support for building blocking abstractions.
+
+use thread::Thread;
+use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Ordering};
+use sync::Arc;
+use kinds::marker::NoSend;
+use mem;
+use clone::Clone;
+
+struct Inner {
+    thread: Thread,
+    woken: AtomicBool,
+}
+
+#[deriving(Clone)]
+pub struct SignalToken {
+    inner: Arc<Inner>,
+}
+
+pub struct WaitToken {
+    inner: Arc<Inner>,
+    no_send: NoSend,
+}
+
+fn token() -> (WaitToken, SignalToken) {
+    let inner = Arc::new(Inner {
+        thread: Thread::current(),
+        woken: INIT_ATOMIC_BOOL,
+    });
+    let wait_token = WaitToken {
+        inner: inner.clone(),
+        no_send: NoSend,
+    };
+    let signal_token = SignalToken {
+        inner: inner
+    };
+    (wait_token, signal_token)
+}
+
+impl SignalToken {
+    fn signal(&self) -> bool {
+        let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
+        if wake {
+            self.inner.thread.unpark();
+        }
+        wake
+    }
+
+    /// Convert to an unsafe uint value. Useful for storing in a pipe's state
+    /// flag.
+    #[inline]
+    pub unsafe fn cast_to_uint(self) -> uint {
+        mem::transmute(self.inner)
+    }
+
+    /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
+    /// flag.
+    #[inline]
+    pub unsafe fn cast_from_uint(signal_ptr: uint) -> SignalToken {
+        SignalToken { inner: mem::transmute(signal_ptr) }
+    }
+
+}
+
+impl WaitToken {
+    fn wait(self) {
+        while !self.inner.woken.load(Ordering::SeqCst) {
+            Thread::park()
+        }
+    }
+}
index dfbb09d26b5b42b27e486e73ed6cfb83bad8c601..e5ec0078c5ef0b8f8468ea56a3edfdee5f46d21b 100644 (file)
 //! There are methods on both of senders and receivers to perform their
 //! respective operations without panicking, however.
 //!
-//! ## Runtime Requirements
-//!
-//! The channel types defined in this module generally have very few runtime
-//! requirements in order to operate. The major requirement they have is for a
-//! local rust `Task` to be available if any *blocking* operation is performed.
-//!
-//! If a local `Task` is not available (for example an FFI callback), then the
-//! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
-//! the `try_send` method on a `SyncSender`, but no other operations are
-//! guaranteed to be safe.
-//!
 //! # Example
 //!
 //! Simple usage:
 use core::kinds::marker;
 use core::mem;
 use core::cell::UnsafeCell;
-use rt::task::BlockedTask;
 
 pub use comm::select::{Select, Handle};
+use comm::select::StartResult::*;
 
 macro_rules! test {
     { fn $name:ident() $b:block $(#[$a:meta])*} => (
@@ -348,6 +337,7 @@ mod $name {
     )
 }
 
+mod blocking;
 mod oneshot;
 mod select;
 mod shared;
@@ -947,34 +937,33 @@ fn can_recv(&self) -> bool {
         }
     }
 
-    fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
+    fn start_selection(&self, mut token: SignalToken) -> bool {
         loop {
             let (t, new_port) = match *unsafe { self.inner() } {
                 Oneshot(ref p) => {
-                    match unsafe { (*p.get()).start_selection(task) } {
-                        oneshot::SelSuccess => return Ok(()),
-                        oneshot::SelCanceled(task) => return Err(task),
+                    match unsafe { (*p.get()).start_selection(token) } {
+                        oneshot::SelSuccess => return Installed,
+                        oneshot::SelCanceled => return Abort,
                         oneshot::SelUpgraded(t, rx) => (t, rx),
                     }
                 }
                 Stream(ref p) => {
-                    match unsafe { (*p.get()).start_selection(task) } {
-                        stream::SelSuccess => return Ok(()),
-                        stream::SelCanceled(task) => return Err(task),
+                    match unsafe { (*p.get()).start_selection(token) } {
+                        stream::SelSuccess => return Installed,
+                        stream::SelCanceled => return Abort,
                         stream::SelUpgraded(t, rx) => (t, rx),
                     }
                 }
                 Shared(ref p) => {
-                    return unsafe { (*p.get()).start_selection(task) };
+                    return unsafe { (*p.get()).start_selection(token) };
                 }
                 Sync(ref p) => {
-                    return unsafe { (*p.get()).start_selection(task) };
+                    return unsafe { (*p.get()).start_selection(token) };
                 }
             };
-            task = t;
+            token = t;
             unsafe {
-                mem::swap(self.inner_mut(),
-                          new_port.inner_mut());
+                mem::swap(self.inner_mut(), new_port.inner_mut());
             }
         }
     }
index 2c5248c0897b26bde969966164b9ae6eed2f976e..68f3f229cb48783927952db50efc4b3a17a7a6e1 100644 (file)
 use core::prelude::*;
 
 use alloc::boxed::Box;
+use comm::Receiver;
+use comm::blocking::{mod, WaitToken, SignalToken};
 use core::mem;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
-
 use sync::atomic;
-use comm::Receiver;
 
 // Various states you can find a port in.
-const EMPTY: uint = 0;
-const DATA: uint = 1;
-const DISCONNECTED: uint = 2;
+const EMPTY: uint = 0;          // initial state: no data, no blocked reciever
+const DATA: uint = 1;           // data ready for receiver to take
+const DISCONNECTED: uint = 2;   // channel is disconnected OR upgraded
+// Any other value represents a pointer to a SignalToken value. The
+// protocol ensures that when the state moves *to* a pointer,
+// ownership of the token is given to the packet, and when the state
+// moves *from* a pointer, ownership of the token is transferred to
+// whoever changed the state.
 
 pub struct Packet<T> {
     // Internal state of the chan/port pair (stores the blocked task as well)
@@ -71,12 +74,12 @@ pub enum Failure<T> {
 pub enum UpgradeResult {
     UpSuccess,
     UpDisconnected,
-    UpWoke(BlockedTask),
+    UpWoke(SignalToken),
 }
 
 pub enum SelectionResult<T> {
-    SelCanceled(BlockedTask),
-    SelUpgraded(BlockedTask, Receiver<T>),
+    SelCanceled,
+    SelUpgraded(SignalToken, Receiver<T>),
     SelSuccess,
 }
 
@@ -118,12 +121,10 @@ pub fn send(&mut self, t: T) -> Result<(), T> {
             // Not possible, these are one-use channels
             DATA => unreachable!(),
 
-            // Anything else means that there was a task waiting on the other
-            // end. We leave the 'DATA' state inside so it'll pick it up on the
-            // other end.
-            n => unsafe {
-                let t = BlockedTask::cast_from_uint(n);
-                t.wake().map(|t| t.reawaken());
+            // There is a thread waiting on the other end. We leave the 'DATA'
+            // state inside so it'll pick it up on the other end.
+            ptr => unsafe {
+                SignalToken::cast_from_uint(ptr).signal();
                 Ok(())
             }
         }
@@ -142,23 +143,17 @@ pub fn recv(&mut self) -> Result<T, Failure<T>> {
         // Attempt to not block the task (it's a little expensive). If it looks
         // like we're not empty, then immediately go through to `try_recv`.
         if self.state.load(atomic::SeqCst) == EMPTY {
-            let t: Box<Task> = Local::take();
-            t.deschedule(1, |task| {
-                let n = unsafe { task.cast_to_uint() };
-                match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
-                    // Nothing on the channel, we legitimately block
-                    EMPTY => Ok(()),
-
-                    // If there's data or it's a disconnected channel, then we
-                    // failed the cmpxchg, so we just wake ourselves back up
-                    DATA | DISCONNECTED => {
-                        unsafe { Err(BlockedTask::cast_from_uint(n)) }
-                    }
-
-                    // Only one thread is allowed to sleep on this port
-                    _ => unreachable!()
-                }
-            });
+            let (wait_token, signal_token) = blocking::token();
+            let ptr = unsafe { signal_token.cast_to_uint() };
+
+            // race with senders to enter the blocking state
+            if self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) == EMPTY {
+                wait_token.wait();
+                debug_assert!(self.state.load(atomic::SeqCst) != EMPTY);
+            } else {
+                // drop the signal token, since we never blocked
+                drop(unsafe { SignalToken::cast_from_uint(ptr) });
+            }
         }
 
         self.try_recv()
@@ -197,6 +192,9 @@ pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
                     }
                 }
             }
+
+            // We are the sole receiver; there cannot be a blocking
+            // receiver already.
             _ => unreachable!()
         }
     }
@@ -223,7 +221,7 @@ pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
             DISCONNECTED => { self.upgrade = prev; UpDisconnected }
 
             // If someone's waiting, we gotta wake them up
-            n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
+            ptr => UpWoke(unsafe { SignalToken::cast_from_uint(ptr) })
         }
     }
 
@@ -232,9 +230,8 @@ pub fn drop_chan(&mut self) {
             DATA | DISCONNECTED | EMPTY => {}
 
             // If someone's waiting, we gotta wake them up
-            n => unsafe {
-                let t = BlockedTask::cast_from_uint(n);
-                t.wake().map(|t| t.reawaken());
+            ptr => unsafe {
+                SignalToken::cast_from_uint(ptr).signal();
             }
         }
     }
@@ -286,13 +283,17 @@ pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
 
     // Attempts to start selection on this port. This can either succeed, fail
     // because there is data, or fail because there is an upgrade pending.
-    pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
-        let n = unsafe { task.cast_to_uint() };
-        match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
+    pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
+        let ptr = unsafe { token.cast_to_uint() };
+        match self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) {
             EMPTY => SelSuccess,
-            DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
+            DATA => {
+                drop(unsafe { SignalToken::cast_from_uint(ptr) });
+                SelCanceled
+            }
             DISCONNECTED if self.data.is_some() => {
-                SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+                drop(unsafe { SignalToken::cast_from_uint(ptr) });
+                SelCanceled
             }
             DISCONNECTED => {
                 match mem::replace(&mut self.upgrade, SendUsed) {
@@ -300,8 +301,7 @@ pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
                     // propagate upwards whether the upgrade can receive
                     // data
                     GoUp(upgrade) => {
-                        SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
-                                    upgrade)
+                        SelUpgraded(unsafe { SignalToken::cast_from_uint(ptr) }, upgrade)
                     }
 
                     // If the other end disconnected without sending an
@@ -309,7 +309,8 @@ pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
                     // disconnected).
                     up => {
                         self.upgrade = up;
-                        SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+                        drop(unsafe { SignalToken::cast_from_uint(ptr) });
+                        SelCanceled
                     }
                 }
             }
@@ -331,7 +332,7 @@ pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
 
             // If we've got a blocked task, then use an atomic to gain ownership
             // of it (may fail)
-            n => self.state.compare_and_swap(n, EMPTY, atomic::SeqCst)
+            BLOCKED => self.state.compare_and_swap(BLOCKED, EMPTY, atomic::SeqCst)
         };
 
         // Now that we've got ownership of our state, figure out what to do
@@ -358,11 +359,9 @@ pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
                 }
             }
 
-            // We woke ourselves up from select. Assert that the task should be
-            // trashed and returned that we don't have any data.
-            n => {
-                let t = unsafe { BlockedTask::cast_from_uint(n) };
-                t.trash();
+            // We woke ourselves up from select.
+            ptr => unsafe {
+                drop(SignalToken::cast_from_uint(ptr));
                 Ok(false)
             }
         }
index 4da9b4cfa369f77b87573ec5397374798b09f740..536d38c6e55086006015a288f212516464266359 100644 (file)
 use core::kinds::marker;
 use core::mem;
 use core::uint;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
 
 use comm::Receiver;
+use comm::blocking::{mod, SignalToken};
+
+use self::StartResult::*;
 
 /// The "receiver set" of the select interface. This structure is used to manage
 /// a set of receivers which are being selected over.
@@ -93,10 +94,17 @@ pub struct Handle<'rx, T:'rx> {
 
 struct Packets { cur: *mut Handle<'static, ()> }
 
+#[doc(hidden)]
+#[deriving(PartialEq)]
+pub enum StartResult {
+    Installed,
+    Abort,
+}
+
 #[doc(hidden)]
 pub trait Packet {
     fn can_recv(&self) -> bool;
-    fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
+    fn start_selection(&self, token: SignalToken) -> StartResult;
     fn abort_selection(&self) -> bool;
 }
 
@@ -165,36 +173,39 @@ fn wait2(&self, do_preflight_checks: bool) -> uint {
         // Most notably, the iterations over all of the receivers shouldn't be
         // necessary.
         unsafe {
-            let mut amt = 0;
-            for p in self.iter() {
-                amt += 1;
-                if do_preflight_checks && (*p).packet.can_recv() {
-                    return (*p).id;
+            // Stage 1: preflight checks. Look for any packets ready to receive
+            if do_preflight_checks {
+                for handle in self.iter() {
+                    if (*handle).packet.can_recv() {
+                        return (*handle).id();
+                    }
                 }
             }
-            assert!(amt > 0);
 
-            let mut ready_index = amt;
-            let mut ready_id = uint::MAX;
-            let mut iter = self.iter().enumerate();
-
-            // Acquire a number of blocking contexts, and block on each one
-            // sequentially until one fails. If one fails, then abort
-            // immediately so we can go unblock on all the other receivers.
-            let task: Box<Task> = Local::take();
-            task.deschedule(amt, |task| {
-                // Prepare for the block
-                let (i, handle) = iter.next().unwrap();
-                match (*handle).packet.start_selection(task) {
-                    Ok(()) => Ok(()),
-                    Err(task) => {
-                        ready_index = i;
-                        ready_id = (*handle).id;
-                        Err(task)
+            // Stage 2: begin the blocking process
+            //
+            // Create a number of signal tokens, and install each one
+            // sequentially until one fails. If one fails, then abort the
+            // selection on the already-installed tokens.
+            let (wait_token, signal_token) = blocking::tokens();
+            for (i, handle) in self.iter().enumerate() {
+                match (*handle).packet.start_selection(signal_token.clone()) {
+                    Installed => {}
+                    Abort => {
+                        // Go back and abort the already-begun selections
+                        for handle in self.iter().take(i) {
+                            (*handle).packet.abort_selection();
+                        }
+                        return (*handle).id;
                     }
                 }
-            });
+            }
+
+            // Stage 3: no messages available, actually block
+            wait_token.wait();
 
+            // Stage 4: there *must* be message available; find it.
+            //
             // Abort the selection process on each receiver. If the abort
             // process returns `true`, then that means that the receiver is
             // ready to receive some data. Note that this also means that the
@@ -216,12 +227,14 @@ fn wait2(&self, do_preflight_checks: bool) -> uint {
             // A rewrite should focus on avoiding a yield loop, and for now this
             // implementation is tying us over to a more efficient "don't
             // iterate over everything every time" implementation.
-            for handle in self.iter().take(ready_index) {
+            let mut ready_id = uint::MAX;
+            for handle in self.iter() {
                 if (*handle).packet.abort_selection() {
                     ready_id = (*handle).id;
                 }
             }
 
+            // We must have found a ready receiver
             assert!(ready_id != uint::MAX);
             return ready_id;
         }
index b3856e588e233dd3dcb3dca5f63b97ab803bc670..1f1ea2ca9a1b2cbeb40698e50f2ea0e97ca77801 100644 (file)
 use alloc::boxed::Box;
 use core::cmp;
 use core::int;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
-use rt::thread::Thread;
 
 use sync::{atomic, Mutex, MutexGuard};
 use comm::mpsc_queue as mpsc;
+use comm::blocking::{mod, SignalToken};
+use comm::select::StartResult;
+use comm::select::StartResult::*;
 
 const DISCONNECTED: int = int::MIN;
 const FUDGE: int = 1024;
@@ -43,7 +43,7 @@ pub struct Packet<T> {
     queue: mpsc::Queue<T>,
     cnt: atomic::AtomicInt, // How many items are on this channel
     steals: int, // How many times has a port received without blocking?
-    to_wake: atomic::AtomicUint, // Task to wake up
+    to_wake: atomic::AtomicUint, // SignalToken for wake up
 
     // The number of channels which are currently using this packet.
     channels: atomic::AtomicInt,
@@ -95,41 +95,34 @@ pub fn postinit_lock(&self) -> MutexGuard<()> {
     //
     // This can only be called at channel-creation time
     pub fn inherit_blocker(&mut self,
-                           task: Option<BlockedTask>,
+                           token: Option<SignalToken>,
                            guard: MutexGuard<()>) {
-        match task {
-            Some(task) => {
-                assert_eq!(self.cnt.load(atomic::SeqCst), 0);
-                assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-                self.to_wake.store(unsafe { task.cast_to_uint() },
-                                   atomic::SeqCst);
-                self.cnt.store(-1, atomic::SeqCst);
-
-                // This store is a little sketchy. What's happening here is
-                // that we're transferring a blocker from a oneshot or stream
-                // channel to this shared channel. In doing so, we never
-                // spuriously wake them up and rather only wake them up at the
-                // appropriate time. This implementation of shared channels
-                // assumes that any blocking recv() will undo the increment of
-                // steals performed in try_recv() once the recv is complete.
-                // This thread that we're inheriting, however, is not in the
-                // middle of recv. Hence, the first time we wake them up,
-                // they're going to wake up from their old port, move on to the
-                // upgraded port, and then call the block recv() function.
-                //
-                // When calling this function, they'll find there's data
-                // immediately available, counting it as a steal. This in fact
-                // wasn't a steal because we appropriately blocked them waiting
-                // for data.
-                //
-                // To offset this bad increment, we initially set the steal
-                // count to -1. You'll find some special code in
-                // abort_selection() as well to ensure that this -1 steal count
-                // doesn't escape too far.
-                self.steals = -1;
-            }
-            None => {}
-        }
+        token.map(|token| {
+            assert_eq!(self.cnt.load(atomic::SeqCst), 0);
+            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+            self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
+            self.cnt.store(-1, atomic::SeqCst);
+
+            // This store is a little sketchy. What's happening here is that
+            // we're transferring a blocker from a oneshot or stream channel to
+            // this shared channel. In doing so, we never spuriously wake them
+            // up and rather only wake them up at the appropriate time. This
+            // implementation of shared channels assumes that any blocking
+            // recv() will undo the increment of steals performed in try_recv()
+            // once the recv is complete.  This thread that we're inheriting,
+            // however, is not in the middle of recv. Hence, the first time we
+            // wake them up, they're going to wake up from their old port, move
+            // on to the upgraded port, and then call the block recv() function.
+            //
+            // When calling this function, they'll find there's data immediately
+            // available, counting it as a steal. This in fact wasn't a steal
+            // because we appropriately blocked them waiting for data.
+            //
+            // To offset this bad increment, we initially set the steal count to
+            // -1. You'll find some special code in abort_selection() as well to
+            // ensure that this -1 steal count doesn't escape too far.
+            self.steals = -1;
+        });
 
         // When the shared packet is constructed, we grabbed this lock. The
         // purpose of this lock is to ensure that abort_selection() doesn't
@@ -175,7 +168,7 @@ pub fn send(&mut self, t: T) -> Result<(), T> {
         self.queue.push(t);
         match self.cnt.fetch_add(1, atomic::SeqCst) {
             -1 => {
-                self.take_to_wake().wake().map(|t| t.reawaken());
+                self.take_to_wake().signal();
             }
 
             // In this case, we have possibly failed to send our data, and
@@ -232,10 +225,10 @@ pub fn recv(&mut self) -> Result<T, Failure> {
             data => return data,
         }
 
-        let task: Box<Task> = Local::take();
-        task.deschedule(1, |task| {
-            self.decrement(task)
-        });
+        let (wait_token, signal_token) = blocking::tokens();
+        if self.decrement(signal_token) == Installed {
+            wait_token.wait()
+        }
 
         match self.try_recv() {
             data @ Ok(..) => { self.steals -= 1; data }
@@ -244,10 +237,11 @@ pub fn recv(&mut self) -> Result<T, Failure> {
     }
 
     // Essentially the exact same thing as the stream decrement function.
-    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+    // Returns true if blocking should proceed.
+    fn decrement(&mut self, token: SignalToken) -> StartResult {
         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-        let n = unsafe { task.cast_to_uint() };
-        self.to_wake.store(n, atomic::SeqCst);
+        let ptr = unsafe { token.cast_to_uint() };
+        self.to_wake.store(ptr, atomic::SeqCst);
 
         let steals = self.steals;
         self.steals = 0;
@@ -258,12 +252,13 @@ fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
             // data, we successfully sleep
             n => {
                 assert!(n >= 0);
-                if n - steals <= 0 { return Ok(()) }
+                if n - steals <= 0 { return Installed }
             }
         }
 
         self.to_wake.store(0, atomic::SeqCst);
-        Err(unsafe { BlockedTask::cast_from_uint(n) })
+        drop(unsafe { SignalToken::cast_from_uint(ptr) });
+        Abort
     }
 
     pub fn try_recv(&mut self) -> Result<T, Failure> {
@@ -271,20 +266,19 @@ pub fn try_recv(&mut self) -> Result<T, Failure> {
             mpsc::Data(t) => Some(t),
             mpsc::Empty => None,
 
-            // This is a bit of an interesting case. The channel is
-            // reported as having data available, but our pop() has
-            // failed due to the queue being in an inconsistent state.
-            // This means that there is some pusher somewhere which has
-            // yet to complete, but we are guaranteed that a pop will
-            // eventually succeed. In this case, we spin in a yield loop
-            // because the remote sender should finish their enqueue
+            // This is a bit of an interesting case. The channel is reported as
+            // having data available, but our pop() has failed due to the queue
+            // being in an inconsistent state.  This means that there is some
+            // pusher somewhere which has yet to complete, but we are guaranteed
+            // that a pop will eventually succeed. In this case, we spin in a
+            // yield loop because the remote sender should finish their enqueue
             // operation "very quickly".
             //
             // Avoiding this yield loop would require a different queue
-            // abstraction which provides the guarantee that after M
-            // pushes have succeeded, at least M pops will succeed. The
-            // current queues guarantee that if there are N active
-            // pushes, you can pop N times once all N have finished.
+            // abstraction which provides the guarantee that after M pushes have
+            // succeeded, at least M pops will succeed. The current queues
+            // guarantee that if there are N active pushes, you can pop N times
+            // once all N have finished.
             mpsc::Inconsistent => {
                 let data;
                 loop {
@@ -354,7 +348,7 @@ pub fn drop_chan(&mut self) {
         }
 
         match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
-            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+            -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
         }
@@ -366,8 +360,7 @@ pub fn drop_port(&mut self) {
         self.port_dropped.store(true, atomic::SeqCst);
         let mut steals = self.steals;
         while {
-            let cnt = self.cnt.compare_and_swap(
-                            steals, DISCONNECTED, atomic::SeqCst);
+            let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
             cnt != DISCONNECTED && cnt != steals
         } {
             // See the discussion in 'try_recv' for why we yield
@@ -382,11 +375,11 @@ pub fn drop_port(&mut self) {
     }
 
     // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&mut self) -> BlockedTask {
-        let task = self.to_wake.load(atomic::SeqCst);
+    fn take_to_wake(&mut self) -> SignalToken {
+        let ptr = self.to_wake.load(atomic::SeqCst);
         self.to_wake.store(0, atomic::SeqCst);
-        assert!(task != 0);
-        unsafe { BlockedTask::cast_from_uint(task) }
+        assert!(ptr != 0);
+        unsafe { SignalToken::cast_from_uint(ptr) }
     }
 
     ////////////////////////////////////////////////////////////////////////////
@@ -414,19 +407,18 @@ fn bump(&mut self, amt: int) -> int {
         }
     }
 
-    // Inserts the blocked task for selection on this port, returning it back if
-    // the port already has data on it.
+    // Inserts the signal token for selection on this port, returning true if
+    // blocking should proceed.
     //
     // The code here is the same as in stream.rs, except that it doesn't need to
     // peek at the channel to see if an upgrade is pending.
-    pub fn start_selection(&mut self,
-                           task: BlockedTask) -> Result<(), BlockedTask> {
-        match self.decrement(task) {
-            Ok(()) => Ok(()),
-            Err(task) => {
+    pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
+        match self.decrement(token) {
+            Installed => Installed,
+            Abort => {
                 let prev = self.bump(1);
                 assert!(prev == DISCONNECTED || prev >= 0);
-                return Err(task);
+                Abort
             }
         }
     }
@@ -464,7 +456,7 @@ pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
             let cur = prev + steals + 1;
             assert!(cur >= 0);
             if prev < 0 {
-                self.take_to_wake().trash();
+                drop(self.take_to_wake());
             } else {
                 while self.to_wake.load(atomic::SeqCst) != 0 {
                     Thread::yield_now();
index 827b1d51ac48d2bff566aed715817be49bdc9cda..a15366d5ebc0cb700974a6195880ca32e419c4a2 100644 (file)
 use alloc::boxed::Box;
 use core::cmp;
 use core::int;
-use rt::local::Local;
-use rt::task::{Task, BlockedTask};
-use rt::thread::Thread;
+use thread::Thread;
 
 use sync::atomic;
 use comm::spsc_queue as spsc;
 use comm::Receiver;
+use comm::blocking::{mod, WaitToken, SignalToken};
 
 const DISCONNECTED: int = int::MIN;
 #[cfg(test)]
@@ -46,7 +45,7 @@ pub struct Packet<T> {
 
     cnt: atomic::AtomicInt, // How many items are on this channel
     steals: int, // How many times has a port received without blocking?
-    to_wake: atomic::AtomicUint, // Task to wake up
+    to_wake: atomic::AtomicUint, // SignalToken for the blocked thread to wake up
 
     port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed.
 }
@@ -60,13 +59,13 @@ pub enum Failure<T> {
 pub enum UpgradeResult {
     UpSuccess,
     UpDisconnected,
-    UpWoke(BlockedTask),
+    UpWoke(SignalToken),
 }
 
 pub enum SelectionResult<T> {
     SelSuccess,
-    SelCanceled(BlockedTask),
-    SelUpgraded(BlockedTask, Receiver<T>),
+    SelCanceled,
+    SelUpgraded(SignalToken, Receiver<T>),
 }
 
 // Any message could contain an "upgrade request" to a new shared port, so the
@@ -89,7 +88,6 @@ pub fn new() -> Packet<T> {
         }
     }
 
-
     pub fn send(&mut self, t: T) -> Result<(), T> {
         // If the other port has deterministically gone away, then definitely
         // must return the data back up the stack. Otherwise, the data is
@@ -98,10 +96,11 @@ pub fn send(&mut self, t: T) -> Result<(), T> {
 
         match self.do_send(Data(t)) {
             UpSuccess | UpDisconnected => {},
-            UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
+            UpWoke(token) => { token.signal(); }
         }
         Ok(())
     }
+
     pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
         // If the port has gone away, then there's no need to proceed any
         // further.
@@ -144,20 +143,20 @@ fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
     }
 
     // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&mut self) -> BlockedTask {
-        let task = self.to_wake.load(atomic::SeqCst);
+    fn take_to_wake(&mut self) -> SignalToken {
+        let ptr = self.to_wake.load(atomic::SeqCst);
         self.to_wake.store(0, atomic::SeqCst);
-        assert!(task != 0);
-        unsafe { BlockedTask::cast_from_uint(task) }
+        assert!(ptr != 0);
+        unsafe { SignaToken::cast_from_uint(ptr) }
     }
 
     // Decrements the count on the channel for a sleeper, returning the sleeper
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
-    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+    fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-        let n = unsafe { task.cast_to_uint() };
-        self.to_wake.store(n, atomic::SeqCst);
+        let ptr = unsafe { token.cast_to_uint() };
+        self.to_wake.store(ptr, atomic::SeqCst);
 
         let steals = self.steals;
         self.steals = 0;
@@ -173,7 +172,7 @@ fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
         }
 
         self.to_wake.store(0, atomic::SeqCst);
-        Err(unsafe { BlockedTask::cast_from_uint(n) })
+        Err(unsafe { SignalToken::cast_from_uint(ptr) })
     }
 
     pub fn recv(&mut self) -> Result<T, Failure<T>> {
@@ -185,10 +184,10 @@ pub fn recv(&mut self) -> Result<T, Failure<T>> {
 
         // Welp, our channel has no data. Deschedule the current task and
         // initiate the blocking protocol.
-        let task: Box<Task> = Local::take();
-        task.deschedule(1, |task| {
-            self.decrement(task)
-        });
+        let (wait_token, signal_token) = blocking::tokens();
+        if self.decrement(signal_token).is_ok() {
+            wait_token.wait()
+        }
 
         match self.try_recv() {
             // Messages which actually popped from the queue shouldn't count as
@@ -269,7 +268,7 @@ pub fn drop_chan(&mut self) {
         // Dropping a channel is pretty simple, we just flag it as disconnected
         // and then wakeup a blocker if there is one.
         match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
-            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+            -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
         }
@@ -364,19 +363,19 @@ fn bump(&mut self, amt: int) -> int {
 
     // Attempts to start selecting on this port. Like a oneshot, this can fail
     // immediately because of an upgrade.
-    pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
-        match self.decrement(task) {
+    pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
+        match self.decrement(token) {
             Ok(()) => SelSuccess,
-            Err(task) => {
+            Err(token) => {
                 let ret = match self.queue.peek() {
                     Some(&GoUp(..)) => {
                         match self.queue.pop() {
-                            Some(GoUp(port)) => SelUpgraded(task, port),
+                            Some(GoUp(port)) => SelUpgraded(token, port),
                             _ => unreachable!(),
                         }
                     }
-                    Some(..) => SelCanceled(task),
-                    None => SelCanceled(task),
+                    Some(..) => SelCanceled,
+                    None => SelCanceled,
                 };
                 // Undo our decrement above, and we should be guaranteed that the
                 // previous value is positive because we're not going to sleep
@@ -439,7 +438,7 @@ pub fn abort_selection(&mut self,
             // final solution but rather out of necessity for now to get
             // something working.
             if prev < 0 {
-                self.take_to_wake().trash();
+                drop(self.take_to_wake());
             } else {
                 while self.to_wake.load(atomic::SeqCst) != 0 {
                     Thread::yield_now();
index 933cd43c662aa86be83ac5bea1378d4347ed88c4..7e87596429c6f17dc9e8e3f2a28679de88a3510e 100644 (file)
 use vec::Vec;
 use core::mem;
 use core::cell::UnsafeCell;
-use rt::local::Local;
-use rt::mutex::{NativeMutex, LockGuard};
-use rt::task::{Task, BlockedTask};
 
-use sync::atomic;
+use sync::{atomic, Mutex, MutexGuard};
+use comm::blocking::{mod, WaitToken, SignalToken};
+use comm::select::StartResult::{mod, Installed, Abort};
 
 pub struct Packet<T> {
     /// Only field outside of the mutex. Just done for kicks, but mainly because
@@ -74,10 +73,10 @@ struct State<T> {
     canceled: Option<&'static mut bool>,
 }
 
-/// Possible flavors of tasks who can be blocked on this channel.
+/// Possible flavors of threads who can be blocked on this channel.
 enum Blocker {
-    BlockedSender(BlockedTask),
-    BlockedReceiver(BlockedTask),
+    BlockedSender(SignalToken),
+    BlockedReceiver(SignalToken),
     NoneBlocked
 }
 
@@ -89,7 +88,7 @@ struct Queue {
 }
 
 struct Node {
-    task: Option<BlockedTask>,
+    token: Option<SignalToken>,
     next: *mut Node,
 }
 
@@ -106,28 +105,15 @@ pub enum Failure {
     Disconnected,
 }
 
-/// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
+/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
 /// in the meantime. This re-locks the mutex upon returning.
-fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
-        lock: &NativeMutex) {
-    let me: Box<Task> = Local::take();
-    me.deschedule(1, |task| {
-        match mem::replace(slot, f(task)) {
-            NoneBlocked => {}
-            _ => unreachable!(),
-        }
-        unsafe { lock.unlock_noguard(); }
-        Ok(())
-    });
-    unsafe { lock.lock_noguard(); }
-}
 
-/// Wakes up a task, dropping the lock at the correct time
-fn wakeup(task: BlockedTask, guard: LockGuard) {
+/// Wakes up a thread, dropping the lock at the correct time
+fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
     // We need to be careful to wake up the waiting task *outside* of the mutex
     // in case it incurs a context switch.
-    mem::drop(guard);
-    task.wake().map(|t| t.reawaken());
+    drop(guard);
+    token.signal();
 }
 
 impl<T: Send> Packet<T> {
@@ -153,29 +139,27 @@ pub fn new(cap: uint) -> Packet<T> {
         }
     }
 
-    // Locks this channel, returning a guard for the state and the mutable state
-    // itself. Care should be taken to ensure that the state does not escape the
-    // guard!
-    //
-    // Note that we're ok promoting an & reference to an &mut reference because
-    // the lock ensures that we're the only ones in the world with a pointer to
-    // the state.
-    fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
-        unsafe {
-            let guard = self.lock.lock();
-            (guard, &mut *self.state.get())
+    // wait until a send slot is available, returning locked access to
+    // the channel state.
+    fn acquire_send_slot(&self) -> MutexGuard<State<T>> {
+        let mut node = Node { token: None, next: 0 as *mut Node };
+        loop {
+            let mut guard = self.lock.lock();
+            // are we ready to go?
+            if guard.disconnected || guard.buf.size() < guard.buf.cap() {
+                return guard;
+            }
+            // no room; actually block
+            let wait_token = guard.queue.enqueue(&mut node);
+            drop(guard);
+            wait_token.wait();
         }
     }
 
     pub fn send(&self, t: T) -> Result<(), T> {
-        let (guard, state) = self.lock();
-
-        // wait for a slot to become available, and enqueue the data
-        while !state.disconnected && state.buf.size() == state.buf.cap() {
-            state.queue.enqueue(&self.lock);
-        }
-        if state.disconnected { return Err(t) }
-        state.buf.enqueue(t);
+        let guard = self.acquire_send_slot();
+        if guard.disconnected { return Err(t) }
+        guard.buf.enqueue(t);
 
         match mem::replace(&mut state.blocker, NoneBlocked) {
             // if our capacity is 0, then we need to wait for a receiver to be
@@ -194,7 +178,7 @@ pub fn send(&self, t: T) -> Result<(), T> {
             NoneBlocked => Ok(()),
 
             // success, someone's about to receive our buffered data.
-            BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
+            BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
 
             BlockedSender(..) => panic!("lolwut"),
         }
@@ -212,9 +196,9 @@ pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
             match mem::replace(&mut state.blocker, NoneBlocked) {
                 NoneBlocked => Err(super::Full(t)),
                 BlockedSender(..) => unreachable!(),
-                BlockedReceiver(task) => {
-                    state.buf.enqueue(t);
-                    wakeup(task, guard);
+                BlockedReceiver(token) => {
+                    guard.buf.enqueue(t);
+                    wakeup(token, guard);
                     Ok(())
                 }
             }
@@ -222,10 +206,10 @@ pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
             // If the buffer has some space and the capacity isn't 0, then we
             // just enqueue the data for later retrieval, ensuring to wake up
             // any blocked receiver if there is one.
-            assert!(state.buf.size() < state.buf.cap());
-            state.buf.enqueue(t);
-            match mem::replace(&mut state.blocker, NoneBlocked) {
-                BlockedReceiver(task) => wakeup(task, guard),
+            assert!(guard.buf.size() < guard.buf.cap());
+            guard.buf.enqueue(t);
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
+                BlockedReceiver(token) => wakeup(token, guard),
                 NoneBlocked => {}
                 BlockedSender(..) => unreachable!(),
             }
@@ -238,7 +222,7 @@ pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
     // When reading this, remember that there can only ever be one receiver at
     // time.
     pub fn recv(&self) -> Result<T, ()> {
-        let (guard, state) = self.lock();
+        let guard = self.lock.lock();
 
         // Wait for the buffer to have something in it. No need for a while loop
         // because we're the only receiver.
@@ -275,10 +259,8 @@ pub fn try_recv(&self) -> Result<T, Failure> {
     // * `waited` - flag if the receiver blocked to receive some data, or if it
     //              just picked up some data on the way out
     // * `guard` - the lock guard that is held over this channel's lock
-    fn wakeup_senders(&self, waited: bool,
-                      guard: LockGuard,
-                      state: &mut State<T>) {
-        let pending_sender1: Option<BlockedTask> = state.queue.dequeue();
+    fn wakeup_senders(&self, waited: bool, guard: MutexGuard<State<T>>) {
+        let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
 
         // If this is a no-buffer channel (cap == 0), then if we didn't wait we
         // need to ACK the sender. If we waited, then the sender waking us up
@@ -287,9 +269,9 @@ fn wakeup_senders(&self, waited: bool,
             match mem::replace(&mut state.blocker, NoneBlocked) {
                 NoneBlocked => None,
                 BlockedReceiver(..) => unreachable!(),
-                BlockedSender(task) => {
-                    state.canceled.take();
-                    Some(task)
+                BlockedSender(token) => {
+                    guard.canceled.take();
+                    Some(token)
                 }
             }
         } else {
@@ -298,8 +280,8 @@ fn wakeup_senders(&self, waited: bool,
         mem::drop((state, guard));
 
         // only outside of the lock do we wake up the pending tasks
-        pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
-        pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
+        pending_sender1.map(|t| t.signal());
+        pending_sender2.map(|t| t.signal());
     }
 
     // Prepares this shared packet for a channel clone, essentially just bumping
@@ -322,7 +304,7 @@ pub fn drop_chan(&self) {
         match mem::replace(&mut state.blocker, NoneBlocked) {
             NoneBlocked => {}
             BlockedSender(..) => unreachable!(),
-            BlockedReceiver(task) => wakeup(task, guard),
+            BlockedReceiver(token) => wakeup(token, guard),
         }
     }
 
@@ -349,9 +331,9 @@ pub fn drop_port(&self) {
 
         let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
             NoneBlocked => None,
-            BlockedSender(task) => {
-                *state.canceled.take().unwrap() = true;
-                Some(task)
+            BlockedSender(token) => {
+                *guard.canceled.take().unwrap() = true;
+                Some(token)
             }
             BlockedReceiver(..) => unreachable!(),
         };
@@ -359,11 +341,11 @@ pub fn drop_port(&self) {
 
         loop {
             match queue.dequeue() {
-                Some(task) => { task.wake().map(|t| t.reawaken()); }
+                Some(token) => { token.signal(); }
                 None => break,
             }
         }
-        waiter.map(|t| t.wake().map(|t| t.reawaken()));
+        waiter.map(|t| t.signal());
     }
 
     ////////////////////////////////////////////////////////////////////////////
@@ -379,17 +361,17 @@ pub fn can_recv(&self) -> bool {
 
     // Attempts to start selection on this port. This can either succeed or fail
     // because there is data waiting.
-    pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{
-        let (_g, state) = self.lock();
-        if state.disconnected || state.buf.size() > 0 {
-            Err(task)
+    pub fn start_selection(&self, token: SignalToken) -> StartResult {
+        let guard = self.lock();
+        if guard.disconnected || guard.buf.size() > 0 {
+            Abort
         } else {
-            match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
+            match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
                 NoneBlocked => {}
                 BlockedSender(..) => unreachable!(),
                 BlockedReceiver(..) => unreachable!(),
             }
-            Ok(())
+            Installed
         }
     }
 
@@ -401,11 +383,11 @@ pub fn abort_selection(&self) -> bool {
         let (_g, state) = self.lock();
         match mem::replace(&mut state.blocker, NoneBlocked) {
             NoneBlocked => true,
-            BlockedSender(task) => {
-                state.blocker = BlockedSender(task);
+            BlockedSender(token) => {
+                guard.blocker = BlockedSender(token);
                 true
             }
-            BlockedReceiver(task) => { task.trash(); false }
+            BlockedReceiver(token) => { drop(token); false }
         }
     }
 }
@@ -449,31 +431,25 @@ fn cap(&self) -> uint { self.buf.len() }
 ////////////////////////////////////////////////////////////////////////////////
 
 impl Queue {
-    fn enqueue(&mut self, lock: &NativeMutex) {
-        let task: Box<Task> = Local::take();
-        let mut node = Node {
-            task: None,
-            next: 0 as *mut Node,
-        };
-        task.deschedule(1, |task| {
-            node.task = Some(task);
-            if self.tail.is_null() {
-                self.head = &mut node as *mut Node;
-                self.tail = &mut node as *mut Node;
-            } else {
-                unsafe {
-                    (*self.tail).next = &mut node as *mut Node;
-                    self.tail = &mut node as *mut Node;
-                }
+    fn enqueue(&mut self, node: &mut Node) -> WaitToken {
+        let (wait_token, signal_token) = blocking::tokens();
+        node.token = Some(signal_token);
+        node.next = 0 as *mut Node;
+
+        if self.tail.is_null() {
+            self.head = node as *mut Node;
+            self.tail = node as *mut Node;
+        } else {
+            unsafe {
+                (*self.tail).next = node as *mut Node;
+                self.tail = node as *mut Node;
             }
-            unsafe { lock.unlock_noguard(); }
-            Ok(())
-        });
-        unsafe { lock.lock_noguard(); }
-        assert!(node.next.is_null());
+        }
+
+        wait_token
     }
 
-    fn dequeue(&mut self) -> Option<BlockedTask> {
+    fn dequeue(&mut self) -> Option<SignalToken> {
         if self.head.is_null() {
             return None
         }
@@ -484,7 +460,7 @@ fn dequeue(&mut self) -> Option<BlockedTask> {
         }
         unsafe {
             (*node).next = 0 as *mut Node;
-            Some((*node).task.take().unwrap())
+            Some((*node).token.take().unwrap())
         }
     }
 }