]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpmc/zero.rs
Re-add #[allow(unused)] attr
[rust.git] / library / std / src / sync / mpmc / zero.rs
1 //! Zero-capacity channel.
2 //!
3 //! This kind of channel is also known as *rendezvous* channel.
4
5 use super::context::Context;
6 use super::error::*;
7 use super::select::{Operation, Selected, Token};
8 use super::utils::Backoff;
9 use super::waker::Waker;
10
11 use crate::cell::UnsafeCell;
12 use crate::marker::PhantomData;
13 use crate::sync::atomic::{AtomicBool, Ordering};
14 use crate::sync::Mutex;
15 use crate::time::Instant;
16 use crate::{fmt, ptr};
17
18 /// A pointer to a packet.
19 pub(crate) struct ZeroToken(*mut ());
20
21 impl Default for ZeroToken {
22     fn default() -> Self {
23         Self(ptr::null_mut())
24     }
25 }
26
27 impl fmt::Debug for ZeroToken {
28     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29         fmt::Debug::fmt(&(self.0 as usize), f)
30     }
31 }
32
33 /// A slot for passing one message from a sender to a receiver.
34 struct Packet<T> {
35     /// Equals `true` if the packet is allocated on the stack.
36     on_stack: bool,
37
38     /// Equals `true` once the packet is ready for reading or writing.
39     ready: AtomicBool,
40
41     /// The message.
42     msg: UnsafeCell<Option<T>>,
43 }
44
45 impl<T> Packet<T> {
46     /// Creates an empty packet on the stack.
47     fn empty_on_stack() -> Packet<T> {
48         Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
49     }
50
51     /// Creates a packet on the stack, containing a message.
52     fn message_on_stack(msg: T) -> Packet<T> {
53         Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
54     }
55
56     /// Waits until the packet becomes ready for reading or writing.
57     fn wait_ready(&self) {
58         let backoff = Backoff::new();
59         while !self.ready.load(Ordering::Acquire) {
60             backoff.snooze();
61         }
62     }
63 }
64
65 /// Inner representation of a zero-capacity channel.
66 struct Inner {
67     /// Senders waiting to pair up with a receive operation.
68     senders: Waker,
69
70     /// Receivers waiting to pair up with a send operation.
71     receivers: Waker,
72
73     /// Equals `true` when the channel is disconnected.
74     is_disconnected: bool,
75 }
76
77 /// Zero-capacity channel.
78 pub(crate) struct Channel<T> {
79     /// Inner representation of the channel.
80     inner: Mutex<Inner>,
81
82     /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
83     _marker: PhantomData<T>,
84 }
85
86 impl<T> Channel<T> {
87     /// Constructs a new zero-capacity channel.
88     pub(crate) fn new() -> Self {
89         Channel {
90             inner: Mutex::new(Inner {
91                 senders: Waker::new(),
92                 receivers: Waker::new(),
93                 is_disconnected: false,
94             }),
95             _marker: PhantomData,
96         }
97     }
98
99     /// Writes a message into the packet.
100     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
101         // If there is no packet, the channel is disconnected.
102         if token.zero.0.is_null() {
103             return Err(msg);
104         }
105
106         let packet = &*(token.zero.0 as *const Packet<T>);
107         packet.msg.get().write(Some(msg));
108         packet.ready.store(true, Ordering::Release);
109         Ok(())
110     }
111
112     /// Reads a message from the packet.
113     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
114         // If there is no packet, the channel is disconnected.
115         if token.zero.0.is_null() {
116             return Err(());
117         }
118
119         let packet = &*(token.zero.0 as *const Packet<T>);
120
121         if packet.on_stack {
122             // The message has been in the packet from the beginning, so there is no need to wait
123             // for it. However, after reading the message, we need to set `ready` to `true` in
124             // order to signal that the packet can be destroyed.
125             let msg = packet.msg.get().replace(None).unwrap();
126             packet.ready.store(true, Ordering::Release);
127             Ok(msg)
128         } else {
129             // Wait until the message becomes available, then read it and destroy the
130             // heap-allocated packet.
131             packet.wait_ready();
132             let msg = packet.msg.get().replace(None).unwrap();
133             drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
134             Ok(msg)
135         }
136     }
137
138     /// Attempts to send a message into the channel.
139     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
140         let token = &mut Token::default();
141         let mut inner = self.inner.lock().unwrap();
142
143         // If there's a waiting receiver, pair up with it.
144         if let Some(operation) = inner.receivers.try_select() {
145             token.zero.0 = operation.packet;
146             drop(inner);
147             unsafe {
148                 self.write(token, msg).ok().unwrap();
149             }
150             Ok(())
151         } else if inner.is_disconnected {
152             Err(TrySendError::Disconnected(msg))
153         } else {
154             Err(TrySendError::Full(msg))
155         }
156     }
157
158     /// Sends a message into the channel.
159     pub(crate) fn send(
160         &self,
161         msg: T,
162         deadline: Option<Instant>,
163     ) -> Result<(), SendTimeoutError<T>> {
164         let token = &mut Token::default();
165         let mut inner = self.inner.lock().unwrap();
166
167         // If there's a waiting receiver, pair up with it.
168         if let Some(operation) = inner.receivers.try_select() {
169             token.zero.0 = operation.packet;
170             drop(inner);
171             unsafe {
172                 self.write(token, msg).ok().unwrap();
173             }
174             return Ok(());
175         }
176
177         if inner.is_disconnected {
178             return Err(SendTimeoutError::Disconnected(msg));
179         }
180
181         Context::with(|cx| {
182             // Prepare for blocking until a receiver wakes us up.
183             let oper = Operation::hook(token);
184             let mut packet = Packet::<T>::message_on_stack(msg);
185             inner.senders.register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
186             inner.receivers.notify();
187             drop(inner);
188
189             // Block the current thread.
190             let sel = cx.wait_until(deadline);
191
192             match sel {
193                 Selected::Waiting => unreachable!(),
194                 Selected::Aborted => {
195                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
196                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
197                     Err(SendTimeoutError::Timeout(msg))
198                 }
199                 Selected::Disconnected => {
200                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
201                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
202                     Err(SendTimeoutError::Disconnected(msg))
203                 }
204                 Selected::Operation(_) => {
205                     // Wait until the message is read, then drop the packet.
206                     packet.wait_ready();
207                     Ok(())
208                 }
209             }
210         })
211     }
212
213     /// Attempts to receive a message without blocking.
214     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
215         let token = &mut Token::default();
216         let mut inner = self.inner.lock().unwrap();
217
218         // If there's a waiting sender, pair up with it.
219         if let Some(operation) = inner.senders.try_select() {
220             token.zero.0 = operation.packet;
221             drop(inner);
222             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
223         } else if inner.is_disconnected {
224             Err(TryRecvError::Disconnected)
225         } else {
226             Err(TryRecvError::Empty)
227         }
228     }
229
230     /// Receives a message from the channel.
231     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
232         let token = &mut Token::default();
233         let mut inner = self.inner.lock().unwrap();
234
235         // If there's a waiting sender, pair up with it.
236         if let Some(operation) = inner.senders.try_select() {
237             token.zero.0 = operation.packet;
238             drop(inner);
239             unsafe {
240                 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
241             }
242         }
243
244         if inner.is_disconnected {
245             return Err(RecvTimeoutError::Disconnected);
246         }
247
248         Context::with(|cx| {
249             // Prepare for blocking until a sender wakes us up.
250             let oper = Operation::hook(token);
251             let mut packet = Packet::<T>::empty_on_stack();
252             inner.receivers.register_with_packet(
253                 oper,
254                 &mut packet as *mut Packet<T> as *mut (),
255                 cx,
256             );
257             inner.senders.notify();
258             drop(inner);
259
260             // Block the current thread.
261             let sel = cx.wait_until(deadline);
262
263             match sel {
264                 Selected::Waiting => unreachable!(),
265                 Selected::Aborted => {
266                     self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
267                     Err(RecvTimeoutError::Timeout)
268                 }
269                 Selected::Disconnected => {
270                     self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
271                     Err(RecvTimeoutError::Disconnected)
272                 }
273                 Selected::Operation(_) => {
274                     // Wait until the message is provided, then read it.
275                     packet.wait_ready();
276                     unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
277                 }
278             }
279         })
280     }
281
282     /// Disconnects the channel and wakes up all blocked senders and receivers.
283     ///
284     /// Returns `true` if this call disconnected the channel.
285     pub(crate) fn disconnect(&self) -> bool {
286         let mut inner = self.inner.lock().unwrap();
287
288         if !inner.is_disconnected {
289             inner.is_disconnected = true;
290             inner.senders.disconnect();
291             inner.receivers.disconnect();
292             true
293         } else {
294             false
295         }
296     }
297
298     /// Returns the current number of messages inside the channel.
299     pub(crate) fn len(&self) -> usize {
300         0
301     }
302
303     /// Returns the capacity of the channel.
304     #[allow(clippy::unnecessary_wraps)] // This is intentional.
305     pub(crate) fn capacity(&self) -> Option<usize> {
306         Some(0)
307     }
308
309     /// Returns `true` if the channel is empty.
310     pub(crate) fn is_empty(&self) -> bool {
311         true
312     }
313
314     /// Returns `true` if the channel is full.
315     pub(crate) fn is_full(&self) -> bool {
316         true
317     }
318 }