1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Selection over an array of receivers
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
37 //! val = rx1.recv() => {
38 //! assert_eq!(val, 1);
40 //! val = rx2.recv() => {
41 //! assert_eq!(val, 2);
50 use alloc::owned::Box;
52 use core::kinds::marker;
55 use rustrt::local::Local;
56 use rustrt::task::{Task, BlockedTask};
60 /// The "receiver set" of the select interface. This structure is used to manage
61 /// a set of receivers which are being selected over.
63 head: *mut Handle<'static, ()>,
64 tail: *mut Handle<'static, ()>,
66 marker1: marker::NoSend,
69 /// A handle to a receiver which is currently a member of a `Select` set of
70 /// receivers. This handle is used to keep the receiver in the set as well as
71 /// interact with the underlying receiver.
72 pub struct Handle<'rx, T> {
73 /// The ID of this handle, used to compare against the return value of
76 selector: &'rx Select,
77 next: *mut Handle<'static, ()>,
78 prev: *mut Handle<'static, ()>,
82 // due to our fun transmutes, we be sure to place this at the end. (nothing
83 // previous relies on T)
87 struct Packets { cur: *mut Handle<'static, ()> }
91 fn can_recv(&self) -> bool;
92 fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
93 fn abort_selection(&self) -> bool;
97 /// Creates a new selection structure. This set is initially empty and
98 /// `wait` will fail!() if called.
100 /// Usage of this struct directly can sometimes be burdensome, and usage is
101 /// rather much easier through the `select!` macro.
102 pub fn new() -> Select {
104 marker1: marker::NoSend,
105 head: 0 as *mut Handle<'static, ()>,
106 tail: 0 as *mut Handle<'static, ()>,
107 next_id: Cell::new(1),
111 /// Creates a new handle into this receiver set for a new receiver. Note
112 /// that this does *not* add the receiver to the receiver set, for that you
113 /// must call the `add` method on the handle itself.
114 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
115 let id = self.next_id.get();
116 self.next_id.set(id + 1);
120 next: 0 as *mut Handle<'static, ()>,
121 prev: 0 as *mut Handle<'static, ()>,
128 /// Waits for an event on this receiver set. The returned value is *not* an
129 /// index, but rather an id. This id can be queried against any active
130 /// `Handle` structures (each one has an `id` method). The handle with
131 /// the matching `id` will have some sort of event available on it. The
132 /// event could either be that data is available or the corresponding
133 /// channel has been closed.
134 pub fn wait(&self) -> uint {
138 /// Helper method for skipping the preflight checks during testing
139 fn wait2(&self, do_preflight_checks: bool) -> uint {
140 // Note that this is currently an inefficient implementation. We in
141 // theory have knowledge about all receivers in the set ahead of time,
142 // so this method shouldn't really have to iterate over all of them yet
143 // again. The idea with this "receiver set" interface is to get the
144 // interface right this time around, and later this implementation can
147 // This implementation can be summarized by:
149 // fn select(receivers) {
150 // if any receiver ready { return ready index }
152 // block on all receivers
154 // unblock on all receivers
155 // return ready index
158 // Most notably, the iterations over all of the receivers shouldn't be
162 for p in self.iter() {
164 if do_preflight_checks && (*p).packet.can_recv() {
170 let mut ready_index = amt;
171 let mut ready_id = uint::MAX;
172 let mut iter = self.iter().enumerate();
174 // Acquire a number of blocking contexts, and block on each one
175 // sequentially until one fails. If one fails, then abort
176 // immediately so we can go unblock on all the other receivers.
177 let task: Box<Task> = Local::take();
178 task.deschedule(amt, |task| {
179 // Prepare for the block
180 let (i, handle) = iter.next().unwrap();
181 match (*handle).packet.start_selection(task) {
185 ready_id = (*handle).id;
191 // Abort the selection process on each receiver. If the abort
192 // process returns `true`, then that means that the receiver is
193 // ready to receive some data. Note that this also means that the
194 // receiver may have yet to have fully read the `to_wake` field and
195 // woken us up (although the wakeup is guaranteed to fail).
197 // This situation happens in the window of where a sender invokes
198 // increment(), sees -1, and then decides to wake up the task. After
199 // all this is done, the sending thread will set `selecting` to
200 // `false`. Until this is done, we cannot return. If we were to
201 // return, then a sender could wake up a receiver which has gone
202 // back to sleep after this call to `select`.
204 // Note that it is a "fairly small window" in which an increment()
205 // views that it should wake a thread up until the `selecting` bit
206 // is set to false. For now, the implementation currently just spins
207 // in a yield loop. This is very distasteful, but this
208 // implementation is already nowhere near what it should ideally be.
209 // A rewrite should focus on avoiding a yield loop, and for now this
210 // implementation is tying us over to a more efficient "don't
211 // iterate over everything every time" implementation.
212 for handle in self.iter().take(ready_index) {
213 if (*handle).packet.abort_selection() {
214 ready_id = (*handle).id;
218 assert!(ready_id != uint::MAX);
223 fn iter(&self) -> Packets { Packets { cur: self.head } }
226 impl<'rx, T: Send> Handle<'rx, T> {
227 /// Retrieve the id of this handle.
229 pub fn id(&self) -> uint { self.id }
231 /// Receive a value on the underlying receiver. Has the same semantics as
233 pub fn recv(&mut self) -> T { self.rx.recv() }
234 /// Block to receive a value on the underlying receiver, returning `Some` on
235 /// success or `None` if the channel disconnects. This function has the same
236 /// semantics as `Receiver.recv_opt`
237 pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
239 /// Adds this handle to the receiver set that the handle was created from. This
240 /// method can be called multiple times, but it has no effect if `add` was
241 /// called previously.
243 /// This method is unsafe because it requires that the `Handle` is not moved
244 /// while it is added to the `Select` set.
245 pub unsafe fn add(&mut self) {
246 if self.added { return }
247 let selector: &mut Select = mem::transmute(&*self.selector);
248 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
250 if selector.head.is_null() {
254 (*me).prev = selector.tail;
255 assert!((*me).next.is_null());
256 (*selector.tail).next = me;
262 /// Removes this handle from the `Select` set. This method is unsafe because
263 /// it has no guarantee that the `Handle` was not moved since `add` was
265 pub unsafe fn remove(&mut self) {
266 if !self.added { return }
268 let selector: &mut Select = mem::transmute(&*self.selector);
269 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
271 if self.prev.is_null() {
272 assert_eq!(selector.head, me);
273 selector.head = self.next;
275 (*self.prev).next = self.next;
277 if self.next.is_null() {
278 assert_eq!(selector.tail, me);
279 selector.tail = self.prev;
281 (*self.next).prev = self.prev;
284 self.next = 0 as *mut Handle<'static, ()>;
285 self.prev = 0 as *mut Handle<'static, ()>;
292 impl Drop for Select {
294 assert!(self.head.is_null());
295 assert!(self.tail.is_null());
300 impl<'rx, T: Send> Drop for Handle<'rx, T> {
302 unsafe { self.remove() }
306 impl Iterator<*mut Handle<'static, ()>> for Packets {
307 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
308 if self.cur.is_null() {
311 let ret = Some(self.cur);
312 unsafe { self.cur = (*self.cur).next; }
319 #[allow(unused_imports)]
325 // Don't use the libstd version so we can pull in the right Select structure
326 // (std::comm points at the wrong one)
327 macro_rules! select {
329 $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
332 let sel = Select::new();
333 $( let mut $rx = sel.handle(&$rx); )+
337 let ret = sel.wait();
338 $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
344 let (tx1, rx1) = channel::<int>();
345 let (tx2, rx2) = channel::<int>();
348 foo = rx1.recv() => { assert_eq!(foo, 1); },
349 _bar = rx2.recv() => { fail!() }
353 _foo = rx1.recv() => { fail!() },
354 bar = rx2.recv() => { assert_eq!(bar, 2) }
358 foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
359 _bar = rx2.recv() => { fail!() }
363 bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
368 let (_tx1, rx1) = channel::<int>();
369 let (_tx2, rx2) = channel::<int>();
370 let (_tx3, rx3) = channel::<int>();
371 let (_tx4, rx4) = channel::<int>();
372 let (tx5, rx5) = channel::<int>();
375 _foo = rx1.recv() => { fail!("1") },
376 _foo = rx2.recv() => { fail!("2") },
377 _foo = rx3.recv() => { fail!("3") },
378 _foo = rx4.recv() => { fail!("4") },
379 foo = rx5.recv() => { assert_eq!(foo, 4); }
384 let (_tx1, rx1) = channel::<int>();
385 let (tx2, rx2) = channel::<int>();
389 _a1 = rx1.recv_opt() => { fail!() },
390 a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
394 test!(fn unblocks() {
395 let (tx1, rx1) = channel::<int>();
396 let (_tx2, rx2) = channel::<int>();
397 let (tx3, rx3) = channel::<int>();
400 for _ in range(0, 20) { task::deschedule(); }
403 for _ in range(0, 20) { task::deschedule(); }
407 a = rx1.recv() => { assert_eq!(a, 1); },
408 _b = rx2.recv() => { fail!() }
412 a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
413 _b = rx2.recv() => { fail!() }
417 test!(fn both_ready() {
418 let (tx1, rx1) = channel::<int>();
419 let (tx2, rx2) = channel::<int>();
420 let (tx3, rx3) = channel::<()>();
423 for _ in range(0, 20) { task::deschedule(); }
430 a = rx1.recv() => { assert_eq!(a, 1); },
431 a = rx2.recv() => { assert_eq!(a, 2); }
434 a = rx1.recv() => { assert_eq!(a, 1); },
435 a = rx2.recv() => { assert_eq!(a, 2); }
437 assert_eq!(rx1.try_recv(), Err(Empty));
438 assert_eq!(rx2.try_recv(), Err(Empty));
443 static AMT: int = 10000;
444 let (tx1, rx1) = channel::<int>();
445 let (tx2, rx2) = channel::<int>();
446 let (tx3, rx3) = channel::<()>();
449 for i in range(0, AMT) {
459 for i in range(0, AMT) {
461 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
462 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
469 let (tx1, rx1) = channel::<int>();
470 let (_tx2, rx2) = channel::<int>();
471 let (tx3, rx3) = channel::<()>();
476 assert_eq!(rx3.try_recv(), Err(Empty));
483 _i1 = rx1.recv() => {},
484 _i2 = rx2.recv() => fail!()
489 test!(fn cloning2() {
490 let (tx1, rx1) = channel::<int>();
491 let (_tx2, rx2) = channel::<int>();
492 let (tx3, rx3) = channel::<()>();
497 assert_eq!(rx3.try_recv(), Err(Empty));
504 _i1 = rx1.recv() => {},
505 _i2 = rx2.recv() => fail!()
510 test!(fn cloning3() {
511 let (tx1, rx1) = channel::<()>();
512 let (tx2, rx2) = channel::<()>();
513 let (tx3, rx3) = channel::<()>();
515 let s = Select::new();
516 let mut h1 = s.handle(&rx1);
517 let mut h2 = s.handle(&rx2);
520 assert_eq!(s.wait(), h2.id);
524 for _ in range(0, 1000) { task::deschedule(); }
530 test!(fn preflight1() {
531 let (tx, rx) = channel();
538 test!(fn preflight2() {
539 let (tx, rx) = channel();
547 test!(fn preflight3() {
548 let (tx, rx) = channel();
556 test!(fn preflight4() {
557 let (tx, rx) = channel();
559 let s = Select::new();
560 let mut h = s.handle(&rx);
562 assert_eq!(s.wait2(false), h.id);
565 test!(fn preflight5() {
566 let (tx, rx) = channel();
569 let s = Select::new();
570 let mut h = s.handle(&rx);
572 assert_eq!(s.wait2(false), h.id);
575 test!(fn preflight6() {
576 let (tx, rx) = channel();
579 let s = Select::new();
580 let mut h = s.handle(&rx);
582 assert_eq!(s.wait2(false), h.id);
585 test!(fn preflight7() {
586 let (tx, rx) = channel::<()>();
588 let s = Select::new();
589 let mut h = s.handle(&rx);
591 assert_eq!(s.wait2(false), h.id);
594 test!(fn preflight8() {
595 let (tx, rx) = channel();
599 let s = Select::new();
600 let mut h = s.handle(&rx);
602 assert_eq!(s.wait2(false), h.id);
605 test!(fn preflight9() {
606 let (tx, rx) = channel();
611 let s = Select::new();
612 let mut h = s.handle(&rx);
614 assert_eq!(s.wait2(false), h.id);
617 test!(fn oneshot_data_waiting() {
618 let (tx1, rx1) = channel();
619 let (tx2, rx2) = channel();
622 () = rx1.recv() => {}
627 for _ in range(0, 100) { task::deschedule() }
632 test!(fn stream_data_waiting() {
633 let (tx1, rx1) = channel();
634 let (tx2, rx2) = channel();
641 () = rx1.recv() => {}
646 for _ in range(0, 100) { task::deschedule() }
651 test!(fn shared_data_waiting() {
652 let (tx1, rx1) = channel();
653 let (tx2, rx2) = channel();
659 () = rx1.recv() => {}
664 for _ in range(0, 100) { task::deschedule() }
670 let (tx, rx) = sync_channel(1);
673 n = rx.recv() => { assert_eq!(n, 1); }
678 let (tx, rx) = sync_channel(0);
680 for _ in range(0, 100) { task::deschedule() }
684 n = rx.recv() => { assert_eq!(n, 1); }
689 let (tx1, rx1) = sync_channel(0);
690 let (tx2, rx2) = channel();
691 spawn(proc() { tx1.send(1); });
692 spawn(proc() { tx2.send(2); });
696 assert_eq!(rx2.recv(), 2);
700 assert_eq!(rx1.recv(), 1);