]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/oneshot.rs
Remove the unstable and deprecated mpsc_select
[rust.git] / src / libstd / sync / mpsc / oneshot.rs
1 /// Oneshot channels/ports
2 ///
3 /// This is the initial flavor of channels/ports used for comm module. This is
4 /// an optimization for the one-use case of a channel. The major optimization of
5 /// this type is to have one and exactly one allocation when the chan/port pair
6 /// is created.
7 ///
8 /// Another possible optimization would be to not use an Arc box because
9 /// in theory we know when the shared packet can be deallocated (no real need
10 /// for the atomic reference counting), but I was having trouble how to destroy
11 /// the data early in a drop of a Port.
12 ///
13 /// # Implementation
14 ///
15 /// Oneshots are implemented around one atomic usize variable. This variable
16 /// indicates both the state of the port/chan but also contains any threads
17 /// blocked on the port. All atomic operations happen on this one word.
18 ///
19 /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
20 /// on behalf of the channel side of things (it can be mentally thought of as
21 /// consuming the port). This upgrade is then also stored in the shared packet.
22 /// The one caveat to consider is that when a port sees a disconnected channel
23 /// it must check for data because there is no "data plus upgrade" state.
24
25 pub use self::Failure::*;
26 pub use self::UpgradeResult::*;
27 use self::MyUpgrade::*;
28
29 use crate::sync::mpsc::Receiver;
30 use crate::sync::mpsc::blocking::{self, SignalToken};
31 use crate::cell::UnsafeCell;
32 use crate::ptr;
33 use crate::sync::atomic::{AtomicUsize, Ordering};
34 use crate::time::Instant;
35
36 // Various states you can find a port in.
37 const EMPTY: usize = 0;          // initial state: no data, no blocked receiver
38 const DATA: usize = 1;           // data ready for receiver to take
39 const DISCONNECTED: usize = 2;   // channel is disconnected OR upgraded
40 // Any other value represents a pointer to a SignalToken value. The
41 // protocol ensures that when the state moves *to* a pointer,
42 // ownership of the token is given to the packet, and when the state
43 // moves *from* a pointer, ownership of the token is transferred to
44 // whoever changed the state.
45
46 pub struct Packet<T> {
47     // Internal state of the chan/port pair (stores the blocked thread as well)
48     state: AtomicUsize,
49     // One-shot data slot location
50     data: UnsafeCell<Option<T>>,
51     // when used for the second time, a oneshot channel must be upgraded, and
52     // this contains the slot for the upgrade
53     upgrade: UnsafeCell<MyUpgrade<T>>,
54 }
55
56 pub enum Failure<T> {
57     Empty,
58     Disconnected,
59     Upgraded(Receiver<T>),
60 }
61
62 pub enum UpgradeResult {
63     UpSuccess,
64     UpDisconnected,
65     UpWoke(SignalToken),
66 }
67
68 enum MyUpgrade<T> {
69     NothingSent,
70     SendUsed,
71     GoUp(Receiver<T>),
72 }
73
74 impl<T> Packet<T> {
75     pub fn new() -> Packet<T> {
76         Packet {
77             data: UnsafeCell::new(None),
78             upgrade: UnsafeCell::new(NothingSent),
79             state: AtomicUsize::new(EMPTY),
80         }
81     }
82
83     pub fn send(&self, t: T) -> Result<(), T> {
84         unsafe {
85             // Sanity check
86             match *self.upgrade.get() {
87                 NothingSent => {}
88                 _ => panic!("sending on a oneshot that's already sent on "),
89             }
90             assert!((*self.data.get()).is_none());
91             ptr::write(self.data.get(), Some(t));
92             ptr::write(self.upgrade.get(), SendUsed);
93
94             match self.state.swap(DATA, Ordering::SeqCst) {
95                 // Sent the data, no one was waiting
96                 EMPTY => Ok(()),
97
98                 // Couldn't send the data, the port hung up first. Return the data
99                 // back up the stack.
100                 DISCONNECTED => {
101                     self.state.swap(DISCONNECTED, Ordering::SeqCst);
102                     ptr::write(self.upgrade.get(), NothingSent);
103                     Err((&mut *self.data.get()).take().unwrap())
104                 }
105
106                 // Not possible, these are one-use channels
107                 DATA => unreachable!(),
108
109                 // There is a thread waiting on the other end. We leave the 'DATA'
110                 // state inside so it'll pick it up on the other end.
111                 ptr => {
112                     SignalToken::cast_from_usize(ptr).signal();
113                     Ok(())
114                 }
115             }
116         }
117     }
118
119     // Just tests whether this channel has been sent on or not, this is only
120     // safe to use from the sender.
121     pub fn sent(&self) -> bool {
122         unsafe {
123             match *self.upgrade.get() {
124                 NothingSent => false,
125                 _ => true,
126             }
127         }
128     }
129
130     pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
131         // Attempt to not block the thread (it's a little expensive). If it looks
132         // like we're not empty, then immediately go through to `try_recv`.
133         if self.state.load(Ordering::SeqCst) == EMPTY {
134             let (wait_token, signal_token) = blocking::tokens();
135             let ptr = unsafe { signal_token.cast_to_usize() };
136
137             // race with senders to enter the blocking state
138             if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
139                 if let Some(deadline) = deadline {
140                     let timed_out = !wait_token.wait_max_until(deadline);
141                     // Try to reset the state
142                     if timed_out {
143                         self.abort_selection().map_err(Upgraded)?;
144                     }
145                 } else {
146                     wait_token.wait();
147                     debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
148                 }
149             } else {
150                 // drop the signal token, since we never blocked
151                 drop(unsafe { SignalToken::cast_from_usize(ptr) });
152             }
153         }
154
155         self.try_recv()
156     }
157
158     pub fn try_recv(&self) -> Result<T, Failure<T>> {
159         unsafe {
160             match self.state.load(Ordering::SeqCst) {
161                 EMPTY => Err(Empty),
162
163                 // We saw some data on the channel, but the channel can be used
164                 // again to send us an upgrade. As a result, we need to re-insert
165                 // into the channel that there's no data available (otherwise we'll
166                 // just see DATA next time). This is done as a cmpxchg because if
167                 // the state changes under our feet we'd rather just see that state
168                 // change.
169                 DATA => {
170                     self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
171                     match (&mut *self.data.get()).take() {
172                         Some(data) => Ok(data),
173                         None => unreachable!(),
174                     }
175                 }
176
177                 // There's no guarantee that we receive before an upgrade happens,
178                 // and an upgrade flags the channel as disconnected, so when we see
179                 // this we first need to check if there's data available and *then*
180                 // we go through and process the upgrade.
181                 DISCONNECTED => {
182                     match (&mut *self.data.get()).take() {
183                         Some(data) => Ok(data),
184                         None => {
185                             match ptr::replace(self.upgrade.get(), SendUsed) {
186                                 SendUsed | NothingSent => Err(Disconnected),
187                                 GoUp(upgrade) => Err(Upgraded(upgrade))
188                             }
189                         }
190                     }
191                 }
192
193                 // We are the sole receiver; there cannot be a blocking
194                 // receiver already.
195                 _ => unreachable!()
196             }
197         }
198     }
199
200     // Returns whether the upgrade was completed. If the upgrade wasn't
201     // completed, then the port couldn't get sent to the other half (it will
202     // never receive it).
203     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
204         unsafe {
205             let prev = match *self.upgrade.get() {
206                 NothingSent => NothingSent,
207                 SendUsed => SendUsed,
208                 _ => panic!("upgrading again"),
209             };
210             ptr::write(self.upgrade.get(), GoUp(up));
211
212             match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
213                 // If the channel is empty or has data on it, then we're good to go.
214                 // Senders will check the data before the upgrade (in case we
215                 // plastered over the DATA state).
216                 DATA | EMPTY => UpSuccess,
217
218                 // If the other end is already disconnected, then we failed the
219                 // upgrade. Be sure to trash the port we were given.
220                 DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
221
222                 // If someone's waiting, we gotta wake them up
223                 ptr => UpWoke(SignalToken::cast_from_usize(ptr))
224             }
225         }
226     }
227
228     pub fn drop_chan(&self) {
229         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
230             DATA | DISCONNECTED | EMPTY => {}
231
232             // If someone's waiting, we gotta wake them up
233             ptr => unsafe {
234                 SignalToken::cast_from_usize(ptr).signal();
235             }
236         }
237     }
238
239     pub fn drop_port(&self) {
240         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
241             // An empty channel has nothing to do, and a remotely disconnected
242             // channel also has nothing to do b/c we're about to run the drop
243             // glue
244             DISCONNECTED | EMPTY => {}
245
246             // There's data on the channel, so make sure we destroy it promptly.
247             // This is why not using an arc is a little difficult (need the box
248             // to stay valid while we take the data).
249             DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
250
251             // We're the only ones that can block on this port
252             _ => unreachable!()
253         }
254     }
255
256     ////////////////////////////////////////////////////////////////////////////
257     // select implementation
258     ////////////////////////////////////////////////////////////////////////////
259
260     // Remove a previous selecting thread from this port. This ensures that the
261     // blocked thread will no longer be visible to any other threads.
262     //
263     // The return value indicates whether there's data on this port.
264     pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
265         let state = match self.state.load(Ordering::SeqCst) {
266             // Each of these states means that no further activity will happen
267             // with regard to abortion selection
268             s @ EMPTY |
269             s @ DATA |
270             s @ DISCONNECTED => s,
271
272             // If we've got a blocked thread, then use an atomic to gain ownership
273             // of it (may fail)
274             ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
275         };
276
277         // Now that we've got ownership of our state, figure out what to do
278         // about it.
279         match state {
280             EMPTY => unreachable!(),
281             // our thread used for select was stolen
282             DATA => Ok(true),
283
284             // If the other end has hung up, then we have complete ownership
285             // of the port. First, check if there was data waiting for us. This
286             // is possible if the other end sent something and then hung up.
287             //
288             // We then need to check to see if there was an upgrade requested,
289             // and if so, the upgraded port needs to have its selection aborted.
290             DISCONNECTED => unsafe {
291                 if (*self.data.get()).is_some() {
292                     Ok(true)
293                 } else {
294                     match ptr::replace(self.upgrade.get(), SendUsed) {
295                         GoUp(port) => Err(port),
296                         _ => Ok(true),
297                     }
298                 }
299             },
300
301             // We woke ourselves up from select.
302             ptr => unsafe {
303                 drop(SignalToken::cast_from_usize(ptr));
304                 Ok(false)
305             }
306         }
307     }
308 }
309
310 impl<T> Drop for Packet<T> {
311     fn drop(&mut self) {
312         assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
313     }
314 }