]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/comm.rs
auto merge of #8350 : dim-an/rust/fix-struct-match, r=pcwalton
[rust.git] / src / libstd / rt / comm.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Ports and channels.
12
13 use option::*;
14 use cast;
15 use ops::Drop;
16 use rt::kill::BlockedTask;
17 use kinds::Send;
18 use rt::sched::Scheduler;
19 use rt::local::Local;
20 use rt::select::{Select, SelectPort};
21 use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
22 use unstable::sync::UnsafeAtomicRcBox;
23 use util::Void;
24 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
25 use cell::Cell;
26 use clone::Clone;
27 use rt::{context, SchedulerContext};
28 use tuple::ImmutableTuple;
29
30 /// A combined refcount / BlockedTask-as-uint pointer.
31 ///
32 /// Can be equal to the following values:
33 ///
34 /// * 2 - both endpoints are alive
35 /// * 1 - either the sender or the receiver is dead, determined by context
36 /// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
37 type State = uint;
38
39 static STATE_BOTH: State = 2;
40 static STATE_ONE: State = 1;
41
42 /// The heap-allocated structure shared between two endpoints.
43 struct Packet<T> {
44     state: AtomicUint,
45     payload: Option<T>,
46 }
47
48 /// A one-shot channel.
49 pub struct ChanOne<T> {
50     void_packet: *mut Void,
51     suppress_finalize: bool
52 }
53
54 /// A one-shot port.
55 pub struct PortOne<T> {
56     void_packet: *mut Void,
57     suppress_finalize: bool
58 }
59
60 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
61     let packet: ~Packet<T> = ~Packet {
62         state: AtomicUint::new(STATE_BOTH),
63         payload: None
64     };
65
66     unsafe {
67         let packet: *mut Void = cast::transmute(packet);
68         let port = PortOne {
69             void_packet: packet,
70             suppress_finalize: false
71         };
72         let chan = ChanOne {
73             void_packet: packet,
74             suppress_finalize: false
75         };
76         return (port, chan);
77     }
78 }
79
80 impl<T> ChanOne<T> {
81     #[inline]
82     fn packet(&self) -> *mut Packet<T> {
83         unsafe {
84             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
85             let p: *mut Packet<T> = &mut **p;
86             return p;
87         }
88     }
89
90     /// Send a message on the one-shot channel. If a receiver task is blocked
91     /// waiting for the message, will wake it up and reschedule to it.
92     pub fn send(self, val: T) {
93         self.try_send(val);
94     }
95
96     /// As `send`, but also returns whether or not the receiver endpoint is still open.
97     pub fn try_send(self, val: T) -> bool {
98         self.try_send_inner(val, true)
99     }
100
101     /// Send a message without immediately rescheduling to a blocked receiver.
102     /// This can be useful in contexts where rescheduling is forbidden, or to
103     /// optimize for when the sender expects to still have useful work to do.
104     pub fn send_deferred(self, val: T) {
105         self.try_send_deferred(val);
106     }
107
108     /// As `send_deferred` and `try_send` together.
109     pub fn try_send_deferred(self, val: T) -> bool {
110         self.try_send_inner(val, false)
111     }
112
113     // 'do_resched' configures whether the scheduler immediately switches to
114     // the receiving task, or leaves the sending task still running.
115     fn try_send_inner(self, val: T, do_resched: bool) -> bool {
116         rtassert!(context() != SchedulerContext);
117
118         let mut this = self;
119         let mut recvr_active = true;
120         let packet = this.packet();
121
122         unsafe {
123
124             // Install the payload
125             assert!((*packet).payload.is_none());
126             (*packet).payload = Some(val);
127
128             // Atomically swap out the old state to figure out what
129             // the port's up to, issuing a release barrier to prevent
130             // reordering of the payload write. This also issues an
131             // acquire barrier that keeps the subsequent access of the
132             // ~Task pointer from being reordered.
133             let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
134
135             // Suppress the synchronizing actions in the finalizer. We're
136             // done with the packet. NB: In case of do_resched, this *must*
137             // happen before waking up a blocked task (or be unkillable),
138             // because we might get a kill signal during the reschedule.
139             this.suppress_finalize = true;
140
141             match oldstate {
142                 STATE_BOTH => {
143                     // Port is not waiting yet. Nothing to do
144                     do Local::borrow::<Scheduler, ()> |sched| {
145                         rtdebug!("non-rendezvous send");
146                         sched.metrics.non_rendezvous_sends += 1;
147                     }
148                 }
149                 STATE_ONE => {
150                     do Local::borrow::<Scheduler, ()> |sched| {
151                         rtdebug!("rendezvous send");
152                         sched.metrics.rendezvous_sends += 1;
153                     }
154                     // Port has closed. Need to clean up.
155                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
156                     recvr_active = false;
157                 }
158                 task_as_state => {
159                     // Port is blocked. Wake it up.
160                     let recvr = BlockedTask::cast_from_uint(task_as_state);
161                     if do_resched {
162                         do recvr.wake().map_move |woken_task| {
163                             Scheduler::run_task(woken_task);
164                         };
165                     } else {
166                         let recvr = Cell::new(recvr);
167                         do Local::borrow::<Scheduler, ()> |sched| {
168                             sched.enqueue_blocked_task(recvr.take());
169                         }
170                     }
171                 }
172             }
173         }
174
175         return recvr_active;
176     }
177 }
178
179 impl<T> PortOne<T> {
180     fn packet(&self) -> *mut Packet<T> {
181         unsafe {
182             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
183             let p: *mut Packet<T> = &mut **p;
184             return p;
185         }
186     }
187
188     /// Wait for a message on the one-shot port. Fails if the send end is closed.
189     pub fn recv(self) -> T {
190         match self.try_recv() {
191             Some(val) => val,
192             None => {
193                 fail!("receiving on closed channel");
194             }
195         }
196     }
197
198     /// As `recv`, but returns `None` if the send end is closed rather than failing.
199     pub fn try_recv(self) -> Option<T> {
200         let mut this = self;
201
202         // Optimistic check. If data was sent already, we don't even need to block.
203         // No release barrier needed here; we're not handing off our task pointer yet.
204         if !this.optimistic_check() {
205             // No data available yet.
206             // Switch to the scheduler to put the ~Task into the Packet state.
207             let sched = Local::take::<Scheduler>();
208             do sched.deschedule_running_task_and_then |sched, task| {
209                 this.block_on(sched, task);
210             }
211         }
212
213         // Task resumes.
214         this.recv_ready()
215     }
216 }
217
218 impl<T> Select for PortOne<T> {
219     #[inline] #[cfg(not(test))]
220     fn optimistic_check(&mut self) -> bool {
221         unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
222     }
223
224     #[inline] #[cfg(test)]
225     fn optimistic_check(&mut self) -> bool {
226         // The optimistic check is never necessary for correctness. For testing
227         // purposes, making it randomly return false simulates a racing sender.
228         use rand::{Rand};
229         let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
230             Rand::rand(&mut sched.rng)
231         };
232         if actually_check {
233             unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
234         } else {
235             false
236         }
237     }
238
239     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
240         unsafe {
241             // Atomically swap the task pointer into the Packet state, issuing
242             // an acquire barrier to prevent reordering of the subsequent read
243             // of the payload. Also issues a release barrier to prevent
244             // reordering of any previous writes to the task structure.
245             let task_as_state = task.cast_to_uint();
246             let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
247             match oldstate {
248                 STATE_BOTH => {
249                     // Data has not been sent. Now we're blocked.
250                     rtdebug!("non-rendezvous recv");
251                     sched.metrics.non_rendezvous_recvs += 1;
252                     false
253                 }
254                 STATE_ONE => {
255                     // Re-record that we are the only owner of the packet.
256                     // No barrier needed, even if the task gets reawoken
257                     // on a different core -- this is analogous to writing a
258                     // payload; a barrier in enqueueing the task protects it.
259                     // NB(#8132). This *must* occur before the enqueue below.
260                     // FIXME(#6842, #8130) This is usually only needed for the
261                     // assertion in recv_ready, except in the case of select().
262                     // This won't actually ever have cacheline contention, but
263                     // maybe should be optimized out with a cfg(test) anyway?
264                     (*self.packet()).state.store(STATE_ONE, Relaxed);
265
266                     rtdebug!("rendezvous recv");
267                     sched.metrics.rendezvous_recvs += 1;
268
269                     // Channel is closed. Switch back and check the data.
270                     // NB: We have to drop back into the scheduler event loop here
271                     // instead of switching immediately back or we could end up
272                     // triggering infinite recursion on the scheduler's stack.
273                     let recvr = BlockedTask::cast_from_uint(task_as_state);
274                     sched.enqueue_blocked_task(recvr);
275                     true
276                 }
277                 _ => rtabort!("can't block_on; a task is already blocked")
278             }
279         }
280     }
281
282     // This is the only select trait function that's not also used in recv.
283     fn unblock_from(&mut self) -> bool {
284         let packet = self.packet();
285         unsafe {
286             // In case the data is available, the acquire barrier here matches
287             // the release barrier the sender used to release the payload.
288             match (*packet).state.load(Acquire) {
289                 // Impossible. We removed STATE_BOTH when blocking on it, and
290                 // no self-respecting sender would put it back.
291                 STATE_BOTH    => rtabort!("refcount already 2 in unblock_from"),
292                 // Here, a sender already tried to wake us up. Perhaps they
293                 // even succeeded! Data is available.
294                 STATE_ONE     => true,
295                 // Still registered as blocked. Need to "unblock" the pointer.
296                 task_as_state => {
297                     // In the window between the load and the CAS, a sender
298                     // might take the pointer and set the refcount to ONE. If
299                     // that happens, we shouldn't clobber that with BOTH!
300                     // Acquire barrier again for the same reason as above.
301                     match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
302                                                            Acquire) {
303                         STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
304                         STATE_ONE  => true, // Lost the race. Data available.
305                         same_ptr   => {
306                             // We successfully unblocked our task pointer.
307                             assert!(task_as_state == same_ptr);
308                             let handle = BlockedTask::cast_from_uint(task_as_state);
309                             // Because we are already awake, the handle we
310                             // gave to this port shall already be empty.
311                             handle.assert_already_awake();
312                             false
313                         }
314                     }
315                 }
316             }
317         }
318     }
319 }
320
321 impl<T> SelectPort<T> for PortOne<T> {
322     fn recv_ready(self) -> Option<T> {
323         let mut this = self;
324         let packet = this.packet();
325
326         // No further memory barrier is needed here to access the
327         // payload. Some scenarios:
328         //
329         // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
330         // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
331         //    and ran on its thread. The sending task issued a read barrier when taking the
332         //    pointer to the receiving task.
333         // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
334         //    is pinned to some other scheduler, so the sending task had to give us to
335         //    a different scheduler for resuming. That send synchronized memory.
336         unsafe {
337             // See corresponding store() above in block_on for rationale.
338             // FIXME(#8130) This can happen only in test builds.
339             assert!((*packet).state.load(Relaxed) == STATE_ONE);
340
341             let payload = (*packet).payload.take();
342
343             // The sender has closed up shop. Drop the packet.
344             let _packet: ~Packet<T> = cast::transmute(this.void_packet);
345             // Suppress the synchronizing actions in the finalizer. We're done with the packet.
346             this.suppress_finalize = true;
347             return payload;
348         }
349     }
350 }
351
352 impl<T> Peekable<T> for PortOne<T> {
353     fn peek(&self) -> bool {
354         unsafe {
355             let packet: *mut Packet<T> = self.packet();
356             let oldstate = (*packet).state.load(SeqCst);
357             match oldstate {
358                 STATE_BOTH => false,
359                 STATE_ONE => (*packet).payload.is_some(),
360                 _ => rtabort!("peeked on a blocked task")
361             }
362         }
363     }
364 }
365
366 #[unsafe_destructor]
367 impl<T> Drop for ChanOne<T> {
368     fn drop(&self) {
369         if self.suppress_finalize { return }
370
371         unsafe {
372             let this = cast::transmute_mut(self);
373             let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
374             match oldstate {
375                 STATE_BOTH => {
376                     // Port still active. It will destroy the Packet.
377                 },
378                 STATE_ONE => {
379                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
380                 },
381                 task_as_state => {
382                     // The port is blocked waiting for a message we will never send. Wake it.
383                     assert!((*this.packet()).payload.is_none());
384                     let recvr = BlockedTask::cast_from_uint(task_as_state);
385                     do recvr.wake().map_move |woken_task| {
386                         Scheduler::run_task(woken_task);
387                     };
388                 }
389             }
390         }
391     }
392 }
393
394 #[unsafe_destructor]
395 impl<T> Drop for PortOne<T> {
396     fn drop(&self) {
397         if self.suppress_finalize { return }
398
399         unsafe {
400             let this = cast::transmute_mut(self);
401             let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
402             match oldstate {
403                 STATE_BOTH => {
404                     // Chan still active. It will destroy the packet.
405                 },
406                 STATE_ONE => {
407                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
408                 }
409                 task_as_state => {
410                     // This case occurs during unwinding, when the blocked
411                     // receiver was killed awake. The task can't still be
412                     // blocked (we are it), but we need to free the handle.
413                     let recvr = BlockedTask::cast_from_uint(task_as_state);
414                     recvr.assert_already_awake();
415                 }
416             }
417         }
418     }
419 }
420
421 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
422 pub trait SendDeferred<T> {
423     fn send_deferred(&self, val: T);
424     fn try_send_deferred(&self, val: T) -> bool;
425 }
426
427 struct StreamPayload<T> {
428     val: T,
429     next: PortOne<StreamPayload<T>>
430 }
431
432 type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
433 type StreamPortOne<T> = PortOne<StreamPayload<T>>;
434
435 /// A channel with unbounded size.
436 pub struct Chan<T> {
437     // FIXME #5372. Using Cell because we don't take &mut self
438     next: Cell<StreamChanOne<T>>
439 }
440
441 /// An port with unbounded size.
442 pub struct Port<T> {
443     // FIXME #5372. Using Cell because we don't take &mut self
444     next: Cell<StreamPortOne<T>>
445 }
446
447 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
448     let (pone, cone) = oneshot();
449     let port = Port { next: Cell::new(pone) };
450     let chan = Chan { next: Cell::new(cone) };
451     return (port, chan);
452 }
453
454 impl<T: Send> Chan<T> {
455     fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
456         let (next_pone, next_cone) = oneshot();
457         let cone = self.next.take();
458         self.next.put_back(next_cone);
459         cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
460     }
461 }
462
463 impl<T: Send> GenericChan<T> for Chan<T> {
464     fn send(&self, val: T) {
465         self.try_send(val);
466     }
467 }
468
469 impl<T: Send> GenericSmartChan<T> for Chan<T> {
470     fn try_send(&self, val: T) -> bool {
471         self.try_send_inner(val, true)
472     }
473 }
474
475 impl<T: Send> SendDeferred<T> for Chan<T> {
476     fn send_deferred(&self, val: T) {
477         self.try_send_deferred(val);
478     }
479     fn try_send_deferred(&self, val: T) -> bool {
480         self.try_send_inner(val, false)
481     }
482 }
483
484 impl<T> GenericPort<T> for Port<T> {
485     fn recv(&self) -> T {
486         match self.try_recv() {
487             Some(val) => val,
488             None => {
489                 fail!("receiving on closed channel");
490             }
491         }
492     }
493
494     fn try_recv(&self) -> Option<T> {
495         let pone = self.next.take();
496         match pone.try_recv() {
497             Some(StreamPayload { val, next }) => {
498                 self.next.put_back(next);
499                 Some(val)
500             }
501             None => None
502         }
503     }
504 }
505
506 impl<T> Peekable<T> for Port<T> {
507     fn peek(&self) -> bool {
508         self.next.with_mut_ref(|p| p.peek())
509     }
510 }
511
512 // XXX: Kind of gross. A Port<T> should be selectable so you can make an array
513 // of them, but a &Port<T> should also be selectable so you can select2 on it
514 // alongside a PortOne<U> without passing the port by value in recv_ready.
515
516 impl<'self, T> Select for &'self Port<T> {
517     #[inline]
518     fn optimistic_check(&mut self) -> bool {
519         do self.next.with_mut_ref |pone| { pone.optimistic_check() }
520     }
521
522     #[inline]
523     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
524         let task = Cell::new(task);
525         do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
526     }
527
528     #[inline]
529     fn unblock_from(&mut self) -> bool {
530         do self.next.with_mut_ref |pone| { pone.unblock_from() }
531     }
532 }
533
534 impl<T> Select for Port<T> {
535     #[inline]
536     fn optimistic_check(&mut self) -> bool {
537         (&*self).optimistic_check()
538     }
539
540     #[inline]
541     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
542         (&*self).block_on(sched, task)
543     }
544
545     #[inline]
546     fn unblock_from(&mut self) -> bool {
547         (&*self).unblock_from()
548     }
549 }
550
551 impl<'self, T> SelectPort<T> for &'self Port<T> {
552     fn recv_ready(self) -> Option<T> {
553         match self.next.take().recv_ready() {
554             Some(StreamPayload { val, next }) => {
555                 self.next.put_back(next);
556                 Some(val)
557             }
558             None => None
559         }
560     }
561 }
562
563 pub struct SharedChan<T> {
564     // Just like Chan, but a shared AtomicOption instead of Cell
565     priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
566 }
567
568 impl<T> SharedChan<T> {
569     pub fn new(chan: Chan<T>) -> SharedChan<T> {
570         let next = chan.next.take();
571         let next = AtomicOption::new(~next);
572         SharedChan { next: UnsafeAtomicRcBox::new(next) }
573     }
574 }
575
576 impl<T: Send> SharedChan<T> {
577     fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
578         unsafe {
579             let (next_pone, next_cone) = oneshot();
580             let cone = (*self.next.get()).swap(~next_cone, SeqCst);
581             cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
582                                          do_resched)
583         }
584     }
585 }
586
587 impl<T: Send> GenericChan<T> for SharedChan<T> {
588     fn send(&self, val: T) {
589         self.try_send(val);
590     }
591 }
592
593 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
594     fn try_send(&self, val: T) -> bool {
595         self.try_send_inner(val, true)
596     }
597 }
598
599 impl<T: Send> SendDeferred<T> for SharedChan<T> {
600     fn send_deferred(&self, val: T) {
601         self.try_send_deferred(val);
602     }
603     fn try_send_deferred(&self, val: T) -> bool {
604         self.try_send_inner(val, false)
605     }
606 }
607
608 impl<T> Clone for SharedChan<T> {
609     fn clone(&self) -> SharedChan<T> {
610         SharedChan {
611             next: self.next.clone()
612         }
613     }
614 }
615
616 pub struct SharedPort<T> {
617     // The next port on which we will receive the next port on which we will receive T
618     priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
619 }
620
621 impl<T> SharedPort<T> {
622     pub fn new(port: Port<T>) -> SharedPort<T> {
623         // Put the data port into a new link pipe
624         let next_data_port = port.next.take();
625         let (next_link_port, next_link_chan) = oneshot();
626         next_link_chan.send(next_data_port);
627         let next_link = AtomicOption::new(~next_link_port);
628         SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
629     }
630 }
631
632 impl<T: Send> GenericPort<T> for SharedPort<T> {
633     fn recv(&self) -> T {
634         match self.try_recv() {
635             Some(val) => val,
636             None => {
637                 fail!("receiving on a closed channel");
638             }
639         }
640     }
641
642     fn try_recv(&self) -> Option<T> {
643         unsafe {
644             let (next_link_port, next_link_chan) = oneshot();
645             let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
646             let link_port = link_port.unwrap();
647             let data_port = link_port.recv();
648             let (next_data_port, res) = match data_port.try_recv() {
649                 Some(StreamPayload { val, next }) => {
650                     (next, Some(val))
651                 }
652                 None => {
653                     let (next_data_port, _) = oneshot();
654                     (next_data_port, None)
655                 }
656             };
657             next_link_chan.send(next_data_port);
658             return res;
659         }
660     }
661 }
662
663 impl<T> Clone for SharedPort<T> {
664     fn clone(&self) -> SharedPort<T> {
665         SharedPort {
666             next_link: self.next_link.clone()
667         }
668     }
669 }
670
671 // XXX: Need better name
672 type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
673
674 pub fn megapipe<T: Send>() -> MegaPipe<T> {
675     let (port, chan) = stream();
676     (SharedPort::new(port), SharedChan::new(chan))
677 }
678
679 impl<T: Send> GenericChan<T> for MegaPipe<T> {
680     fn send(&self, val: T) {
681         self.second_ref().send(val)
682     }
683 }
684
685 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
686     fn try_send(&self, val: T) -> bool {
687         self.second_ref().try_send(val)
688     }
689 }
690
691 impl<T: Send> GenericPort<T> for MegaPipe<T> {
692     fn recv(&self) -> T {
693         self.first_ref().recv()
694     }
695
696     fn try_recv(&self) -> Option<T> {
697         self.first_ref().try_recv()
698     }
699 }
700
701 impl<T: Send> SendDeferred<T> for MegaPipe<T> {
702     fn send_deferred(&self, val: T) {
703         self.second_ref().send_deferred(val)
704     }
705     fn try_send_deferred(&self, val: T) -> bool {
706         self.second_ref().try_send_deferred(val)
707     }
708 }
709
710 #[cfg(test)]
711 mod test {
712     use super::*;
713     use option::*;
714     use rt::test::*;
715     use cell::Cell;
716     use iter::Times;
717
718     #[test]
719     fn oneshot_single_thread_close_port_first() {
720         // Simple test of closing without sending
721         do run_in_newsched_task {
722             let (port, _chan) = oneshot::<int>();
723             { let _p = port; }
724         }
725     }
726
727     #[test]
728     fn oneshot_single_thread_close_chan_first() {
729         // Simple test of closing without sending
730         do run_in_newsched_task {
731             let (_port, chan) = oneshot::<int>();
732             { let _c = chan; }
733         }
734     }
735
736     #[test]
737     fn oneshot_single_thread_send_port_close() {
738         // Testing that the sender cleans up the payload if receiver is closed
739         do run_in_newsched_task {
740             let (port, chan) = oneshot::<~int>();
741             { let _p = port; }
742             chan.send(~0);
743         }
744     }
745
746     #[test]
747     fn oneshot_single_thread_recv_chan_close() {
748         // Receiving on a closed chan will fail
749         do run_in_newsched_task {
750             let res = do spawntask_try {
751                 let (port, chan) = oneshot::<~int>();
752                 { let _c = chan; }
753                 port.recv();
754             };
755             // What is our res?
756             rtdebug!("res is: %?", res.is_err());
757             assert!(res.is_err());
758         }
759     }
760
761     #[test]
762     fn oneshot_single_thread_send_then_recv() {
763         do run_in_newsched_task {
764             let (port, chan) = oneshot::<~int>();
765             chan.send(~10);
766             assert!(port.recv() == ~10);
767         }
768     }
769
770     #[test]
771     fn oneshot_single_thread_try_send_open() {
772         do run_in_newsched_task {
773             let (port, chan) = oneshot::<int>();
774             assert!(chan.try_send(10));
775             assert!(port.recv() == 10);
776         }
777     }
778
779     #[test]
780     fn oneshot_single_thread_try_send_closed() {
781         do run_in_newsched_task {
782             let (port, chan) = oneshot::<int>();
783             { let _p = port; }
784             assert!(!chan.try_send(10));
785         }
786     }
787
788     #[test]
789     fn oneshot_single_thread_try_recv_open() {
790         do run_in_newsched_task {
791             let (port, chan) = oneshot::<int>();
792             chan.send(10);
793             assert!(port.try_recv() == Some(10));
794         }
795     }
796
797     #[test]
798     fn oneshot_single_thread_try_recv_closed() {
799         do run_in_newsched_task {
800             let (port, chan) = oneshot::<int>();
801             { let _c = chan; }
802             assert!(port.try_recv() == None);
803         }
804     }
805
806     #[test]
807     fn oneshot_single_thread_peek_data() {
808         do run_in_newsched_task {
809             let (port, chan) = oneshot::<int>();
810             assert!(!port.peek());
811             chan.send(10);
812             assert!(port.peek());
813         }
814     }
815
816     #[test]
817     fn oneshot_single_thread_peek_close() {
818         do run_in_newsched_task {
819             let (port, chan) = oneshot::<int>();
820             { let _c = chan; }
821             assert!(!port.peek());
822             assert!(!port.peek());
823         }
824     }
825
826     #[test]
827     fn oneshot_single_thread_peek_open() {
828         do run_in_newsched_task {
829             let (port, _) = oneshot::<int>();
830             assert!(!port.peek());
831         }
832     }
833
834     #[test]
835     fn oneshot_multi_task_recv_then_send() {
836         do run_in_newsched_task {
837             let (port, chan) = oneshot::<~int>();
838             let port_cell = Cell::new(port);
839             do spawntask {
840                 assert!(port_cell.take().recv() == ~10);
841             }
842
843             chan.send(~10);
844         }
845     }
846
847     #[test]
848     fn oneshot_multi_task_recv_then_close() {
849         do run_in_newsched_task {
850             let (port, chan) = oneshot::<~int>();
851             let port_cell = Cell::new(port);
852             let chan_cell = Cell::new(chan);
853             do spawntask_later {
854                 let _cell = chan_cell.take();
855             }
856             let res = do spawntask_try {
857                 assert!(port_cell.take().recv() == ~10);
858             };
859             assert!(res.is_err());
860         }
861     }
862
863     #[test]
864     fn oneshot_multi_thread_close_stress() {
865         do stress_factor().times {
866             do run_in_newsched_task {
867                 let (port, chan) = oneshot::<int>();
868                 let port_cell = Cell::new(port);
869                 let thread = do spawntask_thread {
870                     let _p = port_cell.take();
871                 };
872                 let _chan = chan;
873                 thread.join();
874             }
875         }
876     }
877
878     #[test]
879     fn oneshot_multi_thread_send_close_stress() {
880         do stress_factor().times {
881             do run_in_newsched_task {
882                 let (port, chan) = oneshot::<int>();
883                 let chan_cell = Cell::new(chan);
884                 let port_cell = Cell::new(port);
885                 let thread1 = do spawntask_thread {
886                     let _p = port_cell.take();
887                 };
888                 let thread2 = do spawntask_thread {
889                     let c = chan_cell.take();
890                     c.send(1);
891                 };
892                 thread1.join();
893                 thread2.join();
894             }
895         }
896     }
897
898     #[test]
899     fn oneshot_multi_thread_recv_close_stress() {
900         do stress_factor().times {
901             do run_in_newsched_task {
902                 let (port, chan) = oneshot::<int>();
903                 let chan_cell = Cell::new(chan);
904                 let port_cell = Cell::new(port);
905                 let thread1 = do spawntask_thread {
906                     let port_cell = Cell::new(port_cell.take());
907                     let res = do spawntask_try {
908                         port_cell.take().recv();
909                     };
910                     assert!(res.is_err());
911                 };
912                 let thread2 = do spawntask_thread {
913                     let chan_cell = Cell::new(chan_cell.take());
914                     do spawntask {
915                         chan_cell.take();
916                     }
917                 };
918                 thread1.join();
919                 thread2.join();
920             }
921         }
922     }
923
924     #[test]
925     fn oneshot_multi_thread_send_recv_stress() {
926         do stress_factor().times {
927             do run_in_newsched_task {
928                 let (port, chan) = oneshot::<~int>();
929                 let chan_cell = Cell::new(chan);
930                 let port_cell = Cell::new(port);
931                 let thread1 = do spawntask_thread {
932                     chan_cell.take().send(~10);
933                 };
934                 let thread2 = do spawntask_thread {
935                     assert!(port_cell.take().recv() == ~10);
936                 };
937                 thread1.join();
938                 thread2.join();
939             }
940         }
941     }
942
943     #[test]
944     fn stream_send_recv_stress() {
945         do stress_factor().times {
946             do run_in_mt_newsched_task {
947                 let (port, chan) = stream::<~int>();
948
949                 send(chan, 0);
950                 recv(port, 0);
951
952                 fn send(chan: Chan<~int>, i: int) {
953                     if i == 10 { return }
954
955                     let chan_cell = Cell::new(chan);
956                     do spawntask_random {
957                         let chan = chan_cell.take();
958                         chan.send(~i);
959                         send(chan, i + 1);
960                     }
961                 }
962
963                 fn recv(port: Port<~int>, i: int) {
964                     if i == 10 { return }
965
966                     let port_cell = Cell::new(port);
967                     do spawntask_random {
968                         let port = port_cell.take();
969                         assert!(port.recv() == ~i);
970                         recv(port, i + 1);
971                     };
972                 }
973             }
974         }
975     }
976
977     #[test]
978     fn recv_a_lot() {
979         // Regression test that we don't run out of stack in scheduler context
980         do run_in_newsched_task {
981             let (port, chan) = stream();
982             do 10000.times { chan.send(()) }
983             do 10000.times { port.recv() }
984         }
985     }
986
987     #[test]
988     fn shared_chan_stress() {
989         do run_in_mt_newsched_task {
990             let (port, chan) = stream();
991             let chan = SharedChan::new(chan);
992             let total = stress_factor() + 100;
993             do total.times {
994                 let chan_clone = chan.clone();
995                 do spawntask_random {
996                     chan_clone.send(());
997                 }
998             }
999
1000             do total.times {
1001                 port.recv();
1002             }
1003         }
1004     }
1005
1006     #[test]
1007     fn shared_port_stress() {
1008         do run_in_mt_newsched_task {
1009             // XXX: Removing these type annotations causes an ICE
1010             let (end_port, end_chan) = stream::<()>();
1011             let (port, chan) = stream::<()>();
1012             let end_chan = SharedChan::new(end_chan);
1013             let port = SharedPort::new(port);
1014             let total = stress_factor() + 100;
1015             do total.times {
1016                 let end_chan_clone = end_chan.clone();
1017                 let port_clone = port.clone();
1018                 do spawntask_random {
1019                     port_clone.recv();
1020                     end_chan_clone.send(());
1021                 }
1022             }
1023
1024             do total.times {
1025                 chan.send(());
1026             }
1027
1028             do total.times {
1029                 end_port.recv();
1030             }
1031         }
1032     }
1033
1034     #[test]
1035     fn shared_port_close_simple() {
1036         do run_in_mt_newsched_task {
1037             let (port, chan) = stream::<()>();
1038             let port = SharedPort::new(port);
1039             { let _chan = chan; }
1040             assert!(port.try_recv().is_none());
1041         }
1042     }
1043
1044     #[test]
1045     fn shared_port_close() {
1046         do run_in_mt_newsched_task {
1047             let (end_port, end_chan) = stream::<bool>();
1048             let (port, chan) = stream::<()>();
1049             let end_chan = SharedChan::new(end_chan);
1050             let port = SharedPort::new(port);
1051             let chan = SharedChan::new(chan);
1052             let send_total = 10;
1053             let recv_total = 20;
1054             do spawntask_random {
1055                 do send_total.times {
1056                     let chan_clone = chan.clone();
1057                     do spawntask_random {
1058                         chan_clone.send(());
1059                     }
1060                 }
1061             }
1062             let end_chan_clone = end_chan.clone();
1063             do spawntask_random {
1064                 do recv_total.times {
1065                     let port_clone = port.clone();
1066                     let end_chan_clone = end_chan_clone.clone();
1067                     do spawntask_random {
1068                         let recvd = port_clone.try_recv().is_some();
1069                         end_chan_clone.send(recvd);
1070                     }
1071                 }
1072             }
1073
1074             let mut recvd = 0;
1075             do recv_total.times {
1076                 recvd += if end_port.recv() { 1 } else { 0 };
1077             }
1078
1079             assert!(recvd == send_total);
1080         }
1081     }
1082
1083     #[test]
1084     fn megapipe_stress() {
1085         use rand;
1086         use rand::RngUtil;
1087
1088         do run_in_mt_newsched_task {
1089             let (end_port, end_chan) = stream::<()>();
1090             let end_chan = SharedChan::new(end_chan);
1091             let pipe = megapipe();
1092             let total = stress_factor() + 10;
1093             let mut rng = rand::rng();
1094             do total.times {
1095                 let msgs = rng.gen_uint_range(0, 10);
1096                 let pipe_clone = pipe.clone();
1097                 let end_chan_clone = end_chan.clone();
1098                 do spawntask_random {
1099                     do msgs.times {
1100                         pipe_clone.send(());
1101                     }
1102                     do msgs.times {
1103                         pipe_clone.recv();
1104                     }
1105                 }
1106
1107                 end_chan_clone.send(());
1108             }
1109
1110             do total.times {
1111                 end_port.recv();
1112             }
1113         }
1114     }
1115
1116     #[test]
1117     fn send_deferred() {
1118         use unstable::sync::atomically;
1119
1120         // Tests no-rescheduling of send_deferred on all types of channels.
1121         do run_in_newsched_task {
1122             let (pone, cone) = oneshot();
1123             let (pstream, cstream) = stream();
1124             let (pshared, cshared) = stream();
1125             let cshared = SharedChan::new(cshared);
1126             let mp = megapipe();
1127
1128             let pone = Cell::new(pone);
1129             do spawntask { pone.take().recv(); }
1130             let pstream = Cell::new(pstream);
1131             do spawntask { pstream.take().recv(); }
1132             let pshared = Cell::new(pshared);
1133             do spawntask { pshared.take().recv(); }
1134             let p_mp = Cell::new(mp.clone());
1135             do spawntask { p_mp.take().recv(); }
1136
1137             let cs = Cell::new((cone, cstream, cshared, mp));
1138             unsafe {
1139                 do atomically {
1140                     let (cone, cstream, cshared, mp) = cs.take();
1141                     cone.send_deferred(());
1142                     cstream.send_deferred(());
1143                     cshared.send_deferred(());
1144                     mp.send_deferred(());
1145                 }
1146             }
1147         }
1148     }
1149
1150 }