1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Selection over an array of receivers
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
30 //! # #![feature(std_misc)]
31 //! use std::sync::mpsc::channel;
33 //! let (tx1, rx1) = channel();
34 //! let (tx2, rx2) = channel();
36 //! tx1.send(1).unwrap();
37 //! tx2.send(2).unwrap();
40 //! val = rx1.recv() => {
41 //! assert_eq!(val.unwrap(), 1);
43 //! val = rx2.recv() => {
44 //! assert_eq!(val.unwrap(), 2);
50 #![unstable(feature = "std_misc",
51 reason = "This implementation, while likely sufficient, is unsafe and \
52 likely to be error prone. At some point in the future this \
53 module will likely be replaced, and it is currently \
54 unknown how much API breakage that will cause. The ability \
55 to select over a number of channels will remain forever, \
56 but no guarantees beyond this are being made")]
67 use sync::mpsc::{Receiver, RecvError};
68 use sync::mpsc::blocking::{self, SignalToken};
70 /// The "receiver set" of the select interface. This structure is used to manage
71 /// a set of receivers which are being selected over.
73 head: *mut Handle<'static, ()>,
74 tail: *mut Handle<'static, ()>,
78 impl !marker::Send for Select {}
80 /// A handle to a receiver which is currently a member of a `Select` set of
81 /// receivers. This handle is used to keep the receiver in the set as well as
82 /// interact with the underlying receiver.
83 pub struct Handle<'rx, T:Send+'rx> {
84 /// The ID of this handle, used to compare against the return value of
87 selector: &'rx Select,
88 next: *mut Handle<'static, ()>,
89 prev: *mut Handle<'static, ()>,
91 packet: &'rx (Packet+'rx),
93 // due to our fun transmutes, we be sure to place this at the end. (nothing
94 // previous relies on T)
98 struct Packets { cur: *mut Handle<'static, ()> }
102 pub enum StartResult {
109 fn can_recv(&self) -> bool;
110 fn start_selection(&self, token: SignalToken) -> StartResult;
111 fn abort_selection(&self) -> bool;
115 /// Creates a new selection structure. This set is initially empty.
117 /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
118 /// the `select!` macro.
123 /// # #![feature(std_misc)]
124 /// use std::sync::mpsc::Select;
126 /// let select = Select::new();
128 pub fn new() -> Select {
130 head: ptr::null_mut(),
131 tail: ptr::null_mut(),
132 next_id: Cell::new(1),
136 /// Creates a new handle into this receiver set for a new receiver. Note
137 /// that this does *not* add the receiver to the receiver set, for that you
138 /// must call the `add` method on the handle itself.
139 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
140 let id = self.next_id.get();
141 self.next_id.set(id + 1);
145 next: ptr::null_mut(),
146 prev: ptr::null_mut(),
153 /// Waits for an event on this receiver set. The returned value is *not* an
154 /// index, but rather an id. This id can be queried against any active
155 /// `Handle` structures (each one has an `id` method). The handle with
156 /// the matching `id` will have some sort of event available on it. The
157 /// event could either be that data is available or the corresponding
158 /// channel has been closed.
159 pub fn wait(&self) -> usize {
163 /// Helper method for skipping the preflight checks during testing
164 fn wait2(&self, do_preflight_checks: bool) -> usize {
165 // Note that this is currently an inefficient implementation. We in
166 // theory have knowledge about all receivers in the set ahead of time,
167 // so this method shouldn't really have to iterate over all of them yet
168 // again. The idea with this "receiver set" interface is to get the
169 // interface right this time around, and later this implementation can
172 // This implementation can be summarized by:
174 // fn select(receivers) {
175 // if any receiver ready { return ready index }
177 // block on all receivers
179 // unblock on all receivers
180 // return ready index
183 // Most notably, the iterations over all of the receivers shouldn't be
186 // Stage 1: preflight checks. Look for any packets ready to receive
187 if do_preflight_checks {
188 for handle in self.iter() {
189 if (*handle).packet.can_recv() {
190 return (*handle).id();
195 // Stage 2: begin the blocking process
197 // Create a number of signal tokens, and install each one
198 // sequentially until one fails. If one fails, then abort the
199 // selection on the already-installed tokens.
200 let (wait_token, signal_token) = blocking::tokens();
201 for (i, handle) in self.iter().enumerate() {
202 match (*handle).packet.start_selection(signal_token.clone()) {
203 StartResult::Installed => {}
204 StartResult::Abort => {
205 // Go back and abort the already-begun selections
206 for handle in self.iter().take(i) {
207 (*handle).packet.abort_selection();
214 // Stage 3: no messages available, actually block
217 // Stage 4: there *must* be message available; find it.
219 // Abort the selection process on each receiver. If the abort
220 // process returns `true`, then that means that the receiver is
221 // ready to receive some data. Note that this also means that the
222 // receiver may have yet to have fully read the `to_wake` field and
223 // woken us up (although the wakeup is guaranteed to fail).
225 // This situation happens in the window of where a sender invokes
226 // increment(), sees -1, and then decides to wake up the task. After
227 // all this is done, the sending thread will set `selecting` to
228 // `false`. Until this is done, we cannot return. If we were to
229 // return, then a sender could wake up a receiver which has gone
230 // back to sleep after this call to `select`.
232 // Note that it is a "fairly small window" in which an increment()
233 // views that it should wake a thread up until the `selecting` bit
234 // is set to false. For now, the implementation currently just spins
235 // in a yield loop. This is very distasteful, but this
236 // implementation is already nowhere near what it should ideally be.
237 // A rewrite should focus on avoiding a yield loop, and for now this
238 // implementation is tying us over to a more efficient "don't
239 // iterate over everything every time" implementation.
240 let mut ready_id = usize::MAX;
241 for handle in self.iter() {
242 if (*handle).packet.abort_selection() {
243 ready_id = (*handle).id;
247 // We must have found a ready receiver
248 assert!(ready_id != usize::MAX);
253 fn iter(&self) -> Packets { Packets { cur: self.head } }
256 impl<'rx, T: Send> Handle<'rx, T> {
257 /// Retrieves the id of this handle.
259 pub fn id(&self) -> usize { self.id }
261 /// Blocks to receive a value on the underlying receiver, returning `Some` on
262 /// success or `None` if the channel disconnects. This function has the same
263 /// semantics as `Receiver.recv`
264 pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
266 /// Adds this handle to the receiver set that the handle was created from. This
267 /// method can be called multiple times, but it has no effect if `add` was
268 /// called previously.
270 /// This method is unsafe because it requires that the `Handle` is not moved
271 /// while it is added to the `Select` set.
272 pub unsafe fn add(&mut self) {
273 if self.added { return }
274 let selector: &mut Select = mem::transmute(&*self.selector);
275 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
277 if selector.head.is_null() {
281 (*me).prev = selector.tail;
282 assert!((*me).next.is_null());
283 (*selector.tail).next = me;
289 /// Removes this handle from the `Select` set. This method is unsafe because
290 /// it has no guarantee that the `Handle` was not moved since `add` was
292 pub unsafe fn remove(&mut self) {
293 if !self.added { return }
295 let selector: &mut Select = mem::transmute(&*self.selector);
296 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
298 if self.prev.is_null() {
299 assert_eq!(selector.head, me);
300 selector.head = self.next;
302 (*self.prev).next = self.next;
304 if self.next.is_null() {
305 assert_eq!(selector.tail, me);
306 selector.tail = self.prev;
308 (*self.next).prev = self.prev;
311 self.next = ptr::null_mut();
312 self.prev = ptr::null_mut();
319 impl Drop for Select {
321 assert!(self.head.is_null());
322 assert!(self.tail.is_null());
327 impl<'rx, T: Send> Drop for Handle<'rx, T> {
329 unsafe { self.remove() }
333 impl Iterator for Packets {
334 type Item = *mut Handle<'static, ()>;
336 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
337 if self.cur.is_null() {
340 let ret = Some(self.cur);
341 unsafe { self.cur = (*self.cur).next; }
348 #[allow(unused_imports)]
355 // Don't use the libstd version so we can pull in the right Select structure
356 // (std::comm points at the wrong one)
357 macro_rules! select {
359 $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
361 let sel = Select::new();
362 $( let mut $rx = sel.handle(&$rx); )+
366 let ret = sel.wait();
367 $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
374 let (tx1, rx1) = channel::<i32>();
375 let (tx2, rx2) = channel::<i32>();
376 tx1.send(1).unwrap();
378 foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
379 _bar = rx2.recv() => { panic!() }
381 tx2.send(2).unwrap();
383 _foo = rx1.recv() => { panic!() },
384 bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
388 foo = rx1.recv() => { assert!(foo.is_err()); },
389 _bar = rx2.recv() => { panic!() }
393 bar = rx2.recv() => { assert!(bar.is_err()); }
399 let (_tx1, rx1) = channel::<i32>();
400 let (_tx2, rx2) = channel::<i32>();
401 let (_tx3, rx3) = channel::<i32>();
402 let (_tx4, rx4) = channel::<i32>();
403 let (tx5, rx5) = channel::<i32>();
404 tx5.send(4).unwrap();
406 _foo = rx1.recv() => { panic!("1") },
407 _foo = rx2.recv() => { panic!("2") },
408 _foo = rx3.recv() => { panic!("3") },
409 _foo = rx4.recv() => { panic!("4") },
410 foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
416 let (_tx1, rx1) = channel::<i32>();
417 let (tx2, rx2) = channel::<i32>();
421 _a1 = rx1.recv() => { panic!() },
422 a2 = rx2.recv() => { assert!(a2.is_err()); }
428 let (tx1, rx1) = channel::<i32>();
429 let (_tx2, rx2) = channel::<i32>();
430 let (tx3, rx3) = channel::<i32>();
432 let _t = thread::spawn(move|| {
433 for _ in 0..20 { thread::yield_now(); }
434 tx1.send(1).unwrap();
436 for _ in 0..20 { thread::yield_now(); }
440 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
441 _b = rx2.recv() => { panic!() }
443 tx3.send(1).unwrap();
445 a = rx1.recv() => { assert!(a.is_err()) },
446 _b = rx2.recv() => { panic!() }
452 let (tx1, rx1) = channel::<i32>();
453 let (tx2, rx2) = channel::<i32>();
454 let (tx3, rx3) = channel::<()>();
456 let _t = thread::spawn(move|| {
457 for _ in 0..20 { thread::yield_now(); }
458 tx1.send(1).unwrap();
459 tx2.send(2).unwrap();
464 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
465 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
468 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
469 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
471 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
472 assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
473 tx3.send(()).unwrap();
478 const AMT: i32 = 10000;
479 let (tx1, rx1) = channel::<i32>();
480 let (tx2, rx2) = channel::<i32>();
481 let (tx3, rx3) = channel::<()>();
483 let _t = thread::spawn(move|| {
486 tx1.send(i).unwrap();
488 tx2.send(i).unwrap();
496 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
497 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
499 tx3.send(()).unwrap();
505 let (tx1, rx1) = channel::<i32>();
506 let (_tx2, rx2) = channel::<i32>();
507 let (tx3, rx3) = channel::<()>();
509 let _t = thread::spawn(move|| {
512 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
513 tx1.send(2).unwrap();
517 tx3.send(()).unwrap();
519 _i1 = rx1.recv() => {},
520 _i2 = rx2.recv() => panic!()
522 tx3.send(()).unwrap();
527 let (tx1, rx1) = channel::<i32>();
528 let (_tx2, rx2) = channel::<i32>();
529 let (tx3, rx3) = channel::<()>();
531 let _t = thread::spawn(move|| {
534 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
535 tx1.send(2).unwrap();
539 tx3.send(()).unwrap();
541 _i1 = rx1.recv() => {},
542 _i2 = rx2.recv() => panic!()
544 tx3.send(()).unwrap();
549 let (tx1, rx1) = channel::<()>();
550 let (tx2, rx2) = channel::<()>();
551 let (tx3, rx3) = channel::<()>();
552 let _t = thread::spawn(move|| {
553 let s = Select::new();
554 let mut h1 = s.handle(&rx1);
555 let mut h2 = s.handle(&rx2);
558 assert_eq!(s.wait(), h2.id);
559 tx3.send(()).unwrap();
562 for _ in 0..1000 { thread::yield_now(); }
564 tx2.send(()).unwrap();
570 let (tx, rx) = channel();
571 tx.send(()).unwrap();
579 let (tx, rx) = channel();
580 tx.send(()).unwrap();
581 tx.send(()).unwrap();
589 let (tx, rx) = channel();
591 tx.send(()).unwrap();
599 let (tx, rx) = channel();
600 tx.send(()).unwrap();
601 let s = Select::new();
602 let mut h = s.handle(&rx);
604 assert_eq!(s.wait2(false), h.id);
609 let (tx, rx) = channel();
610 tx.send(()).unwrap();
611 tx.send(()).unwrap();
612 let s = Select::new();
613 let mut h = s.handle(&rx);
615 assert_eq!(s.wait2(false), h.id);
620 let (tx, rx) = channel();
622 tx.send(()).unwrap();
623 let s = Select::new();
624 let mut h = s.handle(&rx);
626 assert_eq!(s.wait2(false), h.id);
631 let (tx, rx) = channel::<()>();
633 let s = Select::new();
634 let mut h = s.handle(&rx);
636 assert_eq!(s.wait2(false), h.id);
641 let (tx, rx) = channel();
642 tx.send(()).unwrap();
645 let s = Select::new();
646 let mut h = s.handle(&rx);
648 assert_eq!(s.wait2(false), h.id);
653 let (tx, rx) = channel();
655 tx.send(()).unwrap();
658 let s = Select::new();
659 let mut h = s.handle(&rx);
661 assert_eq!(s.wait2(false), h.id);
665 fn oneshot_data_waiting() {
666 let (tx1, rx1) = channel();
667 let (tx2, rx2) = channel();
668 let _t = thread::spawn(move|| {
670 _n = rx1.recv() => {}
672 tx2.send(()).unwrap();
675 for _ in 0..100 { thread::yield_now() }
676 tx1.send(()).unwrap();
681 fn stream_data_waiting() {
682 let (tx1, rx1) = channel();
683 let (tx2, rx2) = channel();
684 tx1.send(()).unwrap();
685 tx1.send(()).unwrap();
688 let _t = thread::spawn(move|| {
690 _n = rx1.recv() => {}
692 tx2.send(()).unwrap();
695 for _ in 0..100 { thread::yield_now() }
696 tx1.send(()).unwrap();
701 fn shared_data_waiting() {
702 let (tx1, rx1) = channel();
703 let (tx2, rx2) = channel();
705 tx1.send(()).unwrap();
707 let _t = thread::spawn(move|| {
709 _n = rx1.recv() => {}
711 tx2.send(()).unwrap();
714 for _ in 0..100 { thread::yield_now() }
715 tx1.send(()).unwrap();
721 let (tx, rx) = sync_channel::<i32>(1);
724 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
730 let (tx, rx) = sync_channel::<i32>(0);
731 let _t = thread::spawn(move|| {
732 for _ in 0..100 { thread::yield_now() }
736 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
742 let (tx1, rx1) = sync_channel::<i32>(0);
743 let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
744 let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
745 let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
750 assert_eq!(rx2.recv().unwrap(), 2);
755 assert_eq!(rx1.recv().unwrap(), 1);