]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/oneshot.rs
doc: remove incomplete sentence
[rust.git] / src / libstd / sync / mpsc / oneshot.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Oneshot channels/ports
12 ///
13 /// This is the initial flavor of channels/ports used for comm module. This is
14 /// an optimization for the one-use case of a channel. The major optimization of
15 /// this type is to have one and exactly one allocation when the chan/port pair
16 /// is created.
17 ///
18 /// Another possible optimization would be to not use an Arc box because
19 /// in theory we know when the shared packet can be deallocated (no real need
20 /// for the atomic reference counting), but I was having trouble how to destroy
21 /// the data early in a drop of a Port.
22 ///
23 /// # Implementation
24 ///
25 /// Oneshots are implemented around one atomic uint variable. This variable
26 /// indicates both the state of the port/chan but also contains any tasks
27 /// blocked on the port. All atomic operations happen on this one word.
28 ///
29 /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
30 /// on behalf of the channel side of things (it can be mentally thought of as
31 /// consuming the port). This upgrade is then also stored in the shared packet.
32 /// The one caveat to consider is that when a port sees a disconnected channel
33 /// it must check for data because there is no "data plus upgrade" state.
34
35 pub use self::Failure::*;
36 pub use self::UpgradeResult::*;
37 pub use self::SelectionResult::*;
38 use self::MyUpgrade::*;
39
40 use core::prelude::*;
41
42 use sync::mpsc::Receiver;
43 use sync::mpsc::blocking::{mod, SignalToken};
44 use core::mem;
45 use sync::atomic;
46
47 // Various states you can find a port in.
48 const EMPTY: uint = 0;          // initial state: no data, no blocked reciever
49 const DATA: uint = 1;           // data ready for receiver to take
50 const DISCONNECTED: uint = 2;   // channel is disconnected OR upgraded
51 // Any other value represents a pointer to a SignalToken value. The
52 // protocol ensures that when the state moves *to* a pointer,
53 // ownership of the token is given to the packet, and when the state
54 // moves *from* a pointer, ownership of the token is transferred to
55 // whoever changed the state.
56
57 pub struct Packet<T> {
58     // Internal state of the chan/port pair (stores the blocked task as well)
59     state: atomic::AtomicUint,
60     // One-shot data slot location
61     data: Option<T>,
62     // when used for the second time, a oneshot channel must be upgraded, and
63     // this contains the slot for the upgrade
64     upgrade: MyUpgrade<T>,
65 }
66
67 pub enum Failure<T> {
68     Empty,
69     Disconnected,
70     Upgraded(Receiver<T>),
71 }
72
73 pub enum UpgradeResult {
74     UpSuccess,
75     UpDisconnected,
76     UpWoke(SignalToken),
77 }
78
79 pub enum SelectionResult<T> {
80     SelCanceled,
81     SelUpgraded(SignalToken, Receiver<T>),
82     SelSuccess,
83 }
84
85 enum MyUpgrade<T> {
86     NothingSent,
87     SendUsed,
88     GoUp(Receiver<T>),
89 }
90
91 impl<T: Send> Packet<T> {
92     pub fn new() -> Packet<T> {
93         Packet {
94             data: None,
95             upgrade: NothingSent,
96             state: atomic::AtomicUint::new(EMPTY),
97         }
98     }
99
100     pub fn send(&mut self, t: T) -> Result<(), T> {
101         // Sanity check
102         match self.upgrade {
103             NothingSent => {}
104             _ => panic!("sending on a oneshot that's already sent on "),
105         }
106         assert!(self.data.is_none());
107         self.data = Some(t);
108         self.upgrade = SendUsed;
109
110         match self.state.swap(DATA, atomic::SeqCst) {
111             // Sent the data, no one was waiting
112             EMPTY => Ok(()),
113
114             // Couldn't send the data, the port hung up first. Return the data
115             // back up the stack.
116             DISCONNECTED => {
117                 Err(self.data.take().unwrap())
118             }
119
120             // Not possible, these are one-use channels
121             DATA => unreachable!(),
122
123             // There is a thread waiting on the other end. We leave the 'DATA'
124             // state inside so it'll pick it up on the other end.
125             ptr => unsafe {
126                 SignalToken::cast_from_uint(ptr).signal();
127                 Ok(())
128             }
129         }
130     }
131
132     // Just tests whether this channel has been sent on or not, this is only
133     // safe to use from the sender.
134     pub fn sent(&self) -> bool {
135         match self.upgrade {
136             NothingSent => false,
137             _ => true,
138         }
139     }
140
141     pub fn recv(&mut self) -> Result<T, Failure<T>> {
142         // Attempt to not block the task (it's a little expensive). If it looks
143         // like we're not empty, then immediately go through to `try_recv`.
144         if self.state.load(atomic::SeqCst) == EMPTY {
145             let (wait_token, signal_token) = blocking::tokens();
146             let ptr = unsafe { signal_token.cast_to_uint() };
147
148             // race with senders to enter the blocking state
149             if self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) == EMPTY {
150                 wait_token.wait();
151                 debug_assert!(self.state.load(atomic::SeqCst) != EMPTY);
152             } else {
153                 // drop the signal token, since we never blocked
154                 drop(unsafe { SignalToken::cast_from_uint(ptr) });
155             }
156         }
157
158         self.try_recv()
159     }
160
161     pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
162         match self.state.load(atomic::SeqCst) {
163             EMPTY => Err(Empty),
164
165             // We saw some data on the channel, but the channel can be used
166             // again to send us an upgrade. As a result, we need to re-insert
167             // into the channel that there's no data available (otherwise we'll
168             // just see DATA next time). This is done as a cmpxchg because if
169             // the state changes under our feet we'd rather just see that state
170             // change.
171             DATA => {
172                 self.state.compare_and_swap(DATA, EMPTY, atomic::SeqCst);
173                 match self.data.take() {
174                     Some(data) => Ok(data),
175                     None => unreachable!(),
176                 }
177             }
178
179             // There's no guarantee that we receive before an upgrade happens,
180             // and an upgrade flags the channel as disconnected, so when we see
181             // this we first need to check if there's data available and *then*
182             // we go through and process the upgrade.
183             DISCONNECTED => {
184                 match self.data.take() {
185                     Some(data) => Ok(data),
186                     None => {
187                         match mem::replace(&mut self.upgrade, SendUsed) {
188                             SendUsed | NothingSent => Err(Disconnected),
189                             GoUp(upgrade) => Err(Upgraded(upgrade))
190                         }
191                     }
192                 }
193             }
194
195             // We are the sole receiver; there cannot be a blocking
196             // receiver already.
197             _ => unreachable!()
198         }
199     }
200
201     // Returns whether the upgrade was completed. If the upgrade wasn't
202     // completed, then the port couldn't get sent to the other half (it will
203     // never receive it).
204     pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
205         let prev = match self.upgrade {
206             NothingSent => NothingSent,
207             SendUsed => SendUsed,
208             _ => panic!("upgrading again"),
209         };
210         self.upgrade = GoUp(up);
211
212         match self.state.swap(DISCONNECTED, atomic::SeqCst) {
213             // If the channel is empty or has data on it, then we're good to go.
214             // Senders will check the data before the upgrade (in case we
215             // plastered over the DATA state).
216             DATA | EMPTY => UpSuccess,
217
218             // If the other end is already disconnected, then we failed the
219             // upgrade. Be sure to trash the port we were given.
220             DISCONNECTED => { self.upgrade = prev; UpDisconnected }
221
222             // If someone's waiting, we gotta wake them up
223             ptr => UpWoke(unsafe { SignalToken::cast_from_uint(ptr) })
224         }
225     }
226
227     pub fn drop_chan(&mut self) {
228         match self.state.swap(DISCONNECTED, atomic::SeqCst) {
229             DATA | DISCONNECTED | EMPTY => {}
230
231             // If someone's waiting, we gotta wake them up
232             ptr => unsafe {
233                 SignalToken::cast_from_uint(ptr).signal();
234             }
235         }
236     }
237
238     pub fn drop_port(&mut self) {
239         match self.state.swap(DISCONNECTED, atomic::SeqCst) {
240             // An empty channel has nothing to do, and a remotely disconnected
241             // channel also has nothing to do b/c we're about to run the drop
242             // glue
243             DISCONNECTED | EMPTY => {}
244
245             // There's data on the channel, so make sure we destroy it promptly.
246             // This is why not using an arc is a little difficult (need the box
247             // to stay valid while we take the data).
248             DATA => { self.data.take().unwrap(); }
249
250             // We're the only ones that can block on this port
251             _ => unreachable!()
252         }
253     }
254
255     ////////////////////////////////////////////////////////////////////////////
256     // select implementation
257     ////////////////////////////////////////////////////////////////////////////
258
259     // If Ok, the value is whether this port has data, if Err, then the upgraded
260     // port needs to be checked instead of this one.
261     pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
262         match self.state.load(atomic::SeqCst) {
263             EMPTY => Ok(false), // Welp, we tried
264             DATA => Ok(true),   // we have some un-acquired data
265             DISCONNECTED if self.data.is_some() => Ok(true), // we have data
266             DISCONNECTED => {
267                 match mem::replace(&mut self.upgrade, SendUsed) {
268                     // The other end sent us an upgrade, so we need to
269                     // propagate upwards whether the upgrade can receive
270                     // data
271                     GoUp(upgrade) => Err(upgrade),
272
273                     // If the other end disconnected without sending an
274                     // upgrade, then we have data to receive (the channel is
275                     // disconnected).
276                     up => { self.upgrade = up; Ok(true) }
277                 }
278             }
279             _ => unreachable!(), // we're the "one blocker"
280         }
281     }
282
283     // Attempts to start selection on this port. This can either succeed, fail
284     // because there is data, or fail because there is an upgrade pending.
285     pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
286         let ptr = unsafe { token.cast_to_uint() };
287         match self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) {
288             EMPTY => SelSuccess,
289             DATA => {
290                 drop(unsafe { SignalToken::cast_from_uint(ptr) });
291                 SelCanceled
292             }
293             DISCONNECTED if self.data.is_some() => {
294                 drop(unsafe { SignalToken::cast_from_uint(ptr) });
295                 SelCanceled
296             }
297             DISCONNECTED => {
298                 match mem::replace(&mut self.upgrade, SendUsed) {
299                     // The other end sent us an upgrade, so we need to
300                     // propagate upwards whether the upgrade can receive
301                     // data
302                     GoUp(upgrade) => {
303                         SelUpgraded(unsafe { SignalToken::cast_from_uint(ptr) }, upgrade)
304                     }
305
306                     // If the other end disconnected without sending an
307                     // upgrade, then we have data to receive (the channel is
308                     // disconnected).
309                     up => {
310                         self.upgrade = up;
311                         drop(unsafe { SignalToken::cast_from_uint(ptr) });
312                         SelCanceled
313                     }
314                 }
315             }
316             _ => unreachable!(), // we're the "one blocker"
317         }
318     }
319
320     // Remove a previous selecting task from this port. This ensures that the
321     // blocked task will no longer be visible to any other threads.
322     //
323     // The return value indicates whether there's data on this port.
324     pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
325         let state = match self.state.load(atomic::SeqCst) {
326             // Each of these states means that no further activity will happen
327             // with regard to abortion selection
328             s @ EMPTY |
329             s @ DATA |
330             s @ DISCONNECTED => s,
331
332             // If we've got a blocked task, then use an atomic to gain ownership
333             // of it (may fail)
334             ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst)
335         };
336
337         // Now that we've got ownership of our state, figure out what to do
338         // about it.
339         match state {
340             EMPTY => unreachable!(),
341             // our task used for select was stolen
342             DATA => Ok(true),
343
344             // If the other end has hung up, then we have complete ownership
345             // of the port. First, check if there was data waiting for us. This
346             // is possible if the other end sent something and then hung up.
347             //
348             // We then need to check to see if there was an upgrade requested,
349             // and if so, the upgraded port needs to have its selection aborted.
350             DISCONNECTED => {
351                 if self.data.is_some() {
352                     Ok(true)
353                 } else {
354                     match mem::replace(&mut self.upgrade, SendUsed) {
355                         GoUp(port) => Err(port),
356                         _ => Ok(true),
357                     }
358                 }
359             }
360
361             // We woke ourselves up from select.
362             ptr => unsafe {
363                 drop(SignalToken::cast_from_uint(ptr));
364                 Ok(false)
365             }
366         }
367     }
368 }
369
370 #[unsafe_destructor]
371 impl<T: Send> Drop for Packet<T> {
372     fn drop(&mut self) {
373         assert_eq!(self.state.load(atomic::SeqCst), DISCONNECTED);
374     }
375 }