]> git.lizzy.rs Git - rust.git/commitdiff
std: Reimplement std::comm without the scheduler
authorAlex Crichton <alex@alexcrichton.com>
Fri, 13 Dec 2013 01:53:05 +0000 (17:53 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Tue, 24 Dec 2013 22:42:00 +0000 (14:42 -0800)
Like the librustuv refactoring, this refactors std::comm to sever all ties with
the scheduler. This means that the entire `comm::imp` module can be deleted in
favor of implementations outside of libstd.

src/libstd/comm/imp.rs [deleted file]
src/libstd/comm/mod.rs
src/libstd/comm/select.rs

diff --git a/src/libstd/comm/imp.rs b/src/libstd/comm/imp.rs
deleted file mode 100644 (file)
index bd1d6fe..0000000
+++ /dev/null
@@ -1,337 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! One of the major goals behind this channel implementation is to work
-//! seamlessly on and off the runtime. This also means that the code isn't
-//! littered with "if is_green() { ... } else { ... }". Right now, the rest of
-//! the runtime isn't quite ready to for this abstraction to be done very
-//! nicely, so the conditional "if green" blocks are all contained in this inner
-//! module.
-//!
-//! The goal of this module is to mirror what the runtime "should be", not the
-//! state that it is currently in today. You'll notice that there is no mention
-//! of schedulers or is_green inside any of the channel code, it is currently
-//! entirely contained in this one module.
-//!
-//! In the ideal world, nothing in this module exists and it is all implemented
-//! elsewhere in the runtime (in the proper location). All of this code is
-//! structured in order to easily refactor this to the correct location whenever
-//! we have the trait objects in place to serve as the boundary of the
-//! abstraction.
-
-use iter::{range, Iterator};
-use ops::Drop;
-use option::{Some, None, Option};
-use rt::local::Local;
-use rt::sched::{SchedHandle, Scheduler, TaskFromFriend};
-use rt::thread::Thread;
-use rt;
-use unstable::mutex::Mutex;
-use unstable::sync::UnsafeArc;
-
-// A task handle is a method of waking up a blocked task. The handle itself
-// is completely opaque and only has a wake() method defined on it. This
-// method will wake the method regardless of the context of the thread which
-// is currently calling wake().
-//
-// This abstraction should be able to be created when putting a task to
-// sleep. This should basically be a method on whatever the local Task is,
-// consuming the local Task.
-
-pub struct TaskHandle {
-    priv inner: TaskRepr
-}
-enum TaskRepr {
-    Green(rt::BlockedTask, *mut SchedHandle),
-    Native(NativeWakeupStyle),
-}
-enum NativeWakeupStyle {
-    ArcWakeup(UnsafeArc<Mutex>),    // shared mutex to synchronize on
-    LocalWakeup(*mut Mutex),        // synchronize on the task-local mutex
-}
-
-impl TaskHandle {
-    // Signal that this handle should be woken up. The `can_resched`
-    // argument indicates whether the current task could possibly be
-    // rescheduled or not. This does not have a lot of meaning for the
-    // native case, but for an M:N case it indicates whether a context
-    // switch can happen or not.
-    pub fn wake(self, can_resched: bool) {
-        match self.inner {
-            Green(task, handle) => {
-                // If we have a local scheduler, then use that to run the
-                // blocked task, otherwise we can use the handle to send the
-                // task back to its home.
-                if rt::in_green_task_context() {
-                    if can_resched {
-                        task.wake().map(Scheduler::run_task);
-                    } else {
-                        let mut s: ~Scheduler = Local::take();
-                        s.enqueue_blocked_task(task);
-                        Local::put(s);
-                    }
-                } else {
-                    let task = match task.wake() {
-                        Some(task) => task, None => return
-                    };
-                    // XXX: this is not an easy section of code to refactor.
-                    //      If this handle is owned by the Task (which it
-                    //      should be), then this would be a use-after-free
-                    //      because once the task is pushed onto the message
-                    //      queue, the handle is gone.
-                    //
-                    //      Currently the handle is instead owned by the
-                    //      Port/Chan pair, which means that because a
-                    //      channel is invoking this method the handle will
-                    //      continue to stay alive for the entire duration
-                    //      of this method. This will require thought when
-                    //      moving the handle into the task.
-                    unsafe { (*handle).send(TaskFromFriend(task)) }
-                }
-            }
-
-            // Note that there are no use-after-free races in this code. In
-            // the arc-case, we own the lock, and in the local case, we're
-            // using a lock so it's guranteed that they aren't running while
-            // we hold the lock.
-            Native(ArcWakeup(lock)) => {
-                unsafe {
-                    let lock = lock.get();
-                    (*lock).lock();
-                    (*lock).signal();
-                    (*lock).unlock();
-                }
-            }
-            Native(LocalWakeup(lock)) => {
-                unsafe {
-                    (*lock).lock();
-                    (*lock).signal();
-                    (*lock).unlock();
-                }
-            }
-        }
-    }
-
-    // Trashes handle to this task. This ensures that necessary memory is
-    // deallocated, and there may be some extra assertions as well.
-    pub fn trash(self) {
-        match self.inner {
-            Green(task, _) => task.assert_already_awake(),
-            Native(..) => {}
-        }
-    }
-}
-
-// This structure is an abstraction of what should be stored in the local
-// task itself. This data is currently stored inside of each channel, but
-// this should rather be stored in each task (and channels will still
-// continue to lazily initialize this data).
-
-pub struct TaskData {
-    priv handle: Option<SchedHandle>,
-    priv lock: Mutex,
-}
-
-impl TaskData {
-    pub fn new() -> TaskData {
-        TaskData {
-            handle: None,
-            lock: unsafe { Mutex::empty() },
-        }
-    }
-}
-
-impl Drop for TaskData {
-    fn drop(&mut self) {
-        unsafe { self.lock.destroy() }
-    }
-}
-
-// Now this is the really fun part. This is where all the M:N/1:1-agnostic
-// along with recv/select-agnostic blocking information goes. A "blocking
-// context" is really just a stack-allocated structure (which is probably
-// fine to be a stack-trait-object).
-//
-// This has some particularly strange interfaces, but the reason for all
-// this is to support selection/recv/1:1/M:N all in one bundle.
-
-pub struct BlockingContext<'a> {
-    priv inner: BlockingRepr<'a>
-}
-
-enum BlockingRepr<'a> {
-    GreenBlock(rt::BlockedTask, &'a mut Scheduler),
-    NativeBlock(Option<UnsafeArc<Mutex>>),
-}
-
-impl<'a> BlockingContext<'a> {
-    // Creates one blocking context. The data provided should in theory be
-    // acquired from the local task, but it is instead acquired from the
-    // channel currently.
-    //
-    // This function will call `f` with a blocking context, plus the data
-    // that it is given. This function will then return whether this task
-    // should actually go to sleep or not. If `true` is returned, then this
-    // function does not return until someone calls `wake()` on the task.
-    // If `false` is returned, then this function immediately returns.
-    //
-    // # Safety note
-    //
-    // Note that this stack closure may not be run on the same stack as when
-    // this function was called. This means that the environment of this
-    // stack closure could be unsafely aliased. This is currently prevented
-    // through the guarantee that this function will never return before `f`
-    // finishes executing.
-    pub fn one(data: &mut TaskData,
-               f: |BlockingContext, &mut TaskData| -> bool) {
-        if rt::in_green_task_context() {
-            let sched: ~Scheduler = Local::take();
-            sched.deschedule_running_task_and_then(|sched, task| {
-                let ctx = BlockingContext { inner: GreenBlock(task, sched) };
-                // no need to do something on success/failure other than
-                // returning because the `block` function for a BlockingContext
-                // takes care of reawakening itself if the blocking procedure
-                // fails. If this function is successful, then we're already
-                // blocked, and if it fails, the task will already be
-                // rescheduled.
-                f(ctx, data);
-            });
-        } else {
-            unsafe { data.lock.lock(); }
-            let ctx = BlockingContext { inner: NativeBlock(None) };
-            if f(ctx, data) {
-                unsafe { data.lock.wait(); }
-            }
-            unsafe { data.lock.unlock(); }
-        }
-    }
-
-    // Creates many blocking contexts. The intended use case for this
-    // function is selection over a number of ports. This will create `amt`
-    // blocking contexts, yielding them to `f` in turn. If `f` returns
-    // false, then this function aborts and returns immediately. If `f`
-    // repeatedly returns `true` `amt` times, then this function will block.
-    pub fn many(amt: uint, f: |BlockingContext| -> bool) {
-        if rt::in_green_task_context() {
-            let sched: ~Scheduler = Local::take();
-            sched.deschedule_running_task_and_then(|sched, task| {
-                for handle in task.make_selectable(amt) {
-                    let ctx = BlockingContext {
-                        inner: GreenBlock(handle, sched)
-                    };
-                    // see comment above in `one` for why no further action is
-                    // necessary here
-                    if !f(ctx) { break }
-                }
-            });
-        } else {
-            // In the native case, our decision to block must be shared
-            // amongst all of the channels. It may be possible to
-            // stack-allocate this mutex (instead of putting it in an
-            // UnsafeArc box), but for now in order to prevent
-            // use-after-free trivially we place this into a box and then
-            // pass that around.
-            unsafe {
-                let mtx = UnsafeArc::new(Mutex::new());
-                (*mtx.get()).lock();
-                let success = range(0, amt).all(|_| {
-                    f(BlockingContext {
-                        inner: NativeBlock(Some(mtx.clone()))
-                    })
-                });
-                if success {
-                    (*mtx.get()).wait();
-                }
-                (*mtx.get()).unlock();
-            }
-        }
-    }
-
-    // This function will consume this BlockingContext, and optionally block
-    // if according to the atomic `decision` function. The semantics of this
-    // functions are:
-    //
-    //  * `slot` is required to be a `None`-slot (which is owned by the
-    //    channel)
-    //  * The `slot` will be filled in with a blocked version of the current
-    //    task (with `wake`-ability if this function is successful).
-    //  * If the `decision` function returns true, then this function
-    //    immediately returns having relinquished ownership of the task.
-    //  * If the `decision` function returns false, then the `slot` is reset
-    //    to `None` and the task is re-scheduled if necessary (remember that
-    //    the task will not resume executing before the outer `one` or
-    //    `many` function has returned. This function is expected to have a
-    //    release memory fence in order for the modifications of `to_wake` to be
-    //    visible to other tasks. Code which attempts to read `to_wake` should
-    //    have an acquiring memory fence to guarantee that this write is
-    //    visible.
-    //
-    // This function will return whether the blocking occurred or not.
-    pub fn block(self,
-                 data: &mut TaskData,
-                 slot: &mut Option<TaskHandle>,
-                 decision: || -> bool) -> bool {
-        assert!(slot.is_none());
-        match self.inner {
-            GreenBlock(task, sched) => {
-                if data.handle.is_none() {
-                    data.handle = Some(sched.make_handle());
-                }
-                let handle = data.handle.get_mut_ref() as *mut SchedHandle;
-                *slot = Some(TaskHandle { inner: Green(task, handle) });
-
-                if !decision() {
-                    match slot.take_unwrap().inner {
-                        Green(task, _) => sched.enqueue_blocked_task(task),
-                        Native(..) => unreachable!()
-                    }
-                    false
-                } else {
-                    true
-                }
-            }
-            NativeBlock(shared) => {
-                *slot = Some(TaskHandle {
-                    inner: Native(match shared {
-                        Some(arc) => ArcWakeup(arc),
-                        None => LocalWakeup(&mut data.lock as *mut Mutex),
-                    })
-                });
-
-                if !decision() {
-                    *slot = None;
-                    false
-                } else {
-                    true
-                }
-            }
-        }
-    }
-}
-
-// Agnostic method of forcing a yield of the current task
-pub fn yield_now() {
-    if rt::in_green_task_context() {
-        let sched: ~Scheduler = Local::take();
-        sched.yield_now();
-    } else {
-        Thread::yield_now();
-    }
-}
-
-// Agnostic method of "maybe yielding" in order to provide fairness
-pub fn maybe_yield() {
-    if rt::in_green_task_context() {
-        let sched: ~Scheduler = Local::take();
-        sched.maybe_yield();
-    } else {
-        // the OS decides fairness, nothing for us to do.
-    }
-}
index 33d5d48ebdcae925035f9c3681e6027eb11e7525..f5048ec62a402ed2621ea7b72605099d0be50230 100644 (file)
 use kinds::Send;
 use ops::Drop;
 use option::{Option, Some, None};
+use result::{Ok, Err};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
 use rt::thread::Thread;
-use unstable::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
+use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
+use task;
 use vec::{ImmutableVector, OwnedVector};
 
-use spsc = rt::spsc_queue;
-use mpsc = rt::mpsc_queue;
+use spsc = sync::spsc_queue;
+use mpsc = sync::mpsc_queue;
 
-use self::imp::{TaskHandle, TaskData, BlockingContext};
 pub use self::select::Select;
 
 macro_rules! test (
@@ -265,7 +268,6 @@ fn native() {
     )
 )
 
-mod imp;
 mod select;
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -326,9 +328,7 @@ pub struct SharedChan<T> {
 struct Packet {
     cnt: AtomicInt, // How many items are on this channel
     steals: int,    // How many times has a port received without blocking?
-    to_wake: Option<TaskHandle>, // Task to wake up
-
-    data: TaskData,
+    to_wake: Option<BlockedTask>, // Task to wake up
 
     // This lock is used to wake up native threads blocked in select. The
     // `lock` field is not used because the thread blocking in select must
@@ -358,7 +358,6 @@ fn new() -> Packet {
             cnt: AtomicInt::new(0),
             steals: 0,
             to_wake: None,
-            data: TaskData::new(),
             channels: AtomicInt::new(1),
 
             selecting: AtomicBool::new(false),
@@ -418,7 +417,10 @@ fn can_recv(&self) -> bool {
     // This function must have had at least an acquire fence before it to be
     // properly called.
     fn wakeup(&mut self, can_resched: bool) {
-        self.to_wake.take_unwrap().wake(can_resched);
+        match self.to_wake.take_unwrap().wake() {
+            Some(task) => task.reawaken(can_resched),
+            None => {}
+        }
         self.selecting.store(false, Relaxed);
     }
 
@@ -607,7 +609,7 @@ fn try(&self, t: T, can_resched: bool) -> bool {
                 n => {
                     assert!(n >= 0);
                     if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
-                        imp::maybe_yield();
+                        task::deschedule();
                     }
                     true
                 }
@@ -700,7 +702,7 @@ fn try(&self, t: T, can_resched: bool) -> bool {
                 -1 => { (*packet).wakeup(can_resched); }
                 n => {
                     if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
-                        imp::maybe_yield();
+                        task::deschedule();
                     }
                 }
             }
@@ -840,8 +842,15 @@ pub fn recv_opt(&self) -> Option<T> {
         unsafe {
             this = cast::transmute_mut(self);
             packet = this.queue.packet();
-            BlockingContext::one(&mut (*packet).data, |ctx, data| {
-                ctx.block(data, &mut (*packet).to_wake, || (*packet).decrement())
+            let task: ~Task = Local::take();
+            task.deschedule(1, |task| {
+                assert!((*packet).to_wake.is_none());
+                (*packet).to_wake = Some(task);
+                if (*packet).decrement() {
+                    Ok(())
+                } else {
+                    Err((*packet).to_wake.take_unwrap())
+                }
             });
         }
 
index bbd4cfea78d7a65d81f7a2f6d7af94b296e7ab0c..68e1a05a6530272e4e3fa4e00d0eb2dc5a00a17c 100644 (file)
 use ops::Drop;
 use option::{Some, None, Option};
 use ptr::RawPtr;
-use super::imp::BlockingContext;
-use super::{Packet, Port, imp};
+use result::{Ok, Err};
+use rt::thread::Thread;
+use rt::local::Local;
+use rt::task::Task;
+use super::{Packet, Port};
+use sync::atomics::{Relaxed, SeqCst};
 use uint;
-use unstable::atomics::{Relaxed, SeqCst};
 
 macro_rules! select {
     (
@@ -184,19 +187,22 @@ pub fn wait(&self) -> uint {
             // Acquire a number of blocking contexts, and block on each one
             // sequentially until one fails. If one fails, then abort
             // immediately so we can go unblock on all the other ports.
-            BlockingContext::many(amt, |ctx| {
+            let task: ~Task = Local::take();
+            task.deschedule(amt, |task| {
+                // Prepare for the block
                 let (i, packet) = iter.next().unwrap();
+                assert!((*packet).to_wake.is_none());
+                (*packet).to_wake = Some(task);
                 (*packet).selecting.store(true, SeqCst);
-                if !ctx.block(&mut (*packet).data,
-                              &mut (*packet).to_wake,
-                              || (*packet).decrement()) {
+
+                if (*packet).decrement() {
+                    Ok(())
+                } else {
                     (*packet).abort_selection(false);
                     (*packet).selecting.store(false, SeqCst);
                     ready_index = i;
                     ready_id = (*packet).selection_id;
-                    false
-                } else {
-                    true
+                    Err((*packet).to_wake.take_unwrap())
                 }
             });
 
@@ -225,7 +231,7 @@ pub fn wait(&self) -> uint {
                 if (*packet).abort_selection(true) {
                     ready_id = (*packet).selection_id;
                     while (*packet).selecting.load(Relaxed) {
-                        imp::yield_now();
+                        task::deschedule();
                     }
                 }
             }