]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/comm.rs
fix recv_ready for Port to take &self and not need to return a tuple. Close #8192.
[rust.git] / src / libstd / rt / comm.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Ports and channels.
12
13 use option::*;
14 use cast;
15 use ops::Drop;
16 use rt::kill::BlockedTask;
17 use kinds::Send;
18 use rt::sched::Scheduler;
19 use rt::local::Local;
20 use rt::select::{Select, SelectPort};
21 use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
22 use unstable::sync::UnsafeAtomicRcBox;
23 use util::Void;
24 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
25 use cell::Cell;
26 use clone::Clone;
27 use rt::{context, SchedulerContext};
28 use tuple::ImmutableTuple;
29
30 /// A combined refcount / BlockedTask-as-uint pointer.
31 ///
32 /// Can be equal to the following values:
33 ///
34 /// * 2 - both endpoints are alive
35 /// * 1 - either the sender or the receiver is dead, determined by context
36 /// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
37 type State = uint;
38
39 static STATE_BOTH: State = 2;
40 static STATE_ONE: State = 1;
41
42 /// The heap-allocated structure shared between two endpoints.
43 struct Packet<T> {
44     state: AtomicUint,
45     payload: Option<T>,
46 }
47
48 /// A one-shot channel.
49 pub struct ChanOne<T> {
50     void_packet: *mut Void,
51     suppress_finalize: bool
52 }
53
54 /// A one-shot port.
55 pub struct PortOne<T> {
56     void_packet: *mut Void,
57     suppress_finalize: bool
58 }
59
60 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
61     let packet: ~Packet<T> = ~Packet {
62         state: AtomicUint::new(STATE_BOTH),
63         payload: None
64     };
65
66     unsafe {
67         let packet: *mut Void = cast::transmute(packet);
68         let port = PortOne {
69             void_packet: packet,
70             suppress_finalize: false
71         };
72         let chan = ChanOne {
73             void_packet: packet,
74             suppress_finalize: false
75         };
76         return (port, chan);
77     }
78 }
79
80 impl<T> ChanOne<T> {
81     #[inline]
82     fn packet(&self) -> *mut Packet<T> {
83         unsafe {
84             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
85             let p: *mut Packet<T> = &mut **p;
86             return p;
87         }
88     }
89
90     /// Send a message on the one-shot channel. If a receiver task is blocked
91     /// waiting for the message, will wake it up and reschedule to it.
92     pub fn send(self, val: T) {
93         self.try_send(val);
94     }
95
96     /// As `send`, but also returns whether or not the receiver endpoint is still open.
97     pub fn try_send(self, val: T) -> bool {
98         self.try_send_inner(val, true)
99     }
100
101     /// Send a message without immediately rescheduling to a blocked receiver.
102     /// This can be useful in contexts where rescheduling is forbidden, or to
103     /// optimize for when the sender expects to still have useful work to do.
104     pub fn send_deferred(self, val: T) {
105         self.try_send_deferred(val);
106     }
107
108     /// As `send_deferred` and `try_send` together.
109     pub fn try_send_deferred(self, val: T) -> bool {
110         self.try_send_inner(val, false)
111     }
112
113     // 'do_resched' configures whether the scheduler immediately switches to
114     // the receiving task, or leaves the sending task still running.
115     fn try_send_inner(self, val: T, do_resched: bool) -> bool {
116         rtassert!(context() != SchedulerContext);
117
118         let mut this = self;
119         let mut recvr_active = true;
120         let packet = this.packet();
121
122         unsafe {
123
124             // Install the payload
125             assert!((*packet).payload.is_none());
126             (*packet).payload = Some(val);
127
128             // Atomically swap out the old state to figure out what
129             // the port's up to, issuing a release barrier to prevent
130             // reordering of the payload write. This also issues an
131             // acquire barrier that keeps the subsequent access of the
132             // ~Task pointer from being reordered.
133             let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
134
135             // Suppress the synchronizing actions in the finalizer. We're
136             // done with the packet. NB: In case of do_resched, this *must*
137             // happen before waking up a blocked task (or be unkillable),
138             // because we might get a kill signal during the reschedule.
139             this.suppress_finalize = true;
140
141             match oldstate {
142                 STATE_BOTH => {
143                     // Port is not waiting yet. Nothing to do
144                     do Local::borrow::<Scheduler, ()> |sched| {
145                         rtdebug!("non-rendezvous send");
146                         sched.metrics.non_rendezvous_sends += 1;
147                     }
148                 }
149                 STATE_ONE => {
150                     do Local::borrow::<Scheduler, ()> |sched| {
151                         rtdebug!("rendezvous send");
152                         sched.metrics.rendezvous_sends += 1;
153                     }
154                     // Port has closed. Need to clean up.
155                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
156                     recvr_active = false;
157                 }
158                 task_as_state => {
159                     // Port is blocked. Wake it up.
160                     let recvr = BlockedTask::cast_from_uint(task_as_state);
161                     if do_resched {
162                         do recvr.wake().map_move |woken_task| {
163                             Scheduler::run_task(woken_task);
164                         };
165                     } else {
166                         let recvr = Cell::new(recvr);
167                         do Local::borrow::<Scheduler, ()> |sched| {
168                             sched.enqueue_blocked_task(recvr.take());
169                         }
170                     }
171                 }
172             }
173         }
174
175         return recvr_active;
176     }
177 }
178
179 impl<T> PortOne<T> {
180     fn packet(&self) -> *mut Packet<T> {
181         unsafe {
182             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
183             let p: *mut Packet<T> = &mut **p;
184             return p;
185         }
186     }
187
188     /// Wait for a message on the one-shot port. Fails if the send end is closed.
189     pub fn recv(self) -> T {
190         match self.try_recv() {
191             Some(val) => val,
192             None => {
193                 fail!("receiving on closed channel");
194             }
195         }
196     }
197
198     /// As `recv`, but returns `None` if the send end is closed rather than failing.
199     pub fn try_recv(self) -> Option<T> {
200         let mut this = self;
201
202         // Optimistic check. If data was sent already, we don't even need to block.
203         // No release barrier needed here; we're not handing off our task pointer yet.
204         if !this.optimistic_check() {
205             // No data available yet.
206             // Switch to the scheduler to put the ~Task into the Packet state.
207             let sched = Local::take::<Scheduler>();
208             do sched.deschedule_running_task_and_then |sched, task| {
209                 this.block_on(sched, task);
210             }
211         }
212
213         // Task resumes.
214         this.recv_ready()
215     }
216 }
217
218 impl<T> Select for PortOne<T> {
219     #[inline] #[cfg(not(test))]
220     fn optimistic_check(&mut self) -> bool {
221         unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
222     }
223
224     #[inline] #[cfg(test)]
225     fn optimistic_check(&mut self) -> bool {
226         // The optimistic check is never necessary for correctness. For testing
227         // purposes, making it randomly return false simulates a racing sender.
228         use rand::{Rand, rng};
229         let mut rng = rng();
230         let actually_check = Rand::rand(&mut rng);
231         if actually_check {
232             unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
233         } else {
234             false
235         }
236     }
237
238     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
239         unsafe {
240             // Atomically swap the task pointer into the Packet state, issuing
241             // an acquire barrier to prevent reordering of the subsequent read
242             // of the payload. Also issues a release barrier to prevent
243             // reordering of any previous writes to the task structure.
244             let task_as_state = task.cast_to_uint();
245             let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
246             match oldstate {
247                 STATE_BOTH => {
248                     // Data has not been sent. Now we're blocked.
249                     rtdebug!("non-rendezvous recv");
250                     sched.metrics.non_rendezvous_recvs += 1;
251                     false
252                 }
253                 STATE_ONE => {
254                     // Re-record that we are the only owner of the packet.
255                     // No barrier needed, even if the task gets reawoken
256                     // on a different core -- this is analogous to writing a
257                     // payload; a barrier in enqueueing the task protects it.
258                     // NB(#8132). This *must* occur before the enqueue below.
259                     // FIXME(#6842, #8130) This is usually only needed for the
260                     // assertion in recv_ready, except in the case of select().
261                     // This won't actually ever have cacheline contention, but
262                     // maybe should be optimized out with a cfg(test) anyway?
263                     (*self.packet()).state.store(STATE_ONE, Relaxed);
264
265                     rtdebug!("rendezvous recv");
266                     sched.metrics.rendezvous_recvs += 1;
267
268                     // Channel is closed. Switch back and check the data.
269                     // NB: We have to drop back into the scheduler event loop here
270                     // instead of switching immediately back or we could end up
271                     // triggering infinite recursion on the scheduler's stack.
272                     let recvr = BlockedTask::cast_from_uint(task_as_state);
273                     sched.enqueue_blocked_task(recvr);
274                     true
275                 }
276                 _ => rtabort!("can't block_on; a task is already blocked")
277             }
278         }
279     }
280
281     // This is the only select trait function that's not also used in recv.
282     fn unblock_from(&mut self) -> bool {
283         let packet = self.packet();
284         unsafe {
285             // In case the data is available, the acquire barrier here matches
286             // the release barrier the sender used to release the payload.
287             match (*packet).state.load(Acquire) {
288                 // Impossible. We removed STATE_BOTH when blocking on it, and
289                 // no self-respecting sender would put it back.
290                 STATE_BOTH    => rtabort!("refcount already 2 in unblock_from"),
291                 // Here, a sender already tried to wake us up. Perhaps they
292                 // even succeeded! Data is available.
293                 STATE_ONE     => true,
294                 // Still registered as blocked. Need to "unblock" the pointer.
295                 task_as_state => {
296                     // In the window between the load and the CAS, a sender
297                     // might take the pointer and set the refcount to ONE. If
298                     // that happens, we shouldn't clobber that with BOTH!
299                     // Acquire barrier again for the same reason as above.
300                     match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
301                                                            Acquire) {
302                         STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
303                         STATE_ONE  => true, // Lost the race. Data available.
304                         same_ptr   => {
305                             // We successfully unblocked our task pointer.
306                             assert!(task_as_state == same_ptr);
307                             let handle = BlockedTask::cast_from_uint(task_as_state);
308                             // Because we are already awake, the handle we
309                             // gave to this port shall already be empty.
310                             handle.assert_already_awake();
311                             false
312                         }
313                     }
314                 }
315             }
316         }
317     }
318 }
319
320 impl<T> SelectPort<T> for PortOne<T> {
321     fn recv_ready(self) -> Option<T> {
322         let mut this = self;
323         let packet = this.packet();
324
325         // No further memory barrier is needed here to access the
326         // payload. Some scenarios:
327         //
328         // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
329         // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
330         //    and ran on its thread. The sending task issued a read barrier when taking the
331         //    pointer to the receiving task.
332         // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
333         //    is pinned to some other scheduler, so the sending task had to give us to
334         //    a different scheduler for resuming. That send synchronized memory.
335         unsafe {
336             // See corresponding store() above in block_on for rationale.
337             // FIXME(#8130) This can happen only in test builds.
338             assert!((*packet).state.load(Relaxed) == STATE_ONE);
339
340             let payload = (*packet).payload.take();
341
342             // The sender has closed up shop. Drop the packet.
343             let _packet: ~Packet<T> = cast::transmute(this.void_packet);
344             // Suppress the synchronizing actions in the finalizer. We're done with the packet.
345             this.suppress_finalize = true;
346             return payload;
347         }
348     }
349 }
350
351 impl<T> Peekable<T> for PortOne<T> {
352     fn peek(&self) -> bool {
353         unsafe {
354             let packet: *mut Packet<T> = self.packet();
355             let oldstate = (*packet).state.load(SeqCst);
356             match oldstate {
357                 STATE_BOTH => false,
358                 STATE_ONE => (*packet).payload.is_some(),
359                 _ => rtabort!("peeked on a blocked task")
360             }
361         }
362     }
363 }
364
365 #[unsafe_destructor]
366 impl<T> Drop for ChanOne<T> {
367     fn drop(&self) {
368         if self.suppress_finalize { return }
369
370         unsafe {
371             let this = cast::transmute_mut(self);
372             let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
373             match oldstate {
374                 STATE_BOTH => {
375                     // Port still active. It will destroy the Packet.
376                 },
377                 STATE_ONE => {
378                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
379                 },
380                 task_as_state => {
381                     // The port is blocked waiting for a message we will never send. Wake it.
382                     assert!((*this.packet()).payload.is_none());
383                     let recvr = BlockedTask::cast_from_uint(task_as_state);
384                     do recvr.wake().map_move |woken_task| {
385                         Scheduler::run_task(woken_task);
386                     };
387                 }
388             }
389         }
390     }
391 }
392
393 #[unsafe_destructor]
394 impl<T> Drop for PortOne<T> {
395     fn drop(&self) {
396         if self.suppress_finalize { return }
397
398         unsafe {
399             let this = cast::transmute_mut(self);
400             let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
401             match oldstate {
402                 STATE_BOTH => {
403                     // Chan still active. It will destroy the packet.
404                 },
405                 STATE_ONE => {
406                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
407                 }
408                 task_as_state => {
409                     // This case occurs during unwinding, when the blocked
410                     // receiver was killed awake. The task can't still be
411                     // blocked (we are it), but we need to free the handle.
412                     let recvr = BlockedTask::cast_from_uint(task_as_state);
413                     recvr.assert_already_awake();
414                 }
415             }
416         }
417     }
418 }
419
420 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
421 pub trait SendDeferred<T> {
422     fn send_deferred(&self, val: T);
423     fn try_send_deferred(&self, val: T) -> bool;
424 }
425
426 struct StreamPayload<T> {
427     val: T,
428     next: PortOne<StreamPayload<T>>
429 }
430
431 type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
432 type StreamPortOne<T> = PortOne<StreamPayload<T>>;
433
434 /// A channel with unbounded size.
435 pub struct Chan<T> {
436     // FIXME #5372. Using Cell because we don't take &mut self
437     next: Cell<StreamChanOne<T>>
438 }
439
440 /// An port with unbounded size.
441 pub struct Port<T> {
442     // FIXME #5372. Using Cell because we don't take &mut self
443     next: Cell<StreamPortOne<T>>
444 }
445
446 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
447     let (pone, cone) = oneshot();
448     let port = Port { next: Cell::new(pone) };
449     let chan = Chan { next: Cell::new(cone) };
450     return (port, chan);
451 }
452
453 impl<T: Send> Chan<T> {
454     fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
455         let (next_pone, next_cone) = oneshot();
456         let cone = self.next.take();
457         self.next.put_back(next_cone);
458         cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
459     }
460 }
461
462 impl<T: Send> GenericChan<T> for Chan<T> {
463     fn send(&self, val: T) {
464         self.try_send(val);
465     }
466 }
467
468 impl<T: Send> GenericSmartChan<T> for Chan<T> {
469     fn try_send(&self, val: T) -> bool {
470         self.try_send_inner(val, true)
471     }
472 }
473
474 impl<T: Send> SendDeferred<T> for Chan<T> {
475     fn send_deferred(&self, val: T) {
476         self.try_send_deferred(val);
477     }
478     fn try_send_deferred(&self, val: T) -> bool {
479         self.try_send_inner(val, false)
480     }
481 }
482
483 impl<T> GenericPort<T> for Port<T> {
484     fn recv(&self) -> T {
485         match self.try_recv() {
486             Some(val) => val,
487             None => {
488                 fail!("receiving on closed channel");
489             }
490         }
491     }
492
493     fn try_recv(&self) -> Option<T> {
494         let pone = self.next.take();
495         match pone.try_recv() {
496             Some(StreamPayload { val, next }) => {
497                 self.next.put_back(next);
498                 Some(val)
499             }
500             None => None
501         }
502     }
503 }
504
505 impl<T> Peekable<T> for Port<T> {
506     fn peek(&self) -> bool {
507         self.next.with_mut_ref(|p| p.peek())
508     }
509 }
510
511 // XXX: Kind of gross. A Port<T> should be selectable so you can make an array
512 // of them, but a &Port<T> should also be selectable so you can select2 on it
513 // alongside a PortOne<U> without passing the port by value in recv_ready.
514
515 impl<'self, T> Select for &'self Port<T> {
516     #[inline]
517     fn optimistic_check(&mut self) -> bool {
518         do self.next.with_mut_ref |pone| { pone.optimistic_check() }
519     }
520
521     #[inline]
522     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
523         let task = Cell::new(task);
524         do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
525     }
526
527     #[inline]
528     fn unblock_from(&mut self) -> bool {
529         do self.next.with_mut_ref |pone| { pone.unblock_from() }
530     }
531 }
532
533 impl<T> Select for Port<T> {
534     #[inline]
535     fn optimistic_check(&mut self) -> bool {
536         (&*self).optimistic_check()
537     }
538
539     #[inline]
540     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
541         (&*self).block_on(sched, task)
542     }
543
544     #[inline]
545     fn unblock_from(&mut self) -> bool {
546         (&*self).unblock_from()
547     }
548 }
549
550 impl<'self, T> SelectPort<T> for &'self Port<T> {
551     fn recv_ready(self) -> Option<T> {
552         match self.next.take().recv_ready() {
553             Some(StreamPayload { val, next }) => {
554                 self.next.put_back(next);
555                 Some(val)
556             }
557             None => None
558         }
559     }
560 }
561
562 pub struct SharedChan<T> {
563     // Just like Chan, but a shared AtomicOption instead of Cell
564     priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
565 }
566
567 impl<T> SharedChan<T> {
568     pub fn new(chan: Chan<T>) -> SharedChan<T> {
569         let next = chan.next.take();
570         let next = AtomicOption::new(~next);
571         SharedChan { next: UnsafeAtomicRcBox::new(next) }
572     }
573 }
574
575 impl<T: Send> SharedChan<T> {
576     fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
577         unsafe {
578             let (next_pone, next_cone) = oneshot();
579             let cone = (*self.next.get()).swap(~next_cone, SeqCst);
580             cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
581                                          do_resched)
582         }
583     }
584 }
585
586 impl<T: Send> GenericChan<T> for SharedChan<T> {
587     fn send(&self, val: T) {
588         self.try_send(val);
589     }
590 }
591
592 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
593     fn try_send(&self, val: T) -> bool {
594         self.try_send_inner(val, true)
595     }
596 }
597
598 impl<T: Send> SendDeferred<T> for SharedChan<T> {
599     fn send_deferred(&self, val: T) {
600         self.try_send_deferred(val);
601     }
602     fn try_send_deferred(&self, val: T) -> bool {
603         self.try_send_inner(val, false)
604     }
605 }
606
607 impl<T> Clone for SharedChan<T> {
608     fn clone(&self) -> SharedChan<T> {
609         SharedChan {
610             next: self.next.clone()
611         }
612     }
613 }
614
615 pub struct SharedPort<T> {
616     // The next port on which we will receive the next port on which we will receive T
617     priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
618 }
619
620 impl<T> SharedPort<T> {
621     pub fn new(port: Port<T>) -> SharedPort<T> {
622         // Put the data port into a new link pipe
623         let next_data_port = port.next.take();
624         let (next_link_port, next_link_chan) = oneshot();
625         next_link_chan.send(next_data_port);
626         let next_link = AtomicOption::new(~next_link_port);
627         SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
628     }
629 }
630
631 impl<T: Send> GenericPort<T> for SharedPort<T> {
632     fn recv(&self) -> T {
633         match self.try_recv() {
634             Some(val) => val,
635             None => {
636                 fail!("receiving on a closed channel");
637             }
638         }
639     }
640
641     fn try_recv(&self) -> Option<T> {
642         unsafe {
643             let (next_link_port, next_link_chan) = oneshot();
644             let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
645             let link_port = link_port.unwrap();
646             let data_port = link_port.recv();
647             let (next_data_port, res) = match data_port.try_recv() {
648                 Some(StreamPayload { val, next }) => {
649                     (next, Some(val))
650                 }
651                 None => {
652                     let (next_data_port, _) = oneshot();
653                     (next_data_port, None)
654                 }
655             };
656             next_link_chan.send(next_data_port);
657             return res;
658         }
659     }
660 }
661
662 impl<T> Clone for SharedPort<T> {
663     fn clone(&self) -> SharedPort<T> {
664         SharedPort {
665             next_link: self.next_link.clone()
666         }
667     }
668 }
669
670 // XXX: Need better name
671 type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
672
673 pub fn megapipe<T: Send>() -> MegaPipe<T> {
674     let (port, chan) = stream();
675     (SharedPort::new(port), SharedChan::new(chan))
676 }
677
678 impl<T: Send> GenericChan<T> for MegaPipe<T> {
679     fn send(&self, val: T) {
680         self.second_ref().send(val)
681     }
682 }
683
684 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
685     fn try_send(&self, val: T) -> bool {
686         self.second_ref().try_send(val)
687     }
688 }
689
690 impl<T: Send> GenericPort<T> for MegaPipe<T> {
691     fn recv(&self) -> T {
692         self.first_ref().recv()
693     }
694
695     fn try_recv(&self) -> Option<T> {
696         self.first_ref().try_recv()
697     }
698 }
699
700 impl<T: Send> SendDeferred<T> for MegaPipe<T> {
701     fn send_deferred(&self, val: T) {
702         self.second_ref().send_deferred(val)
703     }
704     fn try_send_deferred(&self, val: T) -> bool {
705         self.second_ref().try_send_deferred(val)
706     }
707 }
708
709 #[cfg(test)]
710 mod test {
711     use super::*;
712     use option::*;
713     use rt::test::*;
714     use cell::Cell;
715     use iter::Times;
716
717     #[test]
718     fn oneshot_single_thread_close_port_first() {
719         // Simple test of closing without sending
720         do run_in_newsched_task {
721             let (port, _chan) = oneshot::<int>();
722             { let _p = port; }
723         }
724     }
725
726     #[test]
727     fn oneshot_single_thread_close_chan_first() {
728         // Simple test of closing without sending
729         do run_in_newsched_task {
730             let (_port, chan) = oneshot::<int>();
731             { let _c = chan; }
732         }
733     }
734
735     #[test]
736     fn oneshot_single_thread_send_port_close() {
737         // Testing that the sender cleans up the payload if receiver is closed
738         do run_in_newsched_task {
739             let (port, chan) = oneshot::<~int>();
740             { let _p = port; }
741             chan.send(~0);
742         }
743     }
744
745     #[test]
746     fn oneshot_single_thread_recv_chan_close() {
747         // Receiving on a closed chan will fail
748         do run_in_newsched_task {
749             let res = do spawntask_try {
750                 let (port, chan) = oneshot::<~int>();
751                 { let _c = chan; }
752                 port.recv();
753             };
754             // What is our res?
755             rtdebug!("res is: %?", res.is_err());
756             assert!(res.is_err());
757         }
758     }
759
760     #[test]
761     fn oneshot_single_thread_send_then_recv() {
762         do run_in_newsched_task {
763             let (port, chan) = oneshot::<~int>();
764             chan.send(~10);
765             assert!(port.recv() == ~10);
766         }
767     }
768
769     #[test]
770     fn oneshot_single_thread_try_send_open() {
771         do run_in_newsched_task {
772             let (port, chan) = oneshot::<int>();
773             assert!(chan.try_send(10));
774             assert!(port.recv() == 10);
775         }
776     }
777
778     #[test]
779     fn oneshot_single_thread_try_send_closed() {
780         do run_in_newsched_task {
781             let (port, chan) = oneshot::<int>();
782             { let _p = port; }
783             assert!(!chan.try_send(10));
784         }
785     }
786
787     #[test]
788     fn oneshot_single_thread_try_recv_open() {
789         do run_in_newsched_task {
790             let (port, chan) = oneshot::<int>();
791             chan.send(10);
792             assert!(port.try_recv() == Some(10));
793         }
794     }
795
796     #[test]
797     fn oneshot_single_thread_try_recv_closed() {
798         do run_in_newsched_task {
799             let (port, chan) = oneshot::<int>();
800             { let _c = chan; }
801             assert!(port.try_recv() == None);
802         }
803     }
804
805     #[test]
806     fn oneshot_single_thread_peek_data() {
807         do run_in_newsched_task {
808             let (port, chan) = oneshot::<int>();
809             assert!(!port.peek());
810             chan.send(10);
811             assert!(port.peek());
812         }
813     }
814
815     #[test]
816     fn oneshot_single_thread_peek_close() {
817         do run_in_newsched_task {
818             let (port, chan) = oneshot::<int>();
819             { let _c = chan; }
820             assert!(!port.peek());
821             assert!(!port.peek());
822         }
823     }
824
825     #[test]
826     fn oneshot_single_thread_peek_open() {
827         do run_in_newsched_task {
828             let (port, _) = oneshot::<int>();
829             assert!(!port.peek());
830         }
831     }
832
833     #[test]
834     fn oneshot_multi_task_recv_then_send() {
835         do run_in_newsched_task {
836             let (port, chan) = oneshot::<~int>();
837             let port_cell = Cell::new(port);
838             do spawntask {
839                 assert!(port_cell.take().recv() == ~10);
840             }
841
842             chan.send(~10);
843         }
844     }
845
846     #[test]
847     fn oneshot_multi_task_recv_then_close() {
848         do run_in_newsched_task {
849             let (port, chan) = oneshot::<~int>();
850             let port_cell = Cell::new(port);
851             let chan_cell = Cell::new(chan);
852             do spawntask_later {
853                 let _cell = chan_cell.take();
854             }
855             let res = do spawntask_try {
856                 assert!(port_cell.take().recv() == ~10);
857             };
858             assert!(res.is_err());
859         }
860     }
861
862     #[test]
863     fn oneshot_multi_thread_close_stress() {
864         do stress_factor().times {
865             do run_in_newsched_task {
866                 let (port, chan) = oneshot::<int>();
867                 let port_cell = Cell::new(port);
868                 let thread = do spawntask_thread {
869                     let _p = port_cell.take();
870                 };
871                 let _chan = chan;
872                 thread.join();
873             }
874         }
875     }
876
877     #[test]
878     fn oneshot_multi_thread_send_close_stress() {
879         do stress_factor().times {
880             do run_in_newsched_task {
881                 let (port, chan) = oneshot::<int>();
882                 let chan_cell = Cell::new(chan);
883                 let port_cell = Cell::new(port);
884                 let thread1 = do spawntask_thread {
885                     let _p = port_cell.take();
886                 };
887                 let thread2 = do spawntask_thread {
888                     let c = chan_cell.take();
889                     c.send(1);
890                 };
891                 thread1.join();
892                 thread2.join();
893             }
894         }
895     }
896
897     #[test]
898     fn oneshot_multi_thread_recv_close_stress() {
899         do stress_factor().times {
900             do run_in_newsched_task {
901                 let (port, chan) = oneshot::<int>();
902                 let chan_cell = Cell::new(chan);
903                 let port_cell = Cell::new(port);
904                 let thread1 = do spawntask_thread {
905                     let port_cell = Cell::new(port_cell.take());
906                     let res = do spawntask_try {
907                         port_cell.take().recv();
908                     };
909                     assert!(res.is_err());
910                 };
911                 let thread2 = do spawntask_thread {
912                     let chan_cell = Cell::new(chan_cell.take());
913                     do spawntask {
914                         chan_cell.take();
915                     }
916                 };
917                 thread1.join();
918                 thread2.join();
919             }
920         }
921     }
922
923     #[test]
924     fn oneshot_multi_thread_send_recv_stress() {
925         do stress_factor().times {
926             do run_in_newsched_task {
927                 let (port, chan) = oneshot::<~int>();
928                 let chan_cell = Cell::new(chan);
929                 let port_cell = Cell::new(port);
930                 let thread1 = do spawntask_thread {
931                     chan_cell.take().send(~10);
932                 };
933                 let thread2 = do spawntask_thread {
934                     assert!(port_cell.take().recv() == ~10);
935                 };
936                 thread1.join();
937                 thread2.join();
938             }
939         }
940     }
941
942     #[test]
943     fn stream_send_recv_stress() {
944         do stress_factor().times {
945             do run_in_mt_newsched_task {
946                 let (port, chan) = stream::<~int>();
947
948                 send(chan, 0);
949                 recv(port, 0);
950
951                 fn send(chan: Chan<~int>, i: int) {
952                     if i == 10 { return }
953
954                     let chan_cell = Cell::new(chan);
955                     do spawntask_random {
956                         let chan = chan_cell.take();
957                         chan.send(~i);
958                         send(chan, i + 1);
959                     }
960                 }
961
962                 fn recv(port: Port<~int>, i: int) {
963                     if i == 10 { return }
964
965                     let port_cell = Cell::new(port);
966                     do spawntask_random {
967                         let port = port_cell.take();
968                         assert!(port.recv() == ~i);
969                         recv(port, i + 1);
970                     };
971                 }
972             }
973         }
974     }
975
976     #[test]
977     fn recv_a_lot() {
978         // Regression test that we don't run out of stack in scheduler context
979         do run_in_newsched_task {
980             let (port, chan) = stream();
981             do 10000.times { chan.send(()) }
982             do 10000.times { port.recv() }
983         }
984     }
985
986     #[test]
987     fn shared_chan_stress() {
988         do run_in_mt_newsched_task {
989             let (port, chan) = stream();
990             let chan = SharedChan::new(chan);
991             let total = stress_factor() + 100;
992             do total.times {
993                 let chan_clone = chan.clone();
994                 do spawntask_random {
995                     chan_clone.send(());
996                 }
997             }
998
999             do total.times {
1000                 port.recv();
1001             }
1002         }
1003     }
1004
1005     #[test]
1006     fn shared_port_stress() {
1007         do run_in_mt_newsched_task {
1008             // XXX: Removing these type annotations causes an ICE
1009             let (end_port, end_chan) = stream::<()>();
1010             let (port, chan) = stream::<()>();
1011             let end_chan = SharedChan::new(end_chan);
1012             let port = SharedPort::new(port);
1013             let total = stress_factor() + 100;
1014             do total.times {
1015                 let end_chan_clone = end_chan.clone();
1016                 let port_clone = port.clone();
1017                 do spawntask_random {
1018                     port_clone.recv();
1019                     end_chan_clone.send(());
1020                 }
1021             }
1022
1023             do total.times {
1024                 chan.send(());
1025             }
1026
1027             do total.times {
1028                 end_port.recv();
1029             }
1030         }
1031     }
1032
1033     #[test]
1034     fn shared_port_close_simple() {
1035         do run_in_mt_newsched_task {
1036             let (port, chan) = stream::<()>();
1037             let port = SharedPort::new(port);
1038             { let _chan = chan; }
1039             assert!(port.try_recv().is_none());
1040         }
1041     }
1042
1043     #[test]
1044     fn shared_port_close() {
1045         do run_in_mt_newsched_task {
1046             let (end_port, end_chan) = stream::<bool>();
1047             let (port, chan) = stream::<()>();
1048             let end_chan = SharedChan::new(end_chan);
1049             let port = SharedPort::new(port);
1050             let chan = SharedChan::new(chan);
1051             let send_total = 10;
1052             let recv_total = 20;
1053             do spawntask_random {
1054                 do send_total.times {
1055                     let chan_clone = chan.clone();
1056                     do spawntask_random {
1057                         chan_clone.send(());
1058                     }
1059                 }
1060             }
1061             let end_chan_clone = end_chan.clone();
1062             do spawntask_random {
1063                 do recv_total.times {
1064                     let port_clone = port.clone();
1065                     let end_chan_clone = end_chan_clone.clone();
1066                     do spawntask_random {
1067                         let recvd = port_clone.try_recv().is_some();
1068                         end_chan_clone.send(recvd);
1069                     }
1070                 }
1071             }
1072
1073             let mut recvd = 0;
1074             do recv_total.times {
1075                 recvd += if end_port.recv() { 1 } else { 0 };
1076             }
1077
1078             assert!(recvd == send_total);
1079         }
1080     }
1081
1082     #[test]
1083     fn megapipe_stress() {
1084         use rand;
1085         use rand::RngUtil;
1086
1087         do run_in_mt_newsched_task {
1088             let (end_port, end_chan) = stream::<()>();
1089             let end_chan = SharedChan::new(end_chan);
1090             let pipe = megapipe();
1091             let total = stress_factor() + 10;
1092             let mut rng = rand::rng();
1093             do total.times {
1094                 let msgs = rng.gen_uint_range(0, 10);
1095                 let pipe_clone = pipe.clone();
1096                 let end_chan_clone = end_chan.clone();
1097                 do spawntask_random {
1098                     do msgs.times {
1099                         pipe_clone.send(());
1100                     }
1101                     do msgs.times {
1102                         pipe_clone.recv();
1103                     }
1104                 }
1105
1106                 end_chan_clone.send(());
1107             }
1108
1109             do total.times {
1110                 end_port.recv();
1111             }
1112         }
1113     }
1114
1115     #[test]
1116     fn send_deferred() {
1117         use unstable::sync::atomically;
1118
1119         // Tests no-rescheduling of send_deferred on all types of channels.
1120         do run_in_newsched_task {
1121             let (pone, cone) = oneshot();
1122             let (pstream, cstream) = stream();
1123             let (pshared, cshared) = stream();
1124             let cshared = SharedChan::new(cshared);
1125             let mp = megapipe();
1126
1127             let pone = Cell::new(pone);
1128             do spawntask { pone.take().recv(); }
1129             let pstream = Cell::new(pstream);
1130             do spawntask { pstream.take().recv(); }
1131             let pshared = Cell::new(pshared);
1132             do spawntask { pshared.take().recv(); }
1133             let p_mp = Cell::new(mp.clone());
1134             do spawntask { p_mp.take().recv(); }
1135
1136             let cs = Cell::new((cone, cstream, cshared, mp));
1137             unsafe {
1138                 do atomically {
1139                     let (cone, cstream, cshared, mp) = cs.take();
1140                     cone.send_deferred(());
1141                     cstream.send_deferred(());
1142                     cshared.send_deferred(());
1143                     mp.send_deferred(());
1144                 }
1145             }
1146         }
1147     }
1148
1149 }