]> git.lizzy.rs Git - rust.git/commitdiff
comm: Implement synchronous channels
authorAlex Crichton <alex@alexcrichton.com>
Mon, 17 Mar 2014 21:34:25 +0000 (14:34 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Tue, 25 Mar 2014 03:06:37 +0000 (20:06 -0700)
This commit contains an implementation of synchronous, bounded channels for
Rust. This is an implementation of the proposal made last January [1]. These
channels are built on mutexes, and currently focus on a working implementation
rather than speed. Receivers for sync channels have select() implemented for
them, but there is currently no implementation of select() for sync senders.

Rust will continue to provide both synchronous and asynchronous channels as part
of the standard distribution, there is no intent to remove asynchronous
channels. This flavor of channels is meant to provide an alternative to
asynchronous channels because like green tasks, asynchronous channels are not
appropriate for all situations.

[1] - https://mail.mozilla.org/pipermail/rust-dev/2014-January/007924.html

src/libstd/comm/mod.rs
src/libstd/comm/select.rs
src/libstd/comm/sync.rs [new file with mode: 0644]
src/libstd/io/fs.rs
src/libstd/prelude.rs
src/libstd/rt/args.rs
src/libstd/rt/task.rs
src/libstd/unstable/mutex.rs
src/libstd/vec.rs
src/libsync/comm.rs
src/libsync/lib.rs

index 267140a0089bdfbbea30a81ecf2d30f605eb9d6f..94e3d5ce2d3f643df023db8b3bb6647aecd0e45a 100644 (file)
@@ -280,6 +280,7 @@ fn f() $b
 mod oneshot;
 mod stream;
 mod shared;
+mod sync;
 
 // Use a power of 2 to allow LLVM to optimize to something that's not a
 // division, this is hit pretty regularly.
@@ -301,8 +302,8 @@ pub struct Messages<'a, T> {
     priv rx: &'a Receiver<T>
 }
 
-/// The sending-half of Rust's channel type. This half can only be owned by one
-/// task
+/// The sending-half of Rust's asynchronous channel type. This half can only be
+/// owned by one task, but it can be cloned to send to other tasks.
 pub struct Sender<T> {
     priv inner: Flavor<T>,
     priv sends: Cell<uint>,
@@ -310,6 +311,14 @@ pub struct Sender<T> {
     priv marker: marker::NoShare,
 }
 
+/// The sending-half of Rust's synchronous channel type. This half can only be
+/// owned by one task, but it can be cloned to send to other tasks.
+pub struct SyncSender<T> {
+    priv inner: UnsafeArc<sync::Packet<T>>,
+    // can't share in an arc
+    priv marker: marker::NoShare,
+}
+
 /// This enumeration is the list of the possible reasons that try_recv could not
 /// return data when called.
 #[deriving(Eq, Clone, Show)]
@@ -324,10 +333,31 @@ pub enum TryRecvResult<T> {
     Data(T),
 }
 
+/// This enumeration is the list of the possible outcomes for the
+/// `SyncSender::try_send` method.
+#[deriving(Eq, Clone, Show)]
+pub enum TrySendResult<T> {
+    /// The data was successfully sent along the channel. This either means that
+    /// it was buffered in the channel, or handed off to a receiver. In either
+    /// case, the callee no longer has ownership of the data.
+    Sent,
+    /// The data could not be sent on the channel because it would require that
+    /// the callee block to send the data.
+    ///
+    /// If this is a buffered channel, then the buffer is full at this time. If
+    /// this is not a buffered channel, then there is no receiver available to
+    /// acquire the data.
+    Full(T),
+    /// This channel's receiving half has disconnected, so the data could not be
+    /// sent. The data is returned back to the callee in this case.
+    RecvDisconnected(T),
+}
+
 enum Flavor<T> {
     Oneshot(UnsafeArc<oneshot::Packet<T>>),
     Stream(UnsafeArc<stream::Packet<T>>),
     Shared(UnsafeArc<shared::Packet<T>>),
+    Sync(UnsafeArc<sync::Packet<T>>),
 }
 
 /// Creates a new channel, returning the sender/receiver halves. All data sent
@@ -338,6 +368,46 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
     (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
 }
 
+/// Creates a new synchronous, bounded channel.
+///
+/// Like asynchronous channels, the `Receiver` will block until a message
+/// becomes available. These channels differ greatly in the semantics of the
+/// sender from asynchronous channels, however.
+///
+/// This channel has an internal buffer on which messages will be queued. When
+/// the internal buffer becomes full, future sends will *block* waiting for the
+/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
+/// becomes  "rendezvous channel" where each send will not return until a recv
+/// is paired with it.
+///
+/// As with asynchronous channels, all senders will fail in `send` if the
+/// `Receiver` has been destroyed.
+///
+/// # Example
+///
+/// ```
+/// let (tx, rx) = sync_channel(1);
+///
+/// // this returns immediately
+/// tx.send(1);
+///
+/// spawn(proc() {
+///     // this will block until the previous message has been received
+///     tx.send(2);
+/// });
+///
+/// assert_eq!(rx.recv(), 1);
+/// assert_eq!(rx.recv(), 2);
+/// ```
+pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
+    let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
+    (SyncSender::new(a), Receiver::my_new(Sync(b)))
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sender
+////////////////////////////////////////////////////////////////////////////////
+
 impl<T: Send> Sender<T> {
     fn my_new(inner: Flavor<T>) -> Sender<T> {
         Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
@@ -422,6 +492,7 @@ pub fn try_send(&self, t: T) -> bool {
             }
             Stream(ref p) => return unsafe { (*p.get()).send(t) },
             Shared(ref p) => return unsafe { (*p.get()).send(t) },
+            Sync(..) => unreachable!(),
         };
 
         unsafe {
@@ -453,6 +524,7 @@ fn clone(&self) -> Sender<T> {
                 unsafe { (*p.get()).clone_chan(); }
                 return Sender::my_new(Shared(p.clone()));
             }
+            Sync(..) => unreachable!(),
         };
 
         unsafe {
@@ -472,10 +544,100 @@ fn drop(&mut self) {
             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+            Sync(..) => unreachable!(),
         }
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// SyncSender
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T: Send> SyncSender<T> {
+    fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
+        SyncSender { inner: inner, marker: marker::NoShare }
+    }
+
+    /// Sends a value on this synchronous channel.
+    ///
+    /// This function will *block* until space in the internal buffer becomes
+    /// available or a receiver is available to hand off the message to.
+    ///
+    /// Note that a successful send does *not* guarantee that the receiver will
+    /// ever see the data if there is a buffer on this channel. Messages may be
+    /// enqueued in the internal buffer for the receiver to receive at a later
+    /// time. If the buffer size is 0, however, it can be guaranteed that the
+    /// receiver has indeed received the data if this function returns success.
+    ///
+    /// # Failure
+    ///
+    /// Similarly to `Sender::send`, this function will fail if the
+    /// corresponding `Receiver` for this channel has disconnected. This
+    /// behavior is used to propagate failure among tasks.
+    ///
+    /// If failure is not desired, you can achieve the same semantics with the
+    /// `SyncSender::send_opt` method which will not fail if the receiver
+    /// disconnects.
+    pub fn send(&self, t: T) {
+        if self.send_opt(t).is_some() {
+            fail!("sending on a closed channel");
+        }
+    }
+
+    /// Send a value on a channel, returning it back if the receiver
+    /// disconnected
+    ///
+    /// This method will *block* to send the value `t` on the channel, but if
+    /// the value could not be sent due to the receiver disconnecting, the value
+    /// is returned back to the callee. This function is similar to `try_send`,
+    /// except that it will block if the channel is currently full.
+    ///
+    /// # Failure
+    ///
+    /// This function cannot fail.
+    pub fn send_opt(&self, t: T) -> Option<T> {
+        match unsafe { (*self.inner.get()).send(t) } {
+            Ok(()) => None,
+            Err(t) => Some(t),
+        }
+    }
+
+    /// Attempts to send a value on this channel without blocking.
+    ///
+    /// This method semantically differs from `Sender::try_send` because it can
+    /// fail if the receiver has not disconnected yet. If the buffer on this
+    /// channel is full, this function will immediately return the data back to
+    /// the callee.
+    ///
+    /// See `SyncSender::send` for notes about guarantees of whether the
+    /// receiver has received the data or not if this function is successful.
+    ///
+    /// # Failure
+    ///
+    /// This function cannot fail
+    pub fn try_send(&self, t: T) -> TrySendResult<T> {
+        unsafe { (*self.inner.get()).try_send(t) }
+    }
+}
+
+impl<T: Send> Clone for SyncSender<T> {
+    fn clone(&self) -> SyncSender<T> {
+        unsafe { (*self.inner.get()).clone_chan(); }
+        return SyncSender::new(self.inner.clone());
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for SyncSender<T> {
+    fn drop(&mut self) {
+        unsafe { (*self.inner.get()).drop_chan(); }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Receiver
+////////////////////////////////////////////////////////////////////////////////
+
 impl<T: Send> Receiver<T> {
     fn my_new(inner: Flavor<T>) -> Receiver<T> {
         Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
@@ -554,6 +716,13 @@ pub fn try_recv(&self) -> TryRecvResult<T> {
                         Err(shared::Disconnected) => return Disconnected,
                     }
                 }
+                Sync(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Data(t),
+                        Err(sync::Empty) => return Empty,
+                        Err(sync::Disconnected) => return Disconnected,
+                    }
+                }
             };
             unsafe {
                 mem::swap(&mut cast::transmute_mut(self).inner,
@@ -600,6 +769,7 @@ pub fn recv_opt(&self) -> Option<T> {
                         Err(shared::Disconnected) => return None,
                     }
                 }
+                Sync(ref p) => return unsafe { (*p.get()).recv() }
             };
             unsafe {
                 mem::swap(&mut cast::transmute_mut(self).inner,
@@ -634,6 +804,9 @@ fn can_recv(&self) -> bool {
                 Shared(ref p) => {
                     return unsafe { (*p.get()).can_recv() };
                 }
+                Sync(ref p) => {
+                    return unsafe { (*p.get()).can_recv() };
+                }
             };
             unsafe {
                 mem::swap(&mut cast::transmute_mut(self).inner,
@@ -662,6 +835,9 @@ fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
                 Shared(ref p) => {
                     return unsafe { (*p.get()).start_selection(task) };
                 }
+                Sync(ref p) => {
+                    return unsafe { (*p.get()).start_selection(task) };
+                }
             };
             task = t;
             unsafe {
@@ -682,6 +858,9 @@ fn abort_selection(&self) -> bool {
                 Shared(ref p) => return unsafe {
                     (*p.get()).abort_selection(was_upgrade)
                 },
+                Sync(ref p) => return unsafe {
+                    (*p.get()).abort_selection()
+                },
             };
             let mut new_port = match result { Ok(b) => return b, Err(p) => p };
             was_upgrade = true;
@@ -704,6 +883,7 @@ fn drop(&mut self) {
             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
+            Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
         }
     }
 }
@@ -1243,3 +1423,517 @@ fn recv(rx: Receiver<~int>, i: int) {
         pdone.recv();
     })
 }
+
+#[cfg(test)]
+mod sync_tests {
+    use prelude::*;
+    use os;
+
+    pub fn stress_factor() -> uint {
+        match os::getenv("RUST_TEST_STRESS") {
+            Some(val) => from_str::<uint>(val).unwrap(),
+            None => 1,
+        }
+    }
+
+    test!(fn smoke() {
+        let (tx, rx) = sync_channel(1);
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn drop_full() {
+        let (tx, _rx) = sync_channel(1);
+        tx.send(~1);
+    })
+
+    test!(fn smoke_shared() {
+        let (tx, rx) = sync_channel(1);
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+        let tx = tx.clone();
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn smoke_threads() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() {
+            tx.send(1);
+        });
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn smoke_port_gone() {
+        let (tx, rx) = sync_channel(0);
+        drop(rx);
+        tx.send(1);
+    } #[should_fail])
+
+    test!(fn smoke_shared_port_gone2() {
+        let (tx, rx) = sync_channel(0);
+        drop(rx);
+        let tx2 = tx.clone();
+        drop(tx);
+        tx2.send(1);
+    } #[should_fail])
+
+    test!(fn port_gone_concurrent() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() {
+            rx.recv();
+        });
+        loop { tx.send(1) }
+    } #[should_fail])
+
+    test!(fn port_gone_concurrent_shared() {
+        let (tx, rx) = sync_channel(0);
+        let tx2 = tx.clone();
+        spawn(proc() {
+            rx.recv();
+        });
+        loop {
+            tx.send(1);
+            tx2.send(1);
+        }
+    } #[should_fail])
+
+    test!(fn smoke_chan_gone() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(tx);
+        rx.recv();
+    } #[should_fail])
+
+    test!(fn smoke_chan_gone_shared() {
+        let (tx, rx) = sync_channel::<()>(0);
+        let tx2 = tx.clone();
+        drop(tx);
+        drop(tx2);
+        rx.recv();
+    } #[should_fail])
+
+    test!(fn chan_gone_concurrent() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() {
+            tx.send(1);
+            tx.send(1);
+        });
+        loop { rx.recv(); }
+    } #[should_fail])
+
+    test!(fn stress() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() {
+            for _ in range(0, 10000) { tx.send(1); }
+        });
+        for _ in range(0, 10000) {
+            assert_eq!(rx.recv(), 1);
+        }
+    })
+
+    test!(fn stress_shared() {
+        static AMT: uint = 1000;
+        static NTHREADS: uint = 8;
+        let (tx, rx) = sync_channel::<int>(0);
+        let (dtx, drx) = sync_channel::<()>(0);
+
+        spawn(proc() {
+            for _ in range(0, AMT * NTHREADS) {
+                assert_eq!(rx.recv(), 1);
+            }
+            match rx.try_recv() {
+                Data(..) => fail!(),
+                _ => {}
+            }
+            dtx.send(());
+        });
+
+        for _ in range(0, NTHREADS) {
+            let tx = tx.clone();
+            spawn(proc() {
+                for _ in range(0, AMT) { tx.send(1); }
+            });
+        }
+        drop(tx);
+        drx.recv();
+    })
+
+    test!(fn oneshot_single_thread_close_port_first() {
+        // Simple test of closing without sending
+        let (_tx, rx) = sync_channel::<int>(0);
+        drop(rx);
+    })
+
+    test!(fn oneshot_single_thread_close_chan_first() {
+        // Simple test of closing without sending
+        let (tx, _rx) = sync_channel::<int>(0);
+        drop(tx);
+    })
+
+    test!(fn oneshot_single_thread_send_port_close() {
+        // Testing that the sender cleans up the payload if receiver is closed
+        let (tx, rx) = sync_channel::<~int>(0);
+        drop(rx);
+        tx.send(~0);
+    } #[should_fail])
+
+    test!(fn oneshot_single_thread_recv_chan_close() {
+        // Receiving on a closed chan will fail
+        let res = task::try(proc() {
+            let (tx, rx) = sync_channel::<int>(0);
+            drop(tx);
+            rx.recv();
+        });
+        // What is our res?
+        assert!(res.is_err());
+    })
+
+    test!(fn oneshot_single_thread_send_then_recv() {
+        let (tx, rx) = sync_channel::<~int>(1);
+        tx.send(~10);
+        assert!(rx.recv() == ~10);
+    })
+
+    test!(fn oneshot_single_thread_try_send_open() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(tx.try_send(10), Sent);
+        assert!(rx.recv() == 10);
+    })
+
+    test!(fn oneshot_single_thread_try_send_closed() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(rx);
+        assert_eq!(tx.try_send(10), RecvDisconnected(10));
+    })
+
+    test!(fn oneshot_single_thread_try_send_closed2() {
+        let (tx, _rx) = sync_channel::<int>(0);
+        assert_eq!(tx.try_send(10), Full(10));
+    })
+
+    test!(fn oneshot_single_thread_try_recv_open() {
+        let (tx, rx) = sync_channel::<int>(1);
+        tx.send(10);
+        assert!(rx.recv_opt() == Some(10));
+    })
+
+    test!(fn oneshot_single_thread_try_recv_closed() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(tx);
+        assert!(rx.recv_opt() == None);
+    })
+
+    test!(fn oneshot_single_thread_peek_data() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(rx.try_recv(), Empty)
+        tx.send(10);
+        assert_eq!(rx.try_recv(), Data(10));
+    })
+
+    test!(fn oneshot_single_thread_peek_close() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(tx);
+        assert_eq!(rx.try_recv(), Disconnected);
+        assert_eq!(rx.try_recv(), Disconnected);
+    })
+
+    test!(fn oneshot_single_thread_peek_open() {
+        let (_tx, rx) = sync_channel::<int>(0);
+        assert_eq!(rx.try_recv(), Empty);
+    })
+
+    test!(fn oneshot_multi_task_recv_then_send() {
+        let (tx, rx) = sync_channel::<~int>(0);
+        spawn(proc() {
+            assert!(rx.recv() == ~10);
+        });
+
+        tx.send(~10);
+    })
+
+    test!(fn oneshot_multi_task_recv_then_close() {
+        let (tx, rx) = sync_channel::<~int>(0);
+        spawn(proc() {
+            drop(tx);
+        });
+        let res = task::try(proc() {
+            assert!(rx.recv() == ~10);
+        });
+        assert!(res.is_err());
+    })
+
+    test!(fn oneshot_multi_thread_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<int>(0);
+            spawn(proc() {
+                drop(rx);
+            });
+            drop(tx);
+        }
+    })
+
+    test!(fn oneshot_multi_thread_send_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<int>(0);
+            spawn(proc() {
+                drop(rx);
+            });
+            let _ = task::try(proc() {
+                tx.send(1);
+            });
+        }
+    })
+
+    test!(fn oneshot_multi_thread_recv_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<int>(0);
+            spawn(proc() {
+                let res = task::try(proc() {
+                    rx.recv();
+                });
+                assert!(res.is_err());
+            });
+            spawn(proc() {
+                spawn(proc() {
+                    drop(tx);
+                });
+            });
+        }
+    })
+
+    test!(fn oneshot_multi_thread_send_recv_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel(0);
+            spawn(proc() {
+                tx.send(~10);
+            });
+            spawn(proc() {
+                assert!(rx.recv() == ~10);
+            });
+        }
+    })
+
+    test!(fn stream_send_recv_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel(0);
+
+            send(tx, 0);
+            recv(rx, 0);
+
+            fn send(tx: SyncSender<~int>, i: int) {
+                if i == 10 { return }
+
+                spawn(proc() {
+                    tx.send(~i);
+                    send(tx, i + 1);
+                });
+            }
+
+            fn recv(rx: Receiver<~int>, i: int) {
+                if i == 10 { return }
+
+                spawn(proc() {
+                    assert!(rx.recv() == ~i);
+                    recv(rx, i + 1);
+                });
+            }
+        }
+    })
+
+    test!(fn recv_a_lot() {
+        // Regression test that we don't run out of stack in scheduler context
+        let (tx, rx) = sync_channel(10000);
+        for _ in range(0, 10000) { tx.send(()); }
+        for _ in range(0, 10000) { rx.recv(); }
+    })
+
+    test!(fn shared_chan_stress() {
+        let (tx, rx) = sync_channel(0);
+        let total = stress_factor() + 100;
+        for _ in range(0, total) {
+            let tx = tx.clone();
+            spawn(proc() {
+                tx.send(());
+            });
+        }
+
+        for _ in range(0, total) {
+            rx.recv();
+        }
+    })
+
+    test!(fn test_nested_recv_iter() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let (total_tx, total_rx) = sync_channel::<int>(0);
+
+        spawn(proc() {
+            let mut acc = 0;
+            for x in rx.iter() {
+                acc += x;
+            }
+            total_tx.send(acc);
+        });
+
+        tx.send(3);
+        tx.send(1);
+        tx.send(2);
+        drop(tx);
+        assert_eq!(total_rx.recv(), 6);
+    })
+
+    test!(fn test_recv_iter_break() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let (count_tx, count_rx) = sync_channel(0);
+
+        spawn(proc() {
+            let mut count = 0;
+            for x in rx.iter() {
+                if count >= 3 {
+                    break;
+                } else {
+                    count += x;
+                }
+            }
+            count_tx.send(count);
+        });
+
+        tx.send(2);
+        tx.send(2);
+        tx.send(2);
+        tx.try_send(2);
+        drop(tx);
+        assert_eq!(count_rx.recv(), 4);
+    })
+
+    test!(fn try_recv_states() {
+        let (tx1, rx1) = sync_channel::<int>(1);
+        let (tx2, rx2) = sync_channel::<()>(1);
+        let (tx3, rx3) = sync_channel::<()>(1);
+        spawn(proc() {
+            rx2.recv();
+            tx1.send(1);
+            tx3.send(());
+            rx2.recv();
+            drop(tx1);
+            tx3.send(());
+        });
+
+        assert_eq!(rx1.try_recv(), Empty);
+        tx2.send(());
+        rx3.recv();
+        assert_eq!(rx1.try_recv(), Data(1));
+        assert_eq!(rx1.try_recv(), Empty);
+        tx2.send(());
+        rx3.recv();
+        assert_eq!(rx1.try_recv(), Disconnected);
+    })
+
+    // This bug used to end up in a livelock inside of the Receiver destructor
+    // because the internal state of the Shared packet was corrupted
+    test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
+        let (tx, rx) = sync_channel(0);
+        let (tx2, rx2) = sync_channel(0);
+        spawn(proc() {
+            rx.recv(); // wait on a oneshot
+            drop(rx);  // destroy a shared
+            tx2.send(());
+        });
+        // make sure the other task has gone to sleep
+        for _ in range(0, 5000) { task::deschedule(); }
+
+        // upgrade to a shared chan and send a message
+        let t = tx.clone();
+        drop(tx);
+        t.send(());
+
+        // wait for the child task to exit before we exit
+        rx2.recv();
+    })
+
+    test!(fn try_recvs_off_the_runtime() {
+        use std::rt::thread::Thread;
+
+        let (tx, rx) = sync_channel(0);
+        let (cdone, pdone) = channel();
+        let t = Thread::start(proc() {
+            let mut hits = 0;
+            while hits < 10 {
+                match rx.try_recv() {
+                    Data(()) => { hits += 1; }
+                    Empty => { Thread::yield_now(); }
+                    Disconnected => return,
+                }
+            }
+            cdone.send(());
+        });
+        for _ in range(0, 10) {
+            tx.send(());
+        }
+        t.join();
+        pdone.recv();
+    })
+
+    test!(fn send_opt1() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() { rx.recv(); });
+        assert_eq!(tx.send_opt(1), None);
+    })
+
+    test!(fn send_opt2() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() { drop(rx); });
+        assert_eq!(tx.send_opt(1), Some(1));
+    })
+
+    test!(fn send_opt3() {
+        let (tx, rx) = sync_channel(1);
+        assert_eq!(tx.send_opt(1), None);
+        spawn(proc() { drop(rx); });
+        assert_eq!(tx.send_opt(1), Some(1));
+    })
+
+    test!(fn send_opt4() {
+        let (tx, rx) = sync_channel(0);
+        let tx2 = tx.clone();
+        let (done, donerx) = channel();
+        let done2 = done.clone();
+        spawn(proc() {
+            assert_eq!(tx.send_opt(1), Some(1));
+            done.send(());
+        });
+        spawn(proc() {
+            assert_eq!(tx2.send_opt(2), Some(2));
+            done2.send(());
+        });
+        drop(rx);
+        donerx.recv();
+        donerx.recv();
+    })
+
+    test!(fn try_send1() {
+        let (tx, _rx) = sync_channel(0);
+        assert_eq!(tx.try_send(1), Full(1));
+    })
+
+    test!(fn try_send2() {
+        let (tx, _rx) = sync_channel(1);
+        assert_eq!(tx.try_send(1), Sent);
+        assert_eq!(tx.try_send(1), Full(1));
+    })
+
+    test!(fn try_send3() {
+        let (tx, rx) = sync_channel(1);
+        assert_eq!(tx.try_send(1), Sent);
+        drop(rx);
+        assert_eq!(tx.try_send(1), RecvDisconnected(1));
+    })
+
+    test!(fn try_send4() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() {
+            for _ in range(0, 1000) { task::deschedule(); }
+            assert_eq!(tx.try_send(1), Sent);
+        });
+        assert_eq!(rx.recv(), 1);
+    })
+}
index 5872c308f938bc0bd68e7786164575160e47f1a1..1b2e79e02b4187ebb1b97cef5f9c2dad19772e0b 100644 (file)
@@ -648,4 +648,40 @@ mod test {
         tx1.send(());
         rx2.recv();
     })
+
+    test!(fn sync1() {
+        let (tx, rx) = sync_channel(1);
+        tx.send(1);
+        select! {
+            n = rx.recv() => { assert_eq!(n, 1); }
+        }
+    })
+
+    test!(fn sync2() {
+        let (tx, rx) = sync_channel(0);
+        spawn(proc() {
+            for _ in range(0, 100) { task::deschedule() }
+            tx.send(1);
+        });
+        select! {
+            n = rx.recv() => { assert_eq!(n, 1); }
+        }
+    })
+
+    test!(fn sync3() {
+        let (tx1, rx1) = sync_channel(0);
+        let (tx2, rx2) = channel();
+        spawn(proc() { tx1.send(1); });
+        spawn(proc() { tx2.send(2); });
+        select! {
+            n = rx1.recv() => {
+                assert_eq!(n, 1);
+                assert_eq!(rx2.recv(), 2);
+            },
+            n = rx2.recv() => {
+                assert_eq!(n, 2);
+                assert_eq!(rx1.recv(), 1);
+            }
+        }
+    })
 }
diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs
new file mode 100644 (file)
index 0000000..b3591da
--- /dev/null
@@ -0,0 +1,485 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Synchronous channels/ports
+///
+/// This channel implementation differs significantly from the asynchronous
+/// implementations found next to it (oneshot/stream/share). This is an
+/// implementation of a synchronous, bounded buffer channel.
+///
+/// Each channel is created with some amount of backing buffer, and sends will
+/// *block* until buffer space becomes available. A buffer size of 0 is valid,
+/// which means that every successful send is paired with a successful recv.
+///
+/// This flavor of channels defines a new `send_opt` method for channels which
+/// is the method by which a message is sent but the task does not fail if it
+/// cannot be delivered.
+///
+/// Another major difference is that send() will *always* return back the data
+/// if it couldn't be sent. This is because it is deterministically known when
+/// the data is received and when it is not received.
+///
+/// Implementation-wise, it can all be summed up with "use a mutex plus some
+/// logic". The mutex used here is an OS native mutex, meaning that no user code
+/// is run inside of the mutex (to prevent context switching). This
+/// implementation shares almost all code for the buffered and unbuffered cases
+/// of a synchronous channel. There are a few branches for the unbuffered case,
+/// but they're mostly just relevant to blocking senders.
+
+use cast;
+use container::Container;
+use iter::Iterator;
+use kinds::Send;
+use mem;
+use ops::Drop;
+use option::{Some, None, Option};
+use ptr::RawPtr;
+use result::{Result, Ok, Err};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use sync::atomics;
+use ty::Unsafe;
+use unstable::mutex::{NativeMutex, LockGuard};
+use vec::Vec;
+
+pub struct Packet<T> {
+    /// Only field outside of the mutex. Just done for kicks, but mainly because
+    /// the other shared channel already had the code implemented
+    channels: atomics::AtomicUint,
+
+    /// The state field is protected by this mutex
+    lock: NativeMutex,
+    state: Unsafe<State<T>>,
+}
+
+struct State<T> {
+    disconnected: bool, // Is the channel disconnected yet?
+    queue: Queue,       // queue of senders waiting to send data
+    blocker: Blocker,   // currently blocked task on this channel
+    buf: Buffer<T>,     // storage for buffered messages
+    cap: uint,          // capacity of this channel
+
+    /// A curious flag used to indicate whether a sender failed or succeeded in
+    /// blocking. This is used to transmit information back to the task that it
+    /// must dequeue its message from the buffer because it was not received.
+    /// This is only relevant in the 0-buffer case. This obviously cannot be
+    /// safely constructed, but it's guaranteed to always have a valid pointer
+    /// value.
+    canceled: Option<&'static mut bool>,
+}
+
+/// Possible flavors of tasks who can be blocked on this channel.
+enum Blocker {
+    BlockedSender(BlockedTask),
+    BlockedReceiver(BlockedTask),
+    NoneBlocked
+}
+
+/// Simple queue for threading tasks together. Nodes are stack-allocated, so
+/// this structure is not safe at all
+struct Queue {
+    head: *mut Node,
+    tail: *mut Node,
+}
+
+struct Node {
+    task: Option<BlockedTask>,
+    next: *mut Node,
+}
+
+/// A simple ring-buffer
+struct Buffer<T> {
+    buf: Vec<Option<T>>,
+    start: uint,
+    size: uint,
+}
+
+#[deriving(Show)]
+pub enum Failure {
+    Empty,
+    Disconnected,
+}
+
+/// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
+/// in the meantime. This re-locks the mutex upon returning.
+fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
+        lock: &NativeMutex) {
+    let me: ~Task = Local::take();
+    me.deschedule(1, |task| {
+        match mem::replace(slot, f(task)) {
+            NoneBlocked => {}
+            _ => unreachable!(),
+        }
+        unsafe { lock.unlock_noguard(); }
+        Ok(())
+    });
+    unsafe { lock.lock_noguard(); }
+}
+
+/// Wakes up a task, dropping the lock at the correct time
+fn wakeup(task: BlockedTask, guard: LockGuard) {
+    // We need to be careful to wake up the waiting task *outside* of the mutex
+    // in case it incurs a context switch.
+    mem::drop(guard);
+    task.wake().map(|t| t.reawaken());
+}
+
+impl<T: Send> Packet<T> {
+    pub fn new(cap: uint) -> Packet<T> {
+        Packet {
+            channels: atomics::AtomicUint::new(1),
+            lock: unsafe { NativeMutex::new() },
+            state: Unsafe::new(State {
+                disconnected: false,
+                blocker: NoneBlocked,
+                cap: cap,
+                canceled: None,
+                queue: Queue {
+                    head: 0 as *mut Node,
+                    tail: 0 as *mut Node,
+                },
+                buf: Buffer {
+                    buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None),
+                    start: 0,
+                    size: 0,
+                },
+            }),
+        }
+    }
+
+    // Locks this channel, returning a guard for the state and the mutable state
+    // itself. Care should be taken to ensure that the state does not escape the
+    // guard!
+    //
+    // Note that we're ok promoting an & reference to an &mut reference because
+    // the lock ensures that we're the only ones in the world with a pointer to
+    // the state.
+    fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
+        unsafe {
+            let guard = self.lock.lock();
+            (guard, &mut *self.state.get())
+        }
+    }
+
+    pub fn send(&self, t: T) -> Result<(), T> {
+        let (guard, state) = self.lock();
+
+        // wait for a slot to become available, and enqueue the data
+        while !state.disconnected && state.buf.size() == state.buf.cap() {
+            state.queue.enqueue(&self.lock);
+        }
+        if state.disconnected { return Err(t) }
+        state.buf.enqueue(t);
+
+        match mem::replace(&mut state.blocker, NoneBlocked) {
+            // if our capacity is 0, then we need to wait for a receiver to be
+            // available to take our data. After waiting, we check again to make
+            // sure the port didn't go away in the meantime. If it did, we need
+            // to hand back our data.
+            NoneBlocked if state.cap == 0 => {
+                let mut canceled = false;
+                assert!(state.canceled.is_none());
+                state.canceled = Some(unsafe { cast::transmute(&mut canceled) });
+                wait(&mut state.blocker, BlockedSender, &self.lock);
+                if canceled {Err(state.buf.dequeue())} else {Ok(())}
+            }
+
+            // success, we buffered some data
+            NoneBlocked => Ok(()),
+
+            // success, someone's about to receive our buffered data.
+            BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
+
+            BlockedSender(..) => fail!("lolwut"),
+        }
+    }
+
+    pub fn try_send(&self, t: T) -> super::TrySendResult<T> {
+        let (guard, state) = self.lock();
+        if state.disconnected {
+            super::RecvDisconnected(t)
+        } else if state.buf.size() == state.buf.cap() {
+            super::Full(t)
+        } else if state.cap == 0 {
+            // With capacity 0, even though we have buffer space we can't
+            // transfer the data unless there's a receiver waiting.
+            match mem::replace(&mut state.blocker, NoneBlocked) {
+                NoneBlocked => super::Full(t),
+                BlockedSender(..) => unreachable!(),
+                BlockedReceiver(task) => {
+                    state.buf.enqueue(t);
+                    wakeup(task, guard);
+                    super::Sent
+                }
+            }
+        } else {
+            // If the buffer has some space and the capacity isn't 0, then we
+            // just enqueue the data for later retrieval.
+            assert!(state.buf.size() < state.buf.cap());
+            state.buf.enqueue(t);
+            super::Sent
+        }
+    }
+
+    // Receives a message from this channel
+    //
+    // When reading this, remember that there can only ever be one receiver at
+    // time.
+    pub fn recv(&self) -> Option<T> {
+        let (guard, state) = self.lock();
+
+        // Wait for the buffer to have something in it. No need for a while loop
+        // because we're the only receiver.
+        let mut waited = false;
+        if !state.disconnected && state.buf.size() == 0 {
+            wait(&mut state.blocker, BlockedReceiver, &self.lock);
+            waited = true;
+        }
+        if state.disconnected && state.buf.size() == 0 { return None }
+
+        // Pick up the data, wake up our neighbors, and carry on
+        assert!(state.buf.size() > 0);
+        let ret = state.buf.dequeue();
+        self.wakeup_senders(waited, guard, state);
+        return Some(ret);
+    }
+
+    pub fn try_recv(&self) -> Result<T, Failure> {
+        let (guard, state) = self.lock();
+
+        // Easy cases first
+        if state.disconnected { return Err(Disconnected) }
+        if state.buf.size() == 0 { return Err(Empty) }
+
+        // Be sure to wake up neighbors
+        let ret = Ok(state.buf.dequeue());
+        self.wakeup_senders(false, guard, state);
+
+        return ret;
+    }
+
+    // Wake up pending senders after some data has been received
+    //
+    // * `waited` - flag if the receiver blocked to receive some data, or if it
+    //              just picked up some data on the way out
+    // * `guard` - the lock guard that is held over this channel's lock
+    fn wakeup_senders(&self, waited: bool,
+                      guard: LockGuard,
+                      state: &mut State<T>) {
+        let pending_sender1: Option<BlockedTask> = state.queue.dequeue();
+
+        // If this is a no-buffer channel (cap == 0), then if we didn't wait we
+        // need to ACK the sender. If we waited, then the sender waking us up
+        // was already the ACK.
+        let pending_sender2 = if state.cap == 0 && !waited {
+            match mem::replace(&mut state.blocker, NoneBlocked) {
+                NoneBlocked => None,
+                BlockedReceiver(..) => unreachable!(),
+                BlockedSender(task) => {
+                    state.canceled.take();
+                    Some(task)
+                }
+            }
+        } else {
+            None
+        };
+        mem::drop((state, guard));
+
+        // only outside of the lock do we wake up the pending tasks
+        pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
+        pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
+    }
+
+    // Prepares this shared packet for a channel clone, essentially just bumping
+    // a refcount.
+    pub fn clone_chan(&self) {
+        self.channels.fetch_add(1, atomics::SeqCst);
+    }
+
+    pub fn drop_chan(&self) {
+        // Only flag the channel as disconnected if we're the last channel
+        match self.channels.fetch_sub(1, atomics::SeqCst) {
+            1 => {}
+            _ => return
+        }
+
+        // Not much to do other than wake up a receiver if one's there
+        let (guard, state) = self.lock();
+        if state.disconnected { return }
+        state.disconnected = true;
+        match mem::replace(&mut state.blocker, NoneBlocked) {
+            NoneBlocked => {}
+            BlockedSender(..) => unreachable!(),
+            BlockedReceiver(task) => wakeup(task, guard),
+        }
+    }
+
+    pub fn drop_port(&self) {
+        let (guard, state) = self.lock();
+
+        if state.disconnected { return }
+        state.disconnected = true;
+
+        // If the capacity is 0, then the sender may want its data back after
+        // we're disconnected. Otherwise it's now our responsibility to destroy
+        // the buffered data. As with many other portions of this code, this
+        // needs to be careful to destroy the data *outside* of the lock to
+        // prevent deadlock.
+        let _data = if state.cap != 0 {
+            mem::replace(&mut state.buf.buf, Vec::new())
+        } else {
+            Vec::new()
+        };
+        let mut queue = mem::replace(&mut state.queue, Queue {
+            head: 0 as *mut Node,
+            tail: 0 as *mut Node,
+        });
+
+        let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
+            NoneBlocked => None,
+            BlockedSender(task) => {
+                *state.canceled.take_unwrap() = true;
+                Some(task)
+            }
+            BlockedReceiver(..) => unreachable!(),
+        };
+        mem::drop((state, guard));
+
+        loop {
+            match queue.dequeue() {
+                Some(task) => { task.wake().map(|t| t.reawaken()); }
+                None => break,
+            }
+        }
+        waiter.map(|t| t.wake().map(|t| t.reawaken()));
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // If Ok, the value is whether this port has data, if Err, then the upgraded
+    // port needs to be checked instead of this one.
+    pub fn can_recv(&self) -> bool {
+        let (_g, state) = self.lock();
+        state.disconnected || state.buf.size() > 0
+    }
+
+    // Attempts to start selection on this port. This can either succeed or fail
+    // because there is data waiting.
+    pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{
+        let (_g, state) = self.lock();
+        if state.disconnected || state.buf.size() > 0 {
+            Err(task)
+        } else {
+            match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
+                NoneBlocked => {}
+                BlockedSender(..) => unreachable!(),
+                BlockedReceiver(..) => unreachable!(),
+            }
+            Ok(())
+        }
+    }
+
+    // Remove a previous selecting task from this port. This ensures that the
+    // blocked task will no longer be visible to any other threads.
+    //
+    // The return value indicates whether there's data on this port.
+    pub fn abort_selection(&self) -> bool {
+        let (_g, state) = self.lock();
+        match mem::replace(&mut state.blocker, NoneBlocked) {
+            NoneBlocked => true,
+            BlockedSender(task) => {
+                state.blocker = BlockedSender(task);
+                true
+            }
+            BlockedReceiver(task) => { task.trash(); false }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        assert_eq!(self.channels.load(atomics::SeqCst), 0);
+        let (_g, state) = self.lock();
+        assert!(state.queue.dequeue().is_none());
+        assert!(state.canceled.is_none());
+    }
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// Buffer, a simple ring buffer backed by Vec<T>
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> Buffer<T> {
+    fn enqueue(&mut self, t: T) {
+        let pos = (self.start + self.size) % self.buf.len();
+        self.size += 1;
+        let prev = mem::replace(self.buf.get_mut(pos), Some(t));
+        assert!(prev.is_none());
+    }
+
+    fn dequeue(&mut self) -> T {
+        let start = self.start;
+        self.size -= 1;
+        self.start = (self.start + 1) % self.buf.len();
+        self.buf.get_mut(start).take_unwrap()
+    }
+
+    fn size(&self) -> uint { self.size }
+    fn cap(&self) -> uint { self.buf.len() }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Queue, a simple queue to enqueue tasks with (stack-allocated nodes)
+////////////////////////////////////////////////////////////////////////////////
+
+impl Queue {
+    fn enqueue(&mut self, lock: &NativeMutex) {
+        let task: ~Task = Local::take();
+        let mut node = Node {
+            task: None,
+            next: 0 as *mut Node,
+        };
+        task.deschedule(1, |task| {
+            node.task = Some(task);
+            if self.tail.is_null() {
+                self.head = &mut node as *mut Node;
+                self.tail = &mut node as *mut Node;
+            } else {
+                unsafe {
+                    (*self.tail).next = &mut node as *mut Node;
+                    self.tail = &mut node as *mut Node;
+                }
+            }
+            unsafe { lock.unlock_noguard(); }
+            Ok(())
+        });
+        unsafe { lock.lock_noguard(); }
+        assert!(node.next.is_null());
+    }
+
+    fn dequeue(&mut self) -> Option<BlockedTask> {
+        if self.head.is_null() {
+            return None
+        }
+        let node = self.head;
+        self.head = unsafe { (*node).next };
+        if self.head.is_null() {
+            self.tail = 0 as *mut Node;
+        }
+        unsafe {
+            (*node).next = 0 as *mut Node;
+            Some((*node).task.take_unwrap())
+        }
+    }
+}
index 8681ab21f10c363f48604721a4a20e0aa744d894..e66aa8c004617ff12fad8363049cd8202259f434 100644 (file)
@@ -1282,10 +1282,10 @@ fn utime_noexist() {
     }
 
     iotest!(fn binary_file() {
-        use rand::{Rng, task_rng};
+        use rand::{StdRng, Rng};
 
         let mut bytes = [0, ..1024];
-        task_rng().fill_bytes(bytes);
+        StdRng::new().fill_bytes(bytes);
 
         let tmpdir = tmpdir();
 
index d487aa638ac09014593d2fad1b23ac46becdd4b0..a42ee80b53a51959276599b4e1028f1c1d2392b7 100644 (file)
@@ -62,7 +62,7 @@
 pub use vec::Vec;
 
 // Reexported runtime types
-pub use comm::{channel, Sender, Receiver};
+pub use comm::{sync_channel, channel, SyncSender, Sender, Receiver};
 pub use task::spawn;
 
 // Reexported statics
index fc9e571b270596ae8ed3630f186dd157475d0370..328de69691436112b77edae3a67f5df3e28e2310 100644 (file)
@@ -69,6 +69,7 @@ mod imp {
     use iter::Iterator;
     use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
     use mem;
+    #[cfg(not(test))] use ptr::RawPtr;
 
     static mut global_args_ptr: uint = 0;
     static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;
index cd047c815e9ec94cd2bbef48c1f20563b20a87e4..ededc69c5a15b58350b3b25b99ab829917469d87 100644 (file)
@@ -433,8 +433,8 @@ fn unwind() {
 
     #[test]
     fn rng() {
-        use rand::{Rng, task_rng};
-        let mut r = task_rng();
+        use rand::{StdRng, Rng};
+        let mut r = StdRng::new();
         let _ = r.next_u32();
     }
 
index 9802271e28f269cdba8e3ecaf1fd54d6ceff29e0..6f5ef067e891a4f8bd0e0ca3a4f37137e10a1492 100644 (file)
@@ -580,9 +580,9 @@ fn smoke_lock() {
     fn smoke_cond() {
         static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;
         unsafe {
-            let mut guard = lock.lock();
+            let guard = lock.lock();
             let t = Thread::start(proc() {
-                let mut guard = lock.lock();
+                let guard = lock.lock();
                 guard.signal();
             });
             guard.wait();
index 4b2953204d0b80c29530916a5ab03c0d948c4319..5809fca9682e6f920a94e5149d300a459d0d4835 100644 (file)
@@ -1355,13 +1355,8 @@ fn drop(&mut self) {
 
 #[cfg(test)]
 mod tests {
-    use super::Vec;
-    use iter::{Iterator, range, Extendable};
-    use mem::{drop, size_of};
-    use ops::Drop;
-    use option::{Some, None};
-    use container::Container;
-    use slice::{Vector, MutableVector, ImmutableVector};
+    use prelude::*;
+    use mem::size_of;
 
     #[test]
     fn test_small_vec_struct() {
index aecea37cce8b563aca65478c5fd23ad8a1fe2486..628f6459badf9bf2a0361592ac465274a5cc4500 100644 (file)
@@ -51,54 +51,9 @@ pub fn recv_opt(&self) -> Option<R> {
     }
 }
 
-/// An extension of `pipes::stream` that provides synchronous message sending.
-pub struct SyncSender<S> { priv duplex_stream: DuplexStream<S, ()> }
-/// An extension of `pipes::stream` that acknowledges each message received.
-pub struct SyncReceiver<R> { priv duplex_stream: DuplexStream<(), R> }
-
-impl<S: Send> SyncSender<S> {
-    pub fn send(&self, val: S) {
-        assert!(self.try_send(val), "SyncSender.send: receiving port closed");
-    }
-
-    /// Sends a message, or report if the receiver has closed the connection
-    /// before receiving.
-    pub fn try_send(&self, val: S) -> bool {
-        self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
-    }
-}
-
-impl<R: Send> SyncReceiver<R> {
-    pub fn recv(&self) -> R {
-        self.recv_opt().expect("SyncReceiver.recv: sending channel closed")
-    }
-
-    pub fn recv_opt(&self) -> Option<R> {
-        self.duplex_stream.recv_opt().map(|val| {
-            self.duplex_stream.try_send(());
-            val
-        })
-    }
-
-    pub fn try_recv(&self) -> comm::TryRecvResult<R> {
-        match self.duplex_stream.try_recv() {
-            comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
-            state => state,
-        }
-    }
-}
-
-/// Creates a stream whose channel, upon sending a message, blocks until the
-/// message is received.
-pub fn rendezvous<T: Send>() -> (SyncReceiver<T>, SyncSender<T>) {
-    let (chan_stream, port_stream) = duplex();
-    (SyncReceiver { duplex_stream: port_stream },
-     SyncSender { duplex_stream: chan_stream })
-}
-
 #[cfg(test)]
 mod test {
-    use comm::{duplex, rendezvous};
+    use comm::{duplex};
 
 
     #[test]
@@ -111,56 +66,4 @@ pub fn DuplexStream1() {
         assert!(left.recv() == 123);
         assert!(right.recv() == ~"abc");
     }
-
-    #[test]
-    pub fn basic_rendezvous_test() {
-        let (port, chan) = rendezvous();
-
-        spawn(proc() {
-            chan.send("abc");
-        });
-
-        assert!(port.recv() == "abc");
-    }
-
-    #[test]
-    fn recv_a_lot() {
-        // Rendezvous streams should be able to handle any number of messages being sent
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            for _ in range(0, 10000) { chan.send(()); }
-        });
-        for _ in range(0, 10000) { port.recv(); }
-    }
-
-    #[test]
-    fn send_and_fail_and_try_recv() {
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            chan.duplex_stream.send(()); // Can't access this field outside this module
-            fail!()
-        });
-        port.recv()
-    }
-
-    #[test]
-    fn try_send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            port.duplex_stream.recv();
-            fail!()
-        });
-        chan.try_send(());
-    }
-
-    #[test]
-    #[should_fail]
-    fn send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            port.duplex_stream.recv();
-            fail!()
-        });
-        chan.send(());
-    }
 }
index d166076e96e15044ebf415d87196975b13b408fc..4df644e3b23c12025e5958cd49468433c6783496 100644 (file)
@@ -25,7 +25,7 @@
 #[cfg(test)]
 #[phase(syntax, link)] extern crate log;
 
-pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex};
+pub use comm::{DuplexStream, duplex};
 pub use task_pool::TaskPool;
 pub use future::Future;
 pub use arc::{Arc, Weak};