1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Selection over an array of receivers
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows natural binding of variables to the
24 //! received values of receivers in a much more natural syntax than usage of the
25 //! `Select` structure directly.
30 //! #![feature(mpsc_select)]
32 //! use std::sync::mpsc::channel;
34 //! let (tx1, rx1) = channel();
35 //! let (tx2, rx2) = channel();
37 //! tx1.send(1).unwrap();
38 //! tx2.send(2).unwrap();
41 //! val = rx1.recv() => {
42 //! assert_eq!(val.unwrap(), 1);
44 //! val = rx2.recv() => {
45 //! assert_eq!(val.unwrap(), 2);
51 #![unstable(feature = "mpsc_select",
52 reason = "This implementation, while likely sufficient, is unsafe and \
53 likely to be error prone. At some point in the future this \
54 module will likely be replaced, and it is currently \
55 unknown how much API breakage that will cause. The ability \
56 to select over a number of channels will remain forever, \
57 but no guarantees beyond this are being made",
63 use core::cell::{Cell, UnsafeCell};
68 use sync::mpsc::{Receiver, RecvError};
69 use sync::mpsc::blocking::{self, SignalToken};
71 /// The "receiver set" of the select interface. This structure is used to manage
72 /// a set of receivers which are being selected over.
///
/// Handles created by `Select::handle` keep a raw pointer to the `inner` state
/// of this structure, and dropping a `Select` asserts that its handle list is
/// empty, so every added handle has to be removed (or dropped) first.
// List state shared with every handle; wrapped in `UnsafeCell` so it can be
// reached through `&self` (see `handle` and `iter`).
74 inner: UnsafeCell<SelectInner>,
// Both ends of the intrusive, doubly linked list of added handles. Handles of
// any `T` are stored type-erased as `Handle<'static, ()>` (see the pointer
// casts in `Handle::add` and `Handle::remove`). `Select::new` initialises both
// pointers to null, which is the empty-list state.
79 head: *mut Handle<'static, ()>,
80 tail: *mut Handle<'static, ()>,
// Negative impl: a `Select` may not be sent to another thread.
// NOTE(review): presumably because the set and its added handles refer to each
// other through raw pointers -- confirm the intended rationale.
83 impl !marker::Send for Select {}
85 /// A handle to a receiver, created by `Select::handle`. Once `add` has been
86 /// called the handle is a member of that `Select` set; it is used to keep the
87 /// receiver in the set as well as interact with the underlying receiver.
88 pub struct Handle<'rx, T:Send+'rx> {
89 /// The ID of this handle, used to compare against the return value of
// Raw pointer to the list state of the `Select` this handle was created from.
92 selector: *mut SelectInner,
// Intrusive list links to the neighbouring handles. Both are null for a
// freshly created handle and are reset to null again by `remove`.
93 next: *mut Handle<'static, ()>,
94 prev: *mut Handle<'static, ()>,
// Trait object used by `Select::wait2` to query readiness and to start/abort
// a selection (presumably the receiver itself viewed as a `Packet` -- confirm).
96 packet: &'rx (dyn Packet+'rx),
98 // Because `add`/`remove` cast this struct to `Handle<'static, ()>`, be sure to
99 // keep this field at the end (nothing previous relies on `T`).
100 rx: &'rx Receiver<T>,
// Iterator state over the handle list: `cur` is the next handle to yield, or
// null once the end has been reached (see `impl Iterator for Packets`).
103 struct Packets { cur: *mut Handle<'static, ()> }
/// Result of `Packet::start_selection`. `Select::wait2` matches on the
/// `Installed` and `Abort` variants.
106 #[derive(PartialEq, Eq)]
107 pub enum StartResult {
/// Returns `true` when the packet is ready to receive; used for the preflight
/// scan in `Select::wait2`.
114 fn can_recv(&self) -> bool;
/// Installs `token` on this packet as part of starting a selection. When
/// `StartResult::Abort` is returned, `wait2` aborts the selections that were
/// already installed on earlier packets.
115 fn start_selection(&self, token: SignalToken) -> StartResult;
/// Aborts a selection on this packet. Per the notes in `wait2`, a `true`
/// return means that the receiver is ready to receive some data.
116 fn abort_selection(&self) -> bool;
120 /// Creates a new selection structure. This set is initially empty.
122 /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
123 /// the `select!` macro.
128 /// #![feature(mpsc_select)]
130 /// use std::sync::mpsc::Select;
132 /// let select = Select::new();
134 pub fn new() -> Select {
// Empty list: no handle has been added yet, so both ends are null.
136 inner: UnsafeCell::new(SelectInner {
137 head: ptr::null_mut(),
138 tail: ptr::null_mut(),
// Ids are handed out sequentially by `handle`, starting from 1.
140 next_id: Cell::new(1),
144 /// Creates a new handle into this receiver set for a new receiver. Note
145 /// that this does *not* add the receiver to the receiver set, for that you
146 /// must call the `add` method on the handle itself.
///
/// Both the set and `rx` are borrowed for `'a`, the lifetime carried by the
/// returned `Handle<'a, T>`.
147 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
// Take the next id and bump the counter for the following handle.
148 let id = self.next_id.get();
149 self.next_id.set(id + 1);
// The handle keeps a raw pointer to the list state and starts out unlinked.
152 selector: self.inner.get(),
153 next: ptr::null_mut(),
154 prev: ptr::null_mut(),
161 /// Waits for an event on this receiver set. The returned value is *not* an
162 /// index, but rather an id. This id can be queried against any active
163 /// `Handle` structures (each one has an `id` method). The handle with
164 /// the matching `id` will have some sort of event available on it. The
165 /// event could either be that data is available or the corresponding
166 /// channel has been closed.
///
/// Note that `Select::handle` alone does not register a receiver with the
/// set; `Handle::add` has to be called on the handle first.
167 pub fn wait(&self) -> usize {
171 /// Helper method for skipping the preflight checks during testing
///
/// When `do_preflight_checks` is `true`, the handles are first scanned for
/// one that can already receive, and the id of the first such handle is
/// returned right away. The return value is a handle id, not an index.
172 fn wait2(&self, do_preflight_checks: bool) -> usize {
173 // Note that this is currently an inefficient implementation. We in
174 // theory have knowledge about all receivers in the set ahead of time,
175 // so this method shouldn't really have to iterate over all of them yet
176 // again. The idea with this "receiver set" interface is to get the
177 // interface right this time around, and later this implementation can
180 // This implementation can be summarized by:
182 // fn select(receivers) {
183 // if any receiver ready { return ready index }
185 // block on all receivers
187 // unblock on all receivers
188 // return ready index
191 // Most notably, the iterations over all of the receivers shouldn't be
194 // Stage 1: preflight checks. Look for any packets ready to receive
195 if do_preflight_checks {
196 for handle in self.iter() {
197 if (*handle).packet.can_recv() {
198 return (*handle).id();
203 // Stage 2: begin the blocking process
205 // Create one wait/signal token pair and install a clone of the signal
206 // token on each receiver sequentially until one fails. If one fails,
207 // then abort the selection on the already-installed tokens.
208 let (wait_token, signal_token) = blocking::tokens();
// `i` counts the handles visited before this one, so `take(i)` below covers
// exactly the handles whose selection was already started.
209 for (i, handle) in self.iter().enumerate() {
210 match (*handle).packet.start_selection(signal_token.clone()) {
211 StartResult::Installed => {}
212 StartResult::Abort => {
213 // Go back and abort the already-begun selections
214 for handle in self.iter().take(i) {
215 (*handle).packet.abort_selection();
222 // Stage 3: no messages available, actually block
225 // Stage 4: there *must* be a message available; find it.
227 // Abort the selection process on each receiver. If the abort
228 // process returns `true`, then that means that the receiver is
229 // ready to receive some data. Note that this also means that the
230 // receiver may have yet to have fully read the `to_wake` field and
231 // woken us up (although the wakeup is guaranteed to fail).
233 // This situation happens in the window of where a sender invokes
234 // increment(), sees -1, and then decides to wake up the thread. After
235 // all this is done, the sending thread will set `selecting` to
236 // `false`. Until this is done, we cannot return. If we were to
237 // return, then a sender could wake up a receiver which has gone
238 // back to sleep after this call to `select`.
240 // Note that it is a "fairly small window" in which an increment()
241 // views that it should wake a thread up until the `selecting` bit
242 // is set to false. For now, the implementation currently just spins
243 // in a yield loop. This is very distasteful, but this
244 // implementation is already nowhere near what it should ideally be.
245 // A rewrite should focus on avoiding a yield loop, and for now this
246 // implementation is tiding us over to a more efficient "don't
247 // iterate over everything every time" implementation.
// `usize::MAX` serves as the sentinel for: no ready handle found yet.
248 let mut ready_id = usize::MAX;
249 for handle in self.iter() {
250 if (*handle).packet.abort_selection() {
251 ready_id = (*handle).id;
255 // We must have found a ready receiver
256 assert!(ready_id != usize::MAX);
// Starts a traversal of the handle list at its current head.
// NOTE(review): this dereferences the `UnsafeCell` pointer, so it relies on no
// `&mut SelectInner` (as created in `Handle::add` / `Handle::remove`) being
// live at the same time -- confirm.
261 fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
264 impl<'rx, T: Send> Handle<'rx, T> {
265 /// Retrieves the id of this handle, the value that `Select::wait` returns
/// when this handle has an event available.
267 pub fn id(&self) -> usize { self.id }
269 /// Blocks to receive a value on the underlying receiver, returning `Ok` on
270 /// success or `Err(RecvError)` if the channel disconnects. This function has
271 /// the same semantics as `Receiver::recv`, to which it delegates.
272 pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
274 /// Adds this handle to the receiver set that the handle was created from. This
275 /// method can be called multiple times, but it has no effect if `add` was
276 /// called previously.
///
/// # Safety
///
/// The set links to this handle by address, so:
278 /// This method is unsafe because it requires that the `Handle` is not moved
279 /// while it is added to the `Select` set.
280 pub unsafe fn add(&mut self) {
// Already linked: adding twice is a no-op.
281 if self.added { return }
282 let selector = &mut *self.selector;
// Erase `'rx` and `T` so that handles of different types can share one list.
283 let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
// An empty list has a null head.
285 if selector.head.is_null() {
// Non-empty list: append this handle after the current tail.
289 (*me).prev = selector.tail;
290 assert!((*me).next.is_null());
291 (*selector.tail).next = me;
297 /// Removes this handle from the `Select` set. This method is unsafe because
298 /// it has no guarantee that the `Handle` was not moved since `add` was
300 pub unsafe fn remove(&mut self) {
// Not linked: nothing to unlink.
301 if !self.added { return }
303 let selector = &mut *self.selector;
// Same type-erased pointer form that `add` stored in the list.
304 let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
// No predecessor means this handle must be the list head.
306 if self.prev.is_null() {
307 assert_eq!(selector.head, me);
308 selector.head = self.next;
// With a predecessor: make it skip over this handle.
310 (*self.prev).next = self.next;
// No successor means this handle must be the list tail.
312 if self.next.is_null() {
313 assert_eq!(selector.tail, me);
314 selector.tail = self.prev;
// With a successor: make it point back past this handle.
316 (*self.next).prev = self.prev;
// Reset the links to the same null state a fresh handle has.
319 self.next = ptr::null_mut();
320 self.prev = ptr::null_mut();
// Dropping a `Select` panics (via these assertions) if any handle is still
// linked into its list.
326 impl Drop for Select {
329 assert!((&*self.inner.get()).head.is_null());
330 assert!((&*self.inner.get()).tail.is_null());
// Unlinks the handle from its set on drop; `remove` returns early when the
// handle was never added.
335 impl<'rx, T: Send> Drop for Handle<'rx, T> {
337 unsafe { self.remove() }
// Walks the handle list by following `next` pointers, yielding raw pointers
// to the type-erased handles.
341 impl Iterator for Packets {
342 type Item = *mut Handle<'static, ()>;
344 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
// A null cursor marks the end of the list.
345 if self.cur.is_null() {
// Yield the current handle and advance the cursor to its successor.
348 let ret = Some(self.cur);
349 unsafe { self.cur = (*self.cur).next; }
// Opaque `Debug` output: only the type name is printed, no fields.
355 impl fmt::Debug for Select {
356 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
357 f.debug_struct("Select").finish()
// Opaque `Debug` output: only the type name is printed, no fields.
361 impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
362 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
363 f.debug_struct("Handle").finish()
367 #[allow(unused_imports)]
368 #[cfg(all(test, not(target_os = "emscripten")))]
373 // Don't use the libstd version so we can pull in the right Select structure
374 // (std::comm points at the wrong one)
//
// Expansion, as far as visible here: build a `Select`, shadow every receiver
// `$rx` with a handle onto it, wait, then run the arm whose handle id matches.
375 macro_rules! select {
377 $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
379 let sel = Select::new();
// Shadowing keeps the arm syntax `rx.meth()` while calling it on the handle.
380 $( let mut $rx = sel.handle(&$rx); )+
384 let ret = sel.wait();
// Chain of `if ... else` over the arms, keyed on the id returned by `wait`.
385 $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
392 let (tx1, rx1) = channel::<i32>();
393 let (tx2, rx2) = channel::<i32>();
394 tx1.send(1).unwrap();
396 foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
397 _bar = rx2.recv() => { panic!() }
399 tx2.send(2).unwrap();
401 _foo = rx1.recv() => { panic!() },
402 bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
406 foo = rx1.recv() => { assert!(foo.is_err()); },
407 _bar = rx2.recv() => { panic!() }
411 bar = rx2.recv() => { assert!(bar.is_err()); }
417 let (_tx1, rx1) = channel::<i32>();
418 let (_tx2, rx2) = channel::<i32>();
419 let (_tx3, rx3) = channel::<i32>();
420 let (_tx4, rx4) = channel::<i32>();
421 let (tx5, rx5) = channel::<i32>();
422 tx5.send(4).unwrap();
424 _foo = rx1.recv() => { panic!("1") },
425 _foo = rx2.recv() => { panic!("2") },
426 _foo = rx3.recv() => { panic!("3") },
427 _foo = rx4.recv() => { panic!("4") },
428 foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
434 let (_tx1, rx1) = channel::<i32>();
435 let (tx2, rx2) = channel::<i32>();
439 _a1 = rx1.recv() => { panic!() },
440 a2 = rx2.recv() => { assert!(a2.is_err()); }
446 let (tx1, rx1) = channel::<i32>();
447 let (_tx2, rx2) = channel::<i32>();
448 let (tx3, rx3) = channel::<i32>();
450 let _t = thread::spawn(move|| {
451 for _ in 0..20 { thread::yield_now(); }
452 tx1.send(1).unwrap();
454 for _ in 0..20 { thread::yield_now(); }
458 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
459 _b = rx2.recv() => { panic!() }
461 tx3.send(1).unwrap();
463 a = rx1.recv() => { assert!(a.is_err()) },
464 _b = rx2.recv() => { panic!() }
470 let (tx1, rx1) = channel::<i32>();
471 let (tx2, rx2) = channel::<i32>();
472 let (tx3, rx3) = channel::<()>();
474 let _t = thread::spawn(move|| {
475 for _ in 0..20 { thread::yield_now(); }
476 tx1.send(1).unwrap();
477 tx2.send(2).unwrap();
482 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
483 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
486 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
487 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
489 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
490 assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
491 tx3.send(()).unwrap();
496 const AMT: i32 = 10000;
497 let (tx1, rx1) = channel::<i32>();
498 let (tx2, rx2) = channel::<i32>();
499 let (tx3, rx3) = channel::<()>();
501 let _t = thread::spawn(move|| {
504 tx1.send(i).unwrap();
506 tx2.send(i).unwrap();
514 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
515 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
517 tx3.send(()).unwrap();
521 #[allow(unused_must_use)]
524 let (tx1, rx1) = channel::<i32>();
525 let (_tx2, rx2) = channel::<i32>();
526 let (tx3, rx3) = channel::<()>();
528 let _t = thread::spawn(move|| {
531 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
532 tx1.send(2).unwrap();
536 tx3.send(()).unwrap();
538 _i1 = rx1.recv() => {},
539 _i2 = rx2.recv() => panic!()
541 tx3.send(()).unwrap();
544 #[allow(unused_must_use)]
547 let (tx1, rx1) = channel::<i32>();
548 let (_tx2, rx2) = channel::<i32>();
549 let (tx3, rx3) = channel::<()>();
551 let _t = thread::spawn(move|| {
554 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
555 tx1.send(2).unwrap();
559 tx3.send(()).unwrap();
561 _i1 = rx1.recv() => {},
562 _i2 = rx2.recv() => panic!()
564 tx3.send(()).unwrap();
569 let (tx1, rx1) = channel::<()>();
570 let (tx2, rx2) = channel::<()>();
571 let (tx3, rx3) = channel::<()>();
572 let _t = thread::spawn(move|| {
573 let s = Select::new();
574 let mut h1 = s.handle(&rx1);
575 let mut h2 = s.handle(&rx2);
578 assert_eq!(s.wait(), h2.id);
579 tx3.send(()).unwrap();
582 for _ in 0..1000 { thread::yield_now(); }
584 tx2.send(()).unwrap();
590 let (tx, rx) = channel();
591 tx.send(()).unwrap();
599 let (tx, rx) = channel();
600 tx.send(()).unwrap();
601 tx.send(()).unwrap();
609 let (tx, rx) = channel();
611 tx.send(()).unwrap();
619 let (tx, rx) = channel();
620 tx.send(()).unwrap();
621 let s = Select::new();
622 let mut h = s.handle(&rx);
624 assert_eq!(s.wait2(false), h.id);
629 let (tx, rx) = channel();
630 tx.send(()).unwrap();
631 tx.send(()).unwrap();
632 let s = Select::new();
633 let mut h = s.handle(&rx);
635 assert_eq!(s.wait2(false), h.id);
640 let (tx, rx) = channel();
642 tx.send(()).unwrap();
643 let s = Select::new();
644 let mut h = s.handle(&rx);
646 assert_eq!(s.wait2(false), h.id);
651 let (tx, rx) = channel::<()>();
653 let s = Select::new();
654 let mut h = s.handle(&rx);
656 assert_eq!(s.wait2(false), h.id);
661 let (tx, rx) = channel();
662 tx.send(()).unwrap();
665 let s = Select::new();
666 let mut h = s.handle(&rx);
668 assert_eq!(s.wait2(false), h.id);
673 let (tx, rx) = channel();
675 tx.send(()).unwrap();
678 let s = Select::new();
679 let mut h = s.handle(&rx);
681 assert_eq!(s.wait2(false), h.id);
// A value is sent on `tx1` while another thread selects on `rx1`.
// NOTE(review): the name suggests this targets the oneshot channel flavour --
// confirm against the channel implementation.
685 fn oneshot_data_waiting() {
686 let (tx1, rx1) = channel();
687 let (tx2, rx2) = channel();
// Spawned thread: select on `rx1` alone, then report completion on `tx2`.
688 let _t = thread::spawn(move|| {
690 _n = rx1.recv() => {}
692 tx2.send(()).unwrap();
// Yield for a while so the spawned thread has a chance to start selecting
// before the value is sent (best effort only; nothing enforces the ordering).
695 for _ in 0..100 { thread::yield_now() }
696 tx1.send(()).unwrap();
// Like `oneshot_data_waiting`, but two values are sent on `tx1` up front.
// NOTE(review): the name suggests this targets the stream channel flavour --
// confirm against the channel implementation.
701 fn stream_data_waiting() {
702 let (tx1, rx1) = channel();
703 let (tx2, rx2) = channel();
// Two sends on the same sender before any selecting starts.
704 tx1.send(()).unwrap();
705 tx1.send(()).unwrap();
// Spawned thread: select on `rx1` alone, then report completion on `tx2`.
708 let _t = thread::spawn(move|| {
710 _n = rx1.recv() => {}
712 tx2.send(()).unwrap();
// Yield for a while so the spawned thread has a chance to start selecting
// before the value is sent (best effort only; nothing enforces the ordering).
715 for _ in 0..100 { thread::yield_now() }
716 tx1.send(()).unwrap();
// Like `oneshot_data_waiting`, but one value is sent on `tx1` up front.
// NOTE(review): the name suggests this targets the shared channel flavour --
// confirm against the channel implementation.
721 fn shared_data_waiting() {
722 let (tx1, rx1) = channel();
723 let (tx2, rx2) = channel();
// One send before any selecting starts.
725 tx1.send(()).unwrap();
// Spawned thread: select on `rx1` alone, then report completion on `tx2`.
727 let _t = thread::spawn(move|| {
729 _n = rx1.recv() => {}
731 tx2.send(()).unwrap();
// Yield for a while so the spawned thread has a chance to start selecting
// before the value is sent (best effort only; nothing enforces the ordering).
734 for _ in 0..100 { thread::yield_now() }
735 tx1.send(()).unwrap();
741 let (tx, rx) = sync_channel::<i32>(1);
744 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
750 let (tx, rx) = sync_channel::<i32>(0);
751 let _t = thread::spawn(move|| {
752 for _ in 0..100 { thread::yield_now() }
756 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
762 let (tx1, rx1) = sync_channel::<i32>(0);
763 let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
764 let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
765 let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
770 assert_eq!(rx2.recv().unwrap(), 2);
775 assert_eq!(rx1.recv().unwrap(), 1);