use cell::Cell;
use clone::Clone;
use rt::{context, SchedulerContext};
+use tuple::ImmutableTuple;
/// A combined refcount / BlockedTask-as-uint pointer.
///
}
}
+ /// Send a message on the one-shot channel. If a receiver task is blocked
+ /// waiting for the message, will wake it up and reschedule to it.
pub fn send(self, val: T) {
self.try_send(val);
}
+ /// As `send`, but also returns whether or not the receiver endpoint is still open.
pub fn try_send(self, val: T) -> bool {
+ self.try_send_inner(val, true)
+ }
+
+ /// Send a message without immediately rescheduling to a blocked receiver.
+ /// This can be useful in contexts where rescheduling is forbidden, or to
+ /// optimize for when the sender expects to still have useful work to do.
+ pub fn send_deferred(self, val: T) {
+ self.try_send_deferred(val);
+ }
+
+ /// As `send_deferred` and `try_send` together.
+ pub fn try_send_deferred(self, val: T) -> bool {
+ self.try_send_inner(val, false)
+ }
+ // 'do_resched' configures whether the scheduler immediately switches to
+ // the receiving task, or leaves the sending task still running.
+ fn try_send_inner(self, val: T, do_resched: bool) -> bool {
rtassert!(context() != SchedulerContext);
let mut this = self;
task_as_state => {
// Port is blocked. Wake it up.
let recvr = BlockedTask::cast_from_uint(task_as_state);
- do recvr.wake().map_consume |woken_task| {
- Scheduler::run_task(woken_task);
- };
+ if do_resched {
+ do recvr.wake().map_consume |woken_task| {
+ Scheduler::run_task(woken_task);
+ };
+ } else {
+ let recvr = Cell::new(recvr);
+ do Local::borrow::<Scheduler, ()> |sched| {
+ sched.enqueue_blocked_task(recvr.take());
+ }
+ }
}
}
}
}
}
+ /// Wait for a message on the one-shot port. Fails if the send end is closed.
pub fn recv(self) -> T {
match self.try_recv() {
Some(val) => val,
}
}
+ /// As `recv`, but returns `None` if the send end is closed rather than failing.
pub fn try_recv(self) -> Option<T> {
let mut this = self;
}
}
+/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
+pub trait SendDeferred<T> {
+ fn send_deferred(&self, val: T);
+ fn try_send_deferred(&self, val: T) -> bool;
+}
+
struct StreamPayload<T> {
val: T,
next: PortOne<StreamPayload<T>>
return (port, chan);
}
+impl<T: Send> Chan<T> {
+ fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
+ let (next_pone, next_cone) = oneshot();
+ let cone = self.next.take();
+ self.next.put_back(next_cone);
+ cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
+ }
+}
+
impl<T: Send> GenericChan<T> for Chan<T> {
fn send(&self, val: T) {
self.try_send(val);
impl<T: Send> GenericSmartChan<T> for Chan<T> {
fn try_send(&self, val: T) -> bool {
- let (next_pone, next_cone) = oneshot();
- let cone = self.next.take();
- self.next.put_back(next_cone);
- cone.try_send(StreamPayload { val: val, next: next_pone })
+ self.try_send_inner(val, true)
+ }
+}
+
+impl<T: Send> SendDeferred<T> for Chan<T> {
+ fn send_deferred(&self, val: T) {
+ self.try_send_deferred(val);
+ }
+ fn try_send_deferred(&self, val: T) -> bool {
+ self.try_send_inner(val, false)
}
}
}
}
+impl<T: Send> SharedChan<T> {
+ fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
+ unsafe {
+ let (next_pone, next_cone) = oneshot();
+ let cone = (*self.next.get()).swap(~next_cone, SeqCst);
+ cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
+ do_resched)
+ }
+ }
+}
+
impl<T: Send> GenericChan<T> for SharedChan<T> {
fn send(&self, val: T) {
self.try_send(val);
impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, val: T) -> bool {
- unsafe {
- let (next_pone, next_cone) = oneshot();
- let cone = (*self.next.get()).swap(~next_cone, SeqCst);
- cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
- }
+ self.try_send_inner(val, true)
+ }
+}
+
+impl<T: Send> SendDeferred<T> for SharedChan<T> {
+ fn send_deferred(&self, val: T) {
+ self.try_send_deferred(val);
+ }
+ fn try_send_deferred(&self, val: T) -> bool {
+ self.try_send_inner(val, false)
}
}
impl<T: Send> GenericChan<T> for MegaPipe<T> {
fn send(&self, val: T) {
- match *self {
- (_, ref c) => c.send(val)
- }
+ self.second_ref().send(val)
}
}
impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
fn try_send(&self, val: T) -> bool {
- match *self {
- (_, ref c) => c.try_send(val)
- }
+ self.second_ref().try_send(val)
}
}
impl<T: Send> GenericPort<T> for MegaPipe<T> {
fn recv(&self) -> T {
- match *self {
- (ref p, _) => p.recv()
- }
+ self.first_ref().recv()
}
fn try_recv(&self) -> Option<T> {
- match *self {
- (ref p, _) => p.try_recv()
- }
+ self.first_ref().try_recv()
+ }
+}
+
+impl<T: Send> SendDeferred<T> for MegaPipe<T> {
+ fn send_deferred(&self, val: T) {
+ self.second_ref().send_deferred(val)
+ }
+ fn try_send_deferred(&self, val: T) -> bool {
+ self.second_ref().try_send_deferred(val)
}
}
}
}
}
+
+ #[test]
+ fn send_deferred() {
+ use unstable::sync::atomically;
+
+ // Tests no-rescheduling of send_deferred on all types of channels.
+ do run_in_newsched_task {
+ let (pone, cone) = oneshot();
+ let (pstream, cstream) = stream();
+ let (pshared, cshared) = stream();
+ let cshared = SharedChan::new(cshared);
+ let mp = megapipe();
+
+ let pone = Cell::new(pone);
+ do spawntask { pone.take().recv(); }
+ let pstream = Cell::new(pstream);
+ do spawntask { pstream.take().recv(); }
+ let pshared = Cell::new(pshared);
+ do spawntask { pshared.take().recv(); }
+ let p_mp = Cell::new(mp.clone());
+ do spawntask { p_mp.take().recv(); }
+
+ let cs = Cell::new((cone, cstream, cshared, mp));
+ unsafe {
+ do atomically {
+ let (cone, cstream, cshared, mp) = cs.take();
+ cone.send_deferred(());
+ cstream.send_deferred(());
+ cshared.send_deferred(());
+ mp.send_deferred(());
+ }
+ }
+ }
+ }
+
}