]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/mod.rs
247f50d666e1402000ba4a3bcec7f49262f96576
[rust.git] / src / libsync / comm / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined among three types:
20 //!
21 //! * `Sender`
22 //! * `SyncSender`
23 //! * `Receiver`
24 //!
25 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
26 //! senders are clone-able such that many tasks can send simultaneously to one
27 //! receiver.  These channels are *task blocking*, not *thread blocking*. This
28 //! means that if one task is blocked on a channel, other tasks can continue to
29 //! make progress.
30 //!
31 //! Rust channels come in one of two flavors:
32 //!
33 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
34 //!    will return a `(Sender, Receiver)` tuple where all sends will be
35 //!    **asynchronous** (they never block). The channel conceptually has an
36 //!    infinite buffer.
37 //!
38 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
39 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
40 //!    is a pre-allocated buffer of a fixed size. All sends will be
41 //!    **synchronous** by blocking until there is buffer space available. Note
42 //!    that a bound of 0 is allowed, causing the channel to become a
43 //!    "rendezvous" channel where each sender atomically hands off a message to
44 //!    a receiver.
45 //!
46 //! ## Failure Propagation
47 //!
48 //! In addition to being a core primitive for communicating in rust, channels
49 //! are the points at which panics are propagated among tasks.  Whenever the one
50 //! half of channel is closed, the other half will have its next operation
51 //! `panic!`. The purpose of this is to allow propagation of panics among tasks
52 //! that are linked to one another via channels.
53 //!
54 //! There are methods on both of senders and receivers to perform their
55 //! respective operations without panicking, however.
56 //!
57 //! ## Runtime Requirements
58 //!
59 //! The channel types defined in this module generally have very few runtime
60 //! requirements in order to operate. The major requirement they have is for a
61 //! local rust `Task` to be available if any *blocking* operation is performed.
62 //!
63 //! If a local `Task` is not available (for example an FFI callback), then the
64 //! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
65 //! the `try_send` method on a `SyncSender`, but no other operations are
66 //! guaranteed to be safe.
67 //!
68 //! Additionally, channels can interoperate between runtimes. If one task in a
69 //! program is running on libnative and another is running on libgreen, they can
70 //! still communicate with one another using channels.
71 //!
72 //! # Example
73 //!
74 //! Simple usage:
75 //!
76 //! ```
77 //! // Create a simple streaming channel
78 //! let (tx, rx) = channel();
79 //! spawn(proc() {
80 //!     tx.send(10i);
81 //! });
82 //! assert_eq!(rx.recv(), 10i);
83 //! ```
84 //!
85 //! Shared usage:
86 //!
87 //! ```
88 //! // Create a shared channel which can be sent along from many tasks
89 //! // where tx is the sending half (tx for transmission), and rx is the receiving
90 //! // half (rx for receiving).
91 //! let (tx, rx) = channel();
92 //! for i in range(0i, 10i) {
93 //!     let tx = tx.clone();
94 //!     spawn(proc() {
95 //!         tx.send(i);
96 //!     })
97 //! }
98 //!
99 //! for _ in range(0i, 10i) {
100 //!     let j = rx.recv();
101 //!     assert!(0 <= j && j < 10);
102 //! }
103 //! ```
104 //!
105 //! Propagating panics:
106 //!
107 //! ```should_fail
108 //! // The call to recv() will panic!() because the channel has already hung
109 //! // up (or been deallocated)
110 //! let (tx, rx) = channel::<int>();
111 //! drop(tx);
112 //! rx.recv();
113 //! ```
114 //!
115 //! Synchronous channels:
116 //!
117 //! ```
118 //! let (tx, rx) = sync_channel::<int>(0);
119 //! spawn(proc() {
120 //!     // This will wait for the parent task to start receiving
121 //!     tx.send(53);
122 //! });
123 //! rx.recv();
124 //! ```
125 //!
126 //! Reading from a channel with a timeout requires to use a Timer together
127 //! with the channel. You can use the select! macro to select either and
128 //! handle the timeout case. This first example will break out of the loop
129 //! after 10 seconds no matter what:
130 //!
131 //! ```no_run
132 //! use std::io::timer::Timer;
133 //! use std::time::Duration;
134 //!
135 //! let (tx, rx) = channel::<int>();
136 //! let mut timer = Timer::new().unwrap();
137 //! let timeout = timer.oneshot(Duration::seconds(10));
138 //!
139 //! loop {
140 //!     select! {
141 //!         val = rx.recv() => println!("Received {}", val),
142 //!         () = timeout.recv() => {
143 //!             println!("timed out, total time was more than 10 seconds")
144 //!             break;
145 //!         }
146 //!     }
147 //! }
148 //! ```
149 //!
150 //! This second example is more costly since it allocates a new timer every
151 //! time a message is received, but it allows you to timeout after the channel
152 //! has been inactive for 5 seconds:
153 //!
154 //! ```no_run
155 //! use std::io::timer::Timer;
156 //! use std::time::Duration;
157 //!
158 //! let (tx, rx) = channel::<int>();
159 //! let mut timer = Timer::new().unwrap();
160 //!
161 //! loop {
162 //!     let timeout = timer.oneshot(Duration::seconds(5));
163 //!
164 //!     select! {
165 //!         val = rx.recv() => println!("Received {}", val),
166 //!         () = timeout.recv() => {
167 //!             println!("timed out, no message received in 5 seconds")
168 //!             break;
169 //!         }
170 //!     }
171 //! }
172 //! ```
173
174 // A description of how Rust's channel implementation works
175 //
176 // Channels are supposed to be the basic building block for all other
177 // concurrent primitives that are used in Rust. As a result, the channel type
178 // needs to be highly optimized, flexible, and broad enough for use everywhere.
179 //
180 // The choice of implementation of all channels is to be built on lock-free data
181 // structures. The channels themselves are then consequently also lock-free data
182 // structures. As always with lock-free code, this is a very "here be dragons"
183 // territory, especially because I'm unaware of any academic papers which have
184 // gone into great length about channels of these flavors.
185 //
186 // ## Flavors of channels
187 //
188 // From the perspective of a consumer of this library, there is only one flavor
189 // of channel. This channel can be used as a stream and cloned to allow multiple
190 // senders. Under the hood, however, there are actually three flavors of
191 // channels in play.
192 //
193 // * Oneshots - these channels are highly optimized for the one-send use case.
194 //              They contain as few atomics as possible and involve one and
195 //              exactly one allocation.
196 // * Streams - these channels are optimized for the non-shared use case. They
197 //             use a different concurrent queue which is more tailored for this
198 //             use case. The initial allocation of this flavor of channel is not
199 //             optimized.
200 // * Shared - this is the most general form of channel that this module offers,
201 //            a channel with multiple senders. This type is as optimized as it
202 //            can be, but the previous two types mentioned are much faster for
203 //            their use-cases.
204 //
205 // ## Concurrent queues
206 //
207 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
208 // recv() obviously blocks. This means that under the hood there must be some
209 // shared and concurrent queue holding all of the actual data.
210 //
211 // With two flavors of channels, two flavors of queues are also used. We have
212 // chosen to use queues from a well-known author which are abbreviated as SPSC
213 // and MPSC (single producer, single consumer and multiple producer, single
214 // consumer). SPSC queues are used for streams while MPSC queues are used for
215 // shared channels.
216 //
217 // ### SPSC optimizations
218 //
219 // The SPSC queue found online is essentially a linked list of nodes where one
220 // half of the nodes are the "queue of data" and the other half of nodes are a
221 // cache of unused nodes. The unused nodes are used such that an allocation is
222 // not required on every push() and a free doesn't need to happen on every
223 // pop().
224 //
225 // As found online, however, the cache of nodes is of an infinite size. This
226 // means that if a channel at one point in its life had 50k items in the queue,
227 // then the queue will always have the capacity for 50k items. I believed that
228 // this was an unnecessary limitation of the implementation, so I have altered
229 // the queue to optionally have a bound on the cache size.
230 //
231 // By default, streams will have an unbounded SPSC queue with a small-ish cache
232 // size. The hope is that the cache is still large enough to have very fast
233 // send() operations while not too large such that millions of channels can
234 // coexist at once.
235 //
236 // ### MPSC optimizations
237 //
238 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
239 // a linked list under the hood to earn its unboundedness, but I have not put
240 // forth much effort into having a cache of nodes similar to the SPSC queue.
241 //
242 // For now, I believe that this is "ok" because shared channels are not the most
243 // common type, but soon we may wish to revisit this queue choice and determine
244 // another candidate for backend storage of shared channels.
245 //
246 // ## Overview of the Implementation
247 //
248 // Now that there's a little background on the concurrent queues used, it's
249 // worth going into much more detail about the channels themselves. The basic
250 // pseudocode for a send/recv are:
251 //
252 //
253 //      send(t)                             recv()
254 //        queue.push(t)                       return if queue.pop()
255 //        if increment() == -1                deschedule {
256 //          wakeup()                            if decrement() > 0
257 //                                                cancel_deschedule()
258 //                                            }
259 //                                            queue.pop()
260 //
261 // As mentioned before, there are no locks in this implementation, only atomic
262 // instructions are used.
263 //
264 // ### The internal atomic counter
265 //
266 // Every channel has a shared counter with each half to keep track of the size
267 // of the queue. This counter is used to abort descheduling by the receiver and
268 // to know when to wake up on the sending side.
269 //
270 // As seen in the pseudocode, senders will increment this count and receivers
271 // will decrement the count. The theory behind this is that if a sender sees a
272 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
273 // then it doesn't need to block.
274 //
275 // The recv() method has a beginning call to pop(), and if successful, it needs
276 // to decrement the count. It is a crucial implementation detail that this
277 // decrement does *not* happen to the shared counter. If this were the case,
278 // then it would be possible for the counter to be very negative when there were
279 // no receivers waiting, in which case the senders would have to determine when
280 // it was actually appropriate to wake up a receiver.
281 //
282 // Instead, the "steal count" is kept track of separately (not atomically
283 // because it's only used by receivers), and then the decrement() call when
284 // descheduling will lump in all of the recent steals into one large decrement.
285 //
286 // The implication of this is that if a sender sees a -1 count, then there's
287 // guaranteed to be a waiter waiting!
288 //
289 // ## Native Implementation
290 //
291 // A major goal of these channels is to work seamlessly on and off the runtime.
292 // All of the previous race conditions have been worded in terms of
293 // scheduler-isms (which is obviously not available without the runtime).
294 //
295 // For now, native usage of channels (off the runtime) will fall back onto
296 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
297 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
298 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
299 // condition variable.
300 //
301 // ## Select
302 //
303 // Being able to support selection over channels has greatly influenced this
304 // design, and not only does selection need to work inside the runtime, but also
305 // outside the runtime.
306 //
307 // The implementation is fairly straightforward. The goal of select() is not to
308 // return some data, but only to return which channel can receive data without
309 // blocking. The implementation is essentially the entire blocking procedure
310 // followed by an increment as soon as its woken up. The cancellation procedure
311 // involves an increment and swapping out of to_wake to acquire ownership of the
312 // task to unblock.
313 //
314 // Sadly this current implementation requires multiple allocations, so I have
315 // seen the throughput of select() be much worse than it should be. I do not
316 // believe that there is anything fundamental which needs to change about these
317 // channels, however, in order to support a more efficient select().
318 //
319 // # Conclusion
320 //
321 // And now that you've seen all the races that I found and attempted to fix,
322 // here's the code for you to find some more!
323
324 use core::prelude::*;
325
326 use alloc::arc::Arc;
327 use alloc::boxed::Box;
328 use core::cell::Cell;
329 use core::kinds::marker;
330 use core::mem;
331 use core::cell::UnsafeCell;
332 use rustrt::local::Local;
333 use rustrt::task::{Task, BlockedTask};
334
335 pub use comm::select::{Select, Handle};
336
337 macro_rules! test (
338     { fn $name:ident() $b:block $(#[$a:meta])*} => (
339         mod $name {
340             #![allow(unused_imports)]
341
342             use std::prelude::*;
343
344             use native;
345             use comm::*;
346             use super::*;
347             use super::super::*;
348             use std::task;
349
350             fn f() $b
351
352             $(#[$a])* #[test] fn uv() { f() }
353             $(#[$a])* #[test] fn native() {
354                 use native;
355                 let (tx, rx) = channel();
356                 spawn(proc() { tx.send(f()) });
357                 rx.recv();
358             }
359         }
360     )
361 )
362
363 mod oneshot;
364 mod select;
365 mod shared;
366 mod stream;
367 mod sync;
368
369 // Use a power of 2 to allow LLVM to optimize to something that's not a
370 // division, this is hit pretty regularly.
371 static RESCHED_FREQ: int = 256;
372
373 /// The receiving-half of Rust's channel type. This half can only be owned by
374 /// one task
375 #[unstable]
376 pub struct Receiver<T> {
377     inner: UnsafeCell<Flavor<T>>,
378     receives: Cell<uint>,
379     // can't share in an arc
380     _marker: marker::NoSync,
381 }
382
383 /// An iterator over messages on a receiver, this iterator will block
384 /// whenever `next` is called, waiting for a new message, and `None` will be
385 /// returned when the corresponding channel has hung up.
386 #[unstable]
387 pub struct Messages<'a, T:'a> {
388     rx: &'a Receiver<T>
389 }
390
391 /// The sending-half of Rust's asynchronous channel type. This half can only be
392 /// owned by one task, but it can be cloned to send to other tasks.
393 #[unstable]
394 pub struct Sender<T> {
395     inner: UnsafeCell<Flavor<T>>,
396     sends: Cell<uint>,
397     // can't share in an arc
398     _marker: marker::NoSync,
399 }
400
401 /// The sending-half of Rust's synchronous channel type. This half can only be
402 /// owned by one task, but it can be cloned to send to other tasks.
403 #[unstable = "this type may be renamed, but it will always exist"]
404 pub struct SyncSender<T> {
405     inner: Arc<UnsafeCell<sync::Packet<T>>>,
406     // can't share in an arc
407     _marker: marker::NoSync,
408 }
409
410 /// This enumeration is the list of the possible reasons that try_recv could not
411 /// return data when called.
412 #[deriving(PartialEq, Clone, Show)]
413 #[experimental = "this is likely to be removed in changing try_recv()"]
414 pub enum TryRecvError {
415     /// This channel is currently empty, but the sender(s) have not yet
416     /// disconnected, so data may yet become available.
417     Empty,
418     /// This channel's sending half has become disconnected, and there will
419     /// never be any more data received on this channel
420     Disconnected,
421 }
422
423 /// This enumeration is the list of the possible error outcomes for the
424 /// `SyncSender::try_send` method.
425 #[deriving(PartialEq, Clone, Show)]
426 #[experimental = "this is likely to be removed in changing try_send()"]
427 pub enum TrySendError<T> {
428     /// The data could not be sent on the channel because it would require that
429     /// the callee block to send the data.
430     ///
431     /// If this is a buffered channel, then the buffer is full at this time. If
432     /// this is not a buffered channel, then there is no receiver available to
433     /// acquire the data.
434     Full(T),
435     /// This channel's receiving half has disconnected, so the data could not be
436     /// sent. The data is returned back to the callee in this case.
437     RecvDisconnected(T),
438 }
439
440 enum Flavor<T> {
441     Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
442     Stream(Arc<UnsafeCell<stream::Packet<T>>>),
443     Shared(Arc<UnsafeCell<shared::Packet<T>>>),
444     Sync(Arc<UnsafeCell<sync::Packet<T>>>),
445 }
446
447 #[doc(hidden)]
448 trait UnsafeFlavor<T> {
449     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
450     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
451         &mut *self.inner_unsafe().get()
452     }
453     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
454         &*self.inner_unsafe().get()
455     }
456 }
457 impl<T> UnsafeFlavor<T> for Sender<T> {
458     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
459         &self.inner
460     }
461 }
462 impl<T> UnsafeFlavor<T> for Receiver<T> {
463     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
464         &self.inner
465     }
466 }
467
468 /// Creates a new asynchronous channel, returning the sender/receiver halves.
469 ///
470 /// All data sent on the sender will become available on the receiver, and no
471 /// send will block the calling task (this channel has an "infinite buffer").
472 ///
473 /// # Example
474 ///
475 /// ```
476 /// // tx is is the sending half (tx for transmission), and rx is the receiving
477 /// // half (rx for receiving).
478 /// let (tx, rx) = channel();
479 ///
480 /// // Spawn off an expensive computation
481 /// spawn(proc() {
482 /// #   fn expensive_computation() {}
483 ///     tx.send(expensive_computation());
484 /// });
485 ///
486 /// // Do some useful work for awhile
487 ///
488 /// // Let's see what that answer was
489 /// println!("{}", rx.recv());
490 /// ```
491 #[unstable]
492 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
493     let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
494     (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
495 }
496
497 /// Creates a new synchronous, bounded channel.
498 ///
499 /// Like asynchronous channels, the `Receiver` will block until a message
500 /// becomes available. These channels differ greatly in the semantics of the
501 /// sender from asynchronous channels, however.
502 ///
503 /// This channel has an internal buffer on which messages will be queued. When
504 /// the internal buffer becomes full, future sends will *block* waiting for the
505 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
506 /// becomes  "rendezvous channel" where each send will not return until a recv
507 /// is paired with it.
508 ///
509 /// As with asynchronous channels, all senders will panic in `send` if the
510 /// `Receiver` has been destroyed.
511 ///
512 /// # Example
513 ///
514 /// ```
515 /// let (tx, rx) = sync_channel(1);
516 ///
517 /// // this returns immediately
518 /// tx.send(1i);
519 ///
520 /// spawn(proc() {
521 ///     // this will block until the previous message has been received
522 ///     tx.send(2i);
523 /// });
524 ///
525 /// assert_eq!(rx.recv(), 1i);
526 /// assert_eq!(rx.recv(), 2i);
527 /// ```
528 #[unstable = "this function may be renamed to more accurately reflect the type \
529               of channel that is is creating"]
530 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
531     let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
532     (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
533 }
534
535 ////////////////////////////////////////////////////////////////////////////////
536 // Sender
537 ////////////////////////////////////////////////////////////////////////////////
538
539 impl<T: Send> Sender<T> {
540     fn new(inner: Flavor<T>) -> Sender<T> {
541         Sender {
542             inner: UnsafeCell::new(inner),
543             sends: Cell::new(0),
544             _marker: marker::NoSync,
545         }
546     }
547
548     /// Sends a value along this channel to be received by the corresponding
549     /// receiver.
550     ///
551     /// Rust channels are infinitely buffered so this method will never block.
552     ///
553     /// # Panics
554     ///
555     /// This function will panic if the other end of the channel has hung up.
556     /// This means that if the corresponding receiver has fallen out of scope,
557     /// this function will trigger a panic message saying that a message is
558     /// being sent on a closed channel.
559     ///
560     /// Note that if this function does *not* panic, it does not mean that the
561     /// data will be successfully received. All sends are placed into a queue,
562     /// so it is possible for a send to succeed (the other end is alive), but
563     /// then the other end could immediately disconnect.
564     ///
565     /// The purpose of this functionality is to propagate panicks among tasks.
566     /// If a panic is not desired, then consider using the `send_opt` method
567     #[experimental = "this function is being considered candidate for removal \
568                       to adhere to the general guidelines of rust"]
569     pub fn send(&self, t: T) {
570         if self.send_opt(t).is_err() {
571             panic!("sending on a closed channel");
572         }
573     }
574
575     /// Attempts to send a value on this channel, returning it back if it could
576     /// not be sent.
577     ///
578     /// A successful send occurs when it is determined that the other end of
579     /// the channel has not hung up already. An unsuccessful send would be one
580     /// where the corresponding receiver has already been deallocated. Note
581     /// that a return value of `Err` means that the data will never be
582     /// received, but a return value of `Ok` does *not* mean that the data
583     /// will be received.  It is possible for the corresponding receiver to
584     /// hang up immediately after this function returns `Ok`.
585     ///
586     /// Like `send`, this method will never block.
587     ///
588     /// # Panics
589     ///
590     /// This method will never panic, it will return the message back to the
591     /// caller if the other end is disconnected
592     ///
593     /// # Example
594     ///
595     /// ```
596     /// let (tx, rx) = channel();
597     ///
598     /// // This send is always successful
599     /// assert_eq!(tx.send_opt(1i), Ok(()));
600     ///
601     /// // This send will fail because the receiver is gone
602     /// drop(rx);
603     /// assert_eq!(tx.send_opt(1i), Err(1));
604     /// ```
605     #[unstable = "this function may be renamed to send() in the future"]
606     pub fn send_opt(&self, t: T) -> Result<(), T> {
607         // In order to prevent starvation of other tasks in situations where
608         // a task sends repeatedly without ever receiving, we occasionally
609         // yield instead of doing a send immediately.
610         //
611         // Don't unconditionally attempt to yield because the TLS overhead can
612         // be a bit much, and also use `try_take` instead of `take` because
613         // there's no reason that this send shouldn't be usable off the
614         // runtime.
615         let cnt = self.sends.get() + 1;
616         self.sends.set(cnt);
617         if cnt % (RESCHED_FREQ as uint) == 0 {
618             let task: Option<Box<Task>> = Local::try_take();
619             task.map(|t| t.maybe_yield());
620         }
621
622         let (new_inner, ret) = match *unsafe { self.inner() } {
623             Oneshot(ref p) => {
624                 unsafe {
625                     let p = p.get();
626                     if !(*p).sent() {
627                         return (*p).send(t);
628                     } else {
629                         let a = Arc::new(UnsafeCell::new(stream::Packet::new()));
630                         match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
631                             oneshot::UpSuccess => {
632                                 let ret = (*a.get()).send(t);
633                                 (a, ret)
634                             }
635                             oneshot::UpDisconnected => (a, Err(t)),
636                             oneshot::UpWoke(task) => {
637                                 // This send cannot panic because the task is
638                                 // asleep (we're looking at it), so the receiver
639                                 // can't go away.
640                                 (*a.get()).send(t).ok().unwrap();
641                                 task.wake().map(|t| t.reawaken());
642                                 (a, Ok(()))
643                             }
644                         }
645                     }
646                 }
647             }
648             Stream(ref p) => return unsafe { (*p.get()).send(t) },
649             Shared(ref p) => return unsafe { (*p.get()).send(t) },
650             Sync(..) => unreachable!(),
651         };
652
653         unsafe {
654             let tmp = Sender::new(Stream(new_inner));
655             mem::swap(self.inner_mut(), tmp.inner_mut());
656         }
657         return ret;
658     }
659 }
660
661 #[unstable]
662 impl<T: Send> Clone for Sender<T> {
663     fn clone(&self) -> Sender<T> {
664         let (packet, sleeper) = match *unsafe { self.inner() } {
665             Oneshot(ref p) => {
666                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
667                 unsafe {
668                     (*a.get()).postinit_lock();
669                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
670                         oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
671                         oneshot::UpWoke(task) => (a, Some(task))
672                     }
673                 }
674             }
675             Stream(ref p) => {
676                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
677                 unsafe {
678                     (*a.get()).postinit_lock();
679                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
680                         stream::UpSuccess | stream::UpDisconnected => (a, None),
681                         stream::UpWoke(task) => (a, Some(task)),
682                     }
683                 }
684             }
685             Shared(ref p) => {
686                 unsafe { (*p.get()).clone_chan(); }
687                 return Sender::new(Shared(p.clone()));
688             }
689             Sync(..) => unreachable!(),
690         };
691
692         unsafe {
693             (*packet.get()).inherit_blocker(sleeper);
694
695             let tmp = Sender::new(Shared(packet.clone()));
696             mem::swap(self.inner_mut(), tmp.inner_mut());
697         }
698         Sender::new(Shared(packet))
699     }
700 }
701
702 #[unsafe_destructor]
703 impl<T: Send> Drop for Sender<T> {
704     fn drop(&mut self) {
705         match *unsafe { self.inner_mut() } {
706             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
707             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
708             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
709             Sync(..) => unreachable!(),
710         }
711     }
712 }
713
714 ////////////////////////////////////////////////////////////////////////////////
715 // SyncSender
716 ////////////////////////////////////////////////////////////////////////////////
717
718 impl<T: Send> SyncSender<T> {
719     fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
720         SyncSender { inner: inner, _marker: marker::NoSync }
721     }
722
723     /// Sends a value on this synchronous channel.
724     ///
725     /// This function will *block* until space in the internal buffer becomes
726     /// available or a receiver is available to hand off the message to.
727     ///
728     /// Note that a successful send does *not* guarantee that the receiver will
729     /// ever see the data if there is a buffer on this channel. Messages may be
730     /// enqueued in the internal buffer for the receiver to receive at a later
731     /// time. If the buffer size is 0, however, it can be guaranteed that the
732     /// receiver has indeed received the data if this function returns success.
733     ///
734     /// # Panics
735     ///
736     /// Similarly to `Sender::send`, this function will panic if the
737     /// corresponding `Receiver` for this channel has disconnected. This
738     /// behavior is used to propagate panics among tasks.
739     ///
740     /// If a panic is not desired, you can achieve the same semantics with the
741     /// `SyncSender::send_opt` method which will not panic if the receiver
742     /// disconnects.
743     #[experimental = "this function is being considered candidate for removal \
744                       to adhere to the general guidelines of rust"]
745     pub fn send(&self, t: T) {
746         if self.send_opt(t).is_err() {
747             panic!("sending on a closed channel");
748         }
749     }
750
751     /// Send a value on a channel, returning it back if the receiver
752     /// disconnected
753     ///
754     /// This method will *block* to send the value `t` on the channel, but if
755     /// the value could not be sent due to the receiver disconnecting, the value
756     /// is returned back to the callee. This function is similar to `try_send`,
757     /// except that it will block if the channel is currently full.
758     ///
759     /// # Panics
760     ///
761     /// This function cannot panic.
762     #[unstable = "this function may be renamed to send() in the future"]
763     pub fn send_opt(&self, t: T) -> Result<(), T> {
764         unsafe { (*self.inner.get()).send(t) }
765     }
766
767     /// Attempts to send a value on this channel without blocking.
768     ///
769     /// This method differs from `send_opt` by returning immediately if the
770     /// channel's buffer is full or no receiver is waiting to acquire some
771     /// data. Compared with `send_opt`, this function has two failure cases
772     /// instead of one (one for disconnection, one for a full buffer).
773     ///
774     /// See `SyncSender::send` for notes about guarantees of whether the
775     /// receiver has received the data or not if this function is successful.
776     ///
777     /// # Panics
778     ///
779     /// This function cannot panic
780     #[unstable = "the return type of this function is candidate for \
781                   modification"]
782     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
783         unsafe { (*self.inner.get()).try_send(t) }
784     }
785 }
786
787 #[unstable]
788 impl<T: Send> Clone for SyncSender<T> {
789     fn clone(&self) -> SyncSender<T> {
790         unsafe { (*self.inner.get()).clone_chan(); }
791         return SyncSender::new(self.inner.clone());
792     }
793 }
794
795 #[unsafe_destructor]
796 impl<T: Send> Drop for SyncSender<T> {
797     fn drop(&mut self) {
798         unsafe { (*self.inner.get()).drop_chan(); }
799     }
800 }
801
802 ////////////////////////////////////////////////////////////////////////////////
803 // Receiver
804 ////////////////////////////////////////////////////////////////////////////////
805
806 impl<T: Send> Receiver<T> {
807     fn new(inner: Flavor<T>) -> Receiver<T> {
808         Receiver { inner: UnsafeCell::new(inner), receives: Cell::new(0), _marker: marker::NoSync }
809     }
810
811     /// Blocks waiting for a value on this receiver
812     ///
813     /// This function will block if necessary to wait for a corresponding send
814     /// on the channel from its paired `Sender` structure. This receiver will
815     /// be woken up when data is ready, and the data will be returned.
816     ///
817     /// # Panics
818     ///
819     /// Similar to channels, this method will trigger a task panic if the
820     /// other end of the channel has hung up (been deallocated). The purpose of
821     /// this is to propagate panicks among tasks.
822     ///
823     /// If a panic is not desired, then there are two options:
824     ///
825     /// * If blocking is still desired, the `recv_opt` method will return `None`
826     ///   when the other end hangs up
827     ///
828     /// * If blocking is not desired, then the `try_recv` method will attempt to
829     ///   peek at a value on this receiver.
830     #[experimental = "this function is being considered candidate for removal \
831                       to adhere to the general guidelines of rust"]
832     pub fn recv(&self) -> T {
833         match self.recv_opt() {
834             Ok(t) => t,
835             Err(()) => panic!("receiving on a closed channel"),
836         }
837     }
838
839     /// Attempts to return a pending value on this receiver without blocking
840     ///
841     /// This method will never block the caller in order to wait for data to
842     /// become available. Instead, this will always return immediately with a
843     /// possible option of pending data on the channel.
844     ///
845     /// This is useful for a flavor of "optimistic check" before deciding to
846     /// block on a receiver.
847     ///
848     /// # Panics
849     ///
850     /// This function cannot panic.
851     #[unstable = "the return type of this function may be altered"]
852     pub fn try_recv(&self) -> Result<T, TryRecvError> {
853         // If a thread is spinning in try_recv, we should take the opportunity
854         // to reschedule things occasionally. See notes above in scheduling on
855         // sends for why this doesn't always hit TLS, and also for why this uses
856         // `try_take` instead of `take`.
857         let cnt = self.receives.get() + 1;
858         self.receives.set(cnt);
859         if cnt % (RESCHED_FREQ as uint) == 0 {
860             let task: Option<Box<Task>> = Local::try_take();
861             task.map(|t| t.maybe_yield());
862         }
863
864         loop {
865             let new_port = match *unsafe { self.inner() } {
866                 Oneshot(ref p) => {
867                     match unsafe { (*p.get()).try_recv() } {
868                         Ok(t) => return Ok(t),
869                         Err(oneshot::Empty) => return Err(Empty),
870                         Err(oneshot::Disconnected) => return Err(Disconnected),
871                         Err(oneshot::Upgraded(rx)) => rx,
872                     }
873                 }
874                 Stream(ref p) => {
875                     match unsafe { (*p.get()).try_recv() } {
876                         Ok(t) => return Ok(t),
877                         Err(stream::Empty) => return Err(Empty),
878                         Err(stream::Disconnected) => return Err(Disconnected),
879                         Err(stream::Upgraded(rx)) => rx,
880                     }
881                 }
882                 Shared(ref p) => {
883                     match unsafe { (*p.get()).try_recv() } {
884                         Ok(t) => return Ok(t),
885                         Err(shared::Empty) => return Err(Empty),
886                         Err(shared::Disconnected) => return Err(Disconnected),
887                     }
888                 }
889                 Sync(ref p) => {
890                     match unsafe { (*p.get()).try_recv() } {
891                         Ok(t) => return Ok(t),
892                         Err(sync::Empty) => return Err(Empty),
893                         Err(sync::Disconnected) => return Err(Disconnected),
894                     }
895                 }
896             };
897             unsafe {
898                 mem::swap(self.inner_mut(),
899                           new_port.inner_mut());
900             }
901         }
902     }
903
904     /// Attempt to wait for a value on this receiver, but does not panic if the
905     /// corresponding channel has hung up.
906     ///
907     /// This implementation of iterators for ports will always block if there is
908     /// not data available on the receiver, but it will not panic in the case
909     /// that the channel has been deallocated.
910     ///
911     /// In other words, this function has the same semantics as the `recv`
912     /// method except for the panic aspect.
913     ///
914     /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
915     /// the value found on the receiver is returned.
916     #[unstable = "this function may be renamed to recv()"]
917     pub fn recv_opt(&self) -> Result<T, ()> {
918         loop {
919             let new_port = match *unsafe { self.inner() } {
920                 Oneshot(ref p) => {
921                     match unsafe { (*p.get()).recv() } {
922                         Ok(t) => return Ok(t),
923                         Err(oneshot::Empty) => return unreachable!(),
924                         Err(oneshot::Disconnected) => return Err(()),
925                         Err(oneshot::Upgraded(rx)) => rx,
926                     }
927                 }
928                 Stream(ref p) => {
929                     match unsafe { (*p.get()).recv() } {
930                         Ok(t) => return Ok(t),
931                         Err(stream::Empty) => return unreachable!(),
932                         Err(stream::Disconnected) => return Err(()),
933                         Err(stream::Upgraded(rx)) => rx,
934                     }
935                 }
936                 Shared(ref p) => {
937                     match unsafe { (*p.get()).recv() } {
938                         Ok(t) => return Ok(t),
939                         Err(shared::Empty) => return unreachable!(),
940                         Err(shared::Disconnected) => return Err(()),
941                     }
942                 }
943                 Sync(ref p) => return unsafe { (*p.get()).recv() }
944             };
945             unsafe {
946                 mem::swap(self.inner_mut(), new_port.inner_mut());
947             }
948         }
949     }
950
951     /// Returns an iterator which will block waiting for messages, but never
952     /// `panic!`. It will return `None` when the channel has hung up.
953     #[unstable]
954     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
955         Messages { rx: self }
956     }
957 }
958
959 impl<T: Send> select::Packet for Receiver<T> {
960     fn can_recv(&self) -> bool {
961         loop {
962             let new_port = match *unsafe { self.inner() } {
963                 Oneshot(ref p) => {
964                     match unsafe { (*p.get()).can_recv() } {
965                         Ok(ret) => return ret,
966                         Err(upgrade) => upgrade,
967                     }
968                 }
969                 Stream(ref p) => {
970                     match unsafe { (*p.get()).can_recv() } {
971                         Ok(ret) => return ret,
972                         Err(upgrade) => upgrade,
973                     }
974                 }
975                 Shared(ref p) => {
976                     return unsafe { (*p.get()).can_recv() };
977                 }
978                 Sync(ref p) => {
979                     return unsafe { (*p.get()).can_recv() };
980                 }
981             };
982             unsafe {
983                 mem::swap(self.inner_mut(),
984                           new_port.inner_mut());
985             }
986         }
987     }
988
989     fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
990         loop {
991             let (t, new_port) = match *unsafe { self.inner() } {
992                 Oneshot(ref p) => {
993                     match unsafe { (*p.get()).start_selection(task) } {
994                         oneshot::SelSuccess => return Ok(()),
995                         oneshot::SelCanceled(task) => return Err(task),
996                         oneshot::SelUpgraded(t, rx) => (t, rx),
997                     }
998                 }
999                 Stream(ref p) => {
1000                     match unsafe { (*p.get()).start_selection(task) } {
1001                         stream::SelSuccess => return Ok(()),
1002                         stream::SelCanceled(task) => return Err(task),
1003                         stream::SelUpgraded(t, rx) => (t, rx),
1004                     }
1005                 }
1006                 Shared(ref p) => {
1007                     return unsafe { (*p.get()).start_selection(task) };
1008                 }
1009                 Sync(ref p) => {
1010                     return unsafe { (*p.get()).start_selection(task) };
1011                 }
1012             };
1013             task = t;
1014             unsafe {
1015                 mem::swap(self.inner_mut(),
1016                           new_port.inner_mut());
1017             }
1018         }
1019     }
1020
1021     fn abort_selection(&self) -> bool {
1022         let mut was_upgrade = false;
1023         loop {
1024             let result = match *unsafe { self.inner() } {
1025                 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
1026                 Stream(ref p) => unsafe {
1027                     (*p.get()).abort_selection(was_upgrade)
1028                 },
1029                 Shared(ref p) => return unsafe {
1030                     (*p.get()).abort_selection(was_upgrade)
1031                 },
1032                 Sync(ref p) => return unsafe {
1033                     (*p.get()).abort_selection()
1034                 },
1035             };
1036             let new_port = match result { Ok(b) => return b, Err(p) => p };
1037             was_upgrade = true;
1038             unsafe {
1039                 mem::swap(self.inner_mut(),
1040                           new_port.inner_mut());
1041             }
1042         }
1043     }
1044 }
1045
1046 #[unstable]
1047 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
1048     fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
1049 }
1050
1051 #[unsafe_destructor]
1052 impl<T: Send> Drop for Receiver<T> {
1053     fn drop(&mut self) {
1054         match *unsafe { self.inner_mut() } {
1055             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
1056             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
1057             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
1058             Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1059         }
1060     }
1061 }
1062
1063 #[cfg(test)]
1064 mod test {
1065     use std::prelude::*;
1066
1067     use std::os;
1068     use super::*;
1069
1070     pub fn stress_factor() -> uint {
1071         match os::getenv("RUST_TEST_STRESS") {
1072             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1073             None => 1,
1074         }
1075     }
1076
1077     test!(fn smoke() {
1078         let (tx, rx) = channel::<int>();
1079         tx.send(1);
1080         assert_eq!(rx.recv(), 1);
1081     })
1082
1083     test!(fn drop_full() {
1084         let (tx, _rx) = channel();
1085         tx.send(box 1i);
1086     })
1087
1088     test!(fn drop_full_shared() {
1089         let (tx, _rx) = channel();
1090         drop(tx.clone());
1091         drop(tx.clone());
1092         tx.send(box 1i);
1093     })
1094
1095     test!(fn smoke_shared() {
1096         let (tx, rx) = channel::<int>();
1097         tx.send(1);
1098         assert_eq!(rx.recv(), 1);
1099         let tx = tx.clone();
1100         tx.send(1);
1101         assert_eq!(rx.recv(), 1);
1102     })
1103
1104     test!(fn smoke_threads() {
1105         let (tx, rx) = channel::<int>();
1106         spawn(proc() {
1107             tx.send(1);
1108         });
1109         assert_eq!(rx.recv(), 1);
1110     })
1111
1112     test!(fn smoke_port_gone() {
1113         let (tx, rx) = channel::<int>();
1114         drop(rx);
1115         tx.send(1);
1116     } #[should_fail])
1117
1118     test!(fn smoke_shared_port_gone() {
1119         let (tx, rx) = channel::<int>();
1120         drop(rx);
1121         tx.send(1);
1122     } #[should_fail])
1123
1124     test!(fn smoke_shared_port_gone2() {
1125         let (tx, rx) = channel::<int>();
1126         drop(rx);
1127         let tx2 = tx.clone();
1128         drop(tx);
1129         tx2.send(1);
1130     } #[should_fail])
1131
1132     test!(fn port_gone_concurrent() {
1133         let (tx, rx) = channel::<int>();
1134         spawn(proc() {
1135             rx.recv();
1136         });
1137         loop { tx.send(1) }
1138     } #[should_fail])
1139
1140     test!(fn port_gone_concurrent_shared() {
1141         let (tx, rx) = channel::<int>();
1142         let tx2 = tx.clone();
1143         spawn(proc() {
1144             rx.recv();
1145         });
1146         loop {
1147             tx.send(1);
1148             tx2.send(1);
1149         }
1150     } #[should_fail])
1151
1152     test!(fn smoke_chan_gone() {
1153         let (tx, rx) = channel::<int>();
1154         drop(tx);
1155         rx.recv();
1156     } #[should_fail])
1157
1158     test!(fn smoke_chan_gone_shared() {
1159         let (tx, rx) = channel::<()>();
1160         let tx2 = tx.clone();
1161         drop(tx);
1162         drop(tx2);
1163         rx.recv();
1164     } #[should_fail])
1165
1166     test!(fn chan_gone_concurrent() {
1167         let (tx, rx) = channel::<int>();
1168         spawn(proc() {
1169             tx.send(1);
1170             tx.send(1);
1171         });
1172         loop { rx.recv(); }
1173     } #[should_fail])
1174
1175     test!(fn stress() {
1176         let (tx, rx) = channel::<int>();
1177         spawn(proc() {
1178             for _ in range(0u, 10000) { tx.send(1i); }
1179         });
1180         for _ in range(0u, 10000) {
1181             assert_eq!(rx.recv(), 1);
1182         }
1183     })
1184
1185     test!(fn stress_shared() {
1186         static AMT: uint = 10000;
1187         static NTHREADS: uint = 8;
1188         let (tx, rx) = channel::<int>();
1189         let (dtx, drx) = channel::<()>();
1190
1191         spawn(proc() {
1192             for _ in range(0, AMT * NTHREADS) {
1193                 assert_eq!(rx.recv(), 1);
1194             }
1195             match rx.try_recv() {
1196                 Ok(..) => panic!(),
1197                 _ => {}
1198             }
1199             dtx.send(());
1200         });
1201
1202         for _ in range(0, NTHREADS) {
1203             let tx = tx.clone();
1204             spawn(proc() {
1205                 for _ in range(0, AMT) { tx.send(1); }
1206             });
1207         }
1208         drop(tx);
1209         drx.recv();
1210     })
1211
1212     #[test]
1213     fn send_from_outside_runtime() {
1214         let (tx1, rx1) = channel::<()>();
1215         let (tx2, rx2) = channel::<int>();
1216         let (tx3, rx3) = channel::<()>();
1217         let tx4 = tx3.clone();
1218         spawn(proc() {
1219             tx1.send(());
1220             for _ in range(0i, 40) {
1221                 assert_eq!(rx2.recv(), 1);
1222             }
1223             tx3.send(());
1224         });
1225         rx1.recv();
1226         spawn(proc() {
1227             for _ in range(0i, 40) {
1228                 tx2.send(1);
1229             }
1230             tx4.send(());
1231         });
1232         rx3.recv();
1233         rx3.recv();
1234     }
1235
1236     #[test]
1237     fn recv_from_outside_runtime() {
1238         let (tx, rx) = channel::<int>();
1239         let (dtx, drx) = channel();
1240         spawn(proc() {
1241             for _ in range(0i, 40) {
1242                 assert_eq!(rx.recv(), 1);
1243             }
1244             dtx.send(());
1245         });
1246         for _ in range(0u, 40) {
1247             tx.send(1);
1248         }
1249         drx.recv();
1250     }
1251
1252     #[test]
1253     fn no_runtime() {
1254         let (tx1, rx1) = channel::<int>();
1255         let (tx2, rx2) = channel::<int>();
1256         let (tx3, rx3) = channel::<()>();
1257         let tx4 = tx3.clone();
1258         spawn(proc() {
1259             assert_eq!(rx1.recv(), 1);
1260             tx2.send(2);
1261             tx4.send(());
1262         });
1263         spawn(proc() {
1264             tx1.send(1);
1265             assert_eq!(rx2.recv(), 2);
1266             tx3.send(());
1267         });
1268         rx3.recv();
1269         rx3.recv();
1270     }
1271
1272     test!(fn oneshot_single_thread_close_port_first() {
1273         // Simple test of closing without sending
1274         let (_tx, rx) = channel::<int>();
1275         drop(rx);
1276     })
1277
1278     test!(fn oneshot_single_thread_close_chan_first() {
1279         // Simple test of closing without sending
1280         let (tx, _rx) = channel::<int>();
1281         drop(tx);
1282     })
1283
1284     test!(fn oneshot_single_thread_send_port_close() {
1285         // Testing that the sender cleans up the payload if receiver is closed
1286         let (tx, rx) = channel::<Box<int>>();
1287         drop(rx);
1288         tx.send(box 0);
1289     } #[should_fail])
1290
1291     test!(fn oneshot_single_thread_recv_chan_close() {
1292         // Receiving on a closed chan will panic
1293         let res = task::try(proc() {
1294             let (tx, rx) = channel::<int>();
1295             drop(tx);
1296             rx.recv();
1297         });
1298         // What is our res?
1299         assert!(res.is_err());
1300     })
1301
1302     test!(fn oneshot_single_thread_send_then_recv() {
1303         let (tx, rx) = channel::<Box<int>>();
1304         tx.send(box 10);
1305         assert!(rx.recv() == box 10);
1306     })
1307
1308     test!(fn oneshot_single_thread_try_send_open() {
1309         let (tx, rx) = channel::<int>();
1310         assert!(tx.send_opt(10).is_ok());
1311         assert!(rx.recv() == 10);
1312     })
1313
1314     test!(fn oneshot_single_thread_try_send_closed() {
1315         let (tx, rx) = channel::<int>();
1316         drop(rx);
1317         assert!(tx.send_opt(10).is_err());
1318     })
1319
1320     test!(fn oneshot_single_thread_try_recv_open() {
1321         let (tx, rx) = channel::<int>();
1322         tx.send(10);
1323         assert!(rx.recv_opt() == Ok(10));
1324     })
1325
1326     test!(fn oneshot_single_thread_try_recv_closed() {
1327         let (tx, rx) = channel::<int>();
1328         drop(tx);
1329         assert!(rx.recv_opt() == Err(()));
1330     })
1331
1332     test!(fn oneshot_single_thread_peek_data() {
1333         let (tx, rx) = channel::<int>();
1334         assert_eq!(rx.try_recv(), Err(Empty))
1335         tx.send(10);
1336         assert_eq!(rx.try_recv(), Ok(10));
1337     })
1338
1339     test!(fn oneshot_single_thread_peek_close() {
1340         let (tx, rx) = channel::<int>();
1341         drop(tx);
1342         assert_eq!(rx.try_recv(), Err(Disconnected));
1343         assert_eq!(rx.try_recv(), Err(Disconnected));
1344     })
1345
1346     test!(fn oneshot_single_thread_peek_open() {
1347         let (_tx, rx) = channel::<int>();
1348         assert_eq!(rx.try_recv(), Err(Empty));
1349     })
1350
1351     test!(fn oneshot_multi_task_recv_then_send() {
1352         let (tx, rx) = channel::<Box<int>>();
1353         spawn(proc() {
1354             assert!(rx.recv() == box 10);
1355         });
1356
1357         tx.send(box 10);
1358     })
1359
1360     test!(fn oneshot_multi_task_recv_then_close() {
1361         let (tx, rx) = channel::<Box<int>>();
1362         spawn(proc() {
1363             drop(tx);
1364         });
1365         let res = task::try(proc() {
1366             assert!(rx.recv() == box 10);
1367         });
1368         assert!(res.is_err());
1369     })
1370
1371     test!(fn oneshot_multi_thread_close_stress() {
1372         for _ in range(0, stress_factor()) {
1373             let (tx, rx) = channel::<int>();
1374             spawn(proc() {
1375                 drop(rx);
1376             });
1377             drop(tx);
1378         }
1379     })
1380
1381     test!(fn oneshot_multi_thread_send_close_stress() {
1382         for _ in range(0, stress_factor()) {
1383             let (tx, rx) = channel::<int>();
1384             spawn(proc() {
1385                 drop(rx);
1386             });
1387             let _ = task::try(proc() {
1388                 tx.send(1);
1389             });
1390         }
1391     })
1392
1393     test!(fn oneshot_multi_thread_recv_close_stress() {
1394         for _ in range(0, stress_factor()) {
1395             let (tx, rx) = channel::<int>();
1396             spawn(proc() {
1397                 let res = task::try(proc() {
1398                     rx.recv();
1399                 });
1400                 assert!(res.is_err());
1401             });
1402             spawn(proc() {
1403                 spawn(proc() {
1404                     drop(tx);
1405                 });
1406             });
1407         }
1408     })
1409
1410     test!(fn oneshot_multi_thread_send_recv_stress() {
1411         for _ in range(0, stress_factor()) {
1412             let (tx, rx) = channel();
1413             spawn(proc() {
1414                 tx.send(box 10i);
1415             });
1416             spawn(proc() {
1417                 assert!(rx.recv() == box 10i);
1418             });
1419         }
1420     })
1421
1422     test!(fn stream_send_recv_stress() {
1423         for _ in range(0, stress_factor()) {
1424             let (tx, rx) = channel();
1425
1426             send(tx, 0);
1427             recv(rx, 0);
1428
1429             fn send(tx: Sender<Box<int>>, i: int) {
1430                 if i == 10 { return }
1431
1432                 spawn(proc() {
1433                     tx.send(box i);
1434                     send(tx, i + 1);
1435                 });
1436             }
1437
1438             fn recv(rx: Receiver<Box<int>>, i: int) {
1439                 if i == 10 { return }
1440
1441                 spawn(proc() {
1442                     assert!(rx.recv() == box i);
1443                     recv(rx, i + 1);
1444                 });
1445             }
1446         }
1447     })
1448
1449     test!(fn recv_a_lot() {
1450         // Regression test that we don't run out of stack in scheduler context
1451         let (tx, rx) = channel();
1452         for _ in range(0i, 10000) { tx.send(()); }
1453         for _ in range(0i, 10000) { rx.recv(); }
1454     })
1455
1456     test!(fn shared_chan_stress() {
1457         let (tx, rx) = channel();
1458         let total = stress_factor() + 100;
1459         for _ in range(0, total) {
1460             let tx = tx.clone();
1461             spawn(proc() {
1462                 tx.send(());
1463             });
1464         }
1465
1466         for _ in range(0, total) {
1467             rx.recv();
1468         }
1469     })
1470
1471     test!(fn test_nested_recv_iter() {
1472         let (tx, rx) = channel::<int>();
1473         let (total_tx, total_rx) = channel::<int>();
1474
1475         spawn(proc() {
1476             let mut acc = 0;
1477             for x in rx.iter() {
1478                 acc += x;
1479             }
1480             total_tx.send(acc);
1481         });
1482
1483         tx.send(3);
1484         tx.send(1);
1485         tx.send(2);
1486         drop(tx);
1487         assert_eq!(total_rx.recv(), 6);
1488     })
1489
1490     test!(fn test_recv_iter_break() {
1491         let (tx, rx) = channel::<int>();
1492         let (count_tx, count_rx) = channel();
1493
1494         spawn(proc() {
1495             let mut count = 0;
1496             for x in rx.iter() {
1497                 if count >= 3 {
1498                     break;
1499                 } else {
1500                     count += x;
1501                 }
1502             }
1503             count_tx.send(count);
1504         });
1505
1506         tx.send(2);
1507         tx.send(2);
1508         tx.send(2);
1509         let _ = tx.send_opt(2);
1510         drop(tx);
1511         assert_eq!(count_rx.recv(), 4);
1512     })
1513
1514     test!(fn try_recv_states() {
1515         let (tx1, rx1) = channel::<int>();
1516         let (tx2, rx2) = channel::<()>();
1517         let (tx3, rx3) = channel::<()>();
1518         spawn(proc() {
1519             rx2.recv();
1520             tx1.send(1);
1521             tx3.send(());
1522             rx2.recv();
1523             drop(tx1);
1524             tx3.send(());
1525         });
1526
1527         assert_eq!(rx1.try_recv(), Err(Empty));
1528         tx2.send(());
1529         rx3.recv();
1530         assert_eq!(rx1.try_recv(), Ok(1));
1531         assert_eq!(rx1.try_recv(), Err(Empty));
1532         tx2.send(());
1533         rx3.recv();
1534         assert_eq!(rx1.try_recv(), Err(Disconnected));
1535     })
1536
1537     // This bug used to end up in a livelock inside of the Receiver destructor
1538     // because the internal state of the Shared packet was corrupted
1539     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1540         let (tx, rx) = channel();
1541         let (tx2, rx2) = channel();
1542         spawn(proc() {
1543             rx.recv(); // wait on a oneshot
1544             drop(rx);  // destroy a shared
1545             tx2.send(());
1546         });
1547         // make sure the other task has gone to sleep
1548         for _ in range(0u, 5000) { task::deschedule(); }
1549
1550         // upgrade to a shared chan and send a message
1551         let t = tx.clone();
1552         drop(tx);
1553         t.send(());
1554
1555         // wait for the child task to exit before we exit
1556         rx2.recv();
1557     })
1558
1559     test!(fn sends_off_the_runtime() {
1560         use std::rt::thread::Thread;
1561
1562         let (tx, rx) = channel();
1563         let t = Thread::start(proc() {
1564             for _ in range(0u, 1000) {
1565                 tx.send(());
1566             }
1567         });
1568         for _ in range(0u, 1000) {
1569             rx.recv();
1570         }
1571         t.join();
1572     })
1573
1574     test!(fn try_recvs_off_the_runtime() {
1575         use std::rt::thread::Thread;
1576
1577         let (tx, rx) = channel();
1578         let (cdone, pdone) = channel();
1579         let t = Thread::start(proc() {
1580             let mut hits = 0u;
1581             while hits < 10 {
1582                 match rx.try_recv() {
1583                     Ok(()) => { hits += 1; }
1584                     Err(Empty) => { Thread::yield_now(); }
1585                     Err(Disconnected) => return,
1586                 }
1587             }
1588             cdone.send(());
1589         });
1590         for _ in range(0u, 10) {
1591             tx.send(());
1592         }
1593         t.join();
1594         pdone.recv();
1595     })
1596 }
1597
1598 #[cfg(test)]
1599 mod sync_tests {
1600     use std::prelude::*;
1601     use std::os;
1602
1603     pub fn stress_factor() -> uint {
1604         match os::getenv("RUST_TEST_STRESS") {
1605             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1606             None => 1,
1607         }
1608     }
1609
1610     test!(fn smoke() {
1611         let (tx, rx) = sync_channel::<int>(1);
1612         tx.send(1);
1613         assert_eq!(rx.recv(), 1);
1614     })
1615
1616     test!(fn drop_full() {
1617         let (tx, _rx) = sync_channel(1);
1618         tx.send(box 1i);
1619     })
1620
1621     test!(fn smoke_shared() {
1622         let (tx, rx) = sync_channel::<int>(1);
1623         tx.send(1);
1624         assert_eq!(rx.recv(), 1);
1625         let tx = tx.clone();
1626         tx.send(1);
1627         assert_eq!(rx.recv(), 1);
1628     })
1629
1630     test!(fn smoke_threads() {
1631         let (tx, rx) = sync_channel::<int>(0);
1632         spawn(proc() {
1633             tx.send(1);
1634         });
1635         assert_eq!(rx.recv(), 1);
1636     })
1637
1638     test!(fn smoke_port_gone() {
1639         let (tx, rx) = sync_channel::<int>(0);
1640         drop(rx);
1641         tx.send(1);
1642     } #[should_fail])
1643
1644     test!(fn smoke_shared_port_gone2() {
1645         let (tx, rx) = sync_channel::<int>(0);
1646         drop(rx);
1647         let tx2 = tx.clone();
1648         drop(tx);
1649         tx2.send(1);
1650     } #[should_fail])
1651
1652     test!(fn port_gone_concurrent() {
1653         let (tx, rx) = sync_channel::<int>(0);
1654         spawn(proc() {
1655             rx.recv();
1656         });
1657         loop { tx.send(1) }
1658     } #[should_fail])
1659
1660     test!(fn port_gone_concurrent_shared() {
1661         let (tx, rx) = sync_channel::<int>(0);
1662         let tx2 = tx.clone();
1663         spawn(proc() {
1664             rx.recv();
1665         });
1666         loop {
1667             tx.send(1);
1668             tx2.send(1);
1669         }
1670     } #[should_fail])
1671
1672     test!(fn smoke_chan_gone() {
1673         let (tx, rx) = sync_channel::<int>(0);
1674         drop(tx);
1675         rx.recv();
1676     } #[should_fail])
1677
1678     test!(fn smoke_chan_gone_shared() {
1679         let (tx, rx) = sync_channel::<()>(0);
1680         let tx2 = tx.clone();
1681         drop(tx);
1682         drop(tx2);
1683         rx.recv();
1684     } #[should_fail])
1685
1686     test!(fn chan_gone_concurrent() {
1687         let (tx, rx) = sync_channel::<int>(0);
1688         spawn(proc() {
1689             tx.send(1);
1690             tx.send(1);
1691         });
1692         loop { rx.recv(); }
1693     } #[should_fail])
1694
1695     test!(fn stress() {
1696         let (tx, rx) = sync_channel::<int>(0);
1697         spawn(proc() {
1698             for _ in range(0u, 10000) { tx.send(1); }
1699         });
1700         for _ in range(0u, 10000) {
1701             assert_eq!(rx.recv(), 1);
1702         }
1703     })
1704
1705     test!(fn stress_shared() {
1706         static AMT: uint = 1000;
1707         static NTHREADS: uint = 8;
1708         let (tx, rx) = sync_channel::<int>(0);
1709         let (dtx, drx) = sync_channel::<()>(0);
1710
1711         spawn(proc() {
1712             for _ in range(0, AMT * NTHREADS) {
1713                 assert_eq!(rx.recv(), 1);
1714             }
1715             match rx.try_recv() {
1716                 Ok(..) => panic!(),
1717                 _ => {}
1718             }
1719             dtx.send(());
1720         });
1721
1722         for _ in range(0, NTHREADS) {
1723             let tx = tx.clone();
1724             spawn(proc() {
1725                 for _ in range(0, AMT) { tx.send(1); }
1726             });
1727         }
1728         drop(tx);
1729         drx.recv();
1730     })
1731
1732     test!(fn oneshot_single_thread_close_port_first() {
1733         // Simple test of closing without sending
1734         let (_tx, rx) = sync_channel::<int>(0);
1735         drop(rx);
1736     })
1737
1738     test!(fn oneshot_single_thread_close_chan_first() {
1739         // Simple test of closing without sending
1740         let (tx, _rx) = sync_channel::<int>(0);
1741         drop(tx);
1742     })
1743
1744     test!(fn oneshot_single_thread_send_port_close() {
1745         // Testing that the sender cleans up the payload if receiver is closed
1746         let (tx, rx) = sync_channel::<Box<int>>(0);
1747         drop(rx);
1748         tx.send(box 0);
1749     } #[should_fail])
1750
1751     test!(fn oneshot_single_thread_recv_chan_close() {
1752         // Receiving on a closed chan will panic
1753         let res = task::try(proc() {
1754             let (tx, rx) = sync_channel::<int>(0);
1755             drop(tx);
1756             rx.recv();
1757         });
1758         // What is our res?
1759         assert!(res.is_err());
1760     })
1761
1762     test!(fn oneshot_single_thread_send_then_recv() {
1763         let (tx, rx) = sync_channel::<Box<int>>(1);
1764         tx.send(box 10);
1765         assert!(rx.recv() == box 10);
1766     })
1767
1768     test!(fn oneshot_single_thread_try_send_open() {
1769         let (tx, rx) = sync_channel::<int>(1);
1770         assert_eq!(tx.try_send(10), Ok(()));
1771         assert!(rx.recv() == 10);
1772     })
1773
1774     test!(fn oneshot_single_thread_try_send_closed() {
1775         let (tx, rx) = sync_channel::<int>(0);
1776         drop(rx);
1777         assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
1778     })
1779
1780     test!(fn oneshot_single_thread_try_send_closed2() {
1781         let (tx, _rx) = sync_channel::<int>(0);
1782         assert_eq!(tx.try_send(10), Err(Full(10)));
1783     })
1784
1785     test!(fn oneshot_single_thread_try_recv_open() {
1786         let (tx, rx) = sync_channel::<int>(1);
1787         tx.send(10);
1788         assert!(rx.recv_opt() == Ok(10));
1789     })
1790
1791     test!(fn oneshot_single_thread_try_recv_closed() {
1792         let (tx, rx) = sync_channel::<int>(0);
1793         drop(tx);
1794         assert!(rx.recv_opt() == Err(()));
1795     })
1796
1797     test!(fn oneshot_single_thread_peek_data() {
1798         let (tx, rx) = sync_channel::<int>(1);
1799         assert_eq!(rx.try_recv(), Err(Empty))
1800         tx.send(10);
1801         assert_eq!(rx.try_recv(), Ok(10));
1802     })
1803
1804     test!(fn oneshot_single_thread_peek_close() {
1805         let (tx, rx) = sync_channel::<int>(0);
1806         drop(tx);
1807         assert_eq!(rx.try_recv(), Err(Disconnected));
1808         assert_eq!(rx.try_recv(), Err(Disconnected));
1809     })
1810
1811     test!(fn oneshot_single_thread_peek_open() {
1812         let (_tx, rx) = sync_channel::<int>(0);
1813         assert_eq!(rx.try_recv(), Err(Empty));
1814     })
1815
1816     test!(fn oneshot_multi_task_recv_then_send() {
1817         let (tx, rx) = sync_channel::<Box<int>>(0);
1818         spawn(proc() {
1819             assert!(rx.recv() == box 10);
1820         });
1821
1822         tx.send(box 10);
1823     })
1824
1825     test!(fn oneshot_multi_task_recv_then_close() {
1826         let (tx, rx) = sync_channel::<Box<int>>(0);
1827         spawn(proc() {
1828             drop(tx);
1829         });
1830         let res = task::try(proc() {
1831             assert!(rx.recv() == box 10);
1832         });
1833         assert!(res.is_err());
1834     })
1835
1836     test!(fn oneshot_multi_thread_close_stress() {
1837         for _ in range(0, stress_factor()) {
1838             let (tx, rx) = sync_channel::<int>(0);
1839             spawn(proc() {
1840                 drop(rx);
1841             });
1842             drop(tx);
1843         }
1844     })
1845
1846     test!(fn oneshot_multi_thread_send_close_stress() {
1847         for _ in range(0, stress_factor()) {
1848             let (tx, rx) = sync_channel::<int>(0);
1849             spawn(proc() {
1850                 drop(rx);
1851             });
1852             let _ = task::try(proc() {
1853                 tx.send(1);
1854             });
1855         }
1856     })
1857
1858     test!(fn oneshot_multi_thread_recv_close_stress() {
1859         for _ in range(0, stress_factor()) {
1860             let (tx, rx) = sync_channel::<int>(0);
1861             spawn(proc() {
1862                 let res = task::try(proc() {
1863                     rx.recv();
1864                 });
1865                 assert!(res.is_err());
1866             });
1867             spawn(proc() {
1868                 spawn(proc() {
1869                     drop(tx);
1870                 });
1871             });
1872         }
1873     })
1874
1875     test!(fn oneshot_multi_thread_send_recv_stress() {
1876         for _ in range(0, stress_factor()) {
1877             let (tx, rx) = sync_channel::<Box<int>>(0);
1878             spawn(proc() {
1879                 tx.send(box 10i);
1880             });
1881             spawn(proc() {
1882                 assert!(rx.recv() == box 10i);
1883             });
1884         }
1885     })
1886
1887     test!(fn stream_send_recv_stress() {
1888         for _ in range(0, stress_factor()) {
1889             let (tx, rx) = sync_channel::<Box<int>>(0);
1890
1891             send(tx, 0);
1892             recv(rx, 0);
1893
1894             fn send(tx: SyncSender<Box<int>>, i: int) {
1895                 if i == 10 { return }
1896
1897                 spawn(proc() {
1898                     tx.send(box i);
1899                     send(tx, i + 1);
1900                 });
1901             }
1902
1903             fn recv(rx: Receiver<Box<int>>, i: int) {
1904                 if i == 10 { return }
1905
1906                 spawn(proc() {
1907                     assert!(rx.recv() == box i);
1908                     recv(rx, i + 1);
1909                 });
1910             }
1911         }
1912     })
1913
1914     test!(fn recv_a_lot() {
1915         // Regression test that we don't run out of stack in scheduler context
1916         let (tx, rx) = sync_channel(10000);
1917         for _ in range(0u, 10000) { tx.send(()); }
1918         for _ in range(0u, 10000) { rx.recv(); }
1919     })
1920
1921     test!(fn shared_chan_stress() {
1922         let (tx, rx) = sync_channel(0);
1923         let total = stress_factor() + 100;
1924         for _ in range(0, total) {
1925             let tx = tx.clone();
1926             spawn(proc() {
1927                 tx.send(());
1928             });
1929         }
1930
1931         for _ in range(0, total) {
1932             rx.recv();
1933         }
1934     })
1935
1936     test!(fn test_nested_recv_iter() {
1937         let (tx, rx) = sync_channel::<int>(0);
1938         let (total_tx, total_rx) = sync_channel::<int>(0);
1939
1940         spawn(proc() {
1941             let mut acc = 0;
1942             for x in rx.iter() {
1943                 acc += x;
1944             }
1945             total_tx.send(acc);
1946         });
1947
1948         tx.send(3);
1949         tx.send(1);
1950         tx.send(2);
1951         drop(tx);
1952         assert_eq!(total_rx.recv(), 6);
1953     })
1954
1955     test!(fn test_recv_iter_break() {
1956         let (tx, rx) = sync_channel::<int>(0);
1957         let (count_tx, count_rx) = sync_channel(0);
1958
1959         spawn(proc() {
1960             let mut count = 0;
1961             for x in rx.iter() {
1962                 if count >= 3 {
1963                     break;
1964                 } else {
1965                     count += x;
1966                 }
1967             }
1968             count_tx.send(count);
1969         });
1970
1971         tx.send(2);
1972         tx.send(2);
1973         tx.send(2);
1974         let _ = tx.try_send(2);
1975         drop(tx);
1976         assert_eq!(count_rx.recv(), 4);
1977     })
1978
1979     test!(fn try_recv_states() {
1980         let (tx1, rx1) = sync_channel::<int>(1);
1981         let (tx2, rx2) = sync_channel::<()>(1);
1982         let (tx3, rx3) = sync_channel::<()>(1);
1983         spawn(proc() {
1984             rx2.recv();
1985             tx1.send(1);
1986             tx3.send(());
1987             rx2.recv();
1988             drop(tx1);
1989             tx3.send(());
1990         });
1991
1992         assert_eq!(rx1.try_recv(), Err(Empty));
1993         tx2.send(());
1994         rx3.recv();
1995         assert_eq!(rx1.try_recv(), Ok(1));
1996         assert_eq!(rx1.try_recv(), Err(Empty));
1997         tx2.send(());
1998         rx3.recv();
1999         assert_eq!(rx1.try_recv(), Err(Disconnected));
2000     })
2001
2002     // This bug used to end up in a livelock inside of the Receiver destructor
2003     // because the internal state of the Shared packet was corrupted
2004     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
2005         let (tx, rx) = sync_channel::<()>(0);
2006         let (tx2, rx2) = sync_channel::<()>(0);
2007         spawn(proc() {
2008             rx.recv(); // wait on a oneshot
2009             drop(rx);  // destroy a shared
2010             tx2.send(());
2011         });
2012         // make sure the other task has gone to sleep
2013         for _ in range(0u, 5000) { task::deschedule(); }
2014
2015         // upgrade to a shared chan and send a message
2016         let t = tx.clone();
2017         drop(tx);
2018         t.send(());
2019
2020         // wait for the child task to exit before we exit
2021         rx2.recv();
2022     })
2023
2024     test!(fn try_recvs_off_the_runtime() {
2025         use std::rt::thread::Thread;
2026
2027         let (tx, rx) = sync_channel::<()>(0);
2028         let (cdone, pdone) = channel();
2029         let t = Thread::start(proc() {
2030             let mut hits = 0u;
2031             while hits < 10 {
2032                 match rx.try_recv() {
2033                     Ok(()) => { hits += 1; }
2034                     Err(Empty) => { Thread::yield_now(); }
2035                     Err(Disconnected) => return,
2036                 }
2037             }
2038             cdone.send(());
2039         });
2040         for _ in range(0u, 10) {
2041             tx.send(());
2042         }
2043         t.join();
2044         pdone.recv();
2045     })
2046
2047     test!(fn send_opt1() {
2048         let (tx, rx) = sync_channel::<int>(0);
2049         spawn(proc() { rx.recv(); });
2050         assert_eq!(tx.send_opt(1), Ok(()));
2051     })
2052
2053     test!(fn send_opt2() {
2054         let (tx, rx) = sync_channel::<int>(0);
2055         spawn(proc() { drop(rx); });
2056         assert_eq!(tx.send_opt(1), Err(1));
2057     })
2058
2059     test!(fn send_opt3() {
2060         let (tx, rx) = sync_channel::<int>(1);
2061         assert_eq!(tx.send_opt(1), Ok(()));
2062         spawn(proc() { drop(rx); });
2063         assert_eq!(tx.send_opt(1), Err(1));
2064     })
2065
2066     test!(fn send_opt4() {
2067         let (tx, rx) = sync_channel::<int>(0);
2068         let tx2 = tx.clone();
2069         let (done, donerx) = channel();
2070         let done2 = done.clone();
2071         spawn(proc() {
2072             assert_eq!(tx.send_opt(1), Err(1));
2073             done.send(());
2074         });
2075         spawn(proc() {
2076             assert_eq!(tx2.send_opt(2), Err(2));
2077             done2.send(());
2078         });
2079         drop(rx);
2080         donerx.recv();
2081         donerx.recv();
2082     })
2083
2084     test!(fn try_send1() {
2085         let (tx, _rx) = sync_channel::<int>(0);
2086         assert_eq!(tx.try_send(1), Err(Full(1)));
2087     })
2088
2089     test!(fn try_send2() {
2090         let (tx, _rx) = sync_channel::<int>(1);
2091         assert_eq!(tx.try_send(1), Ok(()));
2092         assert_eq!(tx.try_send(1), Err(Full(1)));
2093     })
2094
2095     test!(fn try_send3() {
2096         let (tx, rx) = sync_channel::<int>(1);
2097         assert_eq!(tx.try_send(1), Ok(()));
2098         drop(rx);
2099         assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
2100     })
2101
2102     test!(fn try_send4() {
2103         let (tx, rx) = sync_channel::<int>(0);
2104         spawn(proc() {
2105             for _ in range(0u, 1000) { task::deschedule(); }
2106             assert_eq!(tx.try_send(1), Ok(()));
2107         });
2108         assert_eq!(rx.recv(), 1);
2109     } #[ignore(reason = "flaky on libnative")])
2110
2111     test!(fn issue_15761() {
2112         fn repro() {
2113             let (tx1, rx1) = sync_channel::<()>(3);
2114             let (tx2, rx2) = sync_channel::<()>(3);
2115
2116             spawn(proc() {
2117                 rx1.recv();
2118                 tx2.try_send(()).unwrap();
2119             });
2120
2121             tx1.try_send(()).unwrap();
2122             rx2.recv();
2123         }
2124
2125         for _ in range(0u, 100) {
2126             repro()
2127         }
2128     })
2129 }