]> git.lizzy.rs Git - rust.git/commitdiff
Implement a basic event loop built on LittleLock
authorAlex Crichton <alex@alexcrichton.com>
Tue, 22 Oct 2013 22:00:37 +0000 (15:00 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Fri, 25 Oct 2013 06:49:11 +0000 (23:49 -0700)
It's not guaranteed that there will always be an event loop to run, and this
implementation will serve as an incredibly basic one which does not provide any
I/O, but allows the scheduler to still run.

cc #9128

12 files changed:
src/libextra/comm.rs
src/libstd/rt/basic.rs [new file with mode: 0644]
src/libstd/rt/io/mod.rs
src/libstd/rt/mod.rs
src/libstd/rt/sched.rs
src/libstd/rt/task.rs
src/libstd/rt/test.rs
src/libstd/select.rs
src/libstd/task/mod.rs
src/libstd/unstable/sync.rs
src/rt/rust_builtin.cpp
src/rt/rustrt.def.in

index 4a3801827a21120f83ede468b37bca0e3b5ffc7b..5cc5c140fd5d1503afb21a4941039df056815a5c 100644 (file)
@@ -136,7 +136,7 @@ pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
 #[cfg(test)]
 mod test {
     use comm::{DuplexStream, rendezvous};
-    use std::rt::test::run_in_newsched_task;
+    use std::rt::test::run_in_uv_task;
     use std::task::spawn_unlinked;
 
 
@@ -165,7 +165,7 @@ pub fn basic_rendezvous_test() {
     #[test]
     fn recv_a_lot() {
         // Rendezvous streams should be able to handle any number of messages being sent
-        do run_in_newsched_task {
+        do run_in_uv_task {
             let (port, chan) = rendezvous();
             do spawn {
                 do 1000000.times { chan.send(()) }
diff --git a/src/libstd/rt/basic.rs b/src/libstd/rt/basic.rs
new file mode 100644 (file)
index 0000000..86d3f8a
--- /dev/null
@@ -0,0 +1,256 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! This is a basic event loop implementation not meant for any "real purposes"
+//! other than testing the scheduler and proving that it's possible to have a
+//! pluggable event loop.
+
+use prelude::*;
+
+use cast;
+use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback};
+use unstable::sync::Exclusive;
+use util;
+
+/// This is the only exported function from this module.
+pub fn event_loop() -> ~EventLoop {
+    ~BasicLoop::new() as ~EventLoop
+}
+
+struct BasicLoop {
+    work: ~[~fn()],               // pending work
+    idle: Option<*BasicPausible>, // only one is allowed
+    remotes: ~[(uint, ~fn())],
+    next_remote: uint,
+    messages: Exclusive<~[Message]>
+}
+
+enum Message { RunRemote(uint), RemoveRemote(uint) }
+
+struct Time {
+    sec: u64,
+    nsec: u64,
+}
+
+impl Ord for Time {
+    fn lt(&self, other: &Time) -> bool {
+        self.sec < other.sec || self.nsec < other.nsec
+    }
+}
+
+impl BasicLoop {
+    fn new() -> BasicLoop {
+        BasicLoop {
+            work: ~[],
+            idle: None,
+            next_remote: 0,
+            remotes: ~[],
+            messages: Exclusive::new(~[]),
+        }
+    }
+
+    /// Process everything in the work queue (continually)
+    fn work(&mut self) {
+        while self.work.len() > 0 {
+            for work in util::replace(&mut self.work, ~[]).move_iter() {
+                work();
+            }
+        }
+    }
+
+    fn remote_work(&mut self) {
+        let messages = unsafe {
+            do self.messages.with |messages| {
+                if messages.len() > 0 {
+                    Some(util::replace(messages, ~[]))
+                } else {
+                    None
+                }
+            }
+        };
+        let messages = match messages {
+            Some(m) => m, None => return
+        };
+        for message in messages.iter() {
+            self.message(*message);
+        }
+    }
+
+    fn message(&mut self, message: Message) {
+        match message {
+            RunRemote(i) => {
+                match self.remotes.iter().find(|& &(id, _)| id == i) {
+                    Some(&(_, ref f)) => (*f)(),
+                    None => unreachable!()
+                }
+            }
+            RemoveRemote(i) => {
+                match self.remotes.iter().position(|&(id, _)| id == i) {
+                    Some(i) => { self.remotes.remove(i); }
+                    None => unreachable!()
+                }
+            }
+        }
+    }
+
+    /// Run the idle callback if one is registered
+    fn idle(&mut self) {
+        unsafe {
+            match self.idle {
+                Some(idle) => {
+                    if (*idle).active {
+                        (*(*idle).work.get_ref())();
+                    }
+                }
+                None => {}
+            }
+        }
+    }
+
+    fn has_idle(&self) -> bool {
+        unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
+    }
+}
+
+impl EventLoop for BasicLoop {
+    fn run(&mut self) {
+        // Not exactly efficient, but it gets the job done.
+        while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
+
+            self.work();
+            self.remote_work();
+
+            if self.has_idle() {
+                self.idle();
+                continue
+            }
+
+            unsafe {
+                // We block here if we have no messages to process and we may
+                // receive a message at a later date
+                do self.messages.hold_and_wait |messages| {
+                    self.remotes.len() > 0 &&
+                        messages.len() == 0 &&
+                        self.work.len() == 0
+                }
+            }
+        }
+    }
+
+    fn callback(&mut self, f: ~fn()) {
+        self.work.push(f);
+    }
+
+    // XXX: Seems like a really weird requirement to have an event loop provide.
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
+        let callback = ~BasicPausible::new(self);
+        rtassert!(self.idle.is_none());
+        unsafe {
+            let cb_ptr: &*BasicPausible = cast::transmute(&callback);
+            self.idle = Some(*cb_ptr);
+        }
+        return callback as ~PausibleIdleCallback;
+    }
+
+    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
+        let id = self.next_remote;
+        self.next_remote += 1;
+        self.remotes.push((id, f));
+        ~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback
+    }
+
+    /// This has no bindings for local I/O
+    fn io<'a>(&'a mut self, _: &fn(&'a mut IoFactory)) {}
+}
+
+struct BasicRemote {
+    queue: Exclusive<~[Message]>,
+    id: uint,
+}
+
+impl BasicRemote {
+    fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote {
+        BasicRemote { queue: queue, id: id }
+    }
+}
+
+impl RemoteCallback for BasicRemote {
+    fn fire(&mut self) {
+        unsafe {
+            do self.queue.hold_and_signal |queue| {
+                queue.push(RunRemote(self.id));
+            }
+        }
+    }
+}
+
+impl Drop for BasicRemote {
+    fn drop(&mut self) {
+        unsafe {
+            do self.queue.hold_and_signal |queue| {
+                queue.push(RemoveRemote(self.id));
+            }
+        }
+    }
+}
+
+struct BasicPausible {
+    eloop: *mut BasicLoop,
+    work: Option<~fn()>,
+    active: bool,
+}
+
+impl BasicPausible {
+    fn new(eloop: &mut BasicLoop) -> BasicPausible {
+        BasicPausible {
+            active: false,
+            work: None,
+            eloop: eloop,
+        }
+    }
+}
+
+impl PausibleIdleCallback for BasicPausible {
+    fn start(&mut self, f: ~fn()) {
+        rtassert!(!self.active && self.work.is_none());
+        self.active = true;
+        self.work = Some(f);
+    }
+    fn pause(&mut self) {
+        self.active = false;
+    }
+    fn resume(&mut self) {
+        self.active = true;
+    }
+    fn close(&mut self) {
+        self.active = false;
+        self.work = None;
+    }
+}
+
+impl Drop for BasicPausible {
+    fn drop(&mut self) {
+        unsafe {
+            (*self.eloop).idle = None;
+        }
+    }
+}
+
+fn time() -> Time {
+    #[fixed_stack_segment]; #[inline(never)];
+    extern {
+        fn get_time(sec: &mut i64, nsec: &mut i32);
+    }
+    let mut sec = 0;
+    let mut nsec = 0;
+    unsafe { get_time(&mut sec, &mut nsec) }
+
+    Time { sec: sec as u64, nsec: nsec as u64 }
+}
index 758c97791658f878094960a97298377f34bdd999..decf801d59294faf174f182b35913b05073a687d 100644 (file)
@@ -606,6 +606,13 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
                 detail: None
             }
         }
+        IoUnavailable => {
+            IoError {
+                kind: IoUnavailable,
+                desc: "I/O is unavailable",
+                detail: None
+            }
+        }
         _ => fail!()
     }
 }
index 66d7a6bf48823b3f80df0610f67a0a4290043558..5113c28aa085e3a4ca429f120a913f932813fe4b 100644 (file)
@@ -102,6 +102,9 @@ pub mod shouldnt_be_public {
 // Internal macros used by the runtime.
 mod macros;
 
+/// Basic implementation of an EventLoop, provides no I/O interfaces
+mod basic;
+
 /// The global (exchange) heap.
 pub mod global_heap;
 
index 0e993a3564f80bb5837703e2c68b371b7f8d7f63..b008a8a74f2cb4678a6379bb153f5338d8224310 100644 (file)
@@ -62,8 +62,6 @@ pub struct Scheduler {
     /// no longer try to go to sleep, but exit instead.
     no_sleep: bool,
     stack_pool: StackPool,
-    /// The event loop used to drive the scheduler and perform I/O
-    event_loop: ~EventLoop,
     /// The scheduler runs on a special task. When it is not running
     /// it is stored here instead of the work queue.
     priv sched_task: Option<~Task>,
@@ -95,7 +93,7 @@ pub struct Scheduler {
     //      destroyed before it's actually destroyed.
 
     /// The event loop used to drive the scheduler and perform I/O
-    event_loop: ~EventLoopObject,
+    event_loop: ~EventLoop,
 }
 
 /// An indication of how hard to work on a given operation, the difference
@@ -915,7 +913,7 @@ mod test {
     use cell::Cell;
     use rt::thread::Thread;
     use rt::task::{Task, Sched};
-    use rt::rtio::EventLoop;
+    use rt::basic;
     use rt::util;
     use option::{Some};
 
@@ -1015,7 +1013,6 @@ fn test_home_sched() {
     #[test]
     fn test_schedule_home_states() {
 
-        use rt::uv::uvio::UvEventLoop;
         use rt::sleeper_list::SleeperList;
         use rt::work_queue::WorkQueue;
         use rt::sched::Shutdown;
@@ -1031,7 +1028,7 @@ fn test_schedule_home_states() {
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
-                ~UvEventLoop::new() as ~EventLoop,
+                basic::event_loop(),
                 normal_queue,
                 queues.clone(),
                 sleepers.clone());
@@ -1042,7 +1039,7 @@ fn test_schedule_home_states() {
 
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
-                ~UvEventLoop::new() as ~EventLoop,
+                basic::event_loop(),
                 special_queue.clone(),
                 queues.clone(),
                 sleepers.clone(),
@@ -1153,7 +1150,7 @@ fn test_io_callback() {
         // in the work queue, but we are performing I/O, that once we do put
         // something in the work queue again the scheduler picks it up and doesn't
         // exit before emptying the work queue
-        do run_in_newsched_task {
+        do run_in_uv_task {
             do spawntask {
                 timer::sleep(10);
             }
@@ -1195,7 +1192,6 @@ fn no_missed_messages() {
         use rt::work_queue::WorkQueue;
         use rt::sleeper_list::SleeperList;
         use rt::stack::StackPool;
-        use rt::uv::uvio::UvEventLoop;
         use rt::sched::{Shutdown, TaskFromFriend};
         use util;
 
@@ -1206,7 +1202,7 @@ fn no_missed_messages() {
                 let queues = ~[queue.clone()];
 
                 let mut sched = ~Scheduler::new(
-                    ~UvEventLoop::new() as ~EventLoop,
+                    basic::event_loop(),
                     queue,
                     queues.clone(),
                     sleepers.clone());
index 1ea68bb52d7e09de5d97f183ea58b7daacbca499..7bf124ad3129216867a9dd7a6b9f0ffc865b265e 100644 (file)
@@ -637,7 +637,7 @@ fn unwind() {
 
     #[test]
     fn rng() {
-        do run_in_newsched_task() {
+        do run_in_uv_task() {
             use rand::{rng, Rng};
             let mut r = rng();
             let _ = r.next_u32();
@@ -646,7 +646,7 @@ fn rng() {
 
     #[test]
     fn logging() {
-        do run_in_newsched_task() {
+        do run_in_uv_task() {
             info!("here i am. logging in a newsched task");
         }
     }
index c238b1dfba16a1527a3fc20cc80e1ace4d0f1b95..e4bbfe0a5a3e99be4016b1caa7fcbb9426920a5b 100644 (file)
@@ -21,6 +21,7 @@
 use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
 use vec::{OwnedVector, MutableVector, ImmutableVector};
 use path::GenericPath;
+use rt::basic;
 use rt::sched::Scheduler;
 use rt::rtio::EventLoop;
 use unstable::{run_in_bare_thread};
@@ -48,6 +49,28 @@ pub fn new_test_uv_sched() -> Scheduler {
 
 }
 
+pub fn new_test_sched() -> Scheduler {
+
+    let queue = WorkQueue::new();
+    let queues = ~[queue.clone()];
+
+    let mut sched = Scheduler::new(basic::event_loop(),
+                                   queue,
+                                   queues,
+                                   SleeperList::new());
+
+    // Don't wait for the Shutdown message
+    sched.no_sleep = true;
+    return sched;
+}
+
+pub fn run_in_uv_task(f: ~fn()) {
+    let f = Cell::new(f);
+    do run_in_bare_thread {
+        run_in_uv_task_core(f.take());
+    }
+}
+
 pub fn run_in_newsched_task(f: ~fn()) {
     let f = Cell::new(f);
     do run_in_bare_thread {
@@ -55,7 +78,7 @@ pub fn run_in_newsched_task(f: ~fn()) {
     }
 }
 
-pub fn run_in_newsched_task_core(f: ~fn()) {
+pub fn run_in_uv_task_core(f: ~fn()) {
 
     use rt::sched::Shutdown;
 
@@ -72,6 +95,23 @@ pub fn run_in_newsched_task_core(f: ~fn()) {
     sched.bootstrap(task);
 }
 
+pub fn run_in_newsched_task_core(f: ~fn()) {
+
+    use rt::sched::Shutdown;
+
+    let mut sched = ~new_test_sched();
+    let exit_handle = Cell::new(sched.make_handle());
+
+    let on_exit: ~fn(bool) = |exit_status| {
+        exit_handle.take().send(Shutdown);
+        rtassert!(exit_status);
+    };
+    let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
+    task.death.on_exit = Some(on_exit);
+
+    sched.bootstrap(task);
+}
+
 #[cfg(target_os="macos")]
 #[allow(non_camel_case_types)]
 mod darwin_fd_limit {
@@ -310,7 +350,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread {
 /// Get a ~Task for testing purposes other than actually scheduling it.
 pub fn with_test_task(blk: ~fn(~Task) -> ~Task) {
     do run_in_bare_thread {
-        let mut sched = ~new_test_uv_sched();
+        let mut sched = ~new_test_sched();
         let task = blk(~Task::new_root(&mut sched.stack_pool, None, ||{}));
         cleanup_task(task);
     }
index 62a09984794978671703af45600f3d73cec8b3de..75b09187f04ccfb47aa3806436bf2ea029a7430c 100644 (file)
@@ -183,7 +183,7 @@ fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
 
     #[test]
     fn select_one() {
-        do run_in_newsched_task { select_helper(1, [0]) }
+        do run_in_uv_task { select_helper(1, [0]) }
     }
 
     #[test]
@@ -191,14 +191,14 @@ fn select_two() {
         // NB. I would like to have a test that tests the first one that is
         // ready is the one that's returned, but that can't be reliably tested
         // with the randomized behaviour of optimistic_check.
-        do run_in_newsched_task { select_helper(2, [1]) }
-        do run_in_newsched_task { select_helper(2, [0]) }
-        do run_in_newsched_task { select_helper(2, [1,0]) }
+        do run_in_uv_task { select_helper(2, [1]) }
+        do run_in_uv_task { select_helper(2, [0]) }
+        do run_in_uv_task { select_helper(2, [1,0]) }
     }
 
     #[test]
     fn select_a_lot() {
-        do run_in_newsched_task { select_helper(12, [7,8,9]) }
+        do run_in_uv_task { select_helper(12, [7,8,9]) }
     }
 
     #[test]
@@ -208,7 +208,7 @@ fn select_stream() {
 
         // Sends 10 buffered packets, and uses select to retrieve them all.
         // Puts the port in a different spot in the vector each time.
-        do run_in_newsched_task {
+        do run_in_uv_task {
             let (ports, _) = unzip(range(0u, 10).map(|_| stream::<int>()));
             let (port, chan) = stream();
             do 10.times { chan.send(31337); }
@@ -229,7 +229,7 @@ fn select_stream() {
 
     #[test]
     fn select_unkillable() {
-        do run_in_newsched_task {
+        do run_in_uv_task {
             do task::unkillable { select_helper(2, [1]) }
         }
     }
@@ -242,7 +242,7 @@ fn select_blocking() {
         select_blocking_helper(false);
 
         fn select_blocking_helper(killable: bool) {
-            do run_in_newsched_task {
+            do run_in_uv_task {
                 let (p1,_c) = oneshot();
                 let (p2,c2) = oneshot();
                 let mut ports = [p1,p2];
@@ -287,7 +287,7 @@ fn select_racing_senders() {
         fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
             use rt::test::spawntask_random;
 
-            do run_in_newsched_task {
+            do run_in_uv_task {
                 // A bit of stress, since ordinarily this is just smoke and mirrors.
                 do 4.times {
                     let send_on_chans = send_on_chans.clone();
@@ -318,7 +318,7 @@ fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
 
     #[test]
     fn select_killed() {
-        do run_in_newsched_task {
+        do run_in_uv_task {
             let (success_p, success_c) = oneshot::<bool>();
             let success_c = Cell::new(success_c);
             do task::try {
index 30c99c628853fed6114c2cd3d3dd37889069f784..b72d6773ec567774bed2c38f30d0d7374aee39cf 100644 (file)
@@ -645,7 +645,7 @@ fn test_kill_unkillable_task() {
     // CPU, *after* the spawner is already switched-back-to (and passes the
     // killed check at the start of its timeslice). As far as I know, it's not
     // possible to make this race deterministic, or even more likely to happen.
-    do run_in_newsched_task {
+    do run_in_uv_task {
         do task::try {
             do task::spawn {
                 fail!();
@@ -662,7 +662,7 @@ fn test_kill_rekillable_task() {
 
     // Tests that when a kill signal is received, 'rekillable' and
     // 'unkillable' unwind correctly in conjunction with each other.
-    do run_in_newsched_task {
+    do run_in_uv_task {
         do task::try {
             do task::unkillable {
                 do task::rekillable {
@@ -730,8 +730,8 @@ fn test_cant_dup_task_builder() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let (po, ch) = stream();
         let ch = SharedChan::new(ch);
         do spawn_unlinked {
@@ -749,16 +749,16 @@ fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         do spawn_unlinked { fail!(); }
     }
 }
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         do spawn_supervised { fail!(); }
         // Give child a chance to fail-but-not-kill-us.
         do 16.times { task::deschedule(); }
@@ -767,8 +767,8 @@ fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_unlinked_sup_fail_down() {
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             do spawn_supervised { block_forever(); }
             fail!(); // Shouldn't leave a child hanging around.
@@ -780,8 +780,8 @@ fn test_spawn_unlinked_sup_fail_down() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // Unidirectional "parenting" shouldn't override bidirectional linked.
             // We have to cheat with opts - the interface doesn't support them because
@@ -801,8 +801,8 @@ fn test_spawn_unlinked_sup_fail_down() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // We have to cheat with opts - the interface doesn't support them because
             // they don't make sense (redundant with task().supervised()).
@@ -818,8 +818,8 @@ fn test_spawn_unlinked_sup_fail_down() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // Default options are to spawn linked & unsupervised.
             do spawn { fail!(); }
@@ -831,8 +831,8 @@ fn test_spawn_unlinked_sup_fail_down() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // Default options are to spawn linked & unsupervised.
             do spawn { block_forever(); }
@@ -844,8 +844,8 @@ fn test_spawn_unlinked_sup_fail_down() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // Make sure the above test is the same as this one.
             let mut builder = task();
@@ -863,8 +863,8 @@ fn test_spawn_unlinked_sup_fail_down() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_failure_propagate_grandchild() {
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // Middle task exits; does grandparent's failure propagate across the gap?
             do spawn_supervised {
@@ -880,8 +880,8 @@ fn test_spawn_failure_propagate_grandchild() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_failure_propagate_secondborn() {
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // First-born child exits; does parent's failure propagate to sibling?
             do spawn_supervised {
@@ -897,8 +897,8 @@ fn test_spawn_failure_propagate_secondborn() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_failure_propagate_nephew_or_niece() {
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // Our sibling exits; does our failure propagate to sibling's child?
             do spawn { // linked
@@ -914,8 +914,8 @@ fn test_spawn_failure_propagate_nephew_or_niece() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_linked_sup_propagate_sibling() {
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result: Result<(),()> = do try {
             // Middle sibling exits - does eldest's failure propagate to youngest?
             do spawn { // linked
@@ -930,9 +930,9 @@ fn test_spawn_linked_sup_propagate_sibling() {
 
 #[test]
 fn test_unnamed_task() {
-    use rt::test::run_in_newsched_task;
+    use rt::test::run_in_uv_task;
 
-    do run_in_newsched_task {
+    do run_in_uv_task {
         do spawn {
             do with_task_name |name| {
                 assert!(name.is_none());
@@ -943,9 +943,9 @@ fn test_unnamed_task() {
 
 #[test]
 fn test_owned_named_task() {
-    use rt::test::run_in_newsched_task;
+    use rt::test::run_in_uv_task;
 
-    do run_in_newsched_task {
+    do run_in_uv_task {
         let mut t = task();
         t.name(~"ada lovelace");
         do t.spawn {
@@ -958,9 +958,9 @@ fn test_owned_named_task() {
 
 #[test]
 fn test_static_named_task() {
-    use rt::test::run_in_newsched_task;
+    use rt::test::run_in_uv_task;
 
-    do run_in_newsched_task {
+    do run_in_uv_task {
         let mut t = task();
         t.name("ada lovelace");
         do t.spawn {
@@ -973,9 +973,9 @@ fn test_static_named_task() {
 
 #[test]
 fn test_send_named_task() {
-    use rt::test::run_in_newsched_task;
+    use rt::test::run_in_uv_task;
 
-    do run_in_newsched_task {
+    do run_in_uv_task {
         let mut t = task();
         t.name("ada lovelace".into_send_str());
         do t.spawn {
@@ -1326,9 +1326,9 @@ fn child_no(x: uint) -> ~fn() {
 
 #[test]
 fn test_simple_newsched_spawn() {
-    use rt::test::run_in_newsched_task;
+    use rt::test::run_in_uv_task;
 
-    do run_in_newsched_task {
+    do run_in_uv_task {
         spawn(||())
     }
 }
@@ -1336,8 +1336,8 @@ fn test_simple_newsched_spawn() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_spawn_watched() {
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result = do try {
             let mut t = task();
             t.unlinked();
@@ -1359,8 +1359,8 @@ fn test_spawn_watched() {
 #[ignore(reason = "linked failure")]
 #[test]
 fn test_indestructible() {
-    use rt::test::run_in_newsched_task;
-    do run_in_newsched_task {
+    use rt::test::run_in_uv_task;
+    do run_in_uv_task {
         let result = do try {
             let mut t = task();
             t.watched();
index 9d15dd031e0dce2b51573cd5a0aabe78c3b9e5cf..2b036c318bac7464a0647af0dafef24ac13915fa 100644 (file)
@@ -334,6 +334,23 @@ pub unsafe fn lock<T>(&self, f: &fn() -> T) -> T {
             }
         }
     }
+
+    pub unsafe fn signal(&self) {
+        rust_signal_little_lock(self.l);
+    }
+
+    pub unsafe fn lock_and_wait(&self, f: &fn() -> bool) {
+        do atomically {
+            rust_lock_little_lock(self.l);
+            do (|| {
+                if f() {
+                    rust_wait_little_lock(self.l);
+                }
+            }).finally {
+                rust_unlock_little_lock(self.l);
+            }
+        }
+    }
 }
 
 struct ExData<T> {
@@ -402,6 +419,34 @@ pub unsafe fn with_imm<U>(&self, f: &fn(x: &T) -> U) -> U {
         }
     }
 
+    #[inline]
+    pub unsafe fn hold_and_signal(&self, f: &fn(x: &mut T)) {
+        let rec = self.x.get();
+        do (*rec).lock.lock {
+            if (*rec).failed {
+                fail!("Poisoned Exclusive::new - another task failed inside!");
+            }
+            (*rec).failed = true;
+            f(&mut (*rec).data);
+            (*rec).failed = false;
+            (*rec).lock.signal();
+        }
+    }
+
+    #[inline]
+    pub unsafe fn hold_and_wait(&self, f: &fn(x: &T) -> bool) {
+        let rec = self.x.get();
+        do (*rec).lock.lock_and_wait {
+            if (*rec).failed {
+                fail!("Poisoned Exclusive::new - another task failed inside!");
+            }
+            (*rec).failed = true;
+            let result = f(&(*rec).data);
+            (*rec).failed = false;
+            result
+        }
+    }
+
     pub fn unwrap(self) -> T {
         let Exclusive { x: x } = self;
         // Someday we might need to unkillably unwrap an Exclusive, but not today.
@@ -415,6 +460,8 @@ pub fn unwrap(self) -> T {
 externfn!(fn rust_destroy_little_lock(lock: rust_little_lock))
 externfn!(fn rust_lock_little_lock(lock: rust_little_lock))
 externfn!(fn rust_unlock_little_lock(lock: rust_little_lock))
+externfn!(fn rust_signal_little_lock(lock: rust_little_lock))
+externfn!(fn rust_wait_little_lock(lock: rust_little_lock))
 
 #[cfg(test)]
 mod tests {
index 77020537661600115a4e2565895f243257626c53..a8eec52943ee004d8918cf746cd2ebf6296365df 100644 (file)
@@ -377,6 +377,16 @@ rust_unlock_little_lock(lock_and_signal *lock) {
     lock->unlock();
 }
 
+extern "C" void
+rust_wait_little_lock(lock_and_signal *lock) {
+    lock->wait();
+}
+
+extern "C" void
+rust_signal_little_lock(lock_and_signal *lock) {
+    lock->signal();
+}
+
 typedef void(startfn)(void*, void*);
 
 class raw_thread: public rust_thread {
index 269da8e7882ac9fd61647b595f7828a36b8562fb..06f4c0006f17a6828009d401eebfd7e5b7a96acf 100644 (file)
@@ -128,6 +128,8 @@ rust_create_little_lock
 rust_destroy_little_lock
 rust_lock_little_lock
 rust_unlock_little_lock
+rust_signal_little_lock
+rust_wait_little_lock
 tdefl_compress_mem_to_heap
 tinfl_decompress_mem_to_heap
 rust_uv_ip4_port