/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
#[deriving(Eq, Clone, Show)]
-pub enum TryRecvResult<T> {
+pub enum TryRecvError {
/// This channel is currently empty, but the sender(s) have not yet
/// disconnected, so data may yet become available.
Empty,
/// This channel's sending half has become disconnected, and there will
/// never be any more data received on this channel
Disconnected,
- /// The channel had some data and we successfully popped it
- Data(T),
}
-/// This enumeration is the list of the possible outcomes for the
+/// This enumeration is the list of the possible error outcomes for the
/// `SyncSender::try_send` method.
#[deriving(Eq, Clone, Show)]
-pub enum TrySendResult<T> {
- /// The data was successfully sent along the channel. This either means that
- /// it was buffered in the channel, or handed off to a receiver. In either
- /// case, the callee no longer has ownership of the data.
- Sent,
+pub enum TrySendError<T> {
/// The data could not be sent on the channel because it would require that
/// the callee block to send the data.
///
/// of `Receiver` and `Sender` to see what's possible with them.
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
- (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
+ (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
}
/// Creates a new synchronous, bounded channel.
/// ```
pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
- (SyncSender::new(a), Receiver::my_new(Sync(b)))
+ (SyncSender::new(a), Receiver::new(Sync(b)))
}
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> Sender<T> {
- fn my_new(inner: Flavor<T>) -> Sender<T> {
+ fn new(inner: Flavor<T>) -> Sender<T> {
Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
}
/// The purpose of this functionality is to propagate failure among tasks.
/// If failure is not desired, then consider using the `try_send` method
pub fn send(&self, t: T) {
- if !self.try_send(t) {
+ if self.send_opt(t).is_err() {
fail!("sending on a closed channel");
}
}
- /// Attempts to send a value on this channel, returning whether it was
- /// successfully sent.
+ /// Attempts to send a value on this channel, returning it back if it could
+ /// not be sent.
///
/// A successful send occurs when it is determined that the other end of
/// the channel has not hung up already. An unsuccessful send would be one
/// where the corresponding receiver has already been deallocated. Note
- /// that a return value of `false` means that the data will never be
- /// received, but a return value of `true` does *not* mean that the data
+ /// that a return value of `Err` means that the data will never be
+ /// received, but a return value of `Ok` does *not* mean that the data
/// will be received. It is possible for the corresponding receiver to
- /// hang up immediately after this function returns `true`.
+ /// hang up immediately after this function returns `Ok`.
///
- /// Like `send`, this method will never block. If the failure of send cannot
- /// be tolerated, then this method should be used instead.
- pub fn try_send(&self, t: T) -> bool {
+ /// Like `send`, this method will never block.
+ ///
+ /// # Failure
+ ///
+ /// This method will never fail, it will return the message back to the
+ /// caller if the other end is disconnected
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// let (tx, rx) = channel();
+ ///
+ /// // This send is always successful
+ /// assert_eq!(tx.send_opt(1), Ok(()));
+ ///
+ /// // This send will fail because the receiver is gone
+ /// drop(rx);
+ /// assert_eq!(tx.send_opt(1), Err(1));
+ /// ```
+ pub fn send_opt(&self, t: T) -> Result<(), T> {
// In order to prevent starvation of other tasks in situations where
// a task sends repeatedly without ever receiving, we occassionally
// yield instead of doing a send immediately.
return (*p).send(t);
} else {
let (a, b) = UnsafeArc::new2(stream::Packet::new());
- match (*p).upgrade(Receiver::my_new(Stream(b))) {
+ match (*p).upgrade(Receiver::new(Stream(b))) {
oneshot::UpSuccess => {
- (*a.get()).send(t);
- (a, true)
+ let ret = (*a.get()).send(t);
+ (a, ret)
}
- oneshot::UpDisconnected => (a, false),
+ oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(task) => {
- (*a.get()).send(t);
+ // This send cannot fail because the task is
+ // asleep (we're looking at it), so the receiver
+ // can't go away.
+ (*a.get()).send(t).unwrap();
task.wake().map(|t| t.reawaken());
- (a, true)
+ (a, Ok(()))
}
}
}
};
unsafe {
- let mut tmp = Sender::my_new(Stream(new_inner));
+ let mut tmp = Sender::new(Stream(new_inner));
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
}
return ret;
let (packet, sleeper) = match self.inner {
Oneshot(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
- match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
+ match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
oneshot::UpWoke(task) => (b, Some(task))
}
}
Stream(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
- match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
+ match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
stream::UpSuccess | stream::UpDisconnected => (b, None),
stream::UpWoke(task) => (b, Some(task)),
}
}
Shared(ref p) => {
unsafe { (*p.get()).clone_chan(); }
- return Sender::my_new(Shared(p.clone()));
+ return Sender::new(Shared(p.clone()));
}
Sync(..) => unreachable!(),
};
unsafe {
(*packet.get()).inherit_blocker(sleeper);
- let mut tmp = Sender::my_new(Shared(packet.clone()));
+ let mut tmp = Sender::new(Shared(packet.clone()));
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
}
- Sender::my_new(Shared(packet))
+ Sender::new(Shared(packet))
}
}
/// `SyncSender::send_opt` method which will not fail if the receiver
/// disconnects.
pub fn send(&self, t: T) {
- if self.send_opt(t).is_some() {
+ if self.send_opt(t).is_err() {
fail!("sending on a closed channel");
}
}
/// # Failure
///
/// This function cannot fail.
- pub fn send_opt(&self, t: T) -> Option<T> {
- match unsafe { (*self.inner.get()).send(t) } {
- Ok(()) => None,
- Err(t) => Some(t),
- }
+ pub fn send_opt(&self, t: T) -> Result<(), T> {
+ unsafe { (*self.inner.get()).send(t) }
}
/// Attempts to send a value on this channel without blocking.
/// # Failure
///
/// This function cannot fail
- pub fn try_send(&self, t: T) -> TrySendResult<T> {
+ pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
unsafe { (*self.inner.get()).try_send(t) }
}
}
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> Receiver<T> {
- fn my_new(inner: Flavor<T>) -> Receiver<T> {
+ fn new(inner: Flavor<T>) -> Receiver<T> {
Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
}
/// peek at a value on this receiver.
pub fn recv(&self) -> T {
match self.recv_opt() {
- Some(t) => t,
- None => fail!("receiving on a closed channel"),
+ Ok(t) => t,
+ Err(()) => fail!("receiving on a closed channel"),
}
}
/// block on a receiver.
///
/// This function cannot fail.
- pub fn try_recv(&self) -> TryRecvResult<T> {
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
// If a thread is spinning in try_recv, we should take the opportunity
// to reschedule things occasionally. See notes above in scheduling on
// sends for why this doesn't always hit TLS, and also for why this uses
let mut new_port = match self.inner {
Oneshot(ref p) => {
match unsafe { (*p.get()).try_recv() } {
- Ok(t) => return Data(t),
- Err(oneshot::Empty) => return Empty,
- Err(oneshot::Disconnected) => return Disconnected,
+ Ok(t) => return Ok(t),
+ Err(oneshot::Empty) => return Err(Empty),
+ Err(oneshot::Disconnected) => return Err(Disconnected),
Err(oneshot::Upgraded(rx)) => rx,
}
}
Stream(ref p) => {
match unsafe { (*p.get()).try_recv() } {
- Ok(t) => return Data(t),
- Err(stream::Empty) => return Empty,
- Err(stream::Disconnected) => return Disconnected,
+ Ok(t) => return Ok(t),
+ Err(stream::Empty) => return Err(Empty),
+ Err(stream::Disconnected) => return Err(Disconnected),
Err(stream::Upgraded(rx)) => rx,
}
}
Shared(ref p) => {
match unsafe { (*p.get()).try_recv() } {
- Ok(t) => return Data(t),
- Err(shared::Empty) => return Empty,
- Err(shared::Disconnected) => return Disconnected,
+ Ok(t) => return Ok(t),
+ Err(shared::Empty) => return Err(Empty),
+ Err(shared::Disconnected) => return Err(Disconnected),
}
}
Sync(ref p) => {
match unsafe { (*p.get()).try_recv() } {
- Ok(t) => return Data(t),
- Err(sync::Empty) => return Empty,
- Err(sync::Disconnected) => return Disconnected,
+ Ok(t) => return Ok(t),
+ Err(sync::Empty) => return Err(Empty),
+ Err(sync::Disconnected) => return Err(Disconnected),
}
}
};
/// In other words, this function has the same semantics as the `recv`
/// method except for the failure aspect.
///
- /// If the channel has hung up, then `None` is returned. Otherwise `Some` of
+ /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
/// the value found on the receiver is returned.
- pub fn recv_opt(&self) -> Option<T> {
+ pub fn recv_opt(&self) -> Result<T, ()> {
loop {
let mut new_port = match self.inner {
Oneshot(ref p) => {
match unsafe { (*p.get()).recv() } {
- Ok(t) => return Some(t),
+ Ok(t) => return Ok(t),
Err(oneshot::Empty) => return unreachable!(),
- Err(oneshot::Disconnected) => return None,
+ Err(oneshot::Disconnected) => return Err(()),
Err(oneshot::Upgraded(rx)) => rx,
}
}
Stream(ref p) => {
match unsafe { (*p.get()).recv() } {
- Ok(t) => return Some(t),
+ Ok(t) => return Ok(t),
Err(stream::Empty) => return unreachable!(),
- Err(stream::Disconnected) => return None,
+ Err(stream::Disconnected) => return Err(()),
Err(stream::Upgraded(rx)) => rx,
}
}
Shared(ref p) => {
match unsafe { (*p.get()).recv() } {
- Ok(t) => return Some(t),
+ Ok(t) => return Ok(t),
Err(shared::Empty) => return unreachable!(),
- Err(shared::Disconnected) => return None,
+ Err(shared::Disconnected) => return Err(()),
}
}
Sync(ref p) => return unsafe { (*p.get()).recv() }
}
impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
- fn next(&mut self) -> Option<T> { self.rx.recv_opt() }
+ fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
}
#[unsafe_destructor]
assert_eq!(rx.recv(), 1);
}
match rx.try_recv() {
- Data(..) => fail!(),
+ Ok(..) => fail!(),
_ => {}
}
dtx.send(());
test!(fn oneshot_single_thread_try_send_open() {
let (tx, rx) = channel::<int>();
- assert!(tx.try_send(10));
+ assert!(tx.send_opt(10).is_ok());
assert!(rx.recv() == 10);
})
test!(fn oneshot_single_thread_try_send_closed() {
let (tx, rx) = channel::<int>();
drop(rx);
- assert!(!tx.try_send(10));
+ assert!(tx.send_opt(10).is_err());
})
test!(fn oneshot_single_thread_try_recv_open() {
let (tx, rx) = channel::<int>();
tx.send(10);
- assert!(rx.recv_opt() == Some(10));
+ assert!(rx.recv_opt() == Ok(10));
})
test!(fn oneshot_single_thread_try_recv_closed() {
let (tx, rx) = channel::<int>();
drop(tx);
- assert!(rx.recv_opt() == None);
+ assert!(rx.recv_opt() == Err(()));
})
test!(fn oneshot_single_thread_peek_data() {
let (tx, rx) = channel::<int>();
- assert_eq!(rx.try_recv(), Empty)
+ assert_eq!(rx.try_recv(), Err(Empty))
tx.send(10);
- assert_eq!(rx.try_recv(), Data(10));
+ assert_eq!(rx.try_recv(), Ok(10));
})
test!(fn oneshot_single_thread_peek_close() {
let (tx, rx) = channel::<int>();
drop(tx);
- assert_eq!(rx.try_recv(), Disconnected);
- assert_eq!(rx.try_recv(), Disconnected);
+ assert_eq!(rx.try_recv(), Err(Disconnected));
+ assert_eq!(rx.try_recv(), Err(Disconnected));
})
test!(fn oneshot_single_thread_peek_open() {
let (_tx, rx) = channel::<int>();
- assert_eq!(rx.try_recv(), Empty);
+ assert_eq!(rx.try_recv(), Err(Empty));
})
test!(fn oneshot_multi_task_recv_then_send() {
tx.send(2);
tx.send(2);
tx.send(2);
- tx.try_send(2);
+ let _ = tx.send_opt(2);
drop(tx);
assert_eq!(count_rx.recv(), 4);
})
tx3.send(());
});
- assert_eq!(rx1.try_recv(), Empty);
+ assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
- assert_eq!(rx1.try_recv(), Data(1));
- assert_eq!(rx1.try_recv(), Empty);
+ assert_eq!(rx1.try_recv(), Ok(1));
+ assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
- assert_eq!(rx1.try_recv(), Disconnected);
+ assert_eq!(rx1.try_recv(), Err(Disconnected));
})
// This bug used to end up in a livelock inside of the Receiver destructor
let mut hits = 0;
while hits < 10 {
match rx.try_recv() {
- Data(()) => { hits += 1; }
- Empty => { Thread::yield_now(); }
- Disconnected => return,
+ Ok(()) => { hits += 1; }
+ Err(Empty) => { Thread::yield_now(); }
+ Err(Disconnected) => return,
}
}
cdone.send(());
assert_eq!(rx.recv(), 1);
}
match rx.try_recv() {
- Data(..) => fail!(),
+ Ok(..) => fail!(),
_ => {}
}
dtx.send(());
test!(fn oneshot_single_thread_try_send_open() {
let (tx, rx) = sync_channel::<int>(1);
- assert_eq!(tx.try_send(10), Sent);
+ assert_eq!(tx.try_send(10), Ok(()));
assert!(rx.recv() == 10);
})
test!(fn oneshot_single_thread_try_send_closed() {
let (tx, rx) = sync_channel::<int>(0);
drop(rx);
- assert_eq!(tx.try_send(10), RecvDisconnected(10));
+ assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
})
test!(fn oneshot_single_thread_try_send_closed2() {
let (tx, _rx) = sync_channel::<int>(0);
- assert_eq!(tx.try_send(10), Full(10));
+ assert_eq!(tx.try_send(10), Err(Full(10)));
})
test!(fn oneshot_single_thread_try_recv_open() {
let (tx, rx) = sync_channel::<int>(1);
tx.send(10);
- assert!(rx.recv_opt() == Some(10));
+ assert!(rx.recv_opt() == Ok(10));
})
test!(fn oneshot_single_thread_try_recv_closed() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
- assert!(rx.recv_opt() == None);
+ assert!(rx.recv_opt() == Err(()));
})
test!(fn oneshot_single_thread_peek_data() {
let (tx, rx) = sync_channel::<int>(1);
- assert_eq!(rx.try_recv(), Empty)
+ assert_eq!(rx.try_recv(), Err(Empty))
tx.send(10);
- assert_eq!(rx.try_recv(), Data(10));
+ assert_eq!(rx.try_recv(), Ok(10));
})
test!(fn oneshot_single_thread_peek_close() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
- assert_eq!(rx.try_recv(), Disconnected);
- assert_eq!(rx.try_recv(), Disconnected);
+ assert_eq!(rx.try_recv(), Err(Disconnected));
+ assert_eq!(rx.try_recv(), Err(Disconnected));
})
test!(fn oneshot_single_thread_peek_open() {
let (_tx, rx) = sync_channel::<int>(0);
- assert_eq!(rx.try_recv(), Empty);
+ assert_eq!(rx.try_recv(), Err(Empty));
})
test!(fn oneshot_multi_task_recv_then_send() {
tx.send(2);
tx.send(2);
tx.send(2);
- tx.try_send(2);
+ let _ = tx.try_send(2);
drop(tx);
assert_eq!(count_rx.recv(), 4);
})
tx3.send(());
});
- assert_eq!(rx1.try_recv(), Empty);
+ assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
- assert_eq!(rx1.try_recv(), Data(1));
- assert_eq!(rx1.try_recv(), Empty);
+ assert_eq!(rx1.try_recv(), Ok(1));
+ assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
- assert_eq!(rx1.try_recv(), Disconnected);
+ assert_eq!(rx1.try_recv(), Err(Disconnected));
})
// This bug used to end up in a livelock inside of the Receiver destructor
let mut hits = 0;
while hits < 10 {
match rx.try_recv() {
- Data(()) => { hits += 1; }
- Empty => { Thread::yield_now(); }
- Disconnected => return,
+ Ok(()) => { hits += 1; }
+ Err(Empty) => { Thread::yield_now(); }
+ Err(Disconnected) => return,
}
}
cdone.send(());
test!(fn send_opt1() {
let (tx, rx) = sync_channel(0);
spawn(proc() { rx.recv(); });
- assert_eq!(tx.send_opt(1), None);
+ assert_eq!(tx.send_opt(1), Ok(()));
})
test!(fn send_opt2() {
let (tx, rx) = sync_channel(0);
spawn(proc() { drop(rx); });
- assert_eq!(tx.send_opt(1), Some(1));
+ assert_eq!(tx.send_opt(1), Err(1));
})
test!(fn send_opt3() {
let (tx, rx) = sync_channel(1);
- assert_eq!(tx.send_opt(1), None);
+ assert_eq!(tx.send_opt(1), Ok(()));
spawn(proc() { drop(rx); });
- assert_eq!(tx.send_opt(1), Some(1));
+ assert_eq!(tx.send_opt(1), Err(1));
})
test!(fn send_opt4() {
let (done, donerx) = channel();
let done2 = done.clone();
spawn(proc() {
- assert_eq!(tx.send_opt(1), Some(1));
+ assert_eq!(tx.send_opt(1), Err(1));
done.send(());
});
spawn(proc() {
- assert_eq!(tx2.send_opt(2), Some(2));
+ assert_eq!(tx2.send_opt(2), Err(2));
done2.send(());
});
drop(rx);
test!(fn try_send1() {
let (tx, _rx) = sync_channel(0);
- assert_eq!(tx.try_send(1), Full(1));
+ assert_eq!(tx.try_send(1), Err(Full(1)));
})
test!(fn try_send2() {
let (tx, _rx) = sync_channel(1);
- assert_eq!(tx.try_send(1), Sent);
- assert_eq!(tx.try_send(1), Full(1));
+ assert_eq!(tx.try_send(1), Ok(()));
+ assert_eq!(tx.try_send(1), Err(Full(1)));
})
test!(fn try_send3() {
let (tx, rx) = sync_channel(1);
- assert_eq!(tx.try_send(1), Sent);
+ assert_eq!(tx.try_send(1), Ok(()));
drop(rx);
- assert_eq!(tx.try_send(1), RecvDisconnected(1));
+ assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
})
test!(fn try_send4() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
for _ in range(0, 1000) { task::deschedule(); }
- assert_eq!(tx.try_send(1), Sent);
+ assert_eq!(tx.try_send(1), Ok(()));
});
assert_eq!(rx.recv(), 1);
} #[ignore(reason = "flaky on libnative")])