]> git.lizzy.rs Git - rust.git/commitdiff
std: Make std::comm return types consistent
authorAlex Crichton <alex@alexcrichton.com>
Thu, 10 Apr 2014 17:53:49 +0000 (10:53 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Fri, 11 Apr 2014 04:41:19 +0000 (21:41 -0700)
There are currently a number of return values from the std::comm methods, not
all of which are necessarily completely expressive:

  Sender::try_send(t: T) -> bool
    This method currently doesn't transmit back the data `t` if the send fails
    due to the other end having disconnected. Additionally, this shares the name
    of the synchronous try_send method, but it differs in semantics in that it
    only has one failure case, not two (the buffer can never be full).

  SyncSender::try_send(t: T) -> TrySendResult<T>
    This method accurately conveys all possible information, but it uses a
    custom type to the std::comm module with no convenience methods on it.
    Additionally, if you want to inspect the result you're forced to import
    something from `std::comm`.

  SyncSender::send_opt(t: T) -> Option<T>
    This method uses Some(T) as an "error value" and None as a "success value",
    but almost all other uses of Option<T> have Some/None the other way

  Receiver::try_recv(t: T) -> TryRecvResult<T>
    Similarly to the synchronous try_send, this custom return type is lacking in
    terms of usability (no convenience methods).

With this number of drawbacks in mind, I believed it was time to re-work the
return types of these methods. The new API for the comm module is:

  Sender::send(t: T) -> ()
  Sender::send_opt(t: T) -> Result<(), T>
  SyncSender::send(t: T) -> ()
  SyncSender::send_opt(t: T) -> Result<(), T>
  SyncSender::try_send(t: T) -> Result<(), TrySendError<T>>
  Receiver::recv() -> T
  Receiver::recv_opt() -> Result<T, ()>
  Receiver::try_recv() -> Result<T, TryRecvError>

The notable changes made are:

* Sender::try_send => Sender::send_opt. This renaming brings the semantics in
  line with the SyncSender::send_opt method. An asychronous send only has one
  failure case, unlike the synchronous try_send method which has two failure
  cases (full/disconnected).

* Sender::send_opt returns the data back to the caller if the send is guaranteed
  to fail. This method previously returned `bool`, but then it was unable to
  retrieve the data if the data was guaranteed to fail to send. There is still a
  race such that when `Ok(())` is returned the data could still fail to be
  received, but that's inherent to an asynchronous channel.

* Result is now the basis of all return values. This not only adds lots of
  convenience methods to all return values for free, but it also means that you
  can inspect the return values with no extra imports (Ok/Err are in the
  prelude). Additionally, it's now self documenting when something failed or not
  because the return value has "Err" in the name.

Things I'm a little uneasy about:

* The methods send_opt and recv_opt are not returning options, but rather
  results. I felt more strongly that Option was the wrong return type than the
  _opt prefix was wrong, and I coudn't think of a much better name for these
  methods. One possible way to think about them is to read the _opt suffix as
  "optionally".

* Result<T, ()> is often better expressed as Option<T>. This is only applicable
  to the recv_opt() method, but I thought it would be more consistent for
  everything to return Result rather than one method returning an Option.

Despite my two reasons to feel uneasy, I feel much better about the consistency
in return values at this point, and I think the only real open question is if
there's a better suffix for {send,recv}_opt.

Closes #11527

27 files changed:
src/libgreen/sched.rs
src/libgreen/task.rs
src/libnative/io/timer_other.rs
src/libnative/io/timer_timerfd.rs
src/libnative/io/timer_win32.rs
src/libnative/task.rs
src/librustuv/net.rs
src/librustuv/signal.rs
src/librustuv/timer.rs
src/libstd/comm/mod.rs
src/libstd/comm/oneshot.rs
src/libstd/comm/select.rs
src/libstd/comm/shared.rs
src/libstd/comm/stream.rs
src/libstd/comm/sync.rs
src/libstd/io/comm_adapters.rs
src/libstd/io/net/udp.rs
src/libstd/io/signal.rs
src/libstd/io/timer.rs
src/libstd/rt/task.rs
src/libsync/comm.rs
src/libsync/lock.rs
src/libsync/raw.rs
src/test/bench/msgsend-pipes-shared.rs
src/test/bench/msgsend-pipes.rs
src/test/bench/task-perf-jargon-metal-smoke.rs
src/test/run-pass/issue-9396.rs

index 9971dfee828157e0c5f5a05af6138705e62d0d73..e214797d4f84bdfb547541348ca42a331da3e119 100644 (file)
@@ -1011,7 +1011,6 @@ fn new_sched_rng() -> XorShiftRng {
 mod test {
     use rustuv;
 
-    use std::comm;
     use std::task::TaskOpts;
     use std::rt::task::Task;
     use std::rt::local::Local;
@@ -1428,7 +1427,7 @@ fn dont_starve_1() {
             // This task should not be able to starve the sender;
             // The sender should get stolen to another thread.
             spawn(proc() {
-                while rx.try_recv() != comm::Data(()) { }
+                while rx.try_recv().is_err() { }
             });
 
             tx.send(());
@@ -1445,7 +1444,7 @@ fn dont_starve_2() {
             // This task should not be able to starve the other task.
             // The sends should eventually yield.
             spawn(proc() {
-                while rx1.try_recv() != comm::Data(()) {
+                while rx1.try_recv().is_err() {
                     tx2.send(());
                 }
             });
@@ -1499,7 +1498,7 @@ fn pingpong(po: &Receiver<int>, ch: &Sender<int>) {
                     let mut val = 20;
                     while val > 0 {
                         val = po.recv();
-                        ch.try_send(val - 1);
+                        let _ = ch.send_opt(val - 1);
                     }
                 }
 
index 6fa40c0e42b646c53fd62853b65086c3e1f4540a..534e9f8401e9d718580143efc40f4bab0dd21509 100644 (file)
@@ -515,7 +515,7 @@ fn smoke_fail() {
             let _tx = tx;
             fail!()
         });
-        assert_eq!(rx.recv_opt(), None);
+        assert_eq!(rx.recv_opt(), Err(()));
     }
 
     #[test]
index 569b4cbb258e0fa0b2d2f097c65e6e20a1ff208a..0bf97d58ffdff5313c462223c430c949fce7f8aa 100644 (file)
@@ -46,7 +46,6 @@
 //!
 //! Note that all time units in this file are in *milliseconds*.
 
-use std::comm::Data;
 use libc;
 use std::mem;
 use std::os;
@@ -119,7 +118,7 @@ fn signal(active: &mut Vec<~Inner>, dead: &mut Vec<(uint, ~Inner)>) {
             Some(timer) => timer, None => return
         };
         let tx = timer.tx.take_unwrap();
-        if tx.try_send(()) && timer.repeat {
+        if tx.send_opt(()).is_ok() && timer.repeat {
             timer.tx = Some(tx);
             timer.target += timer.interval;
             insert(timer, active);
@@ -162,14 +161,14 @@ fn signal(active: &mut Vec<~Inner>, dead: &mut Vec<(uint, ~Inner)>) {
             1 => {
                 loop {
                     match messages.try_recv() {
-                        Data(Shutdown) => {
+                        Ok(Shutdown) => {
                             assert!(active.len() == 0);
                             break 'outer;
                         }
 
-                        Data(NewTimer(timer)) => insert(timer, &mut active),
+                        Ok(NewTimer(timer)) => insert(timer, &mut active),
 
-                        Data(RemoveTimer(id, ack)) => {
+                        Ok(RemoveTimer(id, ack)) => {
                             match dead.iter().position(|&(i, _)| id == i) {
                                 Some(i) => {
                                     let (_, i) = dead.remove(i).unwrap();
index d37a39fc30e8d6aa11774d1b9a9e6020daa9cc18..3fd61dc1da5d09086cf0e711905a78d9560c003b 100644 (file)
@@ -28,7 +28,6 @@
 //!
 //! As with timer_other, all units in this file are in units of millseconds.
 
-use std::comm::Data;
 use libc;
 use std::ptr;
 use std::os;
@@ -107,7 +106,7 @@ fn del(efd: libc::c_int, fd: libc::c_int) {
                     match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) {
                         Some(i) => {
                             let (_, ref c, oneshot) = *list.get(i);
-                            (!c.try_send(()) || oneshot, i)
+                            (c.send_opt(()).is_err() || oneshot, i)
                         }
                         None => fail!("fd not active: {}", fd),
                     }
@@ -121,7 +120,7 @@ fn del(efd: libc::c_int, fd: libc::c_int) {
 
         while incoming {
             match messages.try_recv() {
-                Data(NewTimer(fd, chan, one, timeval)) => {
+                Ok(NewTimer(fd, chan, one, timeval)) => {
                     // acknowledge we have the new channel, we will never send
                     // another message to the old channel
                     chan.send(());
@@ -149,7 +148,7 @@ fn del(efd: libc::c_int, fd: libc::c_int) {
                     assert_eq!(ret, 0);
                 }
 
-                Data(RemoveTimer(fd, chan)) => {
+                Ok(RemoveTimer(fd, chan)) => {
                     match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) {
                         Some(i) => {
                             drop(list.remove(i));
@@ -160,7 +159,7 @@ fn del(efd: libc::c_int, fd: libc::c_int) {
                     chan.send(());
                 }
 
-                Data(Shutdown) => {
+                Ok(Shutdown) => {
                     assert!(list.len() == 0);
                     break 'outer;
                 }
index 8b7592783da044dbaada0ff9125f34630e5d21fd..a15898feb92b793612a478c441fbd5f4eb376a68 100644 (file)
@@ -20,7 +20,6 @@
 //! Other than that, the implementation is pretty straightforward in terms of
 //! the other two implementations of timers with nothing *that* new showing up.
 
-use std::comm::Data;
 use libc;
 use std::ptr;
 use std::rt::rtio;
@@ -54,11 +53,11 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
         if idx == 0 {
             loop {
                 match messages.try_recv() {
-                    Data(NewTimer(obj, c, one)) => {
+                    Ok(NewTimer(obj, c, one)) => {
                         objs.push(obj);
                         chans.push((c, one));
                     }
-                    Data(RemoveTimer(obj, c)) => {
+                    Ok(RemoveTimer(obj, c)) => {
                         c.send(());
                         match objs.iter().position(|&o| o == obj) {
                             Some(i) => {
@@ -68,7 +67,7 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
                             None => {}
                         }
                     }
-                    Data(Shutdown) => {
+                    Ok(Shutdown) => {
                         assert_eq!(objs.len(), 1);
                         assert_eq!(chans.len(), 0);
                         break 'outer;
@@ -79,7 +78,7 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
         } else {
             let remove = {
                 match chans.get(idx as uint - 1) {
-                    &(ref c, oneshot) => !c.try_send(()) || oneshot
+                    &(ref c, oneshot) => c.send_opt(()).is_err() || oneshot
                 }
             };
             if remove {
index 871fc94bde46a73641ebf4306ebe81c6ba2372fe..ddfd46ecad9b5bc9fa07d0c3c7850d7aa20cf50e 100644 (file)
@@ -274,7 +274,7 @@ fn smoke_fail() {
             let _tx = tx;
             fail!()
         });
-        assert_eq!(rx.recv_opt(), None);
+        assert_eq!(rx.recv_opt(), Err(()));
     }
 
     #[test]
index 4d4b62dddd4a0642cc986db228cb9bdb1f7c2e99..b893f5f693fa73beba94e24b5695b26c9d9591ea 100644 (file)
@@ -1065,7 +1065,7 @@ fn test_read_and_block() {
             }
             reads += 1;
 
-            tx2.try_send(());
+            let _ = tx2.send_opt(());
         }
 
         // Make sure we had multiple reads
index c38b4fdd96faf8d1680fe9e83ba6e335ec76c8a2..2dcf2de681c39b8b0f7bcb125e524aa9557a058a 100644 (file)
@@ -51,7 +51,7 @@ pub fn new(io: &mut UvIoFactory, signum: Signum,
 extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) {
     let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
     assert_eq!(signum as int, s.signal as int);
-    s.channel.try_send(s.signal);
+    let _ = s.channel.send_opt(s.signal);
 }
 
 impl HomingIO for SignalWatcher {
index 3d323382ad5368abf2aca5a34ed94a0be9924ad4..58008002837d1fdd7612883db9b1c14763b49484 100644 (file)
@@ -140,9 +140,9 @@ fn period(&mut self, msecs: u64) -> Receiver<()> {
             let task = timer.blocker.take_unwrap();
             let _ = task.wake().map(|t| t.reawaken());
         }
-        SendOnce(chan) => { let _ = chan.try_send(()); }
+        SendOnce(chan) => { let _ = chan.send_opt(()); }
         SendMany(chan, id) => {
-            let _ = chan.try_send(());
+            let _ = chan.send_opt(());
 
             // Note that the above operation could have performed some form of
             // scheduling. This means that the timer may have decided to insert
@@ -196,8 +196,8 @@ fn override() {
         let oport = timer.oneshot(1);
         let pport = timer.period(1);
         timer.sleep(1);
-        assert_eq!(oport.recv_opt(), None);
-        assert_eq!(pport.recv_opt(), None);
+        assert_eq!(oport.recv_opt(), Err(()));
+        assert_eq!(pport.recv_opt(), Err(()));
         timer.oneshot(1).recv();
     }
 
@@ -284,7 +284,7 @@ fn sender_goes_away_oneshot() {
             let mut timer = TimerWatcher::new(local_loop());
             timer.oneshot(1000)
         };
-        assert_eq!(port.recv_opt(), None);
+        assert_eq!(port.recv_opt(), Err(()));
     }
 
     #[test]
@@ -293,7 +293,7 @@ fn sender_goes_away_period() {
             let mut timer = TimerWatcher::new(local_loop());
             timer.period(1000)
         };
-        assert_eq!(port.recv_opt(), None);
+        assert_eq!(port.recv_opt(), Err(()));
     }
 
     #[test]
index f210bfc88bc351d1574704f4e6549ee13e1f6bc1..58781c01d662e69af43816752572978871881d3c 100644 (file)
@@ -322,25 +322,19 @@ pub struct SyncSender<T> {
 /// This enumeration is the list of the possible reasons that try_recv could not
 /// return data when called.
 #[deriving(Eq, Clone, Show)]
-pub enum TryRecvResult<T> {
+pub enum TryRecvError {
     /// This channel is currently empty, but the sender(s) have not yet
     /// disconnected, so data may yet become available.
     Empty,
     /// This channel's sending half has become disconnected, and there will
     /// never be any more data received on this channel
     Disconnected,
-    /// The channel had some data and we successfully popped it
-    Data(T),
 }
 
-/// This enumeration is the list of the possible outcomes for the
+/// This enumeration is the list of the possible error outcomes for the
 /// `SyncSender::try_send` method.
 #[deriving(Eq, Clone, Show)]
-pub enum TrySendResult<T> {
-    /// The data was successfully sent along the channel. This either means that
-    /// it was buffered in the channel, or handed off to a receiver. In either
-    /// case, the callee no longer has ownership of the data.
-    Sent,
+pub enum TrySendError<T> {
     /// The data could not be sent on the channel because it would require that
     /// the callee block to send the data.
     ///
@@ -365,7 +359,7 @@ enum Flavor<T> {
 /// of `Receiver` and `Sender` to see what's possible with them.
 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
     let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
-    (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
+    (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
 }
 
 /// Creates a new synchronous, bounded channel.
@@ -401,7 +395,7 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
 /// ```
 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
     let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
-    (SyncSender::new(a), Receiver::my_new(Sync(b)))
+    (SyncSender::new(a), Receiver::new(Sync(b)))
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -409,7 +403,7 @@ pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
 ////////////////////////////////////////////////////////////////////////////////
 
 impl<T: Send> Sender<T> {
-    fn my_new(inner: Flavor<T>) -> Sender<T> {
+    fn new(inner: Flavor<T>) -> Sender<T> {
         Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
     }
 
@@ -433,25 +427,42 @@ fn my_new(inner: Flavor<T>) -> Sender<T> {
     /// The purpose of this functionality is to propagate failure among tasks.
     /// If failure is not desired, then consider using the `try_send` method
     pub fn send(&self, t: T) {
-        if !self.try_send(t) {
+        if self.send_opt(t).is_err() {
             fail!("sending on a closed channel");
         }
     }
 
-    /// Attempts to send a value on this channel, returning whether it was
-    /// successfully sent.
+    /// Attempts to send a value on this channel, returning it back if it could
+    /// not be sent.
     ///
     /// A successful send occurs when it is determined that the other end of
     /// the channel has not hung up already. An unsuccessful send would be one
     /// where the corresponding receiver has already been deallocated. Note
-    /// that a return value of `false` means that the data will never be
-    /// received, but a return value of `true` does *not* mean that the data
+    /// that a return value of `Err` means that the data will never be
+    /// received, but a return value of `Ok` does *not* mean that the data
     /// will be received.  It is possible for the corresponding receiver to
-    /// hang up immediately after this function returns `true`.
+    /// hang up immediately after this function returns `Ok`.
     ///
-    /// Like `send`, this method will never block. If the failure of send cannot
-    /// be tolerated, then this method should be used instead.
-    pub fn try_send(&self, t: T) -> bool {
+    /// Like `send`, this method will never block.
+    ///
+    /// # Failure
+    ///
+    /// This method will never fail, it will return the message back to the
+    /// caller if the other end is disconnected
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// let (tx, rx) = channel();
+    ///
+    /// // This send is always successful
+    /// assert_eq!(tx.send_opt(1), Ok(()));
+    ///
+    /// // This send will fail because the receiver is gone
+    /// drop(rx);
+    /// assert_eq!(tx.send_opt(1), Err(1));
+    /// ```
+    pub fn send_opt(&self, t: T) -> Result<(), T> {
         // In order to prevent starvation of other tasks in situations where
         // a task sends repeatedly without ever receiving, we occassionally
         // yield instead of doing a send immediately.
@@ -475,16 +486,19 @@ pub fn try_send(&self, t: T) -> bool {
                         return (*p).send(t);
                     } else {
                         let (a, b) = UnsafeArc::new2(stream::Packet::new());
-                        match (*p).upgrade(Receiver::my_new(Stream(b))) {
+                        match (*p).upgrade(Receiver::new(Stream(b))) {
                             oneshot::UpSuccess => {
-                                (*a.get()).send(t);
-                                (a, true)
+                                let ret = (*a.get()).send(t);
+                                (a, ret)
                             }
-                            oneshot::UpDisconnected => (a, false),
+                            oneshot::UpDisconnected => (a, Err(t)),
                             oneshot::UpWoke(task) => {
-                                (*a.get()).send(t);
+                                // This send cannot fail because the task is
+                                // asleep (we're looking at it), so the receiver
+                                // can't go away.
+                                (*a.get()).send(t).unwrap();
                                 task.wake().map(|t| t.reawaken());
-                                (a, true)
+                                (a, Ok(()))
                             }
                         }
                     }
@@ -496,7 +510,7 @@ pub fn try_send(&self, t: T) -> bool {
         };
 
         unsafe {
-            let mut tmp = Sender::my_new(Stream(new_inner));
+            let mut tmp = Sender::new(Stream(new_inner));
             mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
         }
         return ret;
@@ -508,21 +522,21 @@ fn clone(&self) -> Sender<T> {
         let (packet, sleeper) = match self.inner {
             Oneshot(ref p) => {
                 let (a, b) = UnsafeArc::new2(shared::Packet::new());
-                match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
+                match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
                     oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
                     oneshot::UpWoke(task) => (b, Some(task))
                 }
             }
             Stream(ref p) => {
                 let (a, b) = UnsafeArc::new2(shared::Packet::new());
-                match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
+                match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
                     stream::UpSuccess | stream::UpDisconnected => (b, None),
                     stream::UpWoke(task) => (b, Some(task)),
                 }
             }
             Shared(ref p) => {
                 unsafe { (*p.get()).clone_chan(); }
-                return Sender::my_new(Shared(p.clone()));
+                return Sender::new(Shared(p.clone()));
             }
             Sync(..) => unreachable!(),
         };
@@ -530,10 +544,10 @@ fn clone(&self) -> Sender<T> {
         unsafe {
             (*packet.get()).inherit_blocker(sleeper);
 
-            let mut tmp = Sender::my_new(Shared(packet.clone()));
+            let mut tmp = Sender::new(Shared(packet.clone()));
             mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
         }
-        Sender::my_new(Shared(packet))
+        Sender::new(Shared(packet))
     }
 }
 
@@ -579,7 +593,7 @@ fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
     /// `SyncSender::send_opt` method which will not fail if the receiver
     /// disconnects.
     pub fn send(&self, t: T) {
-        if self.send_opt(t).is_some() {
+        if self.send_opt(t).is_err() {
             fail!("sending on a closed channel");
         }
     }
@@ -595,11 +609,8 @@ pub fn send(&self, t: T) {
     /// # Failure
     ///
     /// This function cannot fail.
-    pub fn send_opt(&self, t: T) -> Option<T> {
-        match unsafe { (*self.inner.get()).send(t) } {
-            Ok(()) => None,
-            Err(t) => Some(t),
-        }
+    pub fn send_opt(&self, t: T) -> Result<(), T> {
+        unsafe { (*self.inner.get()).send(t) }
     }
 
     /// Attempts to send a value on this channel without blocking.
@@ -615,7 +626,7 @@ pub fn send_opt(&self, t: T) -> Option<T> {
     /// # Failure
     ///
     /// This function cannot fail
-    pub fn try_send(&self, t: T) -> TrySendResult<T> {
+    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
         unsafe { (*self.inner.get()).try_send(t) }
     }
 }
@@ -639,7 +650,7 @@ fn drop(&mut self) {
 ////////////////////////////////////////////////////////////////////////////////
 
 impl<T: Send> Receiver<T> {
-    fn my_new(inner: Flavor<T>) -> Receiver<T> {
+    fn new(inner: Flavor<T>) -> Receiver<T> {
         Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
     }
 
@@ -664,8 +675,8 @@ fn my_new(inner: Flavor<T>) -> Receiver<T> {
     ///   peek at a value on this receiver.
     pub fn recv(&self) -> T {
         match self.recv_opt() {
-            Some(t) => t,
-            None => fail!("receiving on a closed channel"),
+            Ok(t) => t,
+            Err(()) => fail!("receiving on a closed channel"),
         }
     }
 
@@ -679,7 +690,7 @@ pub fn recv(&self) -> T {
     /// block on a receiver.
     ///
     /// This function cannot fail.
-    pub fn try_recv(&self) -> TryRecvResult<T> {
+    pub fn try_recv(&self) -> Result<T, TryRecvError> {
         // If a thread is spinning in try_recv, we should take the opportunity
         // to reschedule things occasionally. See notes above in scheduling on
         // sends for why this doesn't always hit TLS, and also for why this uses
@@ -695,32 +706,32 @@ pub fn try_recv(&self) -> TryRecvResult<T> {
             let mut new_port = match self.inner {
                 Oneshot(ref p) => {
                     match unsafe { (*p.get()).try_recv() } {
-                        Ok(t) => return Data(t),
-                        Err(oneshot::Empty) => return Empty,
-                        Err(oneshot::Disconnected) => return Disconnected,
+                        Ok(t) => return Ok(t),
+                        Err(oneshot::Empty) => return Err(Empty),
+                        Err(oneshot::Disconnected) => return Err(Disconnected),
                         Err(oneshot::Upgraded(rx)) => rx,
                     }
                 }
                 Stream(ref p) => {
                     match unsafe { (*p.get()).try_recv() } {
-                        Ok(t) => return Data(t),
-                        Err(stream::Empty) => return Empty,
-                        Err(stream::Disconnected) => return Disconnected,
+                        Ok(t) => return Ok(t),
+                        Err(stream::Empty) => return Err(Empty),
+                        Err(stream::Disconnected) => return Err(Disconnected),
                         Err(stream::Upgraded(rx)) => rx,
                     }
                 }
                 Shared(ref p) => {
                     match unsafe { (*p.get()).try_recv() } {
-                        Ok(t) => return Data(t),
-                        Err(shared::Empty) => return Empty,
-                        Err(shared::Disconnected) => return Disconnected,
+                        Ok(t) => return Ok(t),
+                        Err(shared::Empty) => return Err(Empty),
+                        Err(shared::Disconnected) => return Err(Disconnected),
                     }
                 }
                 Sync(ref p) => {
                     match unsafe { (*p.get()).try_recv() } {
-                        Ok(t) => return Data(t),
-                        Err(sync::Empty) => return Empty,
-                        Err(sync::Disconnected) => return Disconnected,
+                        Ok(t) => return Ok(t),
+                        Err(sync::Empty) => return Err(Empty),
+                        Err(sync::Disconnected) => return Err(Disconnected),
                     }
                 }
             };
@@ -741,32 +752,32 @@ pub fn try_recv(&self) -> TryRecvResult<T> {
     /// In other words, this function has the same semantics as the `recv`
     /// method except for the failure aspect.
     ///
-    /// If the channel has hung up, then `None` is returned. Otherwise `Some` of
+    /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
     /// the value found on the receiver is returned.
-    pub fn recv_opt(&self) -> Option<T> {
+    pub fn recv_opt(&self) -> Result<T, ()> {
         loop {
             let mut new_port = match self.inner {
                 Oneshot(ref p) => {
                     match unsafe { (*p.get()).recv() } {
-                        Ok(t) => return Some(t),
+                        Ok(t) => return Ok(t),
                         Err(oneshot::Empty) => return unreachable!(),
-                        Err(oneshot::Disconnected) => return None,
+                        Err(oneshot::Disconnected) => return Err(()),
                         Err(oneshot::Upgraded(rx)) => rx,
                     }
                 }
                 Stream(ref p) => {
                     match unsafe { (*p.get()).recv() } {
-                        Ok(t) => return Some(t),
+                        Ok(t) => return Ok(t),
                         Err(stream::Empty) => return unreachable!(),
-                        Err(stream::Disconnected) => return None,
+                        Err(stream::Disconnected) => return Err(()),
                         Err(stream::Upgraded(rx)) => rx,
                     }
                 }
                 Shared(ref p) => {
                     match unsafe { (*p.get()).recv() } {
-                        Ok(t) => return Some(t),
+                        Ok(t) => return Ok(t),
                         Err(shared::Empty) => return unreachable!(),
-                        Err(shared::Disconnected) => return None,
+                        Err(shared::Disconnected) => return Err(()),
                     }
                 }
                 Sync(ref p) => return unsafe { (*p.get()).recv() }
@@ -873,7 +884,7 @@ fn abort_selection(&self) -> bool {
 }
 
 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
-    fn next(&mut self) -> Option<T> { self.rx.recv_opt() }
+    fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
 }
 
 #[unsafe_destructor]
@@ -1022,7 +1033,7 @@ pub fn stress_factor() -> uint {
                 assert_eq!(rx.recv(), 1);
             }
             match rx.try_recv() {
-                Data(..) => fail!(),
+                Ok(..) => fail!(),
                 _ => {}
             }
             dtx.send(());
@@ -1136,45 +1147,45 @@ fn no_runtime() {
 
     test!(fn oneshot_single_thread_try_send_open() {
         let (tx, rx) = channel::<int>();
-        assert!(tx.try_send(10));
+        assert!(tx.send_opt(10).is_ok());
         assert!(rx.recv() == 10);
     })
 
     test!(fn oneshot_single_thread_try_send_closed() {
         let (tx, rx) = channel::<int>();
         drop(rx);
-        assert!(!tx.try_send(10));
+        assert!(tx.send_opt(10).is_err());
     })
 
     test!(fn oneshot_single_thread_try_recv_open() {
         let (tx, rx) = channel::<int>();
         tx.send(10);
-        assert!(rx.recv_opt() == Some(10));
+        assert!(rx.recv_opt() == Ok(10));
     })
 
     test!(fn oneshot_single_thread_try_recv_closed() {
         let (tx, rx) = channel::<int>();
         drop(tx);
-        assert!(rx.recv_opt() == None);
+        assert!(rx.recv_opt() == Err(()));
     })
 
     test!(fn oneshot_single_thread_peek_data() {
         let (tx, rx) = channel::<int>();
-        assert_eq!(rx.try_recv(), Empty)
+        assert_eq!(rx.try_recv(), Err(Empty))
         tx.send(10);
-        assert_eq!(rx.try_recv(), Data(10));
+        assert_eq!(rx.try_recv(), Ok(10));
     })
 
     test!(fn oneshot_single_thread_peek_close() {
         let (tx, rx) = channel::<int>();
         drop(tx);
-        assert_eq!(rx.try_recv(), Disconnected);
-        assert_eq!(rx.try_recv(), Disconnected);
+        assert_eq!(rx.try_recv(), Err(Disconnected));
+        assert_eq!(rx.try_recv(), Err(Disconnected));
     })
 
     test!(fn oneshot_single_thread_peek_open() {
         let (_tx, rx) = channel::<int>();
-        assert_eq!(rx.try_recv(), Empty);
+        assert_eq!(rx.try_recv(), Err(Empty));
     })
 
     test!(fn oneshot_multi_task_recv_then_send() {
@@ -1335,7 +1346,7 @@ fn recv(rx: Receiver<~int>, i: int) {
         tx.send(2);
         tx.send(2);
         tx.send(2);
-        tx.try_send(2);
+        let _ = tx.send_opt(2);
         drop(tx);
         assert_eq!(count_rx.recv(), 4);
     })
@@ -1353,14 +1364,14 @@ fn recv(rx: Receiver<~int>, i: int) {
             tx3.send(());
         });
 
-        assert_eq!(rx1.try_recv(), Empty);
+        assert_eq!(rx1.try_recv(), Err(Empty));
         tx2.send(());
         rx3.recv();
-        assert_eq!(rx1.try_recv(), Data(1));
-        assert_eq!(rx1.try_recv(), Empty);
+        assert_eq!(rx1.try_recv(), Ok(1));
+        assert_eq!(rx1.try_recv(), Err(Empty));
         tx2.send(());
         rx3.recv();
-        assert_eq!(rx1.try_recv(), Disconnected);
+        assert_eq!(rx1.try_recv(), Err(Disconnected));
     })
 
     // This bug used to end up in a livelock inside of the Receiver destructor
@@ -1409,9 +1420,9 @@ fn recv(rx: Receiver<~int>, i: int) {
             let mut hits = 0;
             while hits < 10 {
                 match rx.try_recv() {
-                    Data(()) => { hits += 1; }
-                    Empty => { Thread::yield_now(); }
-                    Disconnected => return,
+                    Ok(()) => { hits += 1; }
+                    Err(Empty) => { Thread::yield_now(); }
+                    Err(Disconnected) => return,
                 }
             }
             cdone.send(());
@@ -1542,7 +1553,7 @@ pub fn stress_factor() -> uint {
                 assert_eq!(rx.recv(), 1);
             }
             match rx.try_recv() {
-                Data(..) => fail!(),
+                Ok(..) => fail!(),
                 _ => {}
             }
             dtx.send(());
@@ -1596,50 +1607,50 @@ pub fn stress_factor() -> uint {
 
     test!(fn oneshot_single_thread_try_send_open() {
         let (tx, rx) = sync_channel::<int>(1);
-        assert_eq!(tx.try_send(10), Sent);
+        assert_eq!(tx.try_send(10), Ok(()));
         assert!(rx.recv() == 10);
     })
 
     test!(fn oneshot_single_thread_try_send_closed() {
         let (tx, rx) = sync_channel::<int>(0);
         drop(rx);
-        assert_eq!(tx.try_send(10), RecvDisconnected(10));
+        assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
     })
 
     test!(fn oneshot_single_thread_try_send_closed2() {
         let (tx, _rx) = sync_channel::<int>(0);
-        assert_eq!(tx.try_send(10), Full(10));
+        assert_eq!(tx.try_send(10), Err(Full(10)));
     })
 
     test!(fn oneshot_single_thread_try_recv_open() {
         let (tx, rx) = sync_channel::<int>(1);
         tx.send(10);
-        assert!(rx.recv_opt() == Some(10));
+        assert!(rx.recv_opt() == Ok(10));
     })
 
     test!(fn oneshot_single_thread_try_recv_closed() {
         let (tx, rx) = sync_channel::<int>(0);
         drop(tx);
-        assert!(rx.recv_opt() == None);
+        assert!(rx.recv_opt() == Err(()));
     })
 
     test!(fn oneshot_single_thread_peek_data() {
         let (tx, rx) = sync_channel::<int>(1);
-        assert_eq!(rx.try_recv(), Empty)
+        assert_eq!(rx.try_recv(), Err(Empty))
         tx.send(10);
-        assert_eq!(rx.try_recv(), Data(10));
+        assert_eq!(rx.try_recv(), Ok(10));
     })
 
     test!(fn oneshot_single_thread_peek_close() {
         let (tx, rx) = sync_channel::<int>(0);
         drop(tx);
-        assert_eq!(rx.try_recv(), Disconnected);
-        assert_eq!(rx.try_recv(), Disconnected);
+        assert_eq!(rx.try_recv(), Err(Disconnected));
+        assert_eq!(rx.try_recv(), Err(Disconnected));
     })
 
     test!(fn oneshot_single_thread_peek_open() {
         let (_tx, rx) = sync_channel::<int>(0);
-        assert_eq!(rx.try_recv(), Empty);
+        assert_eq!(rx.try_recv(), Err(Empty));
     })
 
     test!(fn oneshot_multi_task_recv_then_send() {
@@ -1800,7 +1811,7 @@ fn recv(rx: Receiver<~int>, i: int) {
         tx.send(2);
         tx.send(2);
         tx.send(2);
-        tx.try_send(2);
+        let _ = tx.try_send(2);
         drop(tx);
         assert_eq!(count_rx.recv(), 4);
     })
@@ -1818,14 +1829,14 @@ fn recv(rx: Receiver<~int>, i: int) {
             tx3.send(());
         });
 
-        assert_eq!(rx1.try_recv(), Empty);
+        assert_eq!(rx1.try_recv(), Err(Empty));
         tx2.send(());
         rx3.recv();
-        assert_eq!(rx1.try_recv(), Data(1));
-        assert_eq!(rx1.try_recv(), Empty);
+        assert_eq!(rx1.try_recv(), Ok(1));
+        assert_eq!(rx1.try_recv(), Err(Empty));
         tx2.send(());
         rx3.recv();
-        assert_eq!(rx1.try_recv(), Disconnected);
+        assert_eq!(rx1.try_recv(), Err(Disconnected));
     })
 
     // This bug used to end up in a livelock inside of the Receiver destructor
@@ -1859,9 +1870,9 @@ fn recv(rx: Receiver<~int>, i: int) {
             let mut hits = 0;
             while hits < 10 {
                 match rx.try_recv() {
-                    Data(()) => { hits += 1; }
-                    Empty => { Thread::yield_now(); }
-                    Disconnected => return,
+                    Ok(()) => { hits += 1; }
+                    Err(Empty) => { Thread::yield_now(); }
+                    Err(Disconnected) => return,
                 }
             }
             cdone.send(());
@@ -1876,20 +1887,20 @@ fn recv(rx: Receiver<~int>, i: int) {
     test!(fn send_opt1() {
         let (tx, rx) = sync_channel(0);
         spawn(proc() { rx.recv(); });
-        assert_eq!(tx.send_opt(1), None);
+        assert_eq!(tx.send_opt(1), Ok(()));
     })
 
     test!(fn send_opt2() {
         let (tx, rx) = sync_channel(0);
         spawn(proc() { drop(rx); });
-        assert_eq!(tx.send_opt(1), Some(1));
+        assert_eq!(tx.send_opt(1), Err(1));
     })
 
     test!(fn send_opt3() {
         let (tx, rx) = sync_channel(1);
-        assert_eq!(tx.send_opt(1), None);
+        assert_eq!(tx.send_opt(1), Ok(()));
         spawn(proc() { drop(rx); });
-        assert_eq!(tx.send_opt(1), Some(1));
+        assert_eq!(tx.send_opt(1), Err(1));
     })
 
     test!(fn send_opt4() {
@@ -1898,11 +1909,11 @@ fn recv(rx: Receiver<~int>, i: int) {
         let (done, donerx) = channel();
         let done2 = done.clone();
         spawn(proc() {
-            assert_eq!(tx.send_opt(1), Some(1));
+            assert_eq!(tx.send_opt(1), Err(1));
             done.send(());
         });
         spawn(proc() {
-            assert_eq!(tx2.send_opt(2), Some(2));
+            assert_eq!(tx2.send_opt(2), Err(2));
             done2.send(());
         });
         drop(rx);
@@ -1912,27 +1923,27 @@ fn recv(rx: Receiver<~int>, i: int) {
 
     test!(fn try_send1() {
         let (tx, _rx) = sync_channel(0);
-        assert_eq!(tx.try_send(1), Full(1));
+        assert_eq!(tx.try_send(1), Err(Full(1)));
     })
 
     test!(fn try_send2() {
         let (tx, _rx) = sync_channel(1);
-        assert_eq!(tx.try_send(1), Sent);
-        assert_eq!(tx.try_send(1), Full(1));
+        assert_eq!(tx.try_send(1), Ok(()));
+        assert_eq!(tx.try_send(1), Err(Full(1)));
     })
 
     test!(fn try_send3() {
         let (tx, rx) = sync_channel(1);
-        assert_eq!(tx.try_send(1), Sent);
+        assert_eq!(tx.try_send(1), Ok(()));
         drop(rx);
-        assert_eq!(tx.try_send(1), RecvDisconnected(1));
+        assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
     })
 
     test!(fn try_send4() {
         let (tx, rx) = sync_channel(0);
         spawn(proc() {
             for _ in range(0, 1000) { task::deschedule(); }
-            assert_eq!(tx.try_send(1), Sent);
+            assert_eq!(tx.try_send(1), Ok(()));
         });
         assert_eq!(rx.recv(), 1);
     } #[ignore(reason = "flaky on libnative")])
index 1bc7349a70d0bebe71714a24c3b8b2c93db96e82..e92b5cb272a809ce7347dbcbe7de7ed809898f48 100644 (file)
@@ -90,7 +90,7 @@ pub fn new() -> Packet<T> {
         }
     }
 
-    pub fn send(&mut self, t: T) -> bool {
+    pub fn send(&mut self, t: T) -> Result<(), T> {
         // Sanity check
         match self.upgrade {
             NothingSent => {}
@@ -102,14 +102,12 @@ pub fn send(&mut self, t: T) -> bool {
 
         match self.state.swap(DATA, atomics::SeqCst) {
             // Sent the data, no one was waiting
-            EMPTY => true,
+            EMPTY => Ok(()),
 
-            // Couldn't send the data, the port hung up first. We need to be
-            // sure to deallocate the sent data (to not leave it stuck in the
-            // queue)
+            // Couldn't send the data, the port hung up first. Return the data
+            // back up the stack.
             DISCONNECTED => {
-                self.data.take_unwrap();
-                false
+                Err(self.data.take_unwrap())
             }
 
             // Not possible, these are one-use channels
@@ -121,7 +119,7 @@ pub fn send(&mut self, t: T) -> bool {
             n => unsafe {
                 let t = BlockedTask::cast_from_uint(n);
                 t.wake().map(|t| t.reawaken());
-                true
+                Ok(())
             }
         }
     }
index 84191ed6b28c52475638e6f25f8d2e6ef3c61cfa..c286fd84849340b1e35a5077d48aa0a43cd8a252 100644 (file)
@@ -236,7 +236,7 @@ pub fn recv(&mut self) -> T { self.rx.recv() }
     /// Block to receive a value on the underlying receiver, returning `Some` on
     /// success or `None` if the channel disconnects. This function has the same
     /// semantics as `Receiver.recv_opt`
-    pub fn recv_opt(&mut self) -> Option<T> { self.rx.recv_opt() }
+    pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
 
     /// Adds this handle to the receiver set that the handle was created from. This
     /// method can be called multiple times, but it has no effect if `add` was
@@ -338,12 +338,12 @@ mod test {
         )
         drop(tx1);
         select! (
-            foo = rx1.recv_opt() => { assert_eq!(foo, None); },
+            foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
             _bar = rx2.recv() => { fail!() }
         )
         drop(tx2);
         select! (
-            bar = rx2.recv_opt() => { assert_eq!(bar, None); }
+            bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
         )
     })
 
@@ -370,7 +370,7 @@ mod test {
 
         select! (
             _a1 = rx1.recv_opt() => { fail!() },
-            a2 = rx2.recv_opt() => { assert_eq!(a2, None); }
+            a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
         )
     })
 
@@ -392,7 +392,7 @@ mod test {
         )
         tx3.send(1);
         select! (
-            a = rx1.recv_opt() => { assert_eq!(a, None); },
+            a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
             _b = rx2.recv() => { fail!() }
         )
     })
@@ -417,8 +417,8 @@ mod test {
             a = rx1.recv() => { assert_eq!(a, 1); },
             a = rx2.recv() => { assert_eq!(a, 2); }
         )
-        assert_eq!(rx1.try_recv(), Empty);
-        assert_eq!(rx2.try_recv(), Empty);
+        assert_eq!(rx1.try_recv(), Err(Empty));
+        assert_eq!(rx2.try_recv(), Err(Empty));
         tx3.send(());
     })
 
@@ -456,7 +456,7 @@ mod test {
         spawn(proc() {
             rx3.recv();
             tx1.clone();
-            assert_eq!(rx3.try_recv(), Empty);
+            assert_eq!(rx3.try_recv(), Err(Empty));
             tx1.send(2);
             rx3.recv();
         });
@@ -477,7 +477,7 @@ mod test {
         spawn(proc() {
             rx3.recv();
             tx1.clone();
-            assert_eq!(rx3.try_recv(), Empty);
+            assert_eq!(rx3.try_recv(), Err(Empty));
             tx1.send(2);
             rx3.recv();
         });
index e8ba9d6e62809badaabe510e61a4d3024ee48dbb..525786f5d1e76bc58cb1edfeaca94c279e5f2167 100644 (file)
@@ -131,9 +131,9 @@ pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
         unsafe { self.select_lock.unlock_noguard() }
     }
 
-    pub fn send(&mut self, t: T) -> bool {
+    pub fn send(&mut self, t: T) -> Result<(), T> {
         // See Port::drop for what's going on
-        if self.port_dropped.load(atomics::SeqCst) { return false }
+        if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
 
         // Note that the multiple sender case is a little tricker
         // semantically than the single sender case. The logic for
@@ -161,7 +161,7 @@ pub fn send(&mut self, t: T) -> bool {
         // received". Once we get beyond this check, we have permanently
         // entered the realm of "this may be received"
         if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
-            return false
+            return Err(t)
         }
 
         self.queue.push(t);
@@ -213,7 +213,7 @@ pub fn send(&mut self, t: T) -> bool {
             _ => {}
         }
 
-        true
+        Ok(())
     }
 
     pub fn recv(&mut self) -> Result<T, Failure> {
index 5820b13a35f4672eb59110b7b86db1f45d89bf48..6c9280e0abc69156313d3df97bf15b3301ead0f9 100644 (file)
@@ -87,25 +87,27 @@ pub fn new() -> Packet<T> {
     }
 
 
-    pub fn send(&mut self, t: T) -> bool {
+    pub fn send(&mut self, t: T) -> Result<(), T> {
+        // If the other port has deterministically gone away, then definitely
+        // must return the data back up the stack. Otherwise, the data is
+        // considered as being sent.
+        if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
+
         match self.do_send(Data(t)) {
-            UpSuccess => true,
-            UpDisconnected => false,
-            UpWoke(task) => {
-                task.wake().map(|t| t.reawaken());
-                true
-            }
+            UpSuccess | UpDisconnected => {},
+            UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
         }
+        Ok(())
     }
     pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
+        // If the port has gone away, then there's no need to proceed any
+        // further.
+        if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
+
         self.do_send(GoUp(up))
     }
 
     fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
-        // Use an acquire/release ordering to maintain the same position with
-        // respect to the atomic loads below
-        if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
-
         self.queue.push(t);
         match self.cnt.fetch_add(1, atomics::SeqCst) {
             // As described in the mod's doc comment, -1 == wakeup
index b3591dad274b231de227fef8dda0d887ddaeb287..6228c4c682b061340315f4a92921179404e08e3e 100644 (file)
@@ -201,22 +201,22 @@ pub fn send(&self, t: T) -> Result<(), T> {
         }
     }
 
-    pub fn try_send(&self, t: T) -> super::TrySendResult<T> {
+    pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
         let (guard, state) = self.lock();
         if state.disconnected {
-            super::RecvDisconnected(t)
+            Err(super::RecvDisconnected(t))
         } else if state.buf.size() == state.buf.cap() {
-            super::Full(t)
+            Err(super::Full(t))
         } else if state.cap == 0 {
             // With capacity 0, even though we have buffer space we can't
             // transfer the data unless there's a receiver waiting.
             match mem::replace(&mut state.blocker, NoneBlocked) {
-                NoneBlocked => super::Full(t),
+                NoneBlocked => Err(super::Full(t)),
                 BlockedSender(..) => unreachable!(),
                 BlockedReceiver(task) => {
                     state.buf.enqueue(t);
                     wakeup(task, guard);
-                    super::Sent
+                    Ok(())
                 }
             }
         } else {
@@ -224,7 +224,7 @@ pub fn try_send(&self, t: T) -> super::TrySendResult<T> {
             // just enqueue the data for later retrieval.
             assert!(state.buf.size() < state.buf.cap());
             state.buf.enqueue(t);
-            super::Sent
+            Ok(())
         }
     }
 
@@ -232,7 +232,7 @@ pub fn try_send(&self, t: T) -> super::TrySendResult<T> {
     //
     // When reading this, remember that there can only ever be one receiver at
     // time.
-    pub fn recv(&self) -> Option<T> {
+    pub fn recv(&self) -> Result<T, ()> {
         let (guard, state) = self.lock();
 
         // Wait for the buffer to have something in it. No need for a while loop
@@ -242,13 +242,13 @@ pub fn recv(&self) -> Option<T> {
             wait(&mut state.blocker, BlockedReceiver, &self.lock);
             waited = true;
         }
-        if state.disconnected && state.buf.size() == 0 { return None }
+        if state.disconnected && state.buf.size() == 0 { return Err(()) }
 
         // Pick up the data, wake up our neighbors, and carry on
         assert!(state.buf.size() > 0);
         let ret = state.buf.dequeue();
         self.wakeup_senders(waited, guard, state);
-        return Some(ret);
+        return Ok(ret);
     }
 
     pub fn try_recv(&self) -> Result<T, Failure> {
index 06e020721358bae3aa7e5526f94ff035e8caf548..aa7371944daf73a25d16ef36ad480e70d06b2341 100644 (file)
@@ -73,7 +73,7 @@ fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
                 break;
             }
             self.pos = 0;
-            self.buf = self.rx.recv_opt();
+            self.buf = self.rx.recv_opt().ok();
             self.closed = self.buf.is_none();
         }
         if self.closed && num_read == 0 {
@@ -116,15 +116,13 @@ fn clone(&self) -> ChanWriter {
 
 impl Writer for ChanWriter {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        if !self.tx.try_send(buf.to_owned()) {
-            Err(io::IoError {
+        self.tx.send_opt(buf.to_owned()).map_err(|_| {
+            io::IoError {
                 kind: io::BrokenPipe,
                 desc: "Pipe closed",
                 detail: None
-            })
-        } else {
-            Ok(())
-        }
+            }
+        })
     }
 }
 
index 8dd59e859b877dddace4b86e7157df95f60c95f0..cd2c81d284ad1c07f7ca167cc53dea4838e4ddbb 100644 (file)
@@ -422,13 +422,13 @@ pub fn socket_name(addr: SocketAddr) {
         spawn(proc() {
             let mut sock3 = sock3;
             match sock3.sendto([1], addr2) {
-                Ok(..) => { let _ = tx2.try_send(()); }
+                Ok(..) => { let _ = tx2.send_opt(()); }
                 Err(..) => {}
             }
             done.send(());
         });
         match sock1.sendto([2], addr2) {
-            Ok(..) => { let _ = tx.try_send(()); }
+            Ok(..) => { let _ = tx.send_opt(()); }
             Err(..) => {}
         }
         drop(tx);
index 00b2e4f2307d8efbf443a6ac4e602f31364c5a43..d3f3d888b87b9f66ce72ffc3246599d28493c603 100644 (file)
@@ -149,6 +149,7 @@ pub fn unregister(&mut self, signum: Signum) {
 
 #[cfg(test, unix)]
 mod test_unix {
+    use prelude::*;
     use libc;
     use comm::Empty;
     use io::timer;
@@ -199,7 +200,7 @@ fn test_io_signal_unregister() {
         s2.unregister(Interrupt);
         sigint();
         timer::sleep(10);
-        assert_eq!(s2.rx.try_recv(), Empty);
+        assert_eq!(s2.rx.try_recv(), Err(Empty));
     }
 }
 
index 839fcab8f86a4308c66d2d6f0f291b54cb1ff91f..1ca36df968ccef3fb9b71af9632d2020dc98cd27 100644 (file)
@@ -137,7 +137,7 @@ mod test {
         let rx1 = timer.oneshot(10000);
         let rx = timer.oneshot(1);
         rx.recv();
-        assert_eq!(rx1.recv_opt(), None);
+        assert_eq!(rx1.recv_opt(), Err(()));
     })
 
     iotest!(fn test_io_timer_oneshot_then_sleep() {
@@ -145,7 +145,7 @@ mod test {
         let rx = timer.oneshot(100000000000);
         timer.sleep(1); // this should inalidate rx
 
-        assert_eq!(rx.recv_opt(), None);
+        assert_eq!(rx.recv_opt(), Err(()));
     })
 
     iotest!(fn test_io_timer_sleep_periodic() {
@@ -170,11 +170,11 @@ mod test {
 
         let rx = timer.oneshot(1);
         rx.recv();
-        assert!(rx.recv_opt().is_none());
+        assert!(rx.recv_opt().is_err());
 
         let rx = timer.oneshot(1);
         rx.recv();
-        assert!(rx.recv_opt().is_none());
+        assert!(rx.recv_opt().is_err());
     })
 
     iotest!(fn override() {
@@ -182,8 +182,8 @@ mod test {
         let orx = timer.oneshot(100);
         let prx = timer.periodic(100);
         timer.sleep(1);
-        assert_eq!(orx.recv_opt(), None);
-        assert_eq!(prx.recv_opt(), None);
+        assert_eq!(orx.recv_opt(), Err(()));
+        assert_eq!(prx.recv_opt(), Err(()));
         timer.oneshot(1).recv();
     })
 
@@ -226,7 +226,7 @@ mod test {
         let timer_rx = timer.periodic(1000);
 
         spawn(proc() {
-            timer_rx.recv_opt();
+            let _ = timer_rx.recv_opt();
         });
 
         // when we drop the TimerWatcher we're going to destroy the channel,
@@ -239,7 +239,7 @@ mod test {
         let timer_rx = timer.periodic(1000);
 
         spawn(proc() {
-            timer_rx.recv_opt();
+            let _ = timer_rx.recv_opt();
         });
 
         timer.oneshot(1);
@@ -251,7 +251,7 @@ mod test {
         let timer_rx = timer.periodic(1000);
 
         spawn(proc() {
-            timer_rx.recv_opt();
+            let _ = timer_rx.recv_opt();
         });
 
         timer.sleep(1);
@@ -262,7 +262,7 @@ mod test {
             let mut timer = Timer::new().unwrap();
             timer.oneshot(1000)
         };
-        assert_eq!(rx.recv_opt(), None);
+        assert_eq!(rx.recv_opt(), Err(()));
     })
 
     iotest!(fn sender_goes_away_period() {
@@ -270,7 +270,7 @@ mod test {
             let mut timer = Timer::new().unwrap();
             timer.periodic(1000)
         };
-        assert_eq!(rx.recv_opt(), None);
+        assert_eq!(rx.recv_opt(), Err(()));
     })
 
     iotest!(fn receiver_goes_away_oneshot() {
index bae20d3bb9b9072c90f6712519eb50d03451f273..a112ed77f094fba756c3b48231ec4a12ea4aaff9 100644 (file)
@@ -385,7 +385,7 @@ pub fn new() -> Death {
     pub fn collect_failure(&mut self, result: TaskResult) {
         match self.on_exit.take() {
             Some(Execute(f)) => f(result),
-            Some(SendMessage(ch)) => { ch.try_send(result); }
+            Some(SendMessage(ch)) => { let _ = ch.send_opt(result); }
             None => {}
         }
     }
index 9e01b16ee9ba9b544fba49340c69f61d096478be..13e075501d9f58027042856b7e90a123aef62bf4 100644 (file)
@@ -37,16 +37,16 @@ impl<S:Send,R:Send> DuplexStream<S, R> {
     pub fn send(&self, x: S) {
         self.tx.send(x)
     }
-    pub fn try_send(&self, x: S) -> bool {
-        self.tx.try_send(x)
+    pub fn send_opt(&self, x: S) -> Result<(), S> {
+        self.tx.send_opt(x)
     }
     pub fn recv(&self) -> R {
         self.rx.recv()
     }
-    pub fn try_recv(&self) -> comm::TryRecvResult<R> {
+    pub fn try_recv(&self) -> Result<R, comm::TryRecvError> {
         self.rx.try_recv()
     }
-    pub fn recv_opt(&self) -> Option<R> {
+    pub fn recv_opt(&self) -> Result<R, ()> {
         self.rx.recv_opt()
     }
 }
index b83bdf9df299e0152b010bc552f23365d3e7c8bf..911cd1d2eb1ccdb80036a64160363a8509a37887 100644 (file)
@@ -800,7 +800,7 @@ fn test_barrier() {
         // At this point, all spawned tasks should be blocked,
         // so we shouldn't get anything from the port
         assert!(match rx.try_recv() {
-            Empty => true,
+            Err(Empty) => true,
             _ => false,
         });
 
index 9bb7a81a2ff00b88c4635a659687d628ba39c978..eb90797395edfed6458ac34ff7c0ac55d11b458d 100644 (file)
@@ -16,7 +16,6 @@
 //! containing data.
 
 use std::cast;
-use std::comm;
 use std::kinds::marker;
 use std::mem::replace;
 use std::sync::atomics;
@@ -46,10 +45,10 @@ fn new() -> WaitQueue {
     // Signals one live task from the queue.
     fn signal(&self) -> bool {
         match self.head.try_recv() {
-            comm::Data(ch) => {
+            Ok(ch) => {
                 // Send a wakeup signal. If the waiter was killed, its port will
                 // have closed. Keep trying until we get a live task.
-                if ch.try_send(()) {
+                if ch.send_opt(()).is_ok() {
                     true
                 } else {
                     self.signal()
@@ -63,8 +62,8 @@ fn broadcast(&self) -> uint {
         let mut count = 0;
         loop {
             match self.head.try_recv() {
-                comm::Data(ch) => {
-                    if ch.try_send(()) {
+                Ok(ch) => {
+                    if ch.send_opt(()).is_ok() {
                         count += 1;
                     }
                 }
@@ -76,7 +75,7 @@ fn broadcast(&self) -> uint {
 
     fn wait_end(&self) -> WaitEnd {
         let (signal_end, wait_end) = channel();
-        assert!(self.tail.try_send(signal_end));
+        self.tail.send(signal_end);
         wait_end
     }
 }
index 629b4cbfeea2be0bab5c9b5369f92734bb1cf5b5..d2e08cfccf890be1fcc12f4f06cd7cd08f442b55 100644 (file)
@@ -38,12 +38,12 @@ fn server(requests: &Receiver<request>, responses: &Sender<uint>) {
     let mut done = false;
     while !done {
         match requests.recv_opt() {
-          Some(get_count) => { responses.send(count.clone()); }
-          Some(bytes(b)) => {
+          Ok(get_count) => { responses.send(count.clone()); }
+          Ok(bytes(b)) => {
             //println!("server: received {:?} bytes", b);
             count += b;
           }
-          None => { done = true; }
+          Err(..) => { done = true; }
           _ => { }
         }
     }
index 49d9c5d3a2e3157136a423ac97cc3b18c8b677c1..dc9b3561bb16857bd677edd52c54c4d9c61ca5dc 100644 (file)
@@ -33,12 +33,12 @@ fn server(requests: &Receiver<request>, responses: &Sender<uint>) {
     let mut done = false;
     while !done {
         match requests.recv_opt() {
-          Some(get_count) => { responses.send(count.clone()); }
-          Some(bytes(b)) => {
+          Ok(get_count) => { responses.send(count.clone()); }
+          Ok(bytes(b)) => {
             //println!("server: received {:?} bytes", b);
             count += b;
           }
-          None => { done = true; }
+          Err(..) => { done = true; }
           _ => { }
         }
     }
index f5711d91447d9daa3ccbb5299a010e0d55eeb2d8..a45f8c61be50f9ec738eb00ffde871e141aedc26 100644 (file)
@@ -50,7 +50,7 @@ fn main() {
 
     let (tx, rx) = channel();
     child_generation(from_str::<uint>(*args.get(1)).unwrap(), tx);
-    if rx.recv_opt().is_none() {
+    if rx.recv_opt().is_err() {
         fail!("it happened when we slumbered");
     }
 }
index 2630057c988a064753db66c087cf5902f673f2ee..9f08f1db41057a6b5c9935172166d5a4b85d2abe 100644 (file)
@@ -20,9 +20,9 @@ pub fn main() {
     });
     loop {
         match rx.try_recv() {
-            comm::Data(()) => break,
-            comm::Empty => {}
-            comm::Disconnected => unreachable!()
+            Ok(()) => break,
+            Err(comm::Empty) => {}
+            Err(comm::Disconnected) => unreachable!()
         }
     }
 }