1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Selection over an array of receivers
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
30 //! use std::sync::mpsc::channel;
32 //! let (tx1, rx1) = channel();
33 //! let (tx2, rx2) = channel();
35 //! tx1.send(1i).unwrap();
36 //! tx2.send(2i).unwrap();
39 //! val = rx1.recv() => {
40 //! assert_eq!(val.unwrap(), 1i);
42 //! val = rx2.recv() => {
43 //! assert_eq!(val.unwrap(), 2i);
49 #![unstable = "This implementation, while likely sufficient, is unsafe and \
50 likely to be error prone. At some point in the future this \
51 module will likely be replaced, and it is currently \
52 unknown how much API breakage that will cause. The ability \
53 to select over a number of channels will remain forever, \
54 but no guarantees beyond this are being made"]
64 use sync::mpsc::{Receiver, RecvError};
65 use sync::mpsc::blocking::{self, SignalToken};
67 /// The "receiver set" of the select interface. This structure is used to manage
68 /// a set of receivers which are being selected over.
69 #[cfg(stage0)] // NOTE remove impl after next snapshot
71 head: *mut Handle<'static, ()>,
72 tail: *mut Handle<'static, ()>,
74 marker1: marker::NoSend,
77 /// The "receiver set" of the select interface. This structure is used to manage
78 /// a set of receivers which are being selected over.
79 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
81 head: *mut Handle<'static, ()>,
82 tail: *mut Handle<'static, ()>,
86 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
87 impl !marker::Send for Select {}
89 /// A handle to a receiver which is currently a member of a `Select` set of
90 /// receivers. This handle is used to keep the receiver in the set as well as
91 /// interact with the underlying receiver.
92 pub struct Handle<'rx, T:'rx> {
93 /// The ID of this handle, used to compare against the return value of
96 selector: &'rx Select,
97 next: *mut Handle<'static, ()>,
98 prev: *mut Handle<'static, ()>,
100 packet: &'rx (Packet+'rx),
102 // due to our fun transmutes, we be sure to place this at the end. (nothing
103 // previous relies on T)
104 rx: &'rx Receiver<T>,
107 struct Packets { cur: *mut Handle<'static, ()> }
111 pub enum StartResult {
118 fn can_recv(&self) -> bool;
119 fn start_selection(&self, token: SignalToken) -> StartResult;
120 fn abort_selection(&self) -> bool;
124 /// Creates a new selection structure. This set is initially empty and
125 /// `wait` will panic!() if called.
127 /// Usage of this struct directly can sometimes be burdensome, and usage is
128 /// rather much easier through the `select!` macro.
129 #[cfg(stage0)] // NOTE remove impl after next snapshot
130 pub fn new() -> Select {
132 marker1: marker::NoSend,
133 head: 0 as *mut Handle<'static, ()>,
134 tail: 0 as *mut Handle<'static, ()>,
135 next_id: Cell::new(1),
139 /// Creates a new selection structure. This set is initially empty and
140 /// `wait` will panic!() if called.
142 /// Usage of this struct directly can sometimes be burdensome, and usage is
143 /// rather much easier through the `select!` macro.
144 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
145 pub fn new() -> Select {
147 head: 0 as *mut Handle<'static, ()>,
148 tail: 0 as *mut Handle<'static, ()>,
149 next_id: Cell::new(1),
153 /// Creates a new handle into this receiver set for a new receiver. Note
154 /// that this does *not* add the receiver to the receiver set, for that you
155 /// must call the `add` method on the handle itself.
156 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
157 let id = self.next_id.get();
158 self.next_id.set(id + 1);
162 next: 0 as *mut Handle<'static, ()>,
163 prev: 0 as *mut Handle<'static, ()>,
170 /// Waits for an event on this receiver set. The returned value is *not* an
171 /// index, but rather an id. This id can be queried against any active
172 /// `Handle` structures (each one has an `id` method). The handle with
173 /// the matching `id` will have some sort of event available on it. The
174 /// event could either be that data is available or the corresponding
175 /// channel has been closed.
176 pub fn wait(&self) -> uint {
180 /// Helper method for skipping the preflight checks during testing
181 fn wait2(&self, do_preflight_checks: bool) -> uint {
182 // Note that this is currently an inefficient implementation. We in
183 // theory have knowledge about all receivers in the set ahead of time,
184 // so this method shouldn't really have to iterate over all of them yet
185 // again. The idea with this "receiver set" interface is to get the
186 // interface right this time around, and later this implementation can
189 // This implementation can be summarized by:
191 // fn select(receivers) {
192 // if any receiver ready { return ready index }
194 // block on all receivers
196 // unblock on all receivers
197 // return ready index
200 // Most notably, the iterations over all of the receivers shouldn't be
203 // Stage 1: preflight checks. Look for any packets ready to receive
204 if do_preflight_checks {
205 for handle in self.iter() {
206 if (*handle).packet.can_recv() {
207 return (*handle).id();
212 // Stage 2: begin the blocking process
214 // Create a number of signal tokens, and install each one
215 // sequentially until one fails. If one fails, then abort the
216 // selection on the already-installed tokens.
217 let (wait_token, signal_token) = blocking::tokens();
218 for (i, handle) in self.iter().enumerate() {
219 match (*handle).packet.start_selection(signal_token.clone()) {
220 StartResult::Installed => {}
221 StartResult::Abort => {
222 // Go back and abort the already-begun selections
223 for handle in self.iter().take(i) {
224 (*handle).packet.abort_selection();
231 // Stage 3: no messages available, actually block
234 // Stage 4: there *must* be message available; find it.
236 // Abort the selection process on each receiver. If the abort
237 // process returns `true`, then that means that the receiver is
238 // ready to receive some data. Note that this also means that the
239 // receiver may have yet to have fully read the `to_wake` field and
240 // woken us up (although the wakeup is guaranteed to fail).
242 // This situation happens in the window of where a sender invokes
243 // increment(), sees -1, and then decides to wake up the task. After
244 // all this is done, the sending thread will set `selecting` to
245 // `false`. Until this is done, we cannot return. If we were to
246 // return, then a sender could wake up a receiver which has gone
247 // back to sleep after this call to `select`.
249 // Note that it is a "fairly small window" in which an increment()
250 // views that it should wake a thread up until the `selecting` bit
251 // is set to false. For now, the implementation currently just spins
252 // in a yield loop. This is very distasteful, but this
253 // implementation is already nowhere near what it should ideally be.
254 // A rewrite should focus on avoiding a yield loop, and for now this
255 // implementation is tying us over to a more efficient "don't
256 // iterate over everything every time" implementation.
257 let mut ready_id = uint::MAX;
258 for handle in self.iter() {
259 if (*handle).packet.abort_selection() {
260 ready_id = (*handle).id;
264 // We must have found a ready receiver
265 assert!(ready_id != uint::MAX);
270 fn iter(&self) -> Packets { Packets { cur: self.head } }
273 impl<'rx, T: Send> Handle<'rx, T> {
274 /// Retrieve the id of this handle.
276 pub fn id(&self) -> uint { self.id }
278 /// Block to receive a value on the underlying receiver, returning `Some` on
279 /// success or `None` if the channel disconnects. This function has the same
280 /// semantics as `Receiver.recv`
281 pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
283 /// Adds this handle to the receiver set that the handle was created from. This
284 /// method can be called multiple times, but it has no effect if `add` was
285 /// called previously.
287 /// This method is unsafe because it requires that the `Handle` is not moved
288 /// while it is added to the `Select` set.
289 pub unsafe fn add(&mut self) {
290 if self.added { return }
291 let selector: &mut Select = mem::transmute(&*self.selector);
292 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
294 if selector.head.is_null() {
298 (*me).prev = selector.tail;
299 assert!((*me).next.is_null());
300 (*selector.tail).next = me;
306 /// Removes this handle from the `Select` set. This method is unsafe because
307 /// it has no guarantee that the `Handle` was not moved since `add` was
309 pub unsafe fn remove(&mut self) {
310 if !self.added { return }
312 let selector: &mut Select = mem::transmute(&*self.selector);
313 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
315 if self.prev.is_null() {
316 assert_eq!(selector.head, me);
317 selector.head = self.next;
319 (*self.prev).next = self.next;
321 if self.next.is_null() {
322 assert_eq!(selector.tail, me);
323 selector.tail = self.prev;
325 (*self.next).prev = self.prev;
328 self.next = 0 as *mut Handle<'static, ()>;
329 self.prev = 0 as *mut Handle<'static, ()>;
336 impl Drop for Select {
338 assert!(self.head.is_null());
339 assert!(self.tail.is_null());
344 impl<'rx, T: Send> Drop for Handle<'rx, T> {
346 unsafe { self.remove() }
350 impl Iterator for Packets {
351 type Item = *mut Handle<'static, ()>;
353 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
354 if self.cur.is_null() {
357 let ret = Some(self.cur);
358 unsafe { self.cur = (*self.cur).next; }
365 #[allow(unused_imports)]
372 // Don't use the libstd version so we can pull in the right Select structure
373 // (std::comm points at the wrong one)
374 macro_rules! select {
376 $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
378 let sel = Select::new();
379 $( let mut $rx = sel.handle(&$rx); )+
383 let ret = sel.wait();
384 $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
391 let (tx1, rx1) = channel::<int>();
392 let (tx2, rx2) = channel::<int>();
393 tx1.send(1).unwrap();
395 foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
396 _bar = rx2.recv() => { panic!() }
398 tx2.send(2).unwrap();
400 _foo = rx1.recv() => { panic!() },
401 bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
405 foo = rx1.recv() => { assert!(foo.is_err()); },
406 _bar = rx2.recv() => { panic!() }
410 bar = rx2.recv() => { assert!(bar.is_err()); }
416 let (_tx1, rx1) = channel::<int>();
417 let (_tx2, rx2) = channel::<int>();
418 let (_tx3, rx3) = channel::<int>();
419 let (_tx4, rx4) = channel::<int>();
420 let (tx5, rx5) = channel::<int>();
421 tx5.send(4).unwrap();
423 _foo = rx1.recv() => { panic!("1") },
424 _foo = rx2.recv() => { panic!("2") },
425 _foo = rx3.recv() => { panic!("3") },
426 _foo = rx4.recv() => { panic!("4") },
427 foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
433 let (_tx1, rx1) = channel::<int>();
434 let (tx2, rx2) = channel::<int>();
438 _a1 = rx1.recv() => { panic!() },
439 a2 = rx2.recv() => { assert!(a2.is_err()); }
445 let (tx1, rx1) = channel::<int>();
446 let (_tx2, rx2) = channel::<int>();
447 let (tx3, rx3) = channel::<int>();
449 let _t = Thread::spawn(move|| {
450 for _ in range(0u, 20) { Thread::yield_now(); }
451 tx1.send(1).unwrap();
453 for _ in range(0u, 20) { Thread::yield_now(); }
457 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
458 _b = rx2.recv() => { panic!() }
460 tx3.send(1).unwrap();
462 a = rx1.recv() => { assert!(a.is_err()) },
463 _b = rx2.recv() => { panic!() }
469 let (tx1, rx1) = channel::<int>();
470 let (tx2, rx2) = channel::<int>();
471 let (tx3, rx3) = channel::<()>();
473 let _t = Thread::spawn(move|| {
474 for _ in range(0u, 20) { Thread::yield_now(); }
475 tx1.send(1).unwrap();
476 tx2.send(2).unwrap();
481 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
482 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
485 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
486 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
488 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
489 assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
490 tx3.send(()).unwrap();
495 static AMT: int = 10000;
496 let (tx1, rx1) = channel::<int>();
497 let (tx2, rx2) = channel::<int>();
498 let (tx3, rx3) = channel::<()>();
500 let _t = Thread::spawn(move|| {
501 for i in range(0, AMT) {
503 tx1.send(i).unwrap();
505 tx2.send(i).unwrap();
511 for i in range(0, AMT) {
513 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
514 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
516 tx3.send(()).unwrap();
522 let (tx1, rx1) = channel::<int>();
523 let (_tx2, rx2) = channel::<int>();
524 let (tx3, rx3) = channel::<()>();
526 let _t = Thread::spawn(move|| {
529 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
530 tx1.send(2).unwrap();
534 tx3.send(()).unwrap();
536 _i1 = rx1.recv() => {},
537 _i2 = rx2.recv() => panic!()
539 tx3.send(()).unwrap();
544 let (tx1, rx1) = channel::<int>();
545 let (_tx2, rx2) = channel::<int>();
546 let (tx3, rx3) = channel::<()>();
548 let _t = Thread::spawn(move|| {
551 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
552 tx1.send(2).unwrap();
556 tx3.send(()).unwrap();
558 _i1 = rx1.recv() => {},
559 _i2 = rx2.recv() => panic!()
561 tx3.send(()).unwrap();
566 let (tx1, rx1) = channel::<()>();
567 let (tx2, rx2) = channel::<()>();
568 let (tx3, rx3) = channel::<()>();
569 let _t = Thread::spawn(move|| {
570 let s = Select::new();
571 let mut h1 = s.handle(&rx1);
572 let mut h2 = s.handle(&rx2);
575 assert_eq!(s.wait(), h2.id);
576 tx3.send(()).unwrap();
579 for _ in range(0u, 1000) { Thread::yield_now(); }
581 tx2.send(()).unwrap();
587 let (tx, rx) = channel();
588 tx.send(()).unwrap();
596 let (tx, rx) = channel();
597 tx.send(()).unwrap();
598 tx.send(()).unwrap();
606 let (tx, rx) = channel();
608 tx.send(()).unwrap();
616 let (tx, rx) = channel();
617 tx.send(()).unwrap();
618 let s = Select::new();
619 let mut h = s.handle(&rx);
621 assert_eq!(s.wait2(false), h.id);
626 let (tx, rx) = channel();
627 tx.send(()).unwrap();
628 tx.send(()).unwrap();
629 let s = Select::new();
630 let mut h = s.handle(&rx);
632 assert_eq!(s.wait2(false), h.id);
637 let (tx, rx) = channel();
639 tx.send(()).unwrap();
640 let s = Select::new();
641 let mut h = s.handle(&rx);
643 assert_eq!(s.wait2(false), h.id);
648 let (tx, rx) = channel::<()>();
650 let s = Select::new();
651 let mut h = s.handle(&rx);
653 assert_eq!(s.wait2(false), h.id);
658 let (tx, rx) = channel();
659 tx.send(()).unwrap();
662 let s = Select::new();
663 let mut h = s.handle(&rx);
665 assert_eq!(s.wait2(false), h.id);
670 let (tx, rx) = channel();
672 tx.send(()).unwrap();
675 let s = Select::new();
676 let mut h = s.handle(&rx);
678 assert_eq!(s.wait2(false), h.id);
682 fn oneshot_data_waiting() {
683 let (tx1, rx1) = channel();
684 let (tx2, rx2) = channel();
685 let _t = Thread::spawn(move|| {
687 _n = rx1.recv() => {}
689 tx2.send(()).unwrap();
692 for _ in range(0u, 100) { Thread::yield_now() }
693 tx1.send(()).unwrap();
698 fn stream_data_waiting() {
699 let (tx1, rx1) = channel();
700 let (tx2, rx2) = channel();
701 tx1.send(()).unwrap();
702 tx1.send(()).unwrap();
705 let _t = Thread::spawn(move|| {
707 _n = rx1.recv() => {}
709 tx2.send(()).unwrap();
712 for _ in range(0u, 100) { Thread::yield_now() }
713 tx1.send(()).unwrap();
718 fn shared_data_waiting() {
719 let (tx1, rx1) = channel();
720 let (tx2, rx2) = channel();
722 tx1.send(()).unwrap();
724 let _t = Thread::spawn(move|| {
726 _n = rx1.recv() => {}
728 tx2.send(()).unwrap();
731 for _ in range(0u, 100) { Thread::yield_now() }
732 tx1.send(()).unwrap();
738 let (tx, rx) = sync_channel::<int>(1);
741 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
747 let (tx, rx) = sync_channel::<int>(0);
748 let _t = Thread::spawn(move|| {
749 for _ in range(0u, 100) { Thread::yield_now() }
753 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
759 let (tx1, rx1) = sync_channel::<int>(0);
760 let (tx2, rx2): (Sender<int>, Receiver<int>) = channel();
761 let _t = Thread::spawn(move|| { tx1.send(1).unwrap(); });
762 let _t = Thread::spawn(move|| { tx2.send(2).unwrap(); });
767 assert_eq!(rx2.recv().unwrap(), 2);
772 assert_eq!(rx1.recv().unwrap(), 1);