]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/select.rs
Merge pull request #20510 from tshepang/patch-6
[rust.git] / src / libstd / sync / mpsc / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Example
28 //!
29 //! ```rust
30 //! use std::sync::mpsc::channel;
31 //!
32 //! let (tx1, rx1) = channel();
33 //! let (tx2, rx2) = channel();
34 //!
35 //! tx1.send(1i).unwrap();
36 //! tx2.send(2i).unwrap();
37 //!
38 //! select! {
39 //!     val = rx1.recv() => {
40 //!         assert_eq!(val.unwrap(), 1i);
41 //!     },
42 //!     val = rx2.recv() => {
43 //!         assert_eq!(val.unwrap(), 2i);
44 //!     }
45 //! }
46 //! ```
47
48 #![allow(dead_code)]
49 #![experimental = "This implementation, while likely sufficient, is unsafe and \
50                    likely to be error prone. At some point in the future this \
51                    module will likely be replaced, and it is currently \
52                    unknown how much API breakage that will cause. The ability \
53                    to select over a number of channels will remain forever, \
54                    but no guarantees beyond this are being made"]
55
56
57 use core::prelude::*;
58
59 use core::cell::Cell;
60 use core::kinds::marker;
61 use core::mem;
62 use core::uint;
63
64 use sync::mpsc::{Receiver, RecvError};
65 use sync::mpsc::blocking::{self, SignalToken};
66
67 /// The "receiver set" of the select interface. This structure is used to manage
68 /// a set of receivers which are being selected over.
69 pub struct Select {
70     head: *mut Handle<'static, ()>,
71     tail: *mut Handle<'static, ()>,
72     next_id: Cell<uint>,
73     marker1: marker::NoSend,
74 }
75
76 /// A handle to a receiver which is currently a member of a `Select` set of
77 /// receivers.  This handle is used to keep the receiver in the set as well as
78 /// interact with the underlying receiver.
79 pub struct Handle<'rx, T:'rx> {
80     /// The ID of this handle, used to compare against the return value of
81     /// `Select::wait()`
82     id: uint,
83     selector: &'rx Select,
84     next: *mut Handle<'static, ()>,
85     prev: *mut Handle<'static, ()>,
86     added: bool,
87     packet: &'rx (Packet+'rx),
88
89     // due to our fun transmutes, we be sure to place this at the end. (nothing
90     // previous relies on T)
91     rx: &'rx Receiver<T>,
92 }
93
94 struct Packets { cur: *mut Handle<'static, ()> }
95
96 #[doc(hidden)]
97 #[derive(PartialEq)]
98 pub enum StartResult {
99     Installed,
100     Abort,
101 }
102
103 #[doc(hidden)]
104 pub trait Packet {
105     fn can_recv(&self) -> bool;
106     fn start_selection(&self, token: SignalToken) -> StartResult;
107     fn abort_selection(&self) -> bool;
108 }
109
110 impl Select {
111     /// Creates a new selection structure. This set is initially empty and
112     /// `wait` will panic!() if called.
113     ///
114     /// Usage of this struct directly can sometimes be burdensome, and usage is
115     /// rather much easier through the `select!` macro.
116     pub fn new() -> Select {
117         Select {
118             marker1: marker::NoSend,
119             head: 0 as *mut Handle<'static, ()>,
120             tail: 0 as *mut Handle<'static, ()>,
121             next_id: Cell::new(1),
122         }
123     }
124
125     /// Creates a new handle into this receiver set for a new receiver. Note
126     /// that this does *not* add the receiver to the receiver set, for that you
127     /// must call the `add` method on the handle itself.
128     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
129         let id = self.next_id.get();
130         self.next_id.set(id + 1);
131         Handle {
132             id: id,
133             selector: self,
134             next: 0 as *mut Handle<'static, ()>,
135             prev: 0 as *mut Handle<'static, ()>,
136             added: false,
137             rx: rx,
138             packet: rx,
139         }
140     }
141
142     /// Waits for an event on this receiver set. The returned value is *not* an
143     /// index, but rather an id. This id can be queried against any active
144     /// `Handle` structures (each one has an `id` method). The handle with
145     /// the matching `id` will have some sort of event available on it. The
146     /// event could either be that data is available or the corresponding
147     /// channel has been closed.
148     pub fn wait(&self) -> uint {
149         self.wait2(true)
150     }
151
152     /// Helper method for skipping the preflight checks during testing
153     fn wait2(&self, do_preflight_checks: bool) -> uint {
154         // Note that this is currently an inefficient implementation. We in
155         // theory have knowledge about all receivers in the set ahead of time,
156         // so this method shouldn't really have to iterate over all of them yet
157         // again. The idea with this "receiver set" interface is to get the
158         // interface right this time around, and later this implementation can
159         // be optimized.
160         //
161         // This implementation can be summarized by:
162         //
163         //      fn select(receivers) {
164         //          if any receiver ready { return ready index }
165         //          deschedule {
166         //              block on all receivers
167         //          }
168         //          unblock on all receivers
169         //          return ready index
170         //      }
171         //
172         // Most notably, the iterations over all of the receivers shouldn't be
173         // necessary.
174         unsafe {
175             // Stage 1: preflight checks. Look for any packets ready to receive
176             if do_preflight_checks {
177                 for handle in self.iter() {
178                     if (*handle).packet.can_recv() {
179                         return (*handle).id();
180                     }
181                 }
182             }
183
184             // Stage 2: begin the blocking process
185             //
186             // Create a number of signal tokens, and install each one
187             // sequentially until one fails. If one fails, then abort the
188             // selection on the already-installed tokens.
189             let (wait_token, signal_token) = blocking::tokens();
190             for (i, handle) in self.iter().enumerate() {
191                 match (*handle).packet.start_selection(signal_token.clone()) {
192                     StartResult::Installed => {}
193                     StartResult::Abort => {
194                         // Go back and abort the already-begun selections
195                         for handle in self.iter().take(i) {
196                             (*handle).packet.abort_selection();
197                         }
198                         return (*handle).id;
199                     }
200                 }
201             }
202
203             // Stage 3: no messages available, actually block
204             wait_token.wait();
205
206             // Stage 4: there *must* be message available; find it.
207             //
208             // Abort the selection process on each receiver. If the abort
209             // process returns `true`, then that means that the receiver is
210             // ready to receive some data. Note that this also means that the
211             // receiver may have yet to have fully read the `to_wake` field and
212             // woken us up (although the wakeup is guaranteed to fail).
213             //
214             // This situation happens in the window of where a sender invokes
215             // increment(), sees -1, and then decides to wake up the task. After
216             // all this is done, the sending thread will set `selecting` to
217             // `false`. Until this is done, we cannot return. If we were to
218             // return, then a sender could wake up a receiver which has gone
219             // back to sleep after this call to `select`.
220             //
221             // Note that it is a "fairly small window" in which an increment()
222             // views that it should wake a thread up until the `selecting` bit
223             // is set to false. For now, the implementation currently just spins
224             // in a yield loop. This is very distasteful, but this
225             // implementation is already nowhere near what it should ideally be.
226             // A rewrite should focus on avoiding a yield loop, and for now this
227             // implementation is tying us over to a more efficient "don't
228             // iterate over everything every time" implementation.
229             let mut ready_id = uint::MAX;
230             for handle in self.iter() {
231                 if (*handle).packet.abort_selection() {
232                     ready_id = (*handle).id;
233                 }
234             }
235
236             // We must have found a ready receiver
237             assert!(ready_id != uint::MAX);
238             return ready_id;
239         }
240     }
241
242     fn iter(&self) -> Packets { Packets { cur: self.head } }
243 }
244
245 impl<'rx, T: Send> Handle<'rx, T> {
246     /// Retrieve the id of this handle.
247     #[inline]
248     pub fn id(&self) -> uint { self.id }
249
250     /// Block to receive a value on the underlying receiver, returning `Some` on
251     /// success or `None` if the channel disconnects. This function has the same
252     /// semantics as `Receiver.recv`
253     pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
254
255     /// Adds this handle to the receiver set that the handle was created from. This
256     /// method can be called multiple times, but it has no effect if `add` was
257     /// called previously.
258     ///
259     /// This method is unsafe because it requires that the `Handle` is not moved
260     /// while it is added to the `Select` set.
261     pub unsafe fn add(&mut self) {
262         if self.added { return }
263         let selector: &mut Select = mem::transmute(&*self.selector);
264         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
265
266         if selector.head.is_null() {
267             selector.head = me;
268             selector.tail = me;
269         } else {
270             (*me).prev = selector.tail;
271             assert!((*me).next.is_null());
272             (*selector.tail).next = me;
273             selector.tail = me;
274         }
275         self.added = true;
276     }
277
278     /// Removes this handle from the `Select` set. This method is unsafe because
279     /// it has no guarantee that the `Handle` was not moved since `add` was
280     /// called.
281     pub unsafe fn remove(&mut self) {
282         if !self.added { return }
283
284         let selector: &mut Select = mem::transmute(&*self.selector);
285         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
286
287         if self.prev.is_null() {
288             assert_eq!(selector.head, me);
289             selector.head = self.next;
290         } else {
291             (*self.prev).next = self.next;
292         }
293         if self.next.is_null() {
294             assert_eq!(selector.tail, me);
295             selector.tail = self.prev;
296         } else {
297             (*self.next).prev = self.prev;
298         }
299
300         self.next = 0 as *mut Handle<'static, ()>;
301         self.prev = 0 as *mut Handle<'static, ()>;
302
303         self.added = false;
304     }
305 }
306
307 #[unsafe_destructor]
308 impl Drop for Select {
309     fn drop(&mut self) {
310         assert!(self.head.is_null());
311         assert!(self.tail.is_null());
312     }
313 }
314
315 #[unsafe_destructor]
316 impl<'rx, T: Send> Drop for Handle<'rx, T> {
317     fn drop(&mut self) {
318         unsafe { self.remove() }
319     }
320 }
321
322 impl Iterator for Packets {
323     type Item = *mut Handle<'static, ()>;
324
325     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
326         if self.cur.is_null() {
327             None
328         } else {
329             let ret = Some(self.cur);
330             unsafe { self.cur = (*self.cur).next; }
331             ret
332         }
333     }
334 }
335
336 #[cfg(test)]
337 #[allow(unused_imports)]
338 mod test {
339     use prelude::v1::*;
340
341     use thread::Thread;
342     use sync::mpsc::*;
343
344     // Don't use the libstd version so we can pull in the right Select structure
345     // (std::comm points at the wrong one)
346     macro_rules! select {
347         (
348             $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
349         ) => ({
350             let sel = Select::new();
351             $( let mut $rx = sel.handle(&$rx); )+
352             unsafe {
353                 $( $rx.add(); )+
354             }
355             let ret = sel.wait();
356             $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
357             { unreachable!() }
358         })
359     }
360
361     #[test]
362     fn smoke() {
363         let (tx1, rx1) = channel::<int>();
364         let (tx2, rx2) = channel::<int>();
365         tx1.send(1).unwrap();
366         select! {
367             foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
368             _bar = rx2.recv() => { panic!() }
369         }
370         tx2.send(2).unwrap();
371         select! {
372             _foo = rx1.recv() => { panic!() },
373             bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
374         }
375         drop(tx1);
376         select! {
377             foo = rx1.recv() => { assert!(foo.is_err()); },
378             _bar = rx2.recv() => { panic!() }
379         }
380         drop(tx2);
381         select! {
382             bar = rx2.recv() => { assert!(bar.is_err()); }
383         }
384     }
385
386     #[test]
387     fn smoke2() {
388         let (_tx1, rx1) = channel::<int>();
389         let (_tx2, rx2) = channel::<int>();
390         let (_tx3, rx3) = channel::<int>();
391         let (_tx4, rx4) = channel::<int>();
392         let (tx5, rx5) = channel::<int>();
393         tx5.send(4).unwrap();
394         select! {
395             _foo = rx1.recv() => { panic!("1") },
396             _foo = rx2.recv() => { panic!("2") },
397             _foo = rx3.recv() => { panic!("3") },
398             _foo = rx4.recv() => { panic!("4") },
399             foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
400         }
401     }
402
403     #[test]
404     fn closed() {
405         let (_tx1, rx1) = channel::<int>();
406         let (tx2, rx2) = channel::<int>();
407         drop(tx2);
408
409         select! {
410             _a1 = rx1.recv() => { panic!() },
411             a2 = rx2.recv() => { assert!(a2.is_err()); }
412         }
413     }
414
415     #[test]
416     fn unblocks() {
417         let (tx1, rx1) = channel::<int>();
418         let (_tx2, rx2) = channel::<int>();
419         let (tx3, rx3) = channel::<int>();
420
421         let _t = Thread::spawn(move|| {
422             for _ in range(0u, 20) { Thread::yield_now(); }
423             tx1.send(1).unwrap();
424             rx3.recv().unwrap();
425             for _ in range(0u, 20) { Thread::yield_now(); }
426         });
427
428         select! {
429             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
430             _b = rx2.recv() => { panic!() }
431         }
432         tx3.send(1).unwrap();
433         select! {
434             a = rx1.recv() => { assert!(a.is_err()) },
435             _b = rx2.recv() => { panic!() }
436         }
437     }
438
439     #[test]
440     fn both_ready() {
441         let (tx1, rx1) = channel::<int>();
442         let (tx2, rx2) = channel::<int>();
443         let (tx3, rx3) = channel::<()>();
444
445         let _t = Thread::spawn(move|| {
446             for _ in range(0u, 20) { Thread::yield_now(); }
447             tx1.send(1).unwrap();
448             tx2.send(2).unwrap();
449             rx3.recv().unwrap();
450         });
451
452         select! {
453             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
454             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
455         }
456         select! {
457             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
458             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
459         }
460         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
461         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
462         tx3.send(()).unwrap();
463     }
464
465     #[test]
466     fn stress() {
467         static AMT: int = 10000;
468         let (tx1, rx1) = channel::<int>();
469         let (tx2, rx2) = channel::<int>();
470         let (tx3, rx3) = channel::<()>();
471
472         let _t = Thread::spawn(move|| {
473             for i in range(0, AMT) {
474                 if i % 2 == 0 {
475                     tx1.send(i).unwrap();
476                 } else {
477                     tx2.send(i).unwrap();
478                 }
479                 rx3.recv().unwrap();
480             }
481         });
482
483         for i in range(0, AMT) {
484             select! {
485                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
486                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
487             }
488             tx3.send(()).unwrap();
489         }
490     }
491
492     #[test]
493     fn cloning() {
494         let (tx1, rx1) = channel::<int>();
495         let (_tx2, rx2) = channel::<int>();
496         let (tx3, rx3) = channel::<()>();
497
498         let _t = Thread::spawn(move|| {
499             rx3.recv().unwrap();
500             tx1.clone();
501             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
502             tx1.send(2).unwrap();
503             rx3.recv().unwrap();
504         });
505
506         tx3.send(()).unwrap();
507         select! {
508             _i1 = rx1.recv() => {},
509             _i2 = rx2.recv() => panic!()
510         }
511         tx3.send(()).unwrap();
512     }
513
514     #[test]
515     fn cloning2() {
516         let (tx1, rx1) = channel::<int>();
517         let (_tx2, rx2) = channel::<int>();
518         let (tx3, rx3) = channel::<()>();
519
520         let _t = Thread::spawn(move|| {
521             rx3.recv().unwrap();
522             tx1.clone();
523             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
524             tx1.send(2).unwrap();
525             rx3.recv().unwrap();
526         });
527
528         tx3.send(()).unwrap();
529         select! {
530             _i1 = rx1.recv() => {},
531             _i2 = rx2.recv() => panic!()
532         }
533         tx3.send(()).unwrap();
534     }
535
536     #[test]
537     fn cloning3() {
538         let (tx1, rx1) = channel::<()>();
539         let (tx2, rx2) = channel::<()>();
540         let (tx3, rx3) = channel::<()>();
541         let _t = Thread::spawn(move|| {
542             let s = Select::new();
543             let mut h1 = s.handle(&rx1);
544             let mut h2 = s.handle(&rx2);
545             unsafe { h2.add(); }
546             unsafe { h1.add(); }
547             assert_eq!(s.wait(), h2.id);
548             tx3.send(()).unwrap();
549         });
550
551         for _ in range(0u, 1000) { Thread::yield_now(); }
552         drop(tx1.clone());
553         tx2.send(()).unwrap();
554         rx3.recv().unwrap();
555     }
556
557     #[test]
558     fn preflight1() {
559         let (tx, rx) = channel();
560         tx.send(()).unwrap();
561         select! {
562             _n = rx.recv() => {}
563         }
564     }
565
566     #[test]
567     fn preflight2() {
568         let (tx, rx) = channel();
569         tx.send(()).unwrap();
570         tx.send(()).unwrap();
571         select! {
572             _n = rx.recv() => {}
573         }
574     }
575
576     #[test]
577     fn preflight3() {
578         let (tx, rx) = channel();
579         drop(tx.clone());
580         tx.send(()).unwrap();
581         select! {
582             _n = rx.recv() => {}
583         }
584     }
585
586     #[test]
587     fn preflight4() {
588         let (tx, rx) = channel();
589         tx.send(()).unwrap();
590         let s = Select::new();
591         let mut h = s.handle(&rx);
592         unsafe { h.add(); }
593         assert_eq!(s.wait2(false), h.id);
594     }
595
596     #[test]
597     fn preflight5() {
598         let (tx, rx) = channel();
599         tx.send(()).unwrap();
600         tx.send(()).unwrap();
601         let s = Select::new();
602         let mut h = s.handle(&rx);
603         unsafe { h.add(); }
604         assert_eq!(s.wait2(false), h.id);
605     }
606
607     #[test]
608     fn preflight6() {
609         let (tx, rx) = channel();
610         drop(tx.clone());
611         tx.send(()).unwrap();
612         let s = Select::new();
613         let mut h = s.handle(&rx);
614         unsafe { h.add(); }
615         assert_eq!(s.wait2(false), h.id);
616     }
617
618     #[test]
619     fn preflight7() {
620         let (tx, rx) = channel::<()>();
621         drop(tx);
622         let s = Select::new();
623         let mut h = s.handle(&rx);
624         unsafe { h.add(); }
625         assert_eq!(s.wait2(false), h.id);
626     }
627
628     #[test]
629     fn preflight8() {
630         let (tx, rx) = channel();
631         tx.send(()).unwrap();
632         drop(tx);
633         rx.recv().unwrap();
634         let s = Select::new();
635         let mut h = s.handle(&rx);
636         unsafe { h.add(); }
637         assert_eq!(s.wait2(false), h.id);
638     }
639
640     #[test]
641     fn preflight9() {
642         let (tx, rx) = channel();
643         drop(tx.clone());
644         tx.send(()).unwrap();
645         drop(tx);
646         rx.recv().unwrap();
647         let s = Select::new();
648         let mut h = s.handle(&rx);
649         unsafe { h.add(); }
650         assert_eq!(s.wait2(false), h.id);
651     }
652
653     #[test]
654     fn oneshot_data_waiting() {
655         let (tx1, rx1) = channel();
656         let (tx2, rx2) = channel();
657         let _t = Thread::spawn(move|| {
658             select! {
659                 _n = rx1.recv() => {}
660             }
661             tx2.send(()).unwrap();
662         });
663
664         for _ in range(0u, 100) { Thread::yield_now() }
665         tx1.send(()).unwrap();
666         rx2.recv().unwrap();
667     }
668
669     #[test]
670     fn stream_data_waiting() {
671         let (tx1, rx1) = channel();
672         let (tx2, rx2) = channel();
673         tx1.send(()).unwrap();
674         tx1.send(()).unwrap();
675         rx1.recv().unwrap();
676         rx1.recv().unwrap();
677         let _t = Thread::spawn(move|| {
678             select! {
679                 _n = rx1.recv() => {}
680             }
681             tx2.send(()).unwrap();
682         });
683
684         for _ in range(0u, 100) { Thread::yield_now() }
685         tx1.send(()).unwrap();
686         rx2.recv().unwrap();
687     }
688
689     #[test]
690     fn shared_data_waiting() {
691         let (tx1, rx1) = channel();
692         let (tx2, rx2) = channel();
693         drop(tx1.clone());
694         tx1.send(()).unwrap();
695         rx1.recv().unwrap();
696         let _t = Thread::spawn(move|| {
697             select! {
698                 _n = rx1.recv() => {}
699             }
700             tx2.send(()).unwrap();
701         });
702
703         for _ in range(0u, 100) { Thread::yield_now() }
704         tx1.send(()).unwrap();
705         rx2.recv().unwrap();
706     }
707
708     #[test]
709     fn sync1() {
710         let (tx, rx) = sync_channel::<int>(1);
711         tx.send(1).unwrap();
712         select! {
713             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
714         }
715     }
716
717     #[test]
718     fn sync2() {
719         let (tx, rx) = sync_channel::<int>(0);
720         let _t = Thread::spawn(move|| {
721             for _ in range(0u, 100) { Thread::yield_now() }
722             tx.send(1).unwrap();
723         });
724         select! {
725             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
726         }
727     }
728
729     #[test]
730     fn sync3() {
731         let (tx1, rx1) = sync_channel::<int>(0);
732         let (tx2, rx2): (Sender<int>, Receiver<int>) = channel();
733         let _t = Thread::spawn(move|| { tx1.send(1).unwrap(); });
734         let _t = Thread::spawn(move|| { tx2.send(2).unwrap(); });
735         select! {
736             n = rx1.recv() => {
737                 let n = n.unwrap();
738                 assert_eq!(n, 1);
739                 assert_eq!(rx2.recv().unwrap(), 2);
740             },
741             n = rx2.recv() => {
742                 let n = n.unwrap();
743                 assert_eq!(n, 2);
744                 assert_eq!(rx1.recv().unwrap(), 1);
745             }
746         }
747     }
748 }