]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/select.rs
Improve type size assertions
[rust.git] / src / libstd / sync / mpsc / select.rs
1 //! Selection over an array of receivers
2 //!
3 //! This module contains the implementation machinery necessary for selecting
4 //! over a number of receivers. One large goal of this module is to provide an
5 //! efficient interface to selecting over any receiver of any type.
6 //!
7 //! This is achieved through an architecture of a "receiver set" in which
8 //! receivers are added to a set and then the entire set is waited on at once.
9 //! The set can be waited on multiple times to prevent re-adding each receiver
10 //! to the set.
11 //!
12 //! Usage of this module is currently encouraged to go through the use of the
13 //! `select!` macro. This macro allows naturally binding of variables to the
14 //! received values of receivers in a much more natural syntax then usage of the
15 //! `Select` structure directly.
16 //!
17 //! # Examples
18 //!
19 //! ```rust
20 //! #![feature(mpsc_select)]
21 //!
22 //! use std::sync::mpsc::channel;
23 //!
24 //! let (tx1, rx1) = channel();
25 //! let (tx2, rx2) = channel();
26 //!
27 //! tx1.send(1).unwrap();
28 //! tx2.send(2).unwrap();
29 //!
30 //! select! {
31 //!     val = rx1.recv() => {
32 //!         assert_eq!(val.unwrap(), 1);
33 //!     },
34 //!     val = rx2.recv() => {
35 //!         assert_eq!(val.unwrap(), 2);
36 //!     }
37 //! }
38 //! ```
39
40 #![allow(dead_code)]
41 #![unstable(feature = "mpsc_select",
42             reason = "This implementation, while likely sufficient, is unsafe and \
43                       likely to be error prone. At some point in the future this \
44                       module will be removed.",
45             issue = "27800")]
46 #![rustc_deprecated(since = "1.32.0",
47                     reason = "channel selection will be removed in a future release")]
48
49 use core::cell::{Cell, UnsafeCell};
50 use core::marker;
51 use core::ptr;
52 use core::usize;
53
54 use crate::fmt;
55 use crate::sync::mpsc::{Receiver, RecvError};
56 use crate::sync::mpsc::blocking::{self, SignalToken};
57
58 /// The "receiver set" of the select interface. This structure is used to manage
59 /// a set of receivers which are being selected over.
60 pub struct Select {
61     inner: UnsafeCell<SelectInner>,
62     next_id: Cell<usize>,
63 }
64
65 struct SelectInner {
66     head: *mut Handle<'static, ()>,
67     tail: *mut Handle<'static, ()>,
68 }
69
70 impl !marker::Send for Select {}
71
72 /// A handle to a receiver which is currently a member of a `Select` set of
73 /// receivers. This handle is used to keep the receiver in the set as well as
74 /// interact with the underlying receiver.
75 pub struct Handle<'rx, T:Send+'rx> {
76     /// The ID of this handle, used to compare against the return value of
77     /// `Select::wait()`.
78     id: usize,
79     selector: *mut SelectInner,
80     next: *mut Handle<'static, ()>,
81     prev: *mut Handle<'static, ()>,
82     added: bool,
83     packet: &'rx (dyn Packet+'rx),
84
85     // due to our fun transmutes, we be sure to place this at the end. (nothing
86     // previous relies on T)
87     rx: &'rx Receiver<T>,
88 }
89
90 struct Packets { cur: *mut Handle<'static, ()> }
91
92 #[doc(hidden)]
93 #[derive(PartialEq, Eq)]
94 pub enum StartResult {
95     Installed,
96     Abort,
97 }
98
99 #[doc(hidden)]
100 pub trait Packet {
101     fn can_recv(&self) -> bool;
102     fn start_selection(&self, token: SignalToken) -> StartResult;
103     fn abort_selection(&self) -> bool;
104 }
105
106 impl Select {
107     /// Creates a new selection structure. This set is initially empty.
108     ///
109     /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
110     /// the `select!` macro.
111     ///
112     /// # Examples
113     ///
114     /// ```
115     /// #![feature(mpsc_select)]
116     ///
117     /// use std::sync::mpsc::Select;
118     ///
119     /// let select = Select::new();
120     /// ```
121     pub fn new() -> Select {
122         Select {
123             inner: UnsafeCell::new(SelectInner {
124                 head: ptr::null_mut(),
125                 tail: ptr::null_mut(),
126             }),
127             next_id: Cell::new(1),
128         }
129     }
130
131     /// Creates a new handle into this receiver set for a new receiver. Note
132     /// that this does *not* add the receiver to the receiver set, for that you
133     /// must call the `add` method on the handle itself.
134     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
135         let id = self.next_id.get();
136         self.next_id.set(id + 1);
137         Handle {
138             id,
139             selector: self.inner.get(),
140             next: ptr::null_mut(),
141             prev: ptr::null_mut(),
142             added: false,
143             rx,
144             packet: rx,
145         }
146     }
147
148     /// Waits for an event on this receiver set. The returned value is *not* an
149     /// index, but rather an ID. This ID can be queried against any active
150     /// `Handle` structures (each one has an `id` method). The handle with
151     /// the matching `id` will have some sort of event available on it. The
152     /// event could either be that data is available or the corresponding
153     /// channel has been closed.
154     pub fn wait(&self) -> usize {
155         self.wait2(true)
156     }
157
158     /// Helper method for skipping the preflight checks during testing
159     pub(super) fn wait2(&self, do_preflight_checks: bool) -> usize {
160         // Note that this is currently an inefficient implementation. We in
161         // theory have knowledge about all receivers in the set ahead of time,
162         // so this method shouldn't really have to iterate over all of them yet
163         // again. The idea with this "receiver set" interface is to get the
164         // interface right this time around, and later this implementation can
165         // be optimized.
166         //
167         // This implementation can be summarized by:
168         //
169         //      fn select(receivers) {
170         //          if any receiver ready { return ready index }
171         //          deschedule {
172         //              block on all receivers
173         //          }
174         //          unblock on all receivers
175         //          return ready index
176         //      }
177         //
178         // Most notably, the iterations over all of the receivers shouldn't be
179         // necessary.
180         unsafe {
181             // Stage 1: preflight checks. Look for any packets ready to receive
182             if do_preflight_checks {
183                 for handle in self.iter() {
184                     if (*handle).packet.can_recv() {
185                         return (*handle).id();
186                     }
187                 }
188             }
189
190             // Stage 2: begin the blocking process
191             //
192             // Create a number of signal tokens, and install each one
193             // sequentially until one fails. If one fails, then abort the
194             // selection on the already-installed tokens.
195             let (wait_token, signal_token) = blocking::tokens();
196             for (i, handle) in self.iter().enumerate() {
197                 match (*handle).packet.start_selection(signal_token.clone()) {
198                     StartResult::Installed => {}
199                     StartResult::Abort => {
200                         // Go back and abort the already-begun selections
201                         for handle in self.iter().take(i) {
202                             (*handle).packet.abort_selection();
203                         }
204                         return (*handle).id;
205                     }
206                 }
207             }
208
209             // Stage 3: no messages available, actually block
210             wait_token.wait();
211
212             // Stage 4: there *must* be message available; find it.
213             //
214             // Abort the selection process on each receiver. If the abort
215             // process returns `true`, then that means that the receiver is
216             // ready to receive some data. Note that this also means that the
217             // receiver may have yet to have fully read the `to_wake` field and
218             // woken us up (although the wakeup is guaranteed to fail).
219             //
220             // This situation happens in the window of where a sender invokes
221             // increment(), sees -1, and then decides to wake up the thread. After
222             // all this is done, the sending thread will set `selecting` to
223             // `false`. Until this is done, we cannot return. If we were to
224             // return, then a sender could wake up a receiver which has gone
225             // back to sleep after this call to `select`.
226             //
227             // Note that it is a "fairly small window" in which an increment()
228             // views that it should wake a thread up until the `selecting` bit
229             // is set to false. For now, the implementation currently just spins
230             // in a yield loop. This is very distasteful, but this
231             // implementation is already nowhere near what it should ideally be.
232             // A rewrite should focus on avoiding a yield loop, and for now this
233             // implementation is tying us over to a more efficient "don't
234             // iterate over everything every time" implementation.
235             let mut ready_id = usize::MAX;
236             for handle in self.iter() {
237                 if (*handle).packet.abort_selection() {
238                     ready_id = (*handle).id;
239                 }
240             }
241
242             // We must have found a ready receiver
243             assert!(ready_id != usize::MAX);
244             return ready_id;
245         }
246     }
247
248     fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
249 }
250
251 impl<'rx, T: Send> Handle<'rx, T> {
252     /// Retrieves the ID of this handle.
253     #[inline]
254     pub fn id(&self) -> usize { self.id }
255
256     /// Blocks to receive a value on the underlying receiver, returning `Some` on
257     /// success or `None` if the channel disconnects. This function has the same
258     /// semantics as `Receiver.recv`
259     pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
260
261     /// Adds this handle to the receiver set that the handle was created from. This
262     /// method can be called multiple times, but it has no effect if `add` was
263     /// called previously.
264     ///
265     /// This method is unsafe because it requires that the `Handle` is not moved
266     /// while it is added to the `Select` set.
267     pub unsafe fn add(&mut self) {
268         if self.added { return }
269         let selector = &mut *self.selector;
270         let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
271
272         if selector.head.is_null() {
273             selector.head = me;
274             selector.tail = me;
275         } else {
276             (*me).prev = selector.tail;
277             assert!((*me).next.is_null());
278             (*selector.tail).next = me;
279             selector.tail = me;
280         }
281         self.added = true;
282     }
283
284     /// Removes this handle from the `Select` set. This method is unsafe because
285     /// it has no guarantee that the `Handle` was not moved since `add` was
286     /// called.
287     pub unsafe fn remove(&mut self) {
288         if !self.added { return }
289
290         let selector = &mut *self.selector;
291         let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
292
293         if self.prev.is_null() {
294             assert_eq!(selector.head, me);
295             selector.head = self.next;
296         } else {
297             (*self.prev).next = self.next;
298         }
299         if self.next.is_null() {
300             assert_eq!(selector.tail, me);
301             selector.tail = self.prev;
302         } else {
303             (*self.next).prev = self.prev;
304         }
305
306         self.next = ptr::null_mut();
307         self.prev = ptr::null_mut();
308
309         self.added = false;
310     }
311 }
312
313 impl Drop for Select {
314     fn drop(&mut self) {
315         unsafe {
316             assert!((&*self.inner.get()).head.is_null());
317             assert!((&*self.inner.get()).tail.is_null());
318         }
319     }
320 }
321
322 impl<T: Send> Drop for Handle<'_, T> {
323     fn drop(&mut self) {
324         unsafe { self.remove() }
325     }
326 }
327
328 impl Iterator for Packets {
329     type Item = *mut Handle<'static, ()>;
330
331     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
332         if self.cur.is_null() {
333             None
334         } else {
335             let ret = Some(self.cur);
336             unsafe { self.cur = (*self.cur).next; }
337             ret
338         }
339     }
340 }
341
342 impl fmt::Debug for Select {
343     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344         f.debug_struct("Select").finish()
345     }
346 }
347
348 impl<T: Send> fmt::Debug for Handle<'_, T> {
349     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350         f.debug_struct("Handle").finish()
351     }
352 }