pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
pub use self::process::Process;
-pub use self::pipe::Pipe;
+pub use self::pipe::PipeWatcher;
pub use self::signal::SignalWatcher;
+pub use self::tty::TtyWatcher;
mod macros;
pub mod pipe;
pub mod tty;
pub mod signal;
+pub mod stream;
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
pub type NullCallback = ~fn();
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
-pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>,
- async_cb: Option<AsyncCallback>,
udp_recv_cb: Option<UdpReceiveCallback>,
udp_send_cb: Option<UdpSendCallback>,
}
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use std::libc;
use std::c_str::CString;
+use std::cast;
+use std::libc;
+use std::rt::BlockedTask;
+use std::rt::io::IoError;
+use std::rt::local::Local;
+use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
+use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::tube::Tube;
-use super::{Loop, UvError, Watcher, NativeHandle, status_to_maybe_uv_error};
-use super::ConnectionCallback;
-use net;
+use stream::StreamWatcher;
+use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle};
+use uvio::HomingIO;
use uvll;
-pub struct Pipe(*uvll::uv_pipe_t);
+pub struct PipeWatcher {
+ stream: StreamWatcher,
+ home: SchedHandle,
+}
+
+pub struct PipeListener {
+ home: SchedHandle,
+ pipe: *uvll::uv_pipe_t,
+ priv closing_task: Option<BlockedTask>,
+ priv outgoing: Tube<Result<~RtioPipe, IoError>>,
+}
+
+pub struct PipeAcceptor {
+ listener: ~PipeListener,
+ priv incoming: Tube<Result<~RtioPipe, IoError>>,
+}
-impl Watcher for Pipe {}
+// PipeWatcher implementation and traits
-impl Pipe {
- pub fn new(loop_: &Loop, ipc: bool) -> Pipe {
+impl PipeWatcher {
+ pub fn new(pipe: *uvll::uv_pipe_t) -> PipeWatcher {
+ PipeWatcher {
+ stream: StreamWatcher::new(pipe),
+ home: get_handle_to_current_scheduler!(),
+ }
+ }
+
+ pub fn alloc(loop_: &Loop, ipc: bool) -> *uvll::uv_pipe_t {
unsafe {
let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
- assert!(handle.is_not_null());
+ assert!(!handle.is_null());
let ipc = ipc as libc::c_int;
assert_eq!(uvll::uv_pipe_init(loop_.native_handle(), handle, ipc), 0);
- let mut ret: Pipe =
- NativeHandle::from_native_handle(handle);
- ret.install_watcher_data();
- ret
+ handle
}
}
- pub fn as_stream(&self) -> net::StreamWatcher {
- net::StreamWatcher(**self as *uvll::uv_stream_t)
+ pub fn open(loop_: &Loop, file: libc::c_int) -> Result<PipeWatcher, UvError>
+ {
+ let handle = PipeWatcher::alloc(loop_, false);
+ match unsafe { uvll::uv_pipe_open(handle, file) } {
+ 0 => Ok(PipeWatcher::new(handle)),
+ n => {
+ unsafe { uvll::uv_close(handle, pipe_close_cb) }
+ Err(UvError(n))
+ }
+ }
}
- #[fixed_stack_segment] #[inline(never)]
- pub fn open(&mut self, file: libc::c_int) -> Result<(), UvError> {
- match unsafe { uvll::uv_pipe_open(self.native_handle(), file) } {
- 0 => Ok(()),
- n => Err(UvError(n))
+ pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
+ {
+ struct Ctx {
+ task: Option<BlockedTask>,
+ result: Option<Result<PipeWatcher, UvError>>,
}
+ let mut cx = Ctx { task: None, result: None };
+ let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) };
+ unsafe { uvll::set_data_for_req(req, &cx as *Ctx) }
+
+ let sched: ~Scheduler = Local::take();
+ do sched.deschedule_running_task_and_then |_, task| {
+ cx.task = Some(task);
+ unsafe {
+ uvll::uv_pipe_connect(req,
+ PipeWatcher::alloc(loop_, false),
+ name.with_ref(|p| p),
+ connect_cb)
+ }
+ }
+ assert!(cx.task.is_none());
+ return cx.result.take().expect("pipe connect needs a result");
+
+ extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
+ unsafe {
+ let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
+ let stream = uvll::get_stream_handle_from_connect_req(req);
+ cx.result = Some(match status {
+ 0 => Ok(PipeWatcher::new(stream)),
+ n => {
+ uvll::free_handle(stream);
+ Err(UvError(n))
+ }
+ });
+ uvll::free_req(req);
+
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+ }
+ }
+ }
+}
+
+impl RtioPipe for PipeWatcher {
+ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+ let _m = self.fire_missiles();
+ self.stream.read(buf).map_err(uv_error_to_io_error)
+ }
+
+ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ self.stream.write(buf).map_err(uv_error_to_io_error)
+ }
+}
+
+impl HomingIO for PipeWatcher {
+ fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+}
+
+impl Drop for PipeWatcher {
+ fn drop(&mut self) {
+ let _m = self.fire_missiles();
+ self.stream.close(true); // close synchronously
}
+}
+
+extern fn pipe_close_cb(handle: *uvll::uv_handle_t) {
+ unsafe { uvll::free_handle(handle) }
+}
- #[fixed_stack_segment] #[inline(never)]
- pub fn bind(&mut self, name: &CString) -> Result<(), UvError> {
- do name.with_ref |name| {
- match unsafe { uvll::uv_pipe_bind(self.native_handle(), name) } {
- 0 => Ok(()),
- n => Err(UvError(n))
+// PipeListener implementation and traits
+
+impl PipeListener {
+ pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
+ let pipe = PipeWatcher::alloc(loop_, false);
+ match unsafe { uvll::uv_pipe_bind(pipe, name.with_ref(|p| p)) } {
+ 0 => {
+ let p = ~PipeListener {
+ home: get_handle_to_current_scheduler!(),
+ pipe: pipe,
+ closing_task: None,
+ outgoing: Tube::new(),
+ };
+ Ok(p.install())
+ }
+ n => {
+ unsafe { uvll::free_handle(pipe) }
+ Err(UvError(n))
}
}
}
+}
- #[fixed_stack_segment] #[inline(never)]
- pub fn connect(&mut self, name: &CString, cb: ConnectionCallback) {
- {
- let data = self.get_watcher_data();
- assert!(data.connect_cb.is_none());
- data.connect_cb = Some(cb);
+impl RtioUnixListener for PipeListener {
+ fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
+ // create the acceptor object from ourselves
+ let incoming = self.outgoing.clone();
+ let mut acceptor = ~PipeAcceptor {
+ listener: self,
+ incoming: incoming,
+ };
+
+ let _m = acceptor.fire_missiles();
+ // XXX: the 128 backlog should be configurable
+ match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
+ 0 => Ok(acceptor as ~RtioUnixAcceptor),
+ n => Err(uv_error_to_io_error(UvError(n))),
}
+ }
+}
- let connect = net::ConnectRequest::new();
- let name = do name.with_ref |p| { p };
+impl HomingIO for PipeListener {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
- unsafe {
- uvll::uv_pipe_connect(connect.native_handle(),
- self.native_handle(),
- name,
- connect_cb)
+impl UvHandle<uvll::uv_pipe_t> for PipeListener {
+ fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
+}
+
+extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
+ let msg = match status {
+ 0 => {
+ let loop_ = NativeHandle::from_native_handle(unsafe {
+ uvll::get_loop_for_uv_handle(server)
+ });
+ let client = PipeWatcher::alloc(&loop_, false);
+ assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0);
+ Ok(~PipeWatcher::new(client) as ~RtioPipe)
}
+ n => Err(uv_error_to_io_error(UvError(n)))
+ };
+
+ let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
+ pipe.outgoing.send(msg);
+}
- extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
- let connect_request: net::ConnectRequest =
- NativeHandle::from_native_handle(req);
- let mut stream_watcher = connect_request.stream();
- connect_request.delete();
+impl Drop for PipeListener {
+ fn drop(&mut self) {
+ let (_m, sched) = self.fire_missiles_sched();
- let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
- let status = status_to_maybe_uv_error(status);
- cb(stream_watcher, status);
+ do sched.deschedule_running_task_and_then |_, task| {
+ self.closing_task = Some(task);
+ unsafe { uvll::uv_close(self.pipe, listener_close_cb) }
}
}
+}
+
+extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
+ let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) };
+ unsafe { uvll::free_handle(handle) }
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap());
}
-impl NativeHandle<*uvll::uv_pipe_t> for Pipe {
- fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe {
- Pipe(handle)
- }
- fn native_handle(&self) -> *uvll::uv_pipe_t {
- match self { &Pipe(ptr) => ptr }
+// PipeAcceptor implementation and traits
+
+impl RtioUnixAcceptor for PipeAcceptor {
+ fn accept(&mut self) -> Result<~RtioPipe, IoError> {
+ let _m = self.fire_missiles();
+ self.incoming.recv()
}
}
+
+impl HomingIO for PipeAcceptor {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+}
use std::vec;
use super::{Loop, NativeHandle, UvHandle, UvError, uv_error_to_io_error};
-use uvio::{HomingIO, UvPipeStream, UvUnboundPipe};
+use uvio::HomingIO;
use uvll;
+use pipe::PipeWatcher;
pub struct Process {
handle: *uvll::uv_process_t,
/// Returns either the corresponding process object or an error which
/// occurred.
pub fn spawn(loop_: &Loop, config: ProcessConfig)
- -> Result<(~Process, ~[Option<~UvPipeStream>]), UvError>
+ -> Result<(~Process, ~[Option<PipeWatcher>]), UvError>
{
let cwd = config.cwd.map(|s| s.to_c_str());
let io = config.io;
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
io: &StdioContainer,
- loop_: &Loop) -> Option<~UvPipeStream> {
+ loop_: &Loop) -> Option<PipeWatcher> {
match *io {
Ignored => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
if writable {
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
}
- let pipe = UvUnboundPipe::new(loop_);
- let handle = pipe.pipe.as_stream().native_handle();
+ let pipe_handle = PipeWatcher::alloc(loop_, false);
uvll::set_stdio_container_flags(dst, flags);
- uvll::set_stdio_container_stream(dst, handle);
- Some(~UvPipeStream::new(pipe))
+ uvll::set_stdio_container_stream(dst, pipe_handle);
+ Some(PipeWatcher::new(pipe_handle))
}
}
}
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::cast;
+use std::libc::{c_int, size_t, ssize_t, c_void};
+use std::ptr;
+use std::rt::BlockedTask;
+use std::rt::local::Local;
+use std::rt::sched::Scheduler;
+
+use super::{UvError, Buf, slice_to_uv_buf};
+use uvll;
+
+// This is a helper structure which is intended to get embedded into other
+// Watcher structures. This structure will retain a handle to the underlying
+// uv_stream_t instance, and all I/O operations assume that it's already located
+// on the appropriate scheduler.
+pub struct StreamWatcher {
+ handle: *uvll::uv_stream_t,
+
+ // Cache the last used uv_write_t so we don't have to allocate a new one on
+ // every call to uv_write(). Ideally this would be a stack-allocated
+ // structure, but currently we don't have mappings for all the structures
+ // defined in libuv, so we're foced to malloc this.
+ priv last_write_req: Option<*uvll::uv_write_t>,
+}
+
+struct ReadContext {
+ buf: Option<Buf>,
+ result: Option<Result<uint, UvError>>,
+ task: Option<BlockedTask>,
+}
+
+struct WriteContext {
+ result: Option<Result<(), UvError>>,
+ task: Option<BlockedTask>,
+}
+
+impl StreamWatcher {
+ // Creates a new helper structure which should be then embedded into another
+ // watcher. This provides the generic read/write methods on streams.
+ //
+ // This structure will *not* close the stream when it is dropped. It is up
+ // to the enclosure structure to be sure to call the close method (which
+ // will block the task). Note that this is also required to prevent memory
+ // leaks.
+ //
+ // It should also be noted that the `data` field of the underlying uv handle
+ // will be manipulated on each of the methods called on this watcher.
+ // Wrappers should ensure to always reset the field to an appropriate value
+ // if they rely on the field to perform an action.
+ pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher {
+ StreamWatcher {
+ handle: stream,
+ last_write_req: None,
+ }
+ }
+
+ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
+ // Send off the read request, but don't block until we're sure that the
+ // read request is queued.
+ match unsafe {
+ uvll::uv_read_start(self.handle, alloc_cb, read_cb)
+ } {
+ 0 => {
+ let mut rcx = ReadContext {
+ buf: Some(slice_to_uv_buf(buf)),
+ result: None,
+ task: None,
+ };
+ unsafe {
+ uvll::set_data_for_uv_handle(self.handle, &rcx)
+ }
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_sched, task| {
+ rcx.task = Some(task);
+ }
+ rcx.result.take().expect("no result in read stream?")
+ }
+ n => Err(UvError(n))
+ }
+ }
+
+ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
+ // Prepare the write request, either using a cached one or allocating a
+ // new one
+ let req = match self.last_write_req {
+ Some(req) => req,
+ None => unsafe { uvll::malloc_req(uvll::UV_WRITE) },
+ };
+ self.last_write_req = Some(req);
+ let mut wcx = WriteContext { result: None, task: None, };
+ unsafe { uvll::set_data_for_req(req, &wcx as *WriteContext) }
+
+ // Send off the request, but be careful to not block until we're sure
+ // that the write reqeust is queued. If the reqeust couldn't be queued,
+ // then we should return immediately with an error.
+ match unsafe {
+ uvll::uv_write(req, self.handle, [slice_to_uv_buf(buf)], write_cb)
+ } {
+ 0 => {
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_sched, task| {
+ wcx.task = Some(task);
+ }
+ assert!(wcx.task.is_none());
+ wcx.result.take().expect("no result in write stream?")
+ }
+ n => Err(UvError(n)),
+ }
+ }
+
+ // This will deallocate an internally used memory, along with closing the
+ // handle (and freeing it).
+ //
+ // The `synchronous` flag dictates whether this handle is closed
+ // synchronously (the task is blocked) or asynchronously (the task is not
+ // block, but the handle is still deallocated).
+ pub fn close(&mut self, synchronous: bool) {
+ // clean up the cached write request if we have one
+ match self.last_write_req {
+ Some(req) => unsafe { uvll::free_req(req) },
+ None => {}
+ }
+
+ if synchronous {
+ let mut closing_task = None;
+ unsafe {
+ uvll::set_data_for_uv_handle(self.handle, &closing_task);
+ }
+
+ // Wait for this stream to close because it possibly represents a remote
+ // connection which may have consequences if we close asynchronously.
+ let sched: ~Scheduler = Local::take();
+ do sched.deschedule_running_task_and_then |_, task| {
+ closing_task = Some(task);
+ unsafe { uvll::uv_close(self.handle, close_cb) }
+ }
+ } else {
+ unsafe {
+ uvll::set_data_for_uv_handle(self.handle, ptr::null::<u8>());
+ uvll::uv_close(self.handle, close_cb)
+ }
+ }
+
+ extern fn close_cb(handle: *uvll::uv_handle_t) {
+ let data: *c_void = unsafe { uvll::get_data_for_uv_handle(handle) };
+ unsafe { uvll::free_handle(handle) }
+ if data.is_null() { return }
+
+ let closing_task: &mut Option<BlockedTask> = unsafe {
+ cast::transmute(data)
+ };
+ let task = closing_task.take_unwrap();
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task);
+ }
+ }
+}
+
+// This allocation callback expects to be invoked once and only once. It will
+// unwrap the buffer in the ReadContext stored in the stream and return it. This
+// will fail if it is called more than once.
+extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t) -> Buf {
+ let rcx: &mut ReadContext = unsafe {
+ cast::transmute(uvll::get_data_for_uv_handle(stream))
+ };
+ rcx.buf.take().expect("alloc_cb called more than once")
+}
+
+// When a stream has read some data, we will always forcibly stop reading and
+// return all the data read (even if it didn't fill the whole buffer).
+extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) {
+ let rcx: &mut ReadContext = unsafe {
+ cast::transmute(uvll::get_data_for_uv_handle(handle))
+ };
+ // Stop reading so that no read callbacks are
+ // triggered before the user calls `read` again.
+ // XXX: Is there a performance impact to calling
+ // stop here?
+ unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
+
+ assert!(rcx.result.is_none());
+ rcx.result = Some(match nread {
+ n if n < 0 => Err(UvError(n as c_int)),
+ n => Ok(n as uint),
+ });
+
+ let task = rcx.task.take().expect("read_cb needs a task");
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task);
+}
+
+// Unlike reading, the WriteContext is stored in the uv_write_t request. Like
+// reading, however, all this does is wake up the blocked task after squirreling
+// away the error code as a result.
+extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
+ // Remember to not free the request because it is re-used between writes on
+ // the same stream.
+ unsafe {
+ let wcx: &mut WriteContext = cast::transmute(uvll::get_data_for_req(req));
+ wcx.result = Some(match status {
+ 0 => Ok(()),
+ n => Err(UvError(n)),
+ });
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
+ }
+}
// except according to those terms.
use std::libc;
+use std::rt::io::IoError;
+use std::rt::local::Local;
+use std::rt::rtio::RtioTTY;
+use std::rt::sched::{Scheduler, SchedHandle};
-use super::{Watcher, Loop, NativeHandle, UvError};
-use net;
+use stream::StreamWatcher;
+use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
+use uvio::HomingIO;
use uvll;
-/// A process wraps the handle of the underlying uv_process_t.
-pub struct TTY(*uvll::uv_tty_t);
-
-impl Watcher for TTY {}
+pub struct TtyWatcher{
+ tty: *uvll::uv_tty_t,
+ stream: StreamWatcher,
+ home: SchedHandle,
+ fd: libc::c_int,
+}
-impl TTY {
- #[fixed_stack_segment] #[inline(never)]
- pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool) ->
- Result<TTY, UvError>
+impl TtyWatcher {
+ pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool)
+ -> Result<TtyWatcher, UvError>
{
- let handle = unsafe { uvll::malloc_handle(uvll::UV_TTY) };
- assert!(handle.is_not_null());
+ let handle = UvHandle::alloc(None::<TtyWatcher>, uvll::UV_TTY);
- let ret = unsafe {
+ match unsafe {
uvll::uv_tty_init(loop_.native_handle(), handle, fd as libc::c_int,
readable as libc::c_int)
- };
- match ret {
+ } {
0 => {
- let mut ret: TTY = NativeHandle::from_native_handle(handle);
- ret.install_watcher_data();
- Ok(ret)
+ Ok(TtyWatcher {
+ tty: handle,
+ stream: StreamWatcher::new(handle),
+ home: get_handle_to_current_scheduler!(),
+ fd: fd,
+ })
}
n => {
- unsafe { uvll::free_handle(handle); }
+ unsafe { uvll::free_handle(handle) }
Err(UvError(n))
}
}
}
+}
- pub fn as_stream(&self) -> net::StreamWatcher {
- net::StreamWatcher(**self as *uvll::uv_stream_t)
+impl RtioTTY for TtyWatcher {
+ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+ let _m = self.fire_missiles();
+ self.stream.read(buf).map_err(uv_error_to_io_error)
}
- #[fixed_stack_segment] #[inline(never)]
- pub fn set_mode(&self, raw: bool) -> Result<(), UvError> {
+ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ self.stream.write(buf).map_err(uv_error_to_io_error)
+ }
+
+ fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
let raw = raw as libc::c_int;
- match unsafe { uvll::uv_tty_set_mode(self.native_handle(), raw) } {
+ let _m = self.fire_missiles();
+ match unsafe { uvll::uv_tty_set_mode(self.tty, raw) } {
0 => Ok(()),
- n => Err(UvError(n))
+ n => Err(uv_error_to_io_error(UvError(n)))
}
}
- #[fixed_stack_segment] #[inline(never)] #[allow(unused_mut)]
- pub fn get_winsize(&self) -> Result<(int, int), UvError> {
+ #[allow(unused_mut)]
+ fn get_winsize(&mut self) -> Result<(int, int), IoError> {
let mut width: libc::c_int = 0;
let mut height: libc::c_int = 0;
let widthptr: *libc::c_int = &width;
let heightptr: *libc::c_int = &width;
- match unsafe { uvll::uv_tty_get_winsize(self.native_handle(),
+ let _m = self.fire_missiles();
+ match unsafe { uvll::uv_tty_get_winsize(self.tty,
widthptr, heightptr) } {
0 => Ok((width as int, height as int)),
- n => Err(UvError(n))
+ n => Err(uv_error_to_io_error(UvError(n)))
}
}
-}
-impl NativeHandle<*uvll::uv_tty_t> for TTY {
- fn from_native_handle(handle: *uvll::uv_tty_t) -> TTY {
- TTY(handle)
- }
- fn native_handle(&self) -> *uvll::uv_tty_t {
- match self { &TTY(ptr) => ptr }
+ fn isatty(&self) -> bool {
+ unsafe { uvll::uv_guess_handle(self.fd) == uvll::UV_TTY }
}
}
+impl UvHandle<uvll::uv_tty_t> for TtyWatcher {
+ fn uv_handle(&self) -> *uvll::uv_tty_t { self.tty }
+}
+
+impl HomingIO for TtyWatcher {
+ fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+}
+
+impl Drop for TtyWatcher {
+ // TTY handles are used for the logger in a task, so this destructor is run
+ // when a task is destroyed. When a task is being destroyed, a local
+ // scheduler isn't available, so we can't do the normal "take the scheduler
+ // and resume once close is done". Instead close operations on a TTY are
+ // asynchronous.
+ fn drop(&mut self) {
+ let _m = self.fire_missiles();
+ self.stream.close(false);
+ }
+}
use idle::IdleWatcher;
use net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
use addrinfo::{GetAddrInfoRequest, accum_addrinfo};
+use pipe::PipeListener;
// XXX we should not be calling uvll functions in here.
match Process::spawn(self.uv_loop(), config) {
Ok((p, io)) => {
Ok((p as ~RtioProcess,
- io.move_iter().map(|i| i.map(|p| p as ~RtioPipe)).collect()))
+ io.move_iter().map(|i| i.map(|p| ~p as ~RtioPipe)).collect()))
}
Err(e) => Err(uv_error_to_io_error(e)),
}
}
- fn unix_bind(&mut self, path: &CString) ->
- Result<~RtioUnixListener, IoError> {
- let mut pipe = UvUnboundPipe::new(self.uv_loop());
- match pipe.pipe.bind(path) {
- Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener),
+ fn unix_bind(&mut self, path: &CString) -> Result<~RtioUnixListener, IoError>
+ {
+ match PipeListener::bind(self.uv_loop(), path) {
+ Ok(p) => Ok(p as ~RtioUnixListener),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
- let pipe = UvUnboundPipe::new(self.uv_loop());
- let mut rawpipe = pipe.pipe;
-
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell;
- let pipe_cell = Cell::new(pipe);
- let pipe_cell_ptr: *Cell<UvUnboundPipe> = &pipe_cell;
-
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do rawpipe.connect(path) |_stream, err| {
- let res = match err {
- None => {
- let pipe = unsafe { (*pipe_cell_ptr).take() };
- Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
- }
- Some(e) => Err(uv_error_to_io_error(e)),
- };
- unsafe { (*result_cell_ptr).put_back(res); }
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ match PipeWatcher::connect(self.uv_loop(), path) {
+ Ok(p) => Ok(~p as ~RtioPipe),
+ Err(e) => Err(uv_error_to_io_error(e)),
}
-
- assert!(!result_cell.is_empty());
- return result_cell.take();
}
fn tty_open(&mut self, fd: c_int, readable: bool)
-> Result<~RtioTTY, IoError> {
- match tty::TTY::new(self.uv_loop(), fd, readable) {
- Ok(tty) => Ok(~UvTTY {
- home: get_handle_to_current_scheduler!(),
- tty: tty,
- fd: fd,
- } as ~RtioTTY),
+ match TtyWatcher::new(self.uv_loop(), fd, readable) {
+ Ok(tty) => Ok(~tty as ~RtioTTY),
Err(e) => Err(uv_error_to_io_error(e))
}
}
fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
- let mut pipe = UvUnboundPipe::new(self.uv_loop());
- match pipe.pipe.open(fd) {
- Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe),
+ match PipeWatcher::open(self.uv_loop(), fd) {
+ Ok(s) => Ok(~s as ~RtioPipe),
Err(e) => Err(uv_error_to_io_error(e))
}
}
result_cell.take()
}
-pub struct UvUnboundPipe {
- pipe: Pipe,
- priv home: SchedHandle,
-}
-
-impl UvUnboundPipe {
- /// Creates a new unbound pipe homed to the current scheduler, placed on the
- /// specified event loop
- pub fn new(loop_: &Loop) -> UvUnboundPipe {
- UvUnboundPipe {
- pipe: Pipe::new(loop_, false),
- home: get_handle_to_current_scheduler!(),
- }
- }
-}
-
-impl HomingIO for UvUnboundPipe {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvUnboundPipe {
- fn drop(&mut self) {
- let (_m, sched) = self.fire_homing_missile_sched();
- do sched.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self.pipe.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- }
-}
-
-pub struct UvPipeStream {
- priv inner: UvUnboundPipe,
-}
-
-impl UvPipeStream {
- pub fn new(inner: UvUnboundPipe) -> UvPipeStream {
- UvPipeStream { inner: inner }
- }
-}
-
-impl RtioPipe for UvPipeStream {
- fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
- let (_m, scheduler) = self.inner.fire_homing_missile_sched();
- read_stream(self.inner.pipe.as_stream(), scheduler, buf)
- }
- fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
- let (_m, scheduler) = self.inner.fire_homing_missile_sched();
- write_stream(self.inner.pipe.as_stream(), scheduler, buf)
- }
-}
-
pub struct UvTcpStream {
priv watcher: TcpWatcher,
priv home: SchedHandle,
}
}
-pub struct UvUnixListener {
- priv inner: UvUnboundPipe
-}
-
-impl HomingIO for UvUnixListener {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() }
-}
-
-impl UvUnixListener {
- fn new(pipe: UvUnboundPipe) -> UvUnixListener {
- UvUnixListener { inner: pipe }
- }
-}
-
-impl RtioUnixListener for UvUnixListener {
- fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
- let _m = self.fire_homing_missile();
- let acceptor = ~UvUnixAcceptor::new(*self);
- let incoming = Cell::new(acceptor.incoming.clone());
- let mut stream = acceptor.listener.inner.pipe.as_stream();
- let res = do stream.listen |mut server, status| {
- do incoming.with_mut_ref |incoming| {
- let inc = match status {
- Some(e) => Err(uv_error_to_io_error(e)),
- None => {
- let pipe = UvUnboundPipe::new(&server.event_loop());
- server.accept(pipe.pipe.as_stream());
- Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
- }
- };
- incoming.send(inc);
- }
- };
- match res {
- Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
- Err(e) => Err(uv_error_to_io_error(e)),
- }
- }
-}
-
-pub struct UvTTY {
- tty: tty::TTY,
- home: SchedHandle,
- fd: c_int,
-}
-
-impl HomingIO for UvTTY {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvTTY {
- fn drop(&mut self) {
- // TTY handles are used for the logger in a task, so this destructor is
- // run when a task is destroyed. When a task is being destroyed, a local
- // scheduler isn't available, so we can't do the normal "take the
- // scheduler and resume once close is done". Instead close operations on
- // a TTY are asynchronous.
- self.tty.close_async();
- }
-}
-
-impl RtioTTY for UvTTY {
- fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- read_stream(self.tty.as_stream(), scheduler, buf)
- }
-
- fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- write_stream(self.tty.as_stream(), scheduler, buf)
- }
-
- fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- match self.tty.set_mode(raw) {
- Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
- }
- }
-
- fn get_winsize(&mut self) -> Result<(int, int), IoError> {
- let _m = self.fire_homing_missile();
- match self.tty.get_winsize() {
- Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
- }
- }
-
- fn isatty(&self) -> bool {
- unsafe { uvll::uv_guess_handle(self.fd) == uvll::UV_TTY }
- }
-}
-
-pub struct UvUnixAcceptor {
- listener: UvUnixListener,
- incoming: Tube<Result<~RtioPipe, IoError>>,
-}
-
-impl HomingIO for UvUnixAcceptor {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
-}
-
-impl UvUnixAcceptor {
- fn new(listener: UvUnixListener) -> UvUnixAcceptor {
- UvUnixAcceptor { listener: listener, incoming: Tube::new() }
- }
-}
-
-impl RtioUnixAcceptor for UvUnixAcceptor {
- fn accept(&mut self) -> Result<~RtioPipe, IoError> {
- let _m = self.fire_homing_missile();
- self.incoming.recv()
- }
-
- fn accept_simultaneously(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- accept_simultaneously(self.listener.inner.pipe.as_stream(), 1)
- }
-
- fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- accept_simultaneously(self.listener.inner.pipe.as_stream(), 0)
- }
-}
-
// this function is full of lies
unsafe fn local_io() -> &'static mut IoFactory {
do Local::borrow |sched: &mut Scheduler| {
pub trait RtioUnixAcceptor {
fn accept(&mut self) -> Result<~RtioPipe, IoError>;
- fn accept_simultaneously(&mut self) -> Result<(), IoError>;
- fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
}
pub trait RtioTTY {