1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Selection over an array of receivers
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows natural binding of variables to the
24 //! received values of receivers in a much more natural syntax than usage of the
25 //! `Select` structure directly.
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
37 //! val = rx1.recv() => {
38 //! assert_eq!(val, 1i);
40 //! val = rx2.recv() => {
41 //! assert_eq!(val, 2i);
47 #![experimental = "This implementation, while likely sufficient, is unsafe and \
48 likely to be error prone. At some point in the future this \
49 module will likely be replaced, and it is currently \
50 unknown how much API breakage that will cause. The ability \
51 to select over a number of channels will remain forever, \
52 but no guarantees beyond this are being made"]
57 use alloc::owned::Box;
59 use core::kinds::marker;
62 use rustrt::local::Local;
63 use rustrt::task::{Task, BlockedTask};
67 /// The "receiver set" of the select interface. This structure is used to manage
68 /// a set of receivers which are being selected over.
70 head: *mut Handle<'static, ()>,
71 tail: *mut Handle<'static, ()>,
73 marker1: marker::NoSend,
76 /// A handle to a receiver which is currently a member of a `Select` set of
77 /// receivers. This handle is used to keep the receiver in the set as well as
78 /// interact with the underlying receiver.
79 pub struct Handle<'rx, T> {
80 /// The ID of this handle, used to compare against the return value of
83 selector: &'rx Select,
84 next: *mut Handle<'static, ()>,
85 prev: *mut Handle<'static, ()>,
89 // due to our fun transmutes, we must be sure to place this at the end. (nothing
90 // previous relies on T)
// Cursor used to walk the linked list of handles registered with a `Select`.
// `cur` is the handle that will be yielded next; the `Iterator` impl checks it
// for null before yielding and then advances it to that handle's `next`.
94 struct Packets { cur: *mut Handle<'static, ()> }
98 fn can_recv(&self) -> bool;
99 fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
100 fn abort_selection(&self) -> bool;
104 /// Creates a new selection structure. This set is initially empty and
105 /// `wait` will fail!() if called.
107 /// Usage of this struct directly can sometimes be burdensome; it is usually
108 /// much easier to go through the `select!` macro.
109 pub fn new() -> Select {
111 marker1: marker::NoSend,
112 head: 0 as *mut Handle<'static, ()>,
113 tail: 0 as *mut Handle<'static, ()>,
114 next_id: Cell::new(1),
118 /// Creates a new handle into this receiver set for a new receiver. Note
119 /// that this does *not* add the receiver to the receiver set, for that you
120 /// must call the `add` method on the handle itself.
121 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
122 let id = self.next_id.get();
123 self.next_id.set(id + 1);
127 next: 0 as *mut Handle<'static, ()>,
128 prev: 0 as *mut Handle<'static, ()>,
135 /// Waits for an event on this receiver set. The returned value is *not* an
136 /// index, but rather an id. This id can be queried against any active
137 /// `Handle` structures (each one has an `id` method). The handle with
138 /// the matching `id` will have some sort of event available on it. The
139 /// event could either be that data is available or the corresponding
140 /// channel has been closed.
141 pub fn wait(&self) -> uint {
145 /// Helper method for skipping the preflight checks during testing
146 fn wait2(&self, do_preflight_checks: bool) -> uint {
147 // Note that this is currently an inefficient implementation. We in
148 // theory have knowledge about all receivers in the set ahead of time,
149 // so this method shouldn't really have to iterate over all of them yet
150 // again. The idea with this "receiver set" interface is to get the
151 // interface right this time around, and later this implementation can
154 // This implementation can be summarized by:
156 // fn select(receivers) {
157 // if any receiver ready { return ready index }
159 // block on all receivers
161 // unblock on all receivers
162 // return ready index
165 // Most notably, the iterations over all of the receivers shouldn't be
169 for p in self.iter() {
171 if do_preflight_checks && (*p).packet.can_recv() {
177 let mut ready_index = amt;
178 let mut ready_id = uint::MAX;
179 let mut iter = self.iter().enumerate();
181 // Acquire a number of blocking contexts, and block on each one
182 // sequentially until one fails. If one fails, then abort
183 // immediately so we can go unblock on all the other receivers.
184 let task: Box<Task> = Local::take();
185 task.deschedule(amt, |task| {
186 // Prepare for the block
187 let (i, handle) = iter.next().unwrap();
188 match (*handle).packet.start_selection(task) {
192 ready_id = (*handle).id;
198 // Abort the selection process on each receiver. If the abort
199 // process returns `true`, then that means that the receiver is
200 // ready to receive some data. Note that this also means that the
201 // receiver may have yet to have fully read the `to_wake` field and
202 // woken us up (although the wakeup is guaranteed to fail).
204 // This situation happens in the window of where a sender invokes
205 // increment(), sees -1, and then decides to wake up the task. After
206 // all this is done, the sending thread will set `selecting` to
207 // `false`. Until this is done, we cannot return. If we were to
208 // return, then a sender could wake up a receiver which has gone
209 // back to sleep after this call to `select`.
211 // Note that it is a "fairly small window" in which an increment()
212 // views that it should wake a thread up until the `selecting` bit
213 // is set to false. For now, the implementation currently just spins
214 // in a yield loop. This is very distasteful, but this
215 // implementation is already nowhere near what it should ideally be.
216 // A rewrite should focus on avoiding a yield loop, and for now this
217 // implementation is tiding us over to a more efficient "don't
218 // iterate over everything every time" implementation.
219 for handle in self.iter().take(ready_index) {
220 if (*handle).packet.abort_selection() {
221 ready_id = (*handle).id;
225 assert!(ready_id != uint::MAX);
// Creates a `Packets` cursor starting at `head`, the first handle in this
// set's linked list (null when no handle has been added).
230 fn iter(&self) -> Packets { Packets { cur: self.head } }
233 impl<'rx, T: Send> Handle<'rx, T> {
234 /// Retrieve the id of this handle.
236 pub fn id(&self) -> uint { self.id }
238 /// Receive a value on the underlying receiver. Has the same semantics as
240 pub fn recv(&mut self) -> T { self.rx.recv() }
241 /// Block to receive a value on the underlying receiver, returning `Ok` on
242 /// success or `Err` if the channel disconnects. This function has the same
243 /// semantics as `Receiver.recv_opt`
244 pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
246 /// Adds this handle to the receiver set that the handle was created from. This
247 /// method can be called multiple times, but it has no effect if `add` was
248 /// called previously.
250 /// This method is unsafe because it requires that the `Handle` is not moved
251 /// while it is added to the `Select` set.
252 pub unsafe fn add(&mut self) {
253 if self.added { return }
254 let selector: &mut Select = mem::transmute(&*self.selector);
255 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
257 if selector.head.is_null() {
261 (*me).prev = selector.tail;
262 assert!((*me).next.is_null());
263 (*selector.tail).next = me;
269 /// Removes this handle from the `Select` set. This method is unsafe because
270 /// it has no guarantee that the `Handle` was not moved since `add` was
272 pub unsafe fn remove(&mut self) {
273 if !self.added { return }
275 let selector: &mut Select = mem::transmute(&*self.selector);
276 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
278 if self.prev.is_null() {
279 assert_eq!(selector.head, me);
280 selector.head = self.next;
282 (*self.prev).next = self.next;
284 if self.next.is_null() {
285 assert_eq!(selector.tail, me);
286 selector.tail = self.prev;
288 (*self.next).prev = self.prev;
291 self.next = 0 as *mut Handle<'static, ()>;
292 self.prev = 0 as *mut Handle<'static, ()>;
299 impl Drop for Select {
301 assert!(self.head.is_null());
302 assert!(self.tail.is_null());
307 impl<'rx, T: Send> Drop for Handle<'rx, T> {
309 unsafe { self.remove() }
313 impl Iterator<*mut Handle<'static, ()>> for Packets {
314 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
315 if self.cur.is_null() {
318 let ret = Some(self.cur);
319 unsafe { self.cur = (*self.cur).next; }
326 #[allow(unused_imports)]
332 // Don't use the libstd version so we can pull in the right Select structure
333 // (std::comm points at the wrong one)
334 macro_rules! select {
336 $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
339 let sel = Select::new();
340 $( let mut $rx = sel.handle(&$rx); )+
344 let ret = sel.wait();
345 $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
351 let (tx1, rx1) = channel::<int>();
352 let (tx2, rx2) = channel::<int>();
355 foo = rx1.recv() => { assert_eq!(foo, 1); },
356 _bar = rx2.recv() => { fail!() }
360 _foo = rx1.recv() => { fail!() },
361 bar = rx2.recv() => { assert_eq!(bar, 2) }
365 foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
366 _bar = rx2.recv() => { fail!() }
370 bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
375 let (_tx1, rx1) = channel::<int>();
376 let (_tx2, rx2) = channel::<int>();
377 let (_tx3, rx3) = channel::<int>();
378 let (_tx4, rx4) = channel::<int>();
379 let (tx5, rx5) = channel::<int>();
382 _foo = rx1.recv() => { fail!("1") },
383 _foo = rx2.recv() => { fail!("2") },
384 _foo = rx3.recv() => { fail!("3") },
385 _foo = rx4.recv() => { fail!("4") },
386 foo = rx5.recv() => { assert_eq!(foo, 4); }
391 let (_tx1, rx1) = channel::<int>();
392 let (tx2, rx2) = channel::<int>();
396 _a1 = rx1.recv_opt() => { fail!() },
397 a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
401 test!(fn unblocks() {
402 let (tx1, rx1) = channel::<int>();
403 let (_tx2, rx2) = channel::<int>();
404 let (tx3, rx3) = channel::<int>();
407 for _ in range(0u, 20) { task::deschedule(); }
410 for _ in range(0u, 20) { task::deschedule(); }
414 a = rx1.recv() => { assert_eq!(a, 1); },
415 _b = rx2.recv() => { fail!() }
419 a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
420 _b = rx2.recv() => { fail!() }
424 test!(fn both_ready() {
425 let (tx1, rx1) = channel::<int>();
426 let (tx2, rx2) = channel::<int>();
427 let (tx3, rx3) = channel::<()>();
430 for _ in range(0u, 20) { task::deschedule(); }
437 a = rx1.recv() => { assert_eq!(a, 1); },
438 a = rx2.recv() => { assert_eq!(a, 2); }
441 a = rx1.recv() => { assert_eq!(a, 1); },
442 a = rx2.recv() => { assert_eq!(a, 2); }
444 assert_eq!(rx1.try_recv(), Err(Empty));
445 assert_eq!(rx2.try_recv(), Err(Empty));
450 static AMT: int = 10000;
451 let (tx1, rx1) = channel::<int>();
452 let (tx2, rx2) = channel::<int>();
453 let (tx3, rx3) = channel::<()>();
456 for i in range(0, AMT) {
466 for i in range(0, AMT) {
468 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
469 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
476 let (tx1, rx1) = channel::<int>();
477 let (_tx2, rx2) = channel::<int>();
478 let (tx3, rx3) = channel::<()>();
483 assert_eq!(rx3.try_recv(), Err(Empty));
490 _i1 = rx1.recv() => {},
491 _i2 = rx2.recv() => fail!()
496 test!(fn cloning2() {
497 let (tx1, rx1) = channel::<int>();
498 let (_tx2, rx2) = channel::<int>();
499 let (tx3, rx3) = channel::<()>();
504 assert_eq!(rx3.try_recv(), Err(Empty));
511 _i1 = rx1.recv() => {},
512 _i2 = rx2.recv() => fail!()
517 test!(fn cloning3() {
518 let (tx1, rx1) = channel::<()>();
519 let (tx2, rx2) = channel::<()>();
520 let (tx3, rx3) = channel::<()>();
522 let s = Select::new();
523 let mut h1 = s.handle(&rx1);
524 let mut h2 = s.handle(&rx2);
527 assert_eq!(s.wait(), h2.id);
531 for _ in range(0u, 1000) { task::deschedule(); }
537 test!(fn preflight1() {
538 let (tx, rx) = channel();
545 test!(fn preflight2() {
546 let (tx, rx) = channel();
554 test!(fn preflight3() {
555 let (tx, rx) = channel();
563 test!(fn preflight4() {
564 let (tx, rx) = channel();
566 let s = Select::new();
567 let mut h = s.handle(&rx);
569 assert_eq!(s.wait2(false), h.id);
572 test!(fn preflight5() {
573 let (tx, rx) = channel();
576 let s = Select::new();
577 let mut h = s.handle(&rx);
579 assert_eq!(s.wait2(false), h.id);
582 test!(fn preflight6() {
583 let (tx, rx) = channel();
586 let s = Select::new();
587 let mut h = s.handle(&rx);
589 assert_eq!(s.wait2(false), h.id);
592 test!(fn preflight7() {
593 let (tx, rx) = channel::<()>();
595 let s = Select::new();
596 let mut h = s.handle(&rx);
598 assert_eq!(s.wait2(false), h.id);
601 test!(fn preflight8() {
602 let (tx, rx) = channel();
606 let s = Select::new();
607 let mut h = s.handle(&rx);
609 assert_eq!(s.wait2(false), h.id);
612 test!(fn preflight9() {
613 let (tx, rx) = channel();
618 let s = Select::new();
619 let mut h = s.handle(&rx);
621 assert_eq!(s.wait2(false), h.id);
624 test!(fn oneshot_data_waiting() {
625 let (tx1, rx1) = channel();
626 let (tx2, rx2) = channel();
629 () = rx1.recv() => {}
634 for _ in range(0u, 100) { task::deschedule() }
639 test!(fn stream_data_waiting() {
640 let (tx1, rx1) = channel();
641 let (tx2, rx2) = channel();
648 () = rx1.recv() => {}
653 for _ in range(0u, 100) { task::deschedule() }
658 test!(fn shared_data_waiting() {
659 let (tx1, rx1) = channel();
660 let (tx2, rx2) = channel();
666 () = rx1.recv() => {}
671 for _ in range(0u, 100) { task::deschedule() }
677 let (tx, rx) = sync_channel::<int>(1);
680 n = rx.recv() => { assert_eq!(n, 1); }
685 let (tx, rx) = sync_channel::<int>(0);
687 for _ in range(0u, 100) { task::deschedule() }
691 n = rx.recv() => { assert_eq!(n, 1); }
696 let (tx1, rx1) = sync_channel::<int>(0);
697 let (tx2, rx2): (Sender<int>, Receiver<int>) = channel();
698 spawn(proc() { tx1.send(1); });
699 spawn(proc() { tx2.send(2); });
703 assert_eq!(rx2.recv(), 2);
707 assert_eq!(rx1.recv(), 1);