]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/oneshot.rs
Rollup merge of #95876 - fee1-dead:note-const-drop, r=oli-obk
[rust.git] / library / std / src / sync / mpsc / oneshot.rs
1 /// Oneshot channels/ports
2 ///
3 /// This is the initial flavor of channels/ports used for comm module. This is
4 /// an optimization for the one-use case of a channel. The major optimization of
5 /// this type is to have one and exactly one allocation when the chan/port pair
6 /// is created.
7 ///
8 /// Another possible optimization would be to not use an Arc box because
9 /// in theory we know when the shared packet can be deallocated (no real need
10 /// for the atomic reference counting), but I was having trouble how to destroy
11 /// the data early in a drop of a Port.
12 ///
13 /// # Implementation
14 ///
15 /// Oneshots are implemented around one atomic usize variable. This variable
16 /// indicates both the state of the port/chan but also contains any threads
17 /// blocked on the port. All atomic operations happen on this one word.
18 ///
19 /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
20 /// on behalf of the channel side of things (it can be mentally thought of as
21 /// consuming the port). This upgrade is then also stored in the shared packet.
22 /// The one caveat to consider is that when a port sees a disconnected channel
23 /// it must check for data because there is no "data plus upgrade" state.
24 pub use self::Failure::*;
25 use self::MyUpgrade::*;
26 pub use self::UpgradeResult::*;
27
28 use crate::cell::UnsafeCell;
29 use crate::ptr;
30 use crate::sync::atomic::{AtomicPtr, Ordering};
31 use crate::sync::mpsc::blocking::{self, SignalToken};
32 use crate::sync::mpsc::Receiver;
33 use crate::time::Instant;
34
35 // Various states you can find a port in.
36 const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
37 const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
38 const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
39 // Any other value represents a pointer to a SignalToken value. The
40 // protocol ensures that when the state moves *to* a pointer,
41 // ownership of the token is given to the packet, and when the state
42 // moves *from* a pointer, ownership of the token is transferred to
43 // whoever changed the state.
44
45 pub struct Packet<T> {
46     // Internal state of the chan/port pair (stores the blocked thread as well)
47     state: AtomicPtr<u8>,
48     // One-shot data slot location
49     data: UnsafeCell<Option<T>>,
50     // when used for the second time, a oneshot channel must be upgraded, and
51     // this contains the slot for the upgrade
52     upgrade: UnsafeCell<MyUpgrade<T>>,
53 }
54
55 pub enum Failure<T> {
56     Empty,
57     Disconnected,
58     Upgraded(Receiver<T>),
59 }
60
61 pub enum UpgradeResult {
62     UpSuccess,
63     UpDisconnected,
64     UpWoke(SignalToken),
65 }
66
67 enum MyUpgrade<T> {
68     NothingSent,
69     SendUsed,
70     GoUp(Receiver<T>),
71 }
72
73 impl<T> Packet<T> {
74     pub fn new() -> Packet<T> {
75         Packet {
76             data: UnsafeCell::new(None),
77             upgrade: UnsafeCell::new(NothingSent),
78             state: AtomicPtr::new(EMPTY),
79         }
80     }
81
82     pub fn send(&self, t: T) -> Result<(), T> {
83         unsafe {
84             // Sanity check
85             match *self.upgrade.get() {
86                 NothingSent => {}
87                 _ => panic!("sending on a oneshot that's already sent on "),
88             }
89             assert!((*self.data.get()).is_none());
90             ptr::write(self.data.get(), Some(t));
91             ptr::write(self.upgrade.get(), SendUsed);
92
93             match self.state.swap(DATA, Ordering::SeqCst) {
94                 // Sent the data, no one was waiting
95                 EMPTY => Ok(()),
96
97                 // Couldn't send the data, the port hung up first. Return the data
98                 // back up the stack.
99                 DISCONNECTED => {
100                     self.state.swap(DISCONNECTED, Ordering::SeqCst);
101                     ptr::write(self.upgrade.get(), NothingSent);
102                     Err((&mut *self.data.get()).take().unwrap())
103                 }
104
105                 // Not possible, these are one-use channels
106                 DATA => unreachable!(),
107
108                 // There is a thread waiting on the other end. We leave the 'DATA'
109                 // state inside so it'll pick it up on the other end.
110                 ptr => {
111                     SignalToken::from_raw(ptr).signal();
112                     Ok(())
113                 }
114             }
115         }
116     }
117
118     // Just tests whether this channel has been sent on or not, this is only
119     // safe to use from the sender.
120     pub fn sent(&self) -> bool {
121         unsafe { !matches!(*self.upgrade.get(), NothingSent) }
122     }
123
124     pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
125         // Attempt to not block the thread (it's a little expensive). If it looks
126         // like we're not empty, then immediately go through to `try_recv`.
127         if self.state.load(Ordering::SeqCst) == EMPTY {
128             let (wait_token, signal_token) = blocking::tokens();
129             let ptr = unsafe { signal_token.to_raw() };
130
131             // race with senders to enter the blocking state
132             if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
133                 if let Some(deadline) = deadline {
134                     let timed_out = !wait_token.wait_max_until(deadline);
135                     // Try to reset the state
136                     if timed_out {
137                         self.abort_selection().map_err(Upgraded)?;
138                     }
139                 } else {
140                     wait_token.wait();
141                     debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
142                 }
143             } else {
144                 // drop the signal token, since we never blocked
145                 drop(unsafe { SignalToken::from_raw(ptr) });
146             }
147         }
148
149         self.try_recv()
150     }
151
152     pub fn try_recv(&self) -> Result<T, Failure<T>> {
153         unsafe {
154             match self.state.load(Ordering::SeqCst) {
155                 EMPTY => Err(Empty),
156
157                 // We saw some data on the channel, but the channel can be used
158                 // again to send us an upgrade. As a result, we need to re-insert
159                 // into the channel that there's no data available (otherwise we'll
160                 // just see DATA next time). This is done as a cmpxchg because if
161                 // the state changes under our feet we'd rather just see that state
162                 // change.
163                 DATA => {
164                     let _ = self.state.compare_exchange(
165                         DATA,
166                         EMPTY,
167                         Ordering::SeqCst,
168                         Ordering::SeqCst,
169                     );
170                     match (&mut *self.data.get()).take() {
171                         Some(data) => Ok(data),
172                         None => unreachable!(),
173                     }
174                 }
175
176                 // There's no guarantee that we receive before an upgrade happens,
177                 // and an upgrade flags the channel as disconnected, so when we see
178                 // this we first need to check if there's data available and *then*
179                 // we go through and process the upgrade.
180                 DISCONNECTED => match (&mut *self.data.get()).take() {
181                     Some(data) => Ok(data),
182                     None => match ptr::replace(self.upgrade.get(), SendUsed) {
183                         SendUsed | NothingSent => Err(Disconnected),
184                         GoUp(upgrade) => Err(Upgraded(upgrade)),
185                     },
186                 },
187
188                 // We are the sole receiver; there cannot be a blocking
189                 // receiver already.
190                 _ => unreachable!(),
191             }
192         }
193     }
194
195     // Returns whether the upgrade was completed. If the upgrade wasn't
196     // completed, then the port couldn't get sent to the other half (it will
197     // never receive it).
198     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
199         unsafe {
200             let prev = match *self.upgrade.get() {
201                 NothingSent => NothingSent,
202                 SendUsed => SendUsed,
203                 _ => panic!("upgrading again"),
204             };
205             ptr::write(self.upgrade.get(), GoUp(up));
206
207             match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
208                 // If the channel is empty or has data on it, then we're good to go.
209                 // Senders will check the data before the upgrade (in case we
210                 // plastered over the DATA state).
211                 DATA | EMPTY => UpSuccess,
212
213                 // If the other end is already disconnected, then we failed the
214                 // upgrade. Be sure to trash the port we were given.
215                 DISCONNECTED => {
216                     ptr::replace(self.upgrade.get(), prev);
217                     UpDisconnected
218                 }
219
220                 // If someone's waiting, we gotta wake them up
221                 ptr => UpWoke(SignalToken::from_raw(ptr)),
222             }
223         }
224     }
225
226     pub fn drop_chan(&self) {
227         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
228             DATA | DISCONNECTED | EMPTY => {}
229
230             // If someone's waiting, we gotta wake them up
231             ptr => unsafe {
232                 SignalToken::from_raw(ptr).signal();
233             },
234         }
235     }
236
237     pub fn drop_port(&self) {
238         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
239             // An empty channel has nothing to do, and a remotely disconnected
240             // channel also has nothing to do b/c we're about to run the drop
241             // glue
242             DISCONNECTED | EMPTY => {}
243
244             // There's data on the channel, so make sure we destroy it promptly.
245             // This is why not using an arc is a little difficult (need the box
246             // to stay valid while we take the data).
247             DATA => unsafe {
248                 (&mut *self.data.get()).take().unwrap();
249             },
250
251             // We're the only ones that can block on this port
252             _ => unreachable!(),
253         }
254     }
255
256     ////////////////////////////////////////////////////////////////////////////
257     // select implementation
258     ////////////////////////////////////////////////////////////////////////////
259
260     // Remove a previous selecting thread from this port. This ensures that the
261     // blocked thread will no longer be visible to any other threads.
262     //
263     // The return value indicates whether there's data on this port.
264     pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
265         let state = match self.state.load(Ordering::SeqCst) {
266             // Each of these states means that no further activity will happen
267             // with regard to abortion selection
268             s @ (EMPTY | DATA | DISCONNECTED) => s,
269
270             // If we've got a blocked thread, then use an atomic to gain ownership
271             // of it (may fail)
272             ptr => self
273                 .state
274                 .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
275                 .unwrap_or_else(|x| x),
276         };
277
278         // Now that we've got ownership of our state, figure out what to do
279         // about it.
280         match state {
281             EMPTY => unreachable!(),
282             // our thread used for select was stolen
283             DATA => Ok(true),
284
285             // If the other end has hung up, then we have complete ownership
286             // of the port. First, check if there was data waiting for us. This
287             // is possible if the other end sent something and then hung up.
288             //
289             // We then need to check to see if there was an upgrade requested,
290             // and if so, the upgraded port needs to have its selection aborted.
291             DISCONNECTED => unsafe {
292                 if (*self.data.get()).is_some() {
293                     Ok(true)
294                 } else {
295                     match ptr::replace(self.upgrade.get(), SendUsed) {
296                         GoUp(port) => Err(port),
297                         _ => Ok(true),
298                     }
299                 }
300             },
301
302             // We woke ourselves up from select.
303             ptr => unsafe {
304                 drop(SignalToken::from_raw(ptr));
305                 Ok(false)
306             },
307         }
308     }
309 }
310
311 impl<T> Drop for Packet<T> {
312     fn drop(&mut self) {
313         assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
314     }
315 }