1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Selection over an array of receivers
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
37 //! val = rx1.recv() => {
38 //! assert_eq!(val, 1);
40 //! val = rx2.recv() => {
41 //! assert_eq!(val, 2);
54 use option::{Some, None, Option};
57 use result::{Ok, Err, Result};
59 use rt::task::{Task, BlockedTask};
63 /// The "receiver set" of the select interface. This structure is used to manage
64 /// a set of receivers which are being selected over.
66 head: *mut Handle<'static, ()>,
67 tail: *mut Handle<'static, ()>,
69 marker1: marker::NoSend,
72 /// A handle to a receiver which is currently a member of a `Select` set of
73 /// receivers. This handle is used to keep the receiver in the set as well as
74 /// interact with the underlying receiver.
75 pub struct Handle<'rx, T> {
76 /// The ID of this handle, used to compare against the return value of
79 selector: &'rx Select,
80 next: *mut Handle<'static, ()>,
81 prev: *mut Handle<'static, ()>,
85 // due to our fun transmutes, we be sure to place this at the end. (nothing
86 // previous relies on T)
90 struct Packets { cur: *mut Handle<'static, ()> }
94 fn can_recv(&self) -> bool;
95 fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
96 fn abort_selection(&self) -> bool;
100 /// Creates a new selection structure. This set is initially empty and
101 /// `wait` will fail!() if called.
103 /// Usage of this struct directly can sometimes be burdensome, and usage is
104 /// rather much easier through the `select!` macro.
105 pub fn new() -> Select {
107 marker1: marker::NoSend,
108 head: 0 as *mut Handle<'static, ()>,
109 tail: 0 as *mut Handle<'static, ()>,
110 next_id: Cell::new(1),
114 /// Creates a new handle into this receiver set for a new receiver. Note
115 /// that this does *not* add the receiver to the receiver set, for that you
116 /// must call the `add` method on the handle itself.
117 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
118 let id = self.next_id.get();
119 self.next_id.set(id + 1);
123 next: 0 as *mut Handle<'static, ()>,
124 prev: 0 as *mut Handle<'static, ()>,
131 /// Waits for an event on this receiver set. The returned value is *not* an
132 /// index, but rather an id. This id can be queried against any active
133 /// `Handle` structures (each one has an `id` method). The handle with
134 /// the matching `id` will have some sort of event available on it. The
135 /// event could either be that data is available or the corresponding
136 /// channel has been closed.
137 pub fn wait(&self) -> uint {
141 /// Helper method for skipping the preflight checks during testing
142 fn wait2(&self, do_preflight_checks: bool) -> uint {
143 // Note that this is currently an inefficient implementation. We in
144 // theory have knowledge about all receivers in the set ahead of time,
145 // so this method shouldn't really have to iterate over all of them yet
146 // again. The idea with this "receiver set" interface is to get the
147 // interface right this time around, and later this implementation can
150 // This implementation can be summarized by:
152 // fn select(receivers) {
153 // if any receiver ready { return ready index }
155 // block on all receivers
157 // unblock on all receivers
158 // return ready index
161 // Most notably, the iterations over all of the receivers shouldn't be
165 for p in self.iter() {
167 if do_preflight_checks && (*p).packet.can_recv() {
173 let mut ready_index = amt;
174 let mut ready_id = uint::MAX;
175 let mut iter = self.iter().enumerate();
177 // Acquire a number of blocking contexts, and block on each one
178 // sequentially until one fails. If one fails, then abort
179 // immediately so we can go unblock on all the other receivers.
180 let task: Box<Task> = Local::take();
181 task.deschedule(amt, |task| {
182 // Prepare for the block
183 let (i, handle) = iter.next().unwrap();
184 match (*handle).packet.start_selection(task) {
188 ready_id = (*handle).id;
194 // Abort the selection process on each receiver. If the abort
195 // process returns `true`, then that means that the receiver is
196 // ready to receive some data. Note that this also means that the
197 // receiver may have yet to have fully read the `to_wake` field and
198 // woken us up (although the wakeup is guaranteed to fail).
200 // This situation happens in the window of where a sender invokes
201 // increment(), sees -1, and then decides to wake up the task. After
202 // all this is done, the sending thread will set `selecting` to
203 // `false`. Until this is done, we cannot return. If we were to
204 // return, then a sender could wake up a receiver which has gone
205 // back to sleep after this call to `select`.
207 // Note that it is a "fairly small window" in which an increment()
208 // views that it should wake a thread up until the `selecting` bit
209 // is set to false. For now, the implementation currently just spins
210 // in a yield loop. This is very distasteful, but this
211 // implementation is already nowhere near what it should ideally be.
212 // A rewrite should focus on avoiding a yield loop, and for now this
213 // implementation is tying us over to a more efficient "don't
214 // iterate over everything every time" implementation.
215 for handle in self.iter().take(ready_index) {
216 if (*handle).packet.abort_selection() {
217 ready_id = (*handle).id;
221 assert!(ready_id != uint::MAX);
226 fn iter(&self) -> Packets { Packets { cur: self.head } }
229 impl<'rx, T: Send> Handle<'rx, T> {
230 /// Retrieve the id of this handle.
232 pub fn id(&self) -> uint { self.id }
234 /// Receive a value on the underlying receiver. Has the same semantics as
236 pub fn recv(&mut self) -> T { self.rx.recv() }
237 /// Block to receive a value on the underlying receiver, returning `Some` on
238 /// success or `None` if the channel disconnects. This function has the same
239 /// semantics as `Receiver.recv_opt`
240 pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
242 /// Adds this handle to the receiver set that the handle was created from. This
243 /// method can be called multiple times, but it has no effect if `add` was
244 /// called previously.
246 /// This method is unsafe because it requires that the `Handle` is not moved
247 /// while it is added to the `Select` set.
248 pub unsafe fn add(&mut self) {
249 if self.added { return }
250 let selector: &mut Select = cast::transmute(&*self.selector);
251 let me: *mut Handle<'static, ()> = cast::transmute(&*self);
253 if selector.head.is_null() {
257 (*me).prev = selector.tail;
258 assert!((*me).next.is_null());
259 (*selector.tail).next = me;
265 /// Removes this handle from the `Select` set. This method is unsafe because
266 /// it has no guarantee that the `Handle` was not moved since `add` was
268 pub unsafe fn remove(&mut self) {
269 if !self.added { return }
271 let selector: &mut Select = cast::transmute(&*self.selector);
272 let me: *mut Handle<'static, ()> = cast::transmute(&*self);
274 if self.prev.is_null() {
275 assert_eq!(selector.head, me);
276 selector.head = self.next;
278 (*self.prev).next = self.next;
280 if self.next.is_null() {
281 assert_eq!(selector.tail, me);
282 selector.tail = self.prev;
284 (*self.next).prev = self.prev;
287 self.next = 0 as *mut Handle<'static, ()>;
288 self.prev = 0 as *mut Handle<'static, ()>;
295 impl Drop for Select {
297 assert!(self.head.is_null());
298 assert!(self.tail.is_null());
303 impl<'rx, T: Send> Drop for Handle<'rx, T> {
305 unsafe { self.remove() }
309 impl Iterator<*mut Handle<'static, ()>> for Packets {
310 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
311 if self.cur.is_null() {
314 let ret = Some(self.cur);
315 unsafe { self.cur = (*self.cur).next; }
322 #[allow(unused_imports)]
328 let (tx1, rx1) = channel::<int>();
329 let (tx2, rx2) = channel::<int>();
332 foo = rx1.recv() => { assert_eq!(foo, 1); },
333 _bar = rx2.recv() => { fail!() }
337 _foo = rx1.recv() => { fail!() },
338 bar = rx2.recv() => { assert_eq!(bar, 2) }
342 foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
343 _bar = rx2.recv() => { fail!() }
347 bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
352 let (_tx1, rx1) = channel::<int>();
353 let (_tx2, rx2) = channel::<int>();
354 let (_tx3, rx3) = channel::<int>();
355 let (_tx4, rx4) = channel::<int>();
356 let (tx5, rx5) = channel::<int>();
359 _foo = rx1.recv() => { fail!("1") },
360 _foo = rx2.recv() => { fail!("2") },
361 _foo = rx3.recv() => { fail!("3") },
362 _foo = rx4.recv() => { fail!("4") },
363 foo = rx5.recv() => { assert_eq!(foo, 4); }
368 let (_tx1, rx1) = channel::<int>();
369 let (tx2, rx2) = channel::<int>();
373 _a1 = rx1.recv_opt() => { fail!() },
374 a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
378 test!(fn unblocks() {
379 let (tx1, rx1) = channel::<int>();
380 let (_tx2, rx2) = channel::<int>();
381 let (tx3, rx3) = channel::<int>();
384 for _ in range(0, 20) { task::deschedule(); }
387 for _ in range(0, 20) { task::deschedule(); }
391 a = rx1.recv() => { assert_eq!(a, 1); },
392 _b = rx2.recv() => { fail!() }
396 a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
397 _b = rx2.recv() => { fail!() }
401 test!(fn both_ready() {
402 let (tx1, rx1) = channel::<int>();
403 let (tx2, rx2) = channel::<int>();
404 let (tx3, rx3) = channel::<()>();
407 for _ in range(0, 20) { task::deschedule(); }
414 a = rx1.recv() => { assert_eq!(a, 1); },
415 a = rx2.recv() => { assert_eq!(a, 2); }
418 a = rx1.recv() => { assert_eq!(a, 1); },
419 a = rx2.recv() => { assert_eq!(a, 2); }
421 assert_eq!(rx1.try_recv(), Err(Empty));
422 assert_eq!(rx2.try_recv(), Err(Empty));
427 static AMT: int = 10000;
428 let (tx1, rx1) = channel::<int>();
429 let (tx2, rx2) = channel::<int>();
430 let (tx3, rx3) = channel::<()>();
433 for i in range(0, AMT) {
443 for i in range(0, AMT) {
445 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
446 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
453 let (tx1, rx1) = channel::<int>();
454 let (_tx2, rx2) = channel::<int>();
455 let (tx3, rx3) = channel::<()>();
460 assert_eq!(rx3.try_recv(), Err(Empty));
467 _i1 = rx1.recv() => {},
468 _i2 = rx2.recv() => fail!()
473 test!(fn cloning2() {
474 let (tx1, rx1) = channel::<int>();
475 let (_tx2, rx2) = channel::<int>();
476 let (tx3, rx3) = channel::<()>();
481 assert_eq!(rx3.try_recv(), Err(Empty));
488 _i1 = rx1.recv() => {},
489 _i2 = rx2.recv() => fail!()
494 test!(fn cloning3() {
495 let (tx1, rx1) = channel::<()>();
496 let (tx2, rx2) = channel::<()>();
497 let (tx3, rx3) = channel::<()>();
499 let s = Select::new();
500 let mut h1 = s.handle(&rx1);
501 let mut h2 = s.handle(&rx2);
504 assert_eq!(s.wait(), h2.id);
508 for _ in range(0, 1000) { task::deschedule(); }
514 test!(fn preflight1() {
515 let (tx, rx) = channel();
522 test!(fn preflight2() {
523 let (tx, rx) = channel();
531 test!(fn preflight3() {
532 let (tx, rx) = channel();
540 test!(fn preflight4() {
541 let (tx, rx) = channel();
543 let s = Select::new();
544 let mut h = s.handle(&rx);
546 assert_eq!(s.wait2(false), h.id);
549 test!(fn preflight5() {
550 let (tx, rx) = channel();
553 let s = Select::new();
554 let mut h = s.handle(&rx);
556 assert_eq!(s.wait2(false), h.id);
559 test!(fn preflight6() {
560 let (tx, rx) = channel();
563 let s = Select::new();
564 let mut h = s.handle(&rx);
566 assert_eq!(s.wait2(false), h.id);
569 test!(fn preflight7() {
570 let (tx, rx) = channel::<()>();
572 let s = Select::new();
573 let mut h = s.handle(&rx);
575 assert_eq!(s.wait2(false), h.id);
578 test!(fn preflight8() {
579 let (tx, rx) = channel();
583 let s = Select::new();
584 let mut h = s.handle(&rx);
586 assert_eq!(s.wait2(false), h.id);
589 test!(fn preflight9() {
590 let (tx, rx) = channel();
595 let s = Select::new();
596 let mut h = s.handle(&rx);
598 assert_eq!(s.wait2(false), h.id);
601 test!(fn oneshot_data_waiting() {
602 let (tx1, rx1) = channel();
603 let (tx2, rx2) = channel();
606 () = rx1.recv() => {}
611 for _ in range(0, 100) { task::deschedule() }
616 test!(fn stream_data_waiting() {
617 let (tx1, rx1) = channel();
618 let (tx2, rx2) = channel();
625 () = rx1.recv() => {}
630 for _ in range(0, 100) { task::deschedule() }
635 test!(fn shared_data_waiting() {
636 let (tx1, rx1) = channel();
637 let (tx2, rx2) = channel();
643 () = rx1.recv() => {}
648 for _ in range(0, 100) { task::deschedule() }
654 let (tx, rx) = sync_channel(1);
657 n = rx.recv() => { assert_eq!(n, 1); }
662 let (tx, rx) = sync_channel(0);
664 for _ in range(0, 100) { task::deschedule() }
668 n = rx.recv() => { assert_eq!(n, 1); }
673 let (tx1, rx1) = sync_channel(0);
674 let (tx2, rx2) = channel();
675 spawn(proc() { tx1.send(1); });
676 spawn(proc() { tx2.send(2); });
680 assert_eq!(rx2.recv(), 2);
684 assert_eq!(rx1.recv(), 1);