]> git.lizzy.rs Git - rust.git/commitdiff
rustuv: Reimplement without using std::rt::sched
authorAlex Crichton <alex@alexcrichton.com>
Fri, 13 Dec 2013 01:47:48 +0000 (17:47 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Tue, 24 Dec 2013 22:42:00 +0000 (14:42 -0800)
This reimplements librustuv without using the interfaces provided by the
scheduler in libstd. This solely uses the new Runtime trait in order to
interface with the local task and perform the necessary scheduling operations.

The largest snag in this refactoring is reimplementing homing. The new runtime
trait exposes no concept of "homing" a task or forcibly sending a task to a
remote scheduler (there is no concept of a scheduler). In order to reimplement
homing, the transferrence of tasks is now done at the librustuv level instead of
the scheduler level. This means that all I/O loops now have a concurrent queue
which receives homing messages and requests.

This allows the entire implementation of librustuv to be only dependent on the
runtime trait, severing all dependence of librustuv on the scheduler and related
green-thread functions.

This is all in preparation of the introduction of libgreen and libnative.

At the same time, I also took the liberty of removing all glob imports from
librustuv.

17 files changed:
src/librustuv/addrinfo.rs
src/librustuv/async.rs
src/librustuv/file.rs
src/librustuv/homing.rs [new file with mode: 0644]
src/librustuv/idle.rs
src/librustuv/lib.rs
src/librustuv/macros.rs
src/librustuv/net.rs
src/librustuv/pipe.rs
src/librustuv/process.rs
src/librustuv/queue.rs [new file with mode: 0644]
src/librustuv/signal.rs
src/librustuv/stream.rs
src/librustuv/timer.rs
src/librustuv/tty.rs
src/librustuv/uvio.rs
src/librustuv/uvll.rs

index 04c1c5a9fb8c97c4cdc77025eee854e51bd22c9a..f4c12c6f2a398e08807ebbee49011af31c73d4c9 100644 (file)
 use ai = std::io::net::addrinfo;
 use std::libc::c_int;
 use std::ptr::null;
-use std::rt::BlockedTask;
-use std::rt::local::Local;
-use std::rt::sched::Scheduler;
+use std::rt::task::BlockedTask;
 
 use net;
-use super::{Loop, UvError, Request, wait_until_woken_after};
+use super::{Loop, UvError, Request, wait_until_woken_after, wakeup};
 use uvll;
 
 struct Addrinfo {
@@ -108,8 +106,7 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
             cx.status = status;
             cx.addrinfo = Some(Addrinfo { handle: res });
 
-            let sched: ~Scheduler = Local::take();
-            sched.resume_blocked_task_immediately(cx.slot.take_unwrap());
+            wakeup(&mut cx.slot);
         }
     }
 }
@@ -188,7 +185,6 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
 #[cfg(test, not(target_os="android"))]
 mod test {
     use std::io::net::ip::{SocketAddr, Ipv4Addr};
-    use super::*;
     use super::super::local_loop;
 
     #[test]
index 5a0db8313fb5cdf939c5110bc146863f7c9bcc31..2d770ff6be1fe5e974a1ecabad8e2b7fd2f59def 100644 (file)
@@ -129,7 +129,6 @@ mod test_remote {
     use std::rt::thread::Thread;
     use std::rt::tube::Tube;
 
-    use super::*;
     use super::super::local_loop;
 
     // Make sure that we can fire watchers in remote threads and that they
index 3a8af71e019ef7b8d70b1a640d00be6f73b1ba28..cebf4f199e49c45192d4b3fb8fc09a595612680f 100644 (file)
 use std::cast;
 use std::libc::{c_int, c_char, c_void, size_t};
 use std::libc;
-use std::rt::BlockedTask;
+use std::rt::task::BlockedTask;
 use std::io::{FileStat, IoError};
 use std::io;
-use std::rt::local::Local;
 use std::rt::rtio;
-use std::rt::sched::{Scheduler, SchedHandle};
+use std::vec;
 
-use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
-use uvio::HomingIO;
+use homing::{HomingIO, HomeHandle};
+use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
 use uvll;
 
 pub struct FsRequest {
@@ -34,19 +34,19 @@ pub struct FileWatcher {
     priv loop_: Loop,
     priv fd: c_int,
     priv close: rtio::CloseBehavior,
-    priv home: SchedHandle,
+    priv home: HomeHandle,
 }
 
 impl FsRequest {
-    pub fn open(loop_: &Loop, path: &CString, flags: int, mode: int)
+    pub fn open(io: &mut UvIoFactory, path: &CString, flags: int, mode: int)
         -> Result<FileWatcher, UvError>
     {
         execute(|req, cb| unsafe {
-            uvll::uv_fs_open(loop_.handle,
+            uvll::uv_fs_open(io.uv_loop(),
                              req, path.with_ref(|p| p), flags as c_int,
                              mode as c_int, cb)
         }).map(|req|
-            FileWatcher::new(*loop_, req.get_result() as c_int,
+            FileWatcher::new(io, req.get_result() as c_int,
                              rtio::CloseSynchronously)
         )
     }
@@ -320,8 +320,7 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
         let slot: &mut Option<BlockedTask> = unsafe {
             cast::transmute(uvll::get_data_for_req(req))
         };
-        let sched: ~Scheduler = Local::take();
-        sched.resume_blocked_task_immediately(slot.take_unwrap());
+        wakeup(slot);
     }
 }
 
@@ -331,16 +330,17 @@ fn execute_nop(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
 }
 
 impl HomingIO for FileWatcher {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl FileWatcher {
-    pub fn new(loop_: Loop, fd: c_int, close: rtio::CloseBehavior) -> FileWatcher {
+    pub fn new(io: &mut UvIoFactory, fd: c_int,
+               close: rtio::CloseBehavior) -> FileWatcher {
         FileWatcher {
-            loop_: loop_,
+            loop_: Loop::wrap(io.uv_loop()),
             fd: fd,
             close: close,
-            home: get_handle_to_current_scheduler!()
+            home: io.make_handle(),
         }
     }
 
@@ -448,7 +448,6 @@ mod test {
     use std::io;
     use std::str;
     use std::vec;
-    use super::*;
     use l = super::super::local_loop;
 
     #[test]
diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs
new file mode 100644 (file)
index 0000000..7a12c4a
--- /dev/null
@@ -0,0 +1,144 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Homing I/O implementation
+//!
+//! In libuv, whenever a handle is created on an I/O loop it is illegal to use
+//! that handle outside of that I/O loop. We use libuv I/O with our green
+//! scheduler, and each green scheduler corresponds to a different I/O loop on a
+//! different OS thread. Green tasks are also free to roam among schedulers,
+//! which implies that it is possible to create an I/O handle on one event loop
+//! and then attempt to use it on another.
+//!
+//! In order to solve this problem, this module implements the notion of a
+//! "homing operation" which will transplant a task from its currently running
+//! scheduler back onto the original I/O loop. This is accomplished entirely at
+//! the librustuv layer with very little cooperation from the scheduler (which
+//! we don't even know exists technically).
+//!
+//! These homing operations are completed by first realizing that we're on the
+//! wrong I/O loop, then descheduling ourselves, sending ourselves to the
+//! correct I/O loop, and then waking up the I/O loop in order to process its
+//! local queue of tasks which need to run.
+//!
+//! This enqueueing is done with a concurrent queue from libstd, and the
+//! signalling is achieved with an async handle.
+
+use std::rt::local::Local;
+use std::rt::rtio::LocalIo;
+use std::rt::task::{Task, BlockedTask};
+
+use ForbidUnwind;
+use queue::{Queue, QueuePool};
+
+/// A handle to a remote libuv event loop. This handle will keep the event loop
+/// alive while active in order to ensure that a homing operation can always be
+/// completed.
+///
+/// Handles are clone-able in order to derive new handles from existing handles
+/// (very useful for when accepting a socket from a server).
+pub struct HomeHandle {
+    priv queue: Queue,
+    priv id: uint,
+}
+
+impl HomeHandle {
+    pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
+        HomeHandle { queue: pool.queue(), id: id }
+    }
+
+    fn send(&mut self, task: BlockedTask) {
+        self.queue.push(task);
+    }
+}
+
+impl Clone for HomeHandle {
+    fn clone(&self) -> HomeHandle {
+        HomeHandle {
+            queue: self.queue.clone(),
+            id: self.id,
+        }
+    }
+}
+
+pub trait HomingIO {
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
+
+    /// This function will move tasks to run on their home I/O scheduler. Note
+    /// that this function does *not* pin the task to the I/O scheduler, but
+    /// rather it simply moves it to running on the I/O scheduler.
+    fn go_to_IO_home(&mut self) -> uint {
+        let _f = ForbidUnwind::new("going home");
+
+        let mut cur_task: ~Task = Local::take();
+        let cur_loop_id = {
+            let mut io = cur_task.local_io().expect("libuv must have I/O");
+            io.get().id()
+        };
+
+        // Try at all costs to avoid the homing operation because it is quite
+        // expensive. Hence, we only deschedule/send if we're not on the correct
+        // event loop. If we're already on the home event loop, then we're good
+        // to go (remember we have no preemption, so we're guaranteed to stay on
+        // this event loop as long as we avoid the scheduler).
+        if cur_loop_id != self.home().id {
+            cur_task.deschedule(1, |task| {
+                self.home().send(task);
+                Ok(())
+            });
+
+            // Once we wake up, assert that we're in the right location
+            let cur_loop_id = {
+                let mut io = LocalIo::borrow().expect("libuv must have I/O");
+                io.get().id()
+            };
+            assert_eq!(cur_loop_id, self.home().id);
+
+            cur_loop_id
+        } else {
+            Local::put(cur_task);
+            cur_loop_id
+        }
+    }
+
+    /// Fires a single homing missile, returning another missile targeted back
+    /// at the original home of this task. In other words, this function will
+    /// move the local task to its I/O scheduler and then return an RAII wrapper
+    /// which will return the task home.
+    fn fire_homing_missile(&mut self) -> HomingMissile {
+        HomingMissile { io_home: self.go_to_IO_home() }
+    }
+}
+
+/// After a homing operation has been completed, this will return the current
+/// task back to its appropriate home (if applicable). The field is used to
+/// assert that we are where we think we are.
+struct HomingMissile {
+    priv io_home: uint,
+}
+
+impl HomingMissile {
+    /// Check at runtime that the task has *not* transplanted itself to a
+    /// different I/O loop while executing.
+    pub fn check(&self, msg: &'static str) {
+        let mut io = LocalIo::borrow().expect("libuv must have I/O");
+        assert!(io.get().id() == self.io_home, "{}", msg);
+    }
+}
+
+impl Drop for HomingMissile {
+    fn drop(&mut self) {
+        let _f = ForbidUnwind::new("leaving home");
+
+        // It would truly be a sad day if we had moved off the home I/O
+        // scheduler while we were doing I/O.
+        self.check("task moved away from the home scheduler");
+    }
+}
index 32c7699a30847b2a6b37bc29eb4574afaf8a1231..2445932c026b8b5f4a69616aab48db1fbeb84991 100644 (file)
@@ -97,7 +97,6 @@ fn drop(&mut self) {
 
 #[cfg(test)]
 mod test {
-    use super::*;
     use std::rt::tube::Tube;
     use std::rt::rtio::{Callback, PausableIdleCallback};
     use super::super::local_loop;
index 2e5b7254769f136afdae9a6f7117feaa39d2ef95..2715f0bd02a1647267d154911fb4d707043c792a 100644 (file)
 #[crate_type = "rlib"];
 #[crate_type = "dylib"];
 
-#[feature(macro_rules, globs)];
+#[feature(macro_rules)];
 
-use std::cast::transmute;
 use std::cast;
+use std::io;
+use std::io::IoError;
 use std::libc::{c_int, malloc};
 use std::ptr::null;
 use std::ptr;
-use std::rt::BlockedTask;
 use std::rt::local::Local;
-use std::rt::sched::Scheduler;
+use std::rt::task::{BlockedTask, Task};
+use std::rt::rtio::LocalIo;
 use std::str::raw::from_c_str;
 use std::str;
 use std::task;
 use std::unstable::finally::Finally;
 
-use std::io::IoError;
-
 pub use self::async::AsyncWatcher;
 pub use self::file::{FsRequest, FileWatcher};
 pub use self::idle::IdleWatcher;
@@ -70,6 +69,9 @@
 
 mod macros;
 
+mod queue;
+mod homing;
+
 /// The implementation of `rtio` for libuv
 pub mod uvio;
 
@@ -144,32 +146,31 @@ fn close(&mut self) {
                 uvll::free_handle(handle);
                 if data == ptr::null() { return }
                 let slot: &mut Option<BlockedTask> = cast::transmute(data);
-                let sched: ~Scheduler = Local::take();
-                sched.resume_blocked_task_immediately(slot.take_unwrap());
+                wakeup(slot);
             }
         }
     }
 }
 
 pub struct ForbidSwitch {
-    msg: &'static str,
-    sched: uint,
+    priv msg: &'static str,
+    priv io: uint,
 }
 
 impl ForbidSwitch {
     fn new(s: &'static str) -> ForbidSwitch {
-        let mut sched = Local::borrow(None::<Scheduler>);
+        let mut io = LocalIo::borrow().expect("libuv must have local I/O");
         ForbidSwitch {
             msg: s,
-            sched: sched.get().sched_id(),
+            io: io.get().id(),
         }
     }
 }
 
 impl Drop for ForbidSwitch {
     fn drop(&mut self) {
-        let mut sched = Local::borrow(None::<Scheduler>);
-        assert!(self.sched == sched.get().sched_id(),
+        let mut io = LocalIo::borrow().expect("libuv must have local I/O");
+        assert!(self.io == io.get().id(),
                 "didnt want a scheduler switch: {}",
                 self.msg);
     }
@@ -199,14 +200,20 @@ fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
     let _f = ForbidUnwind::new("wait_until_woken_after");
     unsafe {
         assert!((*slot).is_none());
-        let sched: ~Scheduler = Local::take();
-        sched.deschedule_running_task_and_then(|_, task| {
-            f();
+        let task: ~Task = Local::take();
+        task.deschedule(1, |task| {
             *slot = Some(task);
-        })
+            f();
+            Ok(())
+        });
     }
 }
 
+fn wakeup(slot: &mut Option<BlockedTask>) {
+    assert!(slot.is_some());
+    slot.take_unwrap().wake().map(|t| t.reawaken(true));
+}
+
 pub struct Request {
     handle: *uvll::uv_req_t,
     priv defused: bool,
@@ -325,28 +332,26 @@ fn error_smoke_test() {
 pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
     unsafe {
         // Importing error constants
-        use uvll::*;
-        use std::io::*;
 
         // uv error descriptions are static
         let c_desc = uvll::uv_strerror(*uverr);
         let desc = str::raw::c_str_to_static_slice(c_desc);
 
         let kind = match *uverr {
-            UNKNOWN => OtherIoError,
-            OK => OtherIoError,
-            EOF => EndOfFile,
-            EACCES => PermissionDenied,
-            ECONNREFUSED => ConnectionRefused,
-            ECONNRESET => ConnectionReset,
-            ENOENT => FileNotFound,
-            ENOTCONN => NotConnected,
-            EPIPE => BrokenPipe,
-            ECONNABORTED => ConnectionAborted,
+            uvll::UNKNOWN => io::OtherIoError,
+            uvll::OK => io::OtherIoError,
+            uvll::EOF => io::EndOfFile,
+            uvll::EACCES => io::PermissionDenied,
+            uvll::ECONNREFUSED => io::ConnectionRefused,
+            uvll::ECONNRESET => io::ConnectionReset,
+            uvll::ENOTCONN => io::NotConnected,
+            uvll::ENOENT => io::FileNotFound,
+            uvll::EPIPE => io::BrokenPipe,
+            uvll::ECONNABORTED => io::ConnectionAborted,
             err => {
                 uvdebug!("uverr.code {}", err as int);
                 // XXX: Need to map remaining uv error types
-                OtherIoError
+                io::OtherIoError
             }
         };
 
index a63dcc6de3105c97f7a196b7024cfc2d13735288..61b4de5765592419385aa3f25130381a4546225d 100644 (file)
@@ -27,18 +27,21 @@ macro_rules! uvdebug (
     })
 )
 
-// get a handle for the current scheduler
-macro_rules! get_handle_to_current_scheduler(
-    () => ({
-        let mut sched = Local::borrow(None::<Scheduler>);
-        sched.get().make_handle()
-    })
-)
-
 pub fn dumb_println(args: &fmt::Arguments) {
-    use std::io::native::file::FileDesc;
     use std::io;
     use std::libc;
-    let mut out = FileDesc::new(libc::STDERR_FILENO, false);
-    fmt::writeln(&mut out as &mut io::Writer, args);
+    use std::vec;
+
+    struct Stderr;
+    impl io::Writer for Stderr {
+        fn write(&mut self, data: &[u8]) {
+            unsafe {
+                libc::write(libc::STDERR_FILENO,
+                            vec::raw::to_ptr(data) as *libc::c_void,
+                            data.len() as libc::size_t);
+            }
+        }
+    }
+    let mut w = Stderr;
+    fmt::writeln(&mut w as &mut io::Writer, args);
 }
index ce543eafd2f644d611a1c76ed00863debd803c69..caf9ee0aac636fdb686fa44b9745f91e11ed21e6 100644 (file)
@@ -9,24 +9,22 @@
 // except according to those terms.
 
 use std::cast;
-use std::libc;
-use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
-use std::ptr;
-use std::rt::BlockedTask;
 use std::io::IoError;
 use std::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr};
-use std::rt::local::Local;
+use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
+use std::libc;
+use std::ptr;
 use std::rt::rtio;
-use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::tube::Tube;
+use std::rt::task::BlockedTask;
 use std::str;
 use std::vec;
 
+use homing::{HomingIO, HomeHandle};
 use stream::StreamWatcher;
 use super::{Loop, Request, UvError, Buf, status_to_io_result,
             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
-            wait_until_woken_after};
-use uvio::HomingIO;
+            wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
 use uvll;
 use uvll::sockaddr;
 
@@ -145,42 +143,47 @@ fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoErro
 pub struct TcpWatcher {
     handle: *uvll::uv_tcp_t,
     stream: StreamWatcher,
-    home: SchedHandle,
+    home: HomeHandle,
 }
 
 pub struct TcpListener {
-    home: SchedHandle,
+    home: HomeHandle,
     handle: *uvll::uv_pipe_t,
     priv closing_task: Option<BlockedTask>,
-    priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
+    priv outgoing: Chan<Result<~rtio::RtioTcpStream, IoError>>,
+    priv incoming: Port<Result<~rtio::RtioTcpStream, IoError>>,
 }
 
 pub struct TcpAcceptor {
     listener: ~TcpListener,
-    priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
 }
 
 // TCP watchers (clients/streams)
 
 impl TcpWatcher {
-    pub fn new(loop_: &Loop) -> TcpWatcher {
+    pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
+        let handle = io.make_handle();
+        TcpWatcher::new_home(&io.loop_, handle)
+    }
+
+    fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
         assert_eq!(unsafe {
             uvll::uv_tcp_init(loop_.handle, handle)
         }, 0);
         TcpWatcher {
-            home: get_handle_to_current_scheduler!(),
+            home: home,
             handle: handle,
             stream: StreamWatcher::new(handle),
         }
     }
 
-    pub fn connect(loop_: &mut Loop, address: SocketAddr)
+    pub fn connect(io: &mut UvIoFactory, address: SocketAddr)
         -> Result<TcpWatcher, UvError>
     {
         struct Ctx { status: c_int, task: Option<BlockedTask> }
 
-        let tcp = TcpWatcher::new(loop_);
+        let tcp = TcpWatcher::new(io);
         let ret = socket_addr_as_sockaddr(address, |addr| {
             let mut req = Request::new(uvll::UV_CONNECT);
             let result = unsafe {
@@ -213,14 +216,13 @@ struct Ctx { status: c_int, task: Option<BlockedTask> }
             assert!(status != uvll::ECANCELED);
             let cx: &mut Ctx = unsafe { req.get_data() };
             cx.status = status;
-            let scheduler: ~Scheduler = Local::take();
-            scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
+            wakeup(&mut cx.task);
         }
     }
 }
 
 impl HomingIO for TcpWatcher {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl rtio::RtioSocket for TcpWatcher {
@@ -290,17 +292,19 @@ fn drop(&mut self) {
 // TCP listeners (unbound servers)
 
 impl TcpListener {
-    pub fn bind(loop_: &mut Loop, address: SocketAddr)
+    pub fn bind(io: &mut UvIoFactory, address: SocketAddr)
                 -> Result<~TcpListener, UvError> {
         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
         assert_eq!(unsafe {
-            uvll::uv_tcp_init(loop_.handle, handle)
+            uvll::uv_tcp_init(io.uv_loop(), handle)
         }, 0);
+        let (port, chan) = Chan::new();
         let l = ~TcpListener {
-            home: get_handle_to_current_scheduler!(),
+            home: io.make_handle(),
             handle: handle,
             closing_task: None,
-            outgoing: Tube::new(),
+            outgoing: chan,
+            incoming: port,
         };
         let res = socket_addr_as_sockaddr(address, |addr| unsafe {
             uvll::uv_tcp_bind(l.handle, addr)
@@ -313,7 +317,7 @@ pub fn bind(loop_: &mut Loop, address: SocketAddr)
 }
 
 impl HomingIO for TcpListener {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
@@ -330,11 +334,7 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
 impl rtio::RtioTcpListener for TcpListener {
     fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
         // create the acceptor object from ourselves
-        let incoming = self.outgoing.clone();
-        let mut acceptor = ~TcpAcceptor {
-            listener: self,
-            incoming: incoming,
-        };
+        let mut acceptor = ~TcpAcceptor { listener: self };
 
         let _m = acceptor.fire_homing_missile();
         // XXX: the 128 backlog should be configurable
@@ -347,19 +347,18 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
 
 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
     assert!(status != uvll::ECANCELED);
+    let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
     let msg = match status {
         0 => {
             let loop_ = Loop::wrap(unsafe {
                 uvll::get_loop_for_uv_handle(server)
             });
-            let client = TcpWatcher::new(&loop_);
+            let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
             Ok(~client as ~rtio::RtioTcpStream)
         }
         n => Err(uv_error_to_io_error(UvError(n)))
     };
-
-    let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
     tcp.outgoing.send(msg);
 }
 
@@ -373,7 +372,7 @@ fn drop(&mut self) {
 // TCP acceptors (bound servers)
 
 impl HomingIO for TcpAcceptor {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
 }
 
 impl rtio::RtioSocket for TcpAcceptor {
@@ -385,8 +384,7 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
 
 impl rtio::RtioTcpAcceptor for TcpAcceptor {
     fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
-        let _m = self.fire_homing_missile();
-        self.incoming.recv()
+        self.listener.incoming.recv()
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
@@ -410,18 +408,18 @@ fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
 
 pub struct UdpWatcher {
     handle: *uvll::uv_udp_t,
-    home: SchedHandle,
+    home: HomeHandle,
 }
 
 impl UdpWatcher {
-    pub fn bind(loop_: &Loop, address: SocketAddr)
+    pub fn bind(io: &mut UvIoFactory, address: SocketAddr)
                 -> Result<UdpWatcher, UvError> {
         let udp = UdpWatcher {
             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
-            home: get_handle_to_current_scheduler!(),
+            home: io.make_handle(),
         };
         assert_eq!(unsafe {
-            uvll::uv_udp_init(loop_.handle, udp.handle)
+            uvll::uv_udp_init(io.uv_loop(), udp.handle)
         }, 0);
         let result = socket_addr_as_sockaddr(address, |addr| unsafe {
             uvll::uv_udp_bind(udp.handle, addr, 0u32)
@@ -438,7 +436,7 @@ fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
 }
 
 impl HomingIO for UdpWatcher {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl rtio::RtioSocket for UdpWatcher {
@@ -519,9 +517,7 @@ struct Ctx {
                 Some(sockaddr_to_socket_addr(addr))
             };
             cx.result = Some((nread, addr));
-
-            let sched: ~Scheduler = Local::take();
-            sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+            wakeup(&mut cx.task);
         }
     }
 
@@ -556,9 +552,7 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
             assert!(status != uvll::ECANCELED);
             let cx: &mut Ctx = unsafe { req.get_data() };
             cx.result = status;
-
-            let sched: ~Scheduler = Local::take();
-            sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+            wakeup(&mut cx.task);
         }
     }
 
@@ -646,12 +640,10 @@ fn drop(&mut self) {
 
 #[cfg(test)]
 mod test {
-    use std::rt::test::*;
     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
                         RtioUdpSocket};
     use std::task;
 
-    use super::*;
     use super::super::local_loop;
 
     #[test]
@@ -824,7 +816,6 @@ fn udp_recv_ip6() {
 
     #[test]
     fn test_read_read_read() {
-        use std::rt::rtio::*;
         let addr = next_test_ip4();
         static MAX: uint = 5000;
         let (port, chan) = Chan::new();
index 814205cbbf1ccf8f3233a24971b52ba2cca0efd5..6975ef26bd764b2dde4723fd6b1325482a922a65 100644 (file)
@@ -9,35 +9,33 @@
 // except according to those terms.
 
 use std::c_str::CString;
-use std::libc;
-use std::rt::BlockedTask;
 use std::io::IoError;
-use std::rt::local::Local;
+use std::libc;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
-use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::tube::Tube;
+use std::rt::task::BlockedTask;
 
+use homing::{HomingIO, HomeHandle};
 use stream::StreamWatcher;
 use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
-            wait_until_woken_after};
-use uvio::HomingIO;
+            wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
 use uvll;
 
 pub struct PipeWatcher {
     stream: StreamWatcher,
-    home: SchedHandle,
+    home: HomeHandle,
     priv defused: bool,
 }
 
 pub struct PipeListener {
-    home: SchedHandle,
+    home: HomeHandle,
     pipe: *uvll::uv_pipe_t,
-    priv outgoing: Tube<Result<~RtioPipe, IoError>>,
+    priv outgoing: Chan<Result<~RtioPipe, IoError>>,
+    priv incoming: Port<Result<~RtioPipe, IoError>>,
 }
 
 pub struct PipeAcceptor {
     listener: ~PipeListener,
-    priv incoming: Tube<Result<~RtioPipe, IoError>>,
 }
 
 // PipeWatcher implementation and traits
@@ -46,7 +44,12 @@ impl PipeWatcher {
     // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
     // get bound to some other source (this is normally a helper method paired
     // with another call).
-    pub fn new(loop_: &Loop, ipc: bool) -> PipeWatcher {
+    pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
+        let home = io.make_handle();
+        PipeWatcher::new_home(&io.loop_, home, ipc)
+    }
+
+    pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
         let handle = unsafe {
             let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
             assert!(!handle.is_null());
@@ -56,26 +59,28 @@ pub fn new(loop_: &Loop, ipc: bool) -> PipeWatcher {
         };
         PipeWatcher {
             stream: StreamWatcher::new(handle),
-            home: get_handle_to_current_scheduler!(),
+            home: home,
             defused: false,
         }
     }
 
-    pub fn open(loop_: &Loop, file: libc::c_int) -> Result<PipeWatcher, UvError>
+    pub fn open(io: &mut UvIoFactory, file: libc::c_int)
+        -> Result<PipeWatcher, UvError>
     {
-        let pipe = PipeWatcher::new(loop_, false);
+        let pipe = PipeWatcher::new(io, false);
         match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
             0 => Ok(pipe),
             n => Err(UvError(n))
         }
     }
 
-    pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
+    pub fn connect(io: &mut UvIoFactory, name: &CString)
+        -> Result<PipeWatcher, UvError>
     {
         struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
         let mut cx = Ctx { task: None, result: 0 };
         let mut req = Request::new(uvll::UV_CONNECT);
-        let pipe = PipeWatcher::new(loop_, false);
+        let pipe = PipeWatcher::new(io, false);
 
         wait_until_woken_after(&mut cx.task, || {
             unsafe {
@@ -97,8 +102,7 @@ struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
             assert!(status != uvll::ECANCELED);
             let cx: &mut Ctx = unsafe { req.get_data() };
             cx.result = status;
-            let sched: ~Scheduler = Local::take();
-            sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+            wakeup(&mut cx.task);
         }
     }
 
@@ -125,7 +129,7 @@ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
 }
 
 impl HomingIO for PipeWatcher {
-    fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+    fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
 }
 
 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
@@ -144,8 +148,10 @@ fn drop(&mut self) {
 // PipeListener implementation and traits
 
 impl PipeListener {
-    pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
-        let pipe = PipeWatcher::new(loop_, false);
+    pub fn bind(io: &mut UvIoFactory, name: &CString)
+        -> Result<~PipeListener, UvError>
+    {
+        let pipe = PipeWatcher::new(io, false);
         match unsafe {
             uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
         } {
@@ -153,10 +159,12 @@ pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
                 // If successful, unwrap the PipeWatcher because we control how
                 // we close the pipe differently. We can't rely on
                 // StreamWatcher's default close method.
+                let (port, chan) = Chan::new();
                 let p = ~PipeListener {
-                    home: get_handle_to_current_scheduler!(),
+                    home: io.make_handle(),
                     pipe: pipe.unwrap(),
-                    outgoing: Tube::new(),
+                    incoming: port,
+                    outgoing: chan,
                 };
                 Ok(p.install())
             }
@@ -168,11 +176,7 @@ pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
 impl RtioUnixListener for PipeListener {
     fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
         // create the acceptor object from ourselves
-        let incoming = self.outgoing.clone();
-        let mut acceptor = ~PipeAcceptor {
-            listener: self,
-            incoming: incoming,
-        };
+        let mut acceptor = ~PipeAcceptor { listener: self };
 
         let _m = acceptor.fire_homing_missile();
         // XXX: the 128 backlog should be configurable
@@ -184,7 +188,7 @@ fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
 }
 
 impl HomingIO for PipeListener {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
@@ -193,20 +197,20 @@ fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
 
 extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
     assert!(status != uvll::ECANCELED);
+
+    let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
     let msg = match status {
         0 => {
             let loop_ = Loop::wrap(unsafe {
                 uvll::get_loop_for_uv_handle(server)
             });
-            let client = PipeWatcher::new(&loop_, false);
+            let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
             assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
             Ok(~client as ~RtioPipe)
         }
         n => Err(uv_error_to_io_error(UvError(n)))
     };
-
-    let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
-    pipe.outgoing.send(msg);
+    pipe.outgoing.send_deferred(msg);
 }
 
 impl Drop for PipeListener {
@@ -220,13 +224,12 @@ fn drop(&mut self) {
 
 impl RtioUnixAcceptor for PipeAcceptor {
     fn accept(&mut self) -> Result<~RtioPipe, IoError> {
-        let _m = self.fire_homing_missile();
-        self.incoming.recv()
+        self.listener.incoming.recv()
     }
 }
 
 impl HomingIO for PipeAcceptor {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
 }
 
 #[cfg(test)]
@@ -234,7 +237,6 @@ mod tests {
     use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
     use std::rt::test::next_test_unix;
 
-    use super::*;
     use super::super::local_loop;
 
     #[test]
index 9e359e26f03d6abddcb2371af62450d89f3925cb..7b7a16d7084e5aa8da28c5e64d246e369719f410 100644 (file)
@@ -8,32 +8,31 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use std::io::IoError;
+use std::io::process;
 use std::libc::c_int;
 use std::libc;
 use std::ptr;
-use std::rt::BlockedTask;
-use std::io::IoError;
-use std::io::process::*;
-use std::rt::local::Local;
 use std::rt::rtio::RtioProcess;
-use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::task::BlockedTask;
 use std::vec;
 
-use super::{Loop, UvHandle, UvError, uv_error_to_io_error,
-            wait_until_woken_after};
-use uvio::HomingIO;
-use uvll;
+use homing::{HomingIO, HomeHandle};
 use pipe::PipeWatcher;
+use super::{UvHandle, UvError, uv_error_to_io_error,
+            wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
+use uvll;
 
 pub struct Process {
     handle: *uvll::uv_process_t,
-    home: SchedHandle,
+    home: HomeHandle,
 
     /// Task to wake up (may be null) for when the process exits
     to_wake: Option<BlockedTask>,
 
     /// Collected from the exit_cb
-    exit_status: Option<ProcessExit>,
+    exit_status: Option<process::ProcessExit>,
 }
 
 impl Process {
@@ -41,7 +40,7 @@ impl Process {
     ///
     /// Returns either the corresponding process object or an error which
     /// occurred.
-    pub fn spawn(loop_: &Loop, config: ProcessConfig)
+    pub fn spawn(io_loop: &mut UvIoFactory, config: process::ProcessConfig)
                 -> Result<(~Process, ~[Option<PipeWatcher>]), UvError>
     {
         let cwd = config.cwd.map(|s| s.to_c_str());
@@ -52,7 +51,7 @@ pub fn spawn(loop_: &Loop, config: ProcessConfig)
             stdio.set_len(io.len());
             for (slot, other) in stdio.iter().zip(io.iter()) {
                 let io = set_stdio(slot as *uvll::uv_stdio_container_t, other,
-                                   loop_);
+                                   io_loop);
                 ret_io.push(io);
             }
         }
@@ -78,12 +77,12 @@ pub fn spawn(loop_: &Loop, config: ProcessConfig)
                 let handle = UvHandle::alloc(None::<Process>, uvll::UV_PROCESS);
                 let process = ~Process {
                     handle: handle,
-                    home: get_handle_to_current_scheduler!(),
+                    home: io_loop.make_handle(),
                     to_wake: None,
                     exit_status: None,
                 };
                 match unsafe {
-                    uvll::uv_spawn(loop_.handle, handle, &options)
+                    uvll::uv_spawn(io_loop.uv_loop(), handle, &options)
                 } {
                     0 => Ok(process.install()),
                     err => Err(UvError(err)),
@@ -105,33 +104,28 @@ pub fn spawn(loop_: &Loop, config: ProcessConfig)
 
     assert!(p.exit_status.is_none());
     p.exit_status = Some(match term_signal {
-        0 => ExitStatus(exit_status as int),
-        n => ExitSignal(n as int),
+        0 => process::ExitStatus(exit_status as int),
+        n => process::ExitSignal(n as int),
     });
 
-    match p.to_wake.take() {
-        Some(task) => {
-            let scheduler: ~Scheduler = Local::take();
-            scheduler.resume_blocked_task_immediately(task);
-        }
-        None => {}
-    }
+    if p.to_wake.is_none() { return }
+    wakeup(&mut p.to_wake);
 }
 
 unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
-                    io: &StdioContainer,
-                    loop_: &Loop) -> Option<PipeWatcher> {
+                    io: &process::StdioContainer,
+                    io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
     match *io {
-        Ignored => {
+        process::Ignored => {
             uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
             None
         }
-        InheritFd(fd) => {
+        process::InheritFd(fd) => {
             uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD);
             uvll::set_stdio_container_fd(dst, fd);
             None
         }
-        CreatePipe(readable, writable) => {
+        process::CreatePipe(readable, writable) => {
             let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
             if readable {
                 flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
@@ -139,7 +133,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
             if writable {
                 flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
             }
-            let pipe = PipeWatcher::new(loop_, false);
+            let pipe = PipeWatcher::new(io_loop, false);
             uvll::set_stdio_container_flags(dst, flags);
             uvll::set_stdio_container_stream(dst, pipe.handle());
             Some(pipe)
@@ -186,7 +180,7 @@ fn with_env<T>(env: Option<&[(~str, ~str)]>, f: |**libc::c_char| -> T) -> T {
 }
 
 impl HomingIO for Process {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl UvHandle<uvll::uv_process_t> for Process {
@@ -208,7 +202,7 @@ fn kill(&mut self, signal: int) -> Result<(), IoError> {
         }
     }
 
-    fn wait(&mut self) -> ProcessExit {
+    fn wait(&mut self) -> process::ProcessExit {
         // Make sure (on the home scheduler) that we have an exit status listed
         let _m = self.fire_homing_missile();
         match self.exit_status {
diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs
new file mode 100644 (file)
index 0000000..22e7925
--- /dev/null
@@ -0,0 +1,184 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A concurrent queue used to signal remote event loops
+//!
+//! This queue implementation is used to send tasks among event loops. This is
+//! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
+//! handles (to wake up a remote event loop).
+//!
+//! The uv_async_t is stored next to the event loop, so in order to not keep the
+//! event loop alive we use uv_ref and uv_unref in order to control when the
+//! async handle is active or not.
+
+use std::cast;
+use std::libc::{c_void, c_int};
+use std::rt::task::BlockedTask;
+use std::unstable::sync::LittleLock;
+use mpsc = std::sync::mpsc_queue;
+
+use async::AsyncWatcher;
+use super::{Loop, UvHandle};
+use uvll;
+
+enum Message {
+    Task(BlockedTask),
+    Increment,
+    Decrement,
+}
+
+struct State {
+    handle: *uvll::uv_async_t,
+    lock: LittleLock, // see comments in async_cb for why this is needed
+}
+
+/// This structure is intended to be stored next to the event loop, and it is
+/// used to create new `Queue` structures.
+pub struct QueuePool {
+    priv producer: mpsc::Producer<Message, State>,
+    priv consumer: mpsc::Consumer<Message, State>,
+    priv refcnt: uint,
+}
+
+/// This type is used to send messages back to the original event loop.
+pub struct Queue {
+    priv queue: mpsc::Producer<Message, State>,
+}
+
+extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
+    assert_eq!(status, 0);
+    let state: &mut QueuePool = unsafe {
+        cast::transmute(uvll::get_data_for_uv_handle(handle))
+    };
+    let packet = unsafe { state.consumer.packet() };
+
+    // Remember that there is no guarantee about how many times an async
+    // callback is called with relation to the number of sends, so process the
+    // entire queue in a loop.
+    loop {
+        match state.consumer.pop() {
+            mpsc::Data(Task(task)) => {
+                task.wake().map(|t| t.reawaken(true));
+            }
+            mpsc::Data(Increment) => unsafe {
+                if state.refcnt == 0 {
+                    uvll::uv_ref((*packet).handle);
+                }
+                state.refcnt += 1;
+            },
+            mpsc::Data(Decrement) => unsafe {
+                state.refcnt -= 1;
+                if state.refcnt == 0 {
+                    uvll::uv_unref((*packet).handle);
+                }
+            },
+            mpsc::Empty | mpsc::Inconsistent => break
+        };
+    }
+
+    // If the refcount is now zero after processing the queue, then there is no
+    // longer a reference on the async handle and it is possible that this event
+    // loop can exit. What we're not guaranteed, however, is that a producer in
+    // the middle of dropping itself is yet done with the handle. It could be
+    // possible that we saw their Decrement message but they have yet to signal
+    // on the async handle. If we were to return immediately, the entire uv loop
+    // could be destroyed meaning the call to uv_async_send would abort()
+    //
+    // In order to fix this, an OS mutex is used to wait for the other end to
+    // finish before we continue. The drop block on a handle will acquire a
+    // mutex and then drop it after both the push and send have been completed.
+    // If we acquire the mutex here, then we are guaranteed that there are no
+    // longer any senders which are holding on to their handles, so we can
+    // safely allow the event loop to exit.
+    if state.refcnt == 0 {
+        unsafe {
+            let _l = (*packet).lock.lock();
+        }
+    }
+}
+
+impl QueuePool {
+    pub fn new(loop_: &mut Loop) -> ~QueuePool {
+        let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
+        let (c, p) = mpsc::queue(State {
+            handle: handle,
+            lock: LittleLock::new(),
+        });
+        let q = ~QueuePool {
+            producer: p,
+            consumer: c,
+            refcnt: 0,
+        };
+
+        unsafe {
+            assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
+            uvll::uv_unref(handle);
+            let data: *c_void = *cast::transmute::<&~QueuePool, &*c_void>(&q);
+            uvll::set_data_for_uv_handle(handle, data);
+        }
+
+        return q;
+    }
+
+    pub fn queue(&mut self) -> Queue {
+        unsafe {
+            if self.refcnt == 0 {
+                uvll::uv_ref((*self.producer.packet()).handle);
+            }
+            self.refcnt += 1;
+        }
+        Queue { queue: self.producer.clone() }
+    }
+}
+
+impl Queue {
+    pub fn push(&mut self, task: BlockedTask) {
+        self.queue.push(Task(task));
+        unsafe {
+            uvll::uv_async_send((*self.queue.packet()).handle);
+        }
+    }
+}
+
+impl Clone for Queue {
+    fn clone(&self) -> Queue {
+        // Push a request to increment on the queue, but there's no need to
+        // signal the event loop to process it at this time. We're guaranteed
+        // that the count is at least one (because we have a queue right here),
+        // and if the queue is dropped later on it'll see the increment for the
+        // decrement anyway.
+        unsafe {
+            cast::transmute_mut(self).queue.push(Increment);
+        }
+        Queue { queue: self.queue.clone() }
+    }
+}
+
+impl Drop for Queue {
+    fn drop(&mut self) {
+        // See the comments in the async_cb function for why there is a lock
+        // that is acquired only on a drop.
+        unsafe {
+            let state = self.queue.packet();
+            let _l = (*state).lock.lock();
+            self.queue.push(Decrement);
+            uvll::uv_async_send((*state).handle);
+        }
+    }
+}
+
+impl Drop for State {
+    fn drop(&mut self) {
+        unsafe {
+            uvll::uv_close(self.handle, cast::transmute(0));
+            uvll::free_handle(self.handle);
+        }
+    }
+}
index f082aef003c60bcf00f1526999b551ced7984991..27dbc0fe3bb08769091515b0883e094a3795c549 100644 (file)
 
 use std::libc::c_int;
 use std::io::signal::Signum;
-use std::rt::sched::{SchedHandle, Scheduler};
 use std::comm::SharedChan;
-use std::rt::local::Local;
 use std::rt::rtio::RtioSignal;
 
-use super::{Loop, UvError, UvHandle};
+use homing::{HomingIO, HomeHandle};
+use super::{UvError, UvHandle};
 use uvll;
-use uvio::HomingIO;
+use uvio::UvIoFactory;
 
 pub struct SignalWatcher {
     handle: *uvll::uv_signal_t,
-    home: SchedHandle,
+    home: HomeHandle,
 
     channel: SharedChan<Signum>,
     signal: Signum,
 }
 
 impl SignalWatcher {
-    pub fn new(loop_: &mut Loop, signum: Signum,
+    pub fn new(io: &mut UvIoFactory, signum: Signum,
                channel: SharedChan<Signum>) -> Result<~SignalWatcher, UvError> {
         let s = ~SignalWatcher {
             handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL),
-            home: get_handle_to_current_scheduler!(),
+            home: io.make_handle(),
             channel: channel,
             signal: signum,
         };
         assert_eq!(unsafe {
-            uvll::uv_signal_init(loop_.handle, s.handle)
+            uvll::uv_signal_init(io.uv_loop(), s.handle)
         }, 0);
 
         match unsafe {
@@ -57,7 +56,7 @@ pub fn new(loop_: &mut Loop, signum: Signum,
 }
 
 impl HomingIO for SignalWatcher {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl UvHandle<uvll::uv_signal_t> for SignalWatcher {
@@ -75,7 +74,6 @@ fn drop(&mut self) {
 
 #[cfg(test)]
 mod test {
-    use super::*;
     use super::super::local_loop;
     use std::io::signal;
 
index 0304b89dd6fdeae36e4206dec97a614452ebae25..73173fc677e8f786c782afc3388e0fd7b6132d76 100644 (file)
 use std::cast;
 use std::libc::{c_int, size_t, ssize_t};
 use std::ptr;
-use std::rt::BlockedTask;
-use std::rt::local::Local;
-use std::rt::sched::Scheduler;
+use std::rt::task::BlockedTask;
 
 use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
-            ForbidUnwind};
+            ForbidUnwind, wakeup};
 use uvll;
 
 // This is a helper structure which is intended to get embedded into other
@@ -164,8 +162,7 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
     unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
     rcx.result = nread;
 
-    let scheduler: ~Scheduler = Local::take();
-    scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap());
+    wakeup(&mut rcx.task);
 }
 
 // Unlike reading, the WriteContext is stored in the uv_write_t request. Like
@@ -180,6 +177,5 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
     wcx.result = status;
     req.defuse();
 
-    let sched: ~Scheduler = Local::take();
-    sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
+    wakeup(&mut wcx.task);
 }
index ab143d6e8b077ada20042d4497e0cc8381f167e3..1e70c5c55e06c5b01c29c97078e1737d7ec4e0f1 100644 (file)
@@ -9,19 +9,19 @@
 // except according to those terms.
 
 use std::libc::c_int;
-use std::rt::BlockedTask;
 use std::rt::local::Local;
 use std::rt::rtio::RtioTimer;
-use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::task::{BlockedTask, Task};
 use std::util;
 
+use homing::{HomeHandle, HomingIO};
+use super::{UvHandle, ForbidUnwind, ForbidSwitch};
+use uvio::UvIoFactory;
 use uvll;
-use super::{Loop, UvHandle, ForbidUnwind, ForbidSwitch};
-use uvio::HomingIO;
 
 pub struct TimerWatcher {
     handle: *uvll::uv_timer_t,
-    home: SchedHandle,
+    home: HomeHandle,
     action: Option<NextAction>,
     id: uint, // see comments in timer_cb
 }
@@ -33,15 +33,15 @@ pub enum NextAction {
 }
 
 impl TimerWatcher {
-    pub fn new(loop_: &mut Loop) -> ~TimerWatcher {
+    pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
         let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
         assert_eq!(unsafe {
-            uvll::uv_timer_init(loop_.handle, handle)
+            uvll::uv_timer_init(io.uv_loop(), handle)
         }, 0);
         let me = ~TimerWatcher {
             handle: handle,
             action: None,
-            home: get_handle_to_current_scheduler!(),
+            home: io.make_handle(),
             id: 0,
         };
         return me.install();
@@ -59,7 +59,7 @@ fn stop(&mut self) {
 }
 
 impl HomingIO for TimerWatcher {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
 
 impl UvHandle<uvll::uv_timer_t> for TimerWatcher {
@@ -89,10 +89,11 @@ fn sleep(&mut self, msecs: u64) {
         // started, then we need to call stop on the timer.
         let _f = ForbidUnwind::new("timer");
 
-        let sched: ~Scheduler = Local::take();
-        sched.deschedule_running_task_and_then(|_sched, task| {
+        let task: ~Task = Local::take();
+        task.deschedule(1, |task| {
             self.action = Some(WakeTask(task));
             self.start(msecs, 0);
+            Ok(())
         });
         self.stop();
     }
@@ -137,8 +138,7 @@ fn period(&mut self, msecs: u64) -> Port<()> {
 
     match timer.action.take_unwrap() {
         WakeTask(task) => {
-            let sched: ~Scheduler = Local::take();
-            sched.resume_blocked_task_immediately(task);
+            task.wake().map(|t| t.reawaken(true));
         }
         SendOnce(chan) => { chan.try_send_deferred(()); }
         SendMany(chan, id) => {
@@ -177,7 +177,6 @@ fn drop(&mut self) {
 
 #[cfg(test)]
 mod test {
-    use super::*;
     use std::rt::rtio::RtioTimer;
     use super::super::local_loop;
 
index fcad6296579960b5b3bd1bc6bf72ceb88b4f39c4..0e76ed9feb93dfe78985e3cda7372a6063ae5e66 100644 (file)
 
 use std::libc;
 use std::io::IoError;
-use std::rt::local::Local;
 use std::rt::rtio::RtioTTY;
-use std::rt::sched::{Scheduler, SchedHandle};
 
+use homing::{HomingIO, HomeHandle};
 use stream::StreamWatcher;
-use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
-use uvio::HomingIO;
+use super::{UvError, UvHandle, uv_error_to_io_error};
+use uvio::UvIoFactory;
 use uvll;
 
 pub struct TtyWatcher{
     tty: *uvll::uv_tty_t,
     stream: StreamWatcher,
-    home: SchedHandle,
+    home: HomeHandle,
     fd: libc::c_int,
 }
 
 impl TtyWatcher {
-    pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool)
+    pub fn new(io: &mut UvIoFactory, fd: libc::c_int, readable: bool)
         -> Result<TtyWatcher, UvError>
     {
         // libuv may succeed in giving us a handle (via uv_tty_init), but if the
@@ -56,14 +55,14 @@ pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool)
         // with attempting to open it as a tty.
         let handle = UvHandle::alloc(None::<TtyWatcher>, uvll::UV_TTY);
         match unsafe {
-            uvll::uv_tty_init(loop_.handle, handle, fd as libc::c_int,
+            uvll::uv_tty_init(io.uv_loop(), handle, fd as libc::c_int,
                               readable as libc::c_int)
         } {
             0 => {
                 Ok(TtyWatcher {
                     tty: handle,
                     stream: StreamWatcher::new(handle),
-                    home: get_handle_to_current_scheduler!(),
+                    home: io.make_handle(),
                     fd: fd,
                 })
             }
@@ -120,7 +119,7 @@ fn uv_handle(&self) -> *uvll::uv_tty_t { self.tty }
 }
 
 impl HomingIO for TtyWatcher {
-    fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+    fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
 }
 
 impl Drop for TtyWatcher {
index c556b96671ab63c9dc24167b508c737c6db6bd2d..52e0b5ed77b261760f18e9d6c00e29a7ece2190f 100644 (file)
 // except according to those terms.
 
 use std::c_str::CString;
+use std::cast;
 use std::comm::SharedChan;
-use std::libc::c_int;
-use std::libc;
-use std::path::Path;
 use std::io::IoError;
 use std::io::net::ip::SocketAddr;
 use std::io::process::ProcessConfig;
-use std::io;
-use std::rt::local::Local;
-use std::rt::rtio::*;
-use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::task::Task;
-use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
-                S_IRUSR, S_IWUSR};
-use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
-                  ReadWrite, FileStat};
 use std::io::signal::Signum;
+use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
+              ReadWrite, FileStat};
+use std::io;
+use std::libc::c_int;
+use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
+                S_IWUSR};
+use std::libc;
+use std::path::Path;
+use std::rt::rtio;
+use std::rt::rtio::IoFactory;
 use ai = std::io::net::addrinfo;
 
 #[cfg(test)] use std::unstable::run_in_bare_thread;
 
-use super::*;
-use addrinfo::GetAddrInfoRequest;
-
-pub trait HomingIO {
-
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
-
-    /// This function will move tasks to run on their home I/O scheduler. Note
-    /// that this function does *not* pin the task to the I/O scheduler, but
-    /// rather it simply moves it to running on the I/O scheduler.
-    fn go_to_IO_home(&mut self) -> uint {
-        use std::rt::sched::RunOnce;
-
-        let _f = ForbidUnwind::new("going home");
-
-        let current_sched_id = {
-            let mut sched = Local::borrow(None::<Scheduler>);
-            sched.get().sched_id()
-        };
-
-        // Only need to invoke a context switch if we're not on the right
-        // scheduler.
-        if current_sched_id != self.home().sched_id {
-            let scheduler: ~Scheduler = Local::take();
-            scheduler.deschedule_running_task_and_then(|_, task| {
-                task.wake().map(|task| {
-                    self.home().send(RunOnce(task));
-                });
-            })
-        }
-        let current_sched_id = {
-            let mut sched = Local::borrow(None::<Scheduler>);
-            sched.get().sched_id()
-        };
-        assert!(current_sched_id == self.home().sched_id);
-
-        self.home().sched_id
-    }
+use super::{uv_error_to_io_error, Loop};
 
-    /// Fires a single homing missile, returning another missile targeted back
-    /// at the original home of this task. In other words, this function will
-    /// move the local task to its I/O scheduler and then return an RAII wrapper
-    /// which will return the task home.
-    fn fire_homing_missile(&mut self) -> HomingMissile {
-        HomingMissile { io_home: self.go_to_IO_home() }
-    }
-
-    /// Same as `fire_homing_missile`, but returns the local I/O scheduler as
-    /// well (the one that was homed to).
-    fn fire_homing_missile_sched(&mut self) -> (HomingMissile, ~Scheduler) {
-        // First, transplant ourselves to the home I/O scheduler
-        let missile = self.fire_homing_missile();
-        // Next (must happen next), grab the local I/O scheduler
-        let io_sched: ~Scheduler = Local::take();
-
-        (missile, io_sched)
-    }
-}
-
-/// After a homing operation has been completed, this will return the current
-/// task back to its appropriate home (if applicable). The field is used to
-/// assert that we are where we think we are.
-struct HomingMissile {
-    priv io_home: uint,
-}
-
-impl HomingMissile {
-    pub fn check(&self, msg: &'static str) {
-        let mut sched = Local::borrow(None::<Scheduler>);
-        let local_id = sched.get().sched_id();
-        assert!(local_id == self.io_home, "{}", msg);
-    }
-}
-
-impl Drop for HomingMissile {
-    fn drop(&mut self) {
-        let _f = ForbidUnwind::new("leaving home");
-
-        // It would truly be a sad day if we had moved off the home I/O
-        // scheduler while we were doing I/O.
-        self.check("task moved away from the home scheduler");
-
-        // If we were a homed task, then we must send ourselves back to the
-        // original scheduler. Otherwise, we can just return and keep running
-        if !Task::on_appropriate_sched() {
-            let scheduler: ~Scheduler = Local::take();
-            scheduler.deschedule_running_task_and_then(|_, task| {
-                task.wake().map(|task| {
-                    Scheduler::run_task(task);
-                });
-            })
-        }
-    }
-}
+use addrinfo::GetAddrInfoRequest;
+use async::AsyncWatcher;
+use file::{FsRequest, FileWatcher};
+use queue::QueuePool;
+use homing::HomeHandle;
+use idle::IdleWatcher;
+use net::{TcpWatcher, TcpListener, UdpWatcher};
+use pipe::{PipeWatcher, PipeListener};
+use process::Process;
+use signal::SignalWatcher;
+use timer::TimerWatcher;
+use tty::TtyWatcher;
+use uvll;
 
 // Obviously an Event Loop is always home.
 pub struct UvEventLoop {
@@ -132,45 +52,52 @@ pub struct UvEventLoop {
 
 impl UvEventLoop {
     pub fn new() -> UvEventLoop {
+        let mut loop_ = Loop::new();
+        let handle_pool = QueuePool::new(&mut loop_);
         UvEventLoop {
-            uvio: UvIoFactory(Loop::new())
+            uvio: UvIoFactory {
+                loop_: loop_,
+                handle_pool: handle_pool,
+            }
         }
     }
 }
 
 impl Drop for UvEventLoop {
     fn drop(&mut self) {
-        self.uvio.uv_loop().close();
+        self.uvio.loop_.close();
     }
 }
 
-impl EventLoop for UvEventLoop {
+impl rtio::EventLoop for UvEventLoop {
     fn run(&mut self) {
-        self.uvio.uv_loop().run();
+        self.uvio.loop_.run();
     }
 
     fn callback(&mut self, f: proc()) {
-        IdleWatcher::onetime(self.uvio.uv_loop(), f);
+        IdleWatcher::onetime(&mut self.uvio.loop_, f);
     }
 
-    fn pausable_idle_callback(&mut self, cb: ~Callback) -> ~PausableIdleCallback {
-        IdleWatcher::new(self.uvio.uv_loop(), cb) as ~PausableIdleCallback
+    fn pausible_idle_callback(&mut self, cb: ~rtio::Callback)
+        -> ~rtio::PausibleIdleCallback
+    {
+        IdleWatcher::new(&mut self.uvio.loop_, cb) as ~rtio::PausibleIdleCallback
     }
 
-    fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback {
-        ~AsyncWatcher::new(self.uvio.uv_loop(), f) as ~RemoteCallback
+    fn remote_callback(&mut self, f: ~rtio::Callback) -> ~rtio::RemoteCallback {
+        ~AsyncWatcher::new(&mut self.uvio.loop_, f) as ~rtio::RemoteCallback
     }
 
-    fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> {
-        let factory = &mut self.uvio as &mut IoFactory;
+    fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
+        let factory = &mut self.uvio as &mut rtio::IoFactory;
         Some(factory)
     }
 }
 
 #[cfg(not(test))]
 #[lang = "event_loop_factory"]
-pub extern "C" fn new_loop() -> ~EventLoop {
-    ~UvEventLoop::new() as ~EventLoop
+pub extern "C" fn new_loop() -> ~rtio::EventLoop {
+    ~UvEventLoop::new() as ~rtio::EventLoop
 }
 
 #[test]
@@ -187,59 +114,65 @@ fn test_callback_run_once() {
     }
 }
 
-pub struct UvIoFactory(Loop);
+pub struct UvIoFactory {
+    loop_: Loop,
+    priv handle_pool: ~QueuePool,
+}
 
 impl UvIoFactory {
-    pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
-        match self { &UvIoFactory(ref mut ptr) => ptr }
+    pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
+
+    pub fn make_handle(&mut self) -> HomeHandle {
+        HomeHandle::new(self.id(), &mut *self.handle_pool)
     }
 }
 
 impl IoFactory for UvIoFactory {
+    fn id(&self) -> uint { unsafe { cast::transmute(self) } }
+
     // Connect to an address and return a new stream
     // NB: This blocks the task waiting on the connection.
     // It would probably be better to return a future
     fn tcp_connect(&mut self, addr: SocketAddr)
-        -> Result<~RtioTcpStream, IoError>
+        -> Result<~rtio::RtioTcpStream, IoError>
     {
-        match TcpWatcher::connect(self.uv_loop(), addr) {
-            Ok(t) => Ok(~t as ~RtioTcpStream),
+        match TcpWatcher::connect(self, addr) {
+            Ok(t) => Ok(~t as ~rtio::RtioTcpStream),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
-    fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> {
-        match TcpListener::bind(self.uv_loop(), addr) {
-            Ok(t) => Ok(t as ~RtioTcpListener),
+    fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioTcpListener, IoError> {
+        match TcpListener::bind(self, addr) {
+            Ok(t) => Ok(t as ~rtio::RtioTcpListener),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
-    fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
-        match UdpWatcher::bind(self.uv_loop(), addr) {
-            Ok(u) => Ok(~u as ~RtioUdpSocket),
+    fn udp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioUdpSocket, IoError> {
+        match UdpWatcher::bind(self, addr) {
+            Ok(u) => Ok(~u as ~rtio::RtioUdpSocket),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
-    fn timer_init(&mut self) -> Result<~RtioTimer, IoError> {
-        Ok(TimerWatcher::new(self.uv_loop()) as ~RtioTimer)
+    fn timer_init(&mut self) -> Result<~rtio::RtioTimer, IoError> {
+        Ok(TimerWatcher::new(self) as ~rtio::RtioTimer)
     }
 
     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
                           hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError> {
-        let r = GetAddrInfoRequest::run(self.uv_loop(), host, servname, hint);
+        let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
         r.map_err(uv_error_to_io_error)
     }
 
     fn fs_from_raw_fd(&mut self, fd: c_int,
-                      close: CloseBehavior) -> ~RtioFileStream {
-        let loop_ = Loop::wrap(self.uv_loop().handle);
-        ~FileWatcher::new(loop_, fd, close) as ~RtioFileStream
+                      close: rtio::CloseBehavior) -> ~rtio::RtioFileStream {
+        ~FileWatcher::new(self, fd, close) as ~rtio::RtioFileStream
     }
 
     fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
-        -> Result<~RtioFileStream, IoError> {
+        -> Result<~rtio::RtioFileStream, IoError> {
         let flags = match fm {
             io::Open => 0,
             io::Append => libc::O_APPEND,
@@ -254,117 +187,117 @@ fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
                               libc::S_IRUSR | libc::S_IWUSR),
         };
 
-        match FsRequest::open(self.uv_loop(), path, flags as int, mode as int) {
-            Ok(fs) => Ok(~fs as ~RtioFileStream),
+        match FsRequest::open(self, path, flags as int, mode as int) {
+            Ok(fs) => Ok(~fs as ~rtio::RtioFileStream),
             Err(e) => Err(uv_error_to_io_error(e))
         }
     }
 
     fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> {
-        let r = FsRequest::unlink(self.uv_loop(), path);
+        let r = FsRequest::unlink(&self.loop_, path);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> {
-        let r = FsRequest::lstat(self.uv_loop(), path);
+        let r = FsRequest::lstat(&self.loop_, path);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> {
-        let r = FsRequest::stat(self.uv_loop(), path);
+        let r = FsRequest::stat(&self.loop_, path);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_mkdir(&mut self, path: &CString,
                 perm: io::FilePermission) -> Result<(), IoError> {
-        let r = FsRequest::mkdir(self.uv_loop(), path, perm as c_int);
+        let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> {
-        let r = FsRequest::rmdir(self.uv_loop(), path);
+        let r = FsRequest::rmdir(&self.loop_, path);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> {
-        let r = FsRequest::rename(self.uv_loop(), path, to);
+        let r = FsRequest::rename(&self.loop_, path, to);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_chmod(&mut self, path: &CString,
                 perm: io::FilePermission) -> Result<(), IoError> {
-        let r = FsRequest::chmod(self.uv_loop(), path, perm as c_int);
+        let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_readdir(&mut self, path: &CString, flags: c_int)
         -> Result<~[Path], IoError>
     {
-        let r = FsRequest::readdir(self.uv_loop(), path, flags);
+        let r = FsRequest::readdir(&self.loop_, path, flags);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
-        let r = FsRequest::link(self.uv_loop(), src, dst);
+        let r = FsRequest::link(&self.loop_, src, dst);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
-        let r = FsRequest::symlink(self.uv_loop(), src, dst);
+        let r = FsRequest::symlink(&self.loop_, src, dst);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> {
-        let r = FsRequest::chown(self.uv_loop(), path, uid, gid);
+        let r = FsRequest::chown(&self.loop_, path, uid, gid);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_readlink(&mut self, path: &CString) -> Result<Path, IoError> {
-        let r = FsRequest::readlink(self.uv_loop(), path);
+        let r = FsRequest::readlink(&self.loop_, path);
         r.map_err(uv_error_to_io_error)
     }
     fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
         -> Result<(), IoError>
     {
-        let r = FsRequest::utime(self.uv_loop(), path, atime, mtime);
+        let r = FsRequest::utime(&self.loop_, path, atime, mtime);
         r.map_err(uv_error_to_io_error)
     }
 
     fn spawn(&mut self, config: ProcessConfig)
-            -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>
+            -> Result<(~rtio::RtioProcess, ~[Option<~rtio::RtioPipe>]), IoError>
     {
-        match Process::spawn(self.uv_loop(), config) {
+        match Process::spawn(self, config) {
             Ok((p, io)) => {
-                Ok((p as ~RtioProcess,
-                    io.move_iter().map(|i| i.map(|p| ~p as ~RtioPipe)).collect()))
+                Ok((p as ~rtio::RtioProcess,
+                    io.move_iter().map(|i| i.map(|p| ~p as ~rtio::RtioPipe)).collect()))
             }
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
-    fn unix_bind(&mut self, path: &CString) -> Result<~RtioUnixListener, IoError>
+    fn unix_bind(&mut self, path: &CString) -> Result<~rtio::RtioUnixListener, IoError>
     {
-        match PipeListener::bind(self.uv_loop(), path) {
-            Ok(p) => Ok(p as ~RtioUnixListener),
+        match PipeListener::bind(self, path) {
+            Ok(p) => Ok(p as ~rtio::RtioUnixListener),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
-    fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
-        match PipeWatcher::connect(self.uv_loop(), path) {
-            Ok(p) => Ok(~p as ~RtioPipe),
+    fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe, IoError> {
+        match PipeWatcher::connect(self, path) {
+            Ok(p) => Ok(~p as ~rtio::RtioPipe),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
     fn tty_open(&mut self, fd: c_int, readable: bool)
-            -> Result<~RtioTTY, IoError> {
-        match TtyWatcher::new(self.uv_loop(), fd, readable) {
-            Ok(tty) => Ok(~tty as ~RtioTTY),
+            -> Result<~rtio::RtioTTY, IoError> {
+        match TtyWatcher::new(self, fd, readable) {
+            Ok(tty) => Ok(~tty as ~rtio::RtioTTY),
             Err(e) => Err(uv_error_to_io_error(e))
         }
     }
 
-    fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
-        match PipeWatcher::open(self.uv_loop(), fd) {
-            Ok(s) => Ok(~s as ~RtioPipe),
+    fn pipe_open(&mut self, fd: c_int) -> Result<~rtio::RtioPipe, IoError> {
+        match PipeWatcher::open(self, fd) {
+            Ok(s) => Ok(~s as ~rtio::RtioPipe),
             Err(e) => Err(uv_error_to_io_error(e))
         }
     }
 
     fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
-        -> Result<~RtioSignal, IoError> {
-        match SignalWatcher::new(self.uv_loop(), signum, channel) {
-            Ok(s) => Ok(s as ~RtioSignal),
+        -> Result<~rtio::RtioSignal, IoError> {
+        match SignalWatcher::new(self, signum, channel) {
+            Ok(s) => Ok(s as ~rtio::RtioSignal),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
index dea90a40fa9fc37497b87a77419a19555746842e..fa0bb85faed1bb7b36d27ec2be3774efdc6304a6 100644 (file)
@@ -37,7 +37,8 @@
 #[cfg(test)]
 use std::libc::uintptr_t;
 
-pub use self::errors::*;
+pub use self::errors::{EACCES, ECONNREFUSED, ECONNRESET, EPIPE, ECONNABORTED,
+                       ECANCELED, EBADF, ENOTCONN};
 
 pub static OK: c_int = 0;
 pub static EOF: c_int = -4095;
@@ -576,6 +577,8 @@ fn rust_set_stdio_container_stream(c: *uv_stdio_container_t,
 
     // generic uv functions
     pub fn uv_loop_delete(l: *uv_loop_t);
+    pub fn uv_ref(t: *uv_handle_t);
+    pub fn uv_unref(t: *uv_handle_t);
     pub fn uv_handle_size(ty: uv_handle_type) -> size_t;
     pub fn uv_req_size(ty: uv_req_type) -> size_t;
     pub fn uv_run(l: *uv_loop_t, mode: uv_run_mode) -> c_int;