]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/mod.rs
auto merge of #16802 : nick29581/rust/dst-bug-1, r=luqmana
[rust.git] / src / libsync / comm / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined among three types:
20 //!
21 //! * `Sender`
22 //! * `SyncSender`
23 //! * `Receiver`
24 //!
25 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
26 //! senders are clone-able such that many tasks can send simultaneously to one
27 //! receiver.  These channels are *task blocking*, not *thread blocking*. This
28 //! means that if one task is blocked on a channel, other tasks can continue to
29 //! make progress.
30 //!
31 //! Rust channels come in one of two flavors:
32 //!
33 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
34 //!    will return a `(Sender, Receiver)` tuple where all sends will be
35 //!    **asynchronous** (they never block). The channel conceptually has an
36 //!    infinite buffer.
37 //!
38 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
39 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
40 //!    is a pre-allocated buffer of a fixed size. All sends will be
41 //!    **synchronous** by blocking until there is buffer space available. Note
42 //!    that a bound of 0 is allowed, causing the channel to become a
43 //!    "rendezvous" channel where each sender atomically hands off a message to
44 //!    a receiver.
45 //!
46 //! ## Failure Propagation
47 //!
48 //! In addition to being a core primitive for communicating in rust, channels
49 //! are the points at which failure is propagated among tasks.  Whenever the one
50 //! half of channel is closed, the other half will have its next operation
51 //! `fail!`. The purpose of this is to allow propagation of failure among tasks
52 //! that are linked to one another via channels.
53 //!
54 //! There are methods on both of senders and receivers to perform their
55 //! respective operations without failing, however.
56 //!
57 //! ## Runtime Requirements
58 //!
59 //! The channel types defined in this module generally have very few runtime
60 //! requirements in order to operate. The major requirement they have is for a
61 //! local rust `Task` to be available if any *blocking* operation is performed.
62 //!
63 //! If a local `Task` is not available (for example an FFI callback), then the
64 //! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
65 //! the `try_send` method on a `SyncSender`, but no other operations are
66 //! guaranteed to be safe.
67 //!
68 //! Additionally, channels can interoperate between runtimes. If one task in a
69 //! program is running on libnative and another is running on libgreen, they can
70 //! still communicate with one another using channels.
71 //!
72 //! # Example
73 //!
74 //! Simple usage:
75 //!
76 //! ```
77 //! // Create a simple streaming channel
78 //! let (tx, rx) = channel();
79 //! spawn(proc() {
80 //!     tx.send(10i);
81 //! });
82 //! assert_eq!(rx.recv(), 10i);
83 //! ```
84 //!
85 //! Shared usage:
86 //!
87 //! ```
88 //! // Create a shared channel which can be sent along from many tasks
89 //! // where tx is the sending half (tx for transmission), and rx is the receiving
90 //! // half (rx for receiving).
91 //! let (tx, rx) = channel();
92 //! for i in range(0i, 10i) {
93 //!     let tx = tx.clone();
94 //!     spawn(proc() {
95 //!         tx.send(i);
96 //!     })
97 //! }
98 //!
99 //! for _ in range(0i, 10i) {
100 //!     let j = rx.recv();
101 //!     assert!(0 <= j && j < 10);
102 //! }
103 //! ```
104 //!
105 //! Propagating failure:
106 //!
107 //! ```should_fail
108 //! // The call to recv() will fail!() because the channel has already hung
109 //! // up (or been deallocated)
110 //! let (tx, rx) = channel::<int>();
111 //! drop(tx);
112 //! rx.recv();
113 //! ```
114 //!
115 //! Synchronous channels:
116 //!
117 //! ```
118 //! let (tx, rx) = sync_channel::<int>(0);
119 //! spawn(proc() {
120 //!     // This will wait for the parent task to start receiving
121 //!     tx.send(53);
122 //! });
123 //! rx.recv();
124 //! ```
125 //!
126 //! Reading from a channel with a timeout requires to use a Timer together
127 //! with the channel. You can use the select! macro to select either and
128 //! handle the timeout case. This first example will break out of the loop
129 //! after 10 seconds no matter what:
130 //!
131 //! ```no_run
132 //! use std::io::timer::Timer;
133 //! use std::time::Duration;
134 //!
135 //! let (tx, rx) = channel::<int>();
136 //! let mut timer = Timer::new().unwrap();
137 //! let timeout = timer.oneshot(Duration::seconds(10));
138 //!
139 //! loop {
140 //!     select! {
141 //!         val = rx.recv() => println!("Received {}", val),
142 //!         () = timeout.recv() => {
143 //!             println!("timed out, total time was more than 10 seconds")
144 //!             break;
145 //!         }
146 //!     }
147 //! }
148 //! ```
149 //!
150 //! This second example is more costly since it allocates a new timer every
151 //! time a message is received, but it allows you to timeout after the channel
152 //! has been inactive for 5 seconds:
153 //!
154 //! ```no_run
155 //! use std::io::timer::Timer;
156 //! use std::time::Duration;
157 //!
158 //! let (tx, rx) = channel::<int>();
159 //! let mut timer = Timer::new().unwrap();
160 //!
161 //! loop {
162 //!     let timeout = timer.oneshot(Duration::seconds(5));
163 //!
164 //!     select! {
165 //!         val = rx.recv() => println!("Received {}", val),
166 //!         () = timeout.recv() => {
167 //!             println!("timed out, no message received in 5 seconds")
168 //!             break;
169 //!         }
170 //!     }
171 //! }
172 //! ```
173
174 // A description of how Rust's channel implementation works
175 //
176 // Channels are supposed to be the basic building block for all other
177 // concurrent primitives that are used in Rust. As a result, the channel type
178 // needs to be highly optimized, flexible, and broad enough for use everywhere.
179 //
180 // The choice of implementation of all channels is to be built on lock-free data
181 // structures. The channels themselves are then consequently also lock-free data
182 // structures. As always with lock-free code, this is a very "here be dragons"
183 // territory, especially because I'm unaware of any academic papers which have
184 // gone into great length about channels of these flavors.
185 //
186 // ## Flavors of channels
187 //
188 // From the perspective of a consumer of this library, there is only one flavor
189 // of channel. This channel can be used as a stream and cloned to allow multiple
190 // senders. Under the hood, however, there are actually three flavors of
191 // channels in play.
192 //
193 // * Oneshots - these channels are highly optimized for the one-send use case.
194 //              They contain as few atomics as possible and involve one and
195 //              exactly one allocation.
196 // * Streams - these channels are optimized for the non-shared use case. They
197 //             use a different concurrent queue which is more tailored for this
198 //             use case. The initial allocation of this flavor of channel is not
199 //             optimized.
200 // * Shared - this is the most general form of channel that this module offers,
201 //            a channel with multiple senders. This type is as optimized as it
202 //            can be, but the previous two types mentioned are much faster for
203 //            their use-cases.
204 //
205 // ## Concurrent queues
206 //
207 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
208 // recv() obviously blocks. This means that under the hood there must be some
209 // shared and concurrent queue holding all of the actual data.
210 //
211 // With two flavors of channels, two flavors of queues are also used. We have
212 // chosen to use queues from a well-known author which are abbreviated as SPSC
213 // and MPSC (single producer, single consumer and multiple producer, single
214 // consumer). SPSC queues are used for streams while MPSC queues are used for
215 // shared channels.
216 //
217 // ### SPSC optimizations
218 //
219 // The SPSC queue found online is essentially a linked list of nodes where one
220 // half of the nodes are the "queue of data" and the other half of nodes are a
221 // cache of unused nodes. The unused nodes are used such that an allocation is
222 // not required on every push() and a free doesn't need to happen on every
223 // pop().
224 //
225 // As found online, however, the cache of nodes is of an infinite size. This
226 // means that if a channel at one point in its life had 50k items in the queue,
227 // then the queue will always have the capacity for 50k items. I believed that
228 // this was an unnecessary limitation of the implementation, so I have altered
229 // the queue to optionally have a bound on the cache size.
230 //
231 // By default, streams will have an unbounded SPSC queue with a small-ish cache
232 // size. The hope is that the cache is still large enough to have very fast
233 // send() operations while not too large such that millions of channels can
234 // coexist at once.
235 //
236 // ### MPSC optimizations
237 //
238 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
239 // a linked list under the hood to earn its unboundedness, but I have not put
240 // forth much effort into having a cache of nodes similar to the SPSC queue.
241 //
242 // For now, I believe that this is "ok" because shared channels are not the most
243 // common type, but soon we may wish to revisit this queue choice and determine
244 // another candidate for backend storage of shared channels.
245 //
246 // ## Overview of the Implementation
247 //
248 // Now that there's a little background on the concurrent queues used, it's
249 // worth going into much more detail about the channels themselves. The basic
250 // pseudocode for a send/recv are:
251 //
252 //
253 //      send(t)                             recv()
254 //        queue.push(t)                       return if queue.pop()
255 //        if increment() == -1                deschedule {
256 //          wakeup()                            if decrement() > 0
257 //                                                cancel_deschedule()
258 //                                            }
259 //                                            queue.pop()
260 //
261 // As mentioned before, there are no locks in this implementation, only atomic
262 // instructions are used.
263 //
264 // ### The internal atomic counter
265 //
266 // Every channel has a shared counter with each half to keep track of the size
267 // of the queue. This counter is used to abort descheduling by the receiver and
268 // to know when to wake up on the sending side.
269 //
270 // As seen in the pseudocode, senders will increment this count and receivers
271 // will decrement the count. The theory behind this is that if a sender sees a
272 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
273 // then it doesn't need to block.
274 //
275 // The recv() method has a beginning call to pop(), and if successful, it needs
276 // to decrement the count. It is a crucial implementation detail that this
277 // decrement does *not* happen to the shared counter. If this were the case,
278 // then it would be possible for the counter to be very negative when there were
279 // no receivers waiting, in which case the senders would have to determine when
280 // it was actually appropriate to wake up a receiver.
281 //
282 // Instead, the "steal count" is kept track of separately (not atomically
283 // because it's only used by receivers), and then the decrement() call when
284 // descheduling will lump in all of the recent steals into one large decrement.
285 //
286 // The implication of this is that if a sender sees a -1 count, then there's
287 // guaranteed to be a waiter waiting!
288 //
289 // ## Native Implementation
290 //
291 // A major goal of these channels is to work seamlessly on and off the runtime.
292 // All of the previous race conditions have been worded in terms of
293 // scheduler-isms (which is obviously not available without the runtime).
294 //
295 // For now, native usage of channels (off the runtime) will fall back onto
296 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
297 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
298 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
299 // condition variable.
300 //
301 // ## Select
302 //
303 // Being able to support selection over channels has greatly influenced this
304 // design, and not only does selection need to work inside the runtime, but also
305 // outside the runtime.
306 //
307 // The implementation is fairly straightforward. The goal of select() is not to
308 // return some data, but only to return which channel can receive data without
309 // blocking. The implementation is essentially the entire blocking procedure
310 // followed by an increment as soon as its woken up. The cancellation procedure
311 // involves an increment and swapping out of to_wake to acquire ownership of the
312 // task to unblock.
313 //
314 // Sadly this current implementation requires multiple allocations, so I have
315 // seen the throughput of select() be much worse than it should be. I do not
316 // believe that there is anything fundamental which needs to change about these
317 // channels, however, in order to support a more efficient select().
318 //
319 // # Conclusion
320 //
321 // And now that you've seen all the races that I found and attempted to fix,
322 // here's the code for you to find some more!
323
324 use core::prelude::*;
325
326 use alloc::arc::Arc;
327 use alloc::boxed::Box;
328 use core::cell::Cell;
329 use core::kinds::marker;
330 use core::mem;
331 use core::cell::UnsafeCell;
332 use rustrt::local::Local;
333 use rustrt::task::{Task, BlockedTask};
334
335 pub use comm::select::{Select, Handle};
336 pub use comm::duplex::{DuplexStream, duplex};
337
338 macro_rules! test (
339     { fn $name:ident() $b:block $(#[$a:meta])*} => (
340         mod $name {
341             #![allow(unused_imports)]
342
343             use std::prelude::*;
344
345             use native;
346             use comm::*;
347             use super::*;
348             use super::super::*;
349             use std::task;
350
351             fn f() $b
352
353             $(#[$a])* #[test] fn uv() { f() }
354             $(#[$a])* #[test] fn native() {
355                 use native;
356                 let (tx, rx) = channel();
357                 native::task::spawn(proc() { tx.send(f()) });
358                 rx.recv();
359             }
360         }
361     )
362 )
363
364 mod duplex;
365 mod oneshot;
366 mod select;
367 mod shared;
368 mod stream;
369 mod sync;
370
371 // Use a power of 2 to allow LLVM to optimize to something that's not a
372 // division, this is hit pretty regularly.
373 static RESCHED_FREQ: int = 256;
374
375 /// The receiving-half of Rust's channel type. This half can only be owned by
376 /// one task
377 #[unstable]
378 pub struct Receiver<T> {
379     inner: UnsafeCell<Flavor<T>>,
380     receives: Cell<uint>,
381     // can't share in an arc
382     marker: marker::NoSync,
383 }
384
385 /// An iterator over messages on a receiver, this iterator will block
386 /// whenever `next` is called, waiting for a new message, and `None` will be
387 /// returned when the corresponding channel has hung up.
388 #[unstable]
389 pub struct Messages<'a, T:'a> {
390     rx: &'a Receiver<T>
391 }
392
393 /// The sending-half of Rust's asynchronous channel type. This half can only be
394 /// owned by one task, but it can be cloned to send to other tasks.
395 #[unstable]
396 pub struct Sender<T> {
397     inner: UnsafeCell<Flavor<T>>,
398     sends: Cell<uint>,
399     // can't share in an arc
400     marker: marker::NoSync,
401 }
402
403 /// The sending-half of Rust's synchronous channel type. This half can only be
404 /// owned by one task, but it can be cloned to send to other tasks.
405 #[unstable = "this type may be renamed, but it will always exist"]
406 pub struct SyncSender<T> {
407     inner: Arc<UnsafeCell<sync::Packet<T>>>,
408     // can't share in an arc
409     marker: marker::NoSync,
410 }
411
412 /// This enumeration is the list of the possible reasons that try_recv could not
413 /// return data when called.
414 #[deriving(PartialEq, Clone, Show)]
415 #[experimental = "this is likely to be removed in changing try_recv()"]
416 pub enum TryRecvError {
417     /// This channel is currently empty, but the sender(s) have not yet
418     /// disconnected, so data may yet become available.
419     Empty,
420     /// This channel's sending half has become disconnected, and there will
421     /// never be any more data received on this channel
422     Disconnected,
423 }
424
425 /// This enumeration is the list of the possible error outcomes for the
426 /// `SyncSender::try_send` method.
427 #[deriving(PartialEq, Clone, Show)]
428 #[experimental = "this is likely to be removed in changing try_send()"]
429 pub enum TrySendError<T> {
430     /// The data could not be sent on the channel because it would require that
431     /// the callee block to send the data.
432     ///
433     /// If this is a buffered channel, then the buffer is full at this time. If
434     /// this is not a buffered channel, then there is no receiver available to
435     /// acquire the data.
436     Full(T),
437     /// This channel's receiving half has disconnected, so the data could not be
438     /// sent. The data is returned back to the callee in this case.
439     RecvDisconnected(T),
440 }
441
442 enum Flavor<T> {
443     Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
444     Stream(Arc<UnsafeCell<stream::Packet<T>>>),
445     Shared(Arc<UnsafeCell<shared::Packet<T>>>),
446     Sync(Arc<UnsafeCell<sync::Packet<T>>>),
447 }
448
449 #[doc(hidden)]
450 trait UnsafeFlavor<T> {
451     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
452     unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> {
453         &mut *self.inner_unsafe().get()
454     }
455     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
456         &*self.inner_unsafe().get()
457     }
458 }
459 impl<T> UnsafeFlavor<T> for Sender<T> {
460     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
461         &self.inner
462     }
463 }
464 impl<T> UnsafeFlavor<T> for Receiver<T> {
465     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
466         &self.inner
467     }
468 }
469
470 /// Creates a new asynchronous channel, returning the sender/receiver halves.
471 ///
472 /// All data sent on the sender will become available on the receiver, and no
473 /// send will block the calling task (this channel has an "infinite buffer").
474 ///
475 /// # Example
476 ///
477 /// ```
478 /// // tx is is the sending half (tx for transmission), and rx is the receiving
479 /// // half (rx for receiving).
480 /// let (tx, rx) = channel();
481 ///
482 /// // Spawn off an expensive computation
483 /// spawn(proc() {
484 /// #   fn expensive_computation() {}
485 ///     tx.send(expensive_computation());
486 /// });
487 ///
488 /// // Do some useful work for awhile
489 ///
490 /// // Let's see what that answer was
491 /// println!("{}", rx.recv());
492 /// ```
493 #[unstable]
494 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
495     let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
496     (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
497 }
498
499 /// Creates a new synchronous, bounded channel.
500 ///
501 /// Like asynchronous channels, the `Receiver` will block until a message
502 /// becomes available. These channels differ greatly in the semantics of the
503 /// sender from asynchronous channels, however.
504 ///
505 /// This channel has an internal buffer on which messages will be queued. When
506 /// the internal buffer becomes full, future sends will *block* waiting for the
507 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
508 /// becomes  "rendezvous channel" where each send will not return until a recv
509 /// is paired with it.
510 ///
511 /// As with asynchronous channels, all senders will fail in `send` if the
512 /// `Receiver` has been destroyed.
513 ///
514 /// # Example
515 ///
516 /// ```
517 /// let (tx, rx) = sync_channel(1);
518 ///
519 /// // this returns immediately
520 /// tx.send(1i);
521 ///
522 /// spawn(proc() {
523 ///     // this will block until the previous message has been received
524 ///     tx.send(2i);
525 /// });
526 ///
527 /// assert_eq!(rx.recv(), 1i);
528 /// assert_eq!(rx.recv(), 2i);
529 /// ```
530 #[unstable = "this function may be renamed to more accurately reflect the type \
531               of channel that is is creating"]
532 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
533     let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
534     (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
535 }
536
537 ////////////////////////////////////////////////////////////////////////////////
538 // Sender
539 ////////////////////////////////////////////////////////////////////////////////
540
541 impl<T: Send> Sender<T> {
542     fn new(inner: Flavor<T>) -> Sender<T> {
543         Sender {
544             inner: UnsafeCell::new(inner),
545             sends: Cell::new(0),
546             marker: marker::NoSync,
547         }
548     }
549
550     /// Sends a value along this channel to be received by the corresponding
551     /// receiver.
552     ///
553     /// Rust channels are infinitely buffered so this method will never block.
554     ///
555     /// # Failure
556     ///
557     /// This function will fail if the other end of the channel has hung up.
558     /// This means that if the corresponding receiver has fallen out of scope,
559     /// this function will trigger a fail message saying that a message is
560     /// being sent on a closed channel.
561     ///
562     /// Note that if this function does *not* fail, it does not mean that the
563     /// data will be successfully received. All sends are placed into a queue,
564     /// so it is possible for a send to succeed (the other end is alive), but
565     /// then the other end could immediately disconnect.
566     ///
567     /// The purpose of this functionality is to propagate failure among tasks.
568     /// If failure is not desired, then consider using the `send_opt` method
569     #[experimental = "this function is being considered candidate for removal \
570                       to adhere to the general guidelines of rust"]
571     pub fn send(&self, t: T) {
572         if self.send_opt(t).is_err() {
573             fail!("sending on a closed channel");
574         }
575     }
576
577     /// Attempts to send a value on this channel, returning it back if it could
578     /// not be sent.
579     ///
580     /// A successful send occurs when it is determined that the other end of
581     /// the channel has not hung up already. An unsuccessful send would be one
582     /// where the corresponding receiver has already been deallocated. Note
583     /// that a return value of `Err` means that the data will never be
584     /// received, but a return value of `Ok` does *not* mean that the data
585     /// will be received.  It is possible for the corresponding receiver to
586     /// hang up immediately after this function returns `Ok`.
587     ///
588     /// Like `send`, this method will never block.
589     ///
590     /// # Failure
591     ///
592     /// This method will never fail, it will return the message back to the
593     /// caller if the other end is disconnected
594     ///
595     /// # Example
596     ///
597     /// ```
598     /// let (tx, rx) = channel();
599     ///
600     /// // This send is always successful
601     /// assert_eq!(tx.send_opt(1i), Ok(()));
602     ///
603     /// // This send will fail because the receiver is gone
604     /// drop(rx);
605     /// assert_eq!(tx.send_opt(1i), Err(1));
606     /// ```
607     #[unstable = "this function may be renamed to send() in the future"]
608     pub fn send_opt(&self, t: T) -> Result<(), T> {
609         // In order to prevent starvation of other tasks in situations where
610         // a task sends repeatedly without ever receiving, we occasionally
611         // yield instead of doing a send immediately.
612         //
613         // Don't unconditionally attempt to yield because the TLS overhead can
614         // be a bit much, and also use `try_take` instead of `take` because
615         // there's no reason that this send shouldn't be usable off the
616         // runtime.
617         let cnt = self.sends.get() + 1;
618         self.sends.set(cnt);
619         if cnt % (RESCHED_FREQ as uint) == 0 {
620             let task: Option<Box<Task>> = Local::try_take();
621             task.map(|t| t.maybe_yield());
622         }
623
624         let (new_inner, ret) = match *unsafe { self.inner() } {
625             Oneshot(ref p) => {
626                 unsafe {
627                     let p = p.get();
628                     if !(*p).sent() {
629                         return (*p).send(t);
630                     } else {
631                         let a = Arc::new(UnsafeCell::new(stream::Packet::new()));
632                         match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
633                             oneshot::UpSuccess => {
634                                 let ret = (*a.get()).send(t);
635                                 (a, ret)
636                             }
637                             oneshot::UpDisconnected => (a, Err(t)),
638                             oneshot::UpWoke(task) => {
639                                 // This send cannot fail because the task is
640                                 // asleep (we're looking at it), so the receiver
641                                 // can't go away.
642                                 (*a.get()).send(t).ok().unwrap();
643                                 task.wake().map(|t| t.reawaken());
644                                 (a, Ok(()))
645                             }
646                         }
647                     }
648                 }
649             }
650             Stream(ref p) => return unsafe { (*p.get()).send(t) },
651             Shared(ref p) => return unsafe { (*p.get()).send(t) },
652             Sync(..) => unreachable!(),
653         };
654
655         unsafe {
656             let tmp = Sender::new(Stream(new_inner));
657             mem::swap(self.mut_inner(), tmp.mut_inner());
658         }
659         return ret;
660     }
661 }
662
663 #[unstable]
664 impl<T: Send> Clone for Sender<T> {
665     fn clone(&self) -> Sender<T> {
666         let (packet, sleeper) = match *unsafe { self.inner() } {
667             Oneshot(ref p) => {
668                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
669                 unsafe {
670                     (*a.get()).postinit_lock();
671                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
672                         oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
673                         oneshot::UpWoke(task) => (a, Some(task))
674                     }
675                 }
676             }
677             Stream(ref p) => {
678                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
679                 unsafe {
680                     (*a.get()).postinit_lock();
681                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
682                         stream::UpSuccess | stream::UpDisconnected => (a, None),
683                         stream::UpWoke(task) => (a, Some(task)),
684                     }
685                 }
686             }
687             Shared(ref p) => {
688                 unsafe { (*p.get()).clone_chan(); }
689                 return Sender::new(Shared(p.clone()));
690             }
691             Sync(..) => unreachable!(),
692         };
693
694         unsafe {
695             (*packet.get()).inherit_blocker(sleeper);
696
697             let tmp = Sender::new(Shared(packet.clone()));
698             mem::swap(self.mut_inner(), tmp.mut_inner());
699         }
700         Sender::new(Shared(packet))
701     }
702 }
703
704 #[unsafe_destructor]
705 impl<T: Send> Drop for Sender<T> {
706     fn drop(&mut self) {
707         match *unsafe { self.mut_inner() } {
708             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
709             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
710             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
711             Sync(..) => unreachable!(),
712         }
713     }
714 }
715
716 ////////////////////////////////////////////////////////////////////////////////
717 // SyncSender
718 ////////////////////////////////////////////////////////////////////////////////
719
720 impl<T: Send> SyncSender<T> {
721     fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
722         SyncSender { inner: inner, marker: marker::NoSync }
723     }
724
725     /// Sends a value on this synchronous channel.
726     ///
727     /// This function will *block* until space in the internal buffer becomes
728     /// available or a receiver is available to hand off the message to.
729     ///
730     /// Note that a successful send does *not* guarantee that the receiver will
731     /// ever see the data if there is a buffer on this channel. Messages may be
732     /// enqueued in the internal buffer for the receiver to receive at a later
733     /// time. If the buffer size is 0, however, it can be guaranteed that the
734     /// receiver has indeed received the data if this function returns success.
735     ///
736     /// # Failure
737     ///
738     /// Similarly to `Sender::send`, this function will fail if the
739     /// corresponding `Receiver` for this channel has disconnected. This
740     /// behavior is used to propagate failure among tasks.
741     ///
742     /// If failure is not desired, you can achieve the same semantics with the
743     /// `SyncSender::send_opt` method which will not fail if the receiver
744     /// disconnects.
745     #[experimental = "this function is being considered candidate for removal \
746                       to adhere to the general guidelines of rust"]
747     pub fn send(&self, t: T) {
748         if self.send_opt(t).is_err() {
749             fail!("sending on a closed channel");
750         }
751     }
752
753     /// Send a value on a channel, returning it back if the receiver
754     /// disconnected
755     ///
756     /// This method will *block* to send the value `t` on the channel, but if
757     /// the value could not be sent due to the receiver disconnecting, the value
758     /// is returned back to the callee. This function is similar to `try_send`,
759     /// except that it will block if the channel is currently full.
760     ///
761     /// # Failure
762     ///
763     /// This function cannot fail.
764     #[unstable = "this function may be renamed to send() in the future"]
765     pub fn send_opt(&self, t: T) -> Result<(), T> {
766         unsafe { (*self.inner.get()).send(t) }
767     }
768
769     /// Attempts to send a value on this channel without blocking.
770     ///
771     /// This method differs from `send_opt` by returning immediately if the
772     /// channel's buffer is full or no receiver is waiting to acquire some
773     /// data. Compared with `send_opt`, this function has two failure cases
774     /// instead of one (one for disconnection, one for a full buffer).
775     ///
776     /// See `SyncSender::send` for notes about guarantees of whether the
777     /// receiver has received the data or not if this function is successful.
778     ///
779     /// # Failure
780     ///
781     /// This function cannot fail
782     #[unstable = "the return type of this function is candidate for \
783                   modification"]
784     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
785         unsafe { (*self.inner.get()).try_send(t) }
786     }
787 }
788
789 #[unstable]
790 impl<T: Send> Clone for SyncSender<T> {
791     fn clone(&self) -> SyncSender<T> {
792         unsafe { (*self.inner.get()).clone_chan(); }
793         return SyncSender::new(self.inner.clone());
794     }
795 }
796
797 #[unsafe_destructor]
798 impl<T: Send> Drop for SyncSender<T> {
799     fn drop(&mut self) {
800         unsafe { (*self.inner.get()).drop_chan(); }
801     }
802 }
803
804 ////////////////////////////////////////////////////////////////////////////////
805 // Receiver
806 ////////////////////////////////////////////////////////////////////////////////
807
808 impl<T: Send> Receiver<T> {
809     fn new(inner: Flavor<T>) -> Receiver<T> {
810         Receiver { inner: UnsafeCell::new(inner), receives: Cell::new(0), marker: marker::NoSync }
811     }
812
813     /// Blocks waiting for a value on this receiver
814     ///
815     /// This function will block if necessary to wait for a corresponding send
816     /// on the channel from its paired `Sender` structure. This receiver will
817     /// be woken up when data is ready, and the data will be returned.
818     ///
819     /// # Failure
820     ///
821     /// Similar to channels, this method will trigger a task failure if the
822     /// other end of the channel has hung up (been deallocated). The purpose of
823     /// this is to propagate failure among tasks.
824     ///
825     /// If failure is not desired, then there are two options:
826     ///
827     /// * If blocking is still desired, the `recv_opt` method will return `None`
828     ///   when the other end hangs up
829     ///
830     /// * If blocking is not desired, then the `try_recv` method will attempt to
831     ///   peek at a value on this receiver.
832     #[experimental = "this function is being considered candidate for removal \
833                       to adhere to the general guidelines of rust"]
834     pub fn recv(&self) -> T {
835         match self.recv_opt() {
836             Ok(t) => t,
837             Err(()) => fail!("receiving on a closed channel"),
838         }
839     }
840
841     /// Attempts to return a pending value on this receiver without blocking
842     ///
843     /// This method will never block the caller in order to wait for data to
844     /// become available. Instead, this will always return immediately with a
845     /// possible option of pending data on the channel.
846     ///
847     /// This is useful for a flavor of "optimistic check" before deciding to
848     /// block on a receiver.
849     ///
850     /// This function cannot fail.
851     #[unstable = "the return type of this function may be altered"]
852     pub fn try_recv(&self) -> Result<T, TryRecvError> {
853         // If a thread is spinning in try_recv, we should take the opportunity
854         // to reschedule things occasionally. See notes above in scheduling on
855         // sends for why this doesn't always hit TLS, and also for why this uses
856         // `try_take` instead of `take`.
857         let cnt = self.receives.get() + 1;
858         self.receives.set(cnt);
859         if cnt % (RESCHED_FREQ as uint) == 0 {
860             let task: Option<Box<Task>> = Local::try_take();
861             task.map(|t| t.maybe_yield());
862         }
863
864         loop {
865             let new_port = match *unsafe { self.inner() } {
866                 Oneshot(ref p) => {
867                     match unsafe { (*p.get()).try_recv() } {
868                         Ok(t) => return Ok(t),
869                         Err(oneshot::Empty) => return Err(Empty),
870                         Err(oneshot::Disconnected) => return Err(Disconnected),
871                         Err(oneshot::Upgraded(rx)) => rx,
872                     }
873                 }
874                 Stream(ref p) => {
875                     match unsafe { (*p.get()).try_recv() } {
876                         Ok(t) => return Ok(t),
877                         Err(stream::Empty) => return Err(Empty),
878                         Err(stream::Disconnected) => return Err(Disconnected),
879                         Err(stream::Upgraded(rx)) => rx,
880                     }
881                 }
882                 Shared(ref p) => {
883                     match unsafe { (*p.get()).try_recv() } {
884                         Ok(t) => return Ok(t),
885                         Err(shared::Empty) => return Err(Empty),
886                         Err(shared::Disconnected) => return Err(Disconnected),
887                     }
888                 }
889                 Sync(ref p) => {
890                     match unsafe { (*p.get()).try_recv() } {
891                         Ok(t) => return Ok(t),
892                         Err(sync::Empty) => return Err(Empty),
893                         Err(sync::Disconnected) => return Err(Disconnected),
894                     }
895                 }
896             };
897             unsafe {
898                 mem::swap(self.mut_inner(),
899                           new_port.mut_inner());
900             }
901         }
902     }
903
904     /// Attempt to wait for a value on this receiver, but does not fail if the
905     /// corresponding channel has hung up.
906     ///
907     /// This implementation of iterators for ports will always block if there is
908     /// not data available on the receiver, but it will not fail in the case
909     /// that the channel has been deallocated.
910     ///
911     /// In other words, this function has the same semantics as the `recv`
912     /// method except for the failure aspect.
913     ///
914     /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
915     /// the value found on the receiver is returned.
916     #[unstable = "this function may be renamed to recv()"]
917     pub fn recv_opt(&self) -> Result<T, ()> {
918         loop {
919             let new_port = match *unsafe { self.inner() } {
920                 Oneshot(ref p) => {
921                     match unsafe { (*p.get()).recv() } {
922                         Ok(t) => return Ok(t),
923                         Err(oneshot::Empty) => return unreachable!(),
924                         Err(oneshot::Disconnected) => return Err(()),
925                         Err(oneshot::Upgraded(rx)) => rx,
926                     }
927                 }
928                 Stream(ref p) => {
929                     match unsafe { (*p.get()).recv() } {
930                         Ok(t) => return Ok(t),
931                         Err(stream::Empty) => return unreachable!(),
932                         Err(stream::Disconnected) => return Err(()),
933                         Err(stream::Upgraded(rx)) => rx,
934                     }
935                 }
936                 Shared(ref p) => {
937                     match unsafe { (*p.get()).recv() } {
938                         Ok(t) => return Ok(t),
939                         Err(shared::Empty) => return unreachable!(),
940                         Err(shared::Disconnected) => return Err(()),
941                     }
942                 }
943                 Sync(ref p) => return unsafe { (*p.get()).recv() }
944             };
945             unsafe {
946                 mem::swap(self.mut_inner(), new_port.mut_inner());
947             }
948         }
949     }
950
951     /// Returns an iterator which will block waiting for messages, but never
952     /// `fail!`. It will return `None` when the channel has hung up.
953     #[unstable]
954     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
955         Messages { rx: self }
956     }
957 }
958
959 impl<T: Send> select::Packet for Receiver<T> {
960     fn can_recv(&self) -> bool {
961         loop {
962             let new_port = match *unsafe { self.inner() } {
963                 Oneshot(ref p) => {
964                     match unsafe { (*p.get()).can_recv() } {
965                         Ok(ret) => return ret,
966                         Err(upgrade) => upgrade,
967                     }
968                 }
969                 Stream(ref p) => {
970                     match unsafe { (*p.get()).can_recv() } {
971                         Ok(ret) => return ret,
972                         Err(upgrade) => upgrade,
973                     }
974                 }
975                 Shared(ref p) => {
976                     return unsafe { (*p.get()).can_recv() };
977                 }
978                 Sync(ref p) => {
979                     return unsafe { (*p.get()).can_recv() };
980                 }
981             };
982             unsafe {
983                 mem::swap(self.mut_inner(),
984                           new_port.mut_inner());
985             }
986         }
987     }
988
989     fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
990         loop {
991             let (t, new_port) = match *unsafe { self.inner() } {
992                 Oneshot(ref p) => {
993                     match unsafe { (*p.get()).start_selection(task) } {
994                         oneshot::SelSuccess => return Ok(()),
995                         oneshot::SelCanceled(task) => return Err(task),
996                         oneshot::SelUpgraded(t, rx) => (t, rx),
997                     }
998                 }
999                 Stream(ref p) => {
1000                     match unsafe { (*p.get()).start_selection(task) } {
1001                         stream::SelSuccess => return Ok(()),
1002                         stream::SelCanceled(task) => return Err(task),
1003                         stream::SelUpgraded(t, rx) => (t, rx),
1004                     }
1005                 }
1006                 Shared(ref p) => {
1007                     return unsafe { (*p.get()).start_selection(task) };
1008                 }
1009                 Sync(ref p) => {
1010                     return unsafe { (*p.get()).start_selection(task) };
1011                 }
1012             };
1013             task = t;
1014             unsafe {
1015                 mem::swap(self.mut_inner(),
1016                           new_port.mut_inner());
1017             }
1018         }
1019     }
1020
1021     fn abort_selection(&self) -> bool {
1022         let mut was_upgrade = false;
1023         loop {
1024             let result = match *unsafe { self.inner() } {
1025                 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
1026                 Stream(ref p) => unsafe {
1027                     (*p.get()).abort_selection(was_upgrade)
1028                 },
1029                 Shared(ref p) => return unsafe {
1030                     (*p.get()).abort_selection(was_upgrade)
1031                 },
1032                 Sync(ref p) => return unsafe {
1033                     (*p.get()).abort_selection()
1034                 },
1035             };
1036             let new_port = match result { Ok(b) => return b, Err(p) => p };
1037             was_upgrade = true;
1038             unsafe {
1039                 mem::swap(self.mut_inner(),
1040                           new_port.mut_inner());
1041             }
1042         }
1043     }
1044 }
1045
1046 #[unstable]
1047 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
1048     fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
1049 }
1050
1051 #[unsafe_destructor]
1052 impl<T: Send> Drop for Receiver<T> {
1053     fn drop(&mut self) {
1054         match *unsafe { self.mut_inner() } {
1055             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
1056             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
1057             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
1058             Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1059         }
1060     }
1061 }
1062
1063 #[cfg(test)]
1064 mod test {
1065     use std::prelude::*;
1066
1067     use native;
1068     use std::os;
1069     use super::*;
1070
1071     pub fn stress_factor() -> uint {
1072         match os::getenv("RUST_TEST_STRESS") {
1073             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1074             None => 1,
1075         }
1076     }
1077
1078     test!(fn smoke() {
1079         let (tx, rx) = channel::<int>();
1080         tx.send(1);
1081         assert_eq!(rx.recv(), 1);
1082     })
1083
1084     test!(fn drop_full() {
1085         let (tx, _rx) = channel();
1086         tx.send(box 1i);
1087     })
1088
1089     test!(fn drop_full_shared() {
1090         let (tx, _rx) = channel();
1091         drop(tx.clone());
1092         drop(tx.clone());
1093         tx.send(box 1i);
1094     })
1095
1096     test!(fn smoke_shared() {
1097         let (tx, rx) = channel::<int>();
1098         tx.send(1);
1099         assert_eq!(rx.recv(), 1);
1100         let tx = tx.clone();
1101         tx.send(1);
1102         assert_eq!(rx.recv(), 1);
1103     })
1104
1105     test!(fn smoke_threads() {
1106         let (tx, rx) = channel::<int>();
1107         spawn(proc() {
1108             tx.send(1);
1109         });
1110         assert_eq!(rx.recv(), 1);
1111     })
1112
1113     test!(fn smoke_port_gone() {
1114         let (tx, rx) = channel::<int>();
1115         drop(rx);
1116         tx.send(1);
1117     } #[should_fail])
1118
1119     test!(fn smoke_shared_port_gone() {
1120         let (tx, rx) = channel::<int>();
1121         drop(rx);
1122         tx.send(1);
1123     } #[should_fail])
1124
1125     test!(fn smoke_shared_port_gone2() {
1126         let (tx, rx) = channel::<int>();
1127         drop(rx);
1128         let tx2 = tx.clone();
1129         drop(tx);
1130         tx2.send(1);
1131     } #[should_fail])
1132
1133     test!(fn port_gone_concurrent() {
1134         let (tx, rx) = channel::<int>();
1135         spawn(proc() {
1136             rx.recv();
1137         });
1138         loop { tx.send(1) }
1139     } #[should_fail])
1140
1141     test!(fn port_gone_concurrent_shared() {
1142         let (tx, rx) = channel::<int>();
1143         let tx2 = tx.clone();
1144         spawn(proc() {
1145             rx.recv();
1146         });
1147         loop {
1148             tx.send(1);
1149             tx2.send(1);
1150         }
1151     } #[should_fail])
1152
1153     test!(fn smoke_chan_gone() {
1154         let (tx, rx) = channel::<int>();
1155         drop(tx);
1156         rx.recv();
1157     } #[should_fail])
1158
1159     test!(fn smoke_chan_gone_shared() {
1160         let (tx, rx) = channel::<()>();
1161         let tx2 = tx.clone();
1162         drop(tx);
1163         drop(tx2);
1164         rx.recv();
1165     } #[should_fail])
1166
1167     test!(fn chan_gone_concurrent() {
1168         let (tx, rx) = channel::<int>();
1169         spawn(proc() {
1170             tx.send(1);
1171             tx.send(1);
1172         });
1173         loop { rx.recv(); }
1174     } #[should_fail])
1175
1176     test!(fn stress() {
1177         let (tx, rx) = channel::<int>();
1178         spawn(proc() {
1179             for _ in range(0u, 10000) { tx.send(1i); }
1180         });
1181         for _ in range(0u, 10000) {
1182             assert_eq!(rx.recv(), 1);
1183         }
1184     })
1185
1186     test!(fn stress_shared() {
1187         static AMT: uint = 10000;
1188         static NTHREADS: uint = 8;
1189         let (tx, rx) = channel::<int>();
1190         let (dtx, drx) = channel::<()>();
1191
1192         spawn(proc() {
1193             for _ in range(0, AMT * NTHREADS) {
1194                 assert_eq!(rx.recv(), 1);
1195             }
1196             match rx.try_recv() {
1197                 Ok(..) => fail!(),
1198                 _ => {}
1199             }
1200             dtx.send(());
1201         });
1202
1203         for _ in range(0, NTHREADS) {
1204             let tx = tx.clone();
1205             spawn(proc() {
1206                 for _ in range(0, AMT) { tx.send(1); }
1207             });
1208         }
1209         drop(tx);
1210         drx.recv();
1211     })
1212
1213     #[test]
1214     fn send_from_outside_runtime() {
1215         let (tx1, rx1) = channel::<()>();
1216         let (tx2, rx2) = channel::<int>();
1217         let (tx3, rx3) = channel::<()>();
1218         let tx4 = tx3.clone();
1219         spawn(proc() {
1220             tx1.send(());
1221             for _ in range(0i, 40) {
1222                 assert_eq!(rx2.recv(), 1);
1223             }
1224             tx3.send(());
1225         });
1226         rx1.recv();
1227         native::task::spawn(proc() {
1228             for _ in range(0i, 40) {
1229                 tx2.send(1);
1230             }
1231             tx4.send(());
1232         });
1233         rx3.recv();
1234         rx3.recv();
1235     }
1236
1237     #[test]
1238     fn recv_from_outside_runtime() {
1239         let (tx, rx) = channel::<int>();
1240         let (dtx, drx) = channel();
1241         native::task::spawn(proc() {
1242             for _ in range(0i, 40) {
1243                 assert_eq!(rx.recv(), 1);
1244             }
1245             dtx.send(());
1246         });
1247         for _ in range(0u, 40) {
1248             tx.send(1);
1249         }
1250         drx.recv();
1251     }
1252
1253     #[test]
1254     fn no_runtime() {
1255         let (tx1, rx1) = channel::<int>();
1256         let (tx2, rx2) = channel::<int>();
1257         let (tx3, rx3) = channel::<()>();
1258         let tx4 = tx3.clone();
1259         native::task::spawn(proc() {
1260             assert_eq!(rx1.recv(), 1);
1261             tx2.send(2);
1262             tx4.send(());
1263         });
1264         native::task::spawn(proc() {
1265             tx1.send(1);
1266             assert_eq!(rx2.recv(), 2);
1267             tx3.send(());
1268         });
1269         rx3.recv();
1270         rx3.recv();
1271     }
1272
1273     test!(fn oneshot_single_thread_close_port_first() {
1274         // Simple test of closing without sending
1275         let (_tx, rx) = channel::<int>();
1276         drop(rx);
1277     })
1278
1279     test!(fn oneshot_single_thread_close_chan_first() {
1280         // Simple test of closing without sending
1281         let (tx, _rx) = channel::<int>();
1282         drop(tx);
1283     })
1284
1285     test!(fn oneshot_single_thread_send_port_close() {
1286         // Testing that the sender cleans up the payload if receiver is closed
1287         let (tx, rx) = channel::<Box<int>>();
1288         drop(rx);
1289         tx.send(box 0);
1290     } #[should_fail])
1291
1292     test!(fn oneshot_single_thread_recv_chan_close() {
1293         // Receiving on a closed chan will fail
1294         let res = task::try(proc() {
1295             let (tx, rx) = channel::<int>();
1296             drop(tx);
1297             rx.recv();
1298         });
1299         // What is our res?
1300         assert!(res.is_err());
1301     })
1302
1303     test!(fn oneshot_single_thread_send_then_recv() {
1304         let (tx, rx) = channel::<Box<int>>();
1305         tx.send(box 10);
1306         assert!(rx.recv() == box 10);
1307     })
1308
1309     test!(fn oneshot_single_thread_try_send_open() {
1310         let (tx, rx) = channel::<int>();
1311         assert!(tx.send_opt(10).is_ok());
1312         assert!(rx.recv() == 10);
1313     })
1314
1315     test!(fn oneshot_single_thread_try_send_closed() {
1316         let (tx, rx) = channel::<int>();
1317         drop(rx);
1318         assert!(tx.send_opt(10).is_err());
1319     })
1320
1321     test!(fn oneshot_single_thread_try_recv_open() {
1322         let (tx, rx) = channel::<int>();
1323         tx.send(10);
1324         assert!(rx.recv_opt() == Ok(10));
1325     })
1326
1327     test!(fn oneshot_single_thread_try_recv_closed() {
1328         let (tx, rx) = channel::<int>();
1329         drop(tx);
1330         assert!(rx.recv_opt() == Err(()));
1331     })
1332
1333     test!(fn oneshot_single_thread_peek_data() {
1334         let (tx, rx) = channel::<int>();
1335         assert_eq!(rx.try_recv(), Err(Empty))
1336         tx.send(10);
1337         assert_eq!(rx.try_recv(), Ok(10));
1338     })
1339
1340     test!(fn oneshot_single_thread_peek_close() {
1341         let (tx, rx) = channel::<int>();
1342         drop(tx);
1343         assert_eq!(rx.try_recv(), Err(Disconnected));
1344         assert_eq!(rx.try_recv(), Err(Disconnected));
1345     })
1346
1347     test!(fn oneshot_single_thread_peek_open() {
1348         let (_tx, rx) = channel::<int>();
1349         assert_eq!(rx.try_recv(), Err(Empty));
1350     })
1351
1352     test!(fn oneshot_multi_task_recv_then_send() {
1353         let (tx, rx) = channel::<Box<int>>();
1354         spawn(proc() {
1355             assert!(rx.recv() == box 10);
1356         });
1357
1358         tx.send(box 10);
1359     })
1360
1361     test!(fn oneshot_multi_task_recv_then_close() {
1362         let (tx, rx) = channel::<Box<int>>();
1363         spawn(proc() {
1364             drop(tx);
1365         });
1366         let res = task::try(proc() {
1367             assert!(rx.recv() == box 10);
1368         });
1369         assert!(res.is_err());
1370     })
1371
1372     test!(fn oneshot_multi_thread_close_stress() {
1373         for _ in range(0, stress_factor()) {
1374             let (tx, rx) = channel::<int>();
1375             spawn(proc() {
1376                 drop(rx);
1377             });
1378             drop(tx);
1379         }
1380     })
1381
1382     test!(fn oneshot_multi_thread_send_close_stress() {
1383         for _ in range(0, stress_factor()) {
1384             let (tx, rx) = channel::<int>();
1385             spawn(proc() {
1386                 drop(rx);
1387             });
1388             let _ = task::try(proc() {
1389                 tx.send(1);
1390             });
1391         }
1392     })
1393
1394     test!(fn oneshot_multi_thread_recv_close_stress() {
1395         for _ in range(0, stress_factor()) {
1396             let (tx, rx) = channel::<int>();
1397             spawn(proc() {
1398                 let res = task::try(proc() {
1399                     rx.recv();
1400                 });
1401                 assert!(res.is_err());
1402             });
1403             spawn(proc() {
1404                 spawn(proc() {
1405                     drop(tx);
1406                 });
1407             });
1408         }
1409     })
1410
1411     test!(fn oneshot_multi_thread_send_recv_stress() {
1412         for _ in range(0, stress_factor()) {
1413             let (tx, rx) = channel();
1414             spawn(proc() {
1415                 tx.send(box 10i);
1416             });
1417             spawn(proc() {
1418                 assert!(rx.recv() == box 10i);
1419             });
1420         }
1421     })
1422
1423     test!(fn stream_send_recv_stress() {
1424         for _ in range(0, stress_factor()) {
1425             let (tx, rx) = channel();
1426
1427             send(tx, 0);
1428             recv(rx, 0);
1429
1430             fn send(tx: Sender<Box<int>>, i: int) {
1431                 if i == 10 { return }
1432
1433                 spawn(proc() {
1434                     tx.send(box i);
1435                     send(tx, i + 1);
1436                 });
1437             }
1438
1439             fn recv(rx: Receiver<Box<int>>, i: int) {
1440                 if i == 10 { return }
1441
1442                 spawn(proc() {
1443                     assert!(rx.recv() == box i);
1444                     recv(rx, i + 1);
1445                 });
1446             }
1447         }
1448     })
1449
1450     test!(fn recv_a_lot() {
1451         // Regression test that we don't run out of stack in scheduler context
1452         let (tx, rx) = channel();
1453         for _ in range(0i, 10000) { tx.send(()); }
1454         for _ in range(0i, 10000) { rx.recv(); }
1455     })
1456
1457     test!(fn shared_chan_stress() {
1458         let (tx, rx) = channel();
1459         let total = stress_factor() + 100;
1460         for _ in range(0, total) {
1461             let tx = tx.clone();
1462             spawn(proc() {
1463                 tx.send(());
1464             });
1465         }
1466
1467         for _ in range(0, total) {
1468             rx.recv();
1469         }
1470     })
1471
1472     test!(fn test_nested_recv_iter() {
1473         let (tx, rx) = channel::<int>();
1474         let (total_tx, total_rx) = channel::<int>();
1475
1476         spawn(proc() {
1477             let mut acc = 0;
1478             for x in rx.iter() {
1479                 acc += x;
1480             }
1481             total_tx.send(acc);
1482         });
1483
1484         tx.send(3);
1485         tx.send(1);
1486         tx.send(2);
1487         drop(tx);
1488         assert_eq!(total_rx.recv(), 6);
1489     })
1490
1491     test!(fn test_recv_iter_break() {
1492         let (tx, rx) = channel::<int>();
1493         let (count_tx, count_rx) = channel();
1494
1495         spawn(proc() {
1496             let mut count = 0;
1497             for x in rx.iter() {
1498                 if count >= 3 {
1499                     break;
1500                 } else {
1501                     count += x;
1502                 }
1503             }
1504             count_tx.send(count);
1505         });
1506
1507         tx.send(2);
1508         tx.send(2);
1509         tx.send(2);
1510         let _ = tx.send_opt(2);
1511         drop(tx);
1512         assert_eq!(count_rx.recv(), 4);
1513     })
1514
1515     test!(fn try_recv_states() {
1516         let (tx1, rx1) = channel::<int>();
1517         let (tx2, rx2) = channel::<()>();
1518         let (tx3, rx3) = channel::<()>();
1519         spawn(proc() {
1520             rx2.recv();
1521             tx1.send(1);
1522             tx3.send(());
1523             rx2.recv();
1524             drop(tx1);
1525             tx3.send(());
1526         });
1527
1528         assert_eq!(rx1.try_recv(), Err(Empty));
1529         tx2.send(());
1530         rx3.recv();
1531         assert_eq!(rx1.try_recv(), Ok(1));
1532         assert_eq!(rx1.try_recv(), Err(Empty));
1533         tx2.send(());
1534         rx3.recv();
1535         assert_eq!(rx1.try_recv(), Err(Disconnected));
1536     })
1537
1538     // This bug used to end up in a livelock inside of the Receiver destructor
1539     // because the internal state of the Shared packet was corrupted
1540     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1541         let (tx, rx) = channel();
1542         let (tx2, rx2) = channel();
1543         spawn(proc() {
1544             rx.recv(); // wait on a oneshot
1545             drop(rx);  // destroy a shared
1546             tx2.send(());
1547         });
1548         // make sure the other task has gone to sleep
1549         for _ in range(0u, 5000) { task::deschedule(); }
1550
1551         // upgrade to a shared chan and send a message
1552         let t = tx.clone();
1553         drop(tx);
1554         t.send(());
1555
1556         // wait for the child task to exit before we exit
1557         rx2.recv();
1558     })
1559
1560     test!(fn sends_off_the_runtime() {
1561         use std::rt::thread::Thread;
1562
1563         let (tx, rx) = channel();
1564         let t = Thread::start(proc() {
1565             for _ in range(0u, 1000) {
1566                 tx.send(());
1567             }
1568         });
1569         for _ in range(0u, 1000) {
1570             rx.recv();
1571         }
1572         t.join();
1573     })
1574
1575     test!(fn try_recvs_off_the_runtime() {
1576         use std::rt::thread::Thread;
1577
1578         let (tx, rx) = channel();
1579         let (cdone, pdone) = channel();
1580         let t = Thread::start(proc() {
1581             let mut hits = 0u;
1582             while hits < 10 {
1583                 match rx.try_recv() {
1584                     Ok(()) => { hits += 1; }
1585                     Err(Empty) => { Thread::yield_now(); }
1586                     Err(Disconnected) => return,
1587                 }
1588             }
1589             cdone.send(());
1590         });
1591         for _ in range(0u, 10) {
1592             tx.send(());
1593         }
1594         t.join();
1595         pdone.recv();
1596     })
1597 }
1598
1599 #[cfg(test)]
1600 mod sync_tests {
1601     use std::prelude::*;
1602     use std::os;
1603
1604     pub fn stress_factor() -> uint {
1605         match os::getenv("RUST_TEST_STRESS") {
1606             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1607             None => 1,
1608         }
1609     }
1610
1611     test!(fn smoke() {
1612         let (tx, rx) = sync_channel::<int>(1);
1613         tx.send(1);
1614         assert_eq!(rx.recv(), 1);
1615     })
1616
1617     test!(fn drop_full() {
1618         let (tx, _rx) = sync_channel(1);
1619         tx.send(box 1i);
1620     })
1621
1622     test!(fn smoke_shared() {
1623         let (tx, rx) = sync_channel::<int>(1);
1624         tx.send(1);
1625         assert_eq!(rx.recv(), 1);
1626         let tx = tx.clone();
1627         tx.send(1);
1628         assert_eq!(rx.recv(), 1);
1629     })
1630
1631     test!(fn smoke_threads() {
1632         let (tx, rx) = sync_channel::<int>(0);
1633         spawn(proc() {
1634             tx.send(1);
1635         });
1636         assert_eq!(rx.recv(), 1);
1637     })
1638
1639     test!(fn smoke_port_gone() {
1640         let (tx, rx) = sync_channel::<int>(0);
1641         drop(rx);
1642         tx.send(1);
1643     } #[should_fail])
1644
1645     test!(fn smoke_shared_port_gone2() {
1646         let (tx, rx) = sync_channel::<int>(0);
1647         drop(rx);
1648         let tx2 = tx.clone();
1649         drop(tx);
1650         tx2.send(1);
1651     } #[should_fail])
1652
1653     test!(fn port_gone_concurrent() {
1654         let (tx, rx) = sync_channel::<int>(0);
1655         spawn(proc() {
1656             rx.recv();
1657         });
1658         loop { tx.send(1) }
1659     } #[should_fail])
1660
1661     test!(fn port_gone_concurrent_shared() {
1662         let (tx, rx) = sync_channel::<int>(0);
1663         let tx2 = tx.clone();
1664         spawn(proc() {
1665             rx.recv();
1666         });
1667         loop {
1668             tx.send(1);
1669             tx2.send(1);
1670         }
1671     } #[should_fail])
1672
1673     test!(fn smoke_chan_gone() {
1674         let (tx, rx) = sync_channel::<int>(0);
1675         drop(tx);
1676         rx.recv();
1677     } #[should_fail])
1678
1679     test!(fn smoke_chan_gone_shared() {
1680         let (tx, rx) = sync_channel::<()>(0);
1681         let tx2 = tx.clone();
1682         drop(tx);
1683         drop(tx2);
1684         rx.recv();
1685     } #[should_fail])
1686
1687     test!(fn chan_gone_concurrent() {
1688         let (tx, rx) = sync_channel::<int>(0);
1689         spawn(proc() {
1690             tx.send(1);
1691             tx.send(1);
1692         });
1693         loop { rx.recv(); }
1694     } #[should_fail])
1695
1696     test!(fn stress() {
1697         let (tx, rx) = sync_channel::<int>(0);
1698         spawn(proc() {
1699             for _ in range(0u, 10000) { tx.send(1); }
1700         });
1701         for _ in range(0u, 10000) {
1702             assert_eq!(rx.recv(), 1);
1703         }
1704     })
1705
1706     test!(fn stress_shared() {
1707         static AMT: uint = 1000;
1708         static NTHREADS: uint = 8;
1709         let (tx, rx) = sync_channel::<int>(0);
1710         let (dtx, drx) = sync_channel::<()>(0);
1711
1712         spawn(proc() {
1713             for _ in range(0, AMT * NTHREADS) {
1714                 assert_eq!(rx.recv(), 1);
1715             }
1716             match rx.try_recv() {
1717                 Ok(..) => fail!(),
1718                 _ => {}
1719             }
1720             dtx.send(());
1721         });
1722
1723         for _ in range(0, NTHREADS) {
1724             let tx = tx.clone();
1725             spawn(proc() {
1726                 for _ in range(0, AMT) { tx.send(1); }
1727             });
1728         }
1729         drop(tx);
1730         drx.recv();
1731     })
1732
1733     test!(fn oneshot_single_thread_close_port_first() {
1734         // Simple test of closing without sending
1735         let (_tx, rx) = sync_channel::<int>(0);
1736         drop(rx);
1737     })
1738
1739     test!(fn oneshot_single_thread_close_chan_first() {
1740         // Simple test of closing without sending
1741         let (tx, _rx) = sync_channel::<int>(0);
1742         drop(tx);
1743     })
1744
1745     test!(fn oneshot_single_thread_send_port_close() {
1746         // Testing that the sender cleans up the payload if receiver is closed
1747         let (tx, rx) = sync_channel::<Box<int>>(0);
1748         drop(rx);
1749         tx.send(box 0);
1750     } #[should_fail])
1751
1752     test!(fn oneshot_single_thread_recv_chan_close() {
1753         // Receiving on a closed chan will fail
1754         let res = task::try(proc() {
1755             let (tx, rx) = sync_channel::<int>(0);
1756             drop(tx);
1757             rx.recv();
1758         });
1759         // What is our res?
1760         assert!(res.is_err());
1761     })
1762
1763     test!(fn oneshot_single_thread_send_then_recv() {
1764         let (tx, rx) = sync_channel::<Box<int>>(1);
1765         tx.send(box 10);
1766         assert!(rx.recv() == box 10);
1767     })
1768
1769     test!(fn oneshot_single_thread_try_send_open() {
1770         let (tx, rx) = sync_channel::<int>(1);
1771         assert_eq!(tx.try_send(10), Ok(()));
1772         assert!(rx.recv() == 10);
1773     })
1774
1775     test!(fn oneshot_single_thread_try_send_closed() {
1776         let (tx, rx) = sync_channel::<int>(0);
1777         drop(rx);
1778         assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
1779     })
1780
1781     test!(fn oneshot_single_thread_try_send_closed2() {
1782         let (tx, _rx) = sync_channel::<int>(0);
1783         assert_eq!(tx.try_send(10), Err(Full(10)));
1784     })
1785
1786     test!(fn oneshot_single_thread_try_recv_open() {
1787         let (tx, rx) = sync_channel::<int>(1);
1788         tx.send(10);
1789         assert!(rx.recv_opt() == Ok(10));
1790     })
1791
1792     test!(fn oneshot_single_thread_try_recv_closed() {
1793         let (tx, rx) = sync_channel::<int>(0);
1794         drop(tx);
1795         assert!(rx.recv_opt() == Err(()));
1796     })
1797
1798     test!(fn oneshot_single_thread_peek_data() {
1799         let (tx, rx) = sync_channel::<int>(1);
1800         assert_eq!(rx.try_recv(), Err(Empty))
1801         tx.send(10);
1802         assert_eq!(rx.try_recv(), Ok(10));
1803     })
1804
1805     test!(fn oneshot_single_thread_peek_close() {
1806         let (tx, rx) = sync_channel::<int>(0);
1807         drop(tx);
1808         assert_eq!(rx.try_recv(), Err(Disconnected));
1809         assert_eq!(rx.try_recv(), Err(Disconnected));
1810     })
1811
1812     test!(fn oneshot_single_thread_peek_open() {
1813         let (_tx, rx) = sync_channel::<int>(0);
1814         assert_eq!(rx.try_recv(), Err(Empty));
1815     })
1816
1817     test!(fn oneshot_multi_task_recv_then_send() {
1818         let (tx, rx) = sync_channel::<Box<int>>(0);
1819         spawn(proc() {
1820             assert!(rx.recv() == box 10);
1821         });
1822
1823         tx.send(box 10);
1824     })
1825
1826     test!(fn oneshot_multi_task_recv_then_close() {
1827         let (tx, rx) = sync_channel::<Box<int>>(0);
1828         spawn(proc() {
1829             drop(tx);
1830         });
1831         let res = task::try(proc() {
1832             assert!(rx.recv() == box 10);
1833         });
1834         assert!(res.is_err());
1835     })
1836
1837     test!(fn oneshot_multi_thread_close_stress() {
1838         for _ in range(0, stress_factor()) {
1839             let (tx, rx) = sync_channel::<int>(0);
1840             spawn(proc() {
1841                 drop(rx);
1842             });
1843             drop(tx);
1844         }
1845     })
1846
1847     test!(fn oneshot_multi_thread_send_close_stress() {
1848         for _ in range(0, stress_factor()) {
1849             let (tx, rx) = sync_channel::<int>(0);
1850             spawn(proc() {
1851                 drop(rx);
1852             });
1853             let _ = task::try(proc() {
1854                 tx.send(1);
1855             });
1856         }
1857     })
1858
1859     test!(fn oneshot_multi_thread_recv_close_stress() {
1860         for _ in range(0, stress_factor()) {
1861             let (tx, rx) = sync_channel::<int>(0);
1862             spawn(proc() {
1863                 let res = task::try(proc() {
1864                     rx.recv();
1865                 });
1866                 assert!(res.is_err());
1867             });
1868             spawn(proc() {
1869                 spawn(proc() {
1870                     drop(tx);
1871                 });
1872             });
1873         }
1874     })
1875
1876     test!(fn oneshot_multi_thread_send_recv_stress() {
1877         for _ in range(0, stress_factor()) {
1878             let (tx, rx) = sync_channel::<Box<int>>(0);
1879             spawn(proc() {
1880                 tx.send(box 10i);
1881             });
1882             spawn(proc() {
1883                 assert!(rx.recv() == box 10i);
1884             });
1885         }
1886     })
1887
1888     test!(fn stream_send_recv_stress() {
1889         for _ in range(0, stress_factor()) {
1890             let (tx, rx) = sync_channel::<Box<int>>(0);
1891
1892             send(tx, 0);
1893             recv(rx, 0);
1894
1895             fn send(tx: SyncSender<Box<int>>, i: int) {
1896                 if i == 10 { return }
1897
1898                 spawn(proc() {
1899                     tx.send(box i);
1900                     send(tx, i + 1);
1901                 });
1902             }
1903
1904             fn recv(rx: Receiver<Box<int>>, i: int) {
1905                 if i == 10 { return }
1906
1907                 spawn(proc() {
1908                     assert!(rx.recv() == box i);
1909                     recv(rx, i + 1);
1910                 });
1911             }
1912         }
1913     })
1914
1915     test!(fn recv_a_lot() {
1916         // Regression test that we don't run out of stack in scheduler context
1917         let (tx, rx) = sync_channel(10000);
1918         for _ in range(0u, 10000) { tx.send(()); }
1919         for _ in range(0u, 10000) { rx.recv(); }
1920     })
1921
1922     test!(fn shared_chan_stress() {
1923         let (tx, rx) = sync_channel(0);
1924         let total = stress_factor() + 100;
1925         for _ in range(0, total) {
1926             let tx = tx.clone();
1927             spawn(proc() {
1928                 tx.send(());
1929             });
1930         }
1931
1932         for _ in range(0, total) {
1933             rx.recv();
1934         }
1935     })
1936
1937     test!(fn test_nested_recv_iter() {
1938         let (tx, rx) = sync_channel::<int>(0);
1939         let (total_tx, total_rx) = sync_channel::<int>(0);
1940
1941         spawn(proc() {
1942             let mut acc = 0;
1943             for x in rx.iter() {
1944                 acc += x;
1945             }
1946             total_tx.send(acc);
1947         });
1948
1949         tx.send(3);
1950         tx.send(1);
1951         tx.send(2);
1952         drop(tx);
1953         assert_eq!(total_rx.recv(), 6);
1954     })
1955
1956     test!(fn test_recv_iter_break() {
1957         let (tx, rx) = sync_channel::<int>(0);
1958         let (count_tx, count_rx) = sync_channel(0);
1959
1960         spawn(proc() {
1961             let mut count = 0;
1962             for x in rx.iter() {
1963                 if count >= 3 {
1964                     break;
1965                 } else {
1966                     count += x;
1967                 }
1968             }
1969             count_tx.send(count);
1970         });
1971
1972         tx.send(2);
1973         tx.send(2);
1974         tx.send(2);
1975         let _ = tx.try_send(2);
1976         drop(tx);
1977         assert_eq!(count_rx.recv(), 4);
1978     })
1979
1980     test!(fn try_recv_states() {
1981         let (tx1, rx1) = sync_channel::<int>(1);
1982         let (tx2, rx2) = sync_channel::<()>(1);
1983         let (tx3, rx3) = sync_channel::<()>(1);
1984         spawn(proc() {
1985             rx2.recv();
1986             tx1.send(1);
1987             tx3.send(());
1988             rx2.recv();
1989             drop(tx1);
1990             tx3.send(());
1991         });
1992
1993         assert_eq!(rx1.try_recv(), Err(Empty));
1994         tx2.send(());
1995         rx3.recv();
1996         assert_eq!(rx1.try_recv(), Ok(1));
1997         assert_eq!(rx1.try_recv(), Err(Empty));
1998         tx2.send(());
1999         rx3.recv();
2000         assert_eq!(rx1.try_recv(), Err(Disconnected));
2001     })
2002
2003     // This bug used to end up in a livelock inside of the Receiver destructor
2004     // because the internal state of the Shared packet was corrupted
2005     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
2006         let (tx, rx) = sync_channel::<()>(0);
2007         let (tx2, rx2) = sync_channel::<()>(0);
2008         spawn(proc() {
2009             rx.recv(); // wait on a oneshot
2010             drop(rx);  // destroy a shared
2011             tx2.send(());
2012         });
2013         // make sure the other task has gone to sleep
2014         for _ in range(0u, 5000) { task::deschedule(); }
2015
2016         // upgrade to a shared chan and send a message
2017         let t = tx.clone();
2018         drop(tx);
2019         t.send(());
2020
2021         // wait for the child task to exit before we exit
2022         rx2.recv();
2023     })
2024
2025     test!(fn try_recvs_off_the_runtime() {
2026         use std::rt::thread::Thread;
2027
2028         let (tx, rx) = sync_channel::<()>(0);
2029         let (cdone, pdone) = channel();
2030         let t = Thread::start(proc() {
2031             let mut hits = 0u;
2032             while hits < 10 {
2033                 match rx.try_recv() {
2034                     Ok(()) => { hits += 1; }
2035                     Err(Empty) => { Thread::yield_now(); }
2036                     Err(Disconnected) => return,
2037                 }
2038             }
2039             cdone.send(());
2040         });
2041         for _ in range(0u, 10) {
2042             tx.send(());
2043         }
2044         t.join();
2045         pdone.recv();
2046     })
2047
2048     test!(fn send_opt1() {
2049         let (tx, rx) = sync_channel::<int>(0);
2050         spawn(proc() { rx.recv(); });
2051         assert_eq!(tx.send_opt(1), Ok(()));
2052     })
2053
2054     test!(fn send_opt2() {
2055         let (tx, rx) = sync_channel::<int>(0);
2056         spawn(proc() { drop(rx); });
2057         assert_eq!(tx.send_opt(1), Err(1));
2058     })
2059
2060     test!(fn send_opt3() {
2061         let (tx, rx) = sync_channel::<int>(1);
2062         assert_eq!(tx.send_opt(1), Ok(()));
2063         spawn(proc() { drop(rx); });
2064         assert_eq!(tx.send_opt(1), Err(1));
2065     })
2066
2067     test!(fn send_opt4() {
2068         let (tx, rx) = sync_channel::<int>(0);
2069         let tx2 = tx.clone();
2070         let (done, donerx) = channel();
2071         let done2 = done.clone();
2072         spawn(proc() {
2073             assert_eq!(tx.send_opt(1), Err(1));
2074             done.send(());
2075         });
2076         spawn(proc() {
2077             assert_eq!(tx2.send_opt(2), Err(2));
2078             done2.send(());
2079         });
2080         drop(rx);
2081         donerx.recv();
2082         donerx.recv();
2083     })
2084
2085     test!(fn try_send1() {
2086         let (tx, _rx) = sync_channel::<int>(0);
2087         assert_eq!(tx.try_send(1), Err(Full(1)));
2088     })
2089
2090     test!(fn try_send2() {
2091         let (tx, _rx) = sync_channel::<int>(1);
2092         assert_eq!(tx.try_send(1), Ok(()));
2093         assert_eq!(tx.try_send(1), Err(Full(1)));
2094     })
2095
2096     test!(fn try_send3() {
2097         let (tx, rx) = sync_channel::<int>(1);
2098         assert_eq!(tx.try_send(1), Ok(()));
2099         drop(rx);
2100         assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
2101     })
2102
2103     test!(fn try_send4() {
2104         let (tx, rx) = sync_channel::<int>(0);
2105         spawn(proc() {
2106             for _ in range(0u, 1000) { task::deschedule(); }
2107             assert_eq!(tx.try_send(1), Ok(()));
2108         });
2109         assert_eq!(rx.recv(), 1);
2110     } #[ignore(reason = "flaky on libnative")])
2111
2112     test!(fn issue_15761() {
2113         fn repro() {
2114             let (tx1, rx1) = sync_channel::<()>(3);
2115             let (tx2, rx2) = sync_channel::<()>(3);
2116
2117             spawn(proc() {
2118                 rx1.recv();
2119                 tx2.try_send(()).unwrap();
2120             });
2121
2122             tx1.try_send(()).unwrap();
2123             rx2.recv();
2124         }
2125
2126         for _ in range(0u, 100) {
2127             repro()
2128         }
2129     })
2130 }