/// the other shared channel already had the code implemented
channels: atomic::AtomicUint,
- /// The state field is protected by this mutex
- lock: NativeMutex,
- state: UnsafeCell<State<T>>,
+ lock: Mutex<State<T>>,
}
struct State<T> {
/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
/// in the meantime. This re-locks the mutex upon returning.
+fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
+ guard: MutexGuard<'b, State<T>>,
+ f: fn(BlockedTask) -> Blocker)
+ -> MutexGuard<'a, State<T>>
+{
+ let me: Box<Task> = Local::take();
+ me.deschedule(1, |task| {
+ match mem::replace(&mut guard.blocker, f(task)) {
+ NoneBlocked => {}
+ _ => unreachable!(),
+ }
+ mem::drop(guard);
+ Ok(())
+ });
+ lock.lock()
+}
-/// Wakes up a thread, dropping the lock at the correct time
-fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
+/// Wakes up a task, dropping the lock at the correct time
+fn wakeup<T>(task: BlockedTask, guard: MutexGuard<State<T>>) {
// We need to be careful to wake up the waiting task *outside* of the mutex
// in case it incurs a context switch.
drop(guard);
pub fn new(cap: uint) -> Packet<T> {
Packet {
channels: atomic::AtomicUint::new(1),
- lock: unsafe { NativeMutex::new() },
- state: UnsafeCell::new(State {
+ lock: Mutex::new(State {
disconnected: false,
blocker: NoneBlocked,
cap: cap,
if guard.disconnected { return Err(t) }
guard.buf.enqueue(t);
- match mem::replace(&mut state.blocker, NoneBlocked) {
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
// if our capacity is 0, then we need to wait for a receiver to be
// available to take our data. After waiting, we check again to make
// sure the port didn't go away in the meantime. If it did, we need
// to hand back our data.
- NoneBlocked if state.cap == 0 => {
+ NoneBlocked if guard.cap == 0 => {
let mut canceled = false;
- assert!(state.canceled.is_none());
- state.canceled = Some(unsafe { mem::transmute(&mut canceled) });
- wait(&mut state.blocker, BlockedSender, &self.lock);
- if canceled {Err(state.buf.dequeue())} else {Ok(())}
+ assert!(guard.canceled.is_none());
+ guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
+ let guard = wait(&self.lock, guard, BlockedSender);
+ if canceled {Err(guard.buf.dequeue())} else {Ok(())}
}
// success, we buffered some data
}
pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
- let (guard, state) = self.lock();
- if state.disconnected {
+ let guard = self.lock.lock();
+ if guard.disconnected {
Err(super::RecvDisconnected(t))
- } else if state.buf.size() == state.buf.cap() {
+ } else if guard.buf.size() == guard.buf.cap() {
Err(super::Full(t))
- } else if state.cap == 0 {
+ } else if guard.cap == 0 {
// With capacity 0, even though we have buffer space we can't
// transfer the data unless there's a receiver waiting.
- match mem::replace(&mut state.blocker, NoneBlocked) {
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => Err(super::Full(t)),
BlockedSender(..) => unreachable!(),
BlockedReceiver(token) => {
// Wait for the buffer to have something in it. No need for a while loop
// because we're the only receiver.
let mut waited = false;
- if !state.disconnected && state.buf.size() == 0 {
- wait(&mut state.blocker, BlockedReceiver, &self.lock);
+ if !guard.disconnected && guard.buf.size() == 0 {
+ wait(&mut guard.blocker, BlockedReceiver, &self.lock);
waited = true;
}
- if state.disconnected && state.buf.size() == 0 { return Err(()) }
+ if guard.disconnected && guard.buf.size() == 0 { return Err(()) }
// Pick up the data, wake up our neighbors, and carry on
- assert!(state.buf.size() > 0);
- let ret = state.buf.dequeue();
+ assert!(guard.buf.size() > 0);
+ let ret = guard.buf.dequeue();
self.wakeup_senders(waited, guard, state);
return Ok(ret);
}
pub fn try_recv(&self) -> Result<T, Failure> {
- let (guard, state) = self.lock();
+ let guard = self.lock();
// Easy cases first
- if state.disconnected { return Err(Disconnected) }
- if state.buf.size() == 0 { return Err(Empty) }
+ if guard.disconnected { return Err(Disconnected) }
+ if guard.buf.size() == 0 { return Err(Empty) }
// Be sure to wake up neighbors
- let ret = Ok(state.buf.dequeue());
+ let ret = Ok(guard.buf.dequeue());
self.wakeup_senders(false, guard, state);
return ret;
// If this is a no-buffer channel (cap == 0), then if we didn't wait we
// need to ACK the sender. If we waited, then the sender waking us up
// was already the ACK.
- let pending_sender2 = if state.cap == 0 && !waited {
- match mem::replace(&mut state.blocker, NoneBlocked) {
+ let pending_sender2 = if guard.cap == 0 && !waited {
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedReceiver(..) => unreachable!(),
BlockedSender(token) => {
} else {
None
};
- mem::drop((state, guard));
+ mem::drop(guard);
// only outside of the lock do we wake up the pending tasks
pending_sender1.map(|t| t.signal());
}
// Not much to do other than wake up a receiver if one's there
- let (guard, state) = self.lock();
- if state.disconnected { return }
- state.disconnected = true;
- match mem::replace(&mut state.blocker, NoneBlocked) {
+ let guard = self.lock();
+ if guard.disconnected { return }
+ guard.disconnected = true;
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(token) => wakeup(token, guard),
}
pub fn drop_port(&self) {
- let (guard, state) = self.lock();
+ let guard = self.lock();
- if state.disconnected { return }
- state.disconnected = true;
+ if guard.disconnected { return }
+ guard.disconnected = true;
// If the capacity is 0, then the sender may want its data back after
// we're disconnected. Otherwise it's now our responsibility to destroy
// the buffered data. As with many other portions of this code, this
// needs to be careful to destroy the data *outside* of the lock to
// prevent deadlock.
- let _data = if state.cap != 0 {
- mem::replace(&mut state.buf.buf, Vec::new())
+ let _data = if guard.cap != 0 {
+ mem::replace(&mut guard.buf.buf, Vec::new())
} else {
Vec::new()
};
- let mut queue = mem::replace(&mut state.queue, Queue {
+ let mut queue = mem::replace(&mut guard.queue, Queue {
head: 0 as *mut Node,
tail: 0 as *mut Node,
});
- let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
+ let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedSender(token) => {
*guard.canceled.take().unwrap() = true;
}
BlockedReceiver(..) => unreachable!(),
};
- mem::drop((state, guard));
+ mem::drop(guard);
loop {
match queue.dequeue() {
// If Ok, the value is whether this port has data, if Err, then the upgraded
// port needs to be checked instead of this one.
pub fn can_recv(&self) -> bool {
- let (_g, state) = self.lock();
- state.disconnected || state.buf.size() > 0
+ let guard = self.lock();
+ guard.disconnected || guard.buf.size() > 0
}
// Attempts to start selection on this port. This can either succeed or fail
//
// The return value indicates whether there's data on this port.
pub fn abort_selection(&self) -> bool {
- let (_g, state) = self.lock();
- match mem::replace(&mut state.blocker, NoneBlocked) {
+ let guard = self.lock();
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => true,
BlockedSender(token) => {
guard.blocker = BlockedSender(token);
impl<T: Send> Drop for Packet<T> {
fn drop(&mut self) {
assert_eq!(self.channels.load(atomic::SeqCst), 0);
- let (_g, state) = self.lock();
- assert!(state.queue.dequeue().is_none());
- assert!(state.canceled.is_none());
+ let guard = self.lock();
+ assert!(guard.queue.dequeue().is_none());
+ assert!(guard.canceled.is_none());
}
}