]> git.lizzy.rs Git - rust.git/commitdiff
std: Implement yields on receives for channels
authorAlex Crichton <alex@alexcrichton.com>
Sat, 14 Dec 2013 02:27:13 +0000 (18:27 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Wed, 25 Dec 2013 03:59:53 +0000 (19:59 -0800)
This will prevent a deadlock when a task spins in a try_recv when using channel
communication routines is a clear location for a M:N scheduling to happen.

src/libstd/comm/mod.rs

index 76a9e5d17e1018152a86512d80d91204e9abd26a..7b464bc2f3296433bab489bff322f640985c7eb2 100644 (file)
 use rt::task::{Task, BlockedTask};
 use rt::thread::Thread;
 use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
-use task;
 use vec::{ImmutableVector, OwnedVector};
 
 use spsc = sync::spsc_queue;
@@ -346,6 +345,7 @@ struct Packet {
     selection_id: uint,
     select_next: *mut Packet,
     select_prev: *mut Packet,
+    recv_cnt: int,
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -367,6 +367,7 @@ fn new() -> Packet {
             selection_id: 0,
             select_next: 0 as *mut Packet,
             select_prev: 0 as *mut Packet,
+            recv_cnt: 0,
         }
     }
 
@@ -611,8 +612,9 @@ fn try(&self, t: T, can_resched: bool) -> bool {
                 // the TLS overhead can be a bit much.
                 n => {
                     assert!(n >= 0);
-                    if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
-                        task::deschedule();
+                    if n > 0 && n % RESCHED_FREQ == 0 {
+                        let task: ~Task = Local::take();
+                        task.maybe_yield();
                     }
                     true
                 }
@@ -704,8 +706,9 @@ fn try(&self, t: T, can_resched: bool) -> bool {
                 DISCONNECTED => {} // oh well, we tried
                 -1 => { (*packet).wakeup(can_resched); }
                 n => {
-                    if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
-                        task::deschedule();
+                    if n > 0 && n % RESCHED_FREQ == 0 {
+                        let task: ~Task = Local::take();
+                        task.maybe_yield();
                     }
                 }
             }
@@ -773,6 +776,18 @@ fn try_recv_inc(&self, increment: bool) -> Option<T> {
         // This is a "best effort" situation, so if a queue is inconsistent just
         // don't worry about it.
         let this = unsafe { cast::transmute_mut(self) };
+
+        // See the comment about yielding on sends, but the same applies here.
+        // If a thread is spinning in try_recv we should try
+        unsafe {
+            let packet = this.queue.packet();
+            (*packet).recv_cnt += 1;
+            if (*packet).recv_cnt % RESCHED_FREQ == 0 {
+                let task: ~Task = Local::take();
+                task.maybe_yield();
+            }
+        }
+
         let ret = match this.queue {
             SPSC(ref mut queue) => queue.pop(),
             MPSC(ref mut queue) => match queue.pop() {