use ai = std::io::net::addrinfo;
use std::libc::c_int;
use std::ptr::null;
-use std::rt::BlockedTask;
-use std::rt::local::Local;
-use std::rt::sched::Scheduler;
+use std::rt::task::BlockedTask;
use net;
-use super::{Loop, UvError, Request, wait_until_woken_after};
+use super::{Loop, UvError, Request, wait_until_woken_after, wakeup};
use uvll;
struct Addrinfo {
cx.status = status;
cx.addrinfo = Some(Addrinfo { handle: res });
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(cx.slot.take_unwrap());
+ wakeup(&mut cx.slot);
}
}
}
#[cfg(test, not(target_os="android"))]
mod test {
use std::io::net::ip::{SocketAddr, Ipv4Addr};
- use super::*;
use super::super::local_loop;
#[test]
use std::rt::thread::Thread;
use std::rt::tube::Tube;
- use super::*;
use super::super::local_loop;
// Make sure that we can fire watchers in remote threads and that they
use std::cast;
use std::libc::{c_int, c_char, c_void, size_t};
use std::libc;
-use std::rt::BlockedTask;
+use std::rt::task::BlockedTask;
use std::io::{FileStat, IoError};
use std::io;
-use std::rt::local::Local;
use std::rt::rtio;
-use std::rt::sched::{Scheduler, SchedHandle};
+use std::vec;
-use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
-use uvio::HomingIO;
+use homing::{HomingIO, HomeHandle};
+use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
use uvll;
pub struct FsRequest {
priv loop_: Loop,
priv fd: c_int,
priv close: rtio::CloseBehavior,
- priv home: SchedHandle,
+ priv home: HomeHandle,
}
impl FsRequest {
- pub fn open(loop_: &Loop, path: &CString, flags: int, mode: int)
+ pub fn open(io: &mut UvIoFactory, path: &CString, flags: int, mode: int)
-> Result<FileWatcher, UvError>
{
execute(|req, cb| unsafe {
- uvll::uv_fs_open(loop_.handle,
+ uvll::uv_fs_open(io.uv_loop(),
req, path.with_ref(|p| p), flags as c_int,
mode as c_int, cb)
}).map(|req|
- FileWatcher::new(*loop_, req.get_result() as c_int,
+ FileWatcher::new(io, req.get_result() as c_int,
rtio::CloseSynchronously)
)
}
let slot: &mut Option<BlockedTask> = unsafe {
cast::transmute(uvll::get_data_for_req(req))
};
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(slot.take_unwrap());
+ wakeup(slot);
}
}
}
impl HomingIO for FileWatcher {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl FileWatcher {
- pub fn new(loop_: Loop, fd: c_int, close: rtio::CloseBehavior) -> FileWatcher {
+ pub fn new(io: &mut UvIoFactory, fd: c_int,
+ close: rtio::CloseBehavior) -> FileWatcher {
FileWatcher {
- loop_: loop_,
+ loop_: Loop::wrap(io.uv_loop()),
fd: fd,
close: close,
- home: get_handle_to_current_scheduler!()
+ home: io.make_handle(),
}
}
use std::io;
use std::str;
use std::vec;
- use super::*;
use l = super::super::local_loop;
#[test]
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Homing I/O implementation
+//!
+//! In libuv, whenever a handle is created on an I/O loop it is illegal to use
+//! that handle outside of that I/O loop. We use libuv I/O with our green
+//! scheduler, and each green scheduler corresponds to a different I/O loop on a
+//! different OS thread. Green tasks are also free to roam among schedulers,
+//! which implies that it is possible to create an I/O handle on one event loop
+//! and then attempt to use it on another.
+//!
+//! In order to solve this problem, this module implements the notion of a
+//! "homing operation" which will transplant a task from its currently running
+//! scheduler back onto the original I/O loop. This is accomplished entirely at
+//! the librustuv layer with very little cooperation from the scheduler (which
+//! we don't even know exists technically).
+//!
+//! These homing operations are completed by first realizing that we're on the
+//! wrong I/O loop, then descheduling ourselves, sending ourselves to the
+//! correct I/O loop, and then waking up the I/O loop in order to process its
+//! local queue of tasks which need to run.
+//!
+//! This enqueueing is done with a concurrent queue from libstd, and the
+//! signalling is achieved with an async handle.
+
+use std::rt::local::Local;
+use std::rt::rtio::LocalIo;
+use std::rt::task::{Task, BlockedTask};
+
+use ForbidUnwind;
+use queue::{Queue, QueuePool};
+
+/// A handle to a remote libuv event loop. This handle will keep the event loop
+/// alive while active in order to ensure that a homing operation can always be
+/// completed.
+///
+/// Handles are clone-able in order to derive new handles from existing handles
+/// (very useful for when accepting a socket from a server).
+pub struct HomeHandle {
+    // Producer endpoint of the home event loop's task queue; pushing a task
+    // onto it also signals the remote loop's async handle (see queue.rs).
+    priv queue: Queue,
+    // Identifier of the home event loop, compared against `LocalIo::id()` in
+    // `go_to_IO_home` to detect whether the current task is already home.
+    priv id: uint,
+}
+
+impl HomeHandle {
+    /// Creates a handle targeting the event loop identified by `id`, drawing
+    /// a fresh queue producer from that loop's `QueuePool`.
+    pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
+        HomeHandle { queue: pool.queue(), id: id }
+    }
+
+    /// Enqueues `task` on the home event loop (and wakes that loop so it can
+    /// resume the task — see `Queue::push`).
+    fn send(&mut self, task: BlockedTask) {
+        self.queue.push(task);
+    }
+}
+
+impl Clone for HomeHandle {
+    fn clone(&self) -> HomeHandle {
+        HomeHandle {
+            // Cloning the queue bumps the keep-alive refcount on the remote
+            // event loop's async handle (see `impl Clone for Queue`).
+            queue: self.queue.clone(),
+            id: self.id,
+        }
+    }
+}
+
+pub trait HomingIO {
+    fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
+
+    /// This function will move tasks to run on their home I/O scheduler. Note
+    /// that this function does *not* pin the task to the I/O scheduler, but
+    /// rather it simply moves it to running on the I/O scheduler.
+    ///
+    /// Returns the id of the event loop the task is now running on; both
+    /// branches below guarantee this equals the home loop's id.
+    fn go_to_IO_home(&mut self) -> uint {
+        let _f = ForbidUnwind::new("going home");
+
+        // Pull the running task out of TLS so we can deschedule it, and read
+        // the id of the event loop we are currently on.
+        let mut cur_task: ~Task = Local::take();
+        let cur_loop_id = {
+            let mut io = cur_task.local_io().expect("libuv must have I/O");
+            io.get().id()
+        };
+
+        // Try at all costs to avoid the homing operation because it is quite
+        // expensive. Hence, we only deschedule/send if we're not on the correct
+        // event loop. If we're already on the home event loop, then we're good
+        // to go (remember we have no preemption, so we're guaranteed to stay on
+        // this event loop as long as we avoid the scheduler).
+        if cur_loop_id != self.home().id {
+            // Park this task and ship it to the home loop's queue; we resume
+            // when that loop drains its queue and reawakens us.
+            cur_task.deschedule(1, |task| {
+                self.home().send(task);
+                Ok(())
+            });
+
+            // Once we wake up, assert that we're in the right location
+            let cur_loop_id = {
+                let mut io = LocalIo::borrow().expect("libuv must have I/O");
+                io.get().id()
+            };
+            assert_eq!(cur_loop_id, self.home().id);
+
+            cur_loop_id
+        } else {
+            // Already home: put the task back into TLS untouched.
+            Local::put(cur_task);
+            cur_loop_id
+        }
+    }
+
+    /// Fires a single homing missile, returning another missile targeted back
+    /// at the original home of this task. In other words, this function will
+    /// move the local task to its I/O scheduler and then return an RAII wrapper
+    /// which will return the task home.
+    fn fire_homing_missile(&mut self) -> HomingMissile {
+        HomingMissile { io_home: self.go_to_IO_home() }
+    }
+}
+
+/// After a homing operation has been completed, this will return the current
+/// task back to its appropriate home (if applicable). The field is used to
+/// assert that we are where we think we are.
+struct HomingMissile {
+    // Event loop id recorded when the missile was fired; re-checked on drop
+    // to catch any migration that happened mid-operation.
+    priv io_home: uint,
+}
+
+impl HomingMissile {
+    /// Check at runtime that the task has *not* transplanted itself to a
+    /// different I/O loop while executing.
+    pub fn check(&self, msg: &'static str) {
+        let mut io = LocalIo::borrow().expect("libuv must have I/O");
+        assert!(io.get().id() == self.io_home, "{}", msg);
+    }
+}
+
+impl Drop for HomingMissile {
+    fn drop(&mut self) {
+        let _f = ForbidUnwind::new("leaving home");
+
+        // It would truly be a sad day if we had moved off the home I/O
+        // scheduler while we were doing I/O.
+        self.check("task moved away from the home scheduler");
+    }
+}
#[cfg(test)]
mod test {
- use super::*;
use std::rt::tube::Tube;
use std::rt::rtio::{Callback, PausableIdleCallback};
use super::super::local_loop;
#[crate_type = "rlib"];
#[crate_type = "dylib"];
-#[feature(macro_rules, globs)];
+#[feature(macro_rules)];
-use std::cast::transmute;
use std::cast;
+use std::io;
+use std::io::IoError;
use std::libc::{c_int, malloc};
use std::ptr::null;
use std::ptr;
-use std::rt::BlockedTask;
use std::rt::local::Local;
-use std::rt::sched::Scheduler;
+use std::rt::task::{BlockedTask, Task};
+use std::rt::rtio::LocalIo;
use std::str::raw::from_c_str;
use std::str;
use std::task;
use std::unstable::finally::Finally;
-use std::io::IoError;
-
pub use self::async::AsyncWatcher;
pub use self::file::{FsRequest, FileWatcher};
pub use self::idle::IdleWatcher;
mod macros;
+mod queue;
+mod homing;
+
/// The implementation of `rtio` for libuv
pub mod uvio;
uvll::free_handle(handle);
if data == ptr::null() { return }
let slot: &mut Option<BlockedTask> = cast::transmute(data);
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(slot.take_unwrap());
+ wakeup(slot);
}
}
}
}
pub struct ForbidSwitch {
- msg: &'static str,
- sched: uint,
+ priv msg: &'static str,
+ priv io: uint,
}
impl ForbidSwitch {
fn new(s: &'static str) -> ForbidSwitch {
- let mut sched = Local::borrow(None::<Scheduler>);
+ let mut io = LocalIo::borrow().expect("libuv must have local I/O");
ForbidSwitch {
msg: s,
- sched: sched.get().sched_id(),
+ io: io.get().id(),
}
}
}
impl Drop for ForbidSwitch {
fn drop(&mut self) {
- let mut sched = Local::borrow(None::<Scheduler>);
- assert!(self.sched == sched.get().sched_id(),
+ let mut io = LocalIo::borrow().expect("libuv must have local I/O");
+ assert!(self.io == io.get().id(),
"didnt want a scheduler switch: {}",
self.msg);
}
let _f = ForbidUnwind::new("wait_until_woken_after");
unsafe {
assert!((*slot).is_none());
- let sched: ~Scheduler = Local::take();
- sched.deschedule_running_task_and_then(|_, task| {
- f();
+ let task: ~Task = Local::take();
+ task.deschedule(1, |task| {
*slot = Some(task);
- })
+ f();
+ Ok(())
+ });
}
}
+/// Takes the parked task out of `slot` and reschedules it.
+///
+/// Fails (via the assert) if no task was stored in `slot`; callers must only
+/// invoke this after a corresponding `wait_until_woken_after` has parked a
+/// task there.
+fn wakeup(slot: &mut Option<BlockedTask>) {
+    assert!(slot.is_some());
+    // `wake()` yields None when the blocked task has already been killed, in
+    // which case there is nothing to resume. NOTE(review): the `true` flag
+    // passed to `reawaken` presumably requests immediate rescheduling —
+    // confirm against the runtime's `reawaken` contract.
+    slot.take_unwrap().wake().map(|t| t.reawaken(true));
+}
+
pub struct Request {
handle: *uvll::uv_req_t,
priv defused: bool,
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
unsafe {
// Importing error constants
- use uvll::*;
- use std::io::*;
// uv error descriptions are static
let c_desc = uvll::uv_strerror(*uverr);
let desc = str::raw::c_str_to_static_slice(c_desc);
let kind = match *uverr {
- UNKNOWN => OtherIoError,
- OK => OtherIoError,
- EOF => EndOfFile,
- EACCES => PermissionDenied,
- ECONNREFUSED => ConnectionRefused,
- ECONNRESET => ConnectionReset,
- ENOENT => FileNotFound,
- ENOTCONN => NotConnected,
- EPIPE => BrokenPipe,
- ECONNABORTED => ConnectionAborted,
+ uvll::UNKNOWN => io::OtherIoError,
+ uvll::OK => io::OtherIoError,
+ uvll::EOF => io::EndOfFile,
+ uvll::EACCES => io::PermissionDenied,
+ uvll::ECONNREFUSED => io::ConnectionRefused,
+ uvll::ECONNRESET => io::ConnectionReset,
+ uvll::ENOTCONN => io::NotConnected,
+ uvll::ENOENT => io::FileNotFound,
+ uvll::EPIPE => io::BrokenPipe,
+ uvll::ECONNABORTED => io::ConnectionAborted,
err => {
uvdebug!("uverr.code {}", err as int);
// XXX: Need to map remaining uv error types
- OtherIoError
+ io::OtherIoError
}
};
})
)
-// get a handle for the current scheduler
-macro_rules! get_handle_to_current_scheduler(
- () => ({
- let mut sched = Local::borrow(None::<Scheduler>);
- sched.get().make_handle()
- })
-)
-
pub fn dumb_println(args: &fmt::Arguments) {
- use std::io::native::file::FileDesc;
use std::io;
use std::libc;
- let mut out = FileDesc::new(libc::STDERR_FILENO, false);
- fmt::writeln(&mut out as &mut io::Writer, args);
+ use std::vec;
+
+ struct Stderr;
+ impl io::Writer for Stderr {
+ fn write(&mut self, data: &[u8]) {
+ unsafe {
+ libc::write(libc::STDERR_FILENO,
+ vec::raw::to_ptr(data) as *libc::c_void,
+ data.len() as libc::size_t);
+ }
+ }
+ }
+ let mut w = Stderr;
+ fmt::writeln(&mut w as &mut io::Writer, args);
}
// except according to those terms.
use std::cast;
-use std::libc;
-use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
-use std::ptr;
-use std::rt::BlockedTask;
use std::io::IoError;
use std::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr};
-use std::rt::local::Local;
+use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
+use std::libc;
+use std::ptr;
use std::rt::rtio;
-use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::tube::Tube;
+use std::rt::task::BlockedTask;
use std::str;
use std::vec;
+use homing::{HomingIO, HomeHandle};
use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
- wait_until_woken_after};
-use uvio::HomingIO;
+ wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
use uvll;
use uvll::sockaddr;
pub struct TcpWatcher {
handle: *uvll::uv_tcp_t,
stream: StreamWatcher,
- home: SchedHandle,
+ home: HomeHandle,
}
pub struct TcpListener {
- home: SchedHandle,
+ home: HomeHandle,
handle: *uvll::uv_pipe_t,
priv closing_task: Option<BlockedTask>,
- priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
+ priv outgoing: Chan<Result<~rtio::RtioTcpStream, IoError>>,
+ priv incoming: Port<Result<~rtio::RtioTcpStream, IoError>>,
}
pub struct TcpAcceptor {
listener: ~TcpListener,
- priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
}
// TCP watchers (clients/streams)
impl TcpWatcher {
- pub fn new(loop_: &Loop) -> TcpWatcher {
+ pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
+ let handle = io.make_handle();
+ TcpWatcher::new_home(&io.loop_, handle)
+ }
+
+ fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
uvll::uv_tcp_init(loop_.handle, handle)
}, 0);
TcpWatcher {
- home: get_handle_to_current_scheduler!(),
+ home: home,
handle: handle,
stream: StreamWatcher::new(handle),
}
}
- pub fn connect(loop_: &mut Loop, address: SocketAddr)
+ pub fn connect(io: &mut UvIoFactory, address: SocketAddr)
-> Result<TcpWatcher, UvError>
{
struct Ctx { status: c_int, task: Option<BlockedTask> }
- let tcp = TcpWatcher::new(loop_);
+ let tcp = TcpWatcher::new(io);
let ret = socket_addr_as_sockaddr(address, |addr| {
let mut req = Request::new(uvll::UV_CONNECT);
let result = unsafe {
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
+ wakeup(&mut cx.task);
}
}
}
impl HomingIO for TcpWatcher {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl rtio::RtioSocket for TcpWatcher {
// TCP listeners (unbound servers)
impl TcpListener {
- pub fn bind(loop_: &mut Loop, address: SocketAddr)
+ pub fn bind(io: &mut UvIoFactory, address: SocketAddr)
-> Result<~TcpListener, UvError> {
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
- uvll::uv_tcp_init(loop_.handle, handle)
+ uvll::uv_tcp_init(io.uv_loop(), handle)
}, 0);
+ let (port, chan) = Chan::new();
let l = ~TcpListener {
- home: get_handle_to_current_scheduler!(),
+ home: io.make_handle(),
handle: handle,
closing_task: None,
- outgoing: Tube::new(),
+ outgoing: chan,
+ incoming: port,
};
let res = socket_addr_as_sockaddr(address, |addr| unsafe {
uvll::uv_tcp_bind(l.handle, addr)
}
impl HomingIO for TcpListener {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_tcp_t> for TcpListener {
impl rtio::RtioTcpListener for TcpListener {
fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
// create the acceptor object from ourselves
- let incoming = self.outgoing.clone();
- let mut acceptor = ~TcpAcceptor {
- listener: self,
- incoming: incoming,
- };
+ let mut acceptor = ~TcpAcceptor { listener: self };
let _m = acceptor.fire_homing_missile();
// XXX: the 128 backlog should be configurable
extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
assert!(status != uvll::ECANCELED);
+ let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
let msg = match status {
0 => {
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(server)
});
- let client = TcpWatcher::new(&loop_);
+ let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
Ok(~client as ~rtio::RtioTcpStream)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
-
- let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
tcp.outgoing.send(msg);
}
// TCP acceptors (bound servers)
impl HomingIO for TcpAcceptor {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
}
impl rtio::RtioSocket for TcpAcceptor {
impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
- let _m = self.fire_homing_missile();
- self.incoming.recv()
+ self.listener.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
pub struct UdpWatcher {
handle: *uvll::uv_udp_t,
- home: SchedHandle,
+ home: HomeHandle,
}
impl UdpWatcher {
- pub fn bind(loop_: &Loop, address: SocketAddr)
+ pub fn bind(io: &mut UvIoFactory, address: SocketAddr)
-> Result<UdpWatcher, UvError> {
let udp = UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
- home: get_handle_to_current_scheduler!(),
+ home: io.make_handle(),
};
assert_eq!(unsafe {
- uvll::uv_udp_init(loop_.handle, udp.handle)
+ uvll::uv_udp_init(io.uv_loop(), udp.handle)
}, 0);
let result = socket_addr_as_sockaddr(address, |addr| unsafe {
uvll::uv_udp_bind(udp.handle, addr, 0u32)
}
impl HomingIO for UdpWatcher {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl rtio::RtioSocket for UdpWatcher {
Some(sockaddr_to_socket_addr(addr))
};
cx.result = Some((nread, addr));
-
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+ wakeup(&mut cx.task);
}
}
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.result = status;
-
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+ wakeup(&mut cx.task);
}
}
#[cfg(test)]
mod test {
- use std::rt::test::*;
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
RtioUdpSocket};
use std::task;
- use super::*;
use super::super::local_loop;
#[test]
#[test]
fn test_read_read_read() {
- use std::rt::rtio::*;
let addr = next_test_ip4();
static MAX: uint = 5000;
let (port, chan) = Chan::new();
// except according to those terms.
use std::c_str::CString;
-use std::libc;
-use std::rt::BlockedTask;
use std::io::IoError;
-use std::rt::local::Local;
+use std::libc;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
-use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::tube::Tube;
+use std::rt::task::BlockedTask;
+use homing::{HomingIO, HomeHandle};
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
- wait_until_woken_after};
-use uvio::HomingIO;
+ wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
use uvll;
pub struct PipeWatcher {
stream: StreamWatcher,
- home: SchedHandle,
+ home: HomeHandle,
priv defused: bool,
}
pub struct PipeListener {
- home: SchedHandle,
+ home: HomeHandle,
pipe: *uvll::uv_pipe_t,
- priv outgoing: Tube<Result<~RtioPipe, IoError>>,
+ priv outgoing: Chan<Result<~RtioPipe, IoError>>,
+ priv incoming: Port<Result<~RtioPipe, IoError>>,
}
pub struct PipeAcceptor {
listener: ~PipeListener,
- priv incoming: Tube<Result<~RtioPipe, IoError>>,
}
// PipeWatcher implementation and traits
// Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
// get bound to some other source (this is normally a helper method paired
// with another call).
- pub fn new(loop_: &Loop, ipc: bool) -> PipeWatcher {
+ pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
+ let home = io.make_handle();
+ PipeWatcher::new_home(&io.loop_, home, ipc)
+ }
+
+ pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
let handle = unsafe {
let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
assert!(!handle.is_null());
};
PipeWatcher {
stream: StreamWatcher::new(handle),
- home: get_handle_to_current_scheduler!(),
+ home: home,
defused: false,
}
}
- pub fn open(loop_: &Loop, file: libc::c_int) -> Result<PipeWatcher, UvError>
+ pub fn open(io: &mut UvIoFactory, file: libc::c_int)
+ -> Result<PipeWatcher, UvError>
{
- let pipe = PipeWatcher::new(loop_, false);
+ let pipe = PipeWatcher::new(io, false);
match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
0 => Ok(pipe),
n => Err(UvError(n))
}
}
- pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
+ pub fn connect(io: &mut UvIoFactory, name: &CString)
+ -> Result<PipeWatcher, UvError>
{
struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
let mut cx = Ctx { task: None, result: 0 };
let mut req = Request::new(uvll::UV_CONNECT);
- let pipe = PipeWatcher::new(loop_, false);
+ let pipe = PipeWatcher::new(io, false);
wait_until_woken_after(&mut cx.task, || {
unsafe {
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.result = status;
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+ wakeup(&mut cx.task);
}
}
}
impl HomingIO for PipeWatcher {
- fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+ fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
// PipeListener implementation and traits
impl PipeListener {
- pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
- let pipe = PipeWatcher::new(loop_, false);
+ pub fn bind(io: &mut UvIoFactory, name: &CString)
+ -> Result<~PipeListener, UvError>
+ {
+ let pipe = PipeWatcher::new(io, false);
match unsafe {
uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
} {
// If successful, unwrap the PipeWatcher because we control how
// we close the pipe differently. We can't rely on
// StreamWatcher's default close method.
+ let (port, chan) = Chan::new();
let p = ~PipeListener {
- home: get_handle_to_current_scheduler!(),
+ home: io.make_handle(),
pipe: pipe.unwrap(),
- outgoing: Tube::new(),
+ incoming: port,
+ outgoing: chan,
};
Ok(p.install())
}
impl RtioUnixListener for PipeListener {
fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
// create the acceptor object from ourselves
- let incoming = self.outgoing.clone();
- let mut acceptor = ~PipeAcceptor {
- listener: self,
- incoming: incoming,
- };
+ let mut acceptor = ~PipeAcceptor { listener: self };
let _m = acceptor.fire_homing_missile();
// XXX: the 128 backlog should be configurable
}
impl HomingIO for PipeListener {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_pipe_t> for PipeListener {
extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
assert!(status != uvll::ECANCELED);
+
+ let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
let msg = match status {
0 => {
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(server)
});
- let client = PipeWatcher::new(&loop_, false);
+ let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
Ok(~client as ~RtioPipe)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
-
- let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
- pipe.outgoing.send(msg);
+ pipe.outgoing.send_deferred(msg);
}
impl Drop for PipeListener {
impl RtioUnixAcceptor for PipeAcceptor {
fn accept(&mut self) -> Result<~RtioPipe, IoError> {
- let _m = self.fire_homing_missile();
- self.incoming.recv()
+ self.listener.incoming.recv()
}
}
impl HomingIO for PipeAcceptor {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
}
#[cfg(test)]
use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
use std::rt::test::next_test_unix;
- use super::*;
use super::super::local_loop;
#[test]
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+use std::io::IoError;
+use std::io::process;
use std::libc::c_int;
use std::libc;
use std::ptr;
-use std::rt::BlockedTask;
-use std::io::IoError;
-use std::io::process::*;
-use std::rt::local::Local;
use std::rt::rtio::RtioProcess;
-use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::task::BlockedTask;
use std::vec;
-use super::{Loop, UvHandle, UvError, uv_error_to_io_error,
- wait_until_woken_after};
-use uvio::HomingIO;
-use uvll;
+use homing::{HomingIO, HomeHandle};
use pipe::PipeWatcher;
+use super::{UvHandle, UvError, uv_error_to_io_error,
+ wait_until_woken_after, wakeup};
+use uvio::UvIoFactory;
+use uvll;
pub struct Process {
handle: *uvll::uv_process_t,
- home: SchedHandle,
+ home: HomeHandle,
/// Task to wake up (may be null) for when the process exits
to_wake: Option<BlockedTask>,
/// Collected from the exit_cb
- exit_status: Option<ProcessExit>,
+ exit_status: Option<process::ProcessExit>,
}
impl Process {
///
/// Returns either the corresponding process object or an error which
/// occurred.
- pub fn spawn(loop_: &Loop, config: ProcessConfig)
+ pub fn spawn(io_loop: &mut UvIoFactory, config: process::ProcessConfig)
-> Result<(~Process, ~[Option<PipeWatcher>]), UvError>
{
let cwd = config.cwd.map(|s| s.to_c_str());
stdio.set_len(io.len());
for (slot, other) in stdio.iter().zip(io.iter()) {
let io = set_stdio(slot as *uvll::uv_stdio_container_t, other,
- loop_);
+ io_loop);
ret_io.push(io);
}
}
let handle = UvHandle::alloc(None::<Process>, uvll::UV_PROCESS);
let process = ~Process {
handle: handle,
- home: get_handle_to_current_scheduler!(),
+ home: io_loop.make_handle(),
to_wake: None,
exit_status: None,
};
match unsafe {
- uvll::uv_spawn(loop_.handle, handle, &options)
+ uvll::uv_spawn(io_loop.uv_loop(), handle, &options)
} {
0 => Ok(process.install()),
err => Err(UvError(err)),
assert!(p.exit_status.is_none());
p.exit_status = Some(match term_signal {
- 0 => ExitStatus(exit_status as int),
- n => ExitSignal(n as int),
+ 0 => process::ExitStatus(exit_status as int),
+ n => process::ExitSignal(n as int),
});
- match p.to_wake.take() {
- Some(task) => {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task);
- }
- None => {}
- }
+ if p.to_wake.is_none() { return }
+ wakeup(&mut p.to_wake);
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
- io: &StdioContainer,
- loop_: &Loop) -> Option<PipeWatcher> {
+ io: &process::StdioContainer,
+ io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
match *io {
- Ignored => {
+ process::Ignored => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
None
}
- InheritFd(fd) => {
+ process::InheritFd(fd) => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD);
uvll::set_stdio_container_fd(dst, fd);
None
}
- CreatePipe(readable, writable) => {
+ process::CreatePipe(readable, writable) => {
let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
if readable {
flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
if writable {
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
}
- let pipe = PipeWatcher::new(loop_, false);
+ let pipe = PipeWatcher::new(io_loop, false);
uvll::set_stdio_container_flags(dst, flags);
uvll::set_stdio_container_stream(dst, pipe.handle());
Some(pipe)
}
impl HomingIO for Process {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_process_t> for Process {
}
}
- fn wait(&mut self) -> ProcessExit {
+ fn wait(&mut self) -> process::ProcessExit {
// Make sure (on the home scheduler) that we have an exit status listed
let _m = self.fire_homing_missile();
match self.exit_status {
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A concurrent queue used to signal remote event loops
+//!
+//! This queue implementation is used to send tasks among event loops. This is
+//! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
+//! handles (to wake up a remote event loop).
+//!
+//! The uv_async_t is stored next to the event loop, so in order to not keep the
+//! event loop alive we use uv_ref and uv_unref in order to control when the
+//! async handle is active or not.
+
+use std::cast;
+use std::libc::{c_void, c_int};
+use std::rt::task::BlockedTask;
+use std::unstable::sync::LittleLock;
+use mpsc = std::sync::mpsc_queue;
+
+use async::AsyncWatcher;
+use super::{Loop, UvHandle};
+use uvll;
+
+/// Messages flowing through an event loop's wakeup queue.
+enum Message {
+    // A task to be resumed on the receiving event loop.
+    Task(BlockedTask),
+    // A new Queue handle exists; uv_ref the async handle when the count
+    // leaves zero so the loop is kept alive.
+    Increment,
+    // A Queue handle was dropped; uv_unref when the count reaches zero.
+    Decrement,
+}
+
+struct State {
+    // The async handle used to wake up the owning event loop.
+    handle: *uvll::uv_async_t,
+    lock: LittleLock, // see comments in async_cb for why this is needed
+}
+
+/// This structure is intended to be stored next to the event loop, and it is
+/// used to create new `Queue` structures.
+pub struct QueuePool {
+    priv producer: mpsc::Producer<Message, State>,
+    priv consumer: mpsc::Consumer<Message, State>,
+    // Number of outstanding Queue handles; while nonzero the async handle is
+    // uv_ref'd so the owning loop cannot exit.
+    priv refcnt: uint,
+}
+
+/// This type is used to send messages back to the original event loop.
+pub struct Queue {
+    priv queue: mpsc::Producer<Message, State>,
+}
+
+/// Callback run on the owning event loop whenever its async handle is
+/// signalled; drains the message queue, waking tasks and adjusting the
+/// keep-alive refcount.
+extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
+    assert_eq!(status, 0);
+    // The QueuePool pointer was stashed in the handle's data field by
+    // QueuePool::new.
+    let state: &mut QueuePool = unsafe {
+        cast::transmute(uvll::get_data_for_uv_handle(handle))
+    };
+    let packet = unsafe { state.consumer.packet() };
+
+    // Remember that there is no guarantee about how many times an async
+    // callback is called with relation to the number of sends, so process the
+    // entire queue in a loop.
+    loop {
+        match state.consumer.pop() {
+            mpsc::Data(Task(task)) => {
+                task.wake().map(|t| t.reawaken(true));
+            }
+            mpsc::Data(Increment) => unsafe {
+                if state.refcnt == 0 {
+                    uvll::uv_ref((*packet).handle);
+                }
+                state.refcnt += 1;
+            },
+            mpsc::Data(Decrement) => unsafe {
+                state.refcnt -= 1;
+                if state.refcnt == 0 {
+                    uvll::uv_unref((*packet).handle);
+                }
+            },
+            // NOTE(review): Inconsistent means a producer is mid-push;
+            // breaking looks safe because that producer signals the async
+            // handle after its push completes (see Queue::push / Drop) —
+            // confirm against the mpsc_queue contract.
+            mpsc::Empty | mpsc::Inconsistent => break
+        };
+    }
+
+    // If the refcount is now zero after processing the queue, then there is no
+    // longer a reference on the async handle and it is possible that this event
+    // loop can exit. What we're not guaranteed, however, is that a producer in
+    // the middle of dropping itself is yet done with the handle. It could be
+    // possible that we saw their Decrement message but they have yet to signal
+    // on the async handle. If we were to return immediately, the entire uv loop
+    // could be destroyed meaning the call to uv_async_send would abort()
+    //
+    // In order to fix this, an OS mutex is used to wait for the other end to
+    // finish before we continue. The drop block on a handle will acquire a
+    // mutex and then drop it after both the push and send have been completed.
+    // If we acquire the mutex here, then we are guaranteed that there are no
+    // longer any senders which are holding on to their handles, so we can
+    // safely allow the event loop to exit.
+    if state.refcnt == 0 {
+        unsafe {
+            let _l = (*packet).lock.lock();
+        }
+    }
+}
+
+impl QueuePool {
+    /// Creates the pool for `loop_`, allocating and registering the async
+    /// wakeup handle. The handle starts uv_unref'd so an otherwise-idle loop
+    /// can still exit.
+    pub fn new(loop_: &mut Loop) -> ~QueuePool {
+        let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
+        let (c, p) = mpsc::queue(State {
+            handle: handle,
+            lock: LittleLock::new(),
+        });
+        let q = ~QueuePool {
+            producer: p,
+            consumer: c,
+            refcnt: 0,
+        };
+
+        unsafe {
+            assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
+            uvll::uv_unref(handle);
+            // Stash a raw pointer to the pool in the handle's data field so
+            // async_cb can recover it. NOTE(review): this assumes the returned
+            // ~QueuePool outlives the async handle — presumably guaranteed by
+            // the event loop owning the box; confirm at the call site.
+            let data: *c_void = *cast::transmute::<&~QueuePool, &*c_void>(&q);
+            uvll::set_data_for_uv_handle(handle, data);
+        }
+
+        return q;
+    }
+
+    /// Mints a new `Queue` producer, uv_ref-ing the async handle when the
+    /// outstanding-handle count leaves zero.
+    pub fn queue(&mut self) -> Queue {
+        unsafe {
+            if self.refcnt == 0 {
+                uvll::uv_ref((*self.producer.packet()).handle);
+            }
+            self.refcnt += 1;
+        }
+        Queue { queue: self.producer.clone() }
+    }
+}
+
+impl Queue {
+    /// Enqueues `task` for the queue's home event loop and signals that
+    /// loop's async handle so async_cb will run and resume the task.
+    pub fn push(&mut self, task: BlockedTask) {
+        self.queue.push(Task(task));
+        unsafe {
+            uvll::uv_async_send((*self.queue.packet()).handle);
+        }
+    }
+}
+
+impl Clone for Queue {
+    fn clone(&self) -> Queue {
+        // Push a request to increment on the queue, but there's no need to
+        // signal the event loop to process it at this time. We're guaranteed
+        // that the count is at least one (because we have a queue right here),
+        // and if the queue is dropped later on it'll see the increment for the
+        // decrement anyway.
+        unsafe {
+            // transmute_mut works around &self; the underlying producer push
+            // is what actually mutates shared state.
+            cast::transmute_mut(self).queue.push(Increment);
+        }
+        Queue { queue: self.queue.clone() }
+    }
+}
+
+impl Drop for Queue {
+    fn drop(&mut self) {
+        // See the comments in the async_cb function for why there is a lock
+        // that is acquired only on a drop.
+        unsafe {
+            let state = self.queue.packet();
+            let _l = (*state).lock.lock();
+            self.queue.push(Decrement);
+            uvll::uv_async_send((*state).handle);
+        }
+    }
+}
+
+impl Drop for State {
+    fn drop(&mut self) {
+        unsafe {
+            // transmute(0) produces a null close callback; the handle's
+            // memory is then freed manually right away.
+            uvll::uv_close(self.handle, cast::transmute(0));
+            uvll::free_handle(self.handle);
+        }
+    }
+}
use std::libc::c_int;
use std::io::signal::Signum;
-use std::rt::sched::{SchedHandle, Scheduler};
use std::comm::SharedChan;
-use std::rt::local::Local;
use std::rt::rtio::RtioSignal;
-use super::{Loop, UvError, UvHandle};
+use homing::{HomingIO, HomeHandle};
+use super::{UvError, UvHandle};
use uvll;
-use uvio::HomingIO;
+use uvio::UvIoFactory;
pub struct SignalWatcher {
handle: *uvll::uv_signal_t,
- home: SchedHandle,
+ home: HomeHandle,
channel: SharedChan<Signum>,
signal: Signum,
}
impl SignalWatcher {
- pub fn new(loop_: &mut Loop, signum: Signum,
+ pub fn new(io: &mut UvIoFactory, signum: Signum,
channel: SharedChan<Signum>) -> Result<~SignalWatcher, UvError> {
let s = ~SignalWatcher {
handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL),
- home: get_handle_to_current_scheduler!(),
+ home: io.make_handle(),
channel: channel,
signal: signum,
};
assert_eq!(unsafe {
- uvll::uv_signal_init(loop_.handle, s.handle)
+ uvll::uv_signal_init(io.uv_loop(), s.handle)
}, 0);
match unsafe {
}
impl HomingIO for SignalWatcher {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_signal_t> for SignalWatcher {
#[cfg(test)]
mod test {
- use super::*;
use super::super::local_loop;
use std::io::signal;
use std::cast;
use std::libc::{c_int, size_t, ssize_t};
use std::ptr;
-use std::rt::BlockedTask;
-use std::rt::local::Local;
-use std::rt::sched::Scheduler;
+use std::rt::task::BlockedTask;
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
- ForbidUnwind};
+ ForbidUnwind, wakeup};
use uvll;
// This is a helper structure which is intended to get embedded into other
unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
rcx.result = nread;
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap());
+ wakeup(&mut rcx.task);
}
// Unlike reading, the WriteContext is stored in the uv_write_t request. Like
wcx.result = status;
req.defuse();
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
+ wakeup(&mut wcx.task);
}
// except according to those terms.
use std::libc::c_int;
-use std::rt::BlockedTask;
use std::rt::local::Local;
use std::rt::rtio::RtioTimer;
-use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::task::{BlockedTask, Task};
use std::util;
+use homing::{HomeHandle, HomingIO};
+use super::{UvHandle, ForbidUnwind, ForbidSwitch};
+use uvio::UvIoFactory;
use uvll;
-use super::{Loop, UvHandle, ForbidUnwind, ForbidSwitch};
-use uvio::HomingIO;
pub struct TimerWatcher {
handle: *uvll::uv_timer_t,
- home: SchedHandle,
+ home: HomeHandle,
action: Option<NextAction>,
id: uint, // see comments in timer_cb
}
}
impl TimerWatcher {
- pub fn new(loop_: &mut Loop) -> ~TimerWatcher {
+ pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
assert_eq!(unsafe {
- uvll::uv_timer_init(loop_.handle, handle)
+ uvll::uv_timer_init(io.uv_loop(), handle)
}, 0);
let me = ~TimerWatcher {
handle: handle,
action: None,
- home: get_handle_to_current_scheduler!(),
+ home: io.make_handle(),
id: 0,
};
return me.install();
}
impl HomingIO for TimerWatcher {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+ fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_timer_t> for TimerWatcher {
// started, then we need to call stop on the timer.
let _f = ForbidUnwind::new("timer");
- let sched: ~Scheduler = Local::take();
- sched.deschedule_running_task_and_then(|_sched, task| {
+ let task: ~Task = Local::take();
+ task.deschedule(1, |task| {
self.action = Some(WakeTask(task));
self.start(msecs, 0);
+ Ok(())
});
self.stop();
}
match timer.action.take_unwrap() {
WakeTask(task) => {
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(task);
+ task.wake().map(|t| t.reawaken(true));
}
SendOnce(chan) => { chan.try_send_deferred(()); }
SendMany(chan, id) => {
#[cfg(test)]
mod test {
- use super::*;
use std::rt::rtio::RtioTimer;
use super::super::local_loop;
use std::libc;
use std::io::IoError;
-use std::rt::local::Local;
use std::rt::rtio::RtioTTY;
-use std::rt::sched::{Scheduler, SchedHandle};
+use homing::{HomingIO, HomeHandle};
use stream::StreamWatcher;
-use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
-use uvio::HomingIO;
+use super::{UvError, UvHandle, uv_error_to_io_error};
+use uvio::UvIoFactory;
use uvll;
pub struct TtyWatcher{
tty: *uvll::uv_tty_t,
stream: StreamWatcher,
- home: SchedHandle,
+ home: HomeHandle,
fd: libc::c_int,
}
impl TtyWatcher {
- pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool)
+ pub fn new(io: &mut UvIoFactory, fd: libc::c_int, readable: bool)
-> Result<TtyWatcher, UvError>
{
// libuv may succeed in giving us a handle (via uv_tty_init), but if the
// with attempting to open it as a tty.
let handle = UvHandle::alloc(None::<TtyWatcher>, uvll::UV_TTY);
match unsafe {
- uvll::uv_tty_init(loop_.handle, handle, fd as libc::c_int,
+ uvll::uv_tty_init(io.uv_loop(), handle, fd as libc::c_int,
readable as libc::c_int)
} {
0 => {
Ok(TtyWatcher {
tty: handle,
stream: StreamWatcher::new(handle),
- home: get_handle_to_current_scheduler!(),
+ home: io.make_handle(),
fd: fd,
})
}
}
impl HomingIO for TtyWatcher {
- fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+ fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
}
impl Drop for TtyWatcher {
// except according to those terms.
use std::c_str::CString;
+use std::cast;
use std::comm::SharedChan;
-use std::libc::c_int;
-use std::libc;
-use std::path::Path;
use std::io::IoError;
use std::io::net::ip::SocketAddr;
use std::io::process::ProcessConfig;
-use std::io;
-use std::rt::local::Local;
-use std::rt::rtio::*;
-use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::task::Task;
-use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
- S_IRUSR, S_IWUSR};
-use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
- ReadWrite, FileStat};
use std::io::signal::Signum;
+use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
+ ReadWrite, FileStat};
+use std::io;
+use std::libc::c_int;
+use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
+ S_IWUSR};
+use std::libc;
+use std::path::Path;
+use std::rt::rtio;
+use std::rt::rtio::IoFactory;
use ai = std::io::net::addrinfo;
#[cfg(test)] use std::unstable::run_in_bare_thread;
-use super::*;
-use addrinfo::GetAddrInfoRequest;
-
-pub trait HomingIO {
-
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
-
- /// This function will move tasks to run on their home I/O scheduler. Note
- /// that this function does *not* pin the task to the I/O scheduler, but
- /// rather it simply moves it to running on the I/O scheduler.
- fn go_to_IO_home(&mut self) -> uint {
- use std::rt::sched::RunOnce;
-
- let _f = ForbidUnwind::new("going home");
-
- let current_sched_id = {
- let mut sched = Local::borrow(None::<Scheduler>);
- sched.get().sched_id()
- };
-
- // Only need to invoke a context switch if we're not on the right
- // scheduler.
- if current_sched_id != self.home().sched_id {
- let scheduler: ~Scheduler = Local::take();
- scheduler.deschedule_running_task_and_then(|_, task| {
- task.wake().map(|task| {
- self.home().send(RunOnce(task));
- });
- })
- }
- let current_sched_id = {
- let mut sched = Local::borrow(None::<Scheduler>);
- sched.get().sched_id()
- };
- assert!(current_sched_id == self.home().sched_id);
-
- self.home().sched_id
- }
+use super::{uv_error_to_io_error, Loop};
- /// Fires a single homing missile, returning another missile targeted back
- /// at the original home of this task. In other words, this function will
- /// move the local task to its I/O scheduler and then return an RAII wrapper
- /// which will return the task home.
- fn fire_homing_missile(&mut self) -> HomingMissile {
- HomingMissile { io_home: self.go_to_IO_home() }
- }
-
- /// Same as `fire_homing_missile`, but returns the local I/O scheduler as
- /// well (the one that was homed to).
- fn fire_homing_missile_sched(&mut self) -> (HomingMissile, ~Scheduler) {
- // First, transplant ourselves to the home I/O scheduler
- let missile = self.fire_homing_missile();
- // Next (must happen next), grab the local I/O scheduler
- let io_sched: ~Scheduler = Local::take();
-
- (missile, io_sched)
- }
-}
-
-/// After a homing operation has been completed, this will return the current
-/// task back to its appropriate home (if applicable). The field is used to
-/// assert that we are where we think we are.
-struct HomingMissile {
- priv io_home: uint,
-}
-
-impl HomingMissile {
- pub fn check(&self, msg: &'static str) {
- let mut sched = Local::borrow(None::<Scheduler>);
- let local_id = sched.get().sched_id();
- assert!(local_id == self.io_home, "{}", msg);
- }
-}
-
-impl Drop for HomingMissile {
- fn drop(&mut self) {
- let _f = ForbidUnwind::new("leaving home");
-
- // It would truly be a sad day if we had moved off the home I/O
- // scheduler while we were doing I/O.
- self.check("task moved away from the home scheduler");
-
- // If we were a homed task, then we must send ourselves back to the
- // original scheduler. Otherwise, we can just return and keep running
- if !Task::on_appropriate_sched() {
- let scheduler: ~Scheduler = Local::take();
- scheduler.deschedule_running_task_and_then(|_, task| {
- task.wake().map(|task| {
- Scheduler::run_task(task);
- });
- })
- }
- }
-}
+use addrinfo::GetAddrInfoRequest;
+use async::AsyncWatcher;
+use file::{FsRequest, FileWatcher};
+use queue::QueuePool;
+use homing::HomeHandle;
+use idle::IdleWatcher;
+use net::{TcpWatcher, TcpListener, UdpWatcher};
+use pipe::{PipeWatcher, PipeListener};
+use process::Process;
+use signal::SignalWatcher;
+use timer::TimerWatcher;
+use tty::TtyWatcher;
+use uvll;
// Obviously an Event Loop is always home.
pub struct UvEventLoop {
 impl UvEventLoop {
     pub fn new() -> UvEventLoop {
+        // Build the loop first: the queue pool must register its async handle
+        // on it before both are moved into the I/O factory.
+        let mut loop_ = Loop::new();
+        let handle_pool = QueuePool::new(&mut loop_);
         UvEventLoop {
-            uvio: UvIoFactory(Loop::new())
+            uvio: UvIoFactory {
+                loop_: loop_,
+                handle_pool: handle_pool,
+            }
         }
     }
 }
 impl Drop for UvEventLoop {
     fn drop(&mut self) {
+        // uv_loop() now returns a raw *uv_loop_t, so close the owned Loop
+        // field directly instead of going through the accessor.
-        self.uvio.uv_loop().close();
+        self.uvio.loop_.close();
     }
 }
-impl EventLoop for UvEventLoop {
+impl rtio::EventLoop for UvEventLoop {
fn run(&mut self) {
- self.uvio.uv_loop().run();
+ self.uvio.loop_.run();
}
fn callback(&mut self, f: proc()) {
- IdleWatcher::onetime(self.uvio.uv_loop(), f);
+ IdleWatcher::onetime(&mut self.uvio.loop_, f);
}
- fn pausable_idle_callback(&mut self, cb: ~Callback) -> ~PausableIdleCallback {
- IdleWatcher::new(self.uvio.uv_loop(), cb) as ~PausableIdleCallback
+ fn pausible_idle_callback(&mut self, cb: ~rtio::Callback)
+ -> ~rtio::PausibleIdleCallback
+ {
+ IdleWatcher::new(&mut self.uvio.loop_, cb) as ~rtio::PausibleIdleCallback
}
- fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback {
- ~AsyncWatcher::new(self.uvio.uv_loop(), f) as ~RemoteCallback
+ fn remote_callback(&mut self, f: ~rtio::Callback) -> ~rtio::RemoteCallback {
+ ~AsyncWatcher::new(&mut self.uvio.loop_, f) as ~rtio::RemoteCallback
}
- fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> {
- let factory = &mut self.uvio as &mut IoFactory;
+ fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
+ let factory = &mut self.uvio as &mut rtio::IoFactory;
Some(factory)
}
}
#[cfg(not(test))]
#[lang = "event_loop_factory"]
-pub extern "C" fn new_loop() -> ~EventLoop {
- ~UvEventLoop::new() as ~EventLoop
+pub extern "C" fn new_loop() -> ~rtio::EventLoop {
+ ~UvEventLoop::new() as ~rtio::EventLoop
}
#[test]
}
}
-pub struct UvIoFactory(Loop);
+pub struct UvIoFactory {
+    // The libuv event loop owned by this factory.
+    loop_: Loop,
+    // Pool used by make_handle() to mint HomeHandle values for homed I/O
+    // objects (timers, signals, ttys, ...).
+    priv handle_pool: ~QueuePool,
+}
impl UvIoFactory {
- pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
- match self { &UvIoFactory(ref mut ptr) => ptr }
+ pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
+
+ pub fn make_handle(&mut self) -> HomeHandle {
+ HomeHandle::new(self.id(), &mut *self.handle_pool)
}
}
impl IoFactory for UvIoFactory {
+ fn id(&self) -> uint { unsafe { cast::transmute(self) } }
+
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn tcp_connect(&mut self, addr: SocketAddr)
- -> Result<~RtioTcpStream, IoError>
+ -> Result<~rtio::RtioTcpStream, IoError>
{
- match TcpWatcher::connect(self.uv_loop(), addr) {
- Ok(t) => Ok(~t as ~RtioTcpStream),
+ match TcpWatcher::connect(self, addr) {
+ Ok(t) => Ok(~t as ~rtio::RtioTcpStream),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
- fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> {
- match TcpListener::bind(self.uv_loop(), addr) {
- Ok(t) => Ok(t as ~RtioTcpListener),
+ fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioTcpListener, IoError> {
+ match TcpListener::bind(self, addr) {
+ Ok(t) => Ok(t as ~rtio::RtioTcpListener),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
- fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
- match UdpWatcher::bind(self.uv_loop(), addr) {
- Ok(u) => Ok(~u as ~RtioUdpSocket),
+ fn udp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioUdpSocket, IoError> {
+ match UdpWatcher::bind(self, addr) {
+ Ok(u) => Ok(~u as ~rtio::RtioUdpSocket),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
- fn timer_init(&mut self) -> Result<~RtioTimer, IoError> {
- Ok(TimerWatcher::new(self.uv_loop()) as ~RtioTimer)
+ fn timer_init(&mut self) -> Result<~rtio::RtioTimer, IoError> {
+ Ok(TimerWatcher::new(self) as ~rtio::RtioTimer)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError> {
- let r = GetAddrInfoRequest::run(self.uv_loop(), host, servname, hint);
+ let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
r.map_err(uv_error_to_io_error)
}
fn fs_from_raw_fd(&mut self, fd: c_int,
- close: CloseBehavior) -> ~RtioFileStream {
- let loop_ = Loop::wrap(self.uv_loop().handle);
- ~FileWatcher::new(loop_, fd, close) as ~RtioFileStream
+ close: rtio::CloseBehavior) -> ~rtio::RtioFileStream {
+ ~FileWatcher::new(self, fd, close) as ~rtio::RtioFileStream
}
fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
- -> Result<~RtioFileStream, IoError> {
+ -> Result<~rtio::RtioFileStream, IoError> {
let flags = match fm {
io::Open => 0,
io::Append => libc::O_APPEND,
libc::S_IRUSR | libc::S_IWUSR),
};
- match FsRequest::open(self.uv_loop(), path, flags as int, mode as int) {
- Ok(fs) => Ok(~fs as ~RtioFileStream),
+ match FsRequest::open(self, path, flags as int, mode as int) {
+ Ok(fs) => Ok(~fs as ~rtio::RtioFileStream),
Err(e) => Err(uv_error_to_io_error(e))
}
}
fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> {
- let r = FsRequest::unlink(self.uv_loop(), path);
+ let r = FsRequest::unlink(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> {
- let r = FsRequest::lstat(self.uv_loop(), path);
+ let r = FsRequest::lstat(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> {
- let r = FsRequest::stat(self.uv_loop(), path);
+ let r = FsRequest::stat(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_mkdir(&mut self, path: &CString,
perm: io::FilePermission) -> Result<(), IoError> {
- let r = FsRequest::mkdir(self.uv_loop(), path, perm as c_int);
+ let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
r.map_err(uv_error_to_io_error)
}
fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> {
- let r = FsRequest::rmdir(self.uv_loop(), path);
+ let r = FsRequest::rmdir(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> {
- let r = FsRequest::rename(self.uv_loop(), path, to);
+ let r = FsRequest::rename(&self.loop_, path, to);
r.map_err(uv_error_to_io_error)
}
fn fs_chmod(&mut self, path: &CString,
perm: io::FilePermission) -> Result<(), IoError> {
- let r = FsRequest::chmod(self.uv_loop(), path, perm as c_int);
+ let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
r.map_err(uv_error_to_io_error)
}
fn fs_readdir(&mut self, path: &CString, flags: c_int)
-> Result<~[Path], IoError>
{
- let r = FsRequest::readdir(self.uv_loop(), path, flags);
+ let r = FsRequest::readdir(&self.loop_, path, flags);
r.map_err(uv_error_to_io_error)
}
fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
- let r = FsRequest::link(self.uv_loop(), src, dst);
+ let r = FsRequest::link(&self.loop_, src, dst);
r.map_err(uv_error_to_io_error)
}
fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
- let r = FsRequest::symlink(self.uv_loop(), src, dst);
+ let r = FsRequest::symlink(&self.loop_, src, dst);
r.map_err(uv_error_to_io_error)
}
fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> {
- let r = FsRequest::chown(self.uv_loop(), path, uid, gid);
+ let r = FsRequest::chown(&self.loop_, path, uid, gid);
r.map_err(uv_error_to_io_error)
}
fn fs_readlink(&mut self, path: &CString) -> Result<Path, IoError> {
- let r = FsRequest::readlink(self.uv_loop(), path);
+ let r = FsRequest::readlink(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
-> Result<(), IoError>
{
- let r = FsRequest::utime(self.uv_loop(), path, atime, mtime);
+ let r = FsRequest::utime(&self.loop_, path, atime, mtime);
r.map_err(uv_error_to_io_error)
}
fn spawn(&mut self, config: ProcessConfig)
- -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>
+ -> Result<(~rtio::RtioProcess, ~[Option<~rtio::RtioPipe>]), IoError>
{
- match Process::spawn(self.uv_loop(), config) {
+ match Process::spawn(self, config) {
Ok((p, io)) => {
- Ok((p as ~RtioProcess,
- io.move_iter().map(|i| i.map(|p| ~p as ~RtioPipe)).collect()))
+ Ok((p as ~rtio::RtioProcess,
+ io.move_iter().map(|i| i.map(|p| ~p as ~rtio::RtioPipe)).collect()))
}
Err(e) => Err(uv_error_to_io_error(e)),
}
}
- fn unix_bind(&mut self, path: &CString) -> Result<~RtioUnixListener, IoError>
+ fn unix_bind(&mut self, path: &CString) -> Result<~rtio::RtioUnixListener, IoError>
{
- match PipeListener::bind(self.uv_loop(), path) {
- Ok(p) => Ok(p as ~RtioUnixListener),
+ match PipeListener::bind(self, path) {
+ Ok(p) => Ok(p as ~rtio::RtioUnixListener),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
- fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
- match PipeWatcher::connect(self.uv_loop(), path) {
- Ok(p) => Ok(~p as ~RtioPipe),
+ fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe, IoError> {
+ match PipeWatcher::connect(self, path) {
+ Ok(p) => Ok(~p as ~rtio::RtioPipe),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn tty_open(&mut self, fd: c_int, readable: bool)
- -> Result<~RtioTTY, IoError> {
- match TtyWatcher::new(self.uv_loop(), fd, readable) {
- Ok(tty) => Ok(~tty as ~RtioTTY),
+ -> Result<~rtio::RtioTTY, IoError> {
+ match TtyWatcher::new(self, fd, readable) {
+ Ok(tty) => Ok(~tty as ~rtio::RtioTTY),
Err(e) => Err(uv_error_to_io_error(e))
}
}
- fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
- match PipeWatcher::open(self.uv_loop(), fd) {
- Ok(s) => Ok(~s as ~RtioPipe),
+ fn pipe_open(&mut self, fd: c_int) -> Result<~rtio::RtioPipe, IoError> {
+ match PipeWatcher::open(self, fd) {
+ Ok(s) => Ok(~s as ~rtio::RtioPipe),
Err(e) => Err(uv_error_to_io_error(e))
}
}
fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
- -> Result<~RtioSignal, IoError> {
- match SignalWatcher::new(self.uv_loop(), signum, channel) {
- Ok(s) => Ok(s as ~RtioSignal),
+ -> Result<~rtio::RtioSignal, IoError> {
+ match SignalWatcher::new(self, signum, channel) {
+ Ok(s) => Ok(s as ~rtio::RtioSignal),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
#[cfg(test)]
use std::libc::uintptr_t;
-pub use self::errors::*;
+pub use self::errors::{EACCES, ECONNREFUSED, ECONNRESET, EPIPE, ECONNABORTED,
+ ECANCELED, EBADF, ENOTCONN};
pub static OK: c_int = 0;
pub static EOF: c_int = -4095;
// generic uv functions
pub fn uv_loop_delete(l: *uv_loop_t);
+ pub fn uv_ref(t: *uv_handle_t);
+ pub fn uv_unref(t: *uv_handle_t);
pub fn uv_handle_size(ty: uv_handle_type) -> size_t;
pub fn uv_req_size(ty: uv_req_type) -> size_t;
pub fn uv_run(l: *uv_loop_t, mode: uv_run_mode) -> c_int;