]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/select.rs
auto merge of #15323 : alexcrichton/rust/no-travis-wait, r=huonw
[rust.git] / src / libsync / comm / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Example
28 //!
29 //! ```rust
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
32 //!
33 //! tx1.send(1i);
34 //! tx2.send(2i);
35 //!
36 //! select! {
37 //!     val = rx1.recv() => {
38 //!         assert_eq!(val, 1i);
39 //!     },
40 //!     val = rx2.recv() => {
41 //!         assert_eq!(val, 2i);
42 //!     }
43 //! }
44 //! ```
45
46 #![allow(dead_code)]
47 #![experimental = "This implementation, while likely sufficient, is unsafe and \
48                    likely to be error prone. At some point in the future this \
49                    module will likely be replaced, and it is currently \
50                    unknown how much API breakage that will cause. The ability \
51                    to select over a number of channels will remain forever, \
52                    but no guarantees beyond this are being made"]
53
54
55 use core::prelude::*;
56
57 use alloc::owned::Box;
58 use core::cell::Cell;
59 use core::kinds::marker;
60 use core::mem;
61 use core::uint;
62 use rustrt::local::Local;
63 use rustrt::task::{Task, BlockedTask};
64
65 use comm::Receiver;
66
67 /// The "receiver set" of the select interface. This structure is used to manage
68 /// a set of receivers which are being selected over.
69 pub struct Select {
70     head: *mut Handle<'static, ()>,
71     tail: *mut Handle<'static, ()>,
72     next_id: Cell<uint>,
73     marker1: marker::NoSend,
74 }
75
76 /// A handle to a receiver which is currently a member of a `Select` set of
77 /// receivers.  This handle is used to keep the receiver in the set as well as
78 /// interact with the underlying receiver.
79 pub struct Handle<'rx, T> {
80     /// The ID of this handle, used to compare against the return value of
81     /// `Select::wait()`
82     id: uint,
83     selector: &'rx Select,
84     next: *mut Handle<'static, ()>,
85     prev: *mut Handle<'static, ()>,
86     added: bool,
87     packet: &'rx Packet,
88
89     // due to our fun transmutes, we be sure to place this at the end. (nothing
90     // previous relies on T)
91     rx: &'rx Receiver<T>,
92 }
93
94 struct Packets { cur: *mut Handle<'static, ()> }
95
96 #[doc(hidden)]
97 pub trait Packet {
98     fn can_recv(&self) -> bool;
99     fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
100     fn abort_selection(&self) -> bool;
101 }
102
103 impl Select {
104     /// Creates a new selection structure. This set is initially empty and
105     /// `wait` will fail!() if called.
106     ///
107     /// Usage of this struct directly can sometimes be burdensome, and usage is
108     /// rather much easier through the `select!` macro.
109     pub fn new() -> Select {
110         Select {
111             marker1: marker::NoSend,
112             head: 0 as *mut Handle<'static, ()>,
113             tail: 0 as *mut Handle<'static, ()>,
114             next_id: Cell::new(1),
115         }
116     }
117
118     /// Creates a new handle into this receiver set for a new receiver. Note
119     /// that this does *not* add the receiver to the receiver set, for that you
120     /// must call the `add` method on the handle itself.
121     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
122         let id = self.next_id.get();
123         self.next_id.set(id + 1);
124         Handle {
125             id: id,
126             selector: self,
127             next: 0 as *mut Handle<'static, ()>,
128             prev: 0 as *mut Handle<'static, ()>,
129             added: false,
130             rx: rx,
131             packet: rx,
132         }
133     }
134
135     /// Waits for an event on this receiver set. The returned value is *not* an
136     /// index, but rather an id. This id can be queried against any active
137     /// `Handle` structures (each one has an `id` method). The handle with
138     /// the matching `id` will have some sort of event available on it. The
139     /// event could either be that data is available or the corresponding
140     /// channel has been closed.
141     pub fn wait(&self) -> uint {
142         self.wait2(true)
143     }
144
145     /// Helper method for skipping the preflight checks during testing
146     fn wait2(&self, do_preflight_checks: bool) -> uint {
147         // Note that this is currently an inefficient implementation. We in
148         // theory have knowledge about all receivers in the set ahead of time,
149         // so this method shouldn't really have to iterate over all of them yet
150         // again. The idea with this "receiver set" interface is to get the
151         // interface right this time around, and later this implementation can
152         // be optimized.
153         //
154         // This implementation can be summarized by:
155         //
156         //      fn select(receivers) {
157         //          if any receiver ready { return ready index }
158         //          deschedule {
159         //              block on all receivers
160         //          }
161         //          unblock on all receivers
162         //          return ready index
163         //      }
164         //
165         // Most notably, the iterations over all of the receivers shouldn't be
166         // necessary.
167         unsafe {
168             let mut amt = 0;
169             for p in self.iter() {
170                 amt += 1;
171                 if do_preflight_checks && (*p).packet.can_recv() {
172                     return (*p).id;
173                 }
174             }
175             assert!(amt > 0);
176
177             let mut ready_index = amt;
178             let mut ready_id = uint::MAX;
179             let mut iter = self.iter().enumerate();
180
181             // Acquire a number of blocking contexts, and block on each one
182             // sequentially until one fails. If one fails, then abort
183             // immediately so we can go unblock on all the other receivers.
184             let task: Box<Task> = Local::take();
185             task.deschedule(amt, |task| {
186                 // Prepare for the block
187                 let (i, handle) = iter.next().unwrap();
188                 match (*handle).packet.start_selection(task) {
189                     Ok(()) => Ok(()),
190                     Err(task) => {
191                         ready_index = i;
192                         ready_id = (*handle).id;
193                         Err(task)
194                     }
195                 }
196             });
197
198             // Abort the selection process on each receiver. If the abort
199             // process returns `true`, then that means that the receiver is
200             // ready to receive some data. Note that this also means that the
201             // receiver may have yet to have fully read the `to_wake` field and
202             // woken us up (although the wakeup is guaranteed to fail).
203             //
204             // This situation happens in the window of where a sender invokes
205             // increment(), sees -1, and then decides to wake up the task. After
206             // all this is done, the sending thread will set `selecting` to
207             // `false`. Until this is done, we cannot return. If we were to
208             // return, then a sender could wake up a receiver which has gone
209             // back to sleep after this call to `select`.
210             //
211             // Note that it is a "fairly small window" in which an increment()
212             // views that it should wake a thread up until the `selecting` bit
213             // is set to false. For now, the implementation currently just spins
214             // in a yield loop. This is very distasteful, but this
215             // implementation is already nowhere near what it should ideally be.
216             // A rewrite should focus on avoiding a yield loop, and for now this
217             // implementation is tying us over to a more efficient "don't
218             // iterate over everything every time" implementation.
219             for handle in self.iter().take(ready_index) {
220                 if (*handle).packet.abort_selection() {
221                     ready_id = (*handle).id;
222                 }
223             }
224
225             assert!(ready_id != uint::MAX);
226             return ready_id;
227         }
228     }
229
230     fn iter(&self) -> Packets { Packets { cur: self.head } }
231 }
232
233 impl<'rx, T: Send> Handle<'rx, T> {
234     /// Retrieve the id of this handle.
235     #[inline]
236     pub fn id(&self) -> uint { self.id }
237
238     /// Receive a value on the underlying receiver. Has the same semantics as
239     /// `Receiver.recv`
240     pub fn recv(&mut self) -> T { self.rx.recv() }
241     /// Block to receive a value on the underlying receiver, returning `Some` on
242     /// success or `None` if the channel disconnects. This function has the same
243     /// semantics as `Receiver.recv_opt`
244     pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
245
246     /// Adds this handle to the receiver set that the handle was created from. This
247     /// method can be called multiple times, but it has no effect if `add` was
248     /// called previously.
249     ///
250     /// This method is unsafe because it requires that the `Handle` is not moved
251     /// while it is added to the `Select` set.
252     pub unsafe fn add(&mut self) {
253         if self.added { return }
254         let selector: &mut Select = mem::transmute(&*self.selector);
255         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
256
257         if selector.head.is_null() {
258             selector.head = me;
259             selector.tail = me;
260         } else {
261             (*me).prev = selector.tail;
262             assert!((*me).next.is_null());
263             (*selector.tail).next = me;
264             selector.tail = me;
265         }
266         self.added = true;
267     }
268
269     /// Removes this handle from the `Select` set. This method is unsafe because
270     /// it has no guarantee that the `Handle` was not moved since `add` was
271     /// called.
272     pub unsafe fn remove(&mut self) {
273         if !self.added { return }
274
275         let selector: &mut Select = mem::transmute(&*self.selector);
276         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
277
278         if self.prev.is_null() {
279             assert_eq!(selector.head, me);
280             selector.head = self.next;
281         } else {
282             (*self.prev).next = self.next;
283         }
284         if self.next.is_null() {
285             assert_eq!(selector.tail, me);
286             selector.tail = self.prev;
287         } else {
288             (*self.next).prev = self.prev;
289         }
290
291         self.next = 0 as *mut Handle<'static, ()>;
292         self.prev = 0 as *mut Handle<'static, ()>;
293
294         self.added = false;
295     }
296 }
297
298 #[unsafe_destructor]
299 impl Drop for Select {
300     fn drop(&mut self) {
301         assert!(self.head.is_null());
302         assert!(self.tail.is_null());
303     }
304 }
305
306 #[unsafe_destructor]
307 impl<'rx, T: Send> Drop for Handle<'rx, T> {
308     fn drop(&mut self) {
309         unsafe { self.remove() }
310     }
311 }
312
313 impl Iterator<*mut Handle<'static, ()>> for Packets {
314     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
315         if self.cur.is_null() {
316             None
317         } else {
318             let ret = Some(self.cur);
319             unsafe { self.cur = (*self.cur).next; }
320             ret
321         }
322     }
323 }
324
325 #[cfg(test)]
326 #[allow(unused_imports)]
327 mod test {
328     use std::prelude::*;
329
330     use super::super::*;
331
332     // Don't use the libstd version so we can pull in the right Select structure
333     // (std::comm points at the wrong one)
334     macro_rules! select {
335         (
336             $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
337         ) => ({
338             use comm::Select;
339             let sel = Select::new();
340             $( let mut $rx = sel.handle(&$rx); )+
341             unsafe {
342                 $( $rx.add(); )+
343             }
344             let ret = sel.wait();
345             $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
346             { unreachable!() }
347         })
348     }
349
350     test!(fn smoke() {
351         let (tx1, rx1) = channel::<int>();
352         let (tx2, rx2) = channel::<int>();
353         tx1.send(1);
354         select! (
355             foo = rx1.recv() => { assert_eq!(foo, 1); },
356             _bar = rx2.recv() => { fail!() }
357         )
358         tx2.send(2);
359         select! (
360             _foo = rx1.recv() => { fail!() },
361             bar = rx2.recv() => { assert_eq!(bar, 2) }
362         )
363         drop(tx1);
364         select! (
365             foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
366             _bar = rx2.recv() => { fail!() }
367         )
368         drop(tx2);
369         select! (
370             bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
371         )
372     })
373
374     test!(fn smoke2() {
375         let (_tx1, rx1) = channel::<int>();
376         let (_tx2, rx2) = channel::<int>();
377         let (_tx3, rx3) = channel::<int>();
378         let (_tx4, rx4) = channel::<int>();
379         let (tx5, rx5) = channel::<int>();
380         tx5.send(4);
381         select! (
382             _foo = rx1.recv() => { fail!("1") },
383             _foo = rx2.recv() => { fail!("2") },
384             _foo = rx3.recv() => { fail!("3") },
385             _foo = rx4.recv() => { fail!("4") },
386             foo = rx5.recv() => { assert_eq!(foo, 4); }
387         )
388     })
389
390     test!(fn closed() {
391         let (_tx1, rx1) = channel::<int>();
392         let (tx2, rx2) = channel::<int>();
393         drop(tx2);
394
395         select! (
396             _a1 = rx1.recv_opt() => { fail!() },
397             a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
398         )
399     })
400
401     test!(fn unblocks() {
402         let (tx1, rx1) = channel::<int>();
403         let (_tx2, rx2) = channel::<int>();
404         let (tx3, rx3) = channel::<int>();
405
406         spawn(proc() {
407             for _ in range(0u, 20) { task::deschedule(); }
408             tx1.send(1);
409             rx3.recv();
410             for _ in range(0u, 20) { task::deschedule(); }
411         });
412
413         select! (
414             a = rx1.recv() => { assert_eq!(a, 1); },
415             _b = rx2.recv() => { fail!() }
416         )
417         tx3.send(1);
418         select! (
419             a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
420             _b = rx2.recv() => { fail!() }
421         )
422     })
423
424     test!(fn both_ready() {
425         let (tx1, rx1) = channel::<int>();
426         let (tx2, rx2) = channel::<int>();
427         let (tx3, rx3) = channel::<()>();
428
429         spawn(proc() {
430             for _ in range(0u, 20) { task::deschedule(); }
431             tx1.send(1);
432             tx2.send(2);
433             rx3.recv();
434         });
435
436         select! (
437             a = rx1.recv() => { assert_eq!(a, 1); },
438             a = rx2.recv() => { assert_eq!(a, 2); }
439         )
440         select! (
441             a = rx1.recv() => { assert_eq!(a, 1); },
442             a = rx2.recv() => { assert_eq!(a, 2); }
443         )
444         assert_eq!(rx1.try_recv(), Err(Empty));
445         assert_eq!(rx2.try_recv(), Err(Empty));
446         tx3.send(());
447     })
448
449     test!(fn stress() {
450         static AMT: int = 10000;
451         let (tx1, rx1) = channel::<int>();
452         let (tx2, rx2) = channel::<int>();
453         let (tx3, rx3) = channel::<()>();
454
455         spawn(proc() {
456             for i in range(0, AMT) {
457                 if i % 2 == 0 {
458                     tx1.send(i);
459                 } else {
460                     tx2.send(i);
461                 }
462                 rx3.recv();
463             }
464         });
465
466         for i in range(0, AMT) {
467             select! (
468                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
469                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
470             )
471             tx3.send(());
472         }
473     })
474
475     test!(fn cloning() {
476         let (tx1, rx1) = channel::<int>();
477         let (_tx2, rx2) = channel::<int>();
478         let (tx3, rx3) = channel::<()>();
479
480         spawn(proc() {
481             rx3.recv();
482             tx1.clone();
483             assert_eq!(rx3.try_recv(), Err(Empty));
484             tx1.send(2);
485             rx3.recv();
486         });
487
488         tx3.send(());
489         select!(
490             _i1 = rx1.recv() => {},
491             _i2 = rx2.recv() => fail!()
492         )
493         tx3.send(());
494     })
495
496     test!(fn cloning2() {
497         let (tx1, rx1) = channel::<int>();
498         let (_tx2, rx2) = channel::<int>();
499         let (tx3, rx3) = channel::<()>();
500
501         spawn(proc() {
502             rx3.recv();
503             tx1.clone();
504             assert_eq!(rx3.try_recv(), Err(Empty));
505             tx1.send(2);
506             rx3.recv();
507         });
508
509         tx3.send(());
510         select!(
511             _i1 = rx1.recv() => {},
512             _i2 = rx2.recv() => fail!()
513         )
514         tx3.send(());
515     })
516
517     test!(fn cloning3() {
518         let (tx1, rx1) = channel::<()>();
519         let (tx2, rx2) = channel::<()>();
520         let (tx3, rx3) = channel::<()>();
521         spawn(proc() {
522             let s = Select::new();
523             let mut h1 = s.handle(&rx1);
524             let mut h2 = s.handle(&rx2);
525             unsafe { h2.add(); }
526             unsafe { h1.add(); }
527             assert_eq!(s.wait(), h2.id);
528             tx3.send(());
529         });
530
531         for _ in range(0u, 1000) { task::deschedule(); }
532         drop(tx1.clone());
533         tx2.send(());
534         rx3.recv();
535     })
536
537     test!(fn preflight1() {
538         let (tx, rx) = channel();
539         tx.send(());
540         select!(
541             () = rx.recv() => {}
542         )
543     })
544
545     test!(fn preflight2() {
546         let (tx, rx) = channel();
547         tx.send(());
548         tx.send(());
549         select!(
550             () = rx.recv() => {}
551         )
552     })
553
554     test!(fn preflight3() {
555         let (tx, rx) = channel();
556         drop(tx.clone());
557         tx.send(());
558         select!(
559             () = rx.recv() => {}
560         )
561     })
562
563     test!(fn preflight4() {
564         let (tx, rx) = channel();
565         tx.send(());
566         let s = Select::new();
567         let mut h = s.handle(&rx);
568         unsafe { h.add(); }
569         assert_eq!(s.wait2(false), h.id);
570     })
571
572     test!(fn preflight5() {
573         let (tx, rx) = channel();
574         tx.send(());
575         tx.send(());
576         let s = Select::new();
577         let mut h = s.handle(&rx);
578         unsafe { h.add(); }
579         assert_eq!(s.wait2(false), h.id);
580     })
581
582     test!(fn preflight6() {
583         let (tx, rx) = channel();
584         drop(tx.clone());
585         tx.send(());
586         let s = Select::new();
587         let mut h = s.handle(&rx);
588         unsafe { h.add(); }
589         assert_eq!(s.wait2(false), h.id);
590     })
591
592     test!(fn preflight7() {
593         let (tx, rx) = channel::<()>();
594         drop(tx);
595         let s = Select::new();
596         let mut h = s.handle(&rx);
597         unsafe { h.add(); }
598         assert_eq!(s.wait2(false), h.id);
599     })
600
601     test!(fn preflight8() {
602         let (tx, rx) = channel();
603         tx.send(());
604         drop(tx);
605         rx.recv();
606         let s = Select::new();
607         let mut h = s.handle(&rx);
608         unsafe { h.add(); }
609         assert_eq!(s.wait2(false), h.id);
610     })
611
612     test!(fn preflight9() {
613         let (tx, rx) = channel();
614         drop(tx.clone());
615         tx.send(());
616         drop(tx);
617         rx.recv();
618         let s = Select::new();
619         let mut h = s.handle(&rx);
620         unsafe { h.add(); }
621         assert_eq!(s.wait2(false), h.id);
622     })
623
624     test!(fn oneshot_data_waiting() {
625         let (tx1, rx1) = channel();
626         let (tx2, rx2) = channel();
627         spawn(proc() {
628             select! {
629                 () = rx1.recv() => {}
630             }
631             tx2.send(());
632         });
633
634         for _ in range(0u, 100) { task::deschedule() }
635         tx1.send(());
636         rx2.recv();
637     })
638
639     test!(fn stream_data_waiting() {
640         let (tx1, rx1) = channel();
641         let (tx2, rx2) = channel();
642         tx1.send(());
643         tx1.send(());
644         rx1.recv();
645         rx1.recv();
646         spawn(proc() {
647             select! {
648                 () = rx1.recv() => {}
649             }
650             tx2.send(());
651         });
652
653         for _ in range(0u, 100) { task::deschedule() }
654         tx1.send(());
655         rx2.recv();
656     })
657
658     test!(fn shared_data_waiting() {
659         let (tx1, rx1) = channel();
660         let (tx2, rx2) = channel();
661         drop(tx1.clone());
662         tx1.send(());
663         rx1.recv();
664         spawn(proc() {
665             select! {
666                 () = rx1.recv() => {}
667             }
668             tx2.send(());
669         });
670
671         for _ in range(0u, 100) { task::deschedule() }
672         tx1.send(());
673         rx2.recv();
674     })
675
676     test!(fn sync1() {
677         let (tx, rx) = sync_channel::<int>(1);
678         tx.send(1);
679         select! {
680             n = rx.recv() => { assert_eq!(n, 1); }
681         }
682     })
683
684     test!(fn sync2() {
685         let (tx, rx) = sync_channel::<int>(0);
686         spawn(proc() {
687             for _ in range(0u, 100) { task::deschedule() }
688             tx.send(1);
689         });
690         select! {
691             n = rx.recv() => { assert_eq!(n, 1); }
692         }
693     })
694
695     test!(fn sync3() {
696         let (tx1, rx1) = sync_channel::<int>(0);
697         let (tx2, rx2): (Sender<int>, Receiver<int>) = channel();
698         spawn(proc() { tx1.send(1); });
699         spawn(proc() { tx2.send(2); });
700         select! {
701             n = rx1.recv() => {
702                 assert_eq!(n, 1);
703                 assert_eq!(rx2.recv(), 2);
704             },
705             n = rx2.recv() => {
706                 assert_eq!(n, 2);
707                 assert_eq!(rx1.recv(), 1);
708             }
709         }
710     })
711 }