1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Ports and channels.
16 use rt::kill::BlockedTask;
18 use rt::sched::Scheduler;
20 use rt::select::{Select, SelectPort};
21 use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
22 use unstable::sync::UnsafeAtomicRcBox;
24 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
27 use rt::{context, SchedulerContext};
28 use tuple::ImmutableTuple;
30 /// A combined refcount / BlockedTask-as-uint pointer.
32 /// Can be equal to the following values:
34 /// * 2 - both endpoints are alive
35 /// * 1 - either the sender or the receiver is dead, determined by context
36 /// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
39 static STATE_BOTH: State = 2;
40 static STATE_ONE: State = 1;
42 /// The heap-allocated structure shared between two endpoints.
48 /// A one-shot channel.
49 pub struct ChanOne<T> {
50 void_packet: *mut Void,
51 suppress_finalize: bool
55 pub struct PortOne<T> {
56 void_packet: *mut Void,
57 suppress_finalize: bool
60 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
61 let packet: ~Packet<T> = ~Packet {
62 state: AtomicUint::new(STATE_BOTH),
67 let packet: *mut Void = cast::transmute(packet);
70 suppress_finalize: false
74 suppress_finalize: false
82 fn packet(&self) -> *mut Packet<T> {
84 let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
85 let p: *mut Packet<T> = &mut **p;
90 /// Send a message on the one-shot channel. If a receiver task is blocked
91 /// waiting for the message, will wake it up and reschedule to it.
92 pub fn send(self, val: T) {
96 /// As `send`, but also returns whether or not the receiver endpoint is still open.
97 pub fn try_send(self, val: T) -> bool {
98 self.try_send_inner(val, true)
101 /// Send a message without immediately rescheduling to a blocked receiver.
102 /// This can be useful in contexts where rescheduling is forbidden, or to
103 /// optimize for when the sender expects to still have useful work to do.
104 pub fn send_deferred(self, val: T) {
105 self.try_send_deferred(val);
108 /// As `send_deferred` and `try_send` together.
109 pub fn try_send_deferred(self, val: T) -> bool {
110 self.try_send_inner(val, false)
113 // 'do_resched' configures whether the scheduler immediately switches to
114 // the receiving task, or leaves the sending task still running.
115 fn try_send_inner(self, val: T, do_resched: bool) -> bool {
116 rtassert!(context() != SchedulerContext);
119 let mut recvr_active = true;
120 let packet = this.packet();
124 // Install the payload
125 assert!((*packet).payload.is_none());
126 (*packet).payload = Some(val);
128 // Atomically swap out the old state to figure out what
129 // the port's up to, issuing a release barrier to prevent
130 // reordering of the payload write. This also issues an
131 // acquire barrier that keeps the subsequent access of the
132 // ~Task pointer from being reordered.
133 let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
135 // Suppress the synchronizing actions in the finalizer. We're
136 // done with the packet. NB: In case of do_resched, this *must*
137 // happen before waking up a blocked task (or be unkillable),
138 // because we might get a kill signal during the reschedule.
139 this.suppress_finalize = true;
143 // Port is not waiting yet. Nothing to do
144 do Local::borrow::<Scheduler, ()> |sched| {
145 rtdebug!("non-rendezvous send");
146 sched.metrics.non_rendezvous_sends += 1;
150 do Local::borrow::<Scheduler, ()> |sched| {
151 rtdebug!("rendezvous send");
152 sched.metrics.rendezvous_sends += 1;
154 // Port has closed. Need to clean up.
155 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
156 recvr_active = false;
159 // Port is blocked. Wake it up.
160 let recvr = BlockedTask::cast_from_uint(task_as_state);
162 do recvr.wake().map_move |woken_task| {
163 Scheduler::run_task(woken_task);
166 let recvr = Cell::new(recvr);
167 do Local::borrow::<Scheduler, ()> |sched| {
168 sched.enqueue_blocked_task(recvr.take());
180 fn packet(&self) -> *mut Packet<T> {
182 let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
183 let p: *mut Packet<T> = &mut **p;
188 /// Wait for a message on the one-shot port. Fails if the send end is closed.
189 pub fn recv(self) -> T {
190 match self.try_recv() {
193 fail!("receiving on closed channel");
198 /// As `recv`, but returns `None` if the send end is closed rather than failing.
199 pub fn try_recv(self) -> Option<T> {
202 // Optimistic check. If data was sent already, we don't even need to block.
203 // No release barrier needed here; we're not handing off our task pointer yet.
204 if !this.optimistic_check() {
205 // No data available yet.
206 // Switch to the scheduler to put the ~Task into the Packet state.
207 let sched = Local::take::<Scheduler>();
208 do sched.deschedule_running_task_and_then |sched, task| {
209 this.block_on(sched, task);
218 impl<T> Select for PortOne<T> {
219 #[inline] #[cfg(not(test))]
220 fn optimistic_check(&mut self) -> bool {
221 unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
224 #[inline] #[cfg(test)]
225 fn optimistic_check(&mut self) -> bool {
226 // The optimistic check is never necessary for correctness. For testing
227 // purposes, making it randomly return false simulates a racing sender.
229 let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
230 Rand::rand(&mut sched.rng)
233 unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
239 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
241 // Atomically swap the task pointer into the Packet state, issuing
242 // an acquire barrier to prevent reordering of the subsequent read
243 // of the payload. Also issues a release barrier to prevent
244 // reordering of any previous writes to the task structure.
245 let task_as_state = task.cast_to_uint();
246 let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
249 // Data has not been sent. Now we're blocked.
250 rtdebug!("non-rendezvous recv");
251 sched.metrics.non_rendezvous_recvs += 1;
255 // Re-record that we are the only owner of the packet.
256 // No barrier needed, even if the task gets reawoken
257 // on a different core -- this is analogous to writing a
258 // payload; a barrier in enqueueing the task protects it.
259 // NB(#8132). This *must* occur before the enqueue below.
260 // FIXME(#6842, #8130) This is usually only needed for the
261 // assertion in recv_ready, except in the case of select().
262 // This won't actually ever have cacheline contention, but
263 // maybe should be optimized out with a cfg(test) anyway?
264 (*self.packet()).state.store(STATE_ONE, Relaxed);
266 rtdebug!("rendezvous recv");
267 sched.metrics.rendezvous_recvs += 1;
269 // Channel is closed. Switch back and check the data.
270 // NB: We have to drop back into the scheduler event loop here
271 // instead of switching immediately back or we could end up
272 // triggering infinite recursion on the scheduler's stack.
273 let recvr = BlockedTask::cast_from_uint(task_as_state);
274 sched.enqueue_blocked_task(recvr);
277 _ => rtabort!("can't block_on; a task is already blocked")
282 // This is the only select trait function that's not also used in recv.
283 fn unblock_from(&mut self) -> bool {
284 let packet = self.packet();
286 // In case the data is available, the acquire barrier here matches
287 // the release barrier the sender used to release the payload.
288 match (*packet).state.load(Acquire) {
289 // Impossible. We removed STATE_BOTH when blocking on it, and
290 // no self-respecting sender would put it back.
291 STATE_BOTH => rtabort!("refcount already 2 in unblock_from"),
292 // Here, a sender already tried to wake us up. Perhaps they
293 // even succeeded! Data is available.
295 // Still registered as blocked. Need to "unblock" the pointer.
297 // In the window between the load and the CAS, a sender
298 // might take the pointer and set the refcount to ONE. If
299 // that happens, we shouldn't clobber that with BOTH!
300 // Acquire barrier again for the same reason as above.
301 match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
303 STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
304 STATE_ONE => true, // Lost the race. Data available.
306 // We successfully unblocked our task pointer.
307 assert!(task_as_state == same_ptr);
308 let handle = BlockedTask::cast_from_uint(task_as_state);
309 // Because we are already awake, the handle we
310 // gave to this port shall already be empty.
311 handle.assert_already_awake();
321 impl<T> SelectPort<T> for PortOne<T> {
322 fn recv_ready(self) -> Option<T> {
324 let packet = this.packet();
326 // No further memory barrier is needed here to access the
327 // payload. Some scenarios:
329 // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
330 // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
331 // and ran on its thread. The sending task issued a read barrier when taking the
332 // pointer to the receiving task.
333 // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
334 // is pinned to some other scheduler, so the sending task had to give us to
335 // a different scheduler for resuming. That send synchronized memory.
337 // See corresponding store() above in block_on for rationale.
338 // FIXME(#8130) This can happen only in test builds.
339 assert!((*packet).state.load(Relaxed) == STATE_ONE);
341 let payload = (*packet).payload.take();
343 // The sender has closed up shop. Drop the packet.
344 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
345 // Suppress the synchronizing actions in the finalizer. We're done with the packet.
346 this.suppress_finalize = true;
352 impl<T> Peekable<T> for PortOne<T> {
353 fn peek(&self) -> bool {
355 let packet: *mut Packet<T> = self.packet();
356 let oldstate = (*packet).state.load(SeqCst);
359 STATE_ONE => (*packet).payload.is_some(),
360 _ => rtabort!("peeked on a blocked task")
367 impl<T> Drop for ChanOne<T> {
369 if self.suppress_finalize { return }
372 let this = cast::transmute_mut(self);
373 let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
376 // Port still active. It will destroy the Packet.
379 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
382 // The port is blocked waiting for a message we will never send. Wake it.
383 assert!((*this.packet()).payload.is_none());
384 let recvr = BlockedTask::cast_from_uint(task_as_state);
385 do recvr.wake().map_move |woken_task| {
386 Scheduler::run_task(woken_task);
395 impl<T> Drop for PortOne<T> {
397 if self.suppress_finalize { return }
400 let this = cast::transmute_mut(self);
401 let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
404 // Chan still active. It will destroy the packet.
407 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
410 // This case occurs during unwinding, when the blocked
411 // receiver was killed awake. The task can't still be
412 // blocked (we are it), but we need to free the handle.
413 let recvr = BlockedTask::cast_from_uint(task_as_state);
414 recvr.assert_already_awake();
421 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
422 pub trait SendDeferred<T> {
423 fn send_deferred(&self, val: T);
424 fn try_send_deferred(&self, val: T) -> bool;
427 struct StreamPayload<T> {
429 next: PortOne<StreamPayload<T>>
432 type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
433 type StreamPortOne<T> = PortOne<StreamPayload<T>>;
435 /// A channel with unbounded size.
437 // FIXME #5372. Using Cell because we don't take &mut self
438 next: Cell<StreamChanOne<T>>
441 /// An port with unbounded size.
443 // FIXME #5372. Using Cell because we don't take &mut self
444 next: Cell<StreamPortOne<T>>
447 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
448 let (pone, cone) = oneshot();
449 let port = Port { next: Cell::new(pone) };
450 let chan = Chan { next: Cell::new(cone) };
454 impl<T: Send> Chan<T> {
455 fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
456 let (next_pone, next_cone) = oneshot();
457 let cone = self.next.take();
458 self.next.put_back(next_cone);
459 cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
463 impl<T: Send> GenericChan<T> for Chan<T> {
464 fn send(&self, val: T) {
469 impl<T: Send> GenericSmartChan<T> for Chan<T> {
470 fn try_send(&self, val: T) -> bool {
471 self.try_send_inner(val, true)
475 impl<T: Send> SendDeferred<T> for Chan<T> {
476 fn send_deferred(&self, val: T) {
477 self.try_send_deferred(val);
479 fn try_send_deferred(&self, val: T) -> bool {
480 self.try_send_inner(val, false)
484 impl<T> GenericPort<T> for Port<T> {
485 fn recv(&self) -> T {
486 match self.try_recv() {
489 fail!("receiving on closed channel");
494 fn try_recv(&self) -> Option<T> {
495 let pone = self.next.take();
496 match pone.try_recv() {
497 Some(StreamPayload { val, next }) => {
498 self.next.put_back(next);
506 impl<T> Peekable<T> for Port<T> {
507 fn peek(&self) -> bool {
508 self.next.with_mut_ref(|p| p.peek())
512 // XXX: Kind of gross. A Port<T> should be selectable so you can make an array
513 // of them, but a &Port<T> should also be selectable so you can select2 on it
514 // alongside a PortOne<U> without passing the port by value in recv_ready.
516 impl<'self, T> Select for &'self Port<T> {
518 fn optimistic_check(&mut self) -> bool {
519 do self.next.with_mut_ref |pone| { pone.optimistic_check() }
523 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
524 let task = Cell::new(task);
525 do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
529 fn unblock_from(&mut self) -> bool {
530 do self.next.with_mut_ref |pone| { pone.unblock_from() }
534 impl<T> Select for Port<T> {
536 fn optimistic_check(&mut self) -> bool {
537 (&*self).optimistic_check()
541 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
542 (&*self).block_on(sched, task)
546 fn unblock_from(&mut self) -> bool {
547 (&*self).unblock_from()
551 impl<'self, T> SelectPort<T> for &'self Port<T> {
552 fn recv_ready(self) -> Option<T> {
553 match self.next.take().recv_ready() {
554 Some(StreamPayload { val, next }) => {
555 self.next.put_back(next);
563 pub struct SharedChan<T> {
564 // Just like Chan, but a shared AtomicOption instead of Cell
565 priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
568 impl<T> SharedChan<T> {
569 pub fn new(chan: Chan<T>) -> SharedChan<T> {
570 let next = chan.next.take();
571 let next = AtomicOption::new(~next);
572 SharedChan { next: UnsafeAtomicRcBox::new(next) }
576 impl<T: Send> SharedChan<T> {
577 fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
579 let (next_pone, next_cone) = oneshot();
580 let cone = (*self.next.get()).swap(~next_cone, SeqCst);
581 cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
587 impl<T: Send> GenericChan<T> for SharedChan<T> {
588 fn send(&self, val: T) {
593 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
594 fn try_send(&self, val: T) -> bool {
595 self.try_send_inner(val, true)
599 impl<T: Send> SendDeferred<T> for SharedChan<T> {
600 fn send_deferred(&self, val: T) {
601 self.try_send_deferred(val);
603 fn try_send_deferred(&self, val: T) -> bool {
604 self.try_send_inner(val, false)
608 impl<T> Clone for SharedChan<T> {
609 fn clone(&self) -> SharedChan<T> {
611 next: self.next.clone()
616 pub struct SharedPort<T> {
617 // The next port on which we will receive the next port on which we will receive T
618 priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
621 impl<T> SharedPort<T> {
622 pub fn new(port: Port<T>) -> SharedPort<T> {
623 // Put the data port into a new link pipe
624 let next_data_port = port.next.take();
625 let (next_link_port, next_link_chan) = oneshot();
626 next_link_chan.send(next_data_port);
627 let next_link = AtomicOption::new(~next_link_port);
628 SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
632 impl<T: Send> GenericPort<T> for SharedPort<T> {
633 fn recv(&self) -> T {
634 match self.try_recv() {
637 fail!("receiving on a closed channel");
642 fn try_recv(&self) -> Option<T> {
644 let (next_link_port, next_link_chan) = oneshot();
645 let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
646 let link_port = link_port.unwrap();
647 let data_port = link_port.recv();
648 let (next_data_port, res) = match data_port.try_recv() {
649 Some(StreamPayload { val, next }) => {
653 let (next_data_port, _) = oneshot();
654 (next_data_port, None)
657 next_link_chan.send(next_data_port);
663 impl<T> Clone for SharedPort<T> {
664 fn clone(&self) -> SharedPort<T> {
666 next_link: self.next_link.clone()
671 // XXX: Need better name
672 type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
674 pub fn megapipe<T: Send>() -> MegaPipe<T> {
675 let (port, chan) = stream();
676 (SharedPort::new(port), SharedChan::new(chan))
679 impl<T: Send> GenericChan<T> for MegaPipe<T> {
680 fn send(&self, val: T) {
681 self.second_ref().send(val)
685 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
686 fn try_send(&self, val: T) -> bool {
687 self.second_ref().try_send(val)
691 impl<T: Send> GenericPort<T> for MegaPipe<T> {
692 fn recv(&self) -> T {
693 self.first_ref().recv()
696 fn try_recv(&self) -> Option<T> {
697 self.first_ref().try_recv()
701 impl<T: Send> SendDeferred<T> for MegaPipe<T> {
702 fn send_deferred(&self, val: T) {
703 self.second_ref().send_deferred(val)
705 fn try_send_deferred(&self, val: T) -> bool {
706 self.second_ref().try_send_deferred(val)
719 fn oneshot_single_thread_close_port_first() {
720 // Simple test of closing without sending
721 do run_in_newsched_task {
722 let (port, _chan) = oneshot::<int>();
728 fn oneshot_single_thread_close_chan_first() {
729 // Simple test of closing without sending
730 do run_in_newsched_task {
731 let (_port, chan) = oneshot::<int>();
737 fn oneshot_single_thread_send_port_close() {
738 // Testing that the sender cleans up the payload if receiver is closed
739 do run_in_newsched_task {
740 let (port, chan) = oneshot::<~int>();
747 fn oneshot_single_thread_recv_chan_close() {
748 // Receiving on a closed chan will fail
749 do run_in_newsched_task {
750 let res = do spawntask_try {
751 let (port, chan) = oneshot::<~int>();
756 rtdebug!("res is: %?", res.is_err());
757 assert!(res.is_err());
762 fn oneshot_single_thread_send_then_recv() {
763 do run_in_newsched_task {
764 let (port, chan) = oneshot::<~int>();
766 assert!(port.recv() == ~10);
771 fn oneshot_single_thread_try_send_open() {
772 do run_in_newsched_task {
773 let (port, chan) = oneshot::<int>();
774 assert!(chan.try_send(10));
775 assert!(port.recv() == 10);
780 fn oneshot_single_thread_try_send_closed() {
781 do run_in_newsched_task {
782 let (port, chan) = oneshot::<int>();
784 assert!(!chan.try_send(10));
789 fn oneshot_single_thread_try_recv_open() {
790 do run_in_newsched_task {
791 let (port, chan) = oneshot::<int>();
793 assert!(port.try_recv() == Some(10));
798 fn oneshot_single_thread_try_recv_closed() {
799 do run_in_newsched_task {
800 let (port, chan) = oneshot::<int>();
802 assert!(port.try_recv() == None);
807 fn oneshot_single_thread_peek_data() {
808 do run_in_newsched_task {
809 let (port, chan) = oneshot::<int>();
810 assert!(!port.peek());
812 assert!(port.peek());
817 fn oneshot_single_thread_peek_close() {
818 do run_in_newsched_task {
819 let (port, chan) = oneshot::<int>();
821 assert!(!port.peek());
822 assert!(!port.peek());
827 fn oneshot_single_thread_peek_open() {
828 do run_in_newsched_task {
829 let (port, _) = oneshot::<int>();
830 assert!(!port.peek());
835 fn oneshot_multi_task_recv_then_send() {
836 do run_in_newsched_task {
837 let (port, chan) = oneshot::<~int>();
838 let port_cell = Cell::new(port);
840 assert!(port_cell.take().recv() == ~10);
848 fn oneshot_multi_task_recv_then_close() {
849 do run_in_newsched_task {
850 let (port, chan) = oneshot::<~int>();
851 let port_cell = Cell::new(port);
852 let chan_cell = Cell::new(chan);
854 let _cell = chan_cell.take();
856 let res = do spawntask_try {
857 assert!(port_cell.take().recv() == ~10);
859 assert!(res.is_err());
864 fn oneshot_multi_thread_close_stress() {
865 do stress_factor().times {
866 do run_in_newsched_task {
867 let (port, chan) = oneshot::<int>();
868 let port_cell = Cell::new(port);
869 let thread = do spawntask_thread {
870 let _p = port_cell.take();
879 fn oneshot_multi_thread_send_close_stress() {
880 do stress_factor().times {
881 do run_in_newsched_task {
882 let (port, chan) = oneshot::<int>();
883 let chan_cell = Cell::new(chan);
884 let port_cell = Cell::new(port);
885 let thread1 = do spawntask_thread {
886 let _p = port_cell.take();
888 let thread2 = do spawntask_thread {
889 let c = chan_cell.take();
899 fn oneshot_multi_thread_recv_close_stress() {
900 do stress_factor().times {
901 do run_in_newsched_task {
902 let (port, chan) = oneshot::<int>();
903 let chan_cell = Cell::new(chan);
904 let port_cell = Cell::new(port);
905 let thread1 = do spawntask_thread {
906 let port_cell = Cell::new(port_cell.take());
907 let res = do spawntask_try {
908 port_cell.take().recv();
910 assert!(res.is_err());
912 let thread2 = do spawntask_thread {
913 let chan_cell = Cell::new(chan_cell.take());
925 fn oneshot_multi_thread_send_recv_stress() {
926 do stress_factor().times {
927 do run_in_newsched_task {
928 let (port, chan) = oneshot::<~int>();
929 let chan_cell = Cell::new(chan);
930 let port_cell = Cell::new(port);
931 let thread1 = do spawntask_thread {
932 chan_cell.take().send(~10);
934 let thread2 = do spawntask_thread {
935 assert!(port_cell.take().recv() == ~10);
944 fn stream_send_recv_stress() {
945 do stress_factor().times {
946 do run_in_mt_newsched_task {
947 let (port, chan) = stream::<~int>();
952 fn send(chan: Chan<~int>, i: int) {
953 if i == 10 { return }
955 let chan_cell = Cell::new(chan);
956 do spawntask_random {
957 let chan = chan_cell.take();
963 fn recv(port: Port<~int>, i: int) {
964 if i == 10 { return }
966 let port_cell = Cell::new(port);
967 do spawntask_random {
968 let port = port_cell.take();
969 assert!(port.recv() == ~i);
979 // Regression test that we don't run out of stack in scheduler context
980 do run_in_newsched_task {
981 let (port, chan) = stream();
982 do 10000.times { chan.send(()) }
983 do 10000.times { port.recv() }
988 fn shared_chan_stress() {
989 do run_in_mt_newsched_task {
990 let (port, chan) = stream();
991 let chan = SharedChan::new(chan);
992 let total = stress_factor() + 100;
994 let chan_clone = chan.clone();
995 do spawntask_random {
1007 fn shared_port_stress() {
1008 do run_in_mt_newsched_task {
1009 // XXX: Removing these type annotations causes an ICE
1010 let (end_port, end_chan) = stream::<()>();
1011 let (port, chan) = stream::<()>();
1012 let end_chan = SharedChan::new(end_chan);
1013 let port = SharedPort::new(port);
1014 let total = stress_factor() + 100;
1016 let end_chan_clone = end_chan.clone();
1017 let port_clone = port.clone();
1018 do spawntask_random {
1020 end_chan_clone.send(());
1035 fn shared_port_close_simple() {
1036 do run_in_mt_newsched_task {
1037 let (port, chan) = stream::<()>();
1038 let port = SharedPort::new(port);
1039 { let _chan = chan; }
1040 assert!(port.try_recv().is_none());
1045 fn shared_port_close() {
1046 do run_in_mt_newsched_task {
1047 let (end_port, end_chan) = stream::<bool>();
1048 let (port, chan) = stream::<()>();
1049 let end_chan = SharedChan::new(end_chan);
1050 let port = SharedPort::new(port);
1051 let chan = SharedChan::new(chan);
1052 let send_total = 10;
1053 let recv_total = 20;
1054 do spawntask_random {
1055 do send_total.times {
1056 let chan_clone = chan.clone();
1057 do spawntask_random {
1058 chan_clone.send(());
1062 let end_chan_clone = end_chan.clone();
1063 do spawntask_random {
1064 do recv_total.times {
1065 let port_clone = port.clone();
1066 let end_chan_clone = end_chan_clone.clone();
1067 do spawntask_random {
1068 let recvd = port_clone.try_recv().is_some();
1069 end_chan_clone.send(recvd);
1075 do recv_total.times {
1076 recvd += if end_port.recv() { 1 } else { 0 };
1079 assert!(recvd == send_total);
1084 fn megapipe_stress() {
1088 do run_in_mt_newsched_task {
1089 let (end_port, end_chan) = stream::<()>();
1090 let end_chan = SharedChan::new(end_chan);
1091 let pipe = megapipe();
1092 let total = stress_factor() + 10;
1093 let mut rng = rand::rng();
1095 let msgs = rng.gen_uint_range(0, 10);
1096 let pipe_clone = pipe.clone();
1097 let end_chan_clone = end_chan.clone();
1098 do spawntask_random {
1100 pipe_clone.send(());
1107 end_chan_clone.send(());
1117 fn send_deferred() {
1118 use unstable::sync::atomically;
1120 // Tests no-rescheduling of send_deferred on all types of channels.
1121 do run_in_newsched_task {
1122 let (pone, cone) = oneshot();
1123 let (pstream, cstream) = stream();
1124 let (pshared, cshared) = stream();
1125 let cshared = SharedChan::new(cshared);
1126 let mp = megapipe();
1128 let pone = Cell::new(pone);
1129 do spawntask { pone.take().recv(); }
1130 let pstream = Cell::new(pstream);
1131 do spawntask { pstream.take().recv(); }
1132 let pshared = Cell::new(pshared);
1133 do spawntask { pshared.take().recv(); }
1134 let p_mp = Cell::new(mp.clone());
1135 do spawntask { p_mp.take().recv(); }
1137 let cs = Cell::new((cone, cstream, cshared, mp));
1140 let (cone, cstream, cshared, mp) = cs.take();
1141 cone.send_deferred(());
1142 cstream.send_deferred(());
1143 cshared.send_deferred(());
1144 mp.send_deferred(());