]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/sync.rs
Deprecate Error::description for real
[rust.git] / src / libstd / sync / mpsc / sync.rs
1 use self::Blocker::*;
2 /// Synchronous channels/ports
3 ///
4 /// This channel implementation differs significantly from the asynchronous
5 /// implementations found next to it (oneshot/stream/share). This is an
6 /// implementation of a synchronous, bounded buffer channel.
7 ///
8 /// Each channel is created with some amount of backing buffer, and sends will
9 /// *block* until buffer space becomes available. A buffer size of 0 is valid,
10 /// which means that every successful send is paired with a successful recv.
11 ///
12 /// This flavor of channels defines a new `send_opt` method for channels which
13 /// is the method by which a message is sent but the thread does not panic if it
14 /// cannot be delivered.
15 ///
16 /// Another major difference is that send() will *always* return back the data
17 /// if it couldn't be sent. This is because it is deterministically known when
18 /// the data is received and when it is not received.
19 ///
20 /// Implementation-wise, it can all be summed up with "use a mutex plus some
21 /// logic". The mutex used here is an OS native mutex, meaning that no user code
22 /// is run inside of the mutex (to prevent context switching). This
23 /// implementation shares almost all code for the buffered and unbuffered cases
24 /// of a synchronous channel. There are a few branches for the unbuffered case,
25 /// but they're mostly just relevant to blocking senders.
26 pub use self::Failure::*;
27
28 use core::intrinsics::abort;
29 use core::isize;
30 use core::mem;
31 use core::ptr;
32
33 use crate::sync::atomic::{AtomicUsize, Ordering};
34 use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
35 use crate::sync::{Mutex, MutexGuard};
36 use crate::time::Instant;
37
38 const MAX_REFCOUNT: usize = (isize::MAX) as usize;
39
40 pub struct Packet<T> {
41     /// Only field outside of the mutex. Just done for kicks, but mainly because
42     /// the other shared channel already had the code implemented
43     channels: AtomicUsize,
44
45     lock: Mutex<State<T>>,
46 }
47
48 unsafe impl<T: Send> Send for Packet<T> {}
49
50 unsafe impl<T: Send> Sync for Packet<T> {}
51
52 struct State<T> {
53     disconnected: bool, // Is the channel disconnected yet?
54     queue: Queue,       // queue of senders waiting to send data
55     blocker: Blocker,   // currently blocked thread on this channel
56     buf: Buffer<T>,     // storage for buffered messages
57     cap: usize,         // capacity of this channel
58
59     /// A curious flag used to indicate whether a sender failed or succeeded in
60     /// blocking. This is used to transmit information back to the thread that it
61     /// must dequeue its message from the buffer because it was not received.
62     /// This is only relevant in the 0-buffer case. This obviously cannot be
63     /// safely constructed, but it's guaranteed to always have a valid pointer
64     /// value.
65     canceled: Option<&'static mut bool>,
66 }
67
68 unsafe impl<T: Send> Send for State<T> {}
69
70 /// Possible flavors of threads who can be blocked on this channel.
71 enum Blocker {
72     BlockedSender(SignalToken),
73     BlockedReceiver(SignalToken),
74     NoneBlocked,
75 }
76
77 /// Simple queue for threading threads together. Nodes are stack-allocated, so
78 /// this structure is not safe at all
79 struct Queue {
80     head: *mut Node,
81     tail: *mut Node,
82 }
83
84 struct Node {
85     token: Option<SignalToken>,
86     next: *mut Node,
87 }
88
89 unsafe impl Send for Node {}
90
91 /// A simple ring-buffer
92 struct Buffer<T> {
93     buf: Vec<Option<T>>,
94     start: usize,
95     size: usize,
96 }
97
98 #[derive(Debug)]
99 pub enum Failure {
100     Empty,
101     Disconnected,
102 }
103
104 /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
105 /// in the meantime. This re-locks the mutex upon returning.
106 fn wait<'a, 'b, T>(
107     lock: &'a Mutex<State<T>>,
108     mut guard: MutexGuard<'b, State<T>>,
109     f: fn(SignalToken) -> Blocker,
110 ) -> MutexGuard<'a, State<T>> {
111     let (wait_token, signal_token) = blocking::tokens();
112     match mem::replace(&mut guard.blocker, f(signal_token)) {
113         NoneBlocked => {}
114         _ => unreachable!(),
115     }
116     drop(guard); // unlock
117     wait_token.wait(); // block
118     lock.lock().unwrap() // relock
119 }
120
121 /// Same as wait, but waiting at most until `deadline`.
122 fn wait_timeout_receiver<'a, 'b, T>(
123     lock: &'a Mutex<State<T>>,
124     deadline: Instant,
125     mut guard: MutexGuard<'b, State<T>>,
126     success: &mut bool,
127 ) -> MutexGuard<'a, State<T>> {
128     let (wait_token, signal_token) = blocking::tokens();
129     match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
130         NoneBlocked => {}
131         _ => unreachable!(),
132     }
133     drop(guard); // unlock
134     *success = wait_token.wait_max_until(deadline); // block
135     let mut new_guard = lock.lock().unwrap(); // relock
136     if !*success {
137         abort_selection(&mut new_guard);
138     }
139     new_guard
140 }
141
142 fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
143     match mem::replace(&mut guard.blocker, NoneBlocked) {
144         NoneBlocked => true,
145         BlockedSender(token) => {
146             guard.blocker = BlockedSender(token);
147             true
148         }
149         BlockedReceiver(token) => {
150             drop(token);
151             false
152         }
153     }
154 }
155
156 /// Wakes up a thread, dropping the lock at the correct time
157 fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
158     // We need to be careful to wake up the waiting thread *outside* of the mutex
159     // in case it incurs a context switch.
160     drop(guard);
161     token.signal();
162 }
163
164 impl<T> Packet<T> {
165     pub fn new(capacity: usize) -> Packet<T> {
166         Packet {
167             channels: AtomicUsize::new(1),
168             lock: Mutex::new(State {
169                 disconnected: false,
170                 blocker: NoneBlocked,
171                 cap: capacity,
172                 canceled: None,
173                 queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
174                 buf: Buffer {
175                     buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
176                     start: 0,
177                     size: 0,
178                 },
179             }),
180         }
181     }
182
183     // wait until a send slot is available, returning locked access to
184     // the channel state.
185     fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
186         let mut node = Node { token: None, next: ptr::null_mut() };
187         loop {
188             let mut guard = self.lock.lock().unwrap();
189             // are we ready to go?
190             if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
191                 return guard;
192             }
193             // no room; actually block
194             let wait_token = guard.queue.enqueue(&mut node);
195             drop(guard);
196             wait_token.wait();
197         }
198     }
199
200     pub fn send(&self, t: T) -> Result<(), T> {
201         let mut guard = self.acquire_send_slot();
202         if guard.disconnected {
203             return Err(t);
204         }
205         guard.buf.enqueue(t);
206
207         match mem::replace(&mut guard.blocker, NoneBlocked) {
208             // if our capacity is 0, then we need to wait for a receiver to be
209             // available to take our data. After waiting, we check again to make
210             // sure the port didn't go away in the meantime. If it did, we need
211             // to hand back our data.
212             NoneBlocked if guard.cap == 0 => {
213                 let mut canceled = false;
214                 assert!(guard.canceled.is_none());
215                 guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
216                 let mut guard = wait(&self.lock, guard, BlockedSender);
217                 if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
218             }
219
220             // success, we buffered some data
221             NoneBlocked => Ok(()),
222
223             // success, someone's about to receive our buffered data.
224             BlockedReceiver(token) => {
225                 wakeup(token, guard);
226                 Ok(())
227             }
228
229             BlockedSender(..) => panic!("lolwut"),
230         }
231     }
232
233     pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
234         let mut guard = self.lock.lock().unwrap();
235         if guard.disconnected {
236             Err(super::TrySendError::Disconnected(t))
237         } else if guard.buf.size() == guard.buf.capacity() {
238             Err(super::TrySendError::Full(t))
239         } else if guard.cap == 0 {
240             // With capacity 0, even though we have buffer space we can't
241             // transfer the data unless there's a receiver waiting.
242             match mem::replace(&mut guard.blocker, NoneBlocked) {
243                 NoneBlocked => Err(super::TrySendError::Full(t)),
244                 BlockedSender(..) => unreachable!(),
245                 BlockedReceiver(token) => {
246                     guard.buf.enqueue(t);
247                     wakeup(token, guard);
248                     Ok(())
249                 }
250             }
251         } else {
252             // If the buffer has some space and the capacity isn't 0, then we
253             // just enqueue the data for later retrieval, ensuring to wake up
254             // any blocked receiver if there is one.
255             assert!(guard.buf.size() < guard.buf.capacity());
256             guard.buf.enqueue(t);
257             match mem::replace(&mut guard.blocker, NoneBlocked) {
258                 BlockedReceiver(token) => wakeup(token, guard),
259                 NoneBlocked => {}
260                 BlockedSender(..) => unreachable!(),
261             }
262             Ok(())
263         }
264     }
265
266     // Receives a message from this channel
267     //
268     // When reading this, remember that there can only ever be one receiver at
269     // time.
270     pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
271         let mut guard = self.lock.lock().unwrap();
272
273         let mut woke_up_after_waiting = false;
274         // Wait for the buffer to have something in it. No need for a
275         // while loop because we're the only receiver.
276         if !guard.disconnected && guard.buf.size() == 0 {
277             if let Some(deadline) = deadline {
278                 guard =
279                     wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
280             } else {
281                 guard = wait(&self.lock, guard, BlockedReceiver);
282                 woke_up_after_waiting = true;
283             }
284         }
285
286         // N.B., channel could be disconnected while waiting, so the order of
287         // these conditionals is important.
288         if guard.disconnected && guard.buf.size() == 0 {
289             return Err(Disconnected);
290         }
291
292         // Pick up the data, wake up our neighbors, and carry on
293         assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
294
295         if guard.buf.size() == 0 {
296             return Err(Empty);
297         }
298
299         let ret = guard.buf.dequeue();
300         self.wakeup_senders(woke_up_after_waiting, guard);
301         Ok(ret)
302     }
303
304     pub fn try_recv(&self) -> Result<T, Failure> {
305         let mut guard = self.lock.lock().unwrap();
306
307         // Easy cases first
308         if guard.disconnected && guard.buf.size() == 0 {
309             return Err(Disconnected);
310         }
311         if guard.buf.size() == 0 {
312             return Err(Empty);
313         }
314
315         // Be sure to wake up neighbors
316         let ret = Ok(guard.buf.dequeue());
317         self.wakeup_senders(false, guard);
318         ret
319     }
320
321     // Wake up pending senders after some data has been received
322     //
323     // * `waited` - flag if the receiver blocked to receive some data, or if it
324     //              just picked up some data on the way out
325     // * `guard` - the lock guard that is held over this channel's lock
326     fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
327         let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
328
329         // If this is a no-buffer channel (cap == 0), then if we didn't wait we
330         // need to ACK the sender. If we waited, then the sender waking us up
331         // was already the ACK.
332         let pending_sender2 = if guard.cap == 0 && !waited {
333             match mem::replace(&mut guard.blocker, NoneBlocked) {
334                 NoneBlocked => None,
335                 BlockedReceiver(..) => unreachable!(),
336                 BlockedSender(token) => {
337                     guard.canceled.take();
338                     Some(token)
339                 }
340             }
341         } else {
342             None
343         };
344         mem::drop(guard);
345
346         // only outside of the lock do we wake up the pending threads
347         pending_sender1.map(|t| t.signal());
348         pending_sender2.map(|t| t.signal());
349     }
350
351     // Prepares this shared packet for a channel clone, essentially just bumping
352     // a refcount.
353     pub fn clone_chan(&self) {
354         let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
355
356         // See comments on Arc::clone() on why we do this (for `mem::forget`).
357         if old_count > MAX_REFCOUNT {
358             unsafe {
359                 abort();
360             }
361         }
362     }
363
364     pub fn drop_chan(&self) {
365         // Only flag the channel as disconnected if we're the last channel
366         match self.channels.fetch_sub(1, Ordering::SeqCst) {
367             1 => {}
368             _ => return,
369         }
370
371         // Not much to do other than wake up a receiver if one's there
372         let mut guard = self.lock.lock().unwrap();
373         if guard.disconnected {
374             return;
375         }
376         guard.disconnected = true;
377         match mem::replace(&mut guard.blocker, NoneBlocked) {
378             NoneBlocked => {}
379             BlockedSender(..) => unreachable!(),
380             BlockedReceiver(token) => wakeup(token, guard),
381         }
382     }
383
384     pub fn drop_port(&self) {
385         let mut guard = self.lock.lock().unwrap();
386
387         if guard.disconnected {
388             return;
389         }
390         guard.disconnected = true;
391
392         // If the capacity is 0, then the sender may want its data back after
393         // we're disconnected. Otherwise it's now our responsibility to destroy
394         // the buffered data. As with many other portions of this code, this
395         // needs to be careful to destroy the data *outside* of the lock to
396         // prevent deadlock.
397         let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
398         let mut queue =
399             mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });
400
401         let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
402             NoneBlocked => None,
403             BlockedSender(token) => {
404                 *guard.canceled.take().unwrap() = true;
405                 Some(token)
406             }
407             BlockedReceiver(..) => unreachable!(),
408         };
409         mem::drop(guard);
410
411         while let Some(token) = queue.dequeue() {
412             token.signal();
413         }
414         waiter.map(|t| t.signal());
415     }
416 }
417
418 impl<T> Drop for Packet<T> {
419     fn drop(&mut self) {
420         assert_eq!(self.channels.load(Ordering::SeqCst), 0);
421         let mut guard = self.lock.lock().unwrap();
422         assert!(guard.queue.dequeue().is_none());
423         assert!(guard.canceled.is_none());
424     }
425 }
426
427 ////////////////////////////////////////////////////////////////////////////////
428 // Buffer, a simple ring buffer backed by Vec<T>
429 ////////////////////////////////////////////////////////////////////////////////
430
431 impl<T> Buffer<T> {
432     fn enqueue(&mut self, t: T) {
433         let pos = (self.start + self.size) % self.buf.len();
434         self.size += 1;
435         let prev = mem::replace(&mut self.buf[pos], Some(t));
436         assert!(prev.is_none());
437     }
438
439     fn dequeue(&mut self) -> T {
440         let start = self.start;
441         self.size -= 1;
442         self.start = (self.start + 1) % self.buf.len();
443         let result = &mut self.buf[start];
444         result.take().unwrap()
445     }
446
447     fn size(&self) -> usize {
448         self.size
449     }
450     fn capacity(&self) -> usize {
451         self.buf.len()
452     }
453 }
454
455 ////////////////////////////////////////////////////////////////////////////////
456 // Queue, a simple queue to enqueue threads with (stack-allocated nodes)
457 ////////////////////////////////////////////////////////////////////////////////
458
459 impl Queue {
460     fn enqueue(&mut self, node: &mut Node) -> WaitToken {
461         let (wait_token, signal_token) = blocking::tokens();
462         node.token = Some(signal_token);
463         node.next = ptr::null_mut();
464
465         if self.tail.is_null() {
466             self.head = node as *mut Node;
467             self.tail = node as *mut Node;
468         } else {
469             unsafe {
470                 (*self.tail).next = node as *mut Node;
471                 self.tail = node as *mut Node;
472             }
473         }
474
475         wait_token
476     }
477
478     fn dequeue(&mut self) -> Option<SignalToken> {
479         if self.head.is_null() {
480             return None;
481         }
482         let node = self.head;
483         self.head = unsafe { (*node).next };
484         if self.head.is_null() {
485             self.tail = ptr::null_mut();
486         }
487         unsafe {
488             (*node).next = ptr::null_mut();
489             Some((*node).token.take().unwrap())
490         }
491     }
492 }