mod oneshot;
mod stream;
mod shared;
+mod sync;
// Use a power of 2 to allow LLVM to optimize to something that's not a
// division, this is hit pretty regularly.
priv rx: &'a Receiver<T>
}
-/// The sending-half of Rust's channel type. This half can only be owned by one
-/// task
+/// The sending-half of Rust's asynchronous channel type. This half can only be
+/// owned by one task, but it can be cloned to send to other tasks.
pub struct Sender<T> {
priv inner: Flavor<T>,
priv sends: Cell<uint>,
priv marker: marker::NoShare,
}
+/// The sending-half of Rust's synchronous channel type. This half can only be
+/// owned by one task, but it can be cloned to send to other tasks.
+pub struct SyncSender<T> {
+ priv inner: UnsafeArc<sync::Packet<T>>,
+ // can't share in an arc
+ priv marker: marker::NoShare,
+}
+
/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
#[deriving(Eq, Clone, Show)]
Data(T),
}
+/// This enumeration is the list of the possible outcomes for the
+/// `SyncSender::try_send` method.
+#[deriving(Eq, Clone, Show)]
+pub enum TrySendResult<T> {
+ /// The data was successfully sent along the channel. This either means that
+ /// it was buffered in the channel, or handed off to a receiver. In either
+ /// case, the callee no longer has ownership of the data.
+ Sent,
+ /// The data could not be sent on the channel because it would require that
+ /// the callee block to send the data.
+ ///
+ /// If this is a buffered channel, then the buffer is full at this time. If
+ /// this is not a buffered channel, then there is no receiver available to
+ /// acquire the data.
+ Full(T),
+ /// This channel's receiving half has disconnected, so the data could not be
+ /// sent. The data is returned back to the callee in this case.
+ RecvDisconnected(T),
+}
+
enum Flavor<T> {
Oneshot(UnsafeArc<oneshot::Packet<T>>),
Stream(UnsafeArc<stream::Packet<T>>),
Shared(UnsafeArc<shared::Packet<T>>),
+ Sync(UnsafeArc<sync::Packet<T>>),
}
/// Creates a new channel, returning the sender/receiver halves. All data sent
(Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
}
+/// Creates a new synchronous, bounded channel.
+///
+/// Like asynchronous channels, the `Receiver` will block until a message
+/// becomes available. These channels differ greatly in the semantics of the
+/// sender from asynchronous channels, however.
+///
+/// This channel has an internal buffer on which messages will be queued. When
+/// the internal buffer becomes full, future sends will *block* waiting for the
+/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
+/// becomes "rendezvous channel" where each send will not return until a recv
+/// is paired with it.
+///
+/// As with asynchronous channels, all senders will fail in `send` if the
+/// `Receiver` has been destroyed.
+///
+/// # Example
+///
+/// ```
+/// let (tx, rx) = sync_channel(1);
+///
+/// // this returns immediately
+/// tx.send(1);
+///
+/// spawn(proc() {
+/// // this will block until the previous message has been received
+/// tx.send(2);
+/// });
+///
+/// assert_eq!(rx.recv(), 1);
+/// assert_eq!(rx.recv(), 2);
+/// ```
+pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
+ let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
+ (SyncSender::new(a), Receiver::my_new(Sync(b)))
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sender
+////////////////////////////////////////////////////////////////////////////////
+
impl<T: Send> Sender<T> {
fn my_new(inner: Flavor<T>) -> Sender<T> {
Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
}
Stream(ref p) => return unsafe { (*p.get()).send(t) },
Shared(ref p) => return unsafe { (*p.get()).send(t) },
+ Sync(..) => unreachable!(),
};
unsafe {
unsafe { (*p.get()).clone_chan(); }
return Sender::my_new(Shared(p.clone()));
}
+ Sync(..) => unreachable!(),
};
unsafe {
Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+ Sync(..) => unreachable!(),
}
}
}
+////////////////////////////////////////////////////////////////////////////////
+// SyncSender
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T: Send> SyncSender<T> {
+ fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
+ SyncSender { inner: inner, marker: marker::NoShare }
+ }
+
+ /// Sends a value on this synchronous channel.
+ ///
+ /// This function will *block* until space in the internal buffer becomes
+ /// available or a receiver is available to hand off the message to.
+ ///
+ /// Note that a successful send does *not* guarantee that the receiver will
+ /// ever see the data if there is a buffer on this channel. Messages may be
+ /// enqueued in the internal buffer for the receiver to receive at a later
+ /// time. If the buffer size is 0, however, it can be guaranteed that the
+ /// receiver has indeed received the data if this function returns success.
+ ///
+ /// # Failure
+ ///
+ /// Similarly to `Sender::send`, this function will fail if the
+ /// corresponding `Receiver` for this channel has disconnected. This
+ /// behavior is used to propagate failure among tasks.
+ ///
+ /// If failure is not desired, you can achieve the same semantics with the
+ /// `SyncSender::send_opt` method which will not fail if the receiver
+ /// disconnects.
+ pub fn send(&self, t: T) {
+ if self.send_opt(t).is_some() {
+ fail!("sending on a closed channel");
+ }
+ }
+
+ /// Send a value on a channel, returning it back if the receiver
+ /// disconnected
+ ///
+ /// This method will *block* to send the value `t` on the channel, but if
+ /// the value could not be sent due to the receiver disconnecting, the value
+ /// is returned back to the callee. This function is similar to `try_send`,
+ /// except that it will block if the channel is currently full.
+ ///
+ /// # Failure
+ ///
+ /// This function cannot fail.
+ pub fn send_opt(&self, t: T) -> Option<T> {
+ match unsafe { (*self.inner.get()).send(t) } {
+ Ok(()) => None,
+ Err(t) => Some(t),
+ }
+ }
+
+ /// Attempts to send a value on this channel without blocking.
+ ///
+ /// This method semantically differs from `Sender::try_send` because it can
+ /// fail if the receiver has not disconnected yet. If the buffer on this
+ /// channel is full, this function will immediately return the data back to
+ /// the callee.
+ ///
+ /// See `SyncSender::send` for notes about guarantees of whether the
+ /// receiver has received the data or not if this function is successful.
+ ///
+ /// # Failure
+ ///
+ /// This function cannot fail
+ pub fn try_send(&self, t: T) -> TrySendResult<T> {
+ unsafe { (*self.inner.get()).try_send(t) }
+ }
+}
+
+impl<T: Send> Clone for SyncSender<T> {
+ fn clone(&self) -> SyncSender<T> {
+ unsafe { (*self.inner.get()).clone_chan(); }
+ return SyncSender::new(self.inner.clone());
+ }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for SyncSender<T> {
+ fn drop(&mut self) {
+ unsafe { (*self.inner.get()).drop_chan(); }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Receiver
+////////////////////////////////////////////////////////////////////////////////
+
impl<T: Send> Receiver<T> {
fn my_new(inner: Flavor<T>) -> Receiver<T> {
Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
Err(shared::Disconnected) => return Disconnected,
}
}
+ Sync(ref p) => {
+ match unsafe { (*p.get()).try_recv() } {
+ Ok(t) => return Data(t),
+ Err(sync::Empty) => return Empty,
+ Err(sync::Disconnected) => return Disconnected,
+ }
+ }
};
unsafe {
mem::swap(&mut cast::transmute_mut(self).inner,
Err(shared::Disconnected) => return None,
}
}
+ Sync(ref p) => return unsafe { (*p.get()).recv() }
};
unsafe {
mem::swap(&mut cast::transmute_mut(self).inner,
Shared(ref p) => {
return unsafe { (*p.get()).can_recv() };
}
+ Sync(ref p) => {
+ return unsafe { (*p.get()).can_recv() };
+ }
};
unsafe {
mem::swap(&mut cast::transmute_mut(self).inner,
Shared(ref p) => {
return unsafe { (*p.get()).start_selection(task) };
}
+ Sync(ref p) => {
+ return unsafe { (*p.get()).start_selection(task) };
+ }
};
task = t;
unsafe {
Shared(ref p) => return unsafe {
(*p.get()).abort_selection(was_upgrade)
},
+ Sync(ref p) => return unsafe {
+ (*p.get()).abort_selection()
+ },
};
let mut new_port = match result { Ok(b) => return b, Err(p) => p };
was_upgrade = true;
Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
+ Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
}
}
}
pdone.recv();
})
}
+
+#[cfg(test)]
+mod sync_tests {
+ use prelude::*;
+ use os;
+
+ pub fn stress_factor() -> uint {
+ match os::getenv("RUST_TEST_STRESS") {
+ Some(val) => from_str::<uint>(val).unwrap(),
+ None => 1,
+ }
+ }
+
+ test!(fn smoke() {
+ let (tx, rx) = sync_channel(1);
+ tx.send(1);
+ assert_eq!(rx.recv(), 1);
+ })
+
+ test!(fn drop_full() {
+ let (tx, _rx) = sync_channel(1);
+ tx.send(~1);
+ })
+
+ test!(fn smoke_shared() {
+ let (tx, rx) = sync_channel(1);
+ tx.send(1);
+ assert_eq!(rx.recv(), 1);
+ let tx = tx.clone();
+ tx.send(1);
+ assert_eq!(rx.recv(), 1);
+ })
+
+ test!(fn smoke_threads() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() {
+ tx.send(1);
+ });
+ assert_eq!(rx.recv(), 1);
+ })
+
+ test!(fn smoke_port_gone() {
+ let (tx, rx) = sync_channel(0);
+ drop(rx);
+ tx.send(1);
+ } #[should_fail])
+
+ test!(fn smoke_shared_port_gone2() {
+ let (tx, rx) = sync_channel(0);
+ drop(rx);
+ let tx2 = tx.clone();
+ drop(tx);
+ tx2.send(1);
+ } #[should_fail])
+
+ test!(fn port_gone_concurrent() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() {
+ rx.recv();
+ });
+ loop { tx.send(1) }
+ } #[should_fail])
+
+ test!(fn port_gone_concurrent_shared() {
+ let (tx, rx) = sync_channel(0);
+ let tx2 = tx.clone();
+ spawn(proc() {
+ rx.recv();
+ });
+ loop {
+ tx.send(1);
+ tx2.send(1);
+ }
+ } #[should_fail])
+
+ test!(fn smoke_chan_gone() {
+ let (tx, rx) = sync_channel::<int>(0);
+ drop(tx);
+ rx.recv();
+ } #[should_fail])
+
+ test!(fn smoke_chan_gone_shared() {
+ let (tx, rx) = sync_channel::<()>(0);
+ let tx2 = tx.clone();
+ drop(tx);
+ drop(tx2);
+ rx.recv();
+ } #[should_fail])
+
+ test!(fn chan_gone_concurrent() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() {
+ tx.send(1);
+ tx.send(1);
+ });
+ loop { rx.recv(); }
+ } #[should_fail])
+
+ test!(fn stress() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() {
+ for _ in range(0, 10000) { tx.send(1); }
+ });
+ for _ in range(0, 10000) {
+ assert_eq!(rx.recv(), 1);
+ }
+ })
+
+ test!(fn stress_shared() {
+ static AMT: uint = 1000;
+ static NTHREADS: uint = 8;
+ let (tx, rx) = sync_channel::<int>(0);
+ let (dtx, drx) = sync_channel::<()>(0);
+
+ spawn(proc() {
+ for _ in range(0, AMT * NTHREADS) {
+ assert_eq!(rx.recv(), 1);
+ }
+ match rx.try_recv() {
+ Data(..) => fail!(),
+ _ => {}
+ }
+ dtx.send(());
+ });
+
+ for _ in range(0, NTHREADS) {
+ let tx = tx.clone();
+ spawn(proc() {
+ for _ in range(0, AMT) { tx.send(1); }
+ });
+ }
+ drop(tx);
+ drx.recv();
+ })
+
+ test!(fn oneshot_single_thread_close_port_first() {
+ // Simple test of closing without sending
+ let (_tx, rx) = sync_channel::<int>(0);
+ drop(rx);
+ })
+
+ test!(fn oneshot_single_thread_close_chan_first() {
+ // Simple test of closing without sending
+ let (tx, _rx) = sync_channel::<int>(0);
+ drop(tx);
+ })
+
+ test!(fn oneshot_single_thread_send_port_close() {
+ // Testing that the sender cleans up the payload if receiver is closed
+ let (tx, rx) = sync_channel::<~int>(0);
+ drop(rx);
+ tx.send(~0);
+ } #[should_fail])
+
+ test!(fn oneshot_single_thread_recv_chan_close() {
+ // Receiving on a closed chan will fail
+ let res = task::try(proc() {
+ let (tx, rx) = sync_channel::<int>(0);
+ drop(tx);
+ rx.recv();
+ });
+ // What is our res?
+ assert!(res.is_err());
+ })
+
+ test!(fn oneshot_single_thread_send_then_recv() {
+ let (tx, rx) = sync_channel::<~int>(1);
+ tx.send(~10);
+ assert!(rx.recv() == ~10);
+ })
+
+ test!(fn oneshot_single_thread_try_send_open() {
+ let (tx, rx) = sync_channel::<int>(1);
+ assert_eq!(tx.try_send(10), Sent);
+ assert!(rx.recv() == 10);
+ })
+
+ test!(fn oneshot_single_thread_try_send_closed() {
+ let (tx, rx) = sync_channel::<int>(0);
+ drop(rx);
+ assert_eq!(tx.try_send(10), RecvDisconnected(10));
+ })
+
+ test!(fn oneshot_single_thread_try_send_closed2() {
+ let (tx, _rx) = sync_channel::<int>(0);
+ assert_eq!(tx.try_send(10), Full(10));
+ })
+
+ test!(fn oneshot_single_thread_try_recv_open() {
+ let (tx, rx) = sync_channel::<int>(1);
+ tx.send(10);
+ assert!(rx.recv_opt() == Some(10));
+ })
+
+ test!(fn oneshot_single_thread_try_recv_closed() {
+ let (tx, rx) = sync_channel::<int>(0);
+ drop(tx);
+ assert!(rx.recv_opt() == None);
+ })
+
+ test!(fn oneshot_single_thread_peek_data() {
+ let (tx, rx) = sync_channel::<int>(1);
+ assert_eq!(rx.try_recv(), Empty)
+ tx.send(10);
+ assert_eq!(rx.try_recv(), Data(10));
+ })
+
+ test!(fn oneshot_single_thread_peek_close() {
+ let (tx, rx) = sync_channel::<int>(0);
+ drop(tx);
+ assert_eq!(rx.try_recv(), Disconnected);
+ assert_eq!(rx.try_recv(), Disconnected);
+ })
+
+ test!(fn oneshot_single_thread_peek_open() {
+ let (_tx, rx) = sync_channel::<int>(0);
+ assert_eq!(rx.try_recv(), Empty);
+ })
+
+ test!(fn oneshot_multi_task_recv_then_send() {
+ let (tx, rx) = sync_channel::<~int>(0);
+ spawn(proc() {
+ assert!(rx.recv() == ~10);
+ });
+
+ tx.send(~10);
+ })
+
+ test!(fn oneshot_multi_task_recv_then_close() {
+ let (tx, rx) = sync_channel::<~int>(0);
+ spawn(proc() {
+ drop(tx);
+ });
+ let res = task::try(proc() {
+ assert!(rx.recv() == ~10);
+ });
+ assert!(res.is_err());
+ })
+
+ test!(fn oneshot_multi_thread_close_stress() {
+ for _ in range(0, stress_factor()) {
+ let (tx, rx) = sync_channel::<int>(0);
+ spawn(proc() {
+ drop(rx);
+ });
+ drop(tx);
+ }
+ })
+
+ test!(fn oneshot_multi_thread_send_close_stress() {
+ for _ in range(0, stress_factor()) {
+ let (tx, rx) = sync_channel::<int>(0);
+ spawn(proc() {
+ drop(rx);
+ });
+ let _ = task::try(proc() {
+ tx.send(1);
+ });
+ }
+ })
+
+ test!(fn oneshot_multi_thread_recv_close_stress() {
+ for _ in range(0, stress_factor()) {
+ let (tx, rx) = sync_channel::<int>(0);
+ spawn(proc() {
+ let res = task::try(proc() {
+ rx.recv();
+ });
+ assert!(res.is_err());
+ });
+ spawn(proc() {
+ spawn(proc() {
+ drop(tx);
+ });
+ });
+ }
+ })
+
+ test!(fn oneshot_multi_thread_send_recv_stress() {
+ for _ in range(0, stress_factor()) {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() {
+ tx.send(~10);
+ });
+ spawn(proc() {
+ assert!(rx.recv() == ~10);
+ });
+ }
+ })
+
+ test!(fn stream_send_recv_stress() {
+ for _ in range(0, stress_factor()) {
+ let (tx, rx) = sync_channel(0);
+
+ send(tx, 0);
+ recv(rx, 0);
+
+ fn send(tx: SyncSender<~int>, i: int) {
+ if i == 10 { return }
+
+ spawn(proc() {
+ tx.send(~i);
+ send(tx, i + 1);
+ });
+ }
+
+ fn recv(rx: Receiver<~int>, i: int) {
+ if i == 10 { return }
+
+ spawn(proc() {
+ assert!(rx.recv() == ~i);
+ recv(rx, i + 1);
+ });
+ }
+ }
+ })
+
+ test!(fn recv_a_lot() {
+ // Regression test that we don't run out of stack in scheduler context
+ let (tx, rx) = sync_channel(10000);
+ for _ in range(0, 10000) { tx.send(()); }
+ for _ in range(0, 10000) { rx.recv(); }
+ })
+
+ test!(fn shared_chan_stress() {
+ let (tx, rx) = sync_channel(0);
+ let total = stress_factor() + 100;
+ for _ in range(0, total) {
+ let tx = tx.clone();
+ spawn(proc() {
+ tx.send(());
+ });
+ }
+
+ for _ in range(0, total) {
+ rx.recv();
+ }
+ })
+
+ test!(fn test_nested_recv_iter() {
+ let (tx, rx) = sync_channel::<int>(0);
+ let (total_tx, total_rx) = sync_channel::<int>(0);
+
+ spawn(proc() {
+ let mut acc = 0;
+ for x in rx.iter() {
+ acc += x;
+ }
+ total_tx.send(acc);
+ });
+
+ tx.send(3);
+ tx.send(1);
+ tx.send(2);
+ drop(tx);
+ assert_eq!(total_rx.recv(), 6);
+ })
+
+ test!(fn test_recv_iter_break() {
+ let (tx, rx) = sync_channel::<int>(0);
+ let (count_tx, count_rx) = sync_channel(0);
+
+ spawn(proc() {
+ let mut count = 0;
+ for x in rx.iter() {
+ if count >= 3 {
+ break;
+ } else {
+ count += x;
+ }
+ }
+ count_tx.send(count);
+ });
+
+ tx.send(2);
+ tx.send(2);
+ tx.send(2);
+ tx.try_send(2);
+ drop(tx);
+ assert_eq!(count_rx.recv(), 4);
+ })
+
+ test!(fn try_recv_states() {
+ let (tx1, rx1) = sync_channel::<int>(1);
+ let (tx2, rx2) = sync_channel::<()>(1);
+ let (tx3, rx3) = sync_channel::<()>(1);
+ spawn(proc() {
+ rx2.recv();
+ tx1.send(1);
+ tx3.send(());
+ rx2.recv();
+ drop(tx1);
+ tx3.send(());
+ });
+
+ assert_eq!(rx1.try_recv(), Empty);
+ tx2.send(());
+ rx3.recv();
+ assert_eq!(rx1.try_recv(), Data(1));
+ assert_eq!(rx1.try_recv(), Empty);
+ tx2.send(());
+ rx3.recv();
+ assert_eq!(rx1.try_recv(), Disconnected);
+ })
+
+ // This bug used to end up in a livelock inside of the Receiver destructor
+ // because the internal state of the Shared packet was corrupted
+ test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
+ let (tx, rx) = sync_channel(0);
+ let (tx2, rx2) = sync_channel(0);
+ spawn(proc() {
+ rx.recv(); // wait on a oneshot
+ drop(rx); // destroy a shared
+ tx2.send(());
+ });
+ // make sure the other task has gone to sleep
+ for _ in range(0, 5000) { task::deschedule(); }
+
+ // upgrade to a shared chan and send a message
+ let t = tx.clone();
+ drop(tx);
+ t.send(());
+
+ // wait for the child task to exit before we exit
+ rx2.recv();
+ })
+
+ test!(fn try_recvs_off_the_runtime() {
+ use std::rt::thread::Thread;
+
+ let (tx, rx) = sync_channel(0);
+ let (cdone, pdone) = channel();
+ let t = Thread::start(proc() {
+ let mut hits = 0;
+ while hits < 10 {
+ match rx.try_recv() {
+ Data(()) => { hits += 1; }
+ Empty => { Thread::yield_now(); }
+ Disconnected => return,
+ }
+ }
+ cdone.send(());
+ });
+ for _ in range(0, 10) {
+ tx.send(());
+ }
+ t.join();
+ pdone.recv();
+ })
+
+ test!(fn send_opt1() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() { rx.recv(); });
+ assert_eq!(tx.send_opt(1), None);
+ })
+
+ test!(fn send_opt2() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() { drop(rx); });
+ assert_eq!(tx.send_opt(1), Some(1));
+ })
+
+ test!(fn send_opt3() {
+ let (tx, rx) = sync_channel(1);
+ assert_eq!(tx.send_opt(1), None);
+ spawn(proc() { drop(rx); });
+ assert_eq!(tx.send_opt(1), Some(1));
+ })
+
+ test!(fn send_opt4() {
+ let (tx, rx) = sync_channel(0);
+ let tx2 = tx.clone();
+ let (done, donerx) = channel();
+ let done2 = done.clone();
+ spawn(proc() {
+ assert_eq!(tx.send_opt(1), Some(1));
+ done.send(());
+ });
+ spawn(proc() {
+ assert_eq!(tx2.send_opt(2), Some(2));
+ done2.send(());
+ });
+ drop(rx);
+ donerx.recv();
+ donerx.recv();
+ })
+
+ test!(fn try_send1() {
+ let (tx, _rx) = sync_channel(0);
+ assert_eq!(tx.try_send(1), Full(1));
+ })
+
+ test!(fn try_send2() {
+ let (tx, _rx) = sync_channel(1);
+ assert_eq!(tx.try_send(1), Sent);
+ assert_eq!(tx.try_send(1), Full(1));
+ })
+
+ test!(fn try_send3() {
+ let (tx, rx) = sync_channel(1);
+ assert_eq!(tx.try_send(1), Sent);
+ drop(rx);
+ assert_eq!(tx.try_send(1), RecvDisconnected(1));
+ })
+
+ test!(fn try_send4() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() {
+ for _ in range(0, 1000) { task::deschedule(); }
+ assert_eq!(tx.try_send(1), Sent);
+ });
+ assert_eq!(rx.recv(), 1);
+ })
+}
tx1.send(());
rx2.recv();
})
+
+ test!(fn sync1() {
+ let (tx, rx) = sync_channel(1);
+ tx.send(1);
+ select! {
+ n = rx.recv() => { assert_eq!(n, 1); }
+ }
+ })
+
+ test!(fn sync2() {
+ let (tx, rx) = sync_channel(0);
+ spawn(proc() {
+ for _ in range(0, 100) { task::deschedule() }
+ tx.send(1);
+ });
+ select! {
+ n = rx.recv() => { assert_eq!(n, 1); }
+ }
+ })
+
+ test!(fn sync3() {
+ let (tx1, rx1) = sync_channel(0);
+ let (tx2, rx2) = channel();
+ spawn(proc() { tx1.send(1); });
+ spawn(proc() { tx2.send(2); });
+ select! {
+ n = rx1.recv() => {
+ assert_eq!(n, 1);
+ assert_eq!(rx2.recv(), 2);
+ },
+ n = rx2.recv() => {
+ assert_eq!(n, 2);
+ assert_eq!(rx1.recv(), 1);
+ }
+ }
+ })
}
--- /dev/null
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Synchronous channels/ports
+///
+/// This channel implementation differs significantly from the asynchronous
+/// implementations found next to it (oneshot/stream/share). This is an
+/// implementation of a synchronous, bounded buffer channel.
+///
+/// Each channel is created with some amount of backing buffer, and sends will
+/// *block* until buffer space becomes available. A buffer size of 0 is valid,
+/// which means that every successful send is paired with a successful recv.
+///
+/// This flavor of channels defines a new `send_opt` method for channels which
+/// is the method by which a message is sent but the task does not fail if it
+/// cannot be delivered.
+///
+/// Another major difference is that send() will *always* return back the data
+/// if it couldn't be sent. This is because it is deterministically known when
+/// the data is received and when it is not received.
+///
+/// Implementation-wise, it can all be summed up with "use a mutex plus some
+/// logic". The mutex used here is an OS native mutex, meaning that no user code
+/// is run inside of the mutex (to prevent context switching). This
+/// implementation shares almost all code for the buffered and unbuffered cases
+/// of a synchronous channel. There are a few branches for the unbuffered case,
+/// but they're mostly just relevant to blocking senders.
+
+use cast;
+use container::Container;
+use iter::Iterator;
+use kinds::Send;
+use mem;
+use ops::Drop;
+use option::{Some, None, Option};
+use ptr::RawPtr;
+use result::{Result, Ok, Err};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use sync::atomics;
+use ty::Unsafe;
+use unstable::mutex::{NativeMutex, LockGuard};
+use vec::Vec;
+
+pub struct Packet<T> {
+ /// Only field outside of the mutex. Just done for kicks, but mainly because
+ /// the other shared channel already had the code implemented
+ channels: atomics::AtomicUint,
+
+ /// The state field is protected by this mutex
+ lock: NativeMutex,
+ state: Unsafe<State<T>>,
+}
+
+struct State<T> {
+ disconnected: bool, // Is the channel disconnected yet?
+ queue: Queue, // queue of senders waiting to send data
+ blocker: Blocker, // currently blocked task on this channel
+ buf: Buffer<T>, // storage for buffered messages
+ cap: uint, // capacity of this channel
+
+ /// A curious flag used to indicate whether a sender failed or succeeded in
+ /// blocking. This is used to transmit information back to the task that it
+ /// must dequeue its message from the buffer because it was not received.
+ /// This is only relevant in the 0-buffer case. This obviously cannot be
+ /// safely constructed, but it's guaranteed to always have a valid pointer
+ /// value.
+ canceled: Option<&'static mut bool>,
+}
+
+/// Possible flavors of tasks who can be blocked on this channel.
+enum Blocker {
+ BlockedSender(BlockedTask),
+ BlockedReceiver(BlockedTask),
+ NoneBlocked
+}
+
+/// Simple queue for threading tasks together. Nodes are stack-allocated, so
+/// this structure is not safe at all
+struct Queue {
+ head: *mut Node,
+ tail: *mut Node,
+}
+
+struct Node {
+ task: Option<BlockedTask>,
+ next: *mut Node,
+}
+
+/// A simple ring-buffer
+struct Buffer<T> {
+ buf: Vec<Option<T>>,
+ start: uint,
+ size: uint,
+}
+
+#[deriving(Show)]
+pub enum Failure {
+ Empty,
+ Disconnected,
+}
+
+/// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
+/// in the meantime. This re-locks the mutex upon returning.
+fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
+ lock: &NativeMutex) {
+ let me: ~Task = Local::take();
+ me.deschedule(1, |task| {
+ match mem::replace(slot, f(task)) {
+ NoneBlocked => {}
+ _ => unreachable!(),
+ }
+ unsafe { lock.unlock_noguard(); }
+ Ok(())
+ });
+ unsafe { lock.lock_noguard(); }
+}
+
+/// Wakes up a task, dropping the lock at the correct time
+fn wakeup(task: BlockedTask, guard: LockGuard) {
+ // We need to be careful to wake up the waiting task *outside* of the mutex
+ // in case it incurs a context switch.
+ mem::drop(guard);
+ task.wake().map(|t| t.reawaken());
+}
+
+impl<T: Send> Packet<T> {
+ pub fn new(cap: uint) -> Packet<T> {
+ Packet {
+ channels: atomics::AtomicUint::new(1),
+ lock: unsafe { NativeMutex::new() },
+ state: Unsafe::new(State {
+ disconnected: false,
+ blocker: NoneBlocked,
+ cap: cap,
+ canceled: None,
+ queue: Queue {
+ head: 0 as *mut Node,
+ tail: 0 as *mut Node,
+ },
+ buf: Buffer {
+ buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None),
+ start: 0,
+ size: 0,
+ },
+ }),
+ }
+ }
+
+ // Locks this channel, returning a guard for the state and the mutable state
+ // itself. Care should be taken to ensure that the state does not escape the
+ // guard!
+ //
+ // Note that we're ok promoting an & reference to an &mut reference because
+ // the lock ensures that we're the only ones in the world with a pointer to
+ // the state.
+ fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
+ unsafe {
+ let guard = self.lock.lock();
+ (guard, &mut *self.state.get())
+ }
+ }
+
+ pub fn send(&self, t: T) -> Result<(), T> {
+ let (guard, state) = self.lock();
+
+ // wait for a slot to become available, and enqueue the data
+ while !state.disconnected && state.buf.size() == state.buf.cap() {
+ state.queue.enqueue(&self.lock);
+ }
+ if state.disconnected { return Err(t) }
+ state.buf.enqueue(t);
+
+ match mem::replace(&mut state.blocker, NoneBlocked) {
+ // if our capacity is 0, then we need to wait for a receiver to be
+ // available to take our data. After waiting, we check again to make
+ // sure the port didn't go away in the meantime. If it did, we need
+ // to hand back our data.
+ NoneBlocked if state.cap == 0 => {
+ let mut canceled = false;
+ assert!(state.canceled.is_none());
+ state.canceled = Some(unsafe { cast::transmute(&mut canceled) });
+ wait(&mut state.blocker, BlockedSender, &self.lock);
+ if canceled {Err(state.buf.dequeue())} else {Ok(())}
+ }
+
+ // success, we buffered some data
+ NoneBlocked => Ok(()),
+
+ // success, someone's about to receive our buffered data.
+ BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
+
+ BlockedSender(..) => fail!("lolwut"),
+ }
+ }
+
+ pub fn try_send(&self, t: T) -> super::TrySendResult<T> {
+ let (guard, state) = self.lock();
+ if state.disconnected {
+ super::RecvDisconnected(t)
+ } else if state.buf.size() == state.buf.cap() {
+ super::Full(t)
+ } else if state.cap == 0 {
+ // With capacity 0, even though we have buffer space we can't
+ // transfer the data unless there's a receiver waiting.
+ match mem::replace(&mut state.blocker, NoneBlocked) {
+ NoneBlocked => super::Full(t),
+ BlockedSender(..) => unreachable!(),
+ BlockedReceiver(task) => {
+ state.buf.enqueue(t);
+ wakeup(task, guard);
+ super::Sent
+ }
+ }
+ } else {
+ // If the buffer has some space and the capacity isn't 0, then we
+ // just enqueue the data for later retrieval.
+ assert!(state.buf.size() < state.buf.cap());
+ state.buf.enqueue(t);
+ super::Sent
+ }
+ }
+
+ // Receives a message from this channel
+ //
+ // When reading this, remember that there can only ever be one receiver at
+ // time.
+ pub fn recv(&self) -> Option<T> {
+ let (guard, state) = self.lock();
+
+ // Wait for the buffer to have something in it. No need for a while loop
+ // because we're the only receiver.
+ let mut waited = false;
+ if !state.disconnected && state.buf.size() == 0 {
+ wait(&mut state.blocker, BlockedReceiver, &self.lock);
+ waited = true;
+ }
+ if state.disconnected && state.buf.size() == 0 { return None }
+
+ // Pick up the data, wake up our neighbors, and carry on
+ assert!(state.buf.size() > 0);
+ let ret = state.buf.dequeue();
+ self.wakeup_senders(waited, guard, state);
+ return Some(ret);
+ }
+
+ pub fn try_recv(&self) -> Result<T, Failure> {
+ let (guard, state) = self.lock();
+
+ // Easy cases first
+ if state.disconnected { return Err(Disconnected) }
+ if state.buf.size() == 0 { return Err(Empty) }
+
+ // Be sure to wake up neighbors
+ let ret = Ok(state.buf.dequeue());
+ self.wakeup_senders(false, guard, state);
+
+ return ret;
+ }
+
+ // Wake up pending senders after some data has been received
+ //
+ // * `waited` - flag if the receiver blocked to receive some data, or if it
+ // just picked up some data on the way out
+ // * `guard` - the lock guard that is held over this channel's lock
+ fn wakeup_senders(&self, waited: bool,
+ guard: LockGuard,
+ state: &mut State<T>) {
+ let pending_sender1: Option<BlockedTask> = state.queue.dequeue();
+
+ // If this is a no-buffer channel (cap == 0), then if we didn't wait we
+ // need to ACK the sender. If we waited, then the sender waking us up
+ // was already the ACK.
+ let pending_sender2 = if state.cap == 0 && !waited {
+ match mem::replace(&mut state.blocker, NoneBlocked) {
+ NoneBlocked => None,
+ BlockedReceiver(..) => unreachable!(),
+ BlockedSender(task) => {
+ state.canceled.take();
+ Some(task)
+ }
+ }
+ } else {
+ None
+ };
+ mem::drop((state, guard));
+
+ // only outside of the lock do we wake up the pending tasks
+ pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
+ pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
+ }
+
+ // Prepares this shared packet for a channel clone, essentially just bumping
+ // a refcount.
+ pub fn clone_chan(&self) {
+ self.channels.fetch_add(1, atomics::SeqCst);
+ }
+
+ pub fn drop_chan(&self) {
+ // Only flag the channel as disconnected if we're the last channel
+ match self.channels.fetch_sub(1, atomics::SeqCst) {
+ 1 => {}
+ _ => return
+ }
+
+ // Not much to do other than wake up a receiver if one's there
+ let (guard, state) = self.lock();
+ if state.disconnected { return }
+ state.disconnected = true;
+ match mem::replace(&mut state.blocker, NoneBlocked) {
+ NoneBlocked => {}
+ BlockedSender(..) => unreachable!(),
+ BlockedReceiver(task) => wakeup(task, guard),
+ }
+ }
+
+ pub fn drop_port(&self) {
+ let (guard, state) = self.lock();
+
+ if state.disconnected { return }
+ state.disconnected = true;
+
+ // If the capacity is 0, then the sender may want its data back after
+ // we're disconnected. Otherwise it's now our responsibility to destroy
+ // the buffered data. As with many other portions of this code, this
+ // needs to be careful to destroy the data *outside* of the lock to
+ // prevent deadlock.
+ let _data = if state.cap != 0 {
+ mem::replace(&mut state.buf.buf, Vec::new())
+ } else {
+ Vec::new()
+ };
+ let mut queue = mem::replace(&mut state.queue, Queue {
+ head: 0 as *mut Node,
+ tail: 0 as *mut Node,
+ });
+
+ let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
+ NoneBlocked => None,
+ BlockedSender(task) => {
+ *state.canceled.take_unwrap() = true;
+ Some(task)
+ }
+ BlockedReceiver(..) => unreachable!(),
+ };
+ mem::drop((state, guard));
+
+ loop {
+ match queue.dequeue() {
+ Some(task) => { task.wake().map(|t| t.reawaken()); }
+ None => break,
+ }
+ }
+ waiter.map(|t| t.wake().map(|t| t.reawaken()));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // If Ok, the value is whether this port has data, if Err, then the upgraded
+ // port needs to be checked instead of this one.
+ pub fn can_recv(&self) -> bool {
+ let (_g, state) = self.lock();
+ state.disconnected || state.buf.size() > 0
+ }
+
+ // Attempts to start selection on this port. This can either succeed or fail
+ // because there is data waiting.
+ pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{
+ let (_g, state) = self.lock();
+ if state.disconnected || state.buf.size() > 0 {
+ Err(task)
+ } else {
+ match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
+ NoneBlocked => {}
+ BlockedSender(..) => unreachable!(),
+ BlockedReceiver(..) => unreachable!(),
+ }
+ Ok(())
+ }
+ }
+
+ // Remove a previous selecting task from this port. This ensures that the
+ // blocked task will no longer be visible to any other threads.
+ //
+ // The return value indicates whether there's data on this port.
+ pub fn abort_selection(&self) -> bool {
+ let (_g, state) = self.lock();
+ match mem::replace(&mut state.blocker, NoneBlocked) {
+ NoneBlocked => true,
+ BlockedSender(task) => {
+ state.blocker = BlockedSender(task);
+ true
+ }
+ BlockedReceiver(task) => { task.trash(); false }
+ }
+ }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+ fn drop(&mut self) {
+ assert_eq!(self.channels.load(atomics::SeqCst), 0);
+ let (_g, state) = self.lock();
+ assert!(state.queue.dequeue().is_none());
+ assert!(state.canceled.is_none());
+ }
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// Buffer, a simple ring buffer backed by Vec<T>
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> Buffer<T> {
+ fn enqueue(&mut self, t: T) {
+ let pos = (self.start + self.size) % self.buf.len();
+ self.size += 1;
+ let prev = mem::replace(self.buf.get_mut(pos), Some(t));
+ assert!(prev.is_none());
+ }
+
+ fn dequeue(&mut self) -> T {
+ let start = self.start;
+ self.size -= 1;
+ self.start = (self.start + 1) % self.buf.len();
+ self.buf.get_mut(start).take_unwrap()
+ }
+
+ fn size(&self) -> uint { self.size }
+ fn cap(&self) -> uint { self.buf.len() }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Queue, a simple queue to enqueue tasks with (stack-allocated nodes)
+////////////////////////////////////////////////////////////////////////////////
+
+impl Queue {
+ fn enqueue(&mut self, lock: &NativeMutex) {
+ let task: ~Task = Local::take();
+ let mut node = Node {
+ task: None,
+ next: 0 as *mut Node,
+ };
+ task.deschedule(1, |task| {
+ node.task = Some(task);
+ if self.tail.is_null() {
+ self.head = &mut node as *mut Node;
+ self.tail = &mut node as *mut Node;
+ } else {
+ unsafe {
+ (*self.tail).next = &mut node as *mut Node;
+ self.tail = &mut node as *mut Node;
+ }
+ }
+ unsafe { lock.unlock_noguard(); }
+ Ok(())
+ });
+ unsafe { lock.lock_noguard(); }
+ assert!(node.next.is_null());
+ }
+
+ fn dequeue(&mut self) -> Option<BlockedTask> {
+ if self.head.is_null() {
+ return None
+ }
+ let node = self.head;
+ self.head = unsafe { (*node).next };
+ if self.head.is_null() {
+ self.tail = 0 as *mut Node;
+ }
+ unsafe {
+ (*node).next = 0 as *mut Node;
+ Some((*node).task.take_unwrap())
+ }
+ }
+}
}
iotest!(fn binary_file() {
- use rand::{Rng, task_rng};
+ use rand::{StdRng, Rng};
let mut bytes = [0, ..1024];
- task_rng().fill_bytes(bytes);
+ StdRng::new().fill_bytes(bytes);
let tmpdir = tmpdir();
pub use vec::Vec;
// Reexported runtime types
-pub use comm::{channel, Sender, Receiver};
+pub use comm::{sync_channel, channel, SyncSender, Sender, Receiver};
pub use task::spawn;
// Reexported statics
use iter::Iterator;
use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use mem;
+ #[cfg(not(test))] use ptr::RawPtr;
static mut global_args_ptr: uint = 0;
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;
#[test]
fn rng() {
- use rand::{Rng, task_rng};
- let mut r = task_rng();
+ use rand::{StdRng, Rng};
+ let mut r = StdRng::new();
let _ = r.next_u32();
}
fn smoke_cond() {
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;
unsafe {
- let mut guard = lock.lock();
+ let guard = lock.lock();
let t = Thread::start(proc() {
- let mut guard = lock.lock();
+ let guard = lock.lock();
guard.signal();
});
guard.wait();
#[cfg(test)]
mod tests {
- use super::Vec;
- use iter::{Iterator, range, Extendable};
- use mem::{drop, size_of};
- use ops::Drop;
- use option::{Some, None};
- use container::Container;
- use slice::{Vector, MutableVector, ImmutableVector};
+ use prelude::*;
+ use mem::size_of;
#[test]
fn test_small_vec_struct() {
}
}
-/// An extension of `pipes::stream` that provides synchronous message sending.
-pub struct SyncSender<S> { priv duplex_stream: DuplexStream<S, ()> }
-/// An extension of `pipes::stream` that acknowledges each message received.
-pub struct SyncReceiver<R> { priv duplex_stream: DuplexStream<(), R> }
-
-impl<S: Send> SyncSender<S> {
- pub fn send(&self, val: S) {
- assert!(self.try_send(val), "SyncSender.send: receiving port closed");
- }
-
- /// Sends a message, or report if the receiver has closed the connection
- /// before receiving.
- pub fn try_send(&self, val: S) -> bool {
- self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
- }
-}
-
-impl<R: Send> SyncReceiver<R> {
- pub fn recv(&self) -> R {
- self.recv_opt().expect("SyncReceiver.recv: sending channel closed")
- }
-
- pub fn recv_opt(&self) -> Option<R> {
- self.duplex_stream.recv_opt().map(|val| {
- self.duplex_stream.try_send(());
- val
- })
- }
-
- pub fn try_recv(&self) -> comm::TryRecvResult<R> {
- match self.duplex_stream.try_recv() {
- comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
- state => state,
- }
- }
-}
-
-/// Creates a stream whose channel, upon sending a message, blocks until the
-/// message is received.
-pub fn rendezvous<T: Send>() -> (SyncReceiver<T>, SyncSender<T>) {
- let (chan_stream, port_stream) = duplex();
- (SyncReceiver { duplex_stream: port_stream },
- SyncSender { duplex_stream: chan_stream })
-}
-
#[cfg(test)]
mod test {
- use comm::{duplex, rendezvous};
+ use comm::{duplex};
#[test]
assert!(left.recv() == 123);
assert!(right.recv() == ~"abc");
}
-
- #[test]
- pub fn basic_rendezvous_test() {
- let (port, chan) = rendezvous();
-
- spawn(proc() {
- chan.send("abc");
- });
-
- assert!(port.recv() == "abc");
- }
-
- #[test]
- fn recv_a_lot() {
- // Rendezvous streams should be able to handle any number of messages being sent
- let (port, chan) = rendezvous();
- spawn(proc() {
- for _ in range(0, 10000) { chan.send(()); }
- });
- for _ in range(0, 10000) { port.recv(); }
- }
-
- #[test]
- fn send_and_fail_and_try_recv() {
- let (port, chan) = rendezvous();
- spawn(proc() {
- chan.duplex_stream.send(()); // Can't access this field outside this module
- fail!()
- });
- port.recv()
- }
-
- #[test]
- fn try_send_and_recv_then_fail_before_ack() {
- let (port, chan) = rendezvous();
- spawn(proc() {
- port.duplex_stream.recv();
- fail!()
- });
- chan.try_send(());
- }
-
- #[test]
- #[should_fail]
- fn send_and_recv_then_fail_before_ack() {
- let (port, chan) = rendezvous();
- spawn(proc() {
- port.duplex_stream.recv();
- fail!()
- });
- chan.send(());
- }
}
#[cfg(test)]
#[phase(syntax, link)] extern crate log;
-pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex};
+pub use comm::{DuplexStream, duplex};
pub use task_pool::TaskPool;
pub use future::Future;
pub use arc::{Arc, Weak};