]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
debuginfo: Make debuginfo source location assignment more stable (Pt. 1)
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer communication primitives threads
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //!     tx.send(10i).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10i);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in range(0i, 10i) {
78 //!     let tx = tx.clone();
79 //!     Thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in range(0i, 10i) {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //!     // This will wait for the parent task to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115 //!
116 //! Reading from a channel with a timeout requires to use a Timer together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
120 //!
121 //! ```no_run
122 //! use std::sync::mpsc::channel;
123 //! use std::io::timer::Timer;
124 //! use std::time::Duration;
125 //!
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
129 //!
130 //! loop {
131 //!     select! {
132 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
133 //!         _ = timeout.recv() => {
134 //!             println!("timed out, total time was more than 10 seconds");
135 //!             break;
136 //!         }
137 //!     }
138 //! }
139 //! ```
140 //!
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
144 //!
145 //! ```no_run
146 //! use std::sync::mpsc::channel;
147 //! use std::io::timer::Timer;
148 //! use std::time::Duration;
149 //!
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
152 //!
153 //! loop {
154 //!     let timeout = timer.oneshot(Duration::seconds(5));
155 //!
156 //!     select! {
157 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
158 //!         _ = timeout.recv() => {
159 //!             println!("timed out, no message received in 5 seconds");
160 //!             break;
161 //!         }
162 //!     }
163 //! }
164 //! ```
165
166 #![stable]
167
168 // A description of how Rust's channel implementation works
169 //
170 // Channels are supposed to be the basic building block for all other
171 // concurrent primitives that are used in Rust. As a result, the channel type
172 // needs to be highly optimized, flexible, and broad enough for use everywhere.
173 //
174 // The choice of implementation of all channels is to be built on lock-free data
175 // structures. The channels themselves are then consequently also lock-free data
176 // structures. As always with lock-free code, this is a very "here be dragons"
177 // territory, especially because I'm unaware of any academic papers that have
178 // gone into great length about channels of these flavors.
179 //
180 // ## Flavors of channels
181 //
182 // From the perspective of a consumer of this library, there is only one flavor
183 // of channel. This channel can be used as a stream and cloned to allow multiple
184 // senders. Under the hood, however, there are actually three flavors of
185 // channels in play.
186 //
187 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
188 //              They contain as few atomics as possible and involve one and
189 //              exactly one allocation.
190 // * Streams - these channels are optimized for the non-shared use case. They
191 //             use a different concurrent queue that is more tailored for this
192 //             use case. The initial allocation of this flavor of channel is not
193 //             optimized.
194 // * Shared - this is the most general form of channel that this module offers,
195 //            a channel with multiple senders. This type is as optimized as it
196 //            can be, but the previous two types mentioned are much faster for
197 //            their use-cases.
198 //
199 // ## Concurrent queues
200 //
201 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
202 // recv() obviously blocks. This means that under the hood there must be some
203 // shared and concurrent queue holding all of the actual data.
204 //
205 // With two flavors of channels, two flavors of queues are also used. We have
206 // chosen to use queues from a well-known author that are abbreviated as SPSC
207 // and MPSC (single producer, single consumer and multiple producer, single
208 // consumer). SPSC queues are used for streams while MPSC queues are used for
209 // shared channels.
210 //
211 // ### SPSC optimizations
212 //
213 // The SPSC queue found online is essentially a linked list of nodes where one
214 // half of the nodes are the "queue of data" and the other half of nodes are a
215 // cache of unused nodes. The unused nodes are used such that an allocation is
216 // not required on every push() and a free doesn't need to happen on every
217 // pop().
218 //
219 // As found online, however, the cache of nodes is of an infinite size. This
220 // means that if a channel at one point in its life had 50k items in the queue,
221 // then the queue will always have the capacity for 50k items. I believed that
222 // this was an unnecessary limitation of the implementation, so I have altered
223 // the queue to optionally have a bound on the cache size.
224 //
225 // By default, streams will have an unbounded SPSC queue with a small-ish cache
226 // size. The hope is that the cache is still large enough to have very fast
227 // send() operations while not too large such that millions of channels can
228 // coexist at once.
229 //
230 // ### MPSC optimizations
231 //
232 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
233 // a linked list under the hood to earn its unboundedness, but I have not put
234 // forth much effort into having a cache of nodes similar to the SPSC queue.
235 //
236 // For now, I believe that this is "ok" because shared channels are not the most
237 // common type, but soon we may wish to revisit this queue choice and determine
238 // another candidate for backend storage of shared channels.
239 //
240 // ## Overview of the Implementation
241 //
242 // Now that there's a little background on the concurrent queues used, it's
243 // worth going into much more detail about the channels themselves. The basic
244 // pseudocode for a send/recv are:
245 //
246 //
247 //      send(t)                             recv()
248 //        queue.push(t)                       return if queue.pop()
249 //        if increment() == -1                deschedule {
250 //          wakeup()                            if decrement() > 0
251 //                                                cancel_deschedule()
252 //                                            }
253 //                                            queue.pop()
254 //
255 // As mentioned before, there are no locks in this implementation, only atomic
256 // instructions are used.
257 //
258 // ### The internal atomic counter
259 //
260 // Every channel has a shared counter with each half to keep track of the size
261 // of the queue. This counter is used to abort descheduling by the receiver and
262 // to know when to wake up on the sending side.
263 //
264 // As seen in the pseudocode, senders will increment this count and receivers
265 // will decrement the count. The theory behind this is that if a sender sees a
266 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
267 // then it doesn't need to block.
268 //
269 // The recv() method has a beginning call to pop(), and if successful, it needs
270 // to decrement the count. It is a crucial implementation detail that this
271 // decrement does *not* happen to the shared counter. If this were the case,
272 // then it would be possible for the counter to be very negative when there were
273 // no receivers waiting, in which case the senders would have to determine when
274 // it was actually appropriate to wake up a receiver.
275 //
276 // Instead, the "steal count" is kept track of separately (not atomically
277 // because it's only used by receivers), and then the decrement() call when
278 // descheduling will lump in all of the recent steals into one large decrement.
279 //
280 // The implication of this is that if a sender sees a -1 count, then there's
281 // guaranteed to be a waiter waiting!
282 //
283 // ## Native Implementation
284 //
285 // A major goal of these channels is to work seamlessly on and off the runtime.
286 // All of the previous race conditions have been worded in terms of
287 // scheduler-isms (which is obviously not available without the runtime).
288 //
289 // For now, native usage of channels (off the runtime) will fall back onto
290 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
291 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
292 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
293 // condition variable.
294 //
295 // ## Select
296 //
297 // Being able to support selection over channels has greatly influenced this
298 // design, and not only does selection need to work inside the runtime, but also
299 // outside the runtime.
300 //
301 // The implementation is fairly straightforward. The goal of select() is not to
302 // return some data, but only to return which channel can receive data without
303 // blocking. The implementation is essentially the entire blocking procedure
304 // followed by an increment as soon as its woken up. The cancellation procedure
305 // involves an increment and swapping out of to_wake to acquire ownership of the
306 // task to unblock.
307 //
308 // Sadly this current implementation requires multiple allocations, so I have
309 // seen the throughput of select() be much worse than it should be. I do not
310 // believe that there is anything fundamental that needs to change about these
311 // channels, however, in order to support a more efficient select().
312 //
313 // # Conclusion
314 //
315 // And now that you've seen all the races that I found and attempted to fix,
316 // here's the code for you to find some more!
317
318 use prelude::v1::*;
319
320 use sync::Arc;
321 use fmt;
322 use marker;
323 use mem;
324 use cell::UnsafeCell;
325
326 pub use self::select::{Select, Handle};
327 use self::select::StartResult;
328 use self::select::StartResult::*;
329 use self::blocking::SignalToken;
330
331 mod blocking;
332 mod oneshot;
333 mod select;
334 mod shared;
335 mod stream;
336 mod sync;
337 mod mpsc_queue;
338 mod spsc_queue;
339
340 /// The receiving-half of Rust's channel type. This half can only be owned by
341 /// one task
342 #[stable]
343 pub struct Receiver<T> {
344     inner: UnsafeCell<Flavor<T>>,
345 }
346
347 // The receiver port can be sent from place to place, so long as it
348 // is not used to receive non-sendable things.
349 unsafe impl<T:Send> Send for Receiver<T> { }
350
351 /// An iterator over messages on a receiver, this iterator will block
352 /// whenever `next` is called, waiting for a new message, and `None` will be
353 /// returned when the corresponding channel has hung up.
354 #[stable]
355 pub struct Iter<'a, T:'a> {
356     rx: &'a Receiver<T>
357 }
358
359 /// The sending-half of Rust's asynchronous channel type. This half can only be
360 /// owned by one task, but it can be cloned to send to other tasks.
361 #[stable]
362 pub struct Sender<T> {
363     inner: UnsafeCell<Flavor<T>>,
364 }
365
366 // The send port can be sent from place to place, so long as it
367 // is not used to send non-sendable things.
368 unsafe impl<T:Send> Send for Sender<T> { }
369
370 /// The sending-half of Rust's synchronous channel type. This half can only be
371 /// owned by one task, but it can be cloned to send to other tasks.
372 #[stable]
373 #[cfg(stage0)] // NOTE remove impl after next snapshot
374 pub struct SyncSender<T> {
375     inner: Arc<RacyCell<sync::Packet<T>>>,
376     // can't share in an arc
377     _marker: marker::NoSync,
378 }
379
380 /// The sending-half of Rust's synchronous channel type. This half can only be
381 /// owned by one task, but it can be cloned to send to other tasks.
382 #[stable]
383 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
384 pub struct SyncSender<T> {
385     inner: Arc<RacyCell<sync::Packet<T>>>,
386 }
387
388 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
389 impl<T> !marker::Sync for SyncSender<T> {}
390
391 /// An error returned from the `send` function on channels.
392 ///
393 /// A `send` operation can only fail if the receiving end of a channel is
394 /// disconnected, implying that the data could never be received. The error
395 /// contains the data being sent as a payload so it can be recovered.
396 #[derive(PartialEq, Eq)]
397 #[stable]
398 pub struct SendError<T>(pub T);
399
400 /// An error returned from the `recv` function on a `Receiver`.
401 ///
402 /// The `recv` operation can only fail if the sending half of a channel is
403 /// disconnected, implying that no further messages will ever be received.
404 #[derive(PartialEq, Eq, Clone, Copy)]
405 #[stable]
406 pub struct RecvError;
407
408 /// This enumeration is the list of the possible reasons that try_recv could not
409 /// return data when called.
410 #[derive(PartialEq, Clone, Copy)]
411 #[stable]
412 pub enum TryRecvError {
413     /// This channel is currently empty, but the sender(s) have not yet
414     /// disconnected, so data may yet become available.
415     #[stable]
416     Empty,
417
418     /// This channel's sending half has become disconnected, and there will
419     /// never be any more data received on this channel
420     #[stable]
421     Disconnected,
422 }
423
424 /// This enumeration is the list of the possible error outcomes for the
425 /// `SyncSender::try_send` method.
426 #[derive(PartialEq, Clone)]
427 #[stable]
428 pub enum TrySendError<T> {
429     /// The data could not be sent on the channel because it would require that
430     /// the callee block to send the data.
431     ///
432     /// If this is a buffered channel, then the buffer is full at this time. If
433     /// this is not a buffered channel, then there is no receiver available to
434     /// acquire the data.
435     #[stable]
436     Full(T),
437
438     /// This channel's receiving half has disconnected, so the data could not be
439     /// sent. The data is returned back to the callee in this case.
440     #[stable]
441     Disconnected(T),
442 }
443
444 enum Flavor<T> {
445     Oneshot(Arc<RacyCell<oneshot::Packet<T>>>),
446     Stream(Arc<RacyCell<stream::Packet<T>>>),
447     Shared(Arc<RacyCell<shared::Packet<T>>>),
448     Sync(Arc<RacyCell<sync::Packet<T>>>),
449 }
450
451 #[doc(hidden)]
452 trait UnsafeFlavor<T> {
453     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
454     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
455         &mut *self.inner_unsafe().get()
456     }
457     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
458         &*self.inner_unsafe().get()
459     }
460 }
461 impl<T> UnsafeFlavor<T> for Sender<T> {
462     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
463         &self.inner
464     }
465 }
466 impl<T> UnsafeFlavor<T> for Receiver<T> {
467     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
468         &self.inner
469     }
470 }
471
472 /// Creates a new asynchronous channel, returning the sender/receiver halves.
473 ///
474 /// All data sent on the sender will become available on the receiver, and no
475 /// send will block the calling task (this channel has an "infinite buffer").
476 ///
477 /// # Example
478 ///
479 /// ```
480 /// use std::sync::mpsc::channel;
481 /// use std::thread::Thread;
482 ///
483 /// // tx is is the sending half (tx for transmission), and rx is the receiving
484 /// // half (rx for receiving).
485 /// let (tx, rx) = channel();
486 ///
487 /// // Spawn off an expensive computation
488 /// Thread::spawn(move|| {
489 /// #   fn expensive_computation() {}
490 ///     tx.send(expensive_computation()).unwrap();
491 /// });
492 ///
493 /// // Do some useful work for awhile
494 ///
495 /// // Let's see what that answer was
496 /// println!("{:?}", rx.recv().unwrap());
497 /// ```
498 #[stable]
499 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
500     let a = Arc::new(RacyCell::new(oneshot::Packet::new()));
501     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
502 }
503
504 /// Creates a new synchronous, bounded channel.
505 ///
506 /// Like asynchronous channels, the `Receiver` will block until a message
507 /// becomes available. These channels differ greatly in the semantics of the
508 /// sender from asynchronous channels, however.
509 ///
510 /// This channel has an internal buffer on which messages will be queued. When
511 /// the internal buffer becomes full, future sends will *block* waiting for the
512 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
513 /// becomes  "rendezvous channel" where each send will not return until a recv
514 /// is paired with it.
515 ///
516 /// As with asynchronous channels, all senders will panic in `send` if the
517 /// `Receiver` has been destroyed.
518 ///
519 /// # Example
520 ///
521 /// ```
522 /// use std::sync::mpsc::sync_channel;
523 /// use std::thread::Thread;
524 ///
525 /// let (tx, rx) = sync_channel(1);
526 ///
527 /// // this returns immediately
528 /// tx.send(1i).unwrap();
529 ///
530 /// Thread::spawn(move|| {
531 ///     // this will block until the previous message has been received
532 ///     tx.send(2i).unwrap();
533 /// });
534 ///
535 /// assert_eq!(rx.recv().unwrap(), 1i);
536 /// assert_eq!(rx.recv().unwrap(), 2i);
537 /// ```
538 #[stable]
539 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
540     let a = Arc::new(RacyCell::new(sync::Packet::new(bound)));
541     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
542 }
543
544 ////////////////////////////////////////////////////////////////////////////////
545 // Sender
546 ////////////////////////////////////////////////////////////////////////////////
547
548 impl<T: Send> Sender<T> {
549     fn new(inner: Flavor<T>) -> Sender<T> {
550         Sender {
551             inner: UnsafeCell::new(inner),
552         }
553     }
554
555     /// Attempts to send a value on this channel, returning it back if it could
556     /// not be sent.
557     ///
558     /// A successful send occurs when it is determined that the other end of
559     /// the channel has not hung up already. An unsuccessful send would be one
560     /// where the corresponding receiver has already been deallocated. Note
561     /// that a return value of `Err` means that the data will never be
562     /// received, but a return value of `Ok` does *not* mean that the data
563     /// will be received.  It is possible for the corresponding receiver to
564     /// hang up immediately after this function returns `Ok`.
565     ///
566     /// This method will never block the current thread.
567     ///
568     /// # Example
569     ///
570     /// ```
571     /// use std::sync::mpsc::channel;
572     ///
573     /// let (tx, rx) = channel();
574     ///
575     /// // This send is always successful
576     /// tx.send(1i).unwrap();
577     ///
578     /// // This send will fail because the receiver is gone
579     /// drop(rx);
580     /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
581     /// ```
582     #[stable]
583     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
584         let (new_inner, ret) = match *unsafe { self.inner() } {
585             Flavor::Oneshot(ref p) => {
586                 unsafe {
587                     let p = p.get();
588                     if !(*p).sent() {
589                         return (*p).send(t).map_err(SendError);
590                     } else {
591                         let a =
592                             Arc::new(RacyCell::new(stream::Packet::new()));
593                         let rx = Receiver::new(Flavor::Stream(a.clone()));
594                         match (*p).upgrade(rx) {
595                             oneshot::UpSuccess => {
596                                 let ret = (*a.get()).send(t);
597                                 (a, ret)
598                             }
599                             oneshot::UpDisconnected => (a, Err(t)),
600                             oneshot::UpWoke(token) => {
601                                 // This send cannot panic because the thread is
602                                 // asleep (we're looking at it), so the receiver
603                                 // can't go away.
604                                 (*a.get()).send(t).ok().unwrap();
605                         token.signal();
606                                 (a, Ok(()))
607                             }
608                         }
609                     }
610                 }
611             }
612             Flavor::Stream(ref p) => return unsafe {
613                 (*p.get()).send(t).map_err(SendError)
614             },
615             Flavor::Shared(ref p) => return unsafe {
616                 (*p.get()).send(t).map_err(SendError)
617             },
618             Flavor::Sync(..) => unreachable!(),
619         };
620
621         unsafe {
622             let tmp = Sender::new(Flavor::Stream(new_inner));
623             mem::swap(self.inner_mut(), tmp.inner_mut());
624         }
625         ret.map_err(SendError)
626     }
627 }
628
629 #[stable]
630 impl<T: Send> Clone for Sender<T> {
631     fn clone(&self) -> Sender<T> {
632         let (packet, sleeper, guard) = match *unsafe { self.inner() } {
633             Flavor::Oneshot(ref p) => {
634                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
635                 unsafe {
636                     let guard = (*a.get()).postinit_lock();
637                     let rx = Receiver::new(Flavor::Shared(a.clone()));
638                     match (*p.get()).upgrade(rx) {
639                         oneshot::UpSuccess |
640                         oneshot::UpDisconnected => (a, None, guard),
641                         oneshot::UpWoke(task) => (a, Some(task), guard)
642                     }
643                 }
644             }
645             Flavor::Stream(ref p) => {
646                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
647                 unsafe {
648                     let guard = (*a.get()).postinit_lock();
649                     let rx = Receiver::new(Flavor::Shared(a.clone()));
650                     match (*p.get()).upgrade(rx) {
651                         stream::UpSuccess |
652                         stream::UpDisconnected => (a, None, guard),
653                         stream::UpWoke(task) => (a, Some(task), guard),
654                     }
655                 }
656             }
657             Flavor::Shared(ref p) => {
658                 unsafe { (*p.get()).clone_chan(); }
659                 return Sender::new(Flavor::Shared(p.clone()));
660             }
661             Flavor::Sync(..) => unreachable!(),
662         };
663
664         unsafe {
665             (*packet.get()).inherit_blocker(sleeper, guard);
666
667             let tmp = Sender::new(Flavor::Shared(packet.clone()));
668             mem::swap(self.inner_mut(), tmp.inner_mut());
669         }
670         Sender::new(Flavor::Shared(packet))
671     }
672 }
673
674 #[unsafe_destructor]
675 #[stable]
676 impl<T: Send> Drop for Sender<T> {
677     fn drop(&mut self) {
678         match *unsafe { self.inner_mut() } {
679             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
680             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
681             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
682             Flavor::Sync(..) => unreachable!(),
683         }
684     }
685 }
686
687 ////////////////////////////////////////////////////////////////////////////////
688 // SyncSender
689 ////////////////////////////////////////////////////////////////////////////////
690
691 impl<T: Send> SyncSender<T> {
692     #[cfg(stage0)] // NOTE remove impl after next snapshot
693     fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
694         SyncSender { inner: inner, _marker: marker::NoSync }
695     }
696
697     #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
698     fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
699         SyncSender { inner: inner }
700     }
701
702     /// Sends a value on this synchronous channel.
703     ///
704     /// This function will *block* until space in the internal buffer becomes
705     /// available or a receiver is available to hand off the message to.
706     ///
707     /// Note that a successful send does *not* guarantee that the receiver will
708     /// ever see the data if there is a buffer on this channel. Items may be
709     /// enqueued in the internal buffer for the receiver to receive at a later
710     /// time. If the buffer size is 0, however, it can be guaranteed that the
711     /// receiver has indeed received the data if this function returns success.
712     ///
713     /// This function will never panic, but it may return `Err` if the
714     /// `Receiver` has disconnected and is no longer able to receive
715     /// information.
716     #[stable]
717     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
718         unsafe { (*self.inner.get()).send(t).map_err(SendError) }
719     }
720
721     /// Attempts to send a value on this channel without blocking.
722     ///
723     /// This method differs from `send` by returning immediately if the
724     /// channel's buffer is full or no receiver is waiting to acquire some
725     /// data. Compared with `send`, this function has two failure cases
726     /// instead of one (one for disconnection, one for a full buffer).
727     ///
728     /// See `SyncSender::send` for notes about guarantees of whether the
729     /// receiver has received the data or not if this function is successful.
730     #[stable]
731     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
732         unsafe { (*self.inner.get()).try_send(t) }
733     }
734 }
735
736 #[stable]
737 impl<T: Send> Clone for SyncSender<T> {
738     fn clone(&self) -> SyncSender<T> {
739         unsafe { (*self.inner.get()).clone_chan(); }
740         return SyncSender::new(self.inner.clone());
741     }
742 }
743
744 #[unsafe_destructor]
745 #[stable]
746 impl<T: Send> Drop for SyncSender<T> {
747     fn drop(&mut self) {
748         unsafe { (*self.inner.get()).drop_chan(); }
749     }
750 }
751
752 ////////////////////////////////////////////////////////////////////////////////
753 // Receiver
754 ////////////////////////////////////////////////////////////////////////////////
755
756 impl<T: Send> Receiver<T> {
757     fn new(inner: Flavor<T>) -> Receiver<T> {
758         Receiver { inner: UnsafeCell::new(inner) }
759     }
760
761     /// Attempts to return a pending value on this receiver without blocking
762     ///
763     /// This method will never block the caller in order to wait for data to
764     /// become available. Instead, this will always return immediately with a
765     /// possible option of pending data on the channel.
766     ///
767     /// This is useful for a flavor of "optimistic check" before deciding to
768     /// block on a receiver.
769     #[stable]
770     pub fn try_recv(&self) -> Result<T, TryRecvError> {
771         loop {
772             let new_port = match *unsafe { self.inner() } {
773                 Flavor::Oneshot(ref p) => {
774                     match unsafe { (*p.get()).try_recv() } {
775                         Ok(t) => return Ok(t),
776                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
777                         Err(oneshot::Disconnected) => {
778                             return Err(TryRecvError::Disconnected)
779                         }
780                         Err(oneshot::Upgraded(rx)) => rx,
781                     }
782                 }
783                 Flavor::Stream(ref p) => {
784                     match unsafe { (*p.get()).try_recv() } {
785                         Ok(t) => return Ok(t),
786                         Err(stream::Empty) => return Err(TryRecvError::Empty),
787                         Err(stream::Disconnected) => {
788                             return Err(TryRecvError::Disconnected)
789                         }
790                         Err(stream::Upgraded(rx)) => rx,
791                     }
792                 }
793                 Flavor::Shared(ref p) => {
794                     match unsafe { (*p.get()).try_recv() } {
795                         Ok(t) => return Ok(t),
796                         Err(shared::Empty) => return Err(TryRecvError::Empty),
797                         Err(shared::Disconnected) => {
798                             return Err(TryRecvError::Disconnected)
799                         }
800                     }
801                 }
802                 Flavor::Sync(ref p) => {
803                     match unsafe { (*p.get()).try_recv() } {
804                         Ok(t) => return Ok(t),
805                         Err(sync::Empty) => return Err(TryRecvError::Empty),
806                         Err(sync::Disconnected) => {
807                             return Err(TryRecvError::Disconnected)
808                         }
809                     }
810                 }
811             };
812             unsafe {
813                 mem::swap(self.inner_mut(),
814                           new_port.inner_mut());
815             }
816         }
817     }
818
819     /// Attempt to wait for a value on this receiver, returning an error if the
820     /// corresponding channel has hung up.
821     ///
822     /// This function will always block the current thread if there is no data
823     /// available and it's possible for more data to be sent. Once a message is
824     /// sent to the corresponding `Sender`, then this receiver will wake up and
825     /// return that message.
826     ///
827     /// If the corresponding `Sender` has disconnected, or it disconnects while
828     /// this call is blocking, this call will wake up and return `Err` to
829     /// indicate that no more messages can ever be received on this channel.
830     #[stable]
831     pub fn recv(&self) -> Result<T, RecvError> {
832         loop {
833             let new_port = match *unsafe { self.inner() } {
834                 Flavor::Oneshot(ref p) => {
835                     match unsafe { (*p.get()).recv() } {
836                         Ok(t) => return Ok(t),
837                         Err(oneshot::Empty) => return unreachable!(),
838                         Err(oneshot::Disconnected) => return Err(RecvError),
839                         Err(oneshot::Upgraded(rx)) => rx,
840                     }
841                 }
842                 Flavor::Stream(ref p) => {
843                     match unsafe { (*p.get()).recv() } {
844                         Ok(t) => return Ok(t),
845                         Err(stream::Empty) => return unreachable!(),
846                         Err(stream::Disconnected) => return Err(RecvError),
847                         Err(stream::Upgraded(rx)) => rx,
848                     }
849                 }
850                 Flavor::Shared(ref p) => {
851                     match unsafe { (*p.get()).recv() } {
852                         Ok(t) => return Ok(t),
853                         Err(shared::Empty) => return unreachable!(),
854                         Err(shared::Disconnected) => return Err(RecvError),
855                     }
856                 }
857                 Flavor::Sync(ref p) => return unsafe {
858                     (*p.get()).recv().map_err(|()| RecvError)
859                 }
860             };
861             unsafe {
862                 mem::swap(self.inner_mut(), new_port.inner_mut());
863             }
864         }
865     }
866
867     /// Returns an iterator that will block waiting for messages, but never
868     /// `panic!`. It will return `None` when the channel has hung up.
869     #[stable]
870     pub fn iter(&self) -> Iter<T> {
871         Iter { rx: self }
872     }
873 }
874
875 impl<T: Send> select::Packet for Receiver<T> {
876     fn can_recv(&self) -> bool {
877         loop {
878             let new_port = match *unsafe { self.inner() } {
879                 Flavor::Oneshot(ref p) => {
880                     match unsafe { (*p.get()).can_recv() } {
881                         Ok(ret) => return ret,
882                         Err(upgrade) => upgrade,
883                     }
884                 }
885                 Flavor::Stream(ref p) => {
886                     match unsafe { (*p.get()).can_recv() } {
887                         Ok(ret) => return ret,
888                         Err(upgrade) => upgrade,
889                     }
890                 }
891                 Flavor::Shared(ref p) => {
892                     return unsafe { (*p.get()).can_recv() };
893                 }
894                 Flavor::Sync(ref p) => {
895                     return unsafe { (*p.get()).can_recv() };
896                 }
897             };
898             unsafe {
899                 mem::swap(self.inner_mut(),
900                           new_port.inner_mut());
901             }
902         }
903     }
904
905     fn start_selection(&self, mut token: SignalToken) -> StartResult {
906         loop {
907             let (t, new_port) = match *unsafe { self.inner() } {
908                 Flavor::Oneshot(ref p) => {
909                     match unsafe { (*p.get()).start_selection(token) } {
910                         oneshot::SelSuccess => return Installed,
911                         oneshot::SelCanceled => return Abort,
912                         oneshot::SelUpgraded(t, rx) => (t, rx),
913                     }
914                 }
915                 Flavor::Stream(ref p) => {
916                     match unsafe { (*p.get()).start_selection(token) } {
917                         stream::SelSuccess => return Installed,
918                         stream::SelCanceled => return Abort,
919                         stream::SelUpgraded(t, rx) => (t, rx),
920                     }
921                 }
922                 Flavor::Shared(ref p) => {
923                     return unsafe { (*p.get()).start_selection(token) };
924                 }
925                 Flavor::Sync(ref p) => {
926                     return unsafe { (*p.get()).start_selection(token) };
927                 }
928             };
929             token = t;
930             unsafe {
931                 mem::swap(self.inner_mut(), new_port.inner_mut());
932             }
933         }
934     }
935
936     fn abort_selection(&self) -> bool {
937         let mut was_upgrade = false;
938         loop {
939             let result = match *unsafe { self.inner() } {
940                 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
941                 Flavor::Stream(ref p) => unsafe {
942                     (*p.get()).abort_selection(was_upgrade)
943                 },
944                 Flavor::Shared(ref p) => return unsafe {
945                     (*p.get()).abort_selection(was_upgrade)
946                 },
947                 Flavor::Sync(ref p) => return unsafe {
948                     (*p.get()).abort_selection()
949                 },
950             };
951             let new_port = match result { Ok(b) => return b, Err(p) => p };
952             was_upgrade = true;
953             unsafe {
954                 mem::swap(self.inner_mut(),
955                           new_port.inner_mut());
956             }
957         }
958     }
959 }
960
961 #[stable]
962 impl<'a, T: Send> Iterator for Iter<'a, T> {
963     type Item = T;
964
965     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
966 }
967
968 #[unsafe_destructor]
969 #[stable]
970 impl<T: Send> Drop for Receiver<T> {
971     fn drop(&mut self) {
972         match *unsafe { self.inner_mut() } {
973             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
974             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
975             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
976             Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
977         }
978     }
979 }
980
981 /// A version of `UnsafeCell` intended for use in concurrent data
982 /// structures (for example, you might put it in an `Arc`).
983 struct RacyCell<T>(pub UnsafeCell<T>);
984
985 impl<T> RacyCell<T> {
986
987     fn new(value: T) -> RacyCell<T> {
988         RacyCell(UnsafeCell { value: value })
989     }
990
991     unsafe fn get(&self) -> *mut T {
992         self.0.get()
993     }
994
995 }
996
997 unsafe impl<T:Send> Send for RacyCell<T> { }
998
999 unsafe impl<T> Sync for RacyCell<T> { } // Oh dear
1000
1001 impl<T> fmt::Show for SendError<T> {
1002     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1003         "sending on a closed channel".fmt(f)
1004     }
1005 }
1006
1007 impl<T> fmt::Show for TrySendError<T> {
1008     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1009         match *self {
1010             TrySendError::Full(..) => {
1011                 "sending on a full channel".fmt(f)
1012             }
1013             TrySendError::Disconnected(..) => {
1014                 "sending on a closed channel".fmt(f)
1015             }
1016         }
1017     }
1018 }
1019
1020 impl fmt::Show for RecvError {
1021     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1022         "receiving on a closed channel".fmt(f)
1023     }
1024 }
1025
1026 impl fmt::Show for TryRecvError {
1027     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1028         match *self {
1029             TryRecvError::Empty => {
1030                 "receiving on an empty channel".fmt(f)
1031             }
1032             TryRecvError::Disconnected => {
1033                 "receiving on a closed channel".fmt(f)
1034             }
1035         }
1036     }
1037 }
1038
1039 #[cfg(test)]
1040 mod test {
1041     use prelude::v1::*;
1042
1043     use os;
1044     use super::*;
1045     use thread::Thread;
1046
1047     pub fn stress_factor() -> uint {
1048         match os::getenv("RUST_TEST_STRESS") {
1049             Some(val) => val.parse().unwrap(),
1050             None => 1,
1051         }
1052     }
1053
1054     #[test]
1055     fn smoke() {
1056         let (tx, rx) = channel::<int>();
1057         tx.send(1).unwrap();
1058         assert_eq!(rx.recv().unwrap(), 1);
1059     }
1060
1061     #[test]
1062     fn drop_full() {
1063         let (tx, _rx) = channel();
1064         tx.send(box 1i).unwrap();
1065     }
1066
1067     #[test]
1068     fn drop_full_shared() {
1069         let (tx, _rx) = channel();
1070         drop(tx.clone());
1071         drop(tx.clone());
1072         tx.send(box 1i).unwrap();
1073     }
1074
1075     #[test]
1076     fn smoke_shared() {
1077         let (tx, rx) = channel::<int>();
1078         tx.send(1).unwrap();
1079         assert_eq!(rx.recv().unwrap(), 1);
1080         let tx = tx.clone();
1081         tx.send(1).unwrap();
1082         assert_eq!(rx.recv().unwrap(), 1);
1083     }
1084
1085     #[test]
1086     fn smoke_threads() {
1087         let (tx, rx) = channel::<int>();
1088         let _t = Thread::spawn(move|| {
1089             tx.send(1).unwrap();
1090         });
1091         assert_eq!(rx.recv().unwrap(), 1);
1092     }
1093
1094     #[test]
1095     fn smoke_port_gone() {
1096         let (tx, rx) = channel::<int>();
1097         drop(rx);
1098         assert!(tx.send(1).is_err());
1099     }
1100
1101     #[test]
1102     fn smoke_shared_port_gone() {
1103         let (tx, rx) = channel::<int>();
1104         drop(rx);
1105         assert!(tx.send(1).is_err())
1106     }
1107
1108     #[test]
1109     fn smoke_shared_port_gone2() {
1110         let (tx, rx) = channel::<int>();
1111         drop(rx);
1112         let tx2 = tx.clone();
1113         drop(tx);
1114         assert!(tx2.send(1).is_err());
1115     }
1116
1117     #[test]
1118     fn port_gone_concurrent() {
1119         let (tx, rx) = channel::<int>();
1120         let _t = Thread::spawn(move|| {
1121             rx.recv().unwrap();
1122         });
1123         while tx.send(1).is_ok() {}
1124     }
1125
1126     #[test]
1127     fn port_gone_concurrent_shared() {
1128         let (tx, rx) = channel::<int>();
1129         let tx2 = tx.clone();
1130         let _t = Thread::spawn(move|| {
1131             rx.recv().unwrap();
1132         });
1133         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1134     }
1135
1136     #[test]
1137     fn smoke_chan_gone() {
1138         let (tx, rx) = channel::<int>();
1139         drop(tx);
1140         assert!(rx.recv().is_err());
1141     }
1142
1143     #[test]
1144     fn smoke_chan_gone_shared() {
1145         let (tx, rx) = channel::<()>();
1146         let tx2 = tx.clone();
1147         drop(tx);
1148         drop(tx2);
1149         assert!(rx.recv().is_err());
1150     }
1151
1152     #[test]
1153     fn chan_gone_concurrent() {
1154         let (tx, rx) = channel::<int>();
1155         let _t = Thread::spawn(move|| {
1156             tx.send(1).unwrap();
1157             tx.send(1).unwrap();
1158         });
1159         while rx.recv().is_ok() {}
1160     }
1161
1162     #[test]
1163     fn stress() {
1164         let (tx, rx) = channel::<int>();
1165         let t = Thread::scoped(move|| {
1166             for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1167         });
1168         for _ in range(0u, 10000) {
1169             assert_eq!(rx.recv().unwrap(), 1);
1170         }
1171         t.join().ok().unwrap();
1172     }
1173
1174     #[test]
1175     fn stress_shared() {
1176         static AMT: uint = 10000;
1177         static NTHREADS: uint = 8;
1178         let (tx, rx) = channel::<int>();
1179
1180         let t = Thread::scoped(move|| {
1181             for _ in range(0, AMT * NTHREADS) {
1182                 assert_eq!(rx.recv().unwrap(), 1);
1183             }
1184             match rx.try_recv() {
1185                 Ok(..) => panic!(),
1186                 _ => {}
1187             }
1188         });
1189
1190         for _ in range(0, NTHREADS) {
1191             let tx = tx.clone();
1192             Thread::spawn(move|| {
1193                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1194             });
1195         }
1196         drop(tx);
1197         t.join().ok().unwrap();
1198     }
1199
1200     #[test]
1201     fn send_from_outside_runtime() {
1202         let (tx1, rx1) = channel::<()>();
1203         let (tx2, rx2) = channel::<int>();
1204         let t1 = Thread::scoped(move|| {
1205             tx1.send(()).unwrap();
1206             for _ in range(0i, 40) {
1207                 assert_eq!(rx2.recv().unwrap(), 1);
1208             }
1209         });
1210         rx1.recv().unwrap();
1211         let t2 = Thread::scoped(move|| {
1212             for _ in range(0i, 40) {
1213                 tx2.send(1).unwrap();
1214             }
1215         });
1216         t1.join().ok().unwrap();
1217         t2.join().ok().unwrap();
1218     }
1219
1220     #[test]
1221     fn recv_from_outside_runtime() {
1222         let (tx, rx) = channel::<int>();
1223         let t = Thread::scoped(move|| {
1224             for _ in range(0i, 40) {
1225                 assert_eq!(rx.recv().unwrap(), 1);
1226             }
1227         });
1228         for _ in range(0u, 40) {
1229             tx.send(1).unwrap();
1230         }
1231         t.join().ok().unwrap();
1232     }
1233
1234     #[test]
1235     fn no_runtime() {
1236         let (tx1, rx1) = channel::<int>();
1237         let (tx2, rx2) = channel::<int>();
1238         let t1 = Thread::scoped(move|| {
1239             assert_eq!(rx1.recv().unwrap(), 1);
1240             tx2.send(2).unwrap();
1241         });
1242         let t2 = Thread::scoped(move|| {
1243             tx1.send(1).unwrap();
1244             assert_eq!(rx2.recv().unwrap(), 2);
1245         });
1246         t1.join().ok().unwrap();
1247         t2.join().ok().unwrap();
1248     }
1249
1250     #[test]
1251     fn oneshot_single_thread_close_port_first() {
1252         // Simple test of closing without sending
1253         let (_tx, rx) = channel::<int>();
1254         drop(rx);
1255     }
1256
1257     #[test]
1258     fn oneshot_single_thread_close_chan_first() {
1259         // Simple test of closing without sending
1260         let (tx, _rx) = channel::<int>();
1261         drop(tx);
1262     }
1263
1264     #[test]
1265     fn oneshot_single_thread_send_port_close() {
1266         // Testing that the sender cleans up the payload if receiver is closed
1267         let (tx, rx) = channel::<Box<int>>();
1268         drop(rx);
1269         assert!(tx.send(box 0).is_err());
1270     }
1271
1272     #[test]
1273     fn oneshot_single_thread_recv_chan_close() {
1274         // Receiving on a closed chan will panic
1275         let res = Thread::scoped(move|| {
1276             let (tx, rx) = channel::<int>();
1277             drop(tx);
1278             rx.recv().unwrap();
1279         }).join();
1280         // What is our res?
1281         assert!(res.is_err());
1282     }
1283
1284     #[test]
1285     fn oneshot_single_thread_send_then_recv() {
1286         let (tx, rx) = channel::<Box<int>>();
1287         tx.send(box 10).unwrap();
1288         assert!(rx.recv().unwrap() == box 10);
1289     }
1290
1291     #[test]
1292     fn oneshot_single_thread_try_send_open() {
1293         let (tx, rx) = channel::<int>();
1294         assert!(tx.send(10).is_ok());
1295         assert!(rx.recv().unwrap() == 10);
1296     }
1297
1298     #[test]
1299     fn oneshot_single_thread_try_send_closed() {
1300         let (tx, rx) = channel::<int>();
1301         drop(rx);
1302         assert!(tx.send(10).is_err());
1303     }
1304
1305     #[test]
1306     fn oneshot_single_thread_try_recv_open() {
1307         let (tx, rx) = channel::<int>();
1308         tx.send(10).unwrap();
1309         assert!(rx.recv() == Ok(10));
1310     }
1311
1312     #[test]
1313     fn oneshot_single_thread_try_recv_closed() {
1314         let (tx, rx) = channel::<int>();
1315         drop(tx);
1316         assert!(rx.recv().is_err());
1317     }
1318
1319     #[test]
1320     fn oneshot_single_thread_peek_data() {
1321         let (tx, rx) = channel::<int>();
1322         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1323         tx.send(10).unwrap();
1324         assert_eq!(rx.try_recv(), Ok(10));
1325     }
1326
1327     #[test]
1328     fn oneshot_single_thread_peek_close() {
1329         let (tx, rx) = channel::<int>();
1330         drop(tx);
1331         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1332         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1333     }
1334
1335     #[test]
1336     fn oneshot_single_thread_peek_open() {
1337         let (_tx, rx) = channel::<int>();
1338         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1339     }
1340
1341     #[test]
1342     fn oneshot_multi_task_recv_then_send() {
1343         let (tx, rx) = channel::<Box<int>>();
1344         let _t = Thread::spawn(move|| {
1345             assert!(rx.recv().unwrap() == box 10);
1346         });
1347
1348         tx.send(box 10).unwrap();
1349     }
1350
1351     #[test]
1352     fn oneshot_multi_task_recv_then_close() {
1353         let (tx, rx) = channel::<Box<int>>();
1354         let _t = Thread::spawn(move|| {
1355             drop(tx);
1356         });
1357         let res = Thread::scoped(move|| {
1358             assert!(rx.recv().unwrap() == box 10);
1359         }).join();
1360         assert!(res.is_err());
1361     }
1362
1363     #[test]
1364     fn oneshot_multi_thread_close_stress() {
1365         for _ in range(0, stress_factor()) {
1366             let (tx, rx) = channel::<int>();
1367             let _t = Thread::spawn(move|| {
1368                 drop(rx);
1369             });
1370             drop(tx);
1371         }
1372     }
1373
1374     #[test]
1375     fn oneshot_multi_thread_send_close_stress() {
1376         for _ in range(0, stress_factor()) {
1377             let (tx, rx) = channel::<int>();
1378             let _t = Thread::spawn(move|| {
1379                 drop(rx);
1380             });
1381             let _ = Thread::scoped(move|| {
1382                 tx.send(1).unwrap();
1383             }).join();
1384         }
1385     }
1386
1387     #[test]
1388     fn oneshot_multi_thread_recv_close_stress() {
1389         for _ in range(0, stress_factor()) {
1390             let (tx, rx) = channel::<int>();
1391             Thread::spawn(move|| {
1392                 let res = Thread::scoped(move|| {
1393                     rx.recv().unwrap();
1394                 }).join();
1395                 assert!(res.is_err());
1396             });
1397             let _t = Thread::spawn(move|| {
1398                 Thread::spawn(move|| {
1399                     drop(tx);
1400                 });
1401             });
1402         }
1403     }
1404
1405     #[test]
1406     fn oneshot_multi_thread_send_recv_stress() {
1407         for _ in range(0, stress_factor()) {
1408             let (tx, rx) = channel();
1409             let _t = Thread::spawn(move|| {
1410                 tx.send(box 10i).unwrap();
1411             });
1412             assert!(rx.recv().unwrap() == box 10i);
1413         }
1414     }
1415
1416     #[test]
1417     fn stream_send_recv_stress() {
1418         for _ in range(0, stress_factor()) {
1419             let (tx, rx) = channel();
1420
1421             send(tx, 0);
1422             recv(rx, 0);
1423
1424             fn send(tx: Sender<Box<int>>, i: int) {
1425                 if i == 10 { return }
1426
1427                 Thread::spawn(move|| {
1428                     tx.send(box i).unwrap();
1429                     send(tx, i + 1);
1430                 });
1431             }
1432
1433             fn recv(rx: Receiver<Box<int>>, i: int) {
1434                 if i == 10 { return }
1435
1436                 Thread::spawn(move|| {
1437                     assert!(rx.recv().unwrap() == box i);
1438                     recv(rx, i + 1);
1439                 });
1440             }
1441         }
1442     }
1443
1444     #[test]
1445     fn recv_a_lot() {
1446         // Regression test that we don't run out of stack in scheduler context
1447         let (tx, rx) = channel();
1448         for _ in range(0i, 10000) { tx.send(()).unwrap(); }
1449         for _ in range(0i, 10000) { rx.recv().unwrap(); }
1450     }
1451
1452     #[test]
1453     fn shared_chan_stress() {
1454         let (tx, rx) = channel();
1455         let total = stress_factor() + 100;
1456         for _ in range(0, total) {
1457             let tx = tx.clone();
1458             Thread::spawn(move|| {
1459                 tx.send(()).unwrap();
1460             });
1461         }
1462
1463         for _ in range(0, total) {
1464             rx.recv().unwrap();
1465         }
1466     }
1467
1468     #[test]
1469     fn test_nested_recv_iter() {
1470         let (tx, rx) = channel::<int>();
1471         let (total_tx, total_rx) = channel::<int>();
1472
1473         let _t = Thread::spawn(move|| {
1474             let mut acc = 0;
1475             for x in rx.iter() {
1476                 acc += x;
1477             }
1478             total_tx.send(acc).unwrap();
1479         });
1480
1481         tx.send(3).unwrap();
1482         tx.send(1).unwrap();
1483         tx.send(2).unwrap();
1484         drop(tx);
1485         assert_eq!(total_rx.recv().unwrap(), 6);
1486     }
1487
1488     #[test]
1489     fn test_recv_iter_break() {
1490         let (tx, rx) = channel::<int>();
1491         let (count_tx, count_rx) = channel();
1492
1493         let _t = Thread::spawn(move|| {
1494             let mut count = 0;
1495             for x in rx.iter() {
1496                 if count >= 3 {
1497                     break;
1498                 } else {
1499                     count += x;
1500                 }
1501             }
1502             count_tx.send(count).unwrap();
1503         });
1504
1505         tx.send(2).unwrap();
1506         tx.send(2).unwrap();
1507         tx.send(2).unwrap();
1508         let _ = tx.send(2);
1509         drop(tx);
1510         assert_eq!(count_rx.recv().unwrap(), 4);
1511     }
1512
1513     #[test]
1514     fn try_recv_states() {
1515         let (tx1, rx1) = channel::<int>();
1516         let (tx2, rx2) = channel::<()>();
1517         let (tx3, rx3) = channel::<()>();
1518         let _t = Thread::spawn(move|| {
1519             rx2.recv().unwrap();
1520             tx1.send(1).unwrap();
1521             tx3.send(()).unwrap();
1522             rx2.recv().unwrap();
1523             drop(tx1);
1524             tx3.send(()).unwrap();
1525         });
1526
1527         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1528         tx2.send(()).unwrap();
1529         rx3.recv().unwrap();
1530         assert_eq!(rx1.try_recv(), Ok(1));
1531         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1532         tx2.send(()).unwrap();
1533         rx3.recv().unwrap();
1534         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1535     }
1536
1537     // This bug used to end up in a livelock inside of the Receiver destructor
1538     // because the internal state of the Shared packet was corrupted
1539     #[test]
1540     fn destroy_upgraded_shared_port_when_sender_still_active() {
1541         let (tx, rx) = channel();
1542         let (tx2, rx2) = channel();
1543         let _t = Thread::spawn(move|| {
1544             rx.recv().unwrap(); // wait on a oneshot
1545             drop(rx);  // destroy a shared
1546             tx2.send(()).unwrap();
1547         });
1548         // make sure the other task has gone to sleep
1549         for _ in range(0u, 5000) { Thread::yield_now(); }
1550
1551         // upgrade to a shared chan and send a message
1552         let t = tx.clone();
1553         drop(tx);
1554         t.send(()).unwrap();
1555
1556         // wait for the child task to exit before we exit
1557         rx2.recv().unwrap();
1558     }
1559 }
1560
1561 #[cfg(test)]
1562 mod sync_tests {
1563     use prelude::v1::*;
1564
1565     use os;
1566     use thread::Thread;
1567     use super::*;
1568
1569     pub fn stress_factor() -> uint {
1570         match os::getenv("RUST_TEST_STRESS") {
1571             Some(val) => val.parse().unwrap(),
1572             None => 1,
1573         }
1574     }
1575
1576     #[test]
1577     fn smoke() {
1578         let (tx, rx) = sync_channel::<int>(1);
1579         tx.send(1).unwrap();
1580         assert_eq!(rx.recv().unwrap(), 1);
1581     }
1582
1583     #[test]
1584     fn drop_full() {
1585         let (tx, _rx) = sync_channel(1);
1586         tx.send(box 1i).unwrap();
1587     }
1588
1589     #[test]
1590     fn smoke_shared() {
1591         let (tx, rx) = sync_channel::<int>(1);
1592         tx.send(1).unwrap();
1593         assert_eq!(rx.recv().unwrap(), 1);
1594         let tx = tx.clone();
1595         tx.send(1).unwrap();
1596         assert_eq!(rx.recv().unwrap(), 1);
1597     }
1598
1599     #[test]
1600     fn smoke_threads() {
1601         let (tx, rx) = sync_channel::<int>(0);
1602         let _t = Thread::spawn(move|| {
1603             tx.send(1).unwrap();
1604         });
1605         assert_eq!(rx.recv().unwrap(), 1);
1606     }
1607
1608     #[test]
1609     fn smoke_port_gone() {
1610         let (tx, rx) = sync_channel::<int>(0);
1611         drop(rx);
1612         assert!(tx.send(1).is_err());
1613     }
1614
1615     #[test]
1616     fn smoke_shared_port_gone2() {
1617         let (tx, rx) = sync_channel::<int>(0);
1618         drop(rx);
1619         let tx2 = tx.clone();
1620         drop(tx);
1621         assert!(tx2.send(1).is_err());
1622     }
1623
1624     #[test]
1625     fn port_gone_concurrent() {
1626         let (tx, rx) = sync_channel::<int>(0);
1627         let _t = Thread::spawn(move|| {
1628             rx.recv().unwrap();
1629         });
1630         while tx.send(1).is_ok() {}
1631     }
1632
1633     #[test]
1634     fn port_gone_concurrent_shared() {
1635         let (tx, rx) = sync_channel::<int>(0);
1636         let tx2 = tx.clone();
1637         let _t = Thread::spawn(move|| {
1638             rx.recv().unwrap();
1639         });
1640         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1641     }
1642
1643     #[test]
1644     fn smoke_chan_gone() {
1645         let (tx, rx) = sync_channel::<int>(0);
1646         drop(tx);
1647         assert!(rx.recv().is_err());
1648     }
1649
1650     #[test]
1651     fn smoke_chan_gone_shared() {
1652         let (tx, rx) = sync_channel::<()>(0);
1653         let tx2 = tx.clone();
1654         drop(tx);
1655         drop(tx2);
1656         assert!(rx.recv().is_err());
1657     }
1658
1659     #[test]
1660     fn chan_gone_concurrent() {
1661         let (tx, rx) = sync_channel::<int>(0);
1662         Thread::spawn(move|| {
1663             tx.send(1).unwrap();
1664             tx.send(1).unwrap();
1665         });
1666         while rx.recv().is_ok() {}
1667     }
1668
1669     #[test]
1670     fn stress() {
1671         let (tx, rx) = sync_channel::<int>(0);
1672         Thread::spawn(move|| {
1673             for _ in range(0u, 10000) { tx.send(1).unwrap(); }
1674         });
1675         for _ in range(0u, 10000) {
1676             assert_eq!(rx.recv().unwrap(), 1);
1677         }
1678     }
1679
1680     #[test]
1681     fn stress_shared() {
1682         static AMT: uint = 1000;
1683         static NTHREADS: uint = 8;
1684         let (tx, rx) = sync_channel::<int>(0);
1685         let (dtx, drx) = sync_channel::<()>(0);
1686
1687         Thread::spawn(move|| {
1688             for _ in range(0, AMT * NTHREADS) {
1689                 assert_eq!(rx.recv().unwrap(), 1);
1690             }
1691             match rx.try_recv() {
1692                 Ok(..) => panic!(),
1693                 _ => {}
1694             }
1695             dtx.send(()).unwrap();
1696         });
1697
1698         for _ in range(0, NTHREADS) {
1699             let tx = tx.clone();
1700             Thread::spawn(move|| {
1701                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1702             });
1703         }
1704         drop(tx);
1705         drx.recv().unwrap();
1706     }
1707
1708     #[test]
1709     fn oneshot_single_thread_close_port_first() {
1710         // Simple test of closing without sending
1711         let (_tx, rx) = sync_channel::<int>(0);
1712         drop(rx);
1713     }
1714
1715     #[test]
1716     fn oneshot_single_thread_close_chan_first() {
1717         // Simple test of closing without sending
1718         let (tx, _rx) = sync_channel::<int>(0);
1719         drop(tx);
1720     }
1721
1722     #[test]
1723     fn oneshot_single_thread_send_port_close() {
1724         // Testing that the sender cleans up the payload if receiver is closed
1725         let (tx, rx) = sync_channel::<Box<int>>(0);
1726         drop(rx);
1727         assert!(tx.send(box 0).is_err());
1728     }
1729
1730     #[test]
1731     fn oneshot_single_thread_recv_chan_close() {
1732         // Receiving on a closed chan will panic
1733         let res = Thread::scoped(move|| {
1734             let (tx, rx) = sync_channel::<int>(0);
1735             drop(tx);
1736             rx.recv().unwrap();
1737         }).join();
1738         // What is our res?
1739         assert!(res.is_err());
1740     }
1741
1742     #[test]
1743     fn oneshot_single_thread_send_then_recv() {
1744         let (tx, rx) = sync_channel::<Box<int>>(1);
1745         tx.send(box 10).unwrap();
1746         assert!(rx.recv().unwrap() == box 10);
1747     }
1748
1749     #[test]
1750     fn oneshot_single_thread_try_send_open() {
1751         let (tx, rx) = sync_channel::<int>(1);
1752         assert_eq!(tx.try_send(10), Ok(()));
1753         assert!(rx.recv().unwrap() == 10);
1754     }
1755
1756     #[test]
1757     fn oneshot_single_thread_try_send_closed() {
1758         let (tx, rx) = sync_channel::<int>(0);
1759         drop(rx);
1760         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1761     }
1762
1763     #[test]
1764     fn oneshot_single_thread_try_send_closed2() {
1765         let (tx, _rx) = sync_channel::<int>(0);
1766         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1767     }
1768
1769     #[test]
1770     fn oneshot_single_thread_try_recv_open() {
1771         let (tx, rx) = sync_channel::<int>(1);
1772         tx.send(10).unwrap();
1773         assert!(rx.recv() == Ok(10));
1774     }
1775
1776     #[test]
1777     fn oneshot_single_thread_try_recv_closed() {
1778         let (tx, rx) = sync_channel::<int>(0);
1779         drop(tx);
1780         assert!(rx.recv().is_err());
1781     }
1782
1783     #[test]
1784     fn oneshot_single_thread_peek_data() {
1785         let (tx, rx) = sync_channel::<int>(1);
1786         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1787         tx.send(10).unwrap();
1788         assert_eq!(rx.try_recv(), Ok(10));
1789     }
1790
1791     #[test]
1792     fn oneshot_single_thread_peek_close() {
1793         let (tx, rx) = sync_channel::<int>(0);
1794         drop(tx);
1795         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1796         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1797     }
1798
1799     #[test]
1800     fn oneshot_single_thread_peek_open() {
1801         let (_tx, rx) = sync_channel::<int>(0);
1802         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1803     }
1804
1805     #[test]
1806     fn oneshot_multi_task_recv_then_send() {
1807         let (tx, rx) = sync_channel::<Box<int>>(0);
1808         let _t = Thread::spawn(move|| {
1809             assert!(rx.recv().unwrap() == box 10);
1810         });
1811
1812         tx.send(box 10).unwrap();
1813     }
1814
1815     #[test]
1816     fn oneshot_multi_task_recv_then_close() {
1817         let (tx, rx) = sync_channel::<Box<int>>(0);
1818         let _t = Thread::spawn(move|| {
1819             drop(tx);
1820         });
1821         let res = Thread::scoped(move|| {
1822             assert!(rx.recv().unwrap() == box 10);
1823         }).join();
1824         assert!(res.is_err());
1825     }
1826
1827     #[test]
1828     fn oneshot_multi_thread_close_stress() {
1829         for _ in range(0, stress_factor()) {
1830             let (tx, rx) = sync_channel::<int>(0);
1831             let _t = Thread::spawn(move|| {
1832                 drop(rx);
1833             });
1834             drop(tx);
1835         }
1836     }
1837
1838     #[test]
1839     fn oneshot_multi_thread_send_close_stress() {
1840         for _ in range(0, stress_factor()) {
1841             let (tx, rx) = sync_channel::<int>(0);
1842             let _t = Thread::spawn(move|| {
1843                 drop(rx);
1844             });
1845             let _ = Thread::scoped(move || {
1846                 tx.send(1).unwrap();
1847             }).join();
1848         }
1849     }
1850
1851     #[test]
1852     fn oneshot_multi_thread_recv_close_stress() {
1853         for _ in range(0, stress_factor()) {
1854             let (tx, rx) = sync_channel::<int>(0);
1855             let _t = Thread::spawn(move|| {
1856                 let res = Thread::scoped(move|| {
1857                     rx.recv().unwrap();
1858                 }).join();
1859                 assert!(res.is_err());
1860             });
1861             let _t = Thread::spawn(move|| {
1862                 Thread::spawn(move|| {
1863                     drop(tx);
1864                 });
1865             });
1866         }
1867     }
1868
1869     #[test]
1870     fn oneshot_multi_thread_send_recv_stress() {
1871         for _ in range(0, stress_factor()) {
1872             let (tx, rx) = sync_channel::<Box<int>>(0);
1873             let _t = Thread::spawn(move|| {
1874                 tx.send(box 10i).unwrap();
1875             });
1876             assert!(rx.recv().unwrap() == box 10i);
1877         }
1878     }
1879
1880     #[test]
1881     fn stream_send_recv_stress() {
1882         for _ in range(0, stress_factor()) {
1883             let (tx, rx) = sync_channel::<Box<int>>(0);
1884
1885             send(tx, 0);
1886             recv(rx, 0);
1887
1888             fn send(tx: SyncSender<Box<int>>, i: int) {
1889                 if i == 10 { return }
1890
1891                 Thread::spawn(move|| {
1892                     tx.send(box i).unwrap();
1893                     send(tx, i + 1);
1894                 });
1895             }
1896
1897             fn recv(rx: Receiver<Box<int>>, i: int) {
1898                 if i == 10 { return }
1899
1900                 Thread::spawn(move|| {
1901                     assert!(rx.recv().unwrap() == box i);
1902                     recv(rx, i + 1);
1903                 });
1904             }
1905         }
1906     }
1907
1908     #[test]
1909     fn recv_a_lot() {
1910         // Regression test that we don't run out of stack in scheduler context
1911         let (tx, rx) = sync_channel(10000);
1912         for _ in range(0u, 10000) { tx.send(()).unwrap(); }
1913         for _ in range(0u, 10000) { rx.recv().unwrap(); }
1914     }
1915
1916     #[test]
1917     fn shared_chan_stress() {
1918         let (tx, rx) = sync_channel(0);
1919         let total = stress_factor() + 100;
1920         for _ in range(0, total) {
1921             let tx = tx.clone();
1922             Thread::spawn(move|| {
1923                 tx.send(()).unwrap();
1924             });
1925         }
1926
1927         for _ in range(0, total) {
1928             rx.recv().unwrap();
1929         }
1930     }
1931
1932     #[test]
1933     fn test_nested_recv_iter() {
1934         let (tx, rx) = sync_channel::<int>(0);
1935         let (total_tx, total_rx) = sync_channel::<int>(0);
1936
1937         let _t = Thread::spawn(move|| {
1938             let mut acc = 0;
1939             for x in rx.iter() {
1940                 acc += x;
1941             }
1942             total_tx.send(acc).unwrap();
1943         });
1944
1945         tx.send(3).unwrap();
1946         tx.send(1).unwrap();
1947         tx.send(2).unwrap();
1948         drop(tx);
1949         assert_eq!(total_rx.recv().unwrap(), 6);
1950     }
1951
1952     #[test]
1953     fn test_recv_iter_break() {
1954         let (tx, rx) = sync_channel::<int>(0);
1955         let (count_tx, count_rx) = sync_channel(0);
1956
1957         let _t = Thread::spawn(move|| {
1958             let mut count = 0;
1959             for x in rx.iter() {
1960                 if count >= 3 {
1961                     break;
1962                 } else {
1963                     count += x;
1964                 }
1965             }
1966             count_tx.send(count).unwrap();
1967         });
1968
1969         tx.send(2).unwrap();
1970         tx.send(2).unwrap();
1971         tx.send(2).unwrap();
1972         let _ = tx.try_send(2);
1973         drop(tx);
1974         assert_eq!(count_rx.recv().unwrap(), 4);
1975     }
1976
1977     #[test]
1978     fn try_recv_states() {
1979         let (tx1, rx1) = sync_channel::<int>(1);
1980         let (tx2, rx2) = sync_channel::<()>(1);
1981         let (tx3, rx3) = sync_channel::<()>(1);
1982         let _t = Thread::spawn(move|| {
1983             rx2.recv().unwrap();
1984             tx1.send(1).unwrap();
1985             tx3.send(()).unwrap();
1986             rx2.recv().unwrap();
1987             drop(tx1);
1988             tx3.send(()).unwrap();
1989         });
1990
1991         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1992         tx2.send(()).unwrap();
1993         rx3.recv().unwrap();
1994         assert_eq!(rx1.try_recv(), Ok(1));
1995         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1996         tx2.send(()).unwrap();
1997         rx3.recv().unwrap();
1998         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1999     }
2000
2001     // This bug used to end up in a livelock inside of the Receiver destructor
2002     // because the internal state of the Shared packet was corrupted
2003     #[test]
2004     fn destroy_upgraded_shared_port_when_sender_still_active() {
2005         let (tx, rx) = sync_channel::<()>(0);
2006         let (tx2, rx2) = sync_channel::<()>(0);
2007         let _t = Thread::spawn(move|| {
2008             rx.recv().unwrap(); // wait on a oneshot
2009             drop(rx);  // destroy a shared
2010             tx2.send(()).unwrap();
2011         });
2012         // make sure the other task has gone to sleep
2013         for _ in range(0u, 5000) { Thread::yield_now(); }
2014
2015         // upgrade to a shared chan and send a message
2016         let t = tx.clone();
2017         drop(tx);
2018         t.send(()).unwrap();
2019
2020         // wait for the child task to exit before we exit
2021         rx2.recv().unwrap();
2022     }
2023
2024     #[test]
2025     fn send1() {
2026         let (tx, rx) = sync_channel::<int>(0);
2027         let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
2028         assert_eq!(tx.send(1), Ok(()));
2029     }
2030
2031     #[test]
2032     fn send2() {
2033         let (tx, rx) = sync_channel::<int>(0);
2034         let _t = Thread::spawn(move|| { drop(rx); });
2035         assert!(tx.send(1).is_err());
2036     }
2037
2038     #[test]
2039     fn send3() {
2040         let (tx, rx) = sync_channel::<int>(1);
2041         assert_eq!(tx.send(1), Ok(()));
2042         let _t =Thread::spawn(move|| { drop(rx); });
2043         assert!(tx.send(1).is_err());
2044     }
2045
2046     #[test]
2047     fn send4() {
2048         let (tx, rx) = sync_channel::<int>(0);
2049         let tx2 = tx.clone();
2050         let (done, donerx) = channel();
2051         let done2 = done.clone();
2052         let _t = Thread::spawn(move|| {
2053             assert!(tx.send(1).is_err());
2054             done.send(()).unwrap();
2055         });
2056         let _t = Thread::spawn(move|| {
2057             assert!(tx2.send(2).is_err());
2058             done2.send(()).unwrap();
2059         });
2060         drop(rx);
2061         donerx.recv().unwrap();
2062         donerx.recv().unwrap();
2063     }
2064
2065     #[test]
2066     fn try_send1() {
2067         let (tx, _rx) = sync_channel::<int>(0);
2068         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2069     }
2070
2071     #[test]
2072     fn try_send2() {
2073         let (tx, _rx) = sync_channel::<int>(1);
2074         assert_eq!(tx.try_send(1), Ok(()));
2075         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2076     }
2077
2078     #[test]
2079     fn try_send3() {
2080         let (tx, rx) = sync_channel::<int>(1);
2081         assert_eq!(tx.try_send(1), Ok(()));
2082         drop(rx);
2083         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2084     }
2085
2086     #[test]
2087     fn issue_15761() {
2088         fn repro() {
2089             let (tx1, rx1) = sync_channel::<()>(3);
2090             let (tx2, rx2) = sync_channel::<()>(3);
2091
2092             let _t = Thread::spawn(move|| {
2093                 rx1.recv().unwrap();
2094                 tx2.try_send(()).unwrap();
2095             });
2096
2097             tx1.try_send(()).unwrap();
2098             rx2.recv().unwrap();
2099         }
2100
2101         for _ in range(0u, 100) {
2102             repro()
2103         }
2104     }
2105 }