1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Ports and channels.
16 use rt::kill::BlockedTask;
19 use rt::sched::Scheduler;
21 use rt::select::{SelectInner, SelectPortInner};
22 use select::{Select, SelectPort};
23 use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
24 use unstable::sync::UnsafeArc;
26 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
29 use tuple::ImmutableTuple;
31 /// A combined refcount / BlockedTask-as-uint pointer.
33 /// Can be equal to the following values:
35 /// * 2 - both endpoints are alive
36 /// * 1 - either the sender or the receiver is dead, determined by context
37 /// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
40 static STATE_BOTH: State = 2;
41 static STATE_ONE: State = 1;
43 /// The heap-allocated structure shared between two endpoints.
49 // A one-shot channel.
50 pub struct ChanOne<T> {
51 void_packet: *mut Void,
52 suppress_finalize: bool
56 pub struct PortOne<T> {
57 void_packet: *mut Void,
58 suppress_finalize: bool
61 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
62 let packet: ~Packet<T> = ~Packet {
63 state: AtomicUint::new(STATE_BOTH),
68 let packet: *mut Void = cast::transmute(packet);
71 suppress_finalize: false
75 suppress_finalize: false
83 fn packet(&self) -> *mut Packet<T> {
85 let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
86 let p: *mut Packet<T> = &mut **p;
91 /// Send a message on the one-shot channel. If a receiver task is blocked
92 /// waiting for the message, will wake it up and reschedule to it.
93 pub fn send(self, val: T) {
97 /// As `send`, but also returns whether or not the receiver endpoint is still open.
98 pub fn try_send(self, val: T) -> bool {
99 self.try_send_inner(val, true)
102 /// Send a message without immediately rescheduling to a blocked receiver.
103 /// This can be useful in contexts where rescheduling is forbidden, or to
104 /// optimize for when the sender expects to still have useful work to do.
105 pub fn send_deferred(self, val: T) {
106 self.try_send_deferred(val);
109 /// As `send_deferred` and `try_send` together.
110 pub fn try_send_deferred(self, val: T) -> bool {
111 self.try_send_inner(val, false)
114 // 'do_resched' configures whether the scheduler immediately switches to
115 // the receiving task, or leaves the sending task still running.
116 fn try_send_inner(self, val: T, do_resched: bool) -> bool {
118 rtassert!(!rt::in_sched_context());
122 let mut recvr_active = true;
123 let packet = this.packet();
127 // Install the payload
128 rtassert!((*packet).payload.is_none());
129 (*packet).payload = Some(val);
131 // Atomically swap out the old state to figure out what
132 // the port's up to, issuing a release barrier to prevent
133 // reordering of the payload write. This also issues an
134 // acquire barrier that keeps the subsequent access of the
135 // ~Task pointer from being reordered.
136 let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
138 // Suppress the synchronizing actions in the finalizer. We're
139 // done with the packet. NB: In case of do_resched, this *must*
140 // happen before waking up a blocked task (or be unkillable),
141 // because we might get a kill signal during the reschedule.
142 this.suppress_finalize = true;
146 // Port is not waiting yet. Nothing to do
149 // Port has closed. Need to clean up.
150 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
151 recvr_active = false;
154 // Port is blocked. Wake it up.
155 let recvr = BlockedTask::cast_from_uint(task_as_state);
157 do recvr.wake().map_move |woken_task| {
158 Scheduler::run_task(woken_task);
161 let recvr = Cell::new(recvr);
162 do Local::borrow |sched: &mut Scheduler| {
163 sched.enqueue_blocked_task(recvr.take());
175 fn packet(&self) -> *mut Packet<T> {
177 let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
178 let p: *mut Packet<T> = &mut **p;
183 /// Wait for a message on the one-shot port. Fails if the send end is closed.
184 pub fn recv(self) -> T {
185 match self.try_recv() {
188 fail!("receiving on closed channel");
193 /// As `recv`, but returns `None` if the send end is closed rather than failing.
194 pub fn try_recv(self) -> Option<T> {
197 // Optimistic check. If data was sent already, we don't even need to block.
198 // No release barrier needed here; we're not handing off our task pointer yet.
199 if !this.optimistic_check() {
200 // No data available yet.
201 // Switch to the scheduler to put the ~Task into the Packet state.
202 let sched: ~Scheduler = Local::take();
203 do sched.deschedule_running_task_and_then |sched, task| {
204 this.block_on(sched, task);
213 impl<T> SelectInner for PortOne<T> {
214 #[inline] #[cfg(not(test))]
215 fn optimistic_check(&mut self) -> bool {
216 unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
219 #[inline] #[cfg(test)]
220 fn optimistic_check(&mut self) -> bool {
221 // The optimistic check is never necessary for correctness. For testing
222 // purposes, making it randomly return false simulates a racing sender.
224 let actually_check = do Local::borrow |sched: &mut Scheduler| {
225 Rand::rand(&mut sched.rng)
228 unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
234 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
236 // Atomically swap the task pointer into the Packet state, issuing
237 // an acquire barrier to prevent reordering of the subsequent read
238 // of the payload. Also issues a release barrier to prevent
239 // reordering of any previous writes to the task structure.
240 let task_as_state = task.cast_to_uint();
241 let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
244 // Data has not been sent. Now we're blocked.
245 rtdebug!("non-rendezvous recv");
249 // Re-record that we are the only owner of the packet.
250 // No barrier needed, even if the task gets reawoken
251 // on a different core -- this is analogous to writing a
252 // payload; a barrier in enqueueing the task protects it.
253 // NB(#8132). This *must* occur before the enqueue below.
254 // FIXME(#6842, #8130) This is usually only needed for the
255 // assertion in recv_ready, except in the case of select().
256 // This won't actually ever have cacheline contention, but
257 // maybe should be optimized out with a cfg(test) anyway?
258 (*self.packet()).state.store(STATE_ONE, Relaxed);
260 rtdebug!("rendezvous recv");
262 // Channel is closed. Switch back and check the data.
263 // NB: We have to drop back into the scheduler event loop here
264 // instead of switching immediately back or we could end up
265 // triggering infinite recursion on the scheduler's stack.
266 let recvr = BlockedTask::cast_from_uint(task_as_state);
267 sched.enqueue_blocked_task(recvr);
270 _ => rtabort!("can't block_on; a task is already blocked")
275 // This is the only select trait function that's not also used in recv.
276 fn unblock_from(&mut self) -> bool {
277 let packet = self.packet();
279 // In case the data is available, the acquire barrier here matches
280 // the release barrier the sender used to release the payload.
281 match (*packet).state.load(Acquire) {
282 // Impossible. We removed STATE_BOTH when blocking on it, and
283 // no self-respecting sender would put it back.
284 STATE_BOTH => rtabort!("refcount already 2 in unblock_from"),
285 // Here, a sender already tried to wake us up. Perhaps they
286 // even succeeded! Data is available.
288 // Still registered as blocked. Need to "unblock" the pointer.
290 // In the window between the load and the CAS, a sender
291 // might take the pointer and set the refcount to ONE. If
292 // that happens, we shouldn't clobber that with BOTH!
293 // Acquire barrier again for the same reason as above.
294 match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
296 STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
297 STATE_ONE => true, // Lost the race. Data available.
299 // We successfully unblocked our task pointer.
300 rtassert!(task_as_state == same_ptr);
301 let handle = BlockedTask::cast_from_uint(task_as_state);
302 // Because we are already awake, the handle we
303 // gave to this port shall already be empty.
304 handle.assert_already_awake();
314 impl<T> Select for PortOne<T> { }
316 impl<T> SelectPortInner<T> for PortOne<T> {
317 fn recv_ready(self) -> Option<T> {
319 let packet = this.packet();
321 // No further memory barrier is needed here to access the
322 // payload. Some scenarios:
324 // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
325 // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
326 // and ran on its thread. The sending task issued a read barrier when taking the
327 // pointer to the receiving task.
328 // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
329 // is pinned to some other scheduler, so the sending task had to give us to
330 // a different scheduler for resuming. That send synchronized memory.
332 // See corresponding store() above in block_on for rationale.
333 // FIXME(#8130) This can happen only in test builds.
334 // This load is not required for correctness and may be compiled out.
335 rtassert!((*packet).state.load(Relaxed) == STATE_ONE);
337 let payload = (*packet).payload.take();
339 // The sender has closed up shop. Drop the packet.
340 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
341 // Suppress the synchronizing actions in the finalizer. We're done with the packet.
342 this.suppress_finalize = true;
348 impl<T> SelectPort<T> for PortOne<T> { }
350 impl<T> Peekable<T> for PortOne<T> {
351 fn peek(&self) -> bool {
353 let packet: *mut Packet<T> = self.packet();
354 let oldstate = (*packet).state.load(SeqCst);
357 STATE_ONE => (*packet).payload.is_some(),
358 _ => rtabort!("peeked on a blocked task")
365 impl<T> Drop for ChanOne<T> {
367 if self.suppress_finalize { return }
370 let this = cast::transmute_mut(self);
371 let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
374 // Port still active. It will destroy the Packet.
377 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
380 // The port is blocked waiting for a message we will never send. Wake it.
381 rtassert!((*this.packet()).payload.is_none());
382 let recvr = BlockedTask::cast_from_uint(task_as_state);
383 do recvr.wake().map_move |woken_task| {
384 Scheduler::run_task(woken_task);
393 impl<T> Drop for PortOne<T> {
395 if self.suppress_finalize { return }
398 let this = cast::transmute_mut(self);
399 let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
402 // Chan still active. It will destroy the packet.
405 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
408 // This case occurs during unwinding, when the blocked
409 // receiver was killed awake. The task can't still be
410 // blocked (we are it), but we need to free the handle.
411 let recvr = BlockedTask::cast_from_uint(task_as_state);
412 recvr.assert_already_awake();
419 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
420 pub trait SendDeferred<T> {
421 fn send_deferred(&self, val: T);
422 fn try_send_deferred(&self, val: T) -> bool;
425 struct StreamPayload<T> {
427 next: PortOne<StreamPayload<T>>
430 type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
431 type StreamPortOne<T> = PortOne<StreamPayload<T>>;
433 /// A channel with unbounded size.
435 // FIXME #5372. Using Cell because we don't take &mut self
436 next: Cell<StreamChanOne<T>>
439 /// An port with unbounded size.
441 // FIXME #5372. Using Cell because we don't take &mut self
442 next: Cell<StreamPortOne<T>>
445 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
446 let (pone, cone) = oneshot();
447 let port = Port { next: Cell::new(pone) };
448 let chan = Chan { next: Cell::new(cone) };
452 impl<T: Send> Chan<T> {
453 fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
454 let (next_pone, next_cone) = oneshot();
455 let cone = self.next.take();
456 self.next.put_back(next_cone);
457 cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
461 impl<T: Send> GenericChan<T> for Chan<T> {
462 fn send(&self, val: T) {
467 impl<T: Send> GenericSmartChan<T> for Chan<T> {
468 fn try_send(&self, val: T) -> bool {
469 self.try_send_inner(val, true)
473 impl<T: Send> SendDeferred<T> for Chan<T> {
474 fn send_deferred(&self, val: T) {
475 self.try_send_deferred(val);
477 fn try_send_deferred(&self, val: T) -> bool {
478 self.try_send_inner(val, false)
482 impl<T> GenericPort<T> for Port<T> {
483 fn recv(&self) -> T {
484 match self.try_recv() {
487 fail!("receiving on closed channel");
492 fn try_recv(&self) -> Option<T> {
493 do self.next.take_opt().map_move_default(None) |pone| {
494 match pone.try_recv() {
495 Some(StreamPayload { val, next }) => {
496 self.next.put_back(next);
505 impl<T> Peekable<T> for Port<T> {
506 fn peek(&self) -> bool {
507 self.next.with_mut_ref(|p| p.peek())
511 // XXX: Kind of gross. A Port<T> should be selectable so you can make an array
512 // of them, but a &Port<T> should also be selectable so you can select2 on it
513 // alongside a PortOne<U> without passing the port by value in recv_ready.
515 impl<'self, T> SelectInner for &'self Port<T> {
517 fn optimistic_check(&mut self) -> bool {
518 do self.next.with_mut_ref |pone| { pone.optimistic_check() }
522 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
523 let task = Cell::new(task);
524 do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
528 fn unblock_from(&mut self) -> bool {
529 do self.next.with_mut_ref |pone| { pone.unblock_from() }
533 impl<'self, T> Select for &'self Port<T> { }
535 impl<T> SelectInner for Port<T> {
537 fn optimistic_check(&mut self) -> bool {
538 (&*self).optimistic_check()
542 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
543 (&*self).block_on(sched, task)
547 fn unblock_from(&mut self) -> bool {
548 (&*self).unblock_from()
552 impl<T> Select for Port<T> { }
554 impl<'self, T> SelectPortInner<T> for &'self Port<T> {
555 fn recv_ready(self) -> Option<T> {
556 match self.next.take().recv_ready() {
557 Some(StreamPayload { val, next }) => {
558 self.next.put_back(next);
566 impl<'self, T> SelectPort<T> for &'self Port<T> { }
568 pub struct SharedChan<T> {
569 // Just like Chan, but a shared AtomicOption instead of Cell
570 priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
573 impl<T> SharedChan<T> {
574 pub fn new(chan: Chan<T>) -> SharedChan<T> {
575 let next = chan.next.take();
576 let next = AtomicOption::new(~next);
577 SharedChan { next: UnsafeArc::new(next) }
581 impl<T: Send> SharedChan<T> {
582 fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
584 let (next_pone, next_cone) = oneshot();
585 let cone = (*self.next.get()).swap(~next_cone, SeqCst);
586 cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
592 impl<T: Send> GenericChan<T> for SharedChan<T> {
593 fn send(&self, val: T) {
598 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
599 fn try_send(&self, val: T) -> bool {
600 self.try_send_inner(val, true)
604 impl<T: Send> SendDeferred<T> for SharedChan<T> {
605 fn send_deferred(&self, val: T) {
606 self.try_send_deferred(val);
608 fn try_send_deferred(&self, val: T) -> bool {
609 self.try_send_inner(val, false)
613 impl<T> Clone for SharedChan<T> {
614 fn clone(&self) -> SharedChan<T> {
616 next: self.next.clone()
621 pub struct SharedPort<T> {
622 // The next port on which we will receive the next port on which we will receive T
623 priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
626 impl<T> SharedPort<T> {
627 pub fn new(port: Port<T>) -> SharedPort<T> {
628 // Put the data port into a new link pipe
629 let next_data_port = port.next.take();
630 let (next_link_port, next_link_chan) = oneshot();
631 next_link_chan.send(next_data_port);
632 let next_link = AtomicOption::new(~next_link_port);
633 SharedPort { next_link: UnsafeArc::new(next_link) }
637 impl<T: Send> GenericPort<T> for SharedPort<T> {
638 fn recv(&self) -> T {
639 match self.try_recv() {
642 fail!("receiving on a closed channel");
647 fn try_recv(&self) -> Option<T> {
649 let (next_link_port, next_link_chan) = oneshot();
650 let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
651 let link_port = link_port.unwrap();
652 let data_port = link_port.recv();
653 let (next_data_port, res) = match data_port.try_recv() {
654 Some(StreamPayload { val, next }) => {
658 let (next_data_port, _) = oneshot();
659 (next_data_port, None)
662 next_link_chan.send(next_data_port);
668 impl<T> Clone for SharedPort<T> {
669 fn clone(&self) -> SharedPort<T> {
671 next_link: self.next_link.clone()
676 // FIXME #7760: Need better name
677 type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
679 pub fn megapipe<T: Send>() -> MegaPipe<T> {
680 let (port, chan) = stream();
681 (SharedPort::new(port), SharedChan::new(chan))
684 impl<T: Send> GenericChan<T> for MegaPipe<T> {
685 fn send(&self, val: T) {
686 self.second_ref().send(val)
690 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
691 fn try_send(&self, val: T) -> bool {
692 self.second_ref().try_send(val)
696 impl<T: Send> GenericPort<T> for MegaPipe<T> {
697 fn recv(&self) -> T {
698 self.first_ref().recv()
701 fn try_recv(&self) -> Option<T> {
702 self.first_ref().try_recv()
706 impl<T: Send> SendDeferred<T> for MegaPipe<T> {
707 fn send_deferred(&self, val: T) {
708 self.second_ref().send_deferred(val)
710 fn try_send_deferred(&self, val: T) -> bool {
711 self.second_ref().try_send_deferred(val)
725 fn oneshot_single_thread_close_port_first() {
726 // Simple test of closing without sending
727 do run_in_newsched_task {
728 let (port, _chan) = oneshot::<int>();
734 fn oneshot_single_thread_close_chan_first() {
735 // Simple test of closing without sending
736 do run_in_newsched_task {
737 let (_port, chan) = oneshot::<int>();
743 fn oneshot_single_thread_send_port_close() {
744 // Testing that the sender cleans up the payload if receiver is closed
745 do run_in_newsched_task {
746 let (port, chan) = oneshot::<~int>();
753 fn oneshot_single_thread_recv_chan_close() {
754 // Receiving on a closed chan will fail
755 do run_in_newsched_task {
756 let res = do spawntask_try {
757 let (port, chan) = oneshot::<~int>();
762 rtdebug!("res is: %?", res.is_err());
763 assert!(res.is_err());
768 fn oneshot_single_thread_send_then_recv() {
769 do run_in_newsched_task {
770 let (port, chan) = oneshot::<~int>();
772 assert!(port.recv() == ~10);
777 fn oneshot_single_thread_try_send_open() {
778 do run_in_newsched_task {
779 let (port, chan) = oneshot::<int>();
780 assert!(chan.try_send(10));
781 assert!(port.recv() == 10);
786 fn oneshot_single_thread_try_send_closed() {
787 do run_in_newsched_task {
788 let (port, chan) = oneshot::<int>();
790 assert!(!chan.try_send(10));
795 fn oneshot_single_thread_try_recv_open() {
796 do run_in_newsched_task {
797 let (port, chan) = oneshot::<int>();
799 assert!(port.try_recv() == Some(10));
804 fn oneshot_single_thread_try_recv_closed() {
805 do run_in_newsched_task {
806 let (port, chan) = oneshot::<int>();
808 assert!(port.try_recv() == None);
813 fn oneshot_single_thread_peek_data() {
814 do run_in_newsched_task {
815 let (port, chan) = oneshot::<int>();
816 assert!(!port.peek());
818 assert!(port.peek());
823 fn oneshot_single_thread_peek_close() {
824 do run_in_newsched_task {
825 let (port, chan) = oneshot::<int>();
827 assert!(!port.peek());
828 assert!(!port.peek());
833 fn oneshot_single_thread_peek_open() {
834 do run_in_newsched_task {
835 let (port, _) = oneshot::<int>();
836 assert!(!port.peek());
841 fn oneshot_multi_task_recv_then_send() {
842 do run_in_newsched_task {
843 let (port, chan) = oneshot::<~int>();
844 let port_cell = Cell::new(port);
846 assert!(port_cell.take().recv() == ~10);
854 fn oneshot_multi_task_recv_then_close() {
855 do run_in_newsched_task {
856 let (port, chan) = oneshot::<~int>();
857 let port_cell = Cell::new(port);
858 let chan_cell = Cell::new(chan);
860 let _cell = chan_cell.take();
862 let res = do spawntask_try {
863 assert!(port_cell.take().recv() == ~10);
865 assert!(res.is_err());
870 fn oneshot_multi_thread_close_stress() {
871 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
872 do stress_factor().times {
873 do run_in_newsched_task {
874 let (port, chan) = oneshot::<int>();
875 let port_cell = Cell::new(port);
876 let thread = do spawntask_thread {
877 let _p = port_cell.take();
886 fn oneshot_multi_thread_send_close_stress() {
887 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
888 do stress_factor().times {
889 do run_in_newsched_task {
890 let (port, chan) = oneshot::<int>();
891 let chan_cell = Cell::new(chan);
892 let port_cell = Cell::new(port);
893 let thread1 = do spawntask_thread {
894 let _p = port_cell.take();
896 let thread2 = do spawntask_thread {
897 let c = chan_cell.take();
907 fn oneshot_multi_thread_recv_close_stress() {
908 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
909 do stress_factor().times {
910 do run_in_newsched_task {
911 let (port, chan) = oneshot::<int>();
912 let chan_cell = Cell::new(chan);
913 let port_cell = Cell::new(port);
914 let thread1 = do spawntask_thread {
915 let port_cell = Cell::new(port_cell.take());
916 let res = do spawntask_try {
917 port_cell.take().recv();
919 assert!(res.is_err());
921 let thread2 = do spawntask_thread {
922 let chan_cell = Cell::new(chan_cell.take());
934 fn oneshot_multi_thread_send_recv_stress() {
935 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
936 do stress_factor().times {
937 do run_in_newsched_task {
938 let (port, chan) = oneshot::<~int>();
939 let chan_cell = Cell::new(chan);
940 let port_cell = Cell::new(port);
941 let thread1 = do spawntask_thread {
942 chan_cell.take().send(~10);
944 let thread2 = do spawntask_thread {
945 assert!(port_cell.take().recv() == ~10);
954 fn stream_send_recv_stress() {
955 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
956 do stress_factor().times {
957 do run_in_mt_newsched_task {
958 let (port, chan) = stream::<~int>();
963 fn send(chan: Chan<~int>, i: int) {
964 if i == 10 { return }
966 let chan_cell = Cell::new(chan);
967 do spawntask_random {
968 let chan = chan_cell.take();
974 fn recv(port: Port<~int>, i: int) {
975 if i == 10 { return }
977 let port_cell = Cell::new(port);
978 do spawntask_random {
979 let port = port_cell.take();
980 assert!(port.recv() == ~i);
990 // Regression test that we don't run out of stack in scheduler context
991 do run_in_newsched_task {
992 let (port, chan) = stream();
993 do 10000.times { chan.send(()) }
994 do 10000.times { port.recv() }
999 fn shared_chan_stress() {
1000 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1001 do run_in_mt_newsched_task {
1002 let (port, chan) = stream();
1003 let chan = SharedChan::new(chan);
1004 let total = stress_factor() + 100;
1006 let chan_clone = chan.clone();
1007 do spawntask_random {
1008 chan_clone.send(());
1019 fn shared_port_stress() {
1020 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1021 do run_in_mt_newsched_task {
1022 let (end_port, end_chan) = stream();
1023 let (port, chan) = stream();
1024 let end_chan = SharedChan::new(end_chan);
1025 let port = SharedPort::new(port);
1026 let total = stress_factor() + 100;
1028 let end_chan_clone = end_chan.clone();
1029 let port_clone = port.clone();
1030 do spawntask_random {
1032 end_chan_clone.send(());
1047 fn shared_port_close_simple() {
1048 do run_in_mt_newsched_task {
1049 let (port, chan) = stream::<()>();
1050 let port = SharedPort::new(port);
1051 { let _chan = chan; }
1052 assert!(port.try_recv().is_none());
1057 fn shared_port_close() {
1058 do run_in_mt_newsched_task {
1059 let (end_port, end_chan) = stream::<bool>();
1060 let (port, chan) = stream::<()>();
1061 let end_chan = SharedChan::new(end_chan);
1062 let port = SharedPort::new(port);
1063 let chan = SharedChan::new(chan);
1064 let send_total = 10;
1065 let recv_total = 20;
1066 do spawntask_random {
1067 do send_total.times {
1068 let chan_clone = chan.clone();
1069 do spawntask_random {
1070 chan_clone.send(());
1074 let end_chan_clone = end_chan.clone();
1075 do spawntask_random {
1076 do recv_total.times {
1077 let port_clone = port.clone();
1078 let end_chan_clone = end_chan_clone.clone();
1079 do spawntask_random {
1080 let recvd = port_clone.try_recv().is_some();
1081 end_chan_clone.send(recvd);
1087 do recv_total.times {
1088 recvd += if end_port.recv() { 1 } else { 0 };
1091 assert!(recvd == send_total);
1096 fn megapipe_stress() {
1100 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1102 do run_in_mt_newsched_task {
1103 let (end_port, end_chan) = stream::<()>();
1104 let end_chan = SharedChan::new(end_chan);
1105 let pipe = megapipe();
1106 let total = stress_factor() + 10;
1107 let mut rng = rand::rng();
1109 let msgs = rng.gen_uint_range(0, 10);
1110 let pipe_clone = pipe.clone();
1111 let end_chan_clone = end_chan.clone();
1112 do spawntask_random {
1114 pipe_clone.send(());
1121 end_chan_clone.send(());
1131 fn send_deferred() {
1132 use unstable::sync::atomically;
1134 // Tests no-rescheduling of send_deferred on all types of channels.
1135 do run_in_newsched_task {
1136 let (pone, cone) = oneshot();
1137 let (pstream, cstream) = stream();
1138 let (pshared, cshared) = stream();
1139 let cshared = SharedChan::new(cshared);
1140 let mp = megapipe();
1142 let pone = Cell::new(pone);
1143 do spawntask { pone.take().recv(); }
1144 let pstream = Cell::new(pstream);
1145 do spawntask { pstream.take().recv(); }
1146 let pshared = Cell::new(pshared);
1147 do spawntask { pshared.take().recv(); }
1148 let p_mp = Cell::new(mp.clone());
1149 do spawntask { p_mp.take().recv(); }
1151 let cs = Cell::new((cone, cstream, cshared, mp));
1154 let (cone, cstream, cshared, mp) = cs.take();
1155 cone.send_deferred(());
1156 cstream.send_deferred(());
1157 cshared.send_deferred(());
1158 mp.send_deferred(());