// option. This file may not be copied, modified, or distributed
// except according to those terms.
-//! Multi-producer, single-consumer communication primitives threads
+//! Multi-producer, single-consumer FIFO queue communication primitives.
//!
//! This module provides message-based communication over channels, concretely
//! defined among three types:
//! // Create a simple streaming channel
//! let (tx, rx) = channel();
//! Thread::spawn(move|| {
-//! tx.send(10i).unwrap();
+//! tx.send(10).unwrap();
//! });
-//! assert_eq!(rx.recv().unwrap(), 10i);
+//! assert_eq!(rx.recv().unwrap(), 10);
//! ```
//!
//! Shared usage:
//! // where tx is the sending half (tx for transmission), and rx is the receiving
//! // half (rx for receiving).
//! let (tx, rx) = channel();
-//! for i in range(0i, 10i) {
+//! for i in 0..10 {
//! let tx = tx.clone();
//! Thread::spawn(move|| {
//! tx.send(i).unwrap();
//! });
//! }
//!
-//! for _ in range(0i, 10i) {
+//! for _ in 0..10 {
//! let j = rx.recv().unwrap();
//! assert!(0 <= j && j < 10);
//! }
//!
//! ```no_run
//! use std::sync::mpsc::channel;
-//! use std::io::timer::Timer;
+//! use std::old_io::timer::Timer;
//! use std::time::Duration;
//!
//! let (tx, rx) = channel::<int>();
//!
//! ```no_run
//! use std::sync::mpsc::channel;
-//! use std::io::timer::Timer;
+//! use std::old_io::timer::Timer;
//! use std::time::Duration;
//!
//! let (tx, rx) = channel::<int>();
//! }
//! ```
-#![stable]
+#![stable(feature = "rust1", since = "1.0.0")]
// A description of how Rust's channel implementation works
//
/// The receiving-half of Rust's channel type. This half can only be owned by
/// one task
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
pub struct Receiver<T> {
inner: UnsafeCell<Flavor<T>>,
}
/// An iterator over messages on a receiver, this iterator will block
/// whenever `next` is called, waiting for a new message, and `None` will be
/// returned when the corresponding channel has hung up.
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
pub struct Iter<'a, T:'a> {
rx: &'a Receiver<T>
}
/// The sending-half of Rust's asynchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks.
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
inner: UnsafeCell<Flavor<T>>,
}
/// The sending-half of Rust's synchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks.
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
inner: Arc<UnsafeCell<sync::Packet<T>>>,
}
/// A `send` operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The error
/// contains the data being sent as a payload so it can be recovered.
-#[derive(PartialEq, Eq)]
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
+#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
/// An error returned from the `recv` function on a `Receiver`.
///
/// The `recv` operation can only fail if the sending half of a channel is
/// disconnected, implying that no further messages will ever be received.
-#[derive(PartialEq, Eq, Clone, Copy, Show)]
-#[stable]
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[stable(feature = "rust1", since = "1.0.0")]
pub struct RecvError;
/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
-#[derive(PartialEq, Clone, Copy, Show)]
-#[stable]
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[stable(feature = "rust1", since = "1.0.0")]
pub enum TryRecvError {
/// This channel is currently empty, but the sender(s) have not yet
/// disconnected, so data may yet become available.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
Empty,
/// This channel's sending half has become disconnected, and there will
/// never be any more data received on this channel
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
Disconnected,
}
/// This enumeration is the list of the possible error outcomes for the
/// `SyncSender::try_send` method.
-#[derive(PartialEq, Clone)]
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
+#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
/// The data could not be sent on the channel because it would require that
/// the callee block to send the data.
/// If this is a buffered channel, then the buffer is full at this time. If
/// this is not a buffered channel, then there is no receiver available to
/// acquire the data.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
Full(T),
/// This channel's receiving half has disconnected, so the data could not be
/// sent. The data is returned back to the callee in this case.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
Disconnected(T),
}
/// // Let's see what that answer was
/// println!("{:?}", rx.recv().unwrap());
/// ```
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
(Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
/// let (tx, rx) = sync_channel(1);
///
/// // this returns immediately
-/// tx.send(1i).unwrap();
+/// tx.send(1).unwrap();
///
/// Thread::spawn(move|| {
/// // this will block until the previous message has been received
-/// tx.send(2i).unwrap();
+/// tx.send(2).unwrap();
/// });
///
-/// assert_eq!(rx.recv().unwrap(), 1i);
-/// assert_eq!(rx.recv().unwrap(), 2i);
+/// assert_eq!(rx.recv().unwrap(), 1);
+/// assert_eq!(rx.recv().unwrap(), 2);
/// ```
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
(SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
/// let (tx, rx) = channel();
///
/// // This send is always successful
- /// tx.send(1i).unwrap();
+ /// tx.send(1).unwrap();
///
/// // This send will fail because the receiver is gone
/// drop(rx);
- /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
+ /// assert_eq!(tx.send(1).err().unwrap().0, 1);
/// ```
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
let (new_inner, ret) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper, guard) = match *unsafe { self.inner() } {
}
#[unsafe_destructor]
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Send> Drop for Sender<T> {
fn drop(&mut self) {
match *unsafe { self.inner_mut() } {
/// This function will never panic, but it may return `Err` if the
/// `Receiver` has disconnected and is no longer able to receive
/// information.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
unsafe { (*self.inner.get()).send(t).map_err(SendError) }
}
///
/// See `SyncSender::send` for notes about guarantees of whether the
/// receiver has received the data or not if this function is successful.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
unsafe { (*self.inner.get()).try_send(t) }
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Send> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
unsafe { (*self.inner.get()).clone_chan(); }
}
#[unsafe_destructor]
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Send> Drop for SyncSender<T> {
fn drop(&mut self) {
unsafe { (*self.inner.get()).drop_chan(); }
///
/// This is useful for a flavor of "optimistic check" before deciding to
/// block on a receiver.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
loop {
let new_port = match *unsafe { self.inner() } {
/// If the corresponding `Sender` has disconnected, or it disconnects while
/// this call is blocking, this call will wake up and return `Err` to
/// indicate that no more messages can ever be received on this channel.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
pub fn recv(&self) -> Result<T, RecvError> {
loop {
let new_port = match *unsafe { self.inner() } {
/// Returns an iterator that will block waiting for messages, but never
/// `panic!`. It will return `None` when the channel has hung up.
- #[stable]
+ #[stable(feature = "rust1", since = "1.0.0")]
pub fn iter(&self) -> Iter<T> {
Iter { rx: self }
}
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<'a, T: Send> Iterator for Iter<'a, T> {
type Item = T;
}
#[unsafe_destructor]
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Send> Drop for Receiver<T> {
fn drop(&mut self) {
match *unsafe { self.inner_mut() } {
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
"SendError(..)".fmt(f)
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
"sending on a closed channel".fmt(f)
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
"receiving on a closed channel".fmt(f)
}
}
-#[stable]
+#[stable(feature = "rust1", since = "1.0.0")]
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
#[test]
fn drop_full() {
let (tx, _rx) = channel();
- tx.send(box 1i).unwrap();
+ tx.send(box 1).unwrap();
}
#[test]
let (tx, _rx) = channel();
drop(tx.clone());
drop(tx.clone());
- tx.send(box 1i).unwrap();
+ tx.send(box 1).unwrap();
}
#[test]
fn stress() {
let (tx, rx) = channel::<int>();
let t = Thread::scoped(move|| {
- for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
+ for _ in 0u..10000 { tx.send(1).unwrap(); }
});
- for _ in range(0u, 10000) {
+ for _ in 0u..10000 {
assert_eq!(rx.recv().unwrap(), 1);
}
t.join().ok().unwrap();
let (tx, rx) = channel::<int>();
let t = Thread::scoped(move|| {
- for _ in range(0, AMT * NTHREADS) {
+ for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
match rx.try_recv() {
}
});
- for _ in range(0, NTHREADS) {
+ for _ in 0..NTHREADS {
let tx = tx.clone();
Thread::spawn(move|| {
- for _ in range(0, AMT) { tx.send(1).unwrap(); }
+ for _ in 0..AMT { tx.send(1).unwrap(); }
});
}
drop(tx);
let (tx2, rx2) = channel::<int>();
let t1 = Thread::scoped(move|| {
tx1.send(()).unwrap();
- for _ in range(0i, 40) {
+ for _ in 0..40 {
assert_eq!(rx2.recv().unwrap(), 1);
}
});
rx1.recv().unwrap();
let t2 = Thread::scoped(move|| {
- for _ in range(0i, 40) {
+ for _ in 0..40 {
tx2.send(1).unwrap();
}
});
fn recv_from_outside_runtime() {
let (tx, rx) = channel::<int>();
let t = Thread::scoped(move|| {
- for _ in range(0i, 40) {
+ for _ in 0..40 {
assert_eq!(rx.recv().unwrap(), 1);
}
});
- for _ in range(0u, 40) {
+ for _ in 0u..40 {
tx.send(1).unwrap();
}
t.join().ok().unwrap();
#[test]
fn oneshot_multi_thread_close_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = channel::<int>();
let _t = Thread::spawn(move|| {
drop(rx);
#[test]
fn oneshot_multi_thread_send_close_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = channel::<int>();
let _t = Thread::spawn(move|| {
drop(rx);
#[test]
fn oneshot_multi_thread_recv_close_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = channel::<int>();
Thread::spawn(move|| {
let res = Thread::scoped(move|| {
#[test]
fn oneshot_multi_thread_send_recv_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = channel();
let _t = Thread::spawn(move|| {
- tx.send(box 10i).unwrap();
+ tx.send(box 10).unwrap();
});
- assert!(rx.recv().unwrap() == box 10i);
+ assert!(rx.recv().unwrap() == box 10);
}
}
#[test]
fn stream_send_recv_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = channel();
send(tx, 0);
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
let (tx, rx) = channel();
- for _ in range(0i, 10000) { tx.send(()).unwrap(); }
- for _ in range(0i, 10000) { rx.recv().unwrap(); }
+ for _ in 0..10000 { tx.send(()).unwrap(); }
+ for _ in 0..10000 { rx.recv().unwrap(); }
}
#[test]
fn shared_chan_stress() {
let (tx, rx) = channel();
let total = stress_factor() + 100;
- for _ in range(0, total) {
+ for _ in 0..total {
let tx = tx.clone();
Thread::spawn(move|| {
tx.send(()).unwrap();
});
}
- for _ in range(0, total) {
+ for _ in 0..total {
rx.recv().unwrap();
}
}
tx2.send(()).unwrap();
});
// make sure the other task has gone to sleep
- for _ in range(0u, 5000) { Thread::yield_now(); }
+ for _ in 0u..5000 { Thread::yield_now(); }
// upgrade to a shared chan and send a message
let t = tx.clone();
#[test]
fn drop_full() {
let (tx, _rx) = sync_channel(1);
- tx.send(box 1i).unwrap();
+ tx.send(box 1).unwrap();
}
#[test]
fn stress() {
let (tx, rx) = sync_channel::<int>(0);
Thread::spawn(move|| {
- for _ in range(0u, 10000) { tx.send(1).unwrap(); }
+ for _ in 0u..10000 { tx.send(1).unwrap(); }
});
- for _ in range(0u, 10000) {
+ for _ in 0u..10000 {
assert_eq!(rx.recv().unwrap(), 1);
}
}
let (dtx, drx) = sync_channel::<()>(0);
Thread::spawn(move|| {
- for _ in range(0, AMT * NTHREADS) {
+ for _ in 0..AMT * NTHREADS {
assert_eq!(rx.recv().unwrap(), 1);
}
match rx.try_recv() {
dtx.send(()).unwrap();
});
- for _ in range(0, NTHREADS) {
+ for _ in 0..NTHREADS {
let tx = tx.clone();
Thread::spawn(move|| {
- for _ in range(0, AMT) { tx.send(1).unwrap(); }
+ for _ in 0..AMT { tx.send(1).unwrap(); }
});
}
drop(tx);
#[test]
fn oneshot_multi_thread_close_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
drop(rx);
#[test]
fn oneshot_multi_thread_send_close_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
drop(rx);
#[test]
fn oneshot_multi_thread_recv_close_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<int>(0);
let _t = Thread::spawn(move|| {
let res = Thread::scoped(move|| {
#[test]
fn oneshot_multi_thread_send_recv_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<Box<int>>(0);
let _t = Thread::spawn(move|| {
- tx.send(box 10i).unwrap();
+ tx.send(box 10).unwrap();
});
- assert!(rx.recv().unwrap() == box 10i);
+ assert!(rx.recv().unwrap() == box 10);
}
}
#[test]
fn stream_send_recv_stress() {
- for _ in range(0, stress_factor()) {
+ for _ in 0..stress_factor() {
let (tx, rx) = sync_channel::<Box<int>>(0);
send(tx, 0);
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
let (tx, rx) = sync_channel(10000);
- for _ in range(0u, 10000) { tx.send(()).unwrap(); }
- for _ in range(0u, 10000) { rx.recv().unwrap(); }
+ for _ in 0u..10000 { tx.send(()).unwrap(); }
+ for _ in 0u..10000 { rx.recv().unwrap(); }
}
#[test]
fn shared_chan_stress() {
let (tx, rx) = sync_channel(0);
let total = stress_factor() + 100;
- for _ in range(0, total) {
+ for _ in 0..total {
let tx = tx.clone();
Thread::spawn(move|| {
tx.send(()).unwrap();
});
}
- for _ in range(0, total) {
+ for _ in 0..total {
rx.recv().unwrap();
}
}
tx2.send(()).unwrap();
});
// make sure the other task has gone to sleep
- for _ in range(0u, 5000) { Thread::yield_now(); }
+ for _ in 0u..5000 { Thread::yield_now(); }
// upgrade to a shared chan and send a message
let t = tx.clone();
rx2.recv().unwrap();
}
- for _ in range(0u, 100) {
+ for _ in 0u..100 {
repro()
}
}