]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Mention the queueueue-ness of mpsc.
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //!     tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //!     let tx = tx.clone();
79 //!     Thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //!     // This will wait for the parent task to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115 //!
116 //! Reading from a channel with a timeout requires to use a Timer together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
120 //!
121 //! ```no_run
122 //! use std::sync::mpsc::channel;
123 //! use std::old_io::timer::Timer;
124 //! use std::time::Duration;
125 //!
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
129 //!
130 //! loop {
131 //!     select! {
132 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
133 //!         _ = timeout.recv() => {
134 //!             println!("timed out, total time was more than 10 seconds");
135 //!             break;
136 //!         }
137 //!     }
138 //! }
139 //! ```
140 //!
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
144 //!
145 //! ```no_run
146 //! use std::sync::mpsc::channel;
147 //! use std::old_io::timer::Timer;
148 //! use std::time::Duration;
149 //!
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
152 //!
153 //! loop {
154 //!     let timeout = timer.oneshot(Duration::seconds(5));
155 //!
156 //!     select! {
157 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
158 //!         _ = timeout.recv() => {
159 //!             println!("timed out, no message received in 5 seconds");
160 //!             break;
161 //!         }
162 //!     }
163 //! }
164 //! ```
165
166 #![stable(feature = "rust1", since = "1.0.0")]
167
168 // A description of how Rust's channel implementation works
169 //
170 // Channels are supposed to be the basic building block for all other
171 // concurrent primitives that are used in Rust. As a result, the channel type
172 // needs to be highly optimized, flexible, and broad enough for use everywhere.
173 //
174 // The choice of implementation of all channels is to be built on lock-free data
175 // structures. The channels themselves are then consequently also lock-free data
176 // structures. As always with lock-free code, this is a very "here be dragons"
177 // territory, especially because I'm unaware of any academic papers that have
178 // gone into great length about channels of these flavors.
179 //
180 // ## Flavors of channels
181 //
182 // From the perspective of a consumer of this library, there is only one flavor
183 // of channel. This channel can be used as a stream and cloned to allow multiple
184 // senders. Under the hood, however, there are actually three flavors of
185 // channels in play.
186 //
187 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
188 //              They contain as few atomics as possible and involve one and
189 //              exactly one allocation.
190 // * Streams - these channels are optimized for the non-shared use case. They
191 //             use a different concurrent queue that is more tailored for this
192 //             use case. The initial allocation of this flavor of channel is not
193 //             optimized.
194 // * Shared - this is the most general form of channel that this module offers,
195 //            a channel with multiple senders. This type is as optimized as it
196 //            can be, but the previous two types mentioned are much faster for
197 //            their use-cases.
198 //
199 // ## Concurrent queues
200 //
201 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
202 // recv() obviously blocks. This means that under the hood there must be some
203 // shared and concurrent queue holding all of the actual data.
204 //
205 // With two flavors of channels, two flavors of queues are also used. We have
206 // chosen to use queues from a well-known author that are abbreviated as SPSC
207 // and MPSC (single producer, single consumer and multiple producer, single
208 // consumer). SPSC queues are used for streams while MPSC queues are used for
209 // shared channels.
210 //
211 // ### SPSC optimizations
212 //
213 // The SPSC queue found online is essentially a linked list of nodes where one
214 // half of the nodes are the "queue of data" and the other half of nodes are a
215 // cache of unused nodes. The unused nodes are used such that an allocation is
216 // not required on every push() and a free doesn't need to happen on every
217 // pop().
218 //
219 // As found online, however, the cache of nodes is of an infinite size. This
220 // means that if a channel at one point in its life had 50k items in the queue,
221 // then the queue will always have the capacity for 50k items. I believed that
222 // this was an unnecessary limitation of the implementation, so I have altered
223 // the queue to optionally have a bound on the cache size.
224 //
225 // By default, streams will have an unbounded SPSC queue with a small-ish cache
226 // size. The hope is that the cache is still large enough to have very fast
227 // send() operations while not too large such that millions of channels can
228 // coexist at once.
229 //
230 // ### MPSC optimizations
231 //
232 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
233 // a linked list under the hood to earn its unboundedness, but I have not put
234 // forth much effort into having a cache of nodes similar to the SPSC queue.
235 //
236 // For now, I believe that this is "ok" because shared channels are not the most
237 // common type, but soon we may wish to revisit this queue choice and determine
238 // another candidate for backend storage of shared channels.
239 //
240 // ## Overview of the Implementation
241 //
242 // Now that there's a little background on the concurrent queues used, it's
243 // worth going into much more detail about the channels themselves. The basic
244 // pseudocode for a send/recv are:
245 //
246 //
247 //      send(t)                             recv()
248 //        queue.push(t)                       return if queue.pop()
249 //        if increment() == -1                deschedule {
250 //          wakeup()                            if decrement() > 0
251 //                                                cancel_deschedule()
252 //                                            }
253 //                                            queue.pop()
254 //
255 // As mentioned before, there are no locks in this implementation, only atomic
256 // instructions are used.
257 //
258 // ### The internal atomic counter
259 //
260 // Every channel has a shared counter with each half to keep track of the size
261 // of the queue. This counter is used to abort descheduling by the receiver and
262 // to know when to wake up on the sending side.
263 //
264 // As seen in the pseudocode, senders will increment this count and receivers
265 // will decrement the count. The theory behind this is that if a sender sees a
266 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
267 // then it doesn't need to block.
268 //
269 // The recv() method has a beginning call to pop(), and if successful, it needs
270 // to decrement the count. It is a crucial implementation detail that this
271 // decrement does *not* happen to the shared counter. If this were the case,
272 // then it would be possible for the counter to be very negative when there were
273 // no receivers waiting, in which case the senders would have to determine when
274 // it was actually appropriate to wake up a receiver.
275 //
276 // Instead, the "steal count" is kept track of separately (not atomically
277 // because it's only used by receivers), and then the decrement() call when
278 // descheduling will lump in all of the recent steals into one large decrement.
279 //
280 // The implication of this is that if a sender sees a -1 count, then there's
281 // guaranteed to be a waiter waiting!
282 //
283 // ## Native Implementation
284 //
285 // A major goal of these channels is to work seamlessly on and off the runtime.
286 // All of the previous race conditions have been worded in terms of
287 // scheduler-isms (which is obviously not available without the runtime).
288 //
289 // For now, native usage of channels (off the runtime) will fall back onto
290 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
291 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
292 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
293 // condition variable.
294 //
295 // ## Select
296 //
297 // Being able to support selection over channels has greatly influenced this
298 // design, and not only does selection need to work inside the runtime, but also
299 // outside the runtime.
300 //
301 // The implementation is fairly straightforward. The goal of select() is not to
302 // return some data, but only to return which channel can receive data without
303 // blocking. The implementation is essentially the entire blocking procedure
304 // followed by an increment as soon as its woken up. The cancellation procedure
305 // involves an increment and swapping out of to_wake to acquire ownership of the
306 // task to unblock.
307 //
308 // Sadly this current implementation requires multiple allocations, so I have
309 // seen the throughput of select() be much worse than it should be. I do not
310 // believe that there is anything fundamental that needs to change about these
311 // channels, however, in order to support a more efficient select().
312 //
313 // # Conclusion
314 //
315 // And now that you've seen all the races that I found and attempted to fix,
316 // here's the code for you to find some more!
317
318 use prelude::v1::*;
319
320 use sync::Arc;
321 use fmt;
322 use mem;
323 use cell::UnsafeCell;
324
325 pub use self::select::{Select, Handle};
326 use self::select::StartResult;
327 use self::select::StartResult::*;
328 use self::blocking::SignalToken;
329
330 mod blocking;
331 mod oneshot;
332 mod select;
333 mod shared;
334 mod stream;
335 mod sync;
336 mod mpsc_queue;
337 mod spsc_queue;
338
339 /// The receiving-half of Rust's channel type. This half can only be owned by
340 /// one task
341 #[stable(feature = "rust1", since = "1.0.0")]
342 pub struct Receiver<T> {
343     inner: UnsafeCell<Flavor<T>>,
344 }
345
346 // The receiver port can be sent from place to place, so long as it
347 // is not used to receive non-sendable things.
348 unsafe impl<T:Send> Send for Receiver<T> { }
349
350 /// An iterator over messages on a receiver, this iterator will block
351 /// whenever `next` is called, waiting for a new message, and `None` will be
352 /// returned when the corresponding channel has hung up.
353 #[stable(feature = "rust1", since = "1.0.0")]
354 pub struct Iter<'a, T:'a> {
355     rx: &'a Receiver<T>
356 }
357
358 /// The sending-half of Rust's asynchronous channel type. This half can only be
359 /// owned by one task, but it can be cloned to send to other tasks.
360 #[stable(feature = "rust1", since = "1.0.0")]
361 pub struct Sender<T> {
362     inner: UnsafeCell<Flavor<T>>,
363 }
364
365 // The send port can be sent from place to place, so long as it
366 // is not used to send non-sendable things.
367 unsafe impl<T:Send> Send for Sender<T> { }
368
369 /// The sending-half of Rust's synchronous channel type. This half can only be
370 /// owned by one task, but it can be cloned to send to other tasks.
371 #[stable(feature = "rust1", since = "1.0.0")]
372 pub struct SyncSender<T> {
373     inner: Arc<UnsafeCell<sync::Packet<T>>>,
374 }
375
376 unsafe impl<T:Send> Send for SyncSender<T> {}
377
378 impl<T> !Sync for SyncSender<T> {}
379
380 /// An error returned from the `send` function on channels.
381 ///
382 /// A `send` operation can only fail if the receiving end of a channel is
383 /// disconnected, implying that the data could never be received. The error
384 /// contains the data being sent as a payload so it can be recovered.
385 #[stable(feature = "rust1", since = "1.0.0")]
386 #[derive(PartialEq, Eq, Clone, Copy)]
387 pub struct SendError<T>(pub T);
388
389 /// An error returned from the `recv` function on a `Receiver`.
390 ///
391 /// The `recv` operation can only fail if the sending half of a channel is
392 /// disconnected, implying that no further messages will ever be received.
393 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
394 #[stable(feature = "rust1", since = "1.0.0")]
395 pub struct RecvError;
396
397 /// This enumeration is the list of the possible reasons that try_recv could not
398 /// return data when called.
399 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
400 #[stable(feature = "rust1", since = "1.0.0")]
401 pub enum TryRecvError {
402     /// This channel is currently empty, but the sender(s) have not yet
403     /// disconnected, so data may yet become available.
404     #[stable(feature = "rust1", since = "1.0.0")]
405     Empty,
406
407     /// This channel's sending half has become disconnected, and there will
408     /// never be any more data received on this channel
409     #[stable(feature = "rust1", since = "1.0.0")]
410     Disconnected,
411 }
412
413 /// This enumeration is the list of the possible error outcomes for the
414 /// `SyncSender::try_send` method.
415 #[stable(feature = "rust1", since = "1.0.0")]
416 #[derive(PartialEq, Eq, Clone, Copy)]
417 pub enum TrySendError<T> {
418     /// The data could not be sent on the channel because it would require that
419     /// the callee block to send the data.
420     ///
421     /// If this is a buffered channel, then the buffer is full at this time. If
422     /// this is not a buffered channel, then there is no receiver available to
423     /// acquire the data.
424     #[stable(feature = "rust1", since = "1.0.0")]
425     Full(T),
426
427     /// This channel's receiving half has disconnected, so the data could not be
428     /// sent. The data is returned back to the callee in this case.
429     #[stable(feature = "rust1", since = "1.0.0")]
430     Disconnected(T),
431 }
432
433 enum Flavor<T> {
434     Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
435     Stream(Arc<UnsafeCell<stream::Packet<T>>>),
436     Shared(Arc<UnsafeCell<shared::Packet<T>>>),
437     Sync(Arc<UnsafeCell<sync::Packet<T>>>),
438 }
439
440 #[doc(hidden)]
441 trait UnsafeFlavor<T> {
442     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
443     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
444         &mut *self.inner_unsafe().get()
445     }
446     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
447         &*self.inner_unsafe().get()
448     }
449 }
450 impl<T> UnsafeFlavor<T> for Sender<T> {
451     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
452         &self.inner
453     }
454 }
455 impl<T> UnsafeFlavor<T> for Receiver<T> {
456     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
457         &self.inner
458     }
459 }
460
461 /// Creates a new asynchronous channel, returning the sender/receiver halves.
462 ///
463 /// All data sent on the sender will become available on the receiver, and no
464 /// send will block the calling task (this channel has an "infinite buffer").
465 ///
466 /// # Example
467 ///
468 /// ```
469 /// use std::sync::mpsc::channel;
470 /// use std::thread::Thread;
471 ///
472 /// // tx is is the sending half (tx for transmission), and rx is the receiving
473 /// // half (rx for receiving).
474 /// let (tx, rx) = channel();
475 ///
476 /// // Spawn off an expensive computation
477 /// Thread::spawn(move|| {
478 /// #   fn expensive_computation() {}
479 ///     tx.send(expensive_computation()).unwrap();
480 /// });
481 ///
482 /// // Do some useful work for awhile
483 ///
484 /// // Let's see what that answer was
485 /// println!("{:?}", rx.recv().unwrap());
486 /// ```
487 #[stable(feature = "rust1", since = "1.0.0")]
488 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
489     let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
490     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
491 }
492
493 /// Creates a new synchronous, bounded channel.
494 ///
495 /// Like asynchronous channels, the `Receiver` will block until a message
496 /// becomes available. These channels differ greatly in the semantics of the
497 /// sender from asynchronous channels, however.
498 ///
499 /// This channel has an internal buffer on which messages will be queued. When
500 /// the internal buffer becomes full, future sends will *block* waiting for the
501 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
502 /// becomes  "rendezvous channel" where each send will not return until a recv
503 /// is paired with it.
504 ///
505 /// As with asynchronous channels, all senders will panic in `send` if the
506 /// `Receiver` has been destroyed.
507 ///
508 /// # Example
509 ///
510 /// ```
511 /// use std::sync::mpsc::sync_channel;
512 /// use std::thread::Thread;
513 ///
514 /// let (tx, rx) = sync_channel(1);
515 ///
516 /// // this returns immediately
517 /// tx.send(1).unwrap();
518 ///
519 /// Thread::spawn(move|| {
520 ///     // this will block until the previous message has been received
521 ///     tx.send(2).unwrap();
522 /// });
523 ///
524 /// assert_eq!(rx.recv().unwrap(), 1);
525 /// assert_eq!(rx.recv().unwrap(), 2);
526 /// ```
527 #[stable(feature = "rust1", since = "1.0.0")]
528 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
529     let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
530     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
531 }
532
533 ////////////////////////////////////////////////////////////////////////////////
534 // Sender
535 ////////////////////////////////////////////////////////////////////////////////
536
537 impl<T: Send> Sender<T> {
538     fn new(inner: Flavor<T>) -> Sender<T> {
539         Sender {
540             inner: UnsafeCell::new(inner),
541         }
542     }
543
544     /// Attempts to send a value on this channel, returning it back if it could
545     /// not be sent.
546     ///
547     /// A successful send occurs when it is determined that the other end of
548     /// the channel has not hung up already. An unsuccessful send would be one
549     /// where the corresponding receiver has already been deallocated. Note
550     /// that a return value of `Err` means that the data will never be
551     /// received, but a return value of `Ok` does *not* mean that the data
552     /// will be received.  It is possible for the corresponding receiver to
553     /// hang up immediately after this function returns `Ok`.
554     ///
555     /// This method will never block the current thread.
556     ///
557     /// # Example
558     ///
559     /// ```
560     /// use std::sync::mpsc::channel;
561     ///
562     /// let (tx, rx) = channel();
563     ///
564     /// // This send is always successful
565     /// tx.send(1).unwrap();
566     ///
567     /// // This send will fail because the receiver is gone
568     /// drop(rx);
569     /// assert_eq!(tx.send(1).err().unwrap().0, 1);
570     /// ```
571     #[stable(feature = "rust1", since = "1.0.0")]
572     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
573         let (new_inner, ret) = match *unsafe { self.inner() } {
574             Flavor::Oneshot(ref p) => {
575                 unsafe {
576                     let p = p.get();
577                     if !(*p).sent() {
578                         return (*p).send(t).map_err(SendError);
579                     } else {
580                         let a =
581                             Arc::new(UnsafeCell::new(stream::Packet::new()));
582                         let rx = Receiver::new(Flavor::Stream(a.clone()));
583                         match (*p).upgrade(rx) {
584                             oneshot::UpSuccess => {
585                                 let ret = (*a.get()).send(t);
586                                 (a, ret)
587                             }
588                             oneshot::UpDisconnected => (a, Err(t)),
589                             oneshot::UpWoke(token) => {
590                                 // This send cannot panic because the thread is
591                                 // asleep (we're looking at it), so the receiver
592                                 // can't go away.
593                                 (*a.get()).send(t).ok().unwrap();
594                         token.signal();
595                                 (a, Ok(()))
596                             }
597                         }
598                     }
599                 }
600             }
601             Flavor::Stream(ref p) => return unsafe {
602                 (*p.get()).send(t).map_err(SendError)
603             },
604             Flavor::Shared(ref p) => return unsafe {
605                 (*p.get()).send(t).map_err(SendError)
606             },
607             Flavor::Sync(..) => unreachable!(),
608         };
609
610         unsafe {
611             let tmp = Sender::new(Flavor::Stream(new_inner));
612             mem::swap(self.inner_mut(), tmp.inner_mut());
613         }
614         ret.map_err(SendError)
615     }
616 }
617
618 #[stable(feature = "rust1", since = "1.0.0")]
619 impl<T: Send> Clone for Sender<T> {
620     fn clone(&self) -> Sender<T> {
621         let (packet, sleeper, guard) = match *unsafe { self.inner() } {
622             Flavor::Oneshot(ref p) => {
623                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
624                 unsafe {
625                     let guard = (*a.get()).postinit_lock();
626                     let rx = Receiver::new(Flavor::Shared(a.clone()));
627                     match (*p.get()).upgrade(rx) {
628                         oneshot::UpSuccess |
629                         oneshot::UpDisconnected => (a, None, guard),
630                         oneshot::UpWoke(task) => (a, Some(task), guard)
631                     }
632                 }
633             }
634             Flavor::Stream(ref p) => {
635                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
636                 unsafe {
637                     let guard = (*a.get()).postinit_lock();
638                     let rx = Receiver::new(Flavor::Shared(a.clone()));
639                     match (*p.get()).upgrade(rx) {
640                         stream::UpSuccess |
641                         stream::UpDisconnected => (a, None, guard),
642                         stream::UpWoke(task) => (a, Some(task), guard),
643                     }
644                 }
645             }
646             Flavor::Shared(ref p) => {
647                 unsafe { (*p.get()).clone_chan(); }
648                 return Sender::new(Flavor::Shared(p.clone()));
649             }
650             Flavor::Sync(..) => unreachable!(),
651         };
652
653         unsafe {
654             (*packet.get()).inherit_blocker(sleeper, guard);
655
656             let tmp = Sender::new(Flavor::Shared(packet.clone()));
657             mem::swap(self.inner_mut(), tmp.inner_mut());
658         }
659         Sender::new(Flavor::Shared(packet))
660     }
661 }
662
663 #[unsafe_destructor]
664 #[stable(feature = "rust1", since = "1.0.0")]
665 impl<T: Send> Drop for Sender<T> {
666     fn drop(&mut self) {
667         match *unsafe { self.inner_mut() } {
668             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
669             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
670             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
671             Flavor::Sync(..) => unreachable!(),
672         }
673     }
674 }
675
676 ////////////////////////////////////////////////////////////////////////////////
677 // SyncSender
678 ////////////////////////////////////////////////////////////////////////////////
679
680 impl<T: Send> SyncSender<T> {
681     fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
682         SyncSender { inner: inner }
683     }
684
685     /// Sends a value on this synchronous channel.
686     ///
687     /// This function will *block* until space in the internal buffer becomes
688     /// available or a receiver is available to hand off the message to.
689     ///
690     /// Note that a successful send does *not* guarantee that the receiver will
691     /// ever see the data if there is a buffer on this channel. Items may be
692     /// enqueued in the internal buffer for the receiver to receive at a later
693     /// time. If the buffer size is 0, however, it can be guaranteed that the
694     /// receiver has indeed received the data if this function returns success.
695     ///
696     /// This function will never panic, but it may return `Err` if the
697     /// `Receiver` has disconnected and is no longer able to receive
698     /// information.
699     #[stable(feature = "rust1", since = "1.0.0")]
700     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
701         unsafe { (*self.inner.get()).send(t).map_err(SendError) }
702     }
703
704     /// Attempts to send a value on this channel without blocking.
705     ///
706     /// This method differs from `send` by returning immediately if the
707     /// channel's buffer is full or no receiver is waiting to acquire some
708     /// data. Compared with `send`, this function has two failure cases
709     /// instead of one (one for disconnection, one for a full buffer).
710     ///
711     /// See `SyncSender::send` for notes about guarantees of whether the
712     /// receiver has received the data or not if this function is successful.
713     #[stable(feature = "rust1", since = "1.0.0")]
714     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
715         unsafe { (*self.inner.get()).try_send(t) }
716     }
717 }
718
719 #[stable(feature = "rust1", since = "1.0.0")]
720 impl<T: Send> Clone for SyncSender<T> {
721     fn clone(&self) -> SyncSender<T> {
722         unsafe { (*self.inner.get()).clone_chan(); }
723         return SyncSender::new(self.inner.clone());
724     }
725 }
726
727 #[unsafe_destructor]
728 #[stable(feature = "rust1", since = "1.0.0")]
729 impl<T: Send> Drop for SyncSender<T> {
730     fn drop(&mut self) {
731         unsafe { (*self.inner.get()).drop_chan(); }
732     }
733 }
734
735 ////////////////////////////////////////////////////////////////////////////////
736 // Receiver
737 ////////////////////////////////////////////////////////////////////////////////
738
739 impl<T: Send> Receiver<T> {
740     fn new(inner: Flavor<T>) -> Receiver<T> {
741         Receiver { inner: UnsafeCell::new(inner) }
742     }
743
744     /// Attempts to return a pending value on this receiver without blocking
745     ///
746     /// This method will never block the caller in order to wait for data to
747     /// become available. Instead, this will always return immediately with a
748     /// possible option of pending data on the channel.
749     ///
750     /// This is useful for a flavor of "optimistic check" before deciding to
751     /// block on a receiver.
752     #[stable(feature = "rust1", since = "1.0.0")]
753     pub fn try_recv(&self) -> Result<T, TryRecvError> {
754         loop {
755             let new_port = match *unsafe { self.inner() } {
756                 Flavor::Oneshot(ref p) => {
757                     match unsafe { (*p.get()).try_recv() } {
758                         Ok(t) => return Ok(t),
759                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
760                         Err(oneshot::Disconnected) => {
761                             return Err(TryRecvError::Disconnected)
762                         }
763                         Err(oneshot::Upgraded(rx)) => rx,
764                     }
765                 }
766                 Flavor::Stream(ref p) => {
767                     match unsafe { (*p.get()).try_recv() } {
768                         Ok(t) => return Ok(t),
769                         Err(stream::Empty) => return Err(TryRecvError::Empty),
770                         Err(stream::Disconnected) => {
771                             return Err(TryRecvError::Disconnected)
772                         }
773                         Err(stream::Upgraded(rx)) => rx,
774                     }
775                 }
776                 Flavor::Shared(ref p) => {
777                     match unsafe { (*p.get()).try_recv() } {
778                         Ok(t) => return Ok(t),
779                         Err(shared::Empty) => return Err(TryRecvError::Empty),
780                         Err(shared::Disconnected) => {
781                             return Err(TryRecvError::Disconnected)
782                         }
783                     }
784                 }
785                 Flavor::Sync(ref p) => {
786                     match unsafe { (*p.get()).try_recv() } {
787                         Ok(t) => return Ok(t),
788                         Err(sync::Empty) => return Err(TryRecvError::Empty),
789                         Err(sync::Disconnected) => {
790                             return Err(TryRecvError::Disconnected)
791                         }
792                     }
793                 }
794             };
795             unsafe {
796                 mem::swap(self.inner_mut(),
797                           new_port.inner_mut());
798             }
799         }
800     }
801
802     /// Attempt to wait for a value on this receiver, returning an error if the
803     /// corresponding channel has hung up.
804     ///
805     /// This function will always block the current thread if there is no data
806     /// available and it's possible for more data to be sent. Once a message is
807     /// sent to the corresponding `Sender`, then this receiver will wake up and
808     /// return that message.
809     ///
810     /// If the corresponding `Sender` has disconnected, or it disconnects while
811     /// this call is blocking, this call will wake up and return `Err` to
812     /// indicate that no more messages can ever be received on this channel.
813     #[stable(feature = "rust1", since = "1.0.0")]
814     pub fn recv(&self) -> Result<T, RecvError> {
815         loop {
816             let new_port = match *unsafe { self.inner() } {
817                 Flavor::Oneshot(ref p) => {
818                     match unsafe { (*p.get()).recv() } {
819                         Ok(t) => return Ok(t),
820                         Err(oneshot::Empty) => return unreachable!(),
821                         Err(oneshot::Disconnected) => return Err(RecvError),
822                         Err(oneshot::Upgraded(rx)) => rx,
823                     }
824                 }
825                 Flavor::Stream(ref p) => {
826                     match unsafe { (*p.get()).recv() } {
827                         Ok(t) => return Ok(t),
828                         Err(stream::Empty) => return unreachable!(),
829                         Err(stream::Disconnected) => return Err(RecvError),
830                         Err(stream::Upgraded(rx)) => rx,
831                     }
832                 }
833                 Flavor::Shared(ref p) => {
834                     match unsafe { (*p.get()).recv() } {
835                         Ok(t) => return Ok(t),
836                         Err(shared::Empty) => return unreachable!(),
837                         Err(shared::Disconnected) => return Err(RecvError),
838                     }
839                 }
840                 Flavor::Sync(ref p) => return unsafe {
841                     (*p.get()).recv().map_err(|()| RecvError)
842                 }
843             };
844             unsafe {
845                 mem::swap(self.inner_mut(), new_port.inner_mut());
846             }
847         }
848     }
849
850     /// Returns an iterator that will block waiting for messages, but never
851     /// `panic!`. It will return `None` when the channel has hung up.
852     #[stable(feature = "rust1", since = "1.0.0")]
853     pub fn iter(&self) -> Iter<T> {
854         Iter { rx: self }
855     }
856 }
857
858 impl<T: Send> select::Packet for Receiver<T> {
859     fn can_recv(&self) -> bool {
860         loop {
861             let new_port = match *unsafe { self.inner() } {
862                 Flavor::Oneshot(ref p) => {
863                     match unsafe { (*p.get()).can_recv() } {
864                         Ok(ret) => return ret,
865                         Err(upgrade) => upgrade,
866                     }
867                 }
868                 Flavor::Stream(ref p) => {
869                     match unsafe { (*p.get()).can_recv() } {
870                         Ok(ret) => return ret,
871                         Err(upgrade) => upgrade,
872                     }
873                 }
874                 Flavor::Shared(ref p) => {
875                     return unsafe { (*p.get()).can_recv() };
876                 }
877                 Flavor::Sync(ref p) => {
878                     return unsafe { (*p.get()).can_recv() };
879                 }
880             };
881             unsafe {
882                 mem::swap(self.inner_mut(),
883                           new_port.inner_mut());
884             }
885         }
886     }
887
888     fn start_selection(&self, mut token: SignalToken) -> StartResult {
889         loop {
890             let (t, new_port) = match *unsafe { self.inner() } {
891                 Flavor::Oneshot(ref p) => {
892                     match unsafe { (*p.get()).start_selection(token) } {
893                         oneshot::SelSuccess => return Installed,
894                         oneshot::SelCanceled => return Abort,
895                         oneshot::SelUpgraded(t, rx) => (t, rx),
896                     }
897                 }
898                 Flavor::Stream(ref p) => {
899                     match unsafe { (*p.get()).start_selection(token) } {
900                         stream::SelSuccess => return Installed,
901                         stream::SelCanceled => return Abort,
902                         stream::SelUpgraded(t, rx) => (t, rx),
903                     }
904                 }
905                 Flavor::Shared(ref p) => {
906                     return unsafe { (*p.get()).start_selection(token) };
907                 }
908                 Flavor::Sync(ref p) => {
909                     return unsafe { (*p.get()).start_selection(token) };
910                 }
911             };
912             token = t;
913             unsafe {
914                 mem::swap(self.inner_mut(), new_port.inner_mut());
915             }
916         }
917     }
918
919     fn abort_selection(&self) -> bool {
920         let mut was_upgrade = false;
921         loop {
922             let result = match *unsafe { self.inner() } {
923                 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
924                 Flavor::Stream(ref p) => unsafe {
925                     (*p.get()).abort_selection(was_upgrade)
926                 },
927                 Flavor::Shared(ref p) => return unsafe {
928                     (*p.get()).abort_selection(was_upgrade)
929                 },
930                 Flavor::Sync(ref p) => return unsafe {
931                     (*p.get()).abort_selection()
932                 },
933             };
934             let new_port = match result { Ok(b) => return b, Err(p) => p };
935             was_upgrade = true;
936             unsafe {
937                 mem::swap(self.inner_mut(),
938                           new_port.inner_mut());
939             }
940         }
941     }
942 }
943
944 #[stable(feature = "rust1", since = "1.0.0")]
945 impl<'a, T: Send> Iterator for Iter<'a, T> {
946     type Item = T;
947
948     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
949 }
950
951 #[unsafe_destructor]
952 #[stable(feature = "rust1", since = "1.0.0")]
953 impl<T: Send> Drop for Receiver<T> {
954     fn drop(&mut self) {
955         match *unsafe { self.inner_mut() } {
956             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
957             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
958             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
959             Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
960         }
961     }
962 }
963
964 #[stable(feature = "rust1", since = "1.0.0")]
965 impl<T> fmt::Debug for SendError<T> {
966     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
967         "SendError(..)".fmt(f)
968     }
969 }
970
971 #[stable(feature = "rust1", since = "1.0.0")]
972 impl<T> fmt::Display for SendError<T> {
973     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
974         "sending on a closed channel".fmt(f)
975     }
976 }
977
978 #[stable(feature = "rust1", since = "1.0.0")]
979 impl<T> fmt::Debug for TrySendError<T> {
980     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
981         match *self {
982             TrySendError::Full(..) => "Full(..)".fmt(f),
983             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
984         }
985     }
986 }
987
988 #[stable(feature = "rust1", since = "1.0.0")]
989 impl<T> fmt::Display for TrySendError<T> {
990     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
991         match *self {
992             TrySendError::Full(..) => {
993                 "sending on a full channel".fmt(f)
994             }
995             TrySendError::Disconnected(..) => {
996                 "sending on a closed channel".fmt(f)
997             }
998         }
999     }
1000 }
1001
1002 #[stable(feature = "rust1", since = "1.0.0")]
1003 impl fmt::Display for RecvError {
1004     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1005         "receiving on a closed channel".fmt(f)
1006     }
1007 }
1008
1009 #[stable(feature = "rust1", since = "1.0.0")]
1010 impl fmt::Display for TryRecvError {
1011     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1012         match *self {
1013             TryRecvError::Empty => {
1014                 "receiving on an empty channel".fmt(f)
1015             }
1016             TryRecvError::Disconnected => {
1017                 "receiving on a closed channel".fmt(f)
1018             }
1019         }
1020     }
1021 }
1022
1023 #[cfg(test)]
1024 mod test {
1025     use prelude::v1::*;
1026
1027     use os;
1028     use super::*;
1029     use thread::Thread;
1030
1031     pub fn stress_factor() -> uint {
1032         match os::getenv("RUST_TEST_STRESS") {
1033             Some(val) => val.parse().unwrap(),
1034             None => 1,
1035         }
1036     }
1037
1038     #[test]
1039     fn smoke() {
1040         let (tx, rx) = channel::<int>();
1041         tx.send(1).unwrap();
1042         assert_eq!(rx.recv().unwrap(), 1);
1043     }
1044
1045     #[test]
1046     fn drop_full() {
1047         let (tx, _rx) = channel();
1048         tx.send(box 1).unwrap();
1049     }
1050
1051     #[test]
1052     fn drop_full_shared() {
1053         let (tx, _rx) = channel();
1054         drop(tx.clone());
1055         drop(tx.clone());
1056         tx.send(box 1).unwrap();
1057     }
1058
1059     #[test]
1060     fn smoke_shared() {
1061         let (tx, rx) = channel::<int>();
1062         tx.send(1).unwrap();
1063         assert_eq!(rx.recv().unwrap(), 1);
1064         let tx = tx.clone();
1065         tx.send(1).unwrap();
1066         assert_eq!(rx.recv().unwrap(), 1);
1067     }
1068
1069     #[test]
1070     fn smoke_threads() {
1071         let (tx, rx) = channel::<int>();
1072         let _t = Thread::spawn(move|| {
1073             tx.send(1).unwrap();
1074         });
1075         assert_eq!(rx.recv().unwrap(), 1);
1076     }
1077
1078     #[test]
1079     fn smoke_port_gone() {
1080         let (tx, rx) = channel::<int>();
1081         drop(rx);
1082         assert!(tx.send(1).is_err());
1083     }
1084
1085     #[test]
1086     fn smoke_shared_port_gone() {
1087         let (tx, rx) = channel::<int>();
1088         drop(rx);
1089         assert!(tx.send(1).is_err())
1090     }
1091
1092     #[test]
1093     fn smoke_shared_port_gone2() {
1094         let (tx, rx) = channel::<int>();
1095         drop(rx);
1096         let tx2 = tx.clone();
1097         drop(tx);
1098         assert!(tx2.send(1).is_err());
1099     }
1100
1101     #[test]
1102     fn port_gone_concurrent() {
1103         let (tx, rx) = channel::<int>();
1104         let _t = Thread::spawn(move|| {
1105             rx.recv().unwrap();
1106         });
1107         while tx.send(1).is_ok() {}
1108     }
1109
1110     #[test]
1111     fn port_gone_concurrent_shared() {
1112         let (tx, rx) = channel::<int>();
1113         let tx2 = tx.clone();
1114         let _t = Thread::spawn(move|| {
1115             rx.recv().unwrap();
1116         });
1117         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1118     }
1119
1120     #[test]
1121     fn smoke_chan_gone() {
1122         let (tx, rx) = channel::<int>();
1123         drop(tx);
1124         assert!(rx.recv().is_err());
1125     }
1126
1127     #[test]
1128     fn smoke_chan_gone_shared() {
1129         let (tx, rx) = channel::<()>();
1130         let tx2 = tx.clone();
1131         drop(tx);
1132         drop(tx2);
1133         assert!(rx.recv().is_err());
1134     }
1135
1136     #[test]
1137     fn chan_gone_concurrent() {
1138         let (tx, rx) = channel::<int>();
1139         let _t = Thread::spawn(move|| {
1140             tx.send(1).unwrap();
1141             tx.send(1).unwrap();
1142         });
1143         while rx.recv().is_ok() {}
1144     }
1145
1146     #[test]
1147     fn stress() {
1148         let (tx, rx) = channel::<int>();
1149         let t = Thread::scoped(move|| {
1150             for _ in 0u..10000 { tx.send(1).unwrap(); }
1151         });
1152         for _ in 0u..10000 {
1153             assert_eq!(rx.recv().unwrap(), 1);
1154         }
1155         t.join().ok().unwrap();
1156     }
1157
1158     #[test]
1159     fn stress_shared() {
1160         static AMT: uint = 10000;
1161         static NTHREADS: uint = 8;
1162         let (tx, rx) = channel::<int>();
1163
1164         let t = Thread::scoped(move|| {
1165             for _ in 0..AMT * NTHREADS {
1166                 assert_eq!(rx.recv().unwrap(), 1);
1167             }
1168             match rx.try_recv() {
1169                 Ok(..) => panic!(),
1170                 _ => {}
1171             }
1172         });
1173
1174         for _ in 0..NTHREADS {
1175             let tx = tx.clone();
1176             Thread::spawn(move|| {
1177                 for _ in 0..AMT { tx.send(1).unwrap(); }
1178             });
1179         }
1180         drop(tx);
1181         t.join().ok().unwrap();
1182     }
1183
1184     #[test]
1185     fn send_from_outside_runtime() {
1186         let (tx1, rx1) = channel::<()>();
1187         let (tx2, rx2) = channel::<int>();
1188         let t1 = Thread::scoped(move|| {
1189             tx1.send(()).unwrap();
1190             for _ in 0..40 {
1191                 assert_eq!(rx2.recv().unwrap(), 1);
1192             }
1193         });
1194         rx1.recv().unwrap();
1195         let t2 = Thread::scoped(move|| {
1196             for _ in 0..40 {
1197                 tx2.send(1).unwrap();
1198             }
1199         });
1200         t1.join().ok().unwrap();
1201         t2.join().ok().unwrap();
1202     }
1203
1204     #[test]
1205     fn recv_from_outside_runtime() {
1206         let (tx, rx) = channel::<int>();
1207         let t = Thread::scoped(move|| {
1208             for _ in 0..40 {
1209                 assert_eq!(rx.recv().unwrap(), 1);
1210             }
1211         });
1212         for _ in 0u..40 {
1213             tx.send(1).unwrap();
1214         }
1215         t.join().ok().unwrap();
1216     }
1217
1218     #[test]
1219     fn no_runtime() {
1220         let (tx1, rx1) = channel::<int>();
1221         let (tx2, rx2) = channel::<int>();
1222         let t1 = Thread::scoped(move|| {
1223             assert_eq!(rx1.recv().unwrap(), 1);
1224             tx2.send(2).unwrap();
1225         });
1226         let t2 = Thread::scoped(move|| {
1227             tx1.send(1).unwrap();
1228             assert_eq!(rx2.recv().unwrap(), 2);
1229         });
1230         t1.join().ok().unwrap();
1231         t2.join().ok().unwrap();
1232     }
1233
1234     #[test]
1235     fn oneshot_single_thread_close_port_first() {
1236         // Simple test of closing without sending
1237         let (_tx, rx) = channel::<int>();
1238         drop(rx);
1239     }
1240
1241     #[test]
1242     fn oneshot_single_thread_close_chan_first() {
1243         // Simple test of closing without sending
1244         let (tx, _rx) = channel::<int>();
1245         drop(tx);
1246     }
1247
1248     #[test]
1249     fn oneshot_single_thread_send_port_close() {
1250         // Testing that the sender cleans up the payload if receiver is closed
1251         let (tx, rx) = channel::<Box<int>>();
1252         drop(rx);
1253         assert!(tx.send(box 0).is_err());
1254     }
1255
1256     #[test]
1257     fn oneshot_single_thread_recv_chan_close() {
1258         // Receiving on a closed chan will panic
1259         let res = Thread::scoped(move|| {
1260             let (tx, rx) = channel::<int>();
1261             drop(tx);
1262             rx.recv().unwrap();
1263         }).join();
1264         // What is our res?
1265         assert!(res.is_err());
1266     }
1267
1268     #[test]
1269     fn oneshot_single_thread_send_then_recv() {
1270         let (tx, rx) = channel::<Box<int>>();
1271         tx.send(box 10).unwrap();
1272         assert!(rx.recv().unwrap() == box 10);
1273     }
1274
1275     #[test]
1276     fn oneshot_single_thread_try_send_open() {
1277         let (tx, rx) = channel::<int>();
1278         assert!(tx.send(10).is_ok());
1279         assert!(rx.recv().unwrap() == 10);
1280     }
1281
1282     #[test]
1283     fn oneshot_single_thread_try_send_closed() {
1284         let (tx, rx) = channel::<int>();
1285         drop(rx);
1286         assert!(tx.send(10).is_err());
1287     }
1288
1289     #[test]
1290     fn oneshot_single_thread_try_recv_open() {
1291         let (tx, rx) = channel::<int>();
1292         tx.send(10).unwrap();
1293         assert!(rx.recv() == Ok(10));
1294     }
1295
1296     #[test]
1297     fn oneshot_single_thread_try_recv_closed() {
1298         let (tx, rx) = channel::<int>();
1299         drop(tx);
1300         assert!(rx.recv().is_err());
1301     }
1302
1303     #[test]
1304     fn oneshot_single_thread_peek_data() {
1305         let (tx, rx) = channel::<int>();
1306         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1307         tx.send(10).unwrap();
1308         assert_eq!(rx.try_recv(), Ok(10));
1309     }
1310
1311     #[test]
1312     fn oneshot_single_thread_peek_close() {
1313         let (tx, rx) = channel::<int>();
1314         drop(tx);
1315         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1316         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1317     }
1318
1319     #[test]
1320     fn oneshot_single_thread_peek_open() {
1321         let (_tx, rx) = channel::<int>();
1322         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1323     }
1324
1325     #[test]
1326     fn oneshot_multi_task_recv_then_send() {
1327         let (tx, rx) = channel::<Box<int>>();
1328         let _t = Thread::spawn(move|| {
1329             assert!(rx.recv().unwrap() == box 10);
1330         });
1331
1332         tx.send(box 10).unwrap();
1333     }
1334
1335     #[test]
1336     fn oneshot_multi_task_recv_then_close() {
1337         let (tx, rx) = channel::<Box<int>>();
1338         let _t = Thread::spawn(move|| {
1339             drop(tx);
1340         });
1341         let res = Thread::scoped(move|| {
1342             assert!(rx.recv().unwrap() == box 10);
1343         }).join();
1344         assert!(res.is_err());
1345     }
1346
1347     #[test]
1348     fn oneshot_multi_thread_close_stress() {
1349         for _ in 0..stress_factor() {
1350             let (tx, rx) = channel::<int>();
1351             let _t = Thread::spawn(move|| {
1352                 drop(rx);
1353             });
1354             drop(tx);
1355         }
1356     }
1357
1358     #[test]
1359     fn oneshot_multi_thread_send_close_stress() {
1360         for _ in 0..stress_factor() {
1361             let (tx, rx) = channel::<int>();
1362             let _t = Thread::spawn(move|| {
1363                 drop(rx);
1364             });
1365             let _ = Thread::scoped(move|| {
1366                 tx.send(1).unwrap();
1367             }).join();
1368         }
1369     }
1370
1371     #[test]
1372     fn oneshot_multi_thread_recv_close_stress() {
1373         for _ in 0..stress_factor() {
1374             let (tx, rx) = channel::<int>();
1375             Thread::spawn(move|| {
1376                 let res = Thread::scoped(move|| {
1377                     rx.recv().unwrap();
1378                 }).join();
1379                 assert!(res.is_err());
1380             });
1381             let _t = Thread::spawn(move|| {
1382                 Thread::spawn(move|| {
1383                     drop(tx);
1384                 });
1385             });
1386         }
1387     }
1388
1389     #[test]
1390     fn oneshot_multi_thread_send_recv_stress() {
1391         for _ in 0..stress_factor() {
1392             let (tx, rx) = channel();
1393             let _t = Thread::spawn(move|| {
1394                 tx.send(box 10).unwrap();
1395             });
1396             assert!(rx.recv().unwrap() == box 10);
1397         }
1398     }
1399
1400     #[test]
1401     fn stream_send_recv_stress() {
1402         for _ in 0..stress_factor() {
1403             let (tx, rx) = channel();
1404
1405             send(tx, 0);
1406             recv(rx, 0);
1407
1408             fn send(tx: Sender<Box<int>>, i: int) {
1409                 if i == 10 { return }
1410
1411                 Thread::spawn(move|| {
1412                     tx.send(box i).unwrap();
1413                     send(tx, i + 1);
1414                 });
1415             }
1416
1417             fn recv(rx: Receiver<Box<int>>, i: int) {
1418                 if i == 10 { return }
1419
1420                 Thread::spawn(move|| {
1421                     assert!(rx.recv().unwrap() == box i);
1422                     recv(rx, i + 1);
1423                 });
1424             }
1425         }
1426     }
1427
1428     #[test]
1429     fn recv_a_lot() {
1430         // Regression test that we don't run out of stack in scheduler context
1431         let (tx, rx) = channel();
1432         for _ in 0..10000 { tx.send(()).unwrap(); }
1433         for _ in 0..10000 { rx.recv().unwrap(); }
1434     }
1435
1436     #[test]
1437     fn shared_chan_stress() {
1438         let (tx, rx) = channel();
1439         let total = stress_factor() + 100;
1440         for _ in 0..total {
1441             let tx = tx.clone();
1442             Thread::spawn(move|| {
1443                 tx.send(()).unwrap();
1444             });
1445         }
1446
1447         for _ in 0..total {
1448             rx.recv().unwrap();
1449         }
1450     }
1451
1452     #[test]
1453     fn test_nested_recv_iter() {
1454         let (tx, rx) = channel::<int>();
1455         let (total_tx, total_rx) = channel::<int>();
1456
1457         let _t = Thread::spawn(move|| {
1458             let mut acc = 0;
1459             for x in rx.iter() {
1460                 acc += x;
1461             }
1462             total_tx.send(acc).unwrap();
1463         });
1464
1465         tx.send(3).unwrap();
1466         tx.send(1).unwrap();
1467         tx.send(2).unwrap();
1468         drop(tx);
1469         assert_eq!(total_rx.recv().unwrap(), 6);
1470     }
1471
1472     #[test]
1473     fn test_recv_iter_break() {
1474         let (tx, rx) = channel::<int>();
1475         let (count_tx, count_rx) = channel();
1476
1477         let _t = Thread::spawn(move|| {
1478             let mut count = 0;
1479             for x in rx.iter() {
1480                 if count >= 3 {
1481                     break;
1482                 } else {
1483                     count += x;
1484                 }
1485             }
1486             count_tx.send(count).unwrap();
1487         });
1488
1489         tx.send(2).unwrap();
1490         tx.send(2).unwrap();
1491         tx.send(2).unwrap();
1492         let _ = tx.send(2);
1493         drop(tx);
1494         assert_eq!(count_rx.recv().unwrap(), 4);
1495     }
1496
1497     #[test]
1498     fn try_recv_states() {
1499         let (tx1, rx1) = channel::<int>();
1500         let (tx2, rx2) = channel::<()>();
1501         let (tx3, rx3) = channel::<()>();
1502         let _t = Thread::spawn(move|| {
1503             rx2.recv().unwrap();
1504             tx1.send(1).unwrap();
1505             tx3.send(()).unwrap();
1506             rx2.recv().unwrap();
1507             drop(tx1);
1508             tx3.send(()).unwrap();
1509         });
1510
1511         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1512         tx2.send(()).unwrap();
1513         rx3.recv().unwrap();
1514         assert_eq!(rx1.try_recv(), Ok(1));
1515         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1516         tx2.send(()).unwrap();
1517         rx3.recv().unwrap();
1518         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1519     }
1520
1521     // This bug used to end up in a livelock inside of the Receiver destructor
1522     // because the internal state of the Shared packet was corrupted
1523     #[test]
1524     fn destroy_upgraded_shared_port_when_sender_still_active() {
1525         let (tx, rx) = channel();
1526         let (tx2, rx2) = channel();
1527         let _t = Thread::spawn(move|| {
1528             rx.recv().unwrap(); // wait on a oneshot
1529             drop(rx);  // destroy a shared
1530             tx2.send(()).unwrap();
1531         });
1532         // make sure the other task has gone to sleep
1533         for _ in 0u..5000 { Thread::yield_now(); }
1534
1535         // upgrade to a shared chan and send a message
1536         let t = tx.clone();
1537         drop(tx);
1538         t.send(()).unwrap();
1539
1540         // wait for the child task to exit before we exit
1541         rx2.recv().unwrap();
1542     }
1543 }
1544
1545 #[cfg(test)]
1546 mod sync_tests {
1547     use prelude::v1::*;
1548
1549     use os;
1550     use thread::Thread;
1551     use super::*;
1552
1553     pub fn stress_factor() -> uint {
1554         match os::getenv("RUST_TEST_STRESS") {
1555             Some(val) => val.parse().unwrap(),
1556             None => 1,
1557         }
1558     }
1559
1560     #[test]
1561     fn smoke() {
1562         let (tx, rx) = sync_channel::<int>(1);
1563         tx.send(1).unwrap();
1564         assert_eq!(rx.recv().unwrap(), 1);
1565     }
1566
1567     #[test]
1568     fn drop_full() {
1569         let (tx, _rx) = sync_channel(1);
1570         tx.send(box 1).unwrap();
1571     }
1572
1573     #[test]
1574     fn smoke_shared() {
1575         let (tx, rx) = sync_channel::<int>(1);
1576         tx.send(1).unwrap();
1577         assert_eq!(rx.recv().unwrap(), 1);
1578         let tx = tx.clone();
1579         tx.send(1).unwrap();
1580         assert_eq!(rx.recv().unwrap(), 1);
1581     }
1582
1583     #[test]
1584     fn smoke_threads() {
1585         let (tx, rx) = sync_channel::<int>(0);
1586         let _t = Thread::spawn(move|| {
1587             tx.send(1).unwrap();
1588         });
1589         assert_eq!(rx.recv().unwrap(), 1);
1590     }
1591
1592     #[test]
1593     fn smoke_port_gone() {
1594         let (tx, rx) = sync_channel::<int>(0);
1595         drop(rx);
1596         assert!(tx.send(1).is_err());
1597     }
1598
1599     #[test]
1600     fn smoke_shared_port_gone2() {
1601         let (tx, rx) = sync_channel::<int>(0);
1602         drop(rx);
1603         let tx2 = tx.clone();
1604         drop(tx);
1605         assert!(tx2.send(1).is_err());
1606     }
1607
1608     #[test]
1609     fn port_gone_concurrent() {
1610         let (tx, rx) = sync_channel::<int>(0);
1611         let _t = Thread::spawn(move|| {
1612             rx.recv().unwrap();
1613         });
1614         while tx.send(1).is_ok() {}
1615     }
1616
1617     #[test]
1618     fn port_gone_concurrent_shared() {
1619         let (tx, rx) = sync_channel::<int>(0);
1620         let tx2 = tx.clone();
1621         let _t = Thread::spawn(move|| {
1622             rx.recv().unwrap();
1623         });
1624         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1625     }
1626
1627     #[test]
1628     fn smoke_chan_gone() {
1629         let (tx, rx) = sync_channel::<int>(0);
1630         drop(tx);
1631         assert!(rx.recv().is_err());
1632     }
1633
1634     #[test]
1635     fn smoke_chan_gone_shared() {
1636         let (tx, rx) = sync_channel::<()>(0);
1637         let tx2 = tx.clone();
1638         drop(tx);
1639         drop(tx2);
1640         assert!(rx.recv().is_err());
1641     }
1642
1643     #[test]
1644     fn chan_gone_concurrent() {
1645         let (tx, rx) = sync_channel::<int>(0);
1646         Thread::spawn(move|| {
1647             tx.send(1).unwrap();
1648             tx.send(1).unwrap();
1649         });
1650         while rx.recv().is_ok() {}
1651     }
1652
1653     #[test]
1654     fn stress() {
1655         let (tx, rx) = sync_channel::<int>(0);
1656         Thread::spawn(move|| {
1657             for _ in 0u..10000 { tx.send(1).unwrap(); }
1658         });
1659         for _ in 0u..10000 {
1660             assert_eq!(rx.recv().unwrap(), 1);
1661         }
1662     }
1663
1664     #[test]
1665     fn stress_shared() {
1666         static AMT: uint = 1000;
1667         static NTHREADS: uint = 8;
1668         let (tx, rx) = sync_channel::<int>(0);
1669         let (dtx, drx) = sync_channel::<()>(0);
1670
1671         Thread::spawn(move|| {
1672             for _ in 0..AMT * NTHREADS {
1673                 assert_eq!(rx.recv().unwrap(), 1);
1674             }
1675             match rx.try_recv() {
1676                 Ok(..) => panic!(),
1677                 _ => {}
1678             }
1679             dtx.send(()).unwrap();
1680         });
1681
1682         for _ in 0..NTHREADS {
1683             let tx = tx.clone();
1684             Thread::spawn(move|| {
1685                 for _ in 0..AMT { tx.send(1).unwrap(); }
1686             });
1687         }
1688         drop(tx);
1689         drx.recv().unwrap();
1690     }
1691
1692     #[test]
1693     fn oneshot_single_thread_close_port_first() {
1694         // Simple test of closing without sending
1695         let (_tx, rx) = sync_channel::<int>(0);
1696         drop(rx);
1697     }
1698
1699     #[test]
1700     fn oneshot_single_thread_close_chan_first() {
1701         // Simple test of closing without sending
1702         let (tx, _rx) = sync_channel::<int>(0);
1703         drop(tx);
1704     }
1705
1706     #[test]
1707     fn oneshot_single_thread_send_port_close() {
1708         // Testing that the sender cleans up the payload if receiver is closed
1709         let (tx, rx) = sync_channel::<Box<int>>(0);
1710         drop(rx);
1711         assert!(tx.send(box 0).is_err());
1712     }
1713
1714     #[test]
1715     fn oneshot_single_thread_recv_chan_close() {
1716         // Receiving on a closed chan will panic
1717         let res = Thread::scoped(move|| {
1718             let (tx, rx) = sync_channel::<int>(0);
1719             drop(tx);
1720             rx.recv().unwrap();
1721         }).join();
1722         // What is our res?
1723         assert!(res.is_err());
1724     }
1725
1726     #[test]
1727     fn oneshot_single_thread_send_then_recv() {
1728         let (tx, rx) = sync_channel::<Box<int>>(1);
1729         tx.send(box 10).unwrap();
1730         assert!(rx.recv().unwrap() == box 10);
1731     }
1732
1733     #[test]
1734     fn oneshot_single_thread_try_send_open() {
1735         let (tx, rx) = sync_channel::<int>(1);
1736         assert_eq!(tx.try_send(10), Ok(()));
1737         assert!(rx.recv().unwrap() == 10);
1738     }
1739
1740     #[test]
1741     fn oneshot_single_thread_try_send_closed() {
1742         let (tx, rx) = sync_channel::<int>(0);
1743         drop(rx);
1744         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1745     }
1746
1747     #[test]
1748     fn oneshot_single_thread_try_send_closed2() {
1749         let (tx, _rx) = sync_channel::<int>(0);
1750         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1751     }
1752
1753     #[test]
1754     fn oneshot_single_thread_try_recv_open() {
1755         let (tx, rx) = sync_channel::<int>(1);
1756         tx.send(10).unwrap();
1757         assert!(rx.recv() == Ok(10));
1758     }
1759
1760     #[test]
1761     fn oneshot_single_thread_try_recv_closed() {
1762         let (tx, rx) = sync_channel::<int>(0);
1763         drop(tx);
1764         assert!(rx.recv().is_err());
1765     }
1766
1767     #[test]
1768     fn oneshot_single_thread_peek_data() {
1769         let (tx, rx) = sync_channel::<int>(1);
1770         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1771         tx.send(10).unwrap();
1772         assert_eq!(rx.try_recv(), Ok(10));
1773     }
1774
1775     #[test]
1776     fn oneshot_single_thread_peek_close() {
1777         let (tx, rx) = sync_channel::<int>(0);
1778         drop(tx);
1779         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1780         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1781     }
1782
1783     #[test]
1784     fn oneshot_single_thread_peek_open() {
1785         let (_tx, rx) = sync_channel::<int>(0);
1786         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1787     }
1788
1789     #[test]
1790     fn oneshot_multi_task_recv_then_send() {
1791         let (tx, rx) = sync_channel::<Box<int>>(0);
1792         let _t = Thread::spawn(move|| {
1793             assert!(rx.recv().unwrap() == box 10);
1794         });
1795
1796         tx.send(box 10).unwrap();
1797     }
1798
1799     #[test]
1800     fn oneshot_multi_task_recv_then_close() {
1801         let (tx, rx) = sync_channel::<Box<int>>(0);
1802         let _t = Thread::spawn(move|| {
1803             drop(tx);
1804         });
1805         let res = Thread::scoped(move|| {
1806             assert!(rx.recv().unwrap() == box 10);
1807         }).join();
1808         assert!(res.is_err());
1809     }
1810
1811     #[test]
1812     fn oneshot_multi_thread_close_stress() {
1813         for _ in 0..stress_factor() {
1814             let (tx, rx) = sync_channel::<int>(0);
1815             let _t = Thread::spawn(move|| {
1816                 drop(rx);
1817             });
1818             drop(tx);
1819         }
1820     }
1821
1822     #[test]
1823     fn oneshot_multi_thread_send_close_stress() {
1824         for _ in 0..stress_factor() {
1825             let (tx, rx) = sync_channel::<int>(0);
1826             let _t = Thread::spawn(move|| {
1827                 drop(rx);
1828             });
1829             let _ = Thread::scoped(move || {
1830                 tx.send(1).unwrap();
1831             }).join();
1832         }
1833     }
1834
1835     #[test]
1836     fn oneshot_multi_thread_recv_close_stress() {
1837         for _ in 0..stress_factor() {
1838             let (tx, rx) = sync_channel::<int>(0);
1839             let _t = Thread::spawn(move|| {
1840                 let res = Thread::scoped(move|| {
1841                     rx.recv().unwrap();
1842                 }).join();
1843                 assert!(res.is_err());
1844             });
1845             let _t = Thread::spawn(move|| {
1846                 Thread::spawn(move|| {
1847                     drop(tx);
1848                 });
1849             });
1850         }
1851     }
1852
1853     #[test]
1854     fn oneshot_multi_thread_send_recv_stress() {
1855         for _ in 0..stress_factor() {
1856             let (tx, rx) = sync_channel::<Box<int>>(0);
1857             let _t = Thread::spawn(move|| {
1858                 tx.send(box 10).unwrap();
1859             });
1860             assert!(rx.recv().unwrap() == box 10);
1861         }
1862     }
1863
1864     #[test]
1865     fn stream_send_recv_stress() {
1866         for _ in 0..stress_factor() {
1867             let (tx, rx) = sync_channel::<Box<int>>(0);
1868
1869             send(tx, 0);
1870             recv(rx, 0);
1871
1872             fn send(tx: SyncSender<Box<int>>, i: int) {
1873                 if i == 10 { return }
1874
1875                 Thread::spawn(move|| {
1876                     tx.send(box i).unwrap();
1877                     send(tx, i + 1);
1878                 });
1879             }
1880
1881             fn recv(rx: Receiver<Box<int>>, i: int) {
1882                 if i == 10 { return }
1883
1884                 Thread::spawn(move|| {
1885                     assert!(rx.recv().unwrap() == box i);
1886                     recv(rx, i + 1);
1887                 });
1888             }
1889         }
1890     }
1891
1892     #[test]
1893     fn recv_a_lot() {
1894         // Regression test that we don't run out of stack in scheduler context
1895         let (tx, rx) = sync_channel(10000);
1896         for _ in 0u..10000 { tx.send(()).unwrap(); }
1897         for _ in 0u..10000 { rx.recv().unwrap(); }
1898     }
1899
1900     #[test]
1901     fn shared_chan_stress() {
1902         let (tx, rx) = sync_channel(0);
1903         let total = stress_factor() + 100;
1904         for _ in 0..total {
1905             let tx = tx.clone();
1906             Thread::spawn(move|| {
1907                 tx.send(()).unwrap();
1908             });
1909         }
1910
1911         for _ in 0..total {
1912             rx.recv().unwrap();
1913         }
1914     }
1915
1916     #[test]
1917     fn test_nested_recv_iter() {
1918         let (tx, rx) = sync_channel::<int>(0);
1919         let (total_tx, total_rx) = sync_channel::<int>(0);
1920
1921         let _t = Thread::spawn(move|| {
1922             let mut acc = 0;
1923             for x in rx.iter() {
1924                 acc += x;
1925             }
1926             total_tx.send(acc).unwrap();
1927         });
1928
1929         tx.send(3).unwrap();
1930         tx.send(1).unwrap();
1931         tx.send(2).unwrap();
1932         drop(tx);
1933         assert_eq!(total_rx.recv().unwrap(), 6);
1934     }
1935
1936     #[test]
1937     fn test_recv_iter_break() {
1938         let (tx, rx) = sync_channel::<int>(0);
1939         let (count_tx, count_rx) = sync_channel(0);
1940
1941         let _t = Thread::spawn(move|| {
1942             let mut count = 0;
1943             for x in rx.iter() {
1944                 if count >= 3 {
1945                     break;
1946                 } else {
1947                     count += x;
1948                 }
1949             }
1950             count_tx.send(count).unwrap();
1951         });
1952
1953         tx.send(2).unwrap();
1954         tx.send(2).unwrap();
1955         tx.send(2).unwrap();
1956         let _ = tx.try_send(2);
1957         drop(tx);
1958         assert_eq!(count_rx.recv().unwrap(), 4);
1959     }
1960
1961     #[test]
1962     fn try_recv_states() {
1963         let (tx1, rx1) = sync_channel::<int>(1);
1964         let (tx2, rx2) = sync_channel::<()>(1);
1965         let (tx3, rx3) = sync_channel::<()>(1);
1966         let _t = Thread::spawn(move|| {
1967             rx2.recv().unwrap();
1968             tx1.send(1).unwrap();
1969             tx3.send(()).unwrap();
1970             rx2.recv().unwrap();
1971             drop(tx1);
1972             tx3.send(()).unwrap();
1973         });
1974
1975         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1976         tx2.send(()).unwrap();
1977         rx3.recv().unwrap();
1978         assert_eq!(rx1.try_recv(), Ok(1));
1979         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1980         tx2.send(()).unwrap();
1981         rx3.recv().unwrap();
1982         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1983     }
1984
1985     // This bug used to end up in a livelock inside of the Receiver destructor
1986     // because the internal state of the Shared packet was corrupted
1987     #[test]
1988     fn destroy_upgraded_shared_port_when_sender_still_active() {
1989         let (tx, rx) = sync_channel::<()>(0);
1990         let (tx2, rx2) = sync_channel::<()>(0);
1991         let _t = Thread::spawn(move|| {
1992             rx.recv().unwrap(); // wait on a oneshot
1993             drop(rx);  // destroy a shared
1994             tx2.send(()).unwrap();
1995         });
1996         // make sure the other task has gone to sleep
1997         for _ in 0u..5000 { Thread::yield_now(); }
1998
1999         // upgrade to a shared chan and send a message
2000         let t = tx.clone();
2001         drop(tx);
2002         t.send(()).unwrap();
2003
2004         // wait for the child task to exit before we exit
2005         rx2.recv().unwrap();
2006     }
2007
2008     #[test]
2009     fn send1() {
2010         let (tx, rx) = sync_channel::<int>(0);
2011         let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
2012         assert_eq!(tx.send(1), Ok(()));
2013     }
2014
2015     #[test]
2016     fn send2() {
2017         let (tx, rx) = sync_channel::<int>(0);
2018         let _t = Thread::spawn(move|| { drop(rx); });
2019         assert!(tx.send(1).is_err());
2020     }
2021
2022     #[test]
2023     fn send3() {
2024         let (tx, rx) = sync_channel::<int>(1);
2025         assert_eq!(tx.send(1), Ok(()));
2026         let _t =Thread::spawn(move|| { drop(rx); });
2027         assert!(tx.send(1).is_err());
2028     }
2029
2030     #[test]
2031     fn send4() {
2032         let (tx, rx) = sync_channel::<int>(0);
2033         let tx2 = tx.clone();
2034         let (done, donerx) = channel();
2035         let done2 = done.clone();
2036         let _t = Thread::spawn(move|| {
2037             assert!(tx.send(1).is_err());
2038             done.send(()).unwrap();
2039         });
2040         let _t = Thread::spawn(move|| {
2041             assert!(tx2.send(2).is_err());
2042             done2.send(()).unwrap();
2043         });
2044         drop(rx);
2045         donerx.recv().unwrap();
2046         donerx.recv().unwrap();
2047     }
2048
2049     #[test]
2050     fn try_send1() {
2051         let (tx, _rx) = sync_channel::<int>(0);
2052         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2053     }
2054
2055     #[test]
2056     fn try_send2() {
2057         let (tx, _rx) = sync_channel::<int>(1);
2058         assert_eq!(tx.try_send(1), Ok(()));
2059         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2060     }
2061
2062     #[test]
2063     fn try_send3() {
2064         let (tx, rx) = sync_channel::<int>(1);
2065         assert_eq!(tx.try_send(1), Ok(()));
2066         drop(rx);
2067         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2068     }
2069
2070     #[test]
2071     fn issue_15761() {
2072         fn repro() {
2073             let (tx1, rx1) = sync_channel::<()>(3);
2074             let (tx2, rx2) = sync_channel::<()>(3);
2075
2076             let _t = Thread::spawn(move|| {
2077                 rx1.recv().unwrap();
2078                 tx2.try_send(()).unwrap();
2079             });
2080
2081             tx1.try_send(()).unwrap();
2082             rx2.recv().unwrap();
2083         }
2084
2085         for _ in 0u..100 {
2086             repro()
2087         }
2088     }
2089 }