1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Selection over an array of receivers
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows natural binding of variables to the
24 //! received values of receivers in a much more natural syntax than usage of the
25 //! `Select` structure directly.
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
37 //! val = rx1.recv() => {
38 //! assert_eq!(val, 1);
40 //! val = rx2.recv() => {
41 //! assert_eq!(val, 2);
54 use option::{Some, None, Option};
56 use result::{Ok, Err, Result};
58 use rt::task::{Task, BlockedTask};
62 /// The "receiver set" of the select interface. This structure is used to manage
63 /// a set of receivers which are being selected over.
65 head: *mut Handle<'static, ()>,
66 tail: *mut Handle<'static, ()>,
68 marker1: marker::NoSend,
71 /// A handle to a receiver which is currently a member of a `Select` set of
72 /// receivers. This handle is used to keep the receiver in the set as well as
73 /// interact with the underlying receiver.
74 pub struct Handle<'rx, T> {
75 /// The ID of this handle, used to compare against the return value of
78 selector: &'rx Select,
79 next: *mut Handle<'static, ()>,
80 prev: *mut Handle<'static, ()>,
84 // Due to our fun transmutes, we must be sure to place this at the end (nothing
85 // previous relies on T).
/// Iterator state used to walk the handles linked into a `Select`. `cur`
/// points at the next handle to hand out and is advanced through each
/// handle's `next` pointer.
89 struct Packets { cur: *mut Handle<'static, ()> }
93 fn can_recv(&self) -> bool;
94 fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
95 fn abort_selection(&self) -> bool;
99 /// Creates a new selection structure. This set is initially empty and
100 /// `wait` will fail!() if called.
102 /// Usage of this struct directly can sometimes be burdensome; it is usually
103 /// much easier to go through the `select!` macro.
104 pub fn new() -> Select {
106 marker1: marker::NoSend,
107 head: 0 as *mut Handle<'static, ()>,
108 tail: 0 as *mut Handle<'static, ()>,
109 next_id: Cell::new(1),
113 /// Creates a new handle into this receiver set for a new receiver. Note
114 /// that this does *not* add the receiver to the receiver set, for that you
115 /// must call the `add` method on the handle itself.
116 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
117 let id = self.next_id.get();
118 self.next_id.set(id + 1);
122 next: 0 as *mut Handle<'static, ()>,
123 prev: 0 as *mut Handle<'static, ()>,
130 /// Waits for an event on this receiver set. The returned value is *not* an
131 /// index, but rather an id. This id can be queried against any active
132 /// `Handle` structures (each one has an `id` method). The handle with
133 /// the matching `id` will have some sort of event available on it. The
134 /// event could either be that data is available or the corresponding
135 /// channel has been closed.
136 pub fn wait(&self) -> uint {
140 /// Helper method for skipping the preflight checks during testing
141 fn wait2(&self, do_preflight_checks: bool) -> uint {
142 // Note that this is currently an inefficient implementation. We in
143 // theory have knowledge about all receivers in the set ahead of time,
144 // so this method shouldn't really have to iterate over all of them yet
145 // again. The idea with this "receiver set" interface is to get the
146 // interface right this time around, and later this implementation can
149 // This implementation can be summarized by:
151 // fn select(receivers) {
152 // if any receiver ready { return ready index }
154 // block on all receivers
156 // unblock on all receivers
157 // return ready index
160 // Most notably, the iterations over all of the receivers shouldn't be
164 for p in self.iter() {
166 if do_preflight_checks && (*p).packet.can_recv() {
172 let mut ready_index = amt;
173 let mut ready_id = uint::MAX;
174 let mut iter = self.iter().enumerate();
176 // Acquire a number of blocking contexts, and block on each one
177 // sequentially until one fails. If one fails, then abort
178 // immediately so we can go unblock on all the other receivers.
179 let task: ~Task = Local::take();
180 task.deschedule(amt, |task| {
181 // Prepare for the block
182 let (i, handle) = iter.next().unwrap();
183 match (*handle).packet.start_selection(task) {
187 ready_id = (*handle).id;
193 // Abort the selection process on each receiver. If the abort
194 // process returns `true`, then that means that the receiver is
195 // ready to receive some data. Note that this also means that the
196 // receiver may have yet to have fully read the `to_wake` field and
197 // woken us up (although the wakeup is guaranteed to fail).
199 // This situation happens in the window of where a sender invokes
200 // increment(), sees -1, and then decides to wake up the task. After
201 // all this is done, the sending thread will set `selecting` to
202 // `false`. Until this is done, we cannot return. If we were to
203 // return, then a sender could wake up a receiver which has gone
204 // back to sleep after this call to `select`.
206 // Note that it is a "fairly small window" in which an increment()
207 // views that it should wake a thread up until the `selecting` bit
208 // is set to false. For now, the implementation currently just spins
209 // in a yield loop. This is very distasteful, but this
210 // implementation is already nowhere near what it should ideally be.
211 // A rewrite should focus on avoiding a yield loop, and for now this
212 // implementation is tiding us over to a more efficient "don't
213 // iterate over everything every time" implementation.
214 for handle in self.iter().take(ready_index) {
215 if (*handle).packet.abort_selection() {
216 ready_id = (*handle).id;
220 assert!(ready_id != uint::MAX);
/// Creates an iterator over the handles currently linked into this set,
/// starting from `head`.
225 fn iter(&self) -> Packets { Packets { cur: self.head } }
228 impl<'rx, T: Send> Handle<'rx, T> {
229 /// Retrieve the id of this handle, for comparison with the id returned by `Select::wait`.
231 pub fn id(&self) -> uint { self.id }
233 /// Receive a value on the underlying receiver. Has the same semantics as `Receiver.recv`.
235 pub fn recv(&mut self) -> T { self.rx.recv() }
236 /// Block to receive a value on the underlying receiver, returning `Ok` on
237 /// success or `Err(())` if the channel disconnects. This function has the same
238 /// semantics as `Receiver.recv_opt`.
239 pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
241 /// Adds this handle to the receiver set that the handle was created from. This
242 /// method can be called multiple times, but it has no effect if `add` was
243 /// called previously.
245 /// This method is unsafe because it requires that the `Handle` is not moved
246 /// while it is added to the `Select` set.
247 pub unsafe fn add(&mut self) {
248 if self.added { return }
249 let selector: &mut Select = cast::transmute(&*self.selector);
250 let me: *mut Handle<'static, ()> = cast::transmute(&*self);
252 if selector.head.is_null() {
256 (*me).prev = selector.tail;
257 assert!((*me).next.is_null());
258 (*selector.tail).next = me;
264 /// Removes this handle from the `Select` set. This method is unsafe because
265 /// it has no guarantee that the `Handle` was not moved since `add` was
267 pub unsafe fn remove(&mut self) {
268 if !self.added { return }
270 let selector: &mut Select = cast::transmute(&*self.selector);
271 let me: *mut Handle<'static, ()> = cast::transmute(&*self);
273 if self.prev.is_null() {
274 assert_eq!(selector.head, me);
275 selector.head = self.next;
277 (*self.prev).next = self.next;
279 if self.next.is_null() {
280 assert_eq!(selector.tail, me);
281 selector.tail = self.prev;
283 (*self.next).prev = self.prev;
286 self.next = 0 as *mut Handle<'static, ()>;
287 self.prev = 0 as *mut Handle<'static, ()>;
294 impl Drop for Select {
296 assert!(self.head.is_null());
297 assert!(self.tail.is_null());
302 impl<'rx, T: Send> Drop for Handle<'rx, T> {
304 unsafe { self.remove() }
308 impl Iterator<*mut Handle<'static, ()>> for Packets {
309 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
310 if self.cur.is_null() {
313 let ret = Some(self.cur);
314 unsafe { self.cur = (*self.cur).next; }
321 #[allow(unused_imports)]
327 let (tx1, rx1) = channel::<int>();
328 let (tx2, rx2) = channel::<int>();
331 foo = rx1.recv() => { assert_eq!(foo, 1); },
332 _bar = rx2.recv() => { fail!() }
336 _foo = rx1.recv() => { fail!() },
337 bar = rx2.recv() => { assert_eq!(bar, 2) }
341 foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
342 _bar = rx2.recv() => { fail!() }
346 bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
351 let (_tx1, rx1) = channel::<int>();
352 let (_tx2, rx2) = channel::<int>();
353 let (_tx3, rx3) = channel::<int>();
354 let (_tx4, rx4) = channel::<int>();
355 let (tx5, rx5) = channel::<int>();
358 _foo = rx1.recv() => { fail!("1") },
359 _foo = rx2.recv() => { fail!("2") },
360 _foo = rx3.recv() => { fail!("3") },
361 _foo = rx4.recv() => { fail!("4") },
362 foo = rx5.recv() => { assert_eq!(foo, 4); }
367 let (_tx1, rx1) = channel::<int>();
368 let (tx2, rx2) = channel::<int>();
372 _a1 = rx1.recv_opt() => { fail!() },
373 a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
377 test!(fn unblocks() {
378 let (tx1, rx1) = channel::<int>();
379 let (_tx2, rx2) = channel::<int>();
380 let (tx3, rx3) = channel::<int>();
383 for _ in range(0, 20) { task::deschedule(); }
386 for _ in range(0, 20) { task::deschedule(); }
390 a = rx1.recv() => { assert_eq!(a, 1); },
391 _b = rx2.recv() => { fail!() }
395 a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
396 _b = rx2.recv() => { fail!() }
400 test!(fn both_ready() {
401 let (tx1, rx1) = channel::<int>();
402 let (tx2, rx2) = channel::<int>();
403 let (tx3, rx3) = channel::<()>();
406 for _ in range(0, 20) { task::deschedule(); }
413 a = rx1.recv() => { assert_eq!(a, 1); },
414 a = rx2.recv() => { assert_eq!(a, 2); }
417 a = rx1.recv() => { assert_eq!(a, 1); },
418 a = rx2.recv() => { assert_eq!(a, 2); }
420 assert_eq!(rx1.try_recv(), Err(Empty));
421 assert_eq!(rx2.try_recv(), Err(Empty));
426 static AMT: int = 10000;
427 let (tx1, rx1) = channel::<int>();
428 let (tx2, rx2) = channel::<int>();
429 let (tx3, rx3) = channel::<()>();
432 for i in range(0, AMT) {
442 for i in range(0, AMT) {
444 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
445 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
452 let (tx1, rx1) = channel::<int>();
453 let (_tx2, rx2) = channel::<int>();
454 let (tx3, rx3) = channel::<()>();
459 assert_eq!(rx3.try_recv(), Err(Empty));
466 _i1 = rx1.recv() => {},
467 _i2 = rx2.recv() => fail!()
472 test!(fn cloning2() {
473 let (tx1, rx1) = channel::<int>();
474 let (_tx2, rx2) = channel::<int>();
475 let (tx3, rx3) = channel::<()>();
480 assert_eq!(rx3.try_recv(), Err(Empty));
487 _i1 = rx1.recv() => {},
488 _i2 = rx2.recv() => fail!()
493 test!(fn cloning3() {
494 let (tx1, rx1) = channel::<()>();
495 let (tx2, rx2) = channel::<()>();
496 let (tx3, rx3) = channel::<()>();
498 let s = Select::new();
499 let mut h1 = s.handle(&rx1);
500 let mut h2 = s.handle(&rx2);
503 assert_eq!(s.wait(), h2.id);
507 for _ in range(0, 1000) { task::deschedule(); }
513 test!(fn preflight1() {
514 let (tx, rx) = channel();
521 test!(fn preflight2() {
522 let (tx, rx) = channel();
530 test!(fn preflight3() {
531 let (tx, rx) = channel();
539 test!(fn preflight4() {
540 let (tx, rx) = channel();
542 let s = Select::new();
543 let mut h = s.handle(&rx);
545 assert_eq!(s.wait2(false), h.id);
548 test!(fn preflight5() {
549 let (tx, rx) = channel();
552 let s = Select::new();
553 let mut h = s.handle(&rx);
555 assert_eq!(s.wait2(false), h.id);
558 test!(fn preflight6() {
559 let (tx, rx) = channel();
562 let s = Select::new();
563 let mut h = s.handle(&rx);
565 assert_eq!(s.wait2(false), h.id);
568 test!(fn preflight7() {
569 let (tx, rx) = channel::<()>();
571 let s = Select::new();
572 let mut h = s.handle(&rx);
574 assert_eq!(s.wait2(false), h.id);
577 test!(fn preflight8() {
578 let (tx, rx) = channel();
582 let s = Select::new();
583 let mut h = s.handle(&rx);
585 assert_eq!(s.wait2(false), h.id);
588 test!(fn preflight9() {
589 let (tx, rx) = channel();
594 let s = Select::new();
595 let mut h = s.handle(&rx);
597 assert_eq!(s.wait2(false), h.id);
600 test!(fn oneshot_data_waiting() {
601 let (tx1, rx1) = channel();
602 let (tx2, rx2) = channel();
605 () = rx1.recv() => {}
610 for _ in range(0, 100) { task::deschedule() }
615 test!(fn stream_data_waiting() {
616 let (tx1, rx1) = channel();
617 let (tx2, rx2) = channel();
624 () = rx1.recv() => {}
629 for _ in range(0, 100) { task::deschedule() }
634 test!(fn shared_data_waiting() {
635 let (tx1, rx1) = channel();
636 let (tx2, rx2) = channel();
642 () = rx1.recv() => {}
647 for _ in range(0, 100) { task::deschedule() }
653 let (tx, rx) = sync_channel(1);
656 n = rx.recv() => { assert_eq!(n, 1); }
661 let (tx, rx) = sync_channel(0);
663 for _ in range(0, 100) { task::deschedule() }
667 n = rx.recv() => { assert_eq!(n, 1); }
672 let (tx1, rx1) = sync_channel(0);
673 let (tx2, rx2) = channel();
674 spawn(proc() { tx1.send(1); });
675 spawn(proc() { tx2.send(2); });
679 assert_eq!(rx2.recv(), 2);
683 assert_eq!(rx1.recv(), 1);