1 /// Synchronous channels/ports
3 /// This channel implementation differs significantly from the asynchronous
4 /// implementations found next to it (oneshot/stream/share). This is an
5 /// implementation of a synchronous, bounded buffer channel.
7 /// Each channel is created with some amount of backing buffer, and sends will
8 /// *block* until buffer space becomes available. A buffer size of 0 is valid,
9 /// which means that every successful send is paired with a successful recv.
11 /// This flavor of channels provides `send` and `try_send` methods, by which a
12 /// message is sent without the thread panicking if it cannot be delivered;
13 /// failure to deliver is reported through the returned `Result` instead.
15 /// Another major difference is that send() will *always* hand the data back
16 /// if it couldn't be sent. This is because it is deterministically known when
17 /// the data is received and when it is not received.
19 /// Implementation-wise, it can all be summed up with "use a mutex plus some
20 /// logic". The mutex used here is an OS native mutex, meaning that no user code
21 /// is run inside of the mutex (to prevent context switching). This
22 /// implementation shares almost all code for the buffered and unbuffered cases
23 /// of a synchronous channel. There are a few branches for the unbuffered case,
24 /// but they're mostly just relevant to blocking senders.
26 pub use self::Failure::*;
29 use core::intrinsics::abort;
34 use crate::sync::atomic::{Ordering, AtomicUsize};
35 use crate::sync::mpsc::blocking::{self, WaitToken, SignalToken};
36 use crate::sync::{Mutex, MutexGuard};
37 use crate::time::Instant;
// Ceiling for the `channels` handle count: `clone_chan` compares the value
// returned by its `fetch_add` against this constant. It is the largest `isize`
// value, expressed as a `usize`.
39 const MAX_REFCOUNT: usize = (isize::MAX) as usize;
41 pub struct Packet<T> {
42 /// Only field outside of the mutex. Just done for kicks, but mainly because
43 /// the other shared channel already had the code implemented
44 channels: AtomicUsize,
46 lock: Mutex<State<T>>,
// NOTE(review): these impls assert that a `Packet<T>` may be moved to and
// shared between threads whenever `T: Send`. Of the fields visible here, the
// `channels` counter is atomic and everything else sits behind
// `lock: Mutex<State<T>>` — confirm no state is touched without the lock held.
49 unsafe impl<T: Send> Send for Packet<T> { }
51 unsafe impl<T: Send> Sync for Packet<T> { }
54 disconnected: bool, // Is the channel disconnected yet?
55 queue: Queue, // queue of senders waiting to send data
56 blocker: Blocker, // currently blocked thread on this channel
57 buf: Buffer<T>, // storage for buffered messages
58 cap: usize, // capacity of this channel
60 /// A curious flag used to indicate whether a sender failed or succeeded in
61 /// blocking. This is used to transmit information back to the thread that it
62 /// must dequeue its message from the buffer because it was not received.
63 /// This is only relevant in the 0-buffer case. This obviously cannot be
64 /// safely constructed, but it's guaranteed to always have a valid pointer
66 canceled: Option<&'static mut bool>,
// NOTE(review): `State` carries a `Queue` whose head/tail are raw `*mut Node`
// pointers, which is presumably why `Send` has to be asserted by hand. In the
// code visible here a `State` is only ever stored inside `Packet::lock` —
// confirm the queued nodes are never dereferenced without that lock held.
69 unsafe impl<T: Send> Send for State<T> {}
71 /// Possible flavors of threads who can be blocked on this channel.
73 BlockedSender(SignalToken),
74 BlockedReceiver(SignalToken),
78 /// Simple queue for threading threads together. Nodes are stack-allocated, so
79 /// this structure is not safe at all
86 token: Option<SignalToken>,
// NOTE(review): `Node` holds a raw `next` pointer to another `Node`, which is
// presumably why `Send` is asserted manually. Nodes are described above as
// stack-allocated — confirm a node is always unlinked from the queue before
// the stack frame that owns it returns.
90 unsafe impl Send for Node {}
92 /// A simple ring-buffer
105 /// Atomically blocks the current thread, recording it in `guard.blocker` via `f`
106 /// and unlocking `lock` in the meantime. This re-locks the mutex upon returning.
107 fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
108 mut guard: MutexGuard<'b, State<T>>,
109 f: fn(SignalToken) -> Blocker)
110 -> MutexGuard<'a, State<T>>
112 let (wait_token, signal_token) = blocking::tokens();
113 match mem::replace(&mut guard.blocker, f(signal_token)) {
117 drop(guard); // unlock
118 wait_token.wait(); // block
119 lock.lock().unwrap() // relock
122 /// Same as wait, but waiting at most until `deadline`.
123 fn wait_timeout_receiver<'a, 'b, T>(lock: &'a Mutex<State<T>>,
125 mut guard: MutexGuard<'b, State<T>>,
127 -> MutexGuard<'a, State<T>>
129 let (wait_token, signal_token) = blocking::tokens();
130 match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
134 drop(guard); // unlock
135 *success = wait_token.wait_max_until(deadline); // block
136 let mut new_guard = lock.lock().unwrap(); // relock
138 abort_selection(&mut new_guard);
143 fn abort_selection<'a, T>(guard: &mut MutexGuard<'a , State<T>>) -> bool {
144 match mem::replace(&mut guard.blocker, NoneBlocked) {
146 BlockedSender(token) => {
147 guard.blocker = BlockedSender(token);
150 BlockedReceiver(token) => { drop(token); false }
154 /// Wakes up a thread, dropping the lock at the correct time
155 fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
156 // We need to be careful to wake up the waiting thread *outside* of the mutex
157 // in case it incurs a context switch.
163 pub fn new(cap: usize) -> Packet<T> {
165 channels: AtomicUsize::new(1),
166 lock: Mutex::new(State {
168 blocker: NoneBlocked,
172 head: ptr::null_mut(),
173 tail: ptr::null_mut(),
176 buf: (0..cap + if cap == 0 {1} else {0}).map(|_| None).collect(),
184 // wait until a send slot is available, returning locked access to
185 // the channel state.
186 fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
187 let mut node = Node { token: None, next: ptr::null_mut() };
189 let mut guard = self.lock.lock().unwrap();
190 // are we ready to go?
191 if guard.disconnected || guard.buf.size() < guard.buf.cap() {
194 // no room; actually block
195 let wait_token = guard.queue.enqueue(&mut node);
201 pub fn send(&self, t: T) -> Result<(), T> {
202 let mut guard = self.acquire_send_slot();
203 if guard.disconnected { return Err(t) }
204 guard.buf.enqueue(t);
206 match mem::replace(&mut guard.blocker, NoneBlocked) {
207 // if our capacity is 0, then we need to wait for a receiver to be
208 // available to take our data. After waiting, we check again to make
209 // sure the port didn't go away in the meantime. If it did, we need
210 // to hand back our data.
211 NoneBlocked if guard.cap == 0 => {
212 let mut canceled = false;
213 assert!(guard.canceled.is_none());
214 guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
215 let mut guard = wait(&self.lock, guard, BlockedSender);
216 if canceled {Err(guard.buf.dequeue())} else {Ok(())}
219 // success, we buffered some data
220 NoneBlocked => Ok(()),
222 // success, someone's about to receive our buffered data.
223 BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
225 BlockedSender(..) => panic!("lolwut"),
229 pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
230 let mut guard = self.lock.lock().unwrap();
231 if guard.disconnected {
232 Err(super::TrySendError::Disconnected(t))
233 } else if guard.buf.size() == guard.buf.cap() {
234 Err(super::TrySendError::Full(t))
235 } else if guard.cap == 0 {
236 // With capacity 0, even though we have buffer space we can't
237 // transfer the data unless there's a receiver waiting.
238 match mem::replace(&mut guard.blocker, NoneBlocked) {
239 NoneBlocked => Err(super::TrySendError::Full(t)),
240 BlockedSender(..) => unreachable!(),
241 BlockedReceiver(token) => {
242 guard.buf.enqueue(t);
243 wakeup(token, guard);
248 // If the buffer has some space and the capacity isn't 0, then we
249 // just enqueue the data for later retrieval, ensuring to wake up
250 // any blocked receiver if there is one.
251 assert!(guard.buf.size() < guard.buf.cap());
252 guard.buf.enqueue(t);
253 match mem::replace(&mut guard.blocker, NoneBlocked) {
254 BlockedReceiver(token) => wakeup(token, guard),
256 BlockedSender(..) => unreachable!(),
262 // Receives a message from this channel
264 // When reading this, remember that there can only ever be one receiver at a time.
266 pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
267 let mut guard = self.lock.lock().unwrap();
269 let mut woke_up_after_waiting = false;
270 // Wait for the buffer to have something in it. No need for a
271 // while loop because we're the only receiver.
272 if !guard.disconnected && guard.buf.size() == 0 {
273 if let Some(deadline) = deadline {
274 guard = wait_timeout_receiver(&self.lock,
277 &mut woke_up_after_waiting);
279 guard = wait(&self.lock, guard, BlockedReceiver);
280 woke_up_after_waiting = true;
284 // N.B., channel could be disconnected while waiting, so the order of
285 // these conditionals is important.
286 if guard.disconnected && guard.buf.size() == 0 {
287 return Err(Disconnected);
290 // Pick up the data, wake up our neighbors, and carry on
291 assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
293 if guard.buf.size() == 0 { return Err(Empty); }
295 let ret = guard.buf.dequeue();
296 self.wakeup_senders(woke_up_after_waiting, guard);
300 pub fn try_recv(&self) -> Result<T, Failure> {
301 let mut guard = self.lock.lock().unwrap();
304 if guard.disconnected && guard.buf.size() == 0 { return Err(Disconnected) }
305 if guard.buf.size() == 0 { return Err(Empty) }
307 // Be sure to wake up neighbors
308 let ret = Ok(guard.buf.dequeue());
309 self.wakeup_senders(false, guard);
313 // Wake up pending senders after some data has been received
315 // * `waited` - flag if the receiver blocked to receive some data, or if it
316 // just picked up some data on the way out
317 // * `guard` - the lock guard that is held over this channel's lock
318 fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
319 let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
321 // If this is a no-buffer channel (cap == 0), then if we didn't wait we
322 // need to ACK the sender. If we waited, then the sender waking us up
323 // was already the ACK.
324 let pending_sender2 = if guard.cap == 0 && !waited {
325 match mem::replace(&mut guard.blocker, NoneBlocked) {
327 BlockedReceiver(..) => unreachable!(),
328 BlockedSender(token) => {
329 guard.canceled.take();
338 // only outside of the lock do we wake up the pending threads
339 pending_sender1.map(|t| t.signal());
340 pending_sender2.map(|t| t.signal());
343 // Prepares this shared packet for a channel clone by bumping the `channels` count.
345 pub fn clone_chan(&self) {
346 let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
348 // See comments on Arc::clone() on why we do this (for `mem::forget`).
349 if old_count > MAX_REFCOUNT {
356 pub fn drop_chan(&self) {
357 // Only flag the channel as disconnected if we're the last channel
358 match self.channels.fetch_sub(1, Ordering::SeqCst) {
363 // Not much to do other than wake up a receiver if one's there
364 let mut guard = self.lock.lock().unwrap();
365 if guard.disconnected { return }
366 guard.disconnected = true;
367 match mem::replace(&mut guard.blocker, NoneBlocked) {
369 BlockedSender(..) => unreachable!(),
370 BlockedReceiver(token) => wakeup(token, guard),
374 pub fn drop_port(&self) {
375 let mut guard = self.lock.lock().unwrap();
377 if guard.disconnected { return }
378 guard.disconnected = true;
380 // If the capacity is 0, then the sender may want its data back after
381 // we're disconnected. Otherwise it's now our responsibility to destroy
382 // the buffered data. As with many other portions of this code, this needs to
383 // destroy the data *outside* of the lock so no user destructor runs under it.
385 let _data = if guard.cap != 0 {
386 mem::replace(&mut guard.buf.buf, Vec::new())
390 let mut queue = mem::replace(&mut guard.queue, Queue {
391 head: ptr::null_mut(),
392 tail: ptr::null_mut(),
395 let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
397 BlockedSender(token) => {
398 *guard.canceled.take().unwrap() = true;
401 BlockedReceiver(..) => unreachable!(),
405 while let Some(token) = queue.dequeue() { token.signal(); }
406 waiter.map(|t| t.signal());
410 impl<T> Drop for Packet<T> {
412 assert_eq!(self.channels.load(Ordering::SeqCst), 0);
413 let mut guard = self.lock.lock().unwrap();
414 assert!(guard.queue.dequeue().is_none());
415 assert!(guard.canceled.is_none());
420 ////////////////////////////////////////////////////////////////////////////////
421 // Buffer, a simple ring buffer backed by Vec<T>
422 ////////////////////////////////////////////////////////////////////////////////
425 fn enqueue(&mut self, t: T) {
426 let pos = (self.start + self.size) % self.buf.len();
428 let prev = mem::replace(&mut self.buf[pos], Some(t));
429 assert!(prev.is_none());
432 fn dequeue(&mut self) -> T {
433 let start = self.start;
435 self.start = (self.start + 1) % self.buf.len();
436 let result = &mut self.buf[start];
437 result.take().unwrap()
// Returns the value of the `size` field, i.e. the number of elements the ring
// buffer currently accounts for.
440 fn size(&self) -> usize { self.size }
// Returns the number of slots in the backing storage (`buf.len()`). This is
// not necessarily the channel's `cap`: `Packet::new` builds the buffer with
// one slot when the requested capacity is 0.
441 fn cap(&self) -> usize { self.buf.len() }
444 ////////////////////////////////////////////////////////////////////////////////
445 // Queue, a simple queue to enqueue threads with (stack-allocated nodes)
446 ////////////////////////////////////////////////////////////////////////////////
449 fn enqueue(&mut self, node: &mut Node) -> WaitToken {
450 let (wait_token, signal_token) = blocking::tokens();
451 node.token = Some(signal_token);
452 node.next = ptr::null_mut();
454 if self.tail.is_null() {
455 self.head = node as *mut Node;
456 self.tail = node as *mut Node;
459 (*self.tail).next = node as *mut Node;
460 self.tail = node as *mut Node;
467 fn dequeue(&mut self) -> Option<SignalToken> {
468 if self.head.is_null() {
471 let node = self.head;
472 self.head = unsafe { (*node).next };
473 if self.head.is_null() {
474 self.tail = ptr::null_mut();
477 (*node).next = ptr::null_mut();
478 Some((*node).token.take().unwrap())