1 //! Selection over an array of receivers
3 //! This module contains the implementation machinery necessary for selecting
4 //! over a number of receivers. One large goal of this module is to provide an
5 //! efficient interface to selecting over any receiver of any type.
7 //! This is achieved through an architecture of a "receiver set" in which
8 //! receivers are added to a set and then the entire set is waited on at once.
9 //! The set can be waited on multiple times to prevent re-adding each receiver
12 //! Usage of this module is currently encouraged to go through the use of the
13 //! `select!` macro. This macro allows naturally binding of variables to the
14 //! received values of receivers in a much more natural syntax then usage of the
15 //! `Select` structure directly.
20 //! #![feature(mpsc_select)]
22 //! use std::sync::mpsc::channel;
24 //! let (tx1, rx1) = channel();
25 //! let (tx2, rx2) = channel();
27 //! tx1.send(1).unwrap();
28 //! tx2.send(2).unwrap();
31 //! val = rx1.recv() => {
32 //! assert_eq!(val.unwrap(), 1);
34 //! val = rx2.recv() => {
35 //! assert_eq!(val.unwrap(), 2);
41 #![unstable(feature = "mpsc_select",
42 reason = "This implementation, while likely sufficient, is unsafe and \
43 likely to be error prone. At some point in the future this \
44 module will be removed.",
46 #![rustc_deprecated(since = "1.32.0",
47 reason = "channel selection will be removed in a future release")]
52 use core::cell::{Cell, UnsafeCell};
57 use sync::mpsc::{Receiver, RecvError};
58 use sync::mpsc::blocking::{self, SignalToken};
60 /// The "receiver set" of the select interface. This structure is used to manage
61 /// a set of receivers which are being selected over.
63 inner: UnsafeCell<SelectInner>,
68 head: *mut Handle<'static, ()>,
69 tail: *mut Handle<'static, ()>,
72 impl !marker::Send for Select {}
74 /// A handle to a receiver which is currently a member of a `Select` set of
75 /// receivers. This handle is used to keep the receiver in the set as well as
76 /// interact with the underlying receiver.
77 pub struct Handle<'rx, T:Send+'rx> {
78 /// The ID of this handle, used to compare against the return value of
81 selector: *mut SelectInner,
82 next: *mut Handle<'static, ()>,
83 prev: *mut Handle<'static, ()>,
85 packet: &'rx (dyn Packet+'rx),
87 // due to our fun transmutes, we be sure to place this at the end. (nothing
88 // previous relies on T)
92 struct Packets { cur: *mut Handle<'static, ()> }
95 #[derive(PartialEq, Eq)]
96 pub enum StartResult {
103 fn can_recv(&self) -> bool;
104 fn start_selection(&self, token: SignalToken) -> StartResult;
105 fn abort_selection(&self) -> bool;
109 /// Creates a new selection structure. This set is initially empty.
111 /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
112 /// the `select!` macro.
117 /// #![feature(mpsc_select)]
119 /// use std::sync::mpsc::Select;
121 /// let select = Select::new();
123 pub fn new() -> Select {
125 inner: UnsafeCell::new(SelectInner {
126 head: ptr::null_mut(),
127 tail: ptr::null_mut(),
129 next_id: Cell::new(1),
133 /// Creates a new handle into this receiver set for a new receiver. Note
134 /// that this does *not* add the receiver to the receiver set, for that you
135 /// must call the `add` method on the handle itself.
136 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
137 let id = self.next_id.get();
138 self.next_id.set(id + 1);
141 selector: self.inner.get(),
142 next: ptr::null_mut(),
143 prev: ptr::null_mut(),
150 /// Waits for an event on this receiver set. The returned value is *not* an
151 /// index, but rather an id. This id can be queried against any active
152 /// `Handle` structures (each one has an `id` method). The handle with
153 /// the matching `id` will have some sort of event available on it. The
154 /// event could either be that data is available or the corresponding
155 /// channel has been closed.
156 pub fn wait(&self) -> usize {
160 /// Helper method for skipping the preflight checks during testing
161 pub(super) fn wait2(&self, do_preflight_checks: bool) -> usize {
162 // Note that this is currently an inefficient implementation. We in
163 // theory have knowledge about all receivers in the set ahead of time,
164 // so this method shouldn't really have to iterate over all of them yet
165 // again. The idea with this "receiver set" interface is to get the
166 // interface right this time around, and later this implementation can
169 // This implementation can be summarized by:
171 // fn select(receivers) {
172 // if any receiver ready { return ready index }
174 // block on all receivers
176 // unblock on all receivers
177 // return ready index
180 // Most notably, the iterations over all of the receivers shouldn't be
183 // Stage 1: preflight checks. Look for any packets ready to receive
184 if do_preflight_checks {
185 for handle in self.iter() {
186 if (*handle).packet.can_recv() {
187 return (*handle).id();
192 // Stage 2: begin the blocking process
194 // Create a number of signal tokens, and install each one
195 // sequentially until one fails. If one fails, then abort the
196 // selection on the already-installed tokens.
197 let (wait_token, signal_token) = blocking::tokens();
198 for (i, handle) in self.iter().enumerate() {
199 match (*handle).packet.start_selection(signal_token.clone()) {
200 StartResult::Installed => {}
201 StartResult::Abort => {
202 // Go back and abort the already-begun selections
203 for handle in self.iter().take(i) {
204 (*handle).packet.abort_selection();
211 // Stage 3: no messages available, actually block
214 // Stage 4: there *must* be message available; find it.
216 // Abort the selection process on each receiver. If the abort
217 // process returns `true`, then that means that the receiver is
218 // ready to receive some data. Note that this also means that the
219 // receiver may have yet to have fully read the `to_wake` field and
220 // woken us up (although the wakeup is guaranteed to fail).
222 // This situation happens in the window of where a sender invokes
223 // increment(), sees -1, and then decides to wake up the thread. After
224 // all this is done, the sending thread will set `selecting` to
225 // `false`. Until this is done, we cannot return. If we were to
226 // return, then a sender could wake up a receiver which has gone
227 // back to sleep after this call to `select`.
229 // Note that it is a "fairly small window" in which an increment()
230 // views that it should wake a thread up until the `selecting` bit
231 // is set to false. For now, the implementation currently just spins
232 // in a yield loop. This is very distasteful, but this
233 // implementation is already nowhere near what it should ideally be.
234 // A rewrite should focus on avoiding a yield loop, and for now this
235 // implementation is tying us over to a more efficient "don't
236 // iterate over everything every time" implementation.
237 let mut ready_id = usize::MAX;
238 for handle in self.iter() {
239 if (*handle).packet.abort_selection() {
240 ready_id = (*handle).id;
244 // We must have found a ready receiver
245 assert!(ready_id != usize::MAX);
250 fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
253 impl<'rx, T: Send> Handle<'rx, T> {
254 /// Retrieves the id of this handle.
256 pub fn id(&self) -> usize { self.id }
258 /// Blocks to receive a value on the underlying receiver, returning `Some` on
259 /// success or `None` if the channel disconnects. This function has the same
260 /// semantics as `Receiver.recv`
261 pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
263 /// Adds this handle to the receiver set that the handle was created from. This
264 /// method can be called multiple times, but it has no effect if `add` was
265 /// called previously.
267 /// This method is unsafe because it requires that the `Handle` is not moved
268 /// while it is added to the `Select` set.
269 pub unsafe fn add(&mut self) {
270 if self.added { return }
271 let selector = &mut *self.selector;
272 let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
274 if selector.head.is_null() {
278 (*me).prev = selector.tail;
279 assert!((*me).next.is_null());
280 (*selector.tail).next = me;
286 /// Removes this handle from the `Select` set. This method is unsafe because
287 /// it has no guarantee that the `Handle` was not moved since `add` was
289 pub unsafe fn remove(&mut self) {
290 if !self.added { return }
292 let selector = &mut *self.selector;
293 let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
295 if self.prev.is_null() {
296 assert_eq!(selector.head, me);
297 selector.head = self.next;
299 (*self.prev).next = self.next;
301 if self.next.is_null() {
302 assert_eq!(selector.tail, me);
303 selector.tail = self.prev;
305 (*self.next).prev = self.prev;
308 self.next = ptr::null_mut();
309 self.prev = ptr::null_mut();
315 impl Drop for Select {
318 assert!((&*self.inner.get()).head.is_null());
319 assert!((&*self.inner.get()).tail.is_null());
324 impl<'rx, T: Send> Drop for Handle<'rx, T> {
326 unsafe { self.remove() }
330 impl Iterator for Packets {
331 type Item = *mut Handle<'static, ()>;
333 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
334 if self.cur.is_null() {
337 let ret = Some(self.cur);
338 unsafe { self.cur = (*self.cur).next; }
344 impl fmt::Debug for Select {
345 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
346 f.debug_struct("Select").finish()
350 impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
351 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
352 f.debug_struct("Handle").finish()