]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/select.rs
auto merge of #13967 : richo/rust/features/ICE-fails, r=alexcrichton
[rust.git] / src / libstd / comm / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Example
28 //!
29 //! ```rust
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
32 //!
33 //! tx1.send(1);
34 //! tx2.send(2);
35 //!
36 //! select! {
37 //!     val = rx1.recv() => {
38 //!         assert_eq!(val, 1);
39 //!     },
40 //!     val = rx2.recv() => {
41 //!         assert_eq!(val, 2);
42 //!     }
43 //! }
44 //! ```
45
46 #![allow(dead_code)]
47
48 use cast;
49 use cell::Cell;
50 use iter::Iterator;
51 use kinds::marker;
52 use kinds::Send;
53 use ops::Drop;
54 use option::{Some, None, Option};
55 use owned::Box;
56 use ptr::RawPtr;
57 use result::{Ok, Err, Result};
58 use rt::local::Local;
59 use rt::task::{Task, BlockedTask};
60 use super::Receiver;
61 use uint;
62
63 /// The "receiver set" of the select interface. This structure is used to manage
64 /// a set of receivers which are being selected over.
65 pub struct Select {
66     head: *mut Handle<'static, ()>,
67     tail: *mut Handle<'static, ()>,
68     next_id: Cell<uint>,
69     marker1: marker::NoSend,
70 }
71
72 /// A handle to a receiver which is currently a member of a `Select` set of
73 /// receivers.  This handle is used to keep the receiver in the set as well as
74 /// interact with the underlying receiver.
75 pub struct Handle<'rx, T> {
76     /// The ID of this handle, used to compare against the return value of
77     /// `Select::wait()`
78     id: uint,
79     selector: &'rx Select,
80     next: *mut Handle<'static, ()>,
81     prev: *mut Handle<'static, ()>,
82     added: bool,
83     packet: &'rx Packet,
84
85     // due to our fun transmutes, we be sure to place this at the end. (nothing
86     // previous relies on T)
87     rx: &'rx Receiver<T>,
88 }
89
90 struct Packets { cur: *mut Handle<'static, ()> }
91
92 #[doc(hidden)]
93 pub trait Packet {
94     fn can_recv(&self) -> bool;
95     fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
96     fn abort_selection(&self) -> bool;
97 }
98
99 impl Select {
100     /// Creates a new selection structure. This set is initially empty and
101     /// `wait` will fail!() if called.
102     ///
103     /// Usage of this struct directly can sometimes be burdensome, and usage is
104     /// rather much easier through the `select!` macro.
105     pub fn new() -> Select {
106         Select {
107             marker1: marker::NoSend,
108             head: 0 as *mut Handle<'static, ()>,
109             tail: 0 as *mut Handle<'static, ()>,
110             next_id: Cell::new(1),
111         }
112     }
113
114     /// Creates a new handle into this receiver set for a new receiver. Note
115     /// that this does *not* add the receiver to the receiver set, for that you
116     /// must call the `add` method on the handle itself.
117     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
118         let id = self.next_id.get();
119         self.next_id.set(id + 1);
120         Handle {
121             id: id,
122             selector: self,
123             next: 0 as *mut Handle<'static, ()>,
124             prev: 0 as *mut Handle<'static, ()>,
125             added: false,
126             rx: rx,
127             packet: rx,
128         }
129     }
130
131     /// Waits for an event on this receiver set. The returned value is *not* an
132     /// index, but rather an id. This id can be queried against any active
133     /// `Handle` structures (each one has an `id` method). The handle with
134     /// the matching `id` will have some sort of event available on it. The
135     /// event could either be that data is available or the corresponding
136     /// channel has been closed.
137     pub fn wait(&self) -> uint {
138         self.wait2(false)
139     }
140
141     /// Helper method for skipping the preflight checks during testing
142     fn wait2(&self, do_preflight_checks: bool) -> uint {
143         // Note that this is currently an inefficient implementation. We in
144         // theory have knowledge about all receivers in the set ahead of time,
145         // so this method shouldn't really have to iterate over all of them yet
146         // again. The idea with this "receiver set" interface is to get the
147         // interface right this time around, and later this implementation can
148         // be optimized.
149         //
150         // This implementation can be summarized by:
151         //
152         //      fn select(receivers) {
153         //          if any receiver ready { return ready index }
154         //          deschedule {
155         //              block on all receivers
156         //          }
157         //          unblock on all receivers
158         //          return ready index
159         //      }
160         //
161         // Most notably, the iterations over all of the receivers shouldn't be
162         // necessary.
163         unsafe {
164             let mut amt = 0;
165             for p in self.iter() {
166                 amt += 1;
167                 if do_preflight_checks && (*p).packet.can_recv() {
168                     return (*p).id;
169                 }
170             }
171             assert!(amt > 0);
172
173             let mut ready_index = amt;
174             let mut ready_id = uint::MAX;
175             let mut iter = self.iter().enumerate();
176
177             // Acquire a number of blocking contexts, and block on each one
178             // sequentially until one fails. If one fails, then abort
179             // immediately so we can go unblock on all the other receivers.
180             let task: Box<Task> = Local::take();
181             task.deschedule(amt, |task| {
182                 // Prepare for the block
183                 let (i, handle) = iter.next().unwrap();
184                 match (*handle).packet.start_selection(task) {
185                     Ok(()) => Ok(()),
186                     Err(task) => {
187                         ready_index = i;
188                         ready_id = (*handle).id;
189                         Err(task)
190                     }
191                 }
192             });
193
194             // Abort the selection process on each receiver. If the abort
195             // process returns `true`, then that means that the receiver is
196             // ready to receive some data. Note that this also means that the
197             // receiver may have yet to have fully read the `to_wake` field and
198             // woken us up (although the wakeup is guaranteed to fail).
199             //
200             // This situation happens in the window of where a sender invokes
201             // increment(), sees -1, and then decides to wake up the task. After
202             // all this is done, the sending thread will set `selecting` to
203             // `false`. Until this is done, we cannot return. If we were to
204             // return, then a sender could wake up a receiver which has gone
205             // back to sleep after this call to `select`.
206             //
207             // Note that it is a "fairly small window" in which an increment()
208             // views that it should wake a thread up until the `selecting` bit
209             // is set to false. For now, the implementation currently just spins
210             // in a yield loop. This is very distasteful, but this
211             // implementation is already nowhere near what it should ideally be.
212             // A rewrite should focus on avoiding a yield loop, and for now this
213             // implementation is tying us over to a more efficient "don't
214             // iterate over everything every time" implementation.
215             for handle in self.iter().take(ready_index) {
216                 if (*handle).packet.abort_selection() {
217                     ready_id = (*handle).id;
218                 }
219             }
220
221             assert!(ready_id != uint::MAX);
222             return ready_id;
223         }
224     }
225
226     fn iter(&self) -> Packets { Packets { cur: self.head } }
227 }
228
229 impl<'rx, T: Send> Handle<'rx, T> {
230     /// Retrieve the id of this handle.
231     #[inline]
232     pub fn id(&self) -> uint { self.id }
233
234     /// Receive a value on the underlying receiver. Has the same semantics as
235     /// `Receiver.recv`
236     pub fn recv(&mut self) -> T { self.rx.recv() }
237     /// Block to receive a value on the underlying receiver, returning `Some` on
238     /// success or `None` if the channel disconnects. This function has the same
239     /// semantics as `Receiver.recv_opt`
240     pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
241
242     /// Adds this handle to the receiver set that the handle was created from. This
243     /// method can be called multiple times, but it has no effect if `add` was
244     /// called previously.
245     ///
246     /// This method is unsafe because it requires that the `Handle` is not moved
247     /// while it is added to the `Select` set.
248     pub unsafe fn add(&mut self) {
249         if self.added { return }
250         let selector: &mut Select = cast::transmute(&*self.selector);
251         let me: *mut Handle<'static, ()> = cast::transmute(&*self);
252
253         if selector.head.is_null() {
254             selector.head = me;
255             selector.tail = me;
256         } else {
257             (*me).prev = selector.tail;
258             assert!((*me).next.is_null());
259             (*selector.tail).next = me;
260             selector.tail = me;
261         }
262         self.added = true;
263     }
264
265     /// Removes this handle from the `Select` set. This method is unsafe because
266     /// it has no guarantee that the `Handle` was not moved since `add` was
267     /// called.
268     pub unsafe fn remove(&mut self) {
269         if !self.added { return }
270
271         let selector: &mut Select = cast::transmute(&*self.selector);
272         let me: *mut Handle<'static, ()> = cast::transmute(&*self);
273
274         if self.prev.is_null() {
275             assert_eq!(selector.head, me);
276             selector.head = self.next;
277         } else {
278             (*self.prev).next = self.next;
279         }
280         if self.next.is_null() {
281             assert_eq!(selector.tail, me);
282             selector.tail = self.prev;
283         } else {
284             (*self.next).prev = self.prev;
285         }
286
287         self.next = 0 as *mut Handle<'static, ()>;
288         self.prev = 0 as *mut Handle<'static, ()>;
289
290         self.added = false;
291     }
292 }
293
294 #[unsafe_destructor]
295 impl Drop for Select {
296     fn drop(&mut self) {
297         assert!(self.head.is_null());
298         assert!(self.tail.is_null());
299     }
300 }
301
302 #[unsafe_destructor]
303 impl<'rx, T: Send> Drop for Handle<'rx, T> {
304     fn drop(&mut self) {
305         unsafe { self.remove() }
306     }
307 }
308
309 impl Iterator<*mut Handle<'static, ()>> for Packets {
310     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
311         if self.cur.is_null() {
312             None
313         } else {
314             let ret = Some(self.cur);
315             unsafe { self.cur = (*self.cur).next; }
316             ret
317         }
318     }
319 }
320
321 #[cfg(test)]
322 #[allow(unused_imports)]
323 mod test {
324     use super::super::*;
325     use prelude::*;
326
327     test!(fn smoke() {
328         let (tx1, rx1) = channel::<int>();
329         let (tx2, rx2) = channel::<int>();
330         tx1.send(1);
331         select! (
332             foo = rx1.recv() => { assert_eq!(foo, 1); },
333             _bar = rx2.recv() => { fail!() }
334         )
335         tx2.send(2);
336         select! (
337             _foo = rx1.recv() => { fail!() },
338             bar = rx2.recv() => { assert_eq!(bar, 2) }
339         )
340         drop(tx1);
341         select! (
342             foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
343             _bar = rx2.recv() => { fail!() }
344         )
345         drop(tx2);
346         select! (
347             bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
348         )
349     })
350
351     test!(fn smoke2() {
352         let (_tx1, rx1) = channel::<int>();
353         let (_tx2, rx2) = channel::<int>();
354         let (_tx3, rx3) = channel::<int>();
355         let (_tx4, rx4) = channel::<int>();
356         let (tx5, rx5) = channel::<int>();
357         tx5.send(4);
358         select! (
359             _foo = rx1.recv() => { fail!("1") },
360             _foo = rx2.recv() => { fail!("2") },
361             _foo = rx3.recv() => { fail!("3") },
362             _foo = rx4.recv() => { fail!("4") },
363             foo = rx5.recv() => { assert_eq!(foo, 4); }
364         )
365     })
366
367     test!(fn closed() {
368         let (_tx1, rx1) = channel::<int>();
369         let (tx2, rx2) = channel::<int>();
370         drop(tx2);
371
372         select! (
373             _a1 = rx1.recv_opt() => { fail!() },
374             a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
375         )
376     })
377
378     test!(fn unblocks() {
379         let (tx1, rx1) = channel::<int>();
380         let (_tx2, rx2) = channel::<int>();
381         let (tx3, rx3) = channel::<int>();
382
383         spawn(proc() {
384             for _ in range(0, 20) { task::deschedule(); }
385             tx1.send(1);
386             rx3.recv();
387             for _ in range(0, 20) { task::deschedule(); }
388         });
389
390         select! (
391             a = rx1.recv() => { assert_eq!(a, 1); },
392             _b = rx2.recv() => { fail!() }
393         )
394         tx3.send(1);
395         select! (
396             a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
397             _b = rx2.recv() => { fail!() }
398         )
399     })
400
401     test!(fn both_ready() {
402         let (tx1, rx1) = channel::<int>();
403         let (tx2, rx2) = channel::<int>();
404         let (tx3, rx3) = channel::<()>();
405
406         spawn(proc() {
407             for _ in range(0, 20) { task::deschedule(); }
408             tx1.send(1);
409             tx2.send(2);
410             rx3.recv();
411         });
412
413         select! (
414             a = rx1.recv() => { assert_eq!(a, 1); },
415             a = rx2.recv() => { assert_eq!(a, 2); }
416         )
417         select! (
418             a = rx1.recv() => { assert_eq!(a, 1); },
419             a = rx2.recv() => { assert_eq!(a, 2); }
420         )
421         assert_eq!(rx1.try_recv(), Err(Empty));
422         assert_eq!(rx2.try_recv(), Err(Empty));
423         tx3.send(());
424     })
425
426     test!(fn stress() {
427         static AMT: int = 10000;
428         let (tx1, rx1) = channel::<int>();
429         let (tx2, rx2) = channel::<int>();
430         let (tx3, rx3) = channel::<()>();
431
432         spawn(proc() {
433             for i in range(0, AMT) {
434                 if i % 2 == 0 {
435                     tx1.send(i);
436                 } else {
437                     tx2.send(i);
438                 }
439                 rx3.recv();
440             }
441         });
442
443         for i in range(0, AMT) {
444             select! (
445                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
446                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
447             )
448             tx3.send(());
449         }
450     })
451
452     test!(fn cloning() {
453         let (tx1, rx1) = channel::<int>();
454         let (_tx2, rx2) = channel::<int>();
455         let (tx3, rx3) = channel::<()>();
456
457         spawn(proc() {
458             rx3.recv();
459             tx1.clone();
460             assert_eq!(rx3.try_recv(), Err(Empty));
461             tx1.send(2);
462             rx3.recv();
463         });
464
465         tx3.send(());
466         select!(
467             _i1 = rx1.recv() => {},
468             _i2 = rx2.recv() => fail!()
469         )
470         tx3.send(());
471     })
472
473     test!(fn cloning2() {
474         let (tx1, rx1) = channel::<int>();
475         let (_tx2, rx2) = channel::<int>();
476         let (tx3, rx3) = channel::<()>();
477
478         spawn(proc() {
479             rx3.recv();
480             tx1.clone();
481             assert_eq!(rx3.try_recv(), Err(Empty));
482             tx1.send(2);
483             rx3.recv();
484         });
485
486         tx3.send(());
487         select!(
488             _i1 = rx1.recv() => {},
489             _i2 = rx2.recv() => fail!()
490         )
491         tx3.send(());
492     })
493
494     test!(fn cloning3() {
495         let (tx1, rx1) = channel::<()>();
496         let (tx2, rx2) = channel::<()>();
497         let (tx3, rx3) = channel::<()>();
498         spawn(proc() {
499             let s = Select::new();
500             let mut h1 = s.handle(&rx1);
501             let mut h2 = s.handle(&rx2);
502             unsafe { h2.add(); }
503             unsafe { h1.add(); }
504             assert_eq!(s.wait(), h2.id);
505             tx3.send(());
506         });
507
508         for _ in range(0, 1000) { task::deschedule(); }
509         drop(tx1.clone());
510         tx2.send(());
511         rx3.recv();
512     })
513
514     test!(fn preflight1() {
515         let (tx, rx) = channel();
516         tx.send(());
517         select!(
518             () = rx.recv() => {}
519         )
520     })
521
522     test!(fn preflight2() {
523         let (tx, rx) = channel();
524         tx.send(());
525         tx.send(());
526         select!(
527             () = rx.recv() => {}
528         )
529     })
530
531     test!(fn preflight3() {
532         let (tx, rx) = channel();
533         drop(tx.clone());
534         tx.send(());
535         select!(
536             () = rx.recv() => {}
537         )
538     })
539
540     test!(fn preflight4() {
541         let (tx, rx) = channel();
542         tx.send(());
543         let s = Select::new();
544         let mut h = s.handle(&rx);
545         unsafe { h.add(); }
546         assert_eq!(s.wait2(false), h.id);
547     })
548
549     test!(fn preflight5() {
550         let (tx, rx) = channel();
551         tx.send(());
552         tx.send(());
553         let s = Select::new();
554         let mut h = s.handle(&rx);
555         unsafe { h.add(); }
556         assert_eq!(s.wait2(false), h.id);
557     })
558
559     test!(fn preflight6() {
560         let (tx, rx) = channel();
561         drop(tx.clone());
562         tx.send(());
563         let s = Select::new();
564         let mut h = s.handle(&rx);
565         unsafe { h.add(); }
566         assert_eq!(s.wait2(false), h.id);
567     })
568
569     test!(fn preflight7() {
570         let (tx, rx) = channel::<()>();
571         drop(tx);
572         let s = Select::new();
573         let mut h = s.handle(&rx);
574         unsafe { h.add(); }
575         assert_eq!(s.wait2(false), h.id);
576     })
577
578     test!(fn preflight8() {
579         let (tx, rx) = channel();
580         tx.send(());
581         drop(tx);
582         rx.recv();
583         let s = Select::new();
584         let mut h = s.handle(&rx);
585         unsafe { h.add(); }
586         assert_eq!(s.wait2(false), h.id);
587     })
588
589     test!(fn preflight9() {
590         let (tx, rx) = channel();
591         drop(tx.clone());
592         tx.send(());
593         drop(tx);
594         rx.recv();
595         let s = Select::new();
596         let mut h = s.handle(&rx);
597         unsafe { h.add(); }
598         assert_eq!(s.wait2(false), h.id);
599     })
600
601     test!(fn oneshot_data_waiting() {
602         let (tx1, rx1) = channel();
603         let (tx2, rx2) = channel();
604         spawn(proc() {
605             select! {
606                 () = rx1.recv() => {}
607             }
608             tx2.send(());
609         });
610
611         for _ in range(0, 100) { task::deschedule() }
612         tx1.send(());
613         rx2.recv();
614     })
615
616     test!(fn stream_data_waiting() {
617         let (tx1, rx1) = channel();
618         let (tx2, rx2) = channel();
619         tx1.send(());
620         tx1.send(());
621         rx1.recv();
622         rx1.recv();
623         spawn(proc() {
624             select! {
625                 () = rx1.recv() => {}
626             }
627             tx2.send(());
628         });
629
630         for _ in range(0, 100) { task::deschedule() }
631         tx1.send(());
632         rx2.recv();
633     })
634
635     test!(fn shared_data_waiting() {
636         let (tx1, rx1) = channel();
637         let (tx2, rx2) = channel();
638         drop(tx1.clone());
639         tx1.send(());
640         rx1.recv();
641         spawn(proc() {
642             select! {
643                 () = rx1.recv() => {}
644             }
645             tx2.send(());
646         });
647
648         for _ in range(0, 100) { task::deschedule() }
649         tx1.send(());
650         rx2.recv();
651     })
652
653     test!(fn sync1() {
654         let (tx, rx) = sync_channel(1);
655         tx.send(1);
656         select! {
657             n = rx.recv() => { assert_eq!(n, 1); }
658         }
659     })
660
661     test!(fn sync2() {
662         let (tx, rx) = sync_channel(0);
663         spawn(proc() {
664             for _ in range(0, 100) { task::deschedule() }
665             tx.send(1);
666         });
667         select! {
668             n = rx.recv() => { assert_eq!(n, 1); }
669         }
670     })
671
672     test!(fn sync3() {
673         let (tx1, rx1) = sync_channel(0);
674         let (tx2, rx2) = channel();
675         spawn(proc() { tx1.send(1); });
676         spawn(proc() { tx2.send(2); });
677         select! {
678             n = rx1.recv() => {
679                 assert_eq!(n, 1);
680                 assert_eq!(rx2.recv(), 2);
681             },
682             n = rx2.recv() => {
683                 assert_eq!(n, 2);
684                 assert_eq!(rx1.recv(), 1);
685             }
686         }
687     })
688 }