]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/comm.rs
4f5f26513b4c6fcf558cd52ab22bbac382a7f2d3
[rust.git] / src / libstd / rt / comm.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Ports and channels.
12
13 use option::*;
14 use cast;
15 use ops::Drop;
16 use rt::kill::BlockedTask;
17 use kinds::Send;
18 use rt;
19 use rt::sched::Scheduler;
20 use rt::local::Local;
21 use rt::select::{SelectInner, SelectPortInner};
22 use select::{Select, SelectPort};
23 use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
24 use unstable::sync::UnsafeArc;
25 use util::Void;
26 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
27 use cell::Cell;
28 use clone::Clone;
29 use tuple::ImmutableTuple;
30
31 /// A combined refcount / BlockedTask-as-uint pointer.
32 ///
33 /// Can be equal to the following values:
34 ///
35 /// * 2 - both endpoints are alive
36 /// * 1 - either the sender or the receiver is dead, determined by context
37 /// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
38 type State = uint;
39
40 static STATE_BOTH: State = 2;
41 static STATE_ONE: State = 1;
42
43 /// The heap-allocated structure shared between two endpoints.
44 struct Packet<T> {
45     state: AtomicUint,
46     payload: Option<T>,
47 }
48
49 // A one-shot channel.
50 pub struct ChanOne<T> {
51     void_packet: *mut Void,
52     suppress_finalize: bool
53 }
54
55 /// A one-shot port.
56 pub struct PortOne<T> {
57     void_packet: *mut Void,
58     suppress_finalize: bool
59 }
60
61 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
62     let packet: ~Packet<T> = ~Packet {
63         state: AtomicUint::new(STATE_BOTH),
64         payload: None
65     };
66
67     unsafe {
68         let packet: *mut Void = cast::transmute(packet);
69         let port = PortOne {
70             void_packet: packet,
71             suppress_finalize: false
72         };
73         let chan = ChanOne {
74             void_packet: packet,
75             suppress_finalize: false
76         };
77         return (port, chan);
78     }
79 }
80
81 impl<T> ChanOne<T> {
82     #[inline]
83     fn packet(&self) -> *mut Packet<T> {
84         unsafe {
85             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
86             let p: *mut Packet<T> = &mut **p;
87             return p;
88         }
89     }
90
91     /// Send a message on the one-shot channel. If a receiver task is blocked
92     /// waiting for the message, will wake it up and reschedule to it.
93     pub fn send(self, val: T) {
94         self.try_send(val);
95     }
96
97     /// As `send`, but also returns whether or not the receiver endpoint is still open.
98     pub fn try_send(self, val: T) -> bool {
99         self.try_send_inner(val, true)
100     }
101
102     /// Send a message without immediately rescheduling to a blocked receiver.
103     /// This can be useful in contexts where rescheduling is forbidden, or to
104     /// optimize for when the sender expects to still have useful work to do.
105     pub fn send_deferred(self, val: T) {
106         self.try_send_deferred(val);
107     }
108
109     /// As `send_deferred` and `try_send` together.
110     pub fn try_send_deferred(self, val: T) -> bool {
111         self.try_send_inner(val, false)
112     }
113
114     // 'do_resched' configures whether the scheduler immediately switches to
115     // the receiving task, or leaves the sending task still running.
116     fn try_send_inner(self, val: T, do_resched: bool) -> bool {
117         if do_resched {
118             rtassert!(!rt::in_sched_context());
119         }
120
121         let mut this = self;
122         let mut recvr_active = true;
123         let packet = this.packet();
124
125         unsafe {
126
127             // Install the payload
128             rtassert!((*packet).payload.is_none());
129             (*packet).payload = Some(val);
130
131             // Atomically swap out the old state to figure out what
132             // the port's up to, issuing a release barrier to prevent
133             // reordering of the payload write. This also issues an
134             // acquire barrier that keeps the subsequent access of the
135             // ~Task pointer from being reordered.
136             let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
137
138             // Suppress the synchronizing actions in the finalizer. We're
139             // done with the packet. NB: In case of do_resched, this *must*
140             // happen before waking up a blocked task (or be unkillable),
141             // because we might get a kill signal during the reschedule.
142             this.suppress_finalize = true;
143
144             match oldstate {
145                 STATE_BOTH => {
146                     // Port is not waiting yet. Nothing to do
147                 }
148                 STATE_ONE => {
149                     // Port has closed. Need to clean up.
150                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
151                     recvr_active = false;
152                 }
153                 task_as_state => {
154                     // Port is blocked. Wake it up.
155                     let recvr = BlockedTask::cast_from_uint(task_as_state);
156                     if do_resched {
157                         do recvr.wake().map_move |woken_task| {
158                             Scheduler::run_task(woken_task);
159                         };
160                     } else {
161                         let recvr = Cell::new(recvr);
162                         do Local::borrow |sched: &mut Scheduler| {
163                             sched.enqueue_blocked_task(recvr.take());
164                         }
165                     }
166                 }
167             }
168         }
169
170         return recvr_active;
171     }
172 }
173
174 impl<T> PortOne<T> {
175     fn packet(&self) -> *mut Packet<T> {
176         unsafe {
177             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
178             let p: *mut Packet<T> = &mut **p;
179             return p;
180         }
181     }
182
183     /// Wait for a message on the one-shot port. Fails if the send end is closed.
184     pub fn recv(self) -> T {
185         match self.try_recv() {
186             Some(val) => val,
187             None => {
188                 fail!("receiving on closed channel");
189             }
190         }
191     }
192
193     /// As `recv`, but returns `None` if the send end is closed rather than failing.
194     pub fn try_recv(self) -> Option<T> {
195         let mut this = self;
196
197         // Optimistic check. If data was sent already, we don't even need to block.
198         // No release barrier needed here; we're not handing off our task pointer yet.
199         if !this.optimistic_check() {
200             // No data available yet.
201             // Switch to the scheduler to put the ~Task into the Packet state.
202             let sched: ~Scheduler = Local::take();
203             do sched.deschedule_running_task_and_then |sched, task| {
204                 this.block_on(sched, task);
205             }
206         }
207
208         // Task resumes.
209         this.recv_ready()
210     }
211 }
212
213 impl<T> SelectInner for PortOne<T> {
214     #[inline] #[cfg(not(test))]
215     fn optimistic_check(&mut self) -> bool {
216         unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
217     }
218
219     #[inline] #[cfg(test)]
220     fn optimistic_check(&mut self) -> bool {
221         // The optimistic check is never necessary for correctness. For testing
222         // purposes, making it randomly return false simulates a racing sender.
223         use rand::{Rand};
224         let actually_check = do Local::borrow |sched: &mut Scheduler| {
225             Rand::rand(&mut sched.rng)
226         };
227         if actually_check {
228             unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
229         } else {
230             false
231         }
232     }
233
234     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
235         unsafe {
236             // Atomically swap the task pointer into the Packet state, issuing
237             // an acquire barrier to prevent reordering of the subsequent read
238             // of the payload. Also issues a release barrier to prevent
239             // reordering of any previous writes to the task structure.
240             let task_as_state = task.cast_to_uint();
241             let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
242             match oldstate {
243                 STATE_BOTH => {
244                     // Data has not been sent. Now we're blocked.
245                     rtdebug!("non-rendezvous recv");
246                     false
247                 }
248                 STATE_ONE => {
249                     // Re-record that we are the only owner of the packet.
250                     // No barrier needed, even if the task gets reawoken
251                     // on a different core -- this is analogous to writing a
252                     // payload; a barrier in enqueueing the task protects it.
253                     // NB(#8132). This *must* occur before the enqueue below.
254                     // FIXME(#6842, #8130) This is usually only needed for the
255                     // assertion in recv_ready, except in the case of select().
256                     // This won't actually ever have cacheline contention, but
257                     // maybe should be optimized out with a cfg(test) anyway?
258                     (*self.packet()).state.store(STATE_ONE, Relaxed);
259
260                     rtdebug!("rendezvous recv");
261
262                     // Channel is closed. Switch back and check the data.
263                     // NB: We have to drop back into the scheduler event loop here
264                     // instead of switching immediately back or we could end up
265                     // triggering infinite recursion on the scheduler's stack.
266                     let recvr = BlockedTask::cast_from_uint(task_as_state);
267                     sched.enqueue_blocked_task(recvr);
268                     true
269                 }
270                 _ => rtabort!("can't block_on; a task is already blocked")
271             }
272         }
273     }
274
275     // This is the only select trait function that's not also used in recv.
276     fn unblock_from(&mut self) -> bool {
277         let packet = self.packet();
278         unsafe {
279             // In case the data is available, the acquire barrier here matches
280             // the release barrier the sender used to release the payload.
281             match (*packet).state.load(Acquire) {
282                 // Impossible. We removed STATE_BOTH when blocking on it, and
283                 // no self-respecting sender would put it back.
284                 STATE_BOTH    => rtabort!("refcount already 2 in unblock_from"),
285                 // Here, a sender already tried to wake us up. Perhaps they
286                 // even succeeded! Data is available.
287                 STATE_ONE     => true,
288                 // Still registered as blocked. Need to "unblock" the pointer.
289                 task_as_state => {
290                     // In the window between the load and the CAS, a sender
291                     // might take the pointer and set the refcount to ONE. If
292                     // that happens, we shouldn't clobber that with BOTH!
293                     // Acquire barrier again for the same reason as above.
294                     match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
295                                                            Acquire) {
296                         STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
297                         STATE_ONE  => true, // Lost the race. Data available.
298                         same_ptr   => {
299                             // We successfully unblocked our task pointer.
300                             rtassert!(task_as_state == same_ptr);
301                             let handle = BlockedTask::cast_from_uint(task_as_state);
302                             // Because we are already awake, the handle we
303                             // gave to this port shall already be empty.
304                             handle.assert_already_awake();
305                             false
306                         }
307                     }
308                 }
309             }
310         }
311     }
312 }
313
314 impl<T> Select for PortOne<T> { }
315
316 impl<T> SelectPortInner<T> for PortOne<T> {
317     fn recv_ready(self) -> Option<T> {
318         let mut this = self;
319         let packet = this.packet();
320
321         // No further memory barrier is needed here to access the
322         // payload. Some scenarios:
323         //
324         // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
325         // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
326         //    and ran on its thread. The sending task issued a read barrier when taking the
327         //    pointer to the receiving task.
328         // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
329         //    is pinned to some other scheduler, so the sending task had to give us to
330         //    a different scheduler for resuming. That send synchronized memory.
331         unsafe {
332             // See corresponding store() above in block_on for rationale.
333             // FIXME(#8130) This can happen only in test builds.
334             // This load is not required for correctness and may be compiled out.
335             rtassert!((*packet).state.load(Relaxed) == STATE_ONE);
336
337             let payload = (*packet).payload.take();
338
339             // The sender has closed up shop. Drop the packet.
340             let _packet: ~Packet<T> = cast::transmute(this.void_packet);
341             // Suppress the synchronizing actions in the finalizer. We're done with the packet.
342             this.suppress_finalize = true;
343             return payload;
344         }
345     }
346 }
347
348 impl<T> SelectPort<T> for PortOne<T> { }
349
350 impl<T> Peekable<T> for PortOne<T> {
351     fn peek(&self) -> bool {
352         unsafe {
353             let packet: *mut Packet<T> = self.packet();
354             let oldstate = (*packet).state.load(SeqCst);
355             match oldstate {
356                 STATE_BOTH => false,
357                 STATE_ONE => (*packet).payload.is_some(),
358                 _ => rtabort!("peeked on a blocked task")
359             }
360         }
361     }
362 }
363
364 #[unsafe_destructor]
365 impl<T> Drop for ChanOne<T> {
366     fn drop(&self) {
367         if self.suppress_finalize { return }
368
369         unsafe {
370             let this = cast::transmute_mut(self);
371             let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
372             match oldstate {
373                 STATE_BOTH => {
374                     // Port still active. It will destroy the Packet.
375                 },
376                 STATE_ONE => {
377                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
378                 },
379                 task_as_state => {
380                     // The port is blocked waiting for a message we will never send. Wake it.
381                     rtassert!((*this.packet()).payload.is_none());
382                     let recvr = BlockedTask::cast_from_uint(task_as_state);
383                     do recvr.wake().map_move |woken_task| {
384                         Scheduler::run_task(woken_task);
385                     };
386                 }
387             }
388         }
389     }
390 }
391
392 #[unsafe_destructor]
393 impl<T> Drop for PortOne<T> {
394     fn drop(&self) {
395         if self.suppress_finalize { return }
396
397         unsafe {
398             let this = cast::transmute_mut(self);
399             let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
400             match oldstate {
401                 STATE_BOTH => {
402                     // Chan still active. It will destroy the packet.
403                 },
404                 STATE_ONE => {
405                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
406                 }
407                 task_as_state => {
408                     // This case occurs during unwinding, when the blocked
409                     // receiver was killed awake. The task can't still be
410                     // blocked (we are it), but we need to free the handle.
411                     let recvr = BlockedTask::cast_from_uint(task_as_state);
412                     recvr.assert_already_awake();
413                 }
414             }
415         }
416     }
417 }
418
419 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
420 pub trait SendDeferred<T> {
421     fn send_deferred(&self, val: T);
422     fn try_send_deferred(&self, val: T) -> bool;
423 }
424
425 struct StreamPayload<T> {
426     val: T,
427     next: PortOne<StreamPayload<T>>
428 }
429
430 type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
431 type StreamPortOne<T> = PortOne<StreamPayload<T>>;
432
433 /// A channel with unbounded size.
434 pub struct Chan<T> {
435     // FIXME #5372. Using Cell because we don't take &mut self
436     next: Cell<StreamChanOne<T>>
437 }
438
439 /// An port with unbounded size.
440 pub struct Port<T> {
441     // FIXME #5372. Using Cell because we don't take &mut self
442     next: Cell<StreamPortOne<T>>
443 }
444
445 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
446     let (pone, cone) = oneshot();
447     let port = Port { next: Cell::new(pone) };
448     let chan = Chan { next: Cell::new(cone) };
449     return (port, chan);
450 }
451
452 impl<T: Send> Chan<T> {
453     fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
454         let (next_pone, next_cone) = oneshot();
455         let cone = self.next.take();
456         self.next.put_back(next_cone);
457         cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
458     }
459 }
460
461 impl<T: Send> GenericChan<T> for Chan<T> {
462     fn send(&self, val: T) {
463         self.try_send(val);
464     }
465 }
466
467 impl<T: Send> GenericSmartChan<T> for Chan<T> {
468     fn try_send(&self, val: T) -> bool {
469         self.try_send_inner(val, true)
470     }
471 }
472
473 impl<T: Send> SendDeferred<T> for Chan<T> {
474     fn send_deferred(&self, val: T) {
475         self.try_send_deferred(val);
476     }
477     fn try_send_deferred(&self, val: T) -> bool {
478         self.try_send_inner(val, false)
479     }
480 }
481
482 impl<T> GenericPort<T> for Port<T> {
483     fn recv(&self) -> T {
484         match self.try_recv() {
485             Some(val) => val,
486             None => {
487                 fail!("receiving on closed channel");
488             }
489         }
490     }
491
492     fn try_recv(&self) -> Option<T> {
493         do self.next.take_opt().map_move_default(None) |pone| {
494             match pone.try_recv() {
495                 Some(StreamPayload { val, next }) => {
496                     self.next.put_back(next);
497                     Some(val)
498                 }
499                 None => None
500             }
501         }
502     }
503 }
504
505 impl<T> Peekable<T> for Port<T> {
506     fn peek(&self) -> bool {
507         self.next.with_mut_ref(|p| p.peek())
508     }
509 }
510
511 // XXX: Kind of gross. A Port<T> should be selectable so you can make an array
512 // of them, but a &Port<T> should also be selectable so you can select2 on it
513 // alongside a PortOne<U> without passing the port by value in recv_ready.
514
515 impl<'self, T> SelectInner for &'self Port<T> {
516     #[inline]
517     fn optimistic_check(&mut self) -> bool {
518         do self.next.with_mut_ref |pone| { pone.optimistic_check() }
519     }
520
521     #[inline]
522     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
523         let task = Cell::new(task);
524         do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
525     }
526
527     #[inline]
528     fn unblock_from(&mut self) -> bool {
529         do self.next.with_mut_ref |pone| { pone.unblock_from() }
530     }
531 }
532
533 impl<'self, T> Select for &'self Port<T> { }
534
535 impl<T> SelectInner for Port<T> {
536     #[inline]
537     fn optimistic_check(&mut self) -> bool {
538         (&*self).optimistic_check()
539     }
540
541     #[inline]
542     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
543         (&*self).block_on(sched, task)
544     }
545
546     #[inline]
547     fn unblock_from(&mut self) -> bool {
548         (&*self).unblock_from()
549     }
550 }
551
552 impl<T> Select for Port<T> { }
553
554 impl<'self, T> SelectPortInner<T> for &'self Port<T> {
555     fn recv_ready(self) -> Option<T> {
556         match self.next.take().recv_ready() {
557             Some(StreamPayload { val, next }) => {
558                 self.next.put_back(next);
559                 Some(val)
560             }
561             None => None
562         }
563     }
564 }
565
566 impl<'self, T> SelectPort<T> for &'self Port<T> { }
567
568 pub struct SharedChan<T> {
569     // Just like Chan, but a shared AtomicOption instead of Cell
570     priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
571 }
572
573 impl<T> SharedChan<T> {
574     pub fn new(chan: Chan<T>) -> SharedChan<T> {
575         let next = chan.next.take();
576         let next = AtomicOption::new(~next);
577         SharedChan { next: UnsafeArc::new(next) }
578     }
579 }
580
581 impl<T: Send> SharedChan<T> {
582     fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
583         unsafe {
584             let (next_pone, next_cone) = oneshot();
585             let cone = (*self.next.get()).swap(~next_cone, SeqCst);
586             cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
587                                          do_resched)
588         }
589     }
590 }
591
592 impl<T: Send> GenericChan<T> for SharedChan<T> {
593     fn send(&self, val: T) {
594         self.try_send(val);
595     }
596 }
597
598 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
599     fn try_send(&self, val: T) -> bool {
600         self.try_send_inner(val, true)
601     }
602 }
603
604 impl<T: Send> SendDeferred<T> for SharedChan<T> {
605     fn send_deferred(&self, val: T) {
606         self.try_send_deferred(val);
607     }
608     fn try_send_deferred(&self, val: T) -> bool {
609         self.try_send_inner(val, false)
610     }
611 }
612
613 impl<T> Clone for SharedChan<T> {
614     fn clone(&self) -> SharedChan<T> {
615         SharedChan {
616             next: self.next.clone()
617         }
618     }
619 }
620
621 pub struct SharedPort<T> {
622     // The next port on which we will receive the next port on which we will receive T
623     priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
624 }
625
626 impl<T> SharedPort<T> {
627     pub fn new(port: Port<T>) -> SharedPort<T> {
628         // Put the data port into a new link pipe
629         let next_data_port = port.next.take();
630         let (next_link_port, next_link_chan) = oneshot();
631         next_link_chan.send(next_data_port);
632         let next_link = AtomicOption::new(~next_link_port);
633         SharedPort { next_link: UnsafeArc::new(next_link) }
634     }
635 }
636
637 impl<T: Send> GenericPort<T> for SharedPort<T> {
638     fn recv(&self) -> T {
639         match self.try_recv() {
640             Some(val) => val,
641             None => {
642                 fail!("receiving on a closed channel");
643             }
644         }
645     }
646
647     fn try_recv(&self) -> Option<T> {
648         unsafe {
649             let (next_link_port, next_link_chan) = oneshot();
650             let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
651             let link_port = link_port.unwrap();
652             let data_port = link_port.recv();
653             let (next_data_port, res) = match data_port.try_recv() {
654                 Some(StreamPayload { val, next }) => {
655                     (next, Some(val))
656                 }
657                 None => {
658                     let (next_data_port, _) = oneshot();
659                     (next_data_port, None)
660                 }
661             };
662             next_link_chan.send(next_data_port);
663             return res;
664         }
665     }
666 }
667
668 impl<T> Clone for SharedPort<T> {
669     fn clone(&self) -> SharedPort<T> {
670         SharedPort {
671             next_link: self.next_link.clone()
672         }
673     }
674 }
675
676 // FIXME #7760: Need better name
677 type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
678
679 pub fn megapipe<T: Send>() -> MegaPipe<T> {
680     let (port, chan) = stream();
681     (SharedPort::new(port), SharedChan::new(chan))
682 }
683
684 impl<T: Send> GenericChan<T> for MegaPipe<T> {
685     fn send(&self, val: T) {
686         self.second_ref().send(val)
687     }
688 }
689
690 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
691     fn try_send(&self, val: T) -> bool {
692         self.second_ref().try_send(val)
693     }
694 }
695
696 impl<T: Send> GenericPort<T> for MegaPipe<T> {
697     fn recv(&self) -> T {
698         self.first_ref().recv()
699     }
700
701     fn try_recv(&self) -> Option<T> {
702         self.first_ref().try_recv()
703     }
704 }
705
706 impl<T: Send> SendDeferred<T> for MegaPipe<T> {
707     fn send_deferred(&self, val: T) {
708         self.second_ref().send_deferred(val)
709     }
710     fn try_send_deferred(&self, val: T) -> bool {
711         self.second_ref().try_send_deferred(val)
712     }
713 }
714
715 #[cfg(test)]
716 mod test {
717     use super::*;
718     use option::*;
719     use rt::test::*;
720     use cell::Cell;
721     use num::Times;
722     use rt::util;
723
724     #[test]
725     fn oneshot_single_thread_close_port_first() {
726         // Simple test of closing without sending
727         do run_in_newsched_task {
728             let (port, _chan) = oneshot::<int>();
729             { let _p = port; }
730         }
731     }
732
733     #[test]
734     fn oneshot_single_thread_close_chan_first() {
735         // Simple test of closing without sending
736         do run_in_newsched_task {
737             let (_port, chan) = oneshot::<int>();
738             { let _c = chan; }
739         }
740     }
741
742     #[test]
743     fn oneshot_single_thread_send_port_close() {
744         // Testing that the sender cleans up the payload if receiver is closed
745         do run_in_newsched_task {
746             let (port, chan) = oneshot::<~int>();
747             { let _p = port; }
748             chan.send(~0);
749         }
750     }
751
752     #[test]
753     fn oneshot_single_thread_recv_chan_close() {
754         // Receiving on a closed chan will fail
755         do run_in_newsched_task {
756             let res = do spawntask_try {
757                 let (port, chan) = oneshot::<~int>();
758                 { let _c = chan; }
759                 port.recv();
760             };
761             // What is our res?
762             rtdebug!("res is: %?", res.is_err());
763             assert!(res.is_err());
764         }
765     }
766
767     #[test]
768     fn oneshot_single_thread_send_then_recv() {
769         do run_in_newsched_task {
770             let (port, chan) = oneshot::<~int>();
771             chan.send(~10);
772             assert!(port.recv() == ~10);
773         }
774     }
775
776     #[test]
777     fn oneshot_single_thread_try_send_open() {
778         do run_in_newsched_task {
779             let (port, chan) = oneshot::<int>();
780             assert!(chan.try_send(10));
781             assert!(port.recv() == 10);
782         }
783     }
784
785     #[test]
786     fn oneshot_single_thread_try_send_closed() {
787         do run_in_newsched_task {
788             let (port, chan) = oneshot::<int>();
789             { let _p = port; }
790             assert!(!chan.try_send(10));
791         }
792     }
793
794     #[test]
795     fn oneshot_single_thread_try_recv_open() {
796         do run_in_newsched_task {
797             let (port, chan) = oneshot::<int>();
798             chan.send(10);
799             assert!(port.try_recv() == Some(10));
800         }
801     }
802
803     #[test]
804     fn oneshot_single_thread_try_recv_closed() {
805         do run_in_newsched_task {
806             let (port, chan) = oneshot::<int>();
807             { let _c = chan; }
808             assert!(port.try_recv() == None);
809         }
810     }
811
812     #[test]
813     fn oneshot_single_thread_peek_data() {
814         do run_in_newsched_task {
815             let (port, chan) = oneshot::<int>();
816             assert!(!port.peek());
817             chan.send(10);
818             assert!(port.peek());
819         }
820     }
821
822     #[test]
823     fn oneshot_single_thread_peek_close() {
824         do run_in_newsched_task {
825             let (port, chan) = oneshot::<int>();
826             { let _c = chan; }
827             assert!(!port.peek());
828             assert!(!port.peek());
829         }
830     }
831
832     #[test]
833     fn oneshot_single_thread_peek_open() {
834         do run_in_newsched_task {
835             let (port, _) = oneshot::<int>();
836             assert!(!port.peek());
837         }
838     }
839
840     #[test]
841     fn oneshot_multi_task_recv_then_send() {
842         do run_in_newsched_task {
843             let (port, chan) = oneshot::<~int>();
844             let port_cell = Cell::new(port);
845             do spawntask {
846                 assert!(port_cell.take().recv() == ~10);
847             }
848
849             chan.send(~10);
850         }
851     }
852
853     #[test]
854     fn oneshot_multi_task_recv_then_close() {
855         do run_in_newsched_task {
856             let (port, chan) = oneshot::<~int>();
857             let port_cell = Cell::new(port);
858             let chan_cell = Cell::new(chan);
859             do spawntask_later {
860                 let _cell = chan_cell.take();
861             }
862             let res = do spawntask_try {
863                 assert!(port_cell.take().recv() == ~10);
864             };
865             assert!(res.is_err());
866         }
867     }
868
869     #[test]
870     fn oneshot_multi_thread_close_stress() {
871         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
872         do stress_factor().times {
873             do run_in_newsched_task {
874                 let (port, chan) = oneshot::<int>();
875                 let port_cell = Cell::new(port);
876                 let thread = do spawntask_thread {
877                     let _p = port_cell.take();
878                 };
879                 let _chan = chan;
880                 thread.join();
881             }
882         }
883     }
884
885     #[test]
886     fn oneshot_multi_thread_send_close_stress() {
887         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
888         do stress_factor().times {
889             do run_in_newsched_task {
890                 let (port, chan) = oneshot::<int>();
891                 let chan_cell = Cell::new(chan);
892                 let port_cell = Cell::new(port);
893                 let thread1 = do spawntask_thread {
894                     let _p = port_cell.take();
895                 };
896                 let thread2 = do spawntask_thread {
897                     let c = chan_cell.take();
898                     c.send(1);
899                 };
900                 thread1.join();
901                 thread2.join();
902             }
903         }
904     }
905
906     #[test]
907     fn oneshot_multi_thread_recv_close_stress() {
908         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
909         do stress_factor().times {
910             do run_in_newsched_task {
911                 let (port, chan) = oneshot::<int>();
912                 let chan_cell = Cell::new(chan);
913                 let port_cell = Cell::new(port);
914                 let thread1 = do spawntask_thread {
915                     let port_cell = Cell::new(port_cell.take());
916                     let res = do spawntask_try {
917                         port_cell.take().recv();
918                     };
919                     assert!(res.is_err());
920                 };
921                 let thread2 = do spawntask_thread {
922                     let chan_cell = Cell::new(chan_cell.take());
923                     do spawntask {
924                         chan_cell.take();
925                     }
926                 };
927                 thread1.join();
928                 thread2.join();
929             }
930         }
931     }
932
933     #[test]
934     fn oneshot_multi_thread_send_recv_stress() {
935         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
936         do stress_factor().times {
937             do run_in_newsched_task {
938                 let (port, chan) = oneshot::<~int>();
939                 let chan_cell = Cell::new(chan);
940                 let port_cell = Cell::new(port);
941                 let thread1 = do spawntask_thread {
942                     chan_cell.take().send(~10);
943                 };
944                 let thread2 = do spawntask_thread {
945                     assert!(port_cell.take().recv() == ~10);
946                 };
947                 thread1.join();
948                 thread2.join();
949             }
950         }
951     }
952
953     #[test]
954     fn stream_send_recv_stress() {
955         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
956         do stress_factor().times {
957             do run_in_mt_newsched_task {
958                 let (port, chan) = stream::<~int>();
959
960                 send(chan, 0);
961                 recv(port, 0);
962
963                 fn send(chan: Chan<~int>, i: int) {
964                     if i == 10 { return }
965
966                     let chan_cell = Cell::new(chan);
967                     do spawntask_random {
968                         let chan = chan_cell.take();
969                         chan.send(~i);
970                         send(chan, i + 1);
971                     }
972                 }
973
974                 fn recv(port: Port<~int>, i: int) {
975                     if i == 10 { return }
976
977                     let port_cell = Cell::new(port);
978                     do spawntask_random {
979                         let port = port_cell.take();
980                         assert!(port.recv() == ~i);
981                         recv(port, i + 1);
982                     };
983                 }
984             }
985         }
986     }
987
988     #[test]
989     fn recv_a_lot() {
990         // Regression test that we don't run out of stack in scheduler context
991         do run_in_newsched_task {
992             let (port, chan) = stream();
993             do 10000.times { chan.send(()) }
994             do 10000.times { port.recv() }
995         }
996     }
997
998     #[test]
999     fn shared_chan_stress() {
1000         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1001         do run_in_mt_newsched_task {
1002             let (port, chan) = stream();
1003             let chan = SharedChan::new(chan);
1004             let total = stress_factor() + 100;
1005             do total.times {
1006                 let chan_clone = chan.clone();
1007                 do spawntask_random {
1008                     chan_clone.send(());
1009                 }
1010             }
1011
1012             do total.times {
1013                 port.recv();
1014             }
1015         }
1016     }
1017
1018     #[test]
1019     fn shared_port_stress() {
1020         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1021         do run_in_mt_newsched_task {
1022             let (end_port, end_chan) = stream();
1023             let (port, chan) = stream();
1024             let end_chan = SharedChan::new(end_chan);
1025             let port = SharedPort::new(port);
1026             let total = stress_factor() + 100;
1027             do total.times {
1028                 let end_chan_clone = end_chan.clone();
1029                 let port_clone = port.clone();
1030                 do spawntask_random {
1031                     port_clone.recv();
1032                     end_chan_clone.send(());
1033                 }
1034             }
1035
1036             do total.times {
1037                 chan.send(());
1038             }
1039
1040             do total.times {
1041                 end_port.recv();
1042             }
1043         }
1044     }
1045
1046     #[test]
1047     fn shared_port_close_simple() {
1048         do run_in_mt_newsched_task {
1049             let (port, chan) = stream::<()>();
1050             let port = SharedPort::new(port);
1051             { let _chan = chan; }
1052             assert!(port.try_recv().is_none());
1053         }
1054     }
1055
1056     #[test]
1057     fn shared_port_close() {
1058         do run_in_mt_newsched_task {
1059             let (end_port, end_chan) = stream::<bool>();
1060             let (port, chan) = stream::<()>();
1061             let end_chan = SharedChan::new(end_chan);
1062             let port = SharedPort::new(port);
1063             let chan = SharedChan::new(chan);
1064             let send_total = 10;
1065             let recv_total = 20;
1066             do spawntask_random {
1067                 do send_total.times {
1068                     let chan_clone = chan.clone();
1069                     do spawntask_random {
1070                         chan_clone.send(());
1071                     }
1072                 }
1073             }
1074             let end_chan_clone = end_chan.clone();
1075             do spawntask_random {
1076                 do recv_total.times {
1077                     let port_clone = port.clone();
1078                     let end_chan_clone = end_chan_clone.clone();
1079                     do spawntask_random {
1080                         let recvd = port_clone.try_recv().is_some();
1081                         end_chan_clone.send(recvd);
1082                     }
1083                 }
1084             }
1085
1086             let mut recvd = 0;
1087             do recv_total.times {
1088                 recvd += if end_port.recv() { 1 } else { 0 };
1089             }
1090
1091             assert!(recvd == send_total);
1092         }
1093     }
1094
1095     #[test]
1096     fn megapipe_stress() {
1097         use rand;
1098         use rand::RngUtil;
1099
1100         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1101
1102         do run_in_mt_newsched_task {
1103             let (end_port, end_chan) = stream::<()>();
1104             let end_chan = SharedChan::new(end_chan);
1105             let pipe = megapipe();
1106             let total = stress_factor() + 10;
1107             let mut rng = rand::rng();
1108             do total.times {
1109                 let msgs = rng.gen_uint_range(0, 10);
1110                 let pipe_clone = pipe.clone();
1111                 let end_chan_clone = end_chan.clone();
1112                 do spawntask_random {
1113                     do msgs.times {
1114                         pipe_clone.send(());
1115                     }
1116                     do msgs.times {
1117                         pipe_clone.recv();
1118                     }
1119                 }
1120
1121                 end_chan_clone.send(());
1122             }
1123
1124             do total.times {
1125                 end_port.recv();
1126             }
1127         }
1128     }
1129
1130     #[test]
1131     fn send_deferred() {
1132         use unstable::sync::atomically;
1133
1134         // Tests no-rescheduling of send_deferred on all types of channels.
1135         do run_in_newsched_task {
1136             let (pone, cone) = oneshot();
1137             let (pstream, cstream) = stream();
1138             let (pshared, cshared) = stream();
1139             let cshared = SharedChan::new(cshared);
1140             let mp = megapipe();
1141
1142             let pone = Cell::new(pone);
1143             do spawntask { pone.take().recv(); }
1144             let pstream = Cell::new(pstream);
1145             do spawntask { pstream.take().recv(); }
1146             let pshared = Cell::new(pshared);
1147             do spawntask { pshared.take().recv(); }
1148             let p_mp = Cell::new(mp.clone());
1149             do spawntask { p_mp.take().recv(); }
1150
1151             let cs = Cell::new((cone, cstream, cshared, mp));
1152             unsafe {
1153                 do atomically {
1154                     let (cone, cstream, cshared, mp) = cs.take();
1155                     cone.send_deferred(());
1156                     cstream.send_deferred(());
1157                     cshared.send_deferred(());
1158                     mp.send_deferred(());
1159                 }
1160             }
1161         }
1162     }
1163
1164 }