]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/sync.rs
initial port of crossbeam-channel
[rust.git] / library / std / src / sync / mpsc / sync.rs
1 use self::Blocker::*;
2 /// Synchronous channels/ports
3 ///
4 /// This channel implementation differs significantly from the asynchronous
5 /// implementations found next to it (oneshot/stream/share). This is an
6 /// implementation of a synchronous, bounded buffer channel.
7 ///
8 /// Each channel is created with some amount of backing buffer, and sends will
9 /// *block* until buffer space becomes available. A buffer size of 0 is valid,
10 /// which means that every successful send is paired with a successful recv.
11 ///
12 /// This flavor of channels defines a new `send_opt` method for channels which
13 /// is the method by which a message is sent but the thread does not panic if it
14 /// cannot be delivered.
15 ///
16 /// Another major difference is that send() will *always* return back the data
17 /// if it couldn't be sent. This is because it is deterministically known when
18 /// the data is received and when it is not received.
19 ///
20 /// Implementation-wise, it can all be summed up with "use a mutex plus some
21 /// logic". The mutex used here is an OS native mutex, meaning that no user code
22 /// is run inside of the mutex (to prevent context switching). This
23 /// implementation shares almost all code for the buffered and unbuffered cases
24 /// of a synchronous channel. There are a few branches for the unbuffered case,
25 /// but they're mostly just relevant to blocking senders.
26 pub use self::Failure::*;
27
28 use core::intrinsics::abort;
29 use core::mem;
30 use core::ptr;
31
32 use crate::sync::atomic::{AtomicUsize, Ordering};
33 use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
34 use crate::sync::{Mutex, MutexGuard};
35 use crate::time::Instant;
36
37 const MAX_REFCOUNT: usize = (isize::MAX) as usize;
38
39 pub struct Packet<T> {
40     /// Only field outside of the mutex. Just done for kicks, but mainly because
41     /// the other shared channel already had the code implemented
42     channels: AtomicUsize,
43
44     lock: Mutex<State<T>>,
45 }
46
47 unsafe impl<T: Send> Send for Packet<T> {}
48
49 unsafe impl<T: Send> Sync for Packet<T> {}
50
51 struct State<T> {
52     disconnected: bool, // Is the channel disconnected yet?
53     queue: Queue,       // queue of senders waiting to send data
54     blocker: Blocker,   // currently blocked thread on this channel
55     buf: Buffer<T>,     // storage for buffered messages
56     cap: usize,         // capacity of this channel
57
58     /// A curious flag used to indicate whether a sender failed or succeeded in
59     /// blocking. This is used to transmit information back to the thread that it
60     /// must dequeue its message from the buffer because it was not received.
61     /// This is only relevant in the 0-buffer case. This obviously cannot be
62     /// safely constructed, but it's guaranteed to always have a valid pointer
63     /// value.
64     canceled: Option<&'static mut bool>,
65 }
66
67 unsafe impl<T: Send> Send for State<T> {}
68
69 /// Possible flavors of threads who can be blocked on this channel.
70 enum Blocker {
71     BlockedSender(SignalToken),
72     BlockedReceiver(SignalToken),
73     NoneBlocked,
74 }
75
76 /// Simple queue for threading threads together. Nodes are stack-allocated, so
77 /// this structure is not safe at all
78 struct Queue {
79     head: *mut Node,
80     tail: *mut Node,
81 }
82
83 struct Node {
84     token: Option<SignalToken>,
85     next: *mut Node,
86 }
87
88 unsafe impl Send for Node {}
89
90 /// A simple ring-buffer
91 struct Buffer<T> {
92     buf: Vec<Option<T>>,
93     start: usize,
94     size: usize,
95 }
96
97 #[derive(Debug)]
98 pub enum Failure {
99     Empty,
100     Disconnected,
101 }
102
103 /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
104 /// in the meantime. This re-locks the mutex upon returning.
105 fn wait<'a, 'b, T>(
106     lock: &'a Mutex<State<T>>,
107     mut guard: MutexGuard<'b, State<T>>,
108     f: fn(SignalToken) -> Blocker,
109 ) -> MutexGuard<'a, State<T>> {
110     let (wait_token, signal_token) = blocking::tokens();
111     match mem::replace(&mut guard.blocker, f(signal_token)) {
112         NoneBlocked => {}
113         _ => unreachable!(),
114     }
115     drop(guard); // unlock
116     wait_token.wait(); // block
117     lock.lock().unwrap() // relock
118 }
119
120 /// Same as wait, but waiting at most until `deadline`.
121 fn wait_timeout_receiver<'a, 'b, T>(
122     lock: &'a Mutex<State<T>>,
123     deadline: Instant,
124     mut guard: MutexGuard<'b, State<T>>,
125     success: &mut bool,
126 ) -> MutexGuard<'a, State<T>> {
127     let (wait_token, signal_token) = blocking::tokens();
128     match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
129         NoneBlocked => {}
130         _ => unreachable!(),
131     }
132     drop(guard); // unlock
133     *success = wait_token.wait_max_until(deadline); // block
134     let mut new_guard = lock.lock().unwrap(); // relock
135     if !*success {
136         abort_selection(&mut new_guard);
137     }
138     new_guard
139 }
140
141 fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
142     match mem::replace(&mut guard.blocker, NoneBlocked) {
143         NoneBlocked => true,
144         BlockedSender(token) => {
145             guard.blocker = BlockedSender(token);
146             true
147         }
148         BlockedReceiver(token) => {
149             drop(token);
150             false
151         }
152     }
153 }
154
155 /// Wakes up a thread, dropping the lock at the correct time
156 fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
157     // We need to be careful to wake up the waiting thread *outside* of the mutex
158     // in case it incurs a context switch.
159     drop(guard);
160     token.signal();
161 }
162
163 impl<T> Packet<T> {
164     pub fn new(capacity: usize) -> Packet<T> {
165         Packet {
166             channels: AtomicUsize::new(1),
167             lock: Mutex::new(State {
168                 disconnected: false,
169                 blocker: NoneBlocked,
170                 cap: capacity,
171                 canceled: None,
172                 queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
173                 buf: Buffer {
174                     buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
175                     start: 0,
176                     size: 0,
177                 },
178             }),
179         }
180     }
181
182     // wait until a send slot is available, returning locked access to
183     // the channel state.
184     fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
185         let mut node = Node { token: None, next: ptr::null_mut() };
186         loop {
187             let mut guard = self.lock.lock().unwrap();
188             // are we ready to go?
189             if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
190                 return guard;
191             }
192             // no room; actually block
193             let wait_token = guard.queue.enqueue(&mut node);
194             drop(guard);
195             wait_token.wait();
196         }
197     }
198
199     pub fn send(&self, t: T) -> Result<(), T> {
200         let mut guard = self.acquire_send_slot();
201         if guard.disconnected {
202             return Err(t);
203         }
204         guard.buf.enqueue(t);
205
206         match mem::replace(&mut guard.blocker, NoneBlocked) {
207             // if our capacity is 0, then we need to wait for a receiver to be
208             // available to take our data. After waiting, we check again to make
209             // sure the port didn't go away in the meantime. If it did, we need
210             // to hand back our data.
211             NoneBlocked if guard.cap == 0 => {
212                 let mut canceled = false;
213                 assert!(guard.canceled.is_none());
214                 guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
215                 let mut guard = wait(&self.lock, guard, BlockedSender);
216                 if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
217             }
218
219             // success, we buffered some data
220             NoneBlocked => Ok(()),
221
222             // success, someone's about to receive our buffered data.
223             BlockedReceiver(token) => {
224                 wakeup(token, guard);
225                 Ok(())
226             }
227
228             BlockedSender(..) => panic!("lolwut"),
229         }
230     }
231
232     pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
233         let mut guard = self.lock.lock().unwrap();
234         if guard.disconnected {
235             Err(super::TrySendError::Disconnected(t))
236         } else if guard.buf.size() == guard.buf.capacity() {
237             Err(super::TrySendError::Full(t))
238         } else if guard.cap == 0 {
239             // With capacity 0, even though we have buffer space we can't
240             // transfer the data unless there's a receiver waiting.
241             match mem::replace(&mut guard.blocker, NoneBlocked) {
242                 NoneBlocked => Err(super::TrySendError::Full(t)),
243                 BlockedSender(..) => unreachable!(),
244                 BlockedReceiver(token) => {
245                     guard.buf.enqueue(t);
246                     wakeup(token, guard);
247                     Ok(())
248                 }
249             }
250         } else {
251             // If the buffer has some space and the capacity isn't 0, then we
252             // just enqueue the data for later retrieval, ensuring to wake up
253             // any blocked receiver if there is one.
254             assert!(guard.buf.size() < guard.buf.capacity());
255             guard.buf.enqueue(t);
256             match mem::replace(&mut guard.blocker, NoneBlocked) {
257                 BlockedReceiver(token) => wakeup(token, guard),
258                 NoneBlocked => {}
259                 BlockedSender(..) => unreachable!(),
260             }
261             Ok(())
262         }
263     }
264
265     // Receives a message from this channel
266     //
267     // When reading this, remember that there can only ever be one receiver at
268     // time.
269     pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
270         let mut guard = self.lock.lock().unwrap();
271
272         let mut woke_up_after_waiting = false;
273         // Wait for the buffer to have something in it. No need for a
274         // while loop because we're the only receiver.
275         if !guard.disconnected && guard.buf.size() == 0 {
276             if let Some(deadline) = deadline {
277                 guard =
278                     wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
279             } else {
280                 guard = wait(&self.lock, guard, BlockedReceiver);
281                 woke_up_after_waiting = true;
282             }
283         }
284
285         // N.B., channel could be disconnected while waiting, so the order of
286         // these conditionals is important.
287         if guard.disconnected && guard.buf.size() == 0 {
288             return Err(Disconnected);
289         }
290
291         // Pick up the data, wake up our neighbors, and carry on
292         assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
293
294         if guard.buf.size() == 0 {
295             return Err(Empty);
296         }
297
298         let ret = guard.buf.dequeue();
299         self.wakeup_senders(woke_up_after_waiting, guard);
300         Ok(ret)
301     }
302
303     pub fn try_recv(&self) -> Result<T, Failure> {
304         let mut guard = self.lock.lock().unwrap();
305
306         // Easy cases first
307         if guard.disconnected && guard.buf.size() == 0 {
308             return Err(Disconnected);
309         }
310         if guard.buf.size() == 0 {
311             return Err(Empty);
312         }
313
314         // Be sure to wake up neighbors
315         let ret = Ok(guard.buf.dequeue());
316         self.wakeup_senders(false, guard);
317         ret
318     }
319
320     // Wake up pending senders after some data has been received
321     //
322     // * `waited` - flag if the receiver blocked to receive some data, or if it
323     //              just picked up some data on the way out
324     // * `guard` - the lock guard that is held over this channel's lock
325     fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
326         let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
327
328         // If this is a no-buffer channel (cap == 0), then if we didn't wait we
329         // need to ACK the sender. If we waited, then the sender waking us up
330         // was already the ACK.
331         let pending_sender2 = if guard.cap == 0 && !waited {
332             match mem::replace(&mut guard.blocker, NoneBlocked) {
333                 NoneBlocked => None,
334                 BlockedReceiver(..) => unreachable!(),
335                 BlockedSender(token) => {
336                     guard.canceled.take();
337                     Some(token)
338                 }
339             }
340         } else {
341             None
342         };
343         mem::drop(guard);
344
345         // only outside of the lock do we wake up the pending threads
346         if let Some(token) = pending_sender1 {
347             token.signal();
348         }
349         if let Some(token) = pending_sender2 {
350             token.signal();
351         }
352     }
353
354     // Prepares this shared packet for a channel clone, essentially just bumping
355     // a refcount.
356     pub fn clone_chan(&self) {
357         let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
358
359         // See comments on Arc::clone() on why we do this (for `mem::forget`).
360         if old_count > MAX_REFCOUNT {
361             abort();
362         }
363     }
364
365     pub fn drop_chan(&self) {
366         // Only flag the channel as disconnected if we're the last channel
367         match self.channels.fetch_sub(1, Ordering::SeqCst) {
368             1 => {}
369             _ => return,
370         }
371
372         // Not much to do other than wake up a receiver if one's there
373         let mut guard = self.lock.lock().unwrap();
374         if guard.disconnected {
375             return;
376         }
377         guard.disconnected = true;
378         match mem::replace(&mut guard.blocker, NoneBlocked) {
379             NoneBlocked => {}
380             BlockedSender(..) => unreachable!(),
381             BlockedReceiver(token) => wakeup(token, guard),
382         }
383     }
384
385     pub fn drop_port(&self) {
386         let mut guard = self.lock.lock().unwrap();
387
388         if guard.disconnected {
389             return;
390         }
391         guard.disconnected = true;
392
393         // If the capacity is 0, then the sender may want its data back after
394         // we're disconnected. Otherwise it's now our responsibility to destroy
395         // the buffered data. As with many other portions of this code, this
396         // needs to be careful to destroy the data *outside* of the lock to
397         // prevent deadlock.
398         let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
399         let mut queue =
400             mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });
401
402         let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
403             NoneBlocked => None,
404             BlockedSender(token) => {
405                 *guard.canceled.take().unwrap() = true;
406                 Some(token)
407             }
408             BlockedReceiver(..) => unreachable!(),
409         };
410         mem::drop(guard);
411
412         while let Some(token) = queue.dequeue() {
413             token.signal();
414         }
415         if let Some(token) = waiter {
416             token.signal();
417         }
418     }
419 }
420
421 impl<T> Drop for Packet<T> {
422     fn drop(&mut self) {
423         assert_eq!(self.channels.load(Ordering::SeqCst), 0);
424         let mut guard = self.lock.lock().unwrap();
425         assert!(guard.queue.dequeue().is_none());
426         assert!(guard.canceled.is_none());
427     }
428 }
429
430 ////////////////////////////////////////////////////////////////////////////////
431 // Buffer, a simple ring buffer backed by Vec<T>
432 ////////////////////////////////////////////////////////////////////////////////
433
434 impl<T> Buffer<T> {
435     fn enqueue(&mut self, t: T) {
436         let pos = (self.start + self.size) % self.buf.len();
437         self.size += 1;
438         let prev = mem::replace(&mut self.buf[pos], Some(t));
439         assert!(prev.is_none());
440     }
441
442     fn dequeue(&mut self) -> T {
443         let start = self.start;
444         self.size -= 1;
445         self.start = (self.start + 1) % self.buf.len();
446         let result = &mut self.buf[start];
447         result.take().unwrap()
448     }
449
450     fn size(&self) -> usize {
451         self.size
452     }
453     fn capacity(&self) -> usize {
454         self.buf.len()
455     }
456 }
457
458 ////////////////////////////////////////////////////////////////////////////////
459 // Queue, a simple queue to enqueue threads with (stack-allocated nodes)
460 ////////////////////////////////////////////////////////////////////////////////
461
462 impl Queue {
463     fn enqueue(&mut self, node: &mut Node) -> WaitToken {
464         let (wait_token, signal_token) = blocking::tokens();
465         node.token = Some(signal_token);
466         node.next = ptr::null_mut();
467
468         if self.tail.is_null() {
469             self.head = node as *mut Node;
470             self.tail = node as *mut Node;
471         } else {
472             unsafe {
473                 (*self.tail).next = node as *mut Node;
474                 self.tail = node as *mut Node;
475             }
476         }
477
478         wait_token
479     }
480
481     fn dequeue(&mut self) -> Option<SignalToken> {
482         if self.head.is_null() {
483             return None;
484         }
485         let node = self.head;
486         self.head = unsafe { (*node).next };
487         if self.head.is_null() {
488             self.tail = ptr::null_mut();
489         }
490         unsafe {
491             (*node).next = ptr::null_mut();
492             Some((*node).token.take().unwrap())
493         }
494     }
495 }