]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/select.rs
core: Fix size_hint for signed integer Range<T> iterators
[rust.git] / src / libstd / sync / mpsc / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Examples
28 //!
29 //! ```rust
30 //! # #![feature(std_misc)]
31 //! use std::sync::mpsc::channel;
32 //!
33 //! let (tx1, rx1) = channel();
34 //! let (tx2, rx2) = channel();
35 //!
36 //! tx1.send(1).unwrap();
37 //! tx2.send(2).unwrap();
38 //!
39 //! select! {
40 //!     val = rx1.recv() => {
41 //!         assert_eq!(val.unwrap(), 1);
42 //!     },
43 //!     val = rx2.recv() => {
44 //!         assert_eq!(val.unwrap(), 2);
45 //!     }
46 //! }
47 //! ```
48
49 #![allow(dead_code)]
50 #![unstable(feature = "std_misc",
51             reason = "This implementation, while likely sufficient, is unsafe and \
52                       likely to be error prone. At some point in the future this \
53                       module will likely be replaced, and it is currently \
54                       unknown how much API breakage that will cause. The ability \
55                       to select over a number of channels will remain forever, \
56                       but no guarantees beyond this are being made")]
57
58
59 use core::prelude::*;
60
61 use core::cell::Cell;
62 use core::marker;
63 use core::mem;
64 use core::ptr;
65 use core::usize;
66
67 use sync::mpsc::{Receiver, RecvError};
68 use sync::mpsc::blocking::{self, SignalToken};
69
70 /// The "receiver set" of the select interface. This structure is used to manage
71 /// a set of receivers which are being selected over.
72 pub struct Select {
73     head: *mut Handle<'static, ()>,
74     tail: *mut Handle<'static, ()>,
75     next_id: Cell<usize>,
76 }
77
78 impl !marker::Send for Select {}
79
80 /// A handle to a receiver which is currently a member of a `Select` set of
81 /// receivers.  This handle is used to keep the receiver in the set as well as
82 /// interact with the underlying receiver.
83 pub struct Handle<'rx, T:Send+'rx> {
84     /// The ID of this handle, used to compare against the return value of
85     /// `Select::wait()`
86     id: usize,
87     selector: &'rx Select,
88     next: *mut Handle<'static, ()>,
89     prev: *mut Handle<'static, ()>,
90     added: bool,
91     packet: &'rx (Packet+'rx),
92
93     // due to our fun transmutes, we be sure to place this at the end. (nothing
94     // previous relies on T)
95     rx: &'rx Receiver<T>,
96 }
97
98 struct Packets { cur: *mut Handle<'static, ()> }
99
100 #[doc(hidden)]
101 #[derive(PartialEq)]
102 pub enum StartResult {
103     Installed,
104     Abort,
105 }
106
107 #[doc(hidden)]
108 pub trait Packet {
109     fn can_recv(&self) -> bool;
110     fn start_selection(&self, token: SignalToken) -> StartResult;
111     fn abort_selection(&self) -> bool;
112 }
113
114 impl Select {
115     /// Creates a new selection structure. This set is initially empty.
116     ///
117     /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
118     /// the `select!` macro.
119     ///
120     /// # Examples
121     ///
122     /// ```
123     /// # #![feature(std_misc)]
124     /// use std::sync::mpsc::Select;
125     ///
126     /// let select = Select::new();
127     /// ```
128     pub fn new() -> Select {
129         Select {
130             head: ptr::null_mut(),
131             tail: ptr::null_mut(),
132             next_id: Cell::new(1),
133         }
134     }
135
136     /// Creates a new handle into this receiver set for a new receiver. Note
137     /// that this does *not* add the receiver to the receiver set, for that you
138     /// must call the `add` method on the handle itself.
139     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
140         let id = self.next_id.get();
141         self.next_id.set(id + 1);
142         Handle {
143             id: id,
144             selector: self,
145             next: ptr::null_mut(),
146             prev: ptr::null_mut(),
147             added: false,
148             rx: rx,
149             packet: rx,
150         }
151     }
152
153     /// Waits for an event on this receiver set. The returned value is *not* an
154     /// index, but rather an id. This id can be queried against any active
155     /// `Handle` structures (each one has an `id` method). The handle with
156     /// the matching `id` will have some sort of event available on it. The
157     /// event could either be that data is available or the corresponding
158     /// channel has been closed.
159     pub fn wait(&self) -> usize {
160         self.wait2(true)
161     }
162
163     /// Helper method for skipping the preflight checks during testing
164     fn wait2(&self, do_preflight_checks: bool) -> usize {
165         // Note that this is currently an inefficient implementation. We in
166         // theory have knowledge about all receivers in the set ahead of time,
167         // so this method shouldn't really have to iterate over all of them yet
168         // again. The idea with this "receiver set" interface is to get the
169         // interface right this time around, and later this implementation can
170         // be optimized.
171         //
172         // This implementation can be summarized by:
173         //
174         //      fn select(receivers) {
175         //          if any receiver ready { return ready index }
176         //          deschedule {
177         //              block on all receivers
178         //          }
179         //          unblock on all receivers
180         //          return ready index
181         //      }
182         //
183         // Most notably, the iterations over all of the receivers shouldn't be
184         // necessary.
185         unsafe {
186             // Stage 1: preflight checks. Look for any packets ready to receive
187             if do_preflight_checks {
188                 for handle in self.iter() {
189                     if (*handle).packet.can_recv() {
190                         return (*handle).id();
191                     }
192                 }
193             }
194
195             // Stage 2: begin the blocking process
196             //
197             // Create a number of signal tokens, and install each one
198             // sequentially until one fails. If one fails, then abort the
199             // selection on the already-installed tokens.
200             let (wait_token, signal_token) = blocking::tokens();
201             for (i, handle) in self.iter().enumerate() {
202                 match (*handle).packet.start_selection(signal_token.clone()) {
203                     StartResult::Installed => {}
204                     StartResult::Abort => {
205                         // Go back and abort the already-begun selections
206                         for handle in self.iter().take(i) {
207                             (*handle).packet.abort_selection();
208                         }
209                         return (*handle).id;
210                     }
211                 }
212             }
213
214             // Stage 3: no messages available, actually block
215             wait_token.wait();
216
217             // Stage 4: there *must* be message available; find it.
218             //
219             // Abort the selection process on each receiver. If the abort
220             // process returns `true`, then that means that the receiver is
221             // ready to receive some data. Note that this also means that the
222             // receiver may have yet to have fully read the `to_wake` field and
223             // woken us up (although the wakeup is guaranteed to fail).
224             //
225             // This situation happens in the window of where a sender invokes
226             // increment(), sees -1, and then decides to wake up the task. After
227             // all this is done, the sending thread will set `selecting` to
228             // `false`. Until this is done, we cannot return. If we were to
229             // return, then a sender could wake up a receiver which has gone
230             // back to sleep after this call to `select`.
231             //
232             // Note that it is a "fairly small window" in which an increment()
233             // views that it should wake a thread up until the `selecting` bit
234             // is set to false. For now, the implementation currently just spins
235             // in a yield loop. This is very distasteful, but this
236             // implementation is already nowhere near what it should ideally be.
237             // A rewrite should focus on avoiding a yield loop, and for now this
238             // implementation is tying us over to a more efficient "don't
239             // iterate over everything every time" implementation.
240             let mut ready_id = usize::MAX;
241             for handle in self.iter() {
242                 if (*handle).packet.abort_selection() {
243                     ready_id = (*handle).id;
244                 }
245             }
246
247             // We must have found a ready receiver
248             assert!(ready_id != usize::MAX);
249             return ready_id;
250         }
251     }
252
253     fn iter(&self) -> Packets { Packets { cur: self.head } }
254 }
255
256 impl<'rx, T: Send> Handle<'rx, T> {
257     /// Retrieves the id of this handle.
258     #[inline]
259     pub fn id(&self) -> usize { self.id }
260
261     /// Blocks to receive a value on the underlying receiver, returning `Some` on
262     /// success or `None` if the channel disconnects. This function has the same
263     /// semantics as `Receiver.recv`
264     pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
265
266     /// Adds this handle to the receiver set that the handle was created from. This
267     /// method can be called multiple times, but it has no effect if `add` was
268     /// called previously.
269     ///
270     /// This method is unsafe because it requires that the `Handle` is not moved
271     /// while it is added to the `Select` set.
272     pub unsafe fn add(&mut self) {
273         if self.added { return }
274         let selector: &mut Select = mem::transmute(&*self.selector);
275         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
276
277         if selector.head.is_null() {
278             selector.head = me;
279             selector.tail = me;
280         } else {
281             (*me).prev = selector.tail;
282             assert!((*me).next.is_null());
283             (*selector.tail).next = me;
284             selector.tail = me;
285         }
286         self.added = true;
287     }
288
289     /// Removes this handle from the `Select` set. This method is unsafe because
290     /// it has no guarantee that the `Handle` was not moved since `add` was
291     /// called.
292     pub unsafe fn remove(&mut self) {
293         if !self.added { return }
294
295         let selector: &mut Select = mem::transmute(&*self.selector);
296         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
297
298         if self.prev.is_null() {
299             assert_eq!(selector.head, me);
300             selector.head = self.next;
301         } else {
302             (*self.prev).next = self.next;
303         }
304         if self.next.is_null() {
305             assert_eq!(selector.tail, me);
306             selector.tail = self.prev;
307         } else {
308             (*self.next).prev = self.prev;
309         }
310
311         self.next = ptr::null_mut();
312         self.prev = ptr::null_mut();
313
314         self.added = false;
315     }
316 }
317
318 #[unsafe_destructor]
319 impl Drop for Select {
320     fn drop(&mut self) {
321         assert!(self.head.is_null());
322         assert!(self.tail.is_null());
323     }
324 }
325
326 #[unsafe_destructor]
327 impl<'rx, T: Send> Drop for Handle<'rx, T> {
328     fn drop(&mut self) {
329         unsafe { self.remove() }
330     }
331 }
332
333 impl Iterator for Packets {
334     type Item = *mut Handle<'static, ()>;
335
336     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
337         if self.cur.is_null() {
338             None
339         } else {
340             let ret = Some(self.cur);
341             unsafe { self.cur = (*self.cur).next; }
342             ret
343         }
344     }
345 }
346
347 #[cfg(test)]
348 #[allow(unused_imports)]
349 mod test {
350     use prelude::v1::*;
351
352     use thread;
353     use sync::mpsc::*;
354
355     // Don't use the libstd version so we can pull in the right Select structure
356     // (std::comm points at the wrong one)
357     macro_rules! select {
358         (
359             $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
360         ) => ({
361             let sel = Select::new();
362             $( let mut $rx = sel.handle(&$rx); )+
363             unsafe {
364                 $( $rx.add(); )+
365             }
366             let ret = sel.wait();
367             $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
368             { unreachable!() }
369         })
370     }
371
372     #[test]
373     fn smoke() {
374         let (tx1, rx1) = channel::<i32>();
375         let (tx2, rx2) = channel::<i32>();
376         tx1.send(1).unwrap();
377         select! {
378             foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
379             _bar = rx2.recv() => { panic!() }
380         }
381         tx2.send(2).unwrap();
382         select! {
383             _foo = rx1.recv() => { panic!() },
384             bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
385         }
386         drop(tx1);
387         select! {
388             foo = rx1.recv() => { assert!(foo.is_err()); },
389             _bar = rx2.recv() => { panic!() }
390         }
391         drop(tx2);
392         select! {
393             bar = rx2.recv() => { assert!(bar.is_err()); }
394         }
395     }
396
397     #[test]
398     fn smoke2() {
399         let (_tx1, rx1) = channel::<i32>();
400         let (_tx2, rx2) = channel::<i32>();
401         let (_tx3, rx3) = channel::<i32>();
402         let (_tx4, rx4) = channel::<i32>();
403         let (tx5, rx5) = channel::<i32>();
404         tx5.send(4).unwrap();
405         select! {
406             _foo = rx1.recv() => { panic!("1") },
407             _foo = rx2.recv() => { panic!("2") },
408             _foo = rx3.recv() => { panic!("3") },
409             _foo = rx4.recv() => { panic!("4") },
410             foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
411         }
412     }
413
414     #[test]
415     fn closed() {
416         let (_tx1, rx1) = channel::<i32>();
417         let (tx2, rx2) = channel::<i32>();
418         drop(tx2);
419
420         select! {
421             _a1 = rx1.recv() => { panic!() },
422             a2 = rx2.recv() => { assert!(a2.is_err()); }
423         }
424     }
425
426     #[test]
427     fn unblocks() {
428         let (tx1, rx1) = channel::<i32>();
429         let (_tx2, rx2) = channel::<i32>();
430         let (tx3, rx3) = channel::<i32>();
431
432         let _t = thread::spawn(move|| {
433             for _ in 0..20 { thread::yield_now(); }
434             tx1.send(1).unwrap();
435             rx3.recv().unwrap();
436             for _ in 0..20 { thread::yield_now(); }
437         });
438
439         select! {
440             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
441             _b = rx2.recv() => { panic!() }
442         }
443         tx3.send(1).unwrap();
444         select! {
445             a = rx1.recv() => { assert!(a.is_err()) },
446             _b = rx2.recv() => { panic!() }
447         }
448     }
449
450     #[test]
451     fn both_ready() {
452         let (tx1, rx1) = channel::<i32>();
453         let (tx2, rx2) = channel::<i32>();
454         let (tx3, rx3) = channel::<()>();
455
456         let _t = thread::spawn(move|| {
457             for _ in 0..20 { thread::yield_now(); }
458             tx1.send(1).unwrap();
459             tx2.send(2).unwrap();
460             rx3.recv().unwrap();
461         });
462
463         select! {
464             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
465             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
466         }
467         select! {
468             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
469             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
470         }
471         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
472         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
473         tx3.send(()).unwrap();
474     }
475
476     #[test]
477     fn stress() {
478         const AMT: i32 = 10000;
479         let (tx1, rx1) = channel::<i32>();
480         let (tx2, rx2) = channel::<i32>();
481         let (tx3, rx3) = channel::<()>();
482
483         let _t = thread::spawn(move|| {
484             for i in 0..AMT {
485                 if i % 2 == 0 {
486                     tx1.send(i).unwrap();
487                 } else {
488                     tx2.send(i).unwrap();
489                 }
490                 rx3.recv().unwrap();
491             }
492         });
493
494         for i in 0..AMT {
495             select! {
496                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
497                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
498             }
499             tx3.send(()).unwrap();
500         }
501     }
502
503     #[test]
504     fn cloning() {
505         let (tx1, rx1) = channel::<i32>();
506         let (_tx2, rx2) = channel::<i32>();
507         let (tx3, rx3) = channel::<()>();
508
509         let _t = thread::spawn(move|| {
510             rx3.recv().unwrap();
511             tx1.clone();
512             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
513             tx1.send(2).unwrap();
514             rx3.recv().unwrap();
515         });
516
517         tx3.send(()).unwrap();
518         select! {
519             _i1 = rx1.recv() => {},
520             _i2 = rx2.recv() => panic!()
521         }
522         tx3.send(()).unwrap();
523     }
524
525     #[test]
526     fn cloning2() {
527         let (tx1, rx1) = channel::<i32>();
528         let (_tx2, rx2) = channel::<i32>();
529         let (tx3, rx3) = channel::<()>();
530
531         let _t = thread::spawn(move|| {
532             rx3.recv().unwrap();
533             tx1.clone();
534             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
535             tx1.send(2).unwrap();
536             rx3.recv().unwrap();
537         });
538
539         tx3.send(()).unwrap();
540         select! {
541             _i1 = rx1.recv() => {},
542             _i2 = rx2.recv() => panic!()
543         }
544         tx3.send(()).unwrap();
545     }
546
547     #[test]
548     fn cloning3() {
549         let (tx1, rx1) = channel::<()>();
550         let (tx2, rx2) = channel::<()>();
551         let (tx3, rx3) = channel::<()>();
552         let _t = thread::spawn(move|| {
553             let s = Select::new();
554             let mut h1 = s.handle(&rx1);
555             let mut h2 = s.handle(&rx2);
556             unsafe { h2.add(); }
557             unsafe { h1.add(); }
558             assert_eq!(s.wait(), h2.id);
559             tx3.send(()).unwrap();
560         });
561
562         for _ in 0..1000 { thread::yield_now(); }
563         drop(tx1.clone());
564         tx2.send(()).unwrap();
565         rx3.recv().unwrap();
566     }
567
568     #[test]
569     fn preflight1() {
570         let (tx, rx) = channel();
571         tx.send(()).unwrap();
572         select! {
573             _n = rx.recv() => {}
574         }
575     }
576
577     #[test]
578     fn preflight2() {
579         let (tx, rx) = channel();
580         tx.send(()).unwrap();
581         tx.send(()).unwrap();
582         select! {
583             _n = rx.recv() => {}
584         }
585     }
586
587     #[test]
588     fn preflight3() {
589         let (tx, rx) = channel();
590         drop(tx.clone());
591         tx.send(()).unwrap();
592         select! {
593             _n = rx.recv() => {}
594         }
595     }
596
597     #[test]
598     fn preflight4() {
599         let (tx, rx) = channel();
600         tx.send(()).unwrap();
601         let s = Select::new();
602         let mut h = s.handle(&rx);
603         unsafe { h.add(); }
604         assert_eq!(s.wait2(false), h.id);
605     }
606
607     #[test]
608     fn preflight5() {
609         let (tx, rx) = channel();
610         tx.send(()).unwrap();
611         tx.send(()).unwrap();
612         let s = Select::new();
613         let mut h = s.handle(&rx);
614         unsafe { h.add(); }
615         assert_eq!(s.wait2(false), h.id);
616     }
617
618     #[test]
619     fn preflight6() {
620         let (tx, rx) = channel();
621         drop(tx.clone());
622         tx.send(()).unwrap();
623         let s = Select::new();
624         let mut h = s.handle(&rx);
625         unsafe { h.add(); }
626         assert_eq!(s.wait2(false), h.id);
627     }
628
629     #[test]
630     fn preflight7() {
631         let (tx, rx) = channel::<()>();
632         drop(tx);
633         let s = Select::new();
634         let mut h = s.handle(&rx);
635         unsafe { h.add(); }
636         assert_eq!(s.wait2(false), h.id);
637     }
638
639     #[test]
640     fn preflight8() {
641         let (tx, rx) = channel();
642         tx.send(()).unwrap();
643         drop(tx);
644         rx.recv().unwrap();
645         let s = Select::new();
646         let mut h = s.handle(&rx);
647         unsafe { h.add(); }
648         assert_eq!(s.wait2(false), h.id);
649     }
650
651     #[test]
652     fn preflight9() {
653         let (tx, rx) = channel();
654         drop(tx.clone());
655         tx.send(()).unwrap();
656         drop(tx);
657         rx.recv().unwrap();
658         let s = Select::new();
659         let mut h = s.handle(&rx);
660         unsafe { h.add(); }
661         assert_eq!(s.wait2(false), h.id);
662     }
663
664     #[test]
665     fn oneshot_data_waiting() {
666         let (tx1, rx1) = channel();
667         let (tx2, rx2) = channel();
668         let _t = thread::spawn(move|| {
669             select! {
670                 _n = rx1.recv() => {}
671             }
672             tx2.send(()).unwrap();
673         });
674
675         for _ in 0..100 { thread::yield_now() }
676         tx1.send(()).unwrap();
677         rx2.recv().unwrap();
678     }
679
680     #[test]
681     fn stream_data_waiting() {
682         let (tx1, rx1) = channel();
683         let (tx2, rx2) = channel();
684         tx1.send(()).unwrap();
685         tx1.send(()).unwrap();
686         rx1.recv().unwrap();
687         rx1.recv().unwrap();
688         let _t = thread::spawn(move|| {
689             select! {
690                 _n = rx1.recv() => {}
691             }
692             tx2.send(()).unwrap();
693         });
694
695         for _ in 0..100 { thread::yield_now() }
696         tx1.send(()).unwrap();
697         rx2.recv().unwrap();
698     }
699
700     #[test]
701     fn shared_data_waiting() {
702         let (tx1, rx1) = channel();
703         let (tx2, rx2) = channel();
704         drop(tx1.clone());
705         tx1.send(()).unwrap();
706         rx1.recv().unwrap();
707         let _t = thread::spawn(move|| {
708             select! {
709                 _n = rx1.recv() => {}
710             }
711             tx2.send(()).unwrap();
712         });
713
714         for _ in 0..100 { thread::yield_now() }
715         tx1.send(()).unwrap();
716         rx2.recv().unwrap();
717     }
718
719     #[test]
720     fn sync1() {
721         let (tx, rx) = sync_channel::<i32>(1);
722         tx.send(1).unwrap();
723         select! {
724             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
725         }
726     }
727
728     #[test]
729     fn sync2() {
730         let (tx, rx) = sync_channel::<i32>(0);
731         let _t = thread::spawn(move|| {
732             for _ in 0..100 { thread::yield_now() }
733             tx.send(1).unwrap();
734         });
735         select! {
736             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
737         }
738     }
739
740     #[test]
741     fn sync3() {
742         let (tx1, rx1) = sync_channel::<i32>(0);
743         let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
744         let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
745         let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
746         select! {
747             n = rx1.recv() => {
748                 let n = n.unwrap();
749                 assert_eq!(n, 1);
750                 assert_eq!(rx2.recv().unwrap(), 2);
751             },
752             n = rx2.recv() => {
753                 let n = n.unwrap();
754                 assert_eq!(n, 2);
755                 assert_eq!(rx1.recv().unwrap(), 1);
756             }
757         }
758     }
759 }