]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/oneshot.rs
Improve type size assertions
[rust.git] / src / libstd / sync / mpsc / oneshot.rs
1 /// Oneshot channels/ports
2 ///
3 /// This is the initial flavor of channels/ports used for comm module. This is
4 /// an optimization for the one-use case of a channel. The major optimization of
5 /// this type is to have one and exactly one allocation when the chan/port pair
6 /// is created.
7 ///
8 /// Another possible optimization would be to not use an Arc box because
9 /// in theory we know when the shared packet can be deallocated (no real need
10 /// for the atomic reference counting), but I was having trouble how to destroy
11 /// the data early in a drop of a Port.
12 ///
13 /// # Implementation
14 ///
15 /// Oneshots are implemented around one atomic usize variable. This variable
16 /// indicates both the state of the port/chan but also contains any threads
17 /// blocked on the port. All atomic operations happen on this one word.
18 ///
19 /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
20 /// on behalf of the channel side of things (it can be mentally thought of as
21 /// consuming the port). This upgrade is then also stored in the shared packet.
22 /// The one caveat to consider is that when a port sees a disconnected channel
23 /// it must check for data because there is no "data plus upgrade" state.
24
25 pub use self::Failure::*;
26 pub use self::UpgradeResult::*;
27 pub use self::SelectionResult::*;
28 use self::MyUpgrade::*;
29
30 use crate::sync::mpsc::Receiver;
31 use crate::sync::mpsc::blocking::{self, SignalToken};
32 use crate::cell::UnsafeCell;
33 use crate::ptr;
34 use crate::sync::atomic::{AtomicUsize, Ordering};
35 use crate::time::Instant;
36
37 // Various states you can find a port in.
38 const EMPTY: usize = 0;          // initial state: no data, no blocked receiver
39 const DATA: usize = 1;           // data ready for receiver to take
40 const DISCONNECTED: usize = 2;   // channel is disconnected OR upgraded
41 // Any other value represents a pointer to a SignalToken value. The
42 // protocol ensures that when the state moves *to* a pointer,
43 // ownership of the token is given to the packet, and when the state
44 // moves *from* a pointer, ownership of the token is transferred to
45 // whoever changed the state.
46
47 pub struct Packet<T> {
48     // Internal state of the chan/port pair (stores the blocked thread as well)
49     state: AtomicUsize,
50     // One-shot data slot location
51     data: UnsafeCell<Option<T>>,
52     // when used for the second time, a oneshot channel must be upgraded, and
53     // this contains the slot for the upgrade
54     upgrade: UnsafeCell<MyUpgrade<T>>,
55 }
56
57 pub enum Failure<T> {
58     Empty,
59     Disconnected,
60     Upgraded(Receiver<T>),
61 }
62
63 pub enum UpgradeResult {
64     UpSuccess,
65     UpDisconnected,
66     UpWoke(SignalToken),
67 }
68
69 pub enum SelectionResult<T> {
70     SelCanceled,
71     SelUpgraded(SignalToken, Receiver<T>),
72     SelSuccess,
73 }
74
75 enum MyUpgrade<T> {
76     NothingSent,
77     SendUsed,
78     GoUp(Receiver<T>),
79 }
80
81 impl<T> Packet<T> {
82     pub fn new() -> Packet<T> {
83         Packet {
84             data: UnsafeCell::new(None),
85             upgrade: UnsafeCell::new(NothingSent),
86             state: AtomicUsize::new(EMPTY),
87         }
88     }
89
90     pub fn send(&self, t: T) -> Result<(), T> {
91         unsafe {
92             // Sanity check
93             match *self.upgrade.get() {
94                 NothingSent => {}
95                 _ => panic!("sending on a oneshot that's already sent on "),
96             }
97             assert!((*self.data.get()).is_none());
98             ptr::write(self.data.get(), Some(t));
99             ptr::write(self.upgrade.get(), SendUsed);
100
101             match self.state.swap(DATA, Ordering::SeqCst) {
102                 // Sent the data, no one was waiting
103                 EMPTY => Ok(()),
104
105                 // Couldn't send the data, the port hung up first. Return the data
106                 // back up the stack.
107                 DISCONNECTED => {
108                     self.state.swap(DISCONNECTED, Ordering::SeqCst);
109                     ptr::write(self.upgrade.get(), NothingSent);
110                     Err((&mut *self.data.get()).take().unwrap())
111                 }
112
113                 // Not possible, these are one-use channels
114                 DATA => unreachable!(),
115
116                 // There is a thread waiting on the other end. We leave the 'DATA'
117                 // state inside so it'll pick it up on the other end.
118                 ptr => {
119                     SignalToken::cast_from_usize(ptr).signal();
120                     Ok(())
121                 }
122             }
123         }
124     }
125
126     // Just tests whether this channel has been sent on or not, this is only
127     // safe to use from the sender.
128     pub fn sent(&self) -> bool {
129         unsafe {
130             match *self.upgrade.get() {
131                 NothingSent => false,
132                 _ => true,
133             }
134         }
135     }
136
137     pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
138         // Attempt to not block the thread (it's a little expensive). If it looks
139         // like we're not empty, then immediately go through to `try_recv`.
140         if self.state.load(Ordering::SeqCst) == EMPTY {
141             let (wait_token, signal_token) = blocking::tokens();
142             let ptr = unsafe { signal_token.cast_to_usize() };
143
144             // race with senders to enter the blocking state
145             if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
146                 if let Some(deadline) = deadline {
147                     let timed_out = !wait_token.wait_max_until(deadline);
148                     // Try to reset the state
149                     if timed_out {
150                         self.abort_selection().map_err(Upgraded)?;
151                     }
152                 } else {
153                     wait_token.wait();
154                     debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
155                 }
156             } else {
157                 // drop the signal token, since we never blocked
158                 drop(unsafe { SignalToken::cast_from_usize(ptr) });
159             }
160         }
161
162         self.try_recv()
163     }
164
165     pub fn try_recv(&self) -> Result<T, Failure<T>> {
166         unsafe {
167             match self.state.load(Ordering::SeqCst) {
168                 EMPTY => Err(Empty),
169
170                 // We saw some data on the channel, but the channel can be used
171                 // again to send us an upgrade. As a result, we need to re-insert
172                 // into the channel that there's no data available (otherwise we'll
173                 // just see DATA next time). This is done as a cmpxchg because if
174                 // the state changes under our feet we'd rather just see that state
175                 // change.
176                 DATA => {
177                     self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
178                     match (&mut *self.data.get()).take() {
179                         Some(data) => Ok(data),
180                         None => unreachable!(),
181                     }
182                 }
183
184                 // There's no guarantee that we receive before an upgrade happens,
185                 // and an upgrade flags the channel as disconnected, so when we see
186                 // this we first need to check if there's data available and *then*
187                 // we go through and process the upgrade.
188                 DISCONNECTED => {
189                     match (&mut *self.data.get()).take() {
190                         Some(data) => Ok(data),
191                         None => {
192                             match ptr::replace(self.upgrade.get(), SendUsed) {
193                                 SendUsed | NothingSent => Err(Disconnected),
194                                 GoUp(upgrade) => Err(Upgraded(upgrade))
195                             }
196                         }
197                     }
198                 }
199
200                 // We are the sole receiver; there cannot be a blocking
201                 // receiver already.
202                 _ => unreachable!()
203             }
204         }
205     }
206
207     // Returns whether the upgrade was completed. If the upgrade wasn't
208     // completed, then the port couldn't get sent to the other half (it will
209     // never receive it).
210     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
211         unsafe {
212             let prev = match *self.upgrade.get() {
213                 NothingSent => NothingSent,
214                 SendUsed => SendUsed,
215                 _ => panic!("upgrading again"),
216             };
217             ptr::write(self.upgrade.get(), GoUp(up));
218
219             match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
220                 // If the channel is empty or has data on it, then we're good to go.
221                 // Senders will check the data before the upgrade (in case we
222                 // plastered over the DATA state).
223                 DATA | EMPTY => UpSuccess,
224
225                 // If the other end is already disconnected, then we failed the
226                 // upgrade. Be sure to trash the port we were given.
227                 DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
228
229                 // If someone's waiting, we gotta wake them up
230                 ptr => UpWoke(SignalToken::cast_from_usize(ptr))
231             }
232         }
233     }
234
235     pub fn drop_chan(&self) {
236         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
237             DATA | DISCONNECTED | EMPTY => {}
238
239             // If someone's waiting, we gotta wake them up
240             ptr => unsafe {
241                 SignalToken::cast_from_usize(ptr).signal();
242             }
243         }
244     }
245
246     pub fn drop_port(&self) {
247         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
248             // An empty channel has nothing to do, and a remotely disconnected
249             // channel also has nothing to do b/c we're about to run the drop
250             // glue
251             DISCONNECTED | EMPTY => {}
252
253             // There's data on the channel, so make sure we destroy it promptly.
254             // This is why not using an arc is a little difficult (need the box
255             // to stay valid while we take the data).
256             DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
257
258             // We're the only ones that can block on this port
259             _ => unreachable!()
260         }
261     }
262
263     ////////////////////////////////////////////////////////////////////////////
264     // select implementation
265     ////////////////////////////////////////////////////////////////////////////
266
267     // If Ok, the value is whether this port has data, if Err, then the upgraded
268     // port needs to be checked instead of this one.
269     pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
270         unsafe {
271             match self.state.load(Ordering::SeqCst) {
272                 EMPTY => Ok(false), // Welp, we tried
273                 DATA => Ok(true),   // we have some un-acquired data
274                 DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
275                 DISCONNECTED => {
276                     match ptr::replace(self.upgrade.get(), SendUsed) {
277                         // The other end sent us an upgrade, so we need to
278                         // propagate upwards whether the upgrade can receive
279                         // data
280                         GoUp(upgrade) => Err(upgrade),
281
282                         // If the other end disconnected without sending an
283                         // upgrade, then we have data to receive (the channel is
284                         // disconnected).
285                         up => { ptr::write(self.upgrade.get(), up); Ok(true) }
286                     }
287                 }
288                 _ => unreachable!(), // we're the "one blocker"
289             }
290         }
291     }
292
293     // Attempts to start selection on this port. This can either succeed, fail
294     // because there is data, or fail because there is an upgrade pending.
295     pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
296         unsafe {
297             let ptr = token.cast_to_usize();
298             match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
299                 EMPTY => SelSuccess,
300                 DATA => {
301                     drop(SignalToken::cast_from_usize(ptr));
302                     SelCanceled
303                 }
304                 DISCONNECTED if (*self.data.get()).is_some() => {
305                     drop(SignalToken::cast_from_usize(ptr));
306                     SelCanceled
307                 }
308                 DISCONNECTED => {
309                     match ptr::replace(self.upgrade.get(), SendUsed) {
310                         // The other end sent us an upgrade, so we need to
311                         // propagate upwards whether the upgrade can receive
312                         // data
313                         GoUp(upgrade) => {
314                             SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
315                         }
316
317                         // If the other end disconnected without sending an
318                         // upgrade, then we have data to receive (the channel is
319                         // disconnected).
320                         up => {
321                             ptr::write(self.upgrade.get(), up);
322                             drop(SignalToken::cast_from_usize(ptr));
323                             SelCanceled
324                         }
325                     }
326                 }
327                 _ => unreachable!(), // we're the "one blocker"
328             }
329         }
330     }
331
332     // Remove a previous selecting thread from this port. This ensures that the
333     // blocked thread will no longer be visible to any other threads.
334     //
335     // The return value indicates whether there's data on this port.
336     pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
337         let state = match self.state.load(Ordering::SeqCst) {
338             // Each of these states means that no further activity will happen
339             // with regard to abortion selection
340             s @ EMPTY |
341             s @ DATA |
342             s @ DISCONNECTED => s,
343
344             // If we've got a blocked thread, then use an atomic to gain ownership
345             // of it (may fail)
346             ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
347         };
348
349         // Now that we've got ownership of our state, figure out what to do
350         // about it.
351         match state {
352             EMPTY => unreachable!(),
353             // our thread used for select was stolen
354             DATA => Ok(true),
355
356             // If the other end has hung up, then we have complete ownership
357             // of the port. First, check if there was data waiting for us. This
358             // is possible if the other end sent something and then hung up.
359             //
360             // We then need to check to see if there was an upgrade requested,
361             // and if so, the upgraded port needs to have its selection aborted.
362             DISCONNECTED => unsafe {
363                 if (*self.data.get()).is_some() {
364                     Ok(true)
365                 } else {
366                     match ptr::replace(self.upgrade.get(), SendUsed) {
367                         GoUp(port) => Err(port),
368                         _ => Ok(true),
369                     }
370                 }
371             },
372
373             // We woke ourselves up from select.
374             ptr => unsafe {
375                 drop(SignalToken::cast_from_usize(ptr));
376                 Ok(false)
377             }
378         }
379     }
380 }
381
382 impl<T> Drop for Packet<T> {
383     fn drop(&mut self) {
384         assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
385     }
386 }