]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/select.rs
Ignore tests broken by failing on ICE
[rust.git] / src / libstd / comm / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Example
28 //!
29 //! ```rust
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
32 //!
33 //! tx1.send(1);
34 //! tx2.send(2);
35 //!
36 //! select! {
37 //!     val = rx1.recv() => {
38 //!         assert_eq!(val, 1);
39 //!     },
40 //!     val = rx2.recv() => {
41 //!         assert_eq!(val, 2);
42 //!     }
43 //! }
44 //! ```
45
46 #![allow(dead_code)]
47
48 use cast;
49 use cell::Cell;
50 use iter::Iterator;
51 use kinds::marker;
52 use kinds::Send;
53 use ops::Drop;
54 use option::{Some, None, Option};
55 use ptr::RawPtr;
56 use result::{Ok, Err, Result};
57 use rt::local::Local;
58 use rt::task::{Task, BlockedTask};
59 use super::Receiver;
60 use uint;
61
62 /// The "receiver set" of the select interface. This structure is used to manage
63 /// a set of receivers which are being selected over.
64 pub struct Select {
65     head: *mut Handle<'static, ()>,
66     tail: *mut Handle<'static, ()>,
67     next_id: Cell<uint>,
68     marker1: marker::NoSend,
69 }
70
71 /// A handle to a receiver which is currently a member of a `Select` set of
72 /// receivers.  This handle is used to keep the receiver in the set as well as
73 /// interact with the underlying receiver.
74 pub struct Handle<'rx, T> {
75     /// The ID of this handle, used to compare against the return value of
76     /// `Select::wait()`
77     id: uint,
78     selector: &'rx Select,
79     next: *mut Handle<'static, ()>,
80     prev: *mut Handle<'static, ()>,
81     added: bool,
82     packet: &'rx Packet,
83
84     // due to our fun transmutes, we be sure to place this at the end. (nothing
85     // previous relies on T)
86     rx: &'rx Receiver<T>,
87 }
88
89 struct Packets { cur: *mut Handle<'static, ()> }
90
91 #[doc(hidden)]
92 pub trait Packet {
93     fn can_recv(&self) -> bool;
94     fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
95     fn abort_selection(&self) -> bool;
96 }
97
98 impl Select {
99     /// Creates a new selection structure. This set is initially empty and
100     /// `wait` will fail!() if called.
101     ///
102     /// Usage of this struct directly can sometimes be burdensome, and usage is
103     /// rather much easier through the `select!` macro.
104     pub fn new() -> Select {
105         Select {
106             marker1: marker::NoSend,
107             head: 0 as *mut Handle<'static, ()>,
108             tail: 0 as *mut Handle<'static, ()>,
109             next_id: Cell::new(1),
110         }
111     }
112
113     /// Creates a new handle into this receiver set for a new receiver. Note
114     /// that this does *not* add the receiver to the receiver set, for that you
115     /// must call the `add` method on the handle itself.
116     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
117         let id = self.next_id.get();
118         self.next_id.set(id + 1);
119         Handle {
120             id: id,
121             selector: self,
122             next: 0 as *mut Handle<'static, ()>,
123             prev: 0 as *mut Handle<'static, ()>,
124             added: false,
125             rx: rx,
126             packet: rx,
127         }
128     }
129
130     /// Waits for an event on this receiver set. The returned value is *not* an
131     /// index, but rather an id. This id can be queried against any active
132     /// `Handle` structures (each one has an `id` method). The handle with
133     /// the matching `id` will have some sort of event available on it. The
134     /// event could either be that data is available or the corresponding
135     /// channel has been closed.
136     pub fn wait(&self) -> uint {
137         self.wait2(false)
138     }
139
140     /// Helper method for skipping the preflight checks during testing
141     fn wait2(&self, do_preflight_checks: bool) -> uint {
142         // Note that this is currently an inefficient implementation. We in
143         // theory have knowledge about all receivers in the set ahead of time,
144         // so this method shouldn't really have to iterate over all of them yet
145         // again. The idea with this "receiver set" interface is to get the
146         // interface right this time around, and later this implementation can
147         // be optimized.
148         //
149         // This implementation can be summarized by:
150         //
151         //      fn select(receivers) {
152         //          if any receiver ready { return ready index }
153         //          deschedule {
154         //              block on all receivers
155         //          }
156         //          unblock on all receivers
157         //          return ready index
158         //      }
159         //
160         // Most notably, the iterations over all of the receivers shouldn't be
161         // necessary.
162         unsafe {
163             let mut amt = 0;
164             for p in self.iter() {
165                 amt += 1;
166                 if do_preflight_checks && (*p).packet.can_recv() {
167                     return (*p).id;
168                 }
169             }
170             assert!(amt > 0);
171
172             let mut ready_index = amt;
173             let mut ready_id = uint::MAX;
174             let mut iter = self.iter().enumerate();
175
176             // Acquire a number of blocking contexts, and block on each one
177             // sequentially until one fails. If one fails, then abort
178             // immediately so we can go unblock on all the other receivers.
179             let task: ~Task = Local::take();
180             task.deschedule(amt, |task| {
181                 // Prepare for the block
182                 let (i, handle) = iter.next().unwrap();
183                 match (*handle).packet.start_selection(task) {
184                     Ok(()) => Ok(()),
185                     Err(task) => {
186                         ready_index = i;
187                         ready_id = (*handle).id;
188                         Err(task)
189                     }
190                 }
191             });
192
193             // Abort the selection process on each receiver. If the abort
194             // process returns `true`, then that means that the receiver is
195             // ready to receive some data. Note that this also means that the
196             // receiver may have yet to have fully read the `to_wake` field and
197             // woken us up (although the wakeup is guaranteed to fail).
198             //
199             // This situation happens in the window of where a sender invokes
200             // increment(), sees -1, and then decides to wake up the task. After
201             // all this is done, the sending thread will set `selecting` to
202             // `false`. Until this is done, we cannot return. If we were to
203             // return, then a sender could wake up a receiver which has gone
204             // back to sleep after this call to `select`.
205             //
206             // Note that it is a "fairly small window" in which an increment()
207             // views that it should wake a thread up until the `selecting` bit
208             // is set to false. For now, the implementation currently just spins
209             // in a yield loop. This is very distasteful, but this
210             // implementation is already nowhere near what it should ideally be.
211             // A rewrite should focus on avoiding a yield loop, and for now this
212             // implementation is tying us over to a more efficient "don't
213             // iterate over everything every time" implementation.
214             for handle in self.iter().take(ready_index) {
215                 if (*handle).packet.abort_selection() {
216                     ready_id = (*handle).id;
217                 }
218             }
219
220             assert!(ready_id != uint::MAX);
221             return ready_id;
222         }
223     }
224
225     fn iter(&self) -> Packets { Packets { cur: self.head } }
226 }
227
228 impl<'rx, T: Send> Handle<'rx, T> {
229     /// Retrieve the id of this handle.
230     #[inline]
231     pub fn id(&self) -> uint { self.id }
232
233     /// Receive a value on the underlying receiver. Has the same semantics as
234     /// `Receiver.recv`
235     pub fn recv(&mut self) -> T { self.rx.recv() }
236     /// Block to receive a value on the underlying receiver, returning `Some` on
237     /// success or `None` if the channel disconnects. This function has the same
238     /// semantics as `Receiver.recv_opt`
239     pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
240
241     /// Adds this handle to the receiver set that the handle was created from. This
242     /// method can be called multiple times, but it has no effect if `add` was
243     /// called previously.
244     ///
245     /// This method is unsafe because it requires that the `Handle` is not moved
246     /// while it is added to the `Select` set.
247     pub unsafe fn add(&mut self) {
248         if self.added { return }
249         let selector: &mut Select = cast::transmute(&*self.selector);
250         let me: *mut Handle<'static, ()> = cast::transmute(&*self);
251
252         if selector.head.is_null() {
253             selector.head = me;
254             selector.tail = me;
255         } else {
256             (*me).prev = selector.tail;
257             assert!((*me).next.is_null());
258             (*selector.tail).next = me;
259             selector.tail = me;
260         }
261         self.added = true;
262     }
263
264     /// Removes this handle from the `Select` set. This method is unsafe because
265     /// it has no guarantee that the `Handle` was not moved since `add` was
266     /// called.
267     pub unsafe fn remove(&mut self) {
268         if !self.added { return }
269
270         let selector: &mut Select = cast::transmute(&*self.selector);
271         let me: *mut Handle<'static, ()> = cast::transmute(&*self);
272
273         if self.prev.is_null() {
274             assert_eq!(selector.head, me);
275             selector.head = self.next;
276         } else {
277             (*self.prev).next = self.next;
278         }
279         if self.next.is_null() {
280             assert_eq!(selector.tail, me);
281             selector.tail = self.prev;
282         } else {
283             (*self.next).prev = self.prev;
284         }
285
286         self.next = 0 as *mut Handle<'static, ()>;
287         self.prev = 0 as *mut Handle<'static, ()>;
288
289         self.added = false;
290     }
291 }
292
293 #[unsafe_destructor]
294 impl Drop for Select {
295     fn drop(&mut self) {
296         assert!(self.head.is_null());
297         assert!(self.tail.is_null());
298     }
299 }
300
301 #[unsafe_destructor]
302 impl<'rx, T: Send> Drop for Handle<'rx, T> {
303     fn drop(&mut self) {
304         unsafe { self.remove() }
305     }
306 }
307
308 impl Iterator<*mut Handle<'static, ()>> for Packets {
309     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
310         if self.cur.is_null() {
311             None
312         } else {
313             let ret = Some(self.cur);
314             unsafe { self.cur = (*self.cur).next; }
315             ret
316         }
317     }
318 }
319
320 #[cfg(test)]
321 #[allow(unused_imports)]
322 mod test {
323     use super::super::*;
324     use prelude::*;
325
326     test!(fn smoke() {
327         let (tx1, rx1) = channel::<int>();
328         let (tx2, rx2) = channel::<int>();
329         tx1.send(1);
330         select! (
331             foo = rx1.recv() => { assert_eq!(foo, 1); },
332             _bar = rx2.recv() => { fail!() }
333         )
334         tx2.send(2);
335         select! (
336             _foo = rx1.recv() => { fail!() },
337             bar = rx2.recv() => { assert_eq!(bar, 2) }
338         )
339         drop(tx1);
340         select! (
341             foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
342             _bar = rx2.recv() => { fail!() }
343         )
344         drop(tx2);
345         select! (
346             bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
347         )
348     })
349
350     test!(fn smoke2() {
351         let (_tx1, rx1) = channel::<int>();
352         let (_tx2, rx2) = channel::<int>();
353         let (_tx3, rx3) = channel::<int>();
354         let (_tx4, rx4) = channel::<int>();
355         let (tx5, rx5) = channel::<int>();
356         tx5.send(4);
357         select! (
358             _foo = rx1.recv() => { fail!("1") },
359             _foo = rx2.recv() => { fail!("2") },
360             _foo = rx3.recv() => { fail!("3") },
361             _foo = rx4.recv() => { fail!("4") },
362             foo = rx5.recv() => { assert_eq!(foo, 4); }
363         )
364     })
365
366     test!(fn closed() {
367         let (_tx1, rx1) = channel::<int>();
368         let (tx2, rx2) = channel::<int>();
369         drop(tx2);
370
371         select! (
372             _a1 = rx1.recv_opt() => { fail!() },
373             a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
374         )
375     })
376
377     test!(fn unblocks() {
378         let (tx1, rx1) = channel::<int>();
379         let (_tx2, rx2) = channel::<int>();
380         let (tx3, rx3) = channel::<int>();
381
382         spawn(proc() {
383             for _ in range(0, 20) { task::deschedule(); }
384             tx1.send(1);
385             rx3.recv();
386             for _ in range(0, 20) { task::deschedule(); }
387         });
388
389         select! (
390             a = rx1.recv() => { assert_eq!(a, 1); },
391             _b = rx2.recv() => { fail!() }
392         )
393         tx3.send(1);
394         select! (
395             a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
396             _b = rx2.recv() => { fail!() }
397         )
398     })
399
400     test!(fn both_ready() {
401         let (tx1, rx1) = channel::<int>();
402         let (tx2, rx2) = channel::<int>();
403         let (tx3, rx3) = channel::<()>();
404
405         spawn(proc() {
406             for _ in range(0, 20) { task::deschedule(); }
407             tx1.send(1);
408             tx2.send(2);
409             rx3.recv();
410         });
411
412         select! (
413             a = rx1.recv() => { assert_eq!(a, 1); },
414             a = rx2.recv() => { assert_eq!(a, 2); }
415         )
416         select! (
417             a = rx1.recv() => { assert_eq!(a, 1); },
418             a = rx2.recv() => { assert_eq!(a, 2); }
419         )
420         assert_eq!(rx1.try_recv(), Err(Empty));
421         assert_eq!(rx2.try_recv(), Err(Empty));
422         tx3.send(());
423     })
424
425     test!(fn stress() {
426         static AMT: int = 10000;
427         let (tx1, rx1) = channel::<int>();
428         let (tx2, rx2) = channel::<int>();
429         let (tx3, rx3) = channel::<()>();
430
431         spawn(proc() {
432             for i in range(0, AMT) {
433                 if i % 2 == 0 {
434                     tx1.send(i);
435                 } else {
436                     tx2.send(i);
437                 }
438                 rx3.recv();
439             }
440         });
441
442         for i in range(0, AMT) {
443             select! (
444                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
445                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
446             )
447             tx3.send(());
448         }
449     })
450
451     test!(fn cloning() {
452         let (tx1, rx1) = channel::<int>();
453         let (_tx2, rx2) = channel::<int>();
454         let (tx3, rx3) = channel::<()>();
455
456         spawn(proc() {
457             rx3.recv();
458             tx1.clone();
459             assert_eq!(rx3.try_recv(), Err(Empty));
460             tx1.send(2);
461             rx3.recv();
462         });
463
464         tx3.send(());
465         select!(
466             _i1 = rx1.recv() => {},
467             _i2 = rx2.recv() => fail!()
468         )
469         tx3.send(());
470     })
471
472     test!(fn cloning2() {
473         let (tx1, rx1) = channel::<int>();
474         let (_tx2, rx2) = channel::<int>();
475         let (tx3, rx3) = channel::<()>();
476
477         spawn(proc() {
478             rx3.recv();
479             tx1.clone();
480             assert_eq!(rx3.try_recv(), Err(Empty));
481             tx1.send(2);
482             rx3.recv();
483         });
484
485         tx3.send(());
486         select!(
487             _i1 = rx1.recv() => {},
488             _i2 = rx2.recv() => fail!()
489         )
490         tx3.send(());
491     })
492
493     test!(fn cloning3() {
494         let (tx1, rx1) = channel::<()>();
495         let (tx2, rx2) = channel::<()>();
496         let (tx3, rx3) = channel::<()>();
497         spawn(proc() {
498             let s = Select::new();
499             let mut h1 = s.handle(&rx1);
500             let mut h2 = s.handle(&rx2);
501             unsafe { h2.add(); }
502             unsafe { h1.add(); }
503             assert_eq!(s.wait(), h2.id);
504             tx3.send(());
505         });
506
507         for _ in range(0, 1000) { task::deschedule(); }
508         drop(tx1.clone());
509         tx2.send(());
510         rx3.recv();
511     })
512
513     test!(fn preflight1() {
514         let (tx, rx) = channel();
515         tx.send(());
516         select!(
517             () = rx.recv() => {}
518         )
519     })
520
521     test!(fn preflight2() {
522         let (tx, rx) = channel();
523         tx.send(());
524         tx.send(());
525         select!(
526             () = rx.recv() => {}
527         )
528     })
529
530     test!(fn preflight3() {
531         let (tx, rx) = channel();
532         drop(tx.clone());
533         tx.send(());
534         select!(
535             () = rx.recv() => {}
536         )
537     })
538
539     test!(fn preflight4() {
540         let (tx, rx) = channel();
541         tx.send(());
542         let s = Select::new();
543         let mut h = s.handle(&rx);
544         unsafe { h.add(); }
545         assert_eq!(s.wait2(false), h.id);
546     })
547
548     test!(fn preflight5() {
549         let (tx, rx) = channel();
550         tx.send(());
551         tx.send(());
552         let s = Select::new();
553         let mut h = s.handle(&rx);
554         unsafe { h.add(); }
555         assert_eq!(s.wait2(false), h.id);
556     })
557
558     test!(fn preflight6() {
559         let (tx, rx) = channel();
560         drop(tx.clone());
561         tx.send(());
562         let s = Select::new();
563         let mut h = s.handle(&rx);
564         unsafe { h.add(); }
565         assert_eq!(s.wait2(false), h.id);
566     })
567
568     test!(fn preflight7() {
569         let (tx, rx) = channel::<()>();
570         drop(tx);
571         let s = Select::new();
572         let mut h = s.handle(&rx);
573         unsafe { h.add(); }
574         assert_eq!(s.wait2(false), h.id);
575     })
576
577     test!(fn preflight8() {
578         let (tx, rx) = channel();
579         tx.send(());
580         drop(tx);
581         rx.recv();
582         let s = Select::new();
583         let mut h = s.handle(&rx);
584         unsafe { h.add(); }
585         assert_eq!(s.wait2(false), h.id);
586     })
587
588     test!(fn preflight9() {
589         let (tx, rx) = channel();
590         drop(tx.clone());
591         tx.send(());
592         drop(tx);
593         rx.recv();
594         let s = Select::new();
595         let mut h = s.handle(&rx);
596         unsafe { h.add(); }
597         assert_eq!(s.wait2(false), h.id);
598     })
599
600     test!(fn oneshot_data_waiting() {
601         let (tx1, rx1) = channel();
602         let (tx2, rx2) = channel();
603         spawn(proc() {
604             select! {
605                 () = rx1.recv() => {}
606             }
607             tx2.send(());
608         });
609
610         for _ in range(0, 100) { task::deschedule() }
611         tx1.send(());
612         rx2.recv();
613     })
614
615     test!(fn stream_data_waiting() {
616         let (tx1, rx1) = channel();
617         let (tx2, rx2) = channel();
618         tx1.send(());
619         tx1.send(());
620         rx1.recv();
621         rx1.recv();
622         spawn(proc() {
623             select! {
624                 () = rx1.recv() => {}
625             }
626             tx2.send(());
627         });
628
629         for _ in range(0, 100) { task::deschedule() }
630         tx1.send(());
631         rx2.recv();
632     })
633
634     test!(fn shared_data_waiting() {
635         let (tx1, rx1) = channel();
636         let (tx2, rx2) = channel();
637         drop(tx1.clone());
638         tx1.send(());
639         rx1.recv();
640         spawn(proc() {
641             select! {
642                 () = rx1.recv() => {}
643             }
644             tx2.send(());
645         });
646
647         for _ in range(0, 100) { task::deschedule() }
648         tx1.send(());
649         rx2.recv();
650     })
651
652     test!(fn sync1() {
653         let (tx, rx) = sync_channel(1);
654         tx.send(1);
655         select! {
656             n = rx.recv() => { assert_eq!(n, 1); }
657         }
658     })
659
660     test!(fn sync2() {
661         let (tx, rx) = sync_channel(0);
662         spawn(proc() {
663             for _ in range(0, 100) { task::deschedule() }
664             tx.send(1);
665         });
666         select! {
667             n = rx.recv() => { assert_eq!(n, 1); }
668         }
669     })
670
671     test!(fn sync3() {
672         let (tx1, rx1) = sync_channel(0);
673         let (tx2, rx2) = channel();
674         spawn(proc() { tx1.send(1); });
675         spawn(proc() { tx2.send(2); });
676         select! {
677             n = rx1.recv() => {
678                 assert_eq!(n, 1);
679                 assert_eq!(rx2.recv(), 2);
680             },
681             n = rx2.recv() => {
682                 assert_eq!(n, 2);
683                 assert_eq!(rx1.recv(), 1);
684             }
685         }
686     })
687 }