]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/select.rs
Auto merge of #36767 - jseyfried:enforce_rfc_1560_shadowing, r=nrc
[rust.git] / src / libstd / sync / mpsc / select.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows naturally binding of variables to the
24 //! received values of receivers in a much more natural syntax then usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Examples
28 //!
29 //! ```rust
30 //! #![feature(mpsc_select)]
31 //!
32 //! use std::sync::mpsc::channel;
33 //!
34 //! let (tx1, rx1) = channel();
35 //! let (tx2, rx2) = channel();
36 //!
37 //! tx1.send(1).unwrap();
38 //! tx2.send(2).unwrap();
39 //!
40 //! select! {
41 //!     val = rx1.recv() => {
42 //!         assert_eq!(val.unwrap(), 1);
43 //!     },
44 //!     val = rx2.recv() => {
45 //!         assert_eq!(val.unwrap(), 2);
46 //!     }
47 //! }
48 //! ```
49
50 #![allow(dead_code)]
51 #![unstable(feature = "mpsc_select",
52             reason = "This implementation, while likely sufficient, is unsafe and \
53                       likely to be error prone. At some point in the future this \
54                       module will likely be replaced, and it is currently \
55                       unknown how much API breakage that will cause. The ability \
56                       to select over a number of channels will remain forever, \
57                       but no guarantees beyond this are being made",
58             issue = "27800")]
59
60
61 use fmt;
62
63 use core::cell::{Cell, UnsafeCell};
64 use core::marker;
65 use core::ptr;
66 use core::usize;
67
68 use sync::mpsc::{Receiver, RecvError};
69 use sync::mpsc::blocking::{self, SignalToken};
70
71 /// The "receiver set" of the select interface. This structure is used to manage
72 /// a set of receivers which are being selected over.
73 pub struct Select {
74     inner: UnsafeCell<SelectInner>,
75     next_id: Cell<usize>,
76 }
77
78 struct SelectInner {
79     head: *mut Handle<'static, ()>,
80     tail: *mut Handle<'static, ()>,
81 }
82
83 impl !marker::Send for Select {}
84
85 /// A handle to a receiver which is currently a member of a `Select` set of
86 /// receivers.  This handle is used to keep the receiver in the set as well as
87 /// interact with the underlying receiver.
88 pub struct Handle<'rx, T:Send+'rx> {
89     /// The ID of this handle, used to compare against the return value of
90     /// `Select::wait()`
91     id: usize,
92     selector: *mut SelectInner,
93     next: *mut Handle<'static, ()>,
94     prev: *mut Handle<'static, ()>,
95     added: bool,
96     packet: &'rx (Packet+'rx),
97
98     // due to our fun transmutes, we be sure to place this at the end. (nothing
99     // previous relies on T)
100     rx: &'rx Receiver<T>,
101 }
102
103 struct Packets { cur: *mut Handle<'static, ()> }
104
105 #[doc(hidden)]
106 #[derive(PartialEq, Eq)]
107 pub enum StartResult {
108     Installed,
109     Abort,
110 }
111
112 #[doc(hidden)]
113 pub trait Packet {
114     fn can_recv(&self) -> bool;
115     fn start_selection(&self, token: SignalToken) -> StartResult;
116     fn abort_selection(&self) -> bool;
117 }
118
119 impl Select {
120     /// Creates a new selection structure. This set is initially empty.
121     ///
122     /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
123     /// the `select!` macro.
124     ///
125     /// # Examples
126     ///
127     /// ```
128     /// #![feature(mpsc_select)]
129     ///
130     /// use std::sync::mpsc::Select;
131     ///
132     /// let select = Select::new();
133     /// ```
134     pub fn new() -> Select {
135         Select {
136             inner: UnsafeCell::new(SelectInner {
137                 head: ptr::null_mut(),
138                 tail: ptr::null_mut(),
139             }),
140             next_id: Cell::new(1),
141         }
142     }
143
144     /// Creates a new handle into this receiver set for a new receiver. Note
145     /// that this does *not* add the receiver to the receiver set, for that you
146     /// must call the `add` method on the handle itself.
147     pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
148         let id = self.next_id.get();
149         self.next_id.set(id + 1);
150         Handle {
151             id: id,
152             selector: self.inner.get(),
153             next: ptr::null_mut(),
154             prev: ptr::null_mut(),
155             added: false,
156             rx: rx,
157             packet: rx,
158         }
159     }
160
161     /// Waits for an event on this receiver set. The returned value is *not* an
162     /// index, but rather an id. This id can be queried against any active
163     /// `Handle` structures (each one has an `id` method). The handle with
164     /// the matching `id` will have some sort of event available on it. The
165     /// event could either be that data is available or the corresponding
166     /// channel has been closed.
167     pub fn wait(&self) -> usize {
168         self.wait2(true)
169     }
170
171     /// Helper method for skipping the preflight checks during testing
172     fn wait2(&self, do_preflight_checks: bool) -> usize {
173         // Note that this is currently an inefficient implementation. We in
174         // theory have knowledge about all receivers in the set ahead of time,
175         // so this method shouldn't really have to iterate over all of them yet
176         // again. The idea with this "receiver set" interface is to get the
177         // interface right this time around, and later this implementation can
178         // be optimized.
179         //
180         // This implementation can be summarized by:
181         //
182         //      fn select(receivers) {
183         //          if any receiver ready { return ready index }
184         //          deschedule {
185         //              block on all receivers
186         //          }
187         //          unblock on all receivers
188         //          return ready index
189         //      }
190         //
191         // Most notably, the iterations over all of the receivers shouldn't be
192         // necessary.
193         unsafe {
194             // Stage 1: preflight checks. Look for any packets ready to receive
195             if do_preflight_checks {
196                 for handle in self.iter() {
197                     if (*handle).packet.can_recv() {
198                         return (*handle).id();
199                     }
200                 }
201             }
202
203             // Stage 2: begin the blocking process
204             //
205             // Create a number of signal tokens, and install each one
206             // sequentially until one fails. If one fails, then abort the
207             // selection on the already-installed tokens.
208             let (wait_token, signal_token) = blocking::tokens();
209             for (i, handle) in self.iter().enumerate() {
210                 match (*handle).packet.start_selection(signal_token.clone()) {
211                     StartResult::Installed => {}
212                     StartResult::Abort => {
213                         // Go back and abort the already-begun selections
214                         for handle in self.iter().take(i) {
215                             (*handle).packet.abort_selection();
216                         }
217                         return (*handle).id;
218                     }
219                 }
220             }
221
222             // Stage 3: no messages available, actually block
223             wait_token.wait();
224
225             // Stage 4: there *must* be message available; find it.
226             //
227             // Abort the selection process on each receiver. If the abort
228             // process returns `true`, then that means that the receiver is
229             // ready to receive some data. Note that this also means that the
230             // receiver may have yet to have fully read the `to_wake` field and
231             // woken us up (although the wakeup is guaranteed to fail).
232             //
233             // This situation happens in the window of where a sender invokes
234             // increment(), sees -1, and then decides to wake up the thread. After
235             // all this is done, the sending thread will set `selecting` to
236             // `false`. Until this is done, we cannot return. If we were to
237             // return, then a sender could wake up a receiver which has gone
238             // back to sleep after this call to `select`.
239             //
240             // Note that it is a "fairly small window" in which an increment()
241             // views that it should wake a thread up until the `selecting` bit
242             // is set to false. For now, the implementation currently just spins
243             // in a yield loop. This is very distasteful, but this
244             // implementation is already nowhere near what it should ideally be.
245             // A rewrite should focus on avoiding a yield loop, and for now this
246             // implementation is tying us over to a more efficient "don't
247             // iterate over everything every time" implementation.
248             let mut ready_id = usize::MAX;
249             for handle in self.iter() {
250                 if (*handle).packet.abort_selection() {
251                     ready_id = (*handle).id;
252                 }
253             }
254
255             // We must have found a ready receiver
256             assert!(ready_id != usize::MAX);
257             return ready_id;
258         }
259     }
260
261     fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
262 }
263
264 impl<'rx, T: Send> Handle<'rx, T> {
265     /// Retrieves the id of this handle.
266     #[inline]
267     pub fn id(&self) -> usize { self.id }
268
269     /// Blocks to receive a value on the underlying receiver, returning `Some` on
270     /// success or `None` if the channel disconnects. This function has the same
271     /// semantics as `Receiver.recv`
272     pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
273
274     /// Adds this handle to the receiver set that the handle was created from. This
275     /// method can be called multiple times, but it has no effect if `add` was
276     /// called previously.
277     ///
278     /// This method is unsafe because it requires that the `Handle` is not moved
279     /// while it is added to the `Select` set.
280     pub unsafe fn add(&mut self) {
281         if self.added { return }
282         let selector = &mut *self.selector;
283         let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
284
285         if selector.head.is_null() {
286             selector.head = me;
287             selector.tail = me;
288         } else {
289             (*me).prev = selector.tail;
290             assert!((*me).next.is_null());
291             (*selector.tail).next = me;
292             selector.tail = me;
293         }
294         self.added = true;
295     }
296
297     /// Removes this handle from the `Select` set. This method is unsafe because
298     /// it has no guarantee that the `Handle` was not moved since `add` was
299     /// called.
300     pub unsafe fn remove(&mut self) {
301         if !self.added { return }
302
303         let selector = &mut *self.selector;
304         let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
305
306         if self.prev.is_null() {
307             assert_eq!(selector.head, me);
308             selector.head = self.next;
309         } else {
310             (*self.prev).next = self.next;
311         }
312         if self.next.is_null() {
313             assert_eq!(selector.tail, me);
314             selector.tail = self.prev;
315         } else {
316             (*self.next).prev = self.prev;
317         }
318
319         self.next = ptr::null_mut();
320         self.prev = ptr::null_mut();
321
322         self.added = false;
323     }
324 }
325
326 impl Drop for Select {
327     fn drop(&mut self) {
328         unsafe {
329             assert!((&*self.inner.get()).head.is_null());
330             assert!((&*self.inner.get()).tail.is_null());
331         }
332     }
333 }
334
335 impl<'rx, T: Send> Drop for Handle<'rx, T> {
336     fn drop(&mut self) {
337         unsafe { self.remove() }
338     }
339 }
340
341 impl Iterator for Packets {
342     type Item = *mut Handle<'static, ()>;
343
344     fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
345         if self.cur.is_null() {
346             None
347         } else {
348             let ret = Some(self.cur);
349             unsafe { self.cur = (*self.cur).next; }
350             ret
351         }
352     }
353 }
354
355 #[stable(feature = "mpsc_debug", since = "1.7.0")]
356 impl fmt::Debug for Select {
357     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
358         write!(f, "Select {{ .. }}")
359     }
360 }
361
362 #[stable(feature = "mpsc_debug", since = "1.7.0")]
363 impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
364     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
365         write!(f, "Handle {{ .. }}")
366     }
367 }
368
369 #[allow(unused_imports)]
370 #[cfg(all(test, not(target_os = "emscripten")))]
371 mod tests {
372     use thread;
373     use sync::mpsc::*;
374
375     // Don't use the libstd version so we can pull in the right Select structure
376     // (std::comm points at the wrong one)
377     macro_rules! select {
378         (
379             $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
380         ) => ({
381             let sel = Select::new();
382             $( let mut $rx = sel.handle(&$rx); )+
383             unsafe {
384                 $( $rx.add(); )+
385             }
386             let ret = sel.wait();
387             $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
388             { unreachable!() }
389         })
390     }
391
392     #[test]
393     fn smoke() {
394         let (tx1, rx1) = channel::<i32>();
395         let (tx2, rx2) = channel::<i32>();
396         tx1.send(1).unwrap();
397         select! {
398             foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
399             _bar = rx2.recv() => { panic!() }
400         }
401         tx2.send(2).unwrap();
402         select! {
403             _foo = rx1.recv() => { panic!() },
404             bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
405         }
406         drop(tx1);
407         select! {
408             foo = rx1.recv() => { assert!(foo.is_err()); },
409             _bar = rx2.recv() => { panic!() }
410         }
411         drop(tx2);
412         select! {
413             bar = rx2.recv() => { assert!(bar.is_err()); }
414         }
415     }
416
417     #[test]
418     fn smoke2() {
419         let (_tx1, rx1) = channel::<i32>();
420         let (_tx2, rx2) = channel::<i32>();
421         let (_tx3, rx3) = channel::<i32>();
422         let (_tx4, rx4) = channel::<i32>();
423         let (tx5, rx5) = channel::<i32>();
424         tx5.send(4).unwrap();
425         select! {
426             _foo = rx1.recv() => { panic!("1") },
427             _foo = rx2.recv() => { panic!("2") },
428             _foo = rx3.recv() => { panic!("3") },
429             _foo = rx4.recv() => { panic!("4") },
430             foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
431         }
432     }
433
434     #[test]
435     fn closed() {
436         let (_tx1, rx1) = channel::<i32>();
437         let (tx2, rx2) = channel::<i32>();
438         drop(tx2);
439
440         select! {
441             _a1 = rx1.recv() => { panic!() },
442             a2 = rx2.recv() => { assert!(a2.is_err()); }
443         }
444     }
445
446     #[test]
447     fn unblocks() {
448         let (tx1, rx1) = channel::<i32>();
449         let (_tx2, rx2) = channel::<i32>();
450         let (tx3, rx3) = channel::<i32>();
451
452         let _t = thread::spawn(move|| {
453             for _ in 0..20 { thread::yield_now(); }
454             tx1.send(1).unwrap();
455             rx3.recv().unwrap();
456             for _ in 0..20 { thread::yield_now(); }
457         });
458
459         select! {
460             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
461             _b = rx2.recv() => { panic!() }
462         }
463         tx3.send(1).unwrap();
464         select! {
465             a = rx1.recv() => { assert!(a.is_err()) },
466             _b = rx2.recv() => { panic!() }
467         }
468     }
469
470     #[test]
471     fn both_ready() {
472         let (tx1, rx1) = channel::<i32>();
473         let (tx2, rx2) = channel::<i32>();
474         let (tx3, rx3) = channel::<()>();
475
476         let _t = thread::spawn(move|| {
477             for _ in 0..20 { thread::yield_now(); }
478             tx1.send(1).unwrap();
479             tx2.send(2).unwrap();
480             rx3.recv().unwrap();
481         });
482
483         select! {
484             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
485             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
486         }
487         select! {
488             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
489             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
490         }
491         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
492         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
493         tx3.send(()).unwrap();
494     }
495
496     #[test]
497     fn stress() {
498         const AMT: i32 = 10000;
499         let (tx1, rx1) = channel::<i32>();
500         let (tx2, rx2) = channel::<i32>();
501         let (tx3, rx3) = channel::<()>();
502
503         let _t = thread::spawn(move|| {
504             for i in 0..AMT {
505                 if i % 2 == 0 {
506                     tx1.send(i).unwrap();
507                 } else {
508                     tx2.send(i).unwrap();
509                 }
510                 rx3.recv().unwrap();
511             }
512         });
513
514         for i in 0..AMT {
515             select! {
516                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
517                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
518             }
519             tx3.send(()).unwrap();
520         }
521     }
522
523     #[test]
524     fn cloning() {
525         let (tx1, rx1) = channel::<i32>();
526         let (_tx2, rx2) = channel::<i32>();
527         let (tx3, rx3) = channel::<()>();
528
529         let _t = thread::spawn(move|| {
530             rx3.recv().unwrap();
531             tx1.clone();
532             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
533             tx1.send(2).unwrap();
534             rx3.recv().unwrap();
535         });
536
537         tx3.send(()).unwrap();
538         select! {
539             _i1 = rx1.recv() => {},
540             _i2 = rx2.recv() => panic!()
541         }
542         tx3.send(()).unwrap();
543     }
544
545     #[test]
546     fn cloning2() {
547         let (tx1, rx1) = channel::<i32>();
548         let (_tx2, rx2) = channel::<i32>();
549         let (tx3, rx3) = channel::<()>();
550
551         let _t = thread::spawn(move|| {
552             rx3.recv().unwrap();
553             tx1.clone();
554             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
555             tx1.send(2).unwrap();
556             rx3.recv().unwrap();
557         });
558
559         tx3.send(()).unwrap();
560         select! {
561             _i1 = rx1.recv() => {},
562             _i2 = rx2.recv() => panic!()
563         }
564         tx3.send(()).unwrap();
565     }
566
567     #[test]
568     fn cloning3() {
569         let (tx1, rx1) = channel::<()>();
570         let (tx2, rx2) = channel::<()>();
571         let (tx3, rx3) = channel::<()>();
572         let _t = thread::spawn(move|| {
573             let s = Select::new();
574             let mut h1 = s.handle(&rx1);
575             let mut h2 = s.handle(&rx2);
576             unsafe { h2.add(); }
577             unsafe { h1.add(); }
578             assert_eq!(s.wait(), h2.id);
579             tx3.send(()).unwrap();
580         });
581
582         for _ in 0..1000 { thread::yield_now(); }
583         drop(tx1.clone());
584         tx2.send(()).unwrap();
585         rx3.recv().unwrap();
586     }
587
588     #[test]
589     fn preflight1() {
590         let (tx, rx) = channel();
591         tx.send(()).unwrap();
592         select! {
593             _n = rx.recv() => {}
594         }
595     }
596
597     #[test]
598     fn preflight2() {
599         let (tx, rx) = channel();
600         tx.send(()).unwrap();
601         tx.send(()).unwrap();
602         select! {
603             _n = rx.recv() => {}
604         }
605     }
606
607     #[test]
608     fn preflight3() {
609         let (tx, rx) = channel();
610         drop(tx.clone());
611         tx.send(()).unwrap();
612         select! {
613             _n = rx.recv() => {}
614         }
615     }
616
617     #[test]
618     fn preflight4() {
619         let (tx, rx) = channel();
620         tx.send(()).unwrap();
621         let s = Select::new();
622         let mut h = s.handle(&rx);
623         unsafe { h.add(); }
624         assert_eq!(s.wait2(false), h.id);
625     }
626
627     #[test]
628     fn preflight5() {
629         let (tx, rx) = channel();
630         tx.send(()).unwrap();
631         tx.send(()).unwrap();
632         let s = Select::new();
633         let mut h = s.handle(&rx);
634         unsafe { h.add(); }
635         assert_eq!(s.wait2(false), h.id);
636     }
637
638     #[test]
639     fn preflight6() {
640         let (tx, rx) = channel();
641         drop(tx.clone());
642         tx.send(()).unwrap();
643         let s = Select::new();
644         let mut h = s.handle(&rx);
645         unsafe { h.add(); }
646         assert_eq!(s.wait2(false), h.id);
647     }
648
649     #[test]
650     fn preflight7() {
651         let (tx, rx) = channel::<()>();
652         drop(tx);
653         let s = Select::new();
654         let mut h = s.handle(&rx);
655         unsafe { h.add(); }
656         assert_eq!(s.wait2(false), h.id);
657     }
658
659     #[test]
660     fn preflight8() {
661         let (tx, rx) = channel();
662         tx.send(()).unwrap();
663         drop(tx);
664         rx.recv().unwrap();
665         let s = Select::new();
666         let mut h = s.handle(&rx);
667         unsafe { h.add(); }
668         assert_eq!(s.wait2(false), h.id);
669     }
670
671     #[test]
672     fn preflight9() {
673         let (tx, rx) = channel();
674         drop(tx.clone());
675         tx.send(()).unwrap();
676         drop(tx);
677         rx.recv().unwrap();
678         let s = Select::new();
679         let mut h = s.handle(&rx);
680         unsafe { h.add(); }
681         assert_eq!(s.wait2(false), h.id);
682     }
683
684     #[test]
685     fn oneshot_data_waiting() {
686         let (tx1, rx1) = channel();
687         let (tx2, rx2) = channel();
688         let _t = thread::spawn(move|| {
689             select! {
690                 _n = rx1.recv() => {}
691             }
692             tx2.send(()).unwrap();
693         });
694
695         for _ in 0..100 { thread::yield_now() }
696         tx1.send(()).unwrap();
697         rx2.recv().unwrap();
698     }
699
700     #[test]
701     fn stream_data_waiting() {
702         let (tx1, rx1) = channel();
703         let (tx2, rx2) = channel();
704         tx1.send(()).unwrap();
705         tx1.send(()).unwrap();
706         rx1.recv().unwrap();
707         rx1.recv().unwrap();
708         let _t = thread::spawn(move|| {
709             select! {
710                 _n = rx1.recv() => {}
711             }
712             tx2.send(()).unwrap();
713         });
714
715         for _ in 0..100 { thread::yield_now() }
716         tx1.send(()).unwrap();
717         rx2.recv().unwrap();
718     }
719
720     #[test]
721     fn shared_data_waiting() {
722         let (tx1, rx1) = channel();
723         let (tx2, rx2) = channel();
724         drop(tx1.clone());
725         tx1.send(()).unwrap();
726         rx1.recv().unwrap();
727         let _t = thread::spawn(move|| {
728             select! {
729                 _n = rx1.recv() => {}
730             }
731             tx2.send(()).unwrap();
732         });
733
734         for _ in 0..100 { thread::yield_now() }
735         tx1.send(()).unwrap();
736         rx2.recv().unwrap();
737     }
738
739     #[test]
740     fn sync1() {
741         let (tx, rx) = sync_channel::<i32>(1);
742         tx.send(1).unwrap();
743         select! {
744             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
745         }
746     }
747
748     #[test]
749     fn sync2() {
750         let (tx, rx) = sync_channel::<i32>(0);
751         let _t = thread::spawn(move|| {
752             for _ in 0..100 { thread::yield_now() }
753             tx.send(1).unwrap();
754         });
755         select! {
756             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
757         }
758     }
759
760     #[test]
761     fn sync3() {
762         let (tx1, rx1) = sync_channel::<i32>(0);
763         let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
764         let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
765         let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
766         select! {
767             n = rx1.recv() => {
768                 let n = n.unwrap();
769                 assert_eq!(n, 1);
770                 assert_eq!(rx2.recv().unwrap(), 2);
771             },
772             n = rx2.recv() => {
773                 let n = n.unwrap();
774                 assert_eq!(n, 2);
775                 assert_eq!(rx1.recv().unwrap(), 1);
776             }
777         }
778     }
779
780     #[test]
781     fn fmt_debug_select() {
782         let sel = Select::new();
783         assert_eq!(format!("{:?}", sel), "Select { .. }");
784     }
785
786     #[test]
787     fn fmt_debug_handle() {
788         let (_, rx) = channel::<i32>();
789         let sel = Select::new();
790         let handle = sel.handle(&rx);
791         assert_eq!(format!("{:?}", handle), "Handle { .. }");
792     }
793 }