]> git.lizzy.rs Git - rust.git/commitdiff
Add SendDeferred trait and use it to fix #8214.
authorBen Blum <bblum@andrew.cmu.edu>
Fri, 2 Aug 2013 01:57:15 +0000 (21:57 -0400)
committerBen Blum <bblum@andrew.cmu.edu>
Fri, 2 Aug 2013 21:31:44 +0000 (17:31 -0400)
src/libextra/sync.rs
src/libstd/comm.rs
src/libstd/rt/comm.rs

index e539b067edd1ce01c60600ae5ddbf004e8650cff..045aeb0feda0a1cfccba3ad03a8992afe304d765 100644 (file)
@@ -18,6 +18,7 @@
 
 use std::borrow;
 use std::comm;
+use std::comm::SendDeferred;
 use std::task;
 use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
 use std::unstable::atomics;
@@ -49,7 +50,7 @@ fn signal(&self) -> bool {
         if self.head.peek() {
             // Pop and send a wakeup signal. If the waiter was killed, its port
             // will have closed. Keep trying until we get a live task.
-            if comm::try_send_one(self.head.recv(), ()) {
+            if self.head.recv().try_send_deferred(()) {
                 true
             } else {
                 self.signal()
@@ -62,7 +63,7 @@ fn signal(&self) -> bool {
     fn broadcast(&self) -> uint {
         let mut count = 0;
         while self.head.peek() {
-            if comm::try_send_one(self.head.recv(), ()) {
+            if self.head.recv().try_send_deferred(()) {
                 count += 1;
             }
         }
@@ -102,7 +103,7 @@ pub fn acquire(&self) {
                     // Tell outer scope we need to block.
                     waiter_nobe = Some(WaitEnd);
                     // Enqueue ourself.
-                    state.waiters.tail.send(SignalEnd);
+                    state.waiters.tail.send_deferred(SignalEnd);
                 }
             }
             // Uncomment if you wish to test for sem races. Not valgrind-friendly.
@@ -256,7 +257,7 @@ pub fn wait_on(&self, condvar_id: uint) {
                         }
                         // Enqueue ourself to be woken up by a signaller.
                         let SignalEnd = SignalEnd.take_unwrap();
-                        state.blocked[condvar_id].tail.send(SignalEnd);
+                        state.blocked[condvar_id].tail.send_deferred(SignalEnd);
                     } else {
                         out_of_bounds = Some(state.blocked.len());
                     }
index acdf2cee841f4cdc983a931c19a866ea98eec125..a0731dc3494c20d617daa4005be8bf55befdd9a5 100644 (file)
@@ -19,6 +19,7 @@
 use kinds::Send;
 use option::{Option, Some};
 use unstable::sync::Exclusive;
+pub use rt::comm::SendDeferred;
 use rtcomm = rt::comm;
 use rt;
 
@@ -105,6 +106,21 @@ fn try_send(&self, x: T) -> bool {
     }
 }
 
+impl<T: Send> SendDeferred<T> for Chan<T> {
+    fn send_deferred(&self, x: T) {
+        match self.inner {
+            Left(ref chan) => chan.send(x),
+            Right(ref chan) => chan.send_deferred(x)
+        }
+    }
+    fn try_send_deferred(&self, x: T) -> bool {
+        match self.inner {
+            Left(ref chan) => chan.try_send(x),
+            Right(ref chan) => chan.try_send_deferred(x)
+        }
+    }
+}
+
 impl<T: Send> GenericPort<T> for Port<T> {
     fn recv(&self) -> T {
         match self.inner {
@@ -250,6 +266,20 @@ pub fn try_send(self, data: T) -> bool {
             Right(p) => p.try_send(data)
         }
     }
+    pub fn send_deferred(self, data: T) {
+        let ChanOne { inner } = self;
+        match inner {
+            Left(p) => p.send(data),
+            Right(p) => p.send_deferred(data)
+        }
+    }
+    pub fn try_send_deferred(self, data: T) -> bool {
+        let ChanOne { inner } = self;
+        match inner {
+            Left(p) => p.try_send(data),
+            Right(p) => p.try_send_deferred(data)
+        }
+    }
 }
 
 pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
index 00e1aaa2193252e1124185d7d1b3ff628de2443d..c19ac8aa33714721cb6b498eeb2c1ced82b515f8 100644 (file)
@@ -25,6 +25,7 @@
 use cell::Cell;
 use clone::Clone;
 use rt::{context, SchedulerContext};
+use tuple::ImmutableTuple;
 
 /// A combined refcount / BlockedTask-as-uint pointer.
 ///
@@ -86,12 +87,32 @@ fn packet(&self) -> *mut Packet<T> {
         }
     }
 
+    /// Send a message on the one-shot channel. If a receiver task is blocked
+    /// waiting for the message, will wake it up and reschedule to it.
     pub fn send(self, val: T) {
         self.try_send(val);
     }
 
+    /// As `send`, but also returns whether or not the receiver endpoint is still open.
     pub fn try_send(self, val: T) -> bool {
+        self.try_send_inner(val, true)
+    }
+
+    /// Send a message without immediately rescheduling to a blocked receiver.
+    /// This can be useful in contexts where rescheduling is forbidden, or to
+    /// optimize for when the sender expects to still have useful work to do.
+    pub fn send_deferred(self, val: T) {
+        self.try_send_deferred(val);
+    }
+
+    /// As `send_deferred` and `try_send` together.
+    pub fn try_send_deferred(self, val: T) -> bool {
+        self.try_send_inner(val, false)
+    }
 
+    // 'do_resched' configures whether the scheduler immediately switches to
+    // the receiving task, or leaves the sending task still running.
+    fn try_send_inner(self, val: T, do_resched: bool) -> bool {
         rtassert!(context() != SchedulerContext);
 
         let mut this = self;
@@ -130,9 +151,16 @@ pub fn try_send(self, val: T) -> bool {
                 task_as_state => {
                     // Port is blocked. Wake it up.
                     let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    do recvr.wake().map_consume |woken_task| {
-                        Scheduler::run_task(woken_task);
-                    };
+                    if do_resched {
+                        do recvr.wake().map_consume |woken_task| {
+                            Scheduler::run_task(woken_task);
+                        };
+                    } else {
+                        let recvr = Cell::new(recvr);
+                        do Local::borrow::<Scheduler, ()> |sched| {
+                            sched.enqueue_blocked_task(recvr.take());
+                        }
+                    }
                 }
             }
         }
@@ -152,6 +180,7 @@ fn packet(&self) -> *mut Packet<T> {
         }
     }
 
+    /// Wait for a message on the one-shot port. Fails if the send end is closed.
     pub fn recv(self) -> T {
         match self.try_recv() {
             Some(val) => val,
@@ -161,6 +190,7 @@ pub fn recv(self) -> T {
         }
     }
 
+    /// As `recv`, but returns `None` if the send end is closed rather than failing.
     pub fn try_recv(self) -> Option<T> {
         let mut this = self;
 
@@ -382,6 +412,12 @@ fn drop(&self) {
     }
 }
 
+/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
+pub trait SendDeferred<T> {
+    fn send_deferred(&self, val: T);
+    fn try_send_deferred(&self, val: T) -> bool;
+}
+
 struct StreamPayload<T> {
     val: T,
     next: PortOne<StreamPayload<T>>
@@ -409,6 +445,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
     return (port, chan);
 }
 
+impl<T: Send> Chan<T> {
+    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
+        let (next_pone, next_cone) = oneshot();
+        let cone = self.next.take();
+        self.next.put_back(next_cone);
+        cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
+    }
+}
+
 impl<T: Send> GenericChan<T> for Chan<T> {
     fn send(&self, val: T) {
         self.try_send(val);
@@ -417,10 +462,16 @@ fn send(&self, val: T) {
 
 impl<T: Send> GenericSmartChan<T> for Chan<T> {
     fn try_send(&self, val: T) -> bool {
-        let (next_pone, next_cone) = oneshot();
-        let cone = self.next.take();
-        self.next.put_back(next_cone);
-        cone.try_send(StreamPayload { val: val, next: next_pone })
+        self.try_send_inner(val, true)
+    }
+}
+
+impl<T: Send> SendDeferred<T> for Chan<T> {
+    fn send_deferred(&self, val: T) {
+        self.try_send_deferred(val);
+    }
+    fn try_send_deferred(&self, val: T) -> bool {
+        self.try_send_inner(val, false)
     }
 }
 
@@ -495,6 +546,17 @@ pub fn new(chan: Chan<T>) -> SharedChan<T> {
     }
 }
 
+impl<T: Send> SharedChan<T> {
+    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
+        unsafe {
+            let (next_pone, next_cone) = oneshot();
+            let cone = (*self.next.get()).swap(~next_cone, SeqCst);
+            cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
+                                         do_resched)
+        }
+    }
+}
+
 impl<T: Send> GenericChan<T> for SharedChan<T> {
     fn send(&self, val: T) {
         self.try_send(val);
@@ -503,11 +565,16 @@ fn send(&self, val: T) {
 
 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
     fn try_send(&self, val: T) -> bool {
-        unsafe {
-            let (next_pone, next_cone) = oneshot();
-            let cone = (*self.next.get()).swap(~next_cone, SeqCst);
-            cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
-        }
+        self.try_send_inner(val, true)
+    }
+}
+
+impl<T: Send> SendDeferred<T> for SharedChan<T> {
+    fn send_deferred(&self, val: T) {
+        self.try_send_deferred(val);
+    }
+    fn try_send_deferred(&self, val: T) -> bool {
+        self.try_send_inner(val, false)
     }
 }
 
@@ -584,31 +651,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> {
 
 impl<T: Send> GenericChan<T> for MegaPipe<T> {
     fn send(&self, val: T) {
-        match *self {
-            (_, ref c) => c.send(val)
-        }
+        self.second_ref().send(val)
     }
 }
 
 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
     fn try_send(&self, val: T) -> bool {
-        match *self {
-            (_, ref c) => c.try_send(val)
-        }
+        self.second_ref().try_send(val)
     }
 }
 
 impl<T: Send> GenericPort<T> for MegaPipe<T> {
     fn recv(&self) -> T {
-        match *self {
-            (ref p, _) => p.recv()
-        }
+        self.first_ref().recv()
     }
 
     fn try_recv(&self) -> Option<T> {
-        match *self {
-            (ref p, _) => p.try_recv()
-        }
+        self.first_ref().try_recv()
+    }
+}
+
+impl<T: Send> SendDeferred<T> for MegaPipe<T> {
+    fn send_deferred(&self, val: T) {
+        self.second_ref().send_deferred(val)
+    }
+    fn try_send_deferred(&self, val: T) -> bool {
+        self.second_ref().try_send_deferred(val)
     }
 }
 
@@ -1017,4 +1085,39 @@ fn megapipe_stress() {
             }
         }
     }
+
+    #[test]
+    fn send_deferred() {
+        use unstable::sync::atomically;
+
+        // Tests no-rescheduling of send_deferred on all types of channels.
+        do run_in_newsched_task {
+            let (pone, cone) = oneshot();
+            let (pstream, cstream) = stream();
+            let (pshared, cshared) = stream();
+            let cshared = SharedChan::new(cshared);
+            let mp = megapipe();
+
+            let pone = Cell::new(pone);
+            do spawntask { pone.take().recv(); }
+            let pstream = Cell::new(pstream);
+            do spawntask { pstream.take().recv(); }
+            let pshared = Cell::new(pshared);
+            do spawntask { pshared.take().recv(); }
+            let p_mp = Cell::new(mp.clone());
+            do spawntask { p_mp.take().recv(); }
+
+            let cs = Cell::new((cone, cstream, cshared, mp));
+            unsafe {
+                do atomically {
+                    let (cone, cstream, cshared, mp) = cs.take();
+                    cone.send_deferred(());
+                    cstream.send_deferred(());
+                    cshared.send_deferred(());
+                    mp.send_deferred(());
+                }
+            }
+        }
+    }
+
 }