use select::{Select, SelectPort};
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
use unstable::sync::UnsafeArc;
+use util;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable, SendDeferred};
use cell::Cell;
use clone::Clone;
use tuple::ImmutableTuple;
+use mutable::Mut;
/// A combined refcount / BlockedTask-as-uint pointer.
///
/// A channel with unbounded size.
pub struct Chan<T> {
- // FIXME #5372. Using Cell because we don't take &mut self
- next: Cell<StreamChanOne<T>>
+ // FIXME #5372. Using Mut because we don't take &mut self
+ next: Mut<StreamChanOne<T>>
}
/// An port with unbounded size.
pub struct Port<T> {
- // FIXME #5372. Using Cell because we don't take &mut self
- next: Cell<StreamPortOne<T>>
+ // FIXME #5372. Using Mut because we don't take &mut self
+ next: Mut<Option<StreamPortOne<T>>>
}
pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
let (pone, cone) = oneshot();
- let port = Port { next: Cell::new(pone) };
- let chan = Chan { next: Cell::new(cone) };
+ let port = Port { next: Mut::new(Some(pone)) };
+ let chan = Chan { next: Mut::new(cone) };
return (port, chan);
}
impl<T: Send> Chan<T> {
fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
- let (next_pone, next_cone) = oneshot();
- let cone = self.next.take();
- self.next.put_back(next_cone);
+ let (next_pone, mut cone) = oneshot();
+ let mut b = self.next.borrow_mut();
+ util::swap(&mut cone, b.get());
cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
}
}
}
fn try_recv(&self) -> Option<T> {
- do self.next.take_opt().map_default(None) |pone| {
+ let mut b = self.next.borrow_mut();
+ do b.get().take().map_default(None) |pone| {
match pone.try_recv() {
Some(StreamPayload { val, next }) => {
- self.next.put_back(next);
+ *b.get() = Some(next);
Some(val)
}
None => None
impl<T: Send> Peekable<T> for Port<T> {
fn peek(&self) -> bool {
- self.next.with_mut_ref(|p| p.peek())
+ self.next.map_mut(|p| p.get_mut_ref().peek())
}
}
impl<'self, T: Send> SelectInner for &'self Port<T> {
#[inline]
fn optimistic_check(&mut self) -> bool {
- do self.next.with_mut_ref |pone| { pone.optimistic_check() }
+ do self.next.map_mut |pone| { pone.get_mut_ref().optimistic_check() }
}
#[inline]
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
- let task = Cell::new(task);
- do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
+ let mut b = self.next.borrow_mut();
+ b.get().get_mut_ref().block_on(sched, task)
}
#[inline]
fn unblock_from(&mut self) -> bool {
- do self.next.with_mut_ref |pone| { pone.unblock_from() }
+ do self.next.map_mut |pone| { pone.get_mut_ref().unblock_from() }
}
}
impl<'self, T: Send> SelectPortInner<T> for &'self Port<T> {
fn recv_ready(self) -> Option<T> {
- match self.next.take().recv_ready() {
+ let mut b = self.next.borrow_mut();
+ match b.get().take_unwrap().recv_ready() {
Some(StreamPayload { val, next }) => {
- self.next.put_back(next);
+ *b.get() = Some(next);
Some(val)
}
None => None
impl<T: Send> SharedChan<T> {
pub fn new(chan: Chan<T>) -> SharedChan<T> {
- let next = chan.next.take();
+ let next = chan.next.unwrap();
let next = AtomicOption::new(~next);
SharedChan { next: UnsafeArc::new(next) }
}
impl<T: Send> SharedPort<T> {
pub fn new(port: Port<T>) -> SharedPort<T> {
// Put the data port into a new link pipe
- let next_data_port = port.next.take();
+ let next_data_port = port.next.unwrap().unwrap();
let (next_link_port, next_link_chan) = oneshot();
next_link_chan.send(next_data_port);
let next_link = AtomicOption::new(~next_link_port);