]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/select.rs
auto merge of #21132 : sfackler/rust/wait_timeout, r=alexcrichton
[rust.git] / src / libstd / sync / mpsc / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Example
28 //!
29 //! ```rust
30 //! use std::sync::mpsc::channel;
31 //!
32 //! let (tx1, rx1) = channel();
33 //! let (tx2, rx2) = channel();
34 //!
35 //! tx1.send(1i).unwrap();
36 //! tx2.send(2i).unwrap();
37 //!
38 //! select! {
39 //!     val = rx1.recv() => {
40 //!         assert_eq!(val.unwrap(), 1i);
41 //!     },
42 //!     val = rx2.recv() => {
43 //!         assert_eq!(val.unwrap(), 2i);
44 //!     }
45 //! }
46 //! ```
47
48 #![allow(dead_code)]
49 #![unstable = "This implementation, while likely sufficient, is unsafe and \
50                    likely to be error prone. At some point in the future this \
51                    module will likely be replaced, and it is currently \
52                    unknown how much API breakage that will cause. The ability \
53                    to select over a number of channels will remain forever, \
54                    but no guarantees beyond this are being made"]
55
56
57 use core::prelude::*;
58
59 use core::cell::Cell;
60 use core::marker;
61 use core::mem;
62 use core::uint;
63
64 use sync::mpsc::{Receiver, RecvError};
65 use sync::mpsc::blocking::{self, SignalToken};
66
67 /// The "receiver set" of the select interface. This structure is used to manage
68 /// a set of receivers which are being selected over.
69 #[cfg(stage0)] // NOTE remove impl after next snapshot
70 pub struct Select {
71     head: *mut Handle<'static, ()>,
72     tail: *mut Handle<'static, ()>,
73     next_id: Cell<uint>,
74     marker1: marker::NoSend,
75 }
76
77 /// The "receiver set" of the select interface. This structure is used to manage
78 /// a set of receivers which are being selected over.
79 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
80 pub struct Select {
81     head: *mut Handle<'static, ()>,
82     tail: *mut Handle<'static, ()>,
83     next_id: Cell<uint>,
84 }
85
86 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
87 impl !marker::Send for Select {}
88
89 /// A handle to a receiver which is currently a member of a `Select` set of
90 /// receivers.  This handle is used to keep the receiver in the set as well as
91 /// interact with the underlying receiver.
92 pub struct Handle<'rx, T:'rx> {
93     /// The ID of this handle, used to compare against the return value of
94     /// `Select::wait()`
95     id: uint,
96     selector: &'rx Select,
97     next: *mut Handle<'static, ()>,
98     prev: *mut Handle<'static, ()>,
99     added: bool,
100     packet: &'rx (Packet+'rx),
101
102     // due to our fun transmutes, we be sure to place this at the end. (nothing
103     // previous relies on T)
104     rx: &'rx Receiver<T>,
105 }
106
107 struct Packets { cur: *mut Handle<'static, ()> }
108
109 #[doc(hidden)]
110 #[derive(PartialEq)]
111 pub enum StartResult {
112     Installed,
113     Abort,
114 }
115
116 #[doc(hidden)]
117 pub trait Packet {
118     fn can_recv(&self) -> bool;
119     fn start_selection(&self, token: SignalToken) -> StartResult;
120     fn abort_selection(&self) -> bool;
121 }
122
123 impl Select {
124     /// Creates a new selection structure. This set is initially empty and
125     /// `wait` will panic!() if called.
126     ///
127     /// Usage of this struct directly can sometimes be burdensome, and usage is
128     /// rather much easier through the `select!` macro.
129     #[cfg(stage0)] // NOTE remove impl after next snapshot
130     pub fn new() -> Select {
131         Select {
132             marker1: marker::NoSend,
133             head: 0 as *mut Handle<'static, ()>,
134             tail: 0 as *mut Handle<'static, ()>,
135             next_id: Cell::new(1),
136         }
137     }
138
139     /// Creates a new selection structure. This set is initially empty and
140     /// `wait` will panic!() if called.
141     ///
142     /// Usage of this struct directly can sometimes be burdensome, and usage is
143     /// rather much easier through the `select!` macro.
144     #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
145     pub fn new() -> Select {
146         Select {
147             head: 0 as *mut Handle<'static, ()>,
148             tail: 0 as *mut Handle<'static, ()>,
149             next_id: Cell::new(1),
150         }
151     }
152
153     /// Creates a new handle into this receiver set for a new receiver. Note
154     /// that this does *not* add the receiver to the receiver set, for that you
155     /// must call the `add` method on the handle itself.
156     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
157         let id = self.next_id.get();
158         self.next_id.set(id + 1);
159         Handle {
160             id: id,
161             selector: self,
162             next: 0 as *mut Handle<'static, ()>,
163             prev: 0 as *mut Handle<'static, ()>,
164             added: false,
165             rx: rx,
166             packet: rx,
167         }
168     }
169
170     /// Waits for an event on this receiver set. The returned value is *not* an
171     /// index, but rather an id. This id can be queried against any active
172     /// `Handle` structures (each one has an `id` method). The handle with
173     /// the matching `id` will have some sort of event available on it. The
174     /// event could either be that data is available or the corresponding
175     /// channel has been closed.
176     pub fn wait(&self) -> uint {
177         self.wait2(true)
178     }
179
180     /// Helper method for skipping the preflight checks during testing
181     fn wait2(&self, do_preflight_checks: bool) -> uint {
182         // Note that this is currently an inefficient implementation. We in
183         // theory have knowledge about all receivers in the set ahead of time,
184         // so this method shouldn't really have to iterate over all of them yet
185         // again. The idea with this "receiver set" interface is to get the
186         // interface right this time around, and later this implementation can
187         // be optimized.
188         //
189         // This implementation can be summarized by:
190         //
191         //      fn select(receivers) {
192         //          if any receiver ready { return ready index }
193         //          deschedule {
194         //              block on all receivers
195         //          }
196         //          unblock on all receivers
197         //          return ready index
198         //      }
199         //
200         // Most notably, the iterations over all of the receivers shouldn't be
201         // necessary.
202         unsafe {
203             // Stage 1: preflight checks. Look for any packets ready to receive
204             if do_preflight_checks {
205                 for handle in self.iter() {
206                     if (*handle).packet.can_recv() {
207                         return (*handle).id();
208                     }
209                 }
210             }
211
212             // Stage 2: begin the blocking process
213             //
214             // Create a number of signal tokens, and install each one
215             // sequentially until one fails. If one fails, then abort the
216             // selection on the already-installed tokens.
217             let (wait_token, signal_token) = blocking::tokens();
218             for (i, handle) in self.iter().enumerate() {
219                 match (*handle).packet.start_selection(signal_token.clone()) {
220                     StartResult::Installed => {}
221                     StartResult::Abort => {
222                         // Go back and abort the already-begun selections
223                         for handle in self.iter().take(i) {
224                             (*handle).packet.abort_selection();
225                         }
226                         return (*handle).id;
227                     }
228                 }
229             }
230
231             // Stage 3: no messages available, actually block
232             wait_token.wait();
233
234             // Stage 4: there *must* be message available; find it.
235             //
236             // Abort the selection process on each receiver. If the abort
237             // process returns `true`, then that means that the receiver is
238             // ready to receive some data. Note that this also means that the
239             // receiver may have yet to have fully read the `to_wake` field and
240             // woken us up (although the wakeup is guaranteed to fail).
241             //
242             // This situation happens in the window of where a sender invokes
243             // increment(), sees -1, and then decides to wake up the task. After
244             // all this is done, the sending thread will set `selecting` to
245             // `false`. Until this is done, we cannot return. If we were to
246             // return, then a sender could wake up a receiver which has gone
247             // back to sleep after this call to `select`.
248             //
249             // Note that it is a "fairly small window" in which an increment()
250             // views that it should wake a thread up until the `selecting` bit
251             // is set to false. For now, the implementation currently just spins
252             // in a yield loop. This is very distasteful, but this
253             // implementation is already nowhere near what it should ideally be.
254             // A rewrite should focus on avoiding a yield loop, and for now this
255             // implementation is tying us over to a more efficient "don't
256             // iterate over everything every time" implementation.
257             let mut ready_id = uint::MAX;
258             for handle in self.iter() {
259                 if (*handle).packet.abort_selection() {
260                     ready_id = (*handle).id;
261                 }
262             }
263
264             // We must have found a ready receiver
265             assert!(ready_id != uint::MAX);
266             return ready_id;
267         }
268     }
269
270     fn iter(&self) -> Packets { Packets { cur: self.head } }
271 }
272
273 impl<'rx, T: Send> Handle<'rx, T> {
274     /// Retrieve the id of this handle.
275     #[inline]
276     pub fn id(&self) -> uint { self.id }
277
278     /// Block to receive a value on the underlying receiver, returning `Some` on
279     /// success or `None` if the channel disconnects. This function has the same
280     /// semantics as `Receiver.recv`
281     pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
282
283     /// Adds this handle to the receiver set that the handle was created from. This
284     /// method can be called multiple times, but it has no effect if `add` was
285     /// called previously.
286     ///
287     /// This method is unsafe because it requires that the `Handle` is not moved
288     /// while it is added to the `Select` set.
289     pub unsafe fn add(&mut self) {
290         if self.added { return }
291         let selector: &mut Select = mem::transmute(&*self.selector);
292         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
293
294         if selector.head.is_null() {
295             selector.head = me;
296             selector.tail = me;
297         } else {
298             (*me).prev = selector.tail;
299             assert!((*me).next.is_null());
300             (*selector.tail).next = me;
301             selector.tail = me;
302         }
303         self.added = true;
304     }
305
306     /// Removes this handle from the `Select` set. This method is unsafe because
307     /// it has no guarantee that the `Handle` was not moved since `add` was
308     /// called.
309     pub unsafe fn remove(&mut self) {
310         if !self.added { return }
311
312         let selector: &mut Select = mem::transmute(&*self.selector);
313         let me: *mut Handle<'static, ()> = mem::transmute(&*self);
314
315         if self.prev.is_null() {
316             assert_eq!(selector.head, me);
317             selector.head = self.next;
318         } else {
319             (*self.prev).next = self.next;
320         }
321         if self.next.is_null() {
322             assert_eq!(selector.tail, me);
323             selector.tail = self.prev;
324         } else {
325             (*self.next).prev = self.prev;
326         }
327
328         self.next = 0 as *mut Handle<'static, ()>;
329         self.prev = 0 as *mut Handle<'static, ()>;
330
331         self.added = false;
332     }
333 }
334
335 #[unsafe_destructor]
336 impl Drop for Select {
337     fn drop(&mut self) {
338         assert!(self.head.is_null());
339         assert!(self.tail.is_null());
340     }
341 }
342
343 #[unsafe_destructor]
344 impl<'rx, T: Send> Drop for Handle<'rx, T> {
345     fn drop(&mut self) {
346         unsafe { self.remove() }
347     }
348 }
349
350 impl Iterator for Packets {
351     type Item = *mut Handle<'static, ()>;
352
353     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
354         if self.cur.is_null() {
355             None
356         } else {
357             let ret = Some(self.cur);
358             unsafe { self.cur = (*self.cur).next; }
359             ret
360         }
361     }
362 }
363
364 #[cfg(test)]
365 #[allow(unused_imports)]
366 mod test {
367     use prelude::v1::*;
368
369     use thread::Thread;
370     use sync::mpsc::*;
371
372     // Don't use the libstd version so we can pull in the right Select structure
373     // (std::comm points at the wrong one)
374     macro_rules! select {
375         (
376             $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
377         ) => ({
378             let sel = Select::new();
379             $( let mut $rx = sel.handle(&$rx); )+
380             unsafe {
381                 $( $rx.add(); )+
382             }
383             let ret = sel.wait();
384             $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
385             { unreachable!() }
386         })
387     }
388
389     #[test]
390     fn smoke() {
391         let (tx1, rx1) = channel::<int>();
392         let (tx2, rx2) = channel::<int>();
393         tx1.send(1).unwrap();
394         select! {
395             foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
396             _bar = rx2.recv() => { panic!() }
397         }
398         tx2.send(2).unwrap();
399         select! {
400             _foo = rx1.recv() => { panic!() },
401             bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
402         }
403         drop(tx1);
404         select! {
405             foo = rx1.recv() => { assert!(foo.is_err()); },
406             _bar = rx2.recv() => { panic!() }
407         }
408         drop(tx2);
409         select! {
410             bar = rx2.recv() => { assert!(bar.is_err()); }
411         }
412     }
413
414     #[test]
415     fn smoke2() {
416         let (_tx1, rx1) = channel::<int>();
417         let (_tx2, rx2) = channel::<int>();
418         let (_tx3, rx3) = channel::<int>();
419         let (_tx4, rx4) = channel::<int>();
420         let (tx5, rx5) = channel::<int>();
421         tx5.send(4).unwrap();
422         select! {
423             _foo = rx1.recv() => { panic!("1") },
424             _foo = rx2.recv() => { panic!("2") },
425             _foo = rx3.recv() => { panic!("3") },
426             _foo = rx4.recv() => { panic!("4") },
427             foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
428         }
429     }
430
431     #[test]
432     fn closed() {
433         let (_tx1, rx1) = channel::<int>();
434         let (tx2, rx2) = channel::<int>();
435         drop(tx2);
436
437         select! {
438             _a1 = rx1.recv() => { panic!() },
439             a2 = rx2.recv() => { assert!(a2.is_err()); }
440         }
441     }
442
443     #[test]
444     fn unblocks() {
445         let (tx1, rx1) = channel::<int>();
446         let (_tx2, rx2) = channel::<int>();
447         let (tx3, rx3) = channel::<int>();
448
449         let _t = Thread::spawn(move|| {
450             for _ in range(0u, 20) { Thread::yield_now(); }
451             tx1.send(1).unwrap();
452             rx3.recv().unwrap();
453             for _ in range(0u, 20) { Thread::yield_now(); }
454         });
455
456         select! {
457             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
458             _b = rx2.recv() => { panic!() }
459         }
460         tx3.send(1).unwrap();
461         select! {
462             a = rx1.recv() => { assert!(a.is_err()) },
463             _b = rx2.recv() => { panic!() }
464         }
465     }
466
467     #[test]
468     fn both_ready() {
469         let (tx1, rx1) = channel::<int>();
470         let (tx2, rx2) = channel::<int>();
471         let (tx3, rx3) = channel::<()>();
472
473         let _t = Thread::spawn(move|| {
474             for _ in range(0u, 20) { Thread::yield_now(); }
475             tx1.send(1).unwrap();
476             tx2.send(2).unwrap();
477             rx3.recv().unwrap();
478         });
479
480         select! {
481             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
482             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
483         }
484         select! {
485             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
486             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
487         }
488         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
489         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
490         tx3.send(()).unwrap();
491     }
492
493     #[test]
494     fn stress() {
495         static AMT: int = 10000;
496         let (tx1, rx1) = channel::<int>();
497         let (tx2, rx2) = channel::<int>();
498         let (tx3, rx3) = channel::<()>();
499
500         let _t = Thread::spawn(move|| {
501             for i in range(0, AMT) {
502                 if i % 2 == 0 {
503                     tx1.send(i).unwrap();
504                 } else {
505                     tx2.send(i).unwrap();
506                 }
507                 rx3.recv().unwrap();
508             }
509         });
510
511         for i in range(0, AMT) {
512             select! {
513                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
514                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
515             }
516             tx3.send(()).unwrap();
517         }
518     }
519
520     #[test]
521     fn cloning() {
522         let (tx1, rx1) = channel::<int>();
523         let (_tx2, rx2) = channel::<int>();
524         let (tx3, rx3) = channel::<()>();
525
526         let _t = Thread::spawn(move|| {
527             rx3.recv().unwrap();
528             tx1.clone();
529             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
530             tx1.send(2).unwrap();
531             rx3.recv().unwrap();
532         });
533
534         tx3.send(()).unwrap();
535         select! {
536             _i1 = rx1.recv() => {},
537             _i2 = rx2.recv() => panic!()
538         }
539         tx3.send(()).unwrap();
540     }
541
542     #[test]
543     fn cloning2() {
544         let (tx1, rx1) = channel::<int>();
545         let (_tx2, rx2) = channel::<int>();
546         let (tx3, rx3) = channel::<()>();
547
548         let _t = Thread::spawn(move|| {
549             rx3.recv().unwrap();
550             tx1.clone();
551             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
552             tx1.send(2).unwrap();
553             rx3.recv().unwrap();
554         });
555
556         tx3.send(()).unwrap();
557         select! {
558             _i1 = rx1.recv() => {},
559             _i2 = rx2.recv() => panic!()
560         }
561         tx3.send(()).unwrap();
562     }
563
564     #[test]
565     fn cloning3() {
566         let (tx1, rx1) = channel::<()>();
567         let (tx2, rx2) = channel::<()>();
568         let (tx3, rx3) = channel::<()>();
569         let _t = Thread::spawn(move|| {
570             let s = Select::new();
571             let mut h1 = s.handle(&rx1);
572             let mut h2 = s.handle(&rx2);
573             unsafe { h2.add(); }
574             unsafe { h1.add(); }
575             assert_eq!(s.wait(), h2.id);
576             tx3.send(()).unwrap();
577         });
578
579         for _ in range(0u, 1000) { Thread::yield_now(); }
580         drop(tx1.clone());
581         tx2.send(()).unwrap();
582         rx3.recv().unwrap();
583     }
584
585     #[test]
586     fn preflight1() {
587         let (tx, rx) = channel();
588         tx.send(()).unwrap();
589         select! {
590             _n = rx.recv() => {}
591         }
592     }
593
594     #[test]
595     fn preflight2() {
596         let (tx, rx) = channel();
597         tx.send(()).unwrap();
598         tx.send(()).unwrap();
599         select! {
600             _n = rx.recv() => {}
601         }
602     }
603
604     #[test]
605     fn preflight3() {
606         let (tx, rx) = channel();
607         drop(tx.clone());
608         tx.send(()).unwrap();
609         select! {
610             _n = rx.recv() => {}
611         }
612     }
613
614     #[test]
615     fn preflight4() {
616         let (tx, rx) = channel();
617         tx.send(()).unwrap();
618         let s = Select::new();
619         let mut h = s.handle(&rx);
620         unsafe { h.add(); }
621         assert_eq!(s.wait2(false), h.id);
622     }
623
624     #[test]
625     fn preflight5() {
626         let (tx, rx) = channel();
627         tx.send(()).unwrap();
628         tx.send(()).unwrap();
629         let s = Select::new();
630         let mut h = s.handle(&rx);
631         unsafe { h.add(); }
632         assert_eq!(s.wait2(false), h.id);
633     }
634
635     #[test]
636     fn preflight6() {
637         let (tx, rx) = channel();
638         drop(tx.clone());
639         tx.send(()).unwrap();
640         let s = Select::new();
641         let mut h = s.handle(&rx);
642         unsafe { h.add(); }
643         assert_eq!(s.wait2(false), h.id);
644     }
645
646     #[test]
647     fn preflight7() {
648         let (tx, rx) = channel::<()>();
649         drop(tx);
650         let s = Select::new();
651         let mut h = s.handle(&rx);
652         unsafe { h.add(); }
653         assert_eq!(s.wait2(false), h.id);
654     }
655
656     #[test]
657     fn preflight8() {
658         let (tx, rx) = channel();
659         tx.send(()).unwrap();
660         drop(tx);
661         rx.recv().unwrap();
662         let s = Select::new();
663         let mut h = s.handle(&rx);
664         unsafe { h.add(); }
665         assert_eq!(s.wait2(false), h.id);
666     }
667
668     #[test]
669     fn preflight9() {
670         let (tx, rx) = channel();
671         drop(tx.clone());
672         tx.send(()).unwrap();
673         drop(tx);
674         rx.recv().unwrap();
675         let s = Select::new();
676         let mut h = s.handle(&rx);
677         unsafe { h.add(); }
678         assert_eq!(s.wait2(false), h.id);
679     }
680
681     #[test]
682     fn oneshot_data_waiting() {
683         let (tx1, rx1) = channel();
684         let (tx2, rx2) = channel();
685         let _t = Thread::spawn(move|| {
686             select! {
687                 _n = rx1.recv() => {}
688             }
689             tx2.send(()).unwrap();
690         });
691
692         for _ in range(0u, 100) { Thread::yield_now() }
693         tx1.send(()).unwrap();
694         rx2.recv().unwrap();
695     }
696
697     #[test]
698     fn stream_data_waiting() {
699         let (tx1, rx1) = channel();
700         let (tx2, rx2) = channel();
701         tx1.send(()).unwrap();
702         tx1.send(()).unwrap();
703         rx1.recv().unwrap();
704         rx1.recv().unwrap();
705         let _t = Thread::spawn(move|| {
706             select! {
707                 _n = rx1.recv() => {}
708             }
709             tx2.send(()).unwrap();
710         });
711
712         for _ in range(0u, 100) { Thread::yield_now() }
713         tx1.send(()).unwrap();
714         rx2.recv().unwrap();
715     }
716
717     #[test]
718     fn shared_data_waiting() {
719         let (tx1, rx1) = channel();
720         let (tx2, rx2) = channel();
721         drop(tx1.clone());
722         tx1.send(()).unwrap();
723         rx1.recv().unwrap();
724         let _t = Thread::spawn(move|| {
725             select! {
726                 _n = rx1.recv() => {}
727             }
728             tx2.send(()).unwrap();
729         });
730
731         for _ in range(0u, 100) { Thread::yield_now() }
732         tx1.send(()).unwrap();
733         rx2.recv().unwrap();
734     }
735
736     #[test]
737     fn sync1() {
738         let (tx, rx) = sync_channel::<int>(1);
739         tx.send(1).unwrap();
740         select! {
741             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
742         }
743     }
744
745     #[test]
746     fn sync2() {
747         let (tx, rx) = sync_channel::<int>(0);
748         let _t = Thread::spawn(move|| {
749             for _ in range(0u, 100) { Thread::yield_now() }
750             tx.send(1).unwrap();
751         });
752         select! {
753             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
754         }
755     }
756
757     #[test]
758     fn sync3() {
759         let (tx1, rx1) = sync_channel::<int>(0);
760         let (tx2, rx2): (Sender<int>, Receiver<int>) = channel();
761         let _t = Thread::spawn(move|| { tx1.send(1).unwrap(); });
762         let _t = Thread::spawn(move|| { tx2.send(2).unwrap(); });
763         select! {
764             n = rx1.recv() => {
765                 let n = n.unwrap();
766                 assert_eq!(n, 1);
767                 assert_eq!(rx2.recv().unwrap(), 2);
768             },
769             n = rx2.recv() => {
770                 let n = n.unwrap();
771                 assert_eq!(n, 2);
772                 assert_eq!(rx1.recv().unwrap(), 1);
773             }
774         }
775     }
776 }