]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/select.rs
auto merge of #14816 : theptrk/rust/unclear-comment, r=huonw
[rust.git] / src / libsync / comm / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Example
28 //!
29 //! ```rust
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
32 //!
33 //! tx1.send(1);
34 //! tx2.send(2);
35 //!
36 //! select! {
37 //!     val = rx1.recv() => {
38 //!         assert_eq!(val, 1);
39 //!     },
40 //!     val = rx2.recv() => {
41 //!         assert_eq!(val, 2);
42 //!     }
43 //! }
44 //! ```
45
46 #![allow(dead_code)]
47
48 use core::prelude::*;
49
50 use alloc::owned::Box;
51 use core::cell::Cell;
52 use core::kinds::marker;
53 use core::mem;
54 use core::uint;
55 use rustrt::local::Local;
56 use rustrt::task::{Task, BlockedTask};
57
58 use comm::Receiver;
59
60 /// The "receiver set" of the select interface. This structure is used to manage
61 /// a set of receivers which are being selected over.
62 pub struct Select {
63     head: *mut Handle<'static, ()>,
64     tail: *mut Handle<'static, ()>,
65     next_id: Cell<uint>,
66     marker1: marker::NoSend,
67 }
68
69 /// A handle to a receiver which is currently a member of a `Select` set of
70 /// receivers.  This handle is used to keep the receiver in the set as well as
71 /// interact with the underlying receiver.
72 pub struct Handle<'rx, T> {
73     /// The ID of this handle, used to compare against the return value of
74     /// `Select::wait()`
75     id: uint,
76     selector: &'rx Select,
77     next: *mut Handle<'static, ()>,
78     prev: *mut Handle<'static, ()>,
79     added: bool,
80     packet: &'rx Packet,
81
82     // due to our fun transmutes, we be sure to place this at the end. (nothing
83     // previous relies on T)
84     rx: &'rx Receiver<T>,
85 }
86
87 struct Packets { cur: *mut Handle<'static, ()> }
88
89 #[doc(hidden)]
90 pub trait Packet {
91     fn can_recv(&self) -> bool;
92     fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
93     fn abort_selection(&self) -> bool;
94 }
95
96 impl Select {
97     /// Creates a new selection structure. This set is initially empty and
98     /// `wait` will fail!() if called.
99     ///
100     /// Usage of this struct directly can sometimes be burdensome, and usage is
101     /// rather much easier through the `select!` macro.
102     pub fn new() -> Select {
103         Select {
104             marker1: marker::NoSend,
105             head: 0 as *mut Handle<'static, ()>,
106             tail: 0 as *mut Handle<'static, ()>,
107             next_id: Cell::new(1),
108         }
109     }
110
111     /// Creates a new handle into this receiver set for a new receiver. Note
112     /// that this does *not* add the receiver to the receiver set, for that you
113     /// must call the `add` method on the handle itself.
114     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
115         let id = self.next_id.get();
116         self.next_id.set(id + 1);
117         Handle {
118             id: id,
119             selector: self,
120             next: 0 as *mut Handle<'static, ()>,
121             prev: 0 as *mut Handle<'static, ()>,
122             added: false,
123             rx: rx,
124             packet: rx,
125         }
126     }
127
128     /// Waits for an event on this receiver set. The returned value is *not* an
129     /// index, but rather an id. This id can be queried against any active
130     /// `Handle` structures (each one has an `id` method). The handle with
131     /// the matching `id` will have some sort of event available on it. The
132     /// event could either be that data is available or the corresponding
133     /// channel has been closed.
134     pub fn wait(&self) -> uint {
135         self.wait2(false)
136     }
137
138     /// Helper method for skipping the preflight checks during testing
139     fn wait2(&self, do_preflight_checks: bool) -> uint {
140         // Note that this is currently an inefficient implementation. We in
141         // theory have knowledge about all receivers in the set ahead of time,
142         // so this method shouldn't really have to iterate over all of them yet
143         // again. The idea with this "receiver set" interface is to get the
144         // interface right this time around, and later this implementation can
145         // be optimized.
146         //
147         // This implementation can be summarized by:
148         //
149         //      fn select(receivers) {
150         //          if any receiver ready { return ready index }
151         //          deschedule {
152         //              block on all receivers
153         //          }
154         //          unblock on all receivers
155         //          return ready index
156         //      }
157         //
158         // Most notably, the iterations over all of the receivers shouldn't be
159         // necessary.
160         unsafe {
161             let mut amt = 0;
162             for p in self.iter() {
163                 amt += 1;
164                 if do_preflight_checks && (*p).packet.can_recv() {
165                     return (*p).id;
166                 }
167             }
168             assert!(amt > 0);
169
170             let mut ready_index = amt;
171             let mut ready_id = uint::MAX;
172             let mut iter = self.iter().enumerate();
173
174             // Acquire a number of blocking contexts, and block on each one
175             // sequentially until one fails. If one fails, then abort
176             // immediately so we can go unblock on all the other receivers.
177             let task: Box<Task> = Local::take();
178             task.deschedule(amt, |task| {
179                 // Prepare for the block
180                 let (i, handle) = iter.next().unwrap();
181                 match (*handle).packet.start_selection(task) {
182                     Ok(()) => Ok(()),
183                     Err(task) => {
184                         ready_index = i;
185                         ready_id = (*handle).id;
186                         Err(task)
187                     }
188                 }
189             });
190
191             // Abort the selection process on each receiver. If the abort
192             // process returns `true`, then that means that the receiver is
193             // ready to receive some data. Note that this also means that the
194             // receiver may have yet to have fully read the `to_wake` field and
195             // woken us up (although the wakeup is guaranteed to fail).
196             //
197             // This situation happens in the window of where a sender invokes
198             // increment(), sees -1, and then decides to wake up the task. After
199             // all this is done, the sending thread will set `selecting` to
200             // `false`. Until this is done, we cannot return. If we were to
201             // return, then a sender could wake up a receiver which has gone
202             // back to sleep after this call to `select`.
203             //
204             // Note that it is a "fairly small window" in which an increment()
205             // views that it should wake a thread up until the `selecting` bit
206             // is set to false. For now, the implementation currently just spins
207             // in a yield loop. This is very distasteful, but this
208             // implementation is already nowhere near what it should ideally be.
209             // A rewrite should focus on avoiding a yield loop, and for now this
210             // implementation is tying us over to a more efficient "don't
211             // iterate over everything every time" implementation.
212             for handle in self.iter().take(ready_index) {
213                 if (*handle).packet.abort_selection() {
214                     ready_id = (*handle).id;
215                 }
216             }
217
218             assert!(ready_id != uint::MAX);
219             return ready_id;
220         }
221     }
222
223     fn iter(&self) -> Packets { Packets { cur: self.head } }
224 }
225
226 impl<'rx, T: Send> Handle<'rx, T> {
227     /// Retrieve the id of this handle.
228     #[inline]
229     pub fn id(&self) -> uint { self.id }
230
231     /// Receive a value on the underlying receiver. Has the same semantics as
232     /// `Receiver.recv`
233     pub fn recv(&mut self) -> T { self.rx.recv() }
234     /// Block to receive a value on the underlying receiver, returning `Some` on
235     /// success or `None` if the channel disconnects. This function has the same
236     /// semantics as `Receiver.recv_opt`
237     pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
238
239     /// Adds this handle to the receiver set that the handle was created from. This
240     /// method can be called multiple times, but it has no effect if `add` was
241     /// called previously.
242     ///
243     /// This method is unsafe because it requires that the `Handle` is not moved
244     /// while it is added to the `Select` set.
245     pub unsafe fn add(&mut self) {
246         if self.added { return }
247         let selector: &mut Select = mem::transmute(&*self.selector);
248         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
249
250         if selector.head.is_null() {
251             selector.head = me;
252             selector.tail = me;
253         } else {
254             (*me).prev = selector.tail;
255             assert!((*me).next.is_null());
256             (*selector.tail).next = me;
257             selector.tail = me;
258         }
259         self.added = true;
260     }
261
262     /// Removes this handle from the `Select` set. This method is unsafe because
263     /// it has no guarantee that the `Handle` was not moved since `add` was
264     /// called.
265     pub unsafe fn remove(&mut self) {
266         if !self.added { return }
267
268         let selector: &mut Select = mem::transmute(&*self.selector);
269         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
270
271         if self.prev.is_null() {
272             assert_eq!(selector.head, me);
273             selector.head = self.next;
274         } else {
275             (*self.prev).next = self.next;
276         }
277         if self.next.is_null() {
278             assert_eq!(selector.tail, me);
279             selector.tail = self.prev;
280         } else {
281             (*self.next).prev = self.prev;
282         }
283
284         self.next = 0 as *mut Handle<'static, ()>;
285         self.prev = 0 as *mut Handle<'static, ()>;
286
287         self.added = false;
288     }
289 }
290
291 #[unsafe_destructor]
292 impl Drop for Select {
293     fn drop(&mut self) {
294         assert!(self.head.is_null());
295         assert!(self.tail.is_null());
296     }
297 }
298
299 #[unsafe_destructor]
300 impl<'rx, T: Send> Drop for Handle<'rx, T> {
301     fn drop(&mut self) {
302         unsafe { self.remove() }
303     }
304 }
305
306 impl Iterator<*mut Handle<'static, ()>> for Packets {
307     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
308         if self.cur.is_null() {
309             None
310         } else {
311             let ret = Some(self.cur);
312             unsafe { self.cur = (*self.cur).next; }
313             ret
314         }
315     }
316 }
317
318 #[cfg(test)]
319 #[allow(unused_imports)]
320 mod test {
321     use std::prelude::*;
322
323     use super::super::*;
324
325     // Don't use the libstd version so we can pull in the right Select structure
326     // (std::comm points at the wrong one)
327     macro_rules! select {
328         (
329             $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
330         ) => ({
331             use comm::Select;
332             let sel = Select::new();
333             $( let mut $rx = sel.handle(&$rx); )+
334             unsafe {
335                 $( $rx.add(); )+
336             }
337             let ret = sel.wait();
338             $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
339             { unreachable!() }
340         })
341     }
342
343     test!(fn smoke() {
344         let (tx1, rx1) = channel::<int>();
345         let (tx2, rx2) = channel::<int>();
346         tx1.send(1);
347         select! (
348             foo = rx1.recv() => { assert_eq!(foo, 1); },
349             _bar = rx2.recv() => { fail!() }
350         )
351         tx2.send(2);
352         select! (
353             _foo = rx1.recv() => { fail!() },
354             bar = rx2.recv() => { assert_eq!(bar, 2) }
355         )
356         drop(tx1);
357         select! (
358             foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
359             _bar = rx2.recv() => { fail!() }
360         )
361         drop(tx2);
362         select! (
363             bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
364         )
365     })
366
367     test!(fn smoke2() {
368         let (_tx1, rx1) = channel::<int>();
369         let (_tx2, rx2) = channel::<int>();
370         let (_tx3, rx3) = channel::<int>();
371         let (_tx4, rx4) = channel::<int>();
372         let (tx5, rx5) = channel::<int>();
373         tx5.send(4);
374         select! (
375             _foo = rx1.recv() => { fail!("1") },
376             _foo = rx2.recv() => { fail!("2") },
377             _foo = rx3.recv() => { fail!("3") },
378             _foo = rx4.recv() => { fail!("4") },
379             foo = rx5.recv() => { assert_eq!(foo, 4); }
380         )
381     })
382
383     test!(fn closed() {
384         let (_tx1, rx1) = channel::<int>();
385         let (tx2, rx2) = channel::<int>();
386         drop(tx2);
387
388         select! (
389             _a1 = rx1.recv_opt() => { fail!() },
390             a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
391         )
392     })
393
394     test!(fn unblocks() {
395         let (tx1, rx1) = channel::<int>();
396         let (_tx2, rx2) = channel::<int>();
397         let (tx3, rx3) = channel::<int>();
398
399         spawn(proc() {
400             for _ in range(0, 20) { task::deschedule(); }
401             tx1.send(1);
402             rx3.recv();
403             for _ in range(0, 20) { task::deschedule(); }
404         });
405
406         select! (
407             a = rx1.recv() => { assert_eq!(a, 1); },
408             _b = rx2.recv() => { fail!() }
409         )
410         tx3.send(1);
411         select! (
412             a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
413             _b = rx2.recv() => { fail!() }
414         )
415     })
416
417     test!(fn both_ready() {
418         let (tx1, rx1) = channel::<int>();
419         let (tx2, rx2) = channel::<int>();
420         let (tx3, rx3) = channel::<()>();
421
422         spawn(proc() {
423             for _ in range(0, 20) { task::deschedule(); }
424             tx1.send(1);
425             tx2.send(2);
426             rx3.recv();
427         });
428
429         select! (
430             a = rx1.recv() => { assert_eq!(a, 1); },
431             a = rx2.recv() => { assert_eq!(a, 2); }
432         )
433         select! (
434             a = rx1.recv() => { assert_eq!(a, 1); },
435             a = rx2.recv() => { assert_eq!(a, 2); }
436         )
437         assert_eq!(rx1.try_recv(), Err(Empty));
438         assert_eq!(rx2.try_recv(), Err(Empty));
439         tx3.send(());
440     })
441
442     test!(fn stress() {
443         static AMT: int = 10000;
444         let (tx1, rx1) = channel::<int>();
445         let (tx2, rx2) = channel::<int>();
446         let (tx3, rx3) = channel::<()>();
447
448         spawn(proc() {
449             for i in range(0, AMT) {
450                 if i % 2 == 0 {
451                     tx1.send(i);
452                 } else {
453                     tx2.send(i);
454                 }
455                 rx3.recv();
456             }
457         });
458
459         for i in range(0, AMT) {
460             select! (
461                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
462                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
463             )
464             tx3.send(());
465         }
466     })
467
468     test!(fn cloning() {
469         let (tx1, rx1) = channel::<int>();
470         let (_tx2, rx2) = channel::<int>();
471         let (tx3, rx3) = channel::<()>();
472
473         spawn(proc() {
474             rx3.recv();
475             tx1.clone();
476             assert_eq!(rx3.try_recv(), Err(Empty));
477             tx1.send(2);
478             rx3.recv();
479         });
480
481         tx3.send(());
482         select!(
483             _i1 = rx1.recv() => {},
484             _i2 = rx2.recv() => fail!()
485         )
486         tx3.send(());
487     })
488
489     test!(fn cloning2() {
490         let (tx1, rx1) = channel::<int>();
491         let (_tx2, rx2) = channel::<int>();
492         let (tx3, rx3) = channel::<()>();
493
494         spawn(proc() {
495             rx3.recv();
496             tx1.clone();
497             assert_eq!(rx3.try_recv(), Err(Empty));
498             tx1.send(2);
499             rx3.recv();
500         });
501
502         tx3.send(());
503         select!(
504             _i1 = rx1.recv() => {},
505             _i2 = rx2.recv() => fail!()
506         )
507         tx3.send(());
508     })
509
510     test!(fn cloning3() {
511         let (tx1, rx1) = channel::<()>();
512         let (tx2, rx2) = channel::<()>();
513         let (tx3, rx3) = channel::<()>();
514         spawn(proc() {
515             let s = Select::new();
516             let mut h1 = s.handle(&rx1);
517             let mut h2 = s.handle(&rx2);
518             unsafe { h2.add(); }
519             unsafe { h1.add(); }
520             assert_eq!(s.wait(), h2.id);
521             tx3.send(());
522         });
523
524         for _ in range(0, 1000) { task::deschedule(); }
525         drop(tx1.clone());
526         tx2.send(());
527         rx3.recv();
528     })
529
530     test!(fn preflight1() {
531         let (tx, rx) = channel();
532         tx.send(());
533         select!(
534             () = rx.recv() => {}
535         )
536     })
537
538     test!(fn preflight2() {
539         let (tx, rx) = channel();
540         tx.send(());
541         tx.send(());
542         select!(
543             () = rx.recv() => {}
544         )
545     })
546
547     test!(fn preflight3() {
548         let (tx, rx) = channel();
549         drop(tx.clone());
550         tx.send(());
551         select!(
552             () = rx.recv() => {}
553         )
554     })
555
556     test!(fn preflight4() {
557         let (tx, rx) = channel();
558         tx.send(());
559         let s = Select::new();
560         let mut h = s.handle(&rx);
561         unsafe { h.add(); }
562         assert_eq!(s.wait2(false), h.id);
563     })
564
565     test!(fn preflight5() {
566         let (tx, rx) = channel();
567         tx.send(());
568         tx.send(());
569         let s = Select::new();
570         let mut h = s.handle(&rx);
571         unsafe { h.add(); }
572         assert_eq!(s.wait2(false), h.id);
573     })
574
575     test!(fn preflight6() {
576         let (tx, rx) = channel();
577         drop(tx.clone());
578         tx.send(());
579         let s = Select::new();
580         let mut h = s.handle(&rx);
581         unsafe { h.add(); }
582         assert_eq!(s.wait2(false), h.id);
583     })
584
585     test!(fn preflight7() {
586         let (tx, rx) = channel::<()>();
587         drop(tx);
588         let s = Select::new();
589         let mut h = s.handle(&rx);
590         unsafe { h.add(); }
591         assert_eq!(s.wait2(false), h.id);
592     })
593
594     test!(fn preflight8() {
595         let (tx, rx) = channel();
596         tx.send(());
597         drop(tx);
598         rx.recv();
599         let s = Select::new();
600         let mut h = s.handle(&rx);
601         unsafe { h.add(); }
602         assert_eq!(s.wait2(false), h.id);
603     })
604
605     test!(fn preflight9() {
606         let (tx, rx) = channel();
607         drop(tx.clone());
608         tx.send(());
609         drop(tx);
610         rx.recv();
611         let s = Select::new();
612         let mut h = s.handle(&rx);
613         unsafe { h.add(); }
614         assert_eq!(s.wait2(false), h.id);
615     })
616
617     test!(fn oneshot_data_waiting() {
618         let (tx1, rx1) = channel();
619         let (tx2, rx2) = channel();
620         spawn(proc() {
621             select! {
622                 () = rx1.recv() => {}
623             }
624             tx2.send(());
625         });
626
627         for _ in range(0, 100) { task::deschedule() }
628         tx1.send(());
629         rx2.recv();
630     })
631
632     test!(fn stream_data_waiting() {
633         let (tx1, rx1) = channel();
634         let (tx2, rx2) = channel();
635         tx1.send(());
636         tx1.send(());
637         rx1.recv();
638         rx1.recv();
639         spawn(proc() {
640             select! {
641                 () = rx1.recv() => {}
642             }
643             tx2.send(());
644         });
645
646         for _ in range(0, 100) { task::deschedule() }
647         tx1.send(());
648         rx2.recv();
649     })
650
651     test!(fn shared_data_waiting() {
652         let (tx1, rx1) = channel();
653         let (tx2, rx2) = channel();
654         drop(tx1.clone());
655         tx1.send(());
656         rx1.recv();
657         spawn(proc() {
658             select! {
659                 () = rx1.recv() => {}
660             }
661             tx2.send(());
662         });
663
664         for _ in range(0, 100) { task::deschedule() }
665         tx1.send(());
666         rx2.recv();
667     })
668
669     test!(fn sync1() {
670         let (tx, rx) = sync_channel(1);
671         tx.send(1);
672         select! {
673             n = rx.recv() => { assert_eq!(n, 1); }
674         }
675     })
676
677     test!(fn sync2() {
678         let (tx, rx) = sync_channel(0);
679         spawn(proc() {
680             for _ in range(0, 100) { task::deschedule() }
681             tx.send(1);
682         });
683         select! {
684             n = rx.recv() => { assert_eq!(n, 1); }
685         }
686     })
687
688     test!(fn sync3() {
689         let (tx1, rx1) = sync_channel(0);
690         let (tx2, rx2) = channel();
691         spawn(proc() { tx1.send(1); });
692         spawn(proc() { tx2.send(2); });
693         select! {
694             n = rx1.recv() => {
695                 assert_eq!(n, 1);
696                 assert_eq!(rx2.recv(), 2);
697             },
698             n = rx2.recv() => {
699                 assert_eq!(n, 2);
700                 assert_eq!(rx1.recv(), 1);
701             }
702         }
703     })
704 }