]> git.lizzy.rs Git - rust.git/commitdiff
Start migrating stream I/O away from ~fn()
authorAlex Crichton <alex@alexcrichton.com>
Tue, 5 Nov 2013 00:42:05 +0000 (16:42 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Sun, 10 Nov 2013 09:37:10 +0000 (01:37 -0800)
src/librustuv/lib.rs
src/librustuv/pipe.rs
src/librustuv/process.rs
src/librustuv/stream.rs [new file with mode: 0644]
src/librustuv/tty.rs
src/librustuv/uvio.rs
src/libstd/rt/rtio.rs

index de8bed948c4df9df7c72c36f392a0d879c61ec2a..1d6f2f0edb55e7aa510fe75894994ac9275ca71f 100644 (file)
@@ -66,8 +66,9 @@
 pub use self::timer::TimerWatcher;
 pub use self::async::AsyncWatcher;
 pub use self::process::Process;
-pub use self::pipe::Pipe;
+pub use self::pipe::PipeWatcher;
 pub use self::signal::SignalWatcher;
+pub use self::tty::TtyWatcher;
 
 mod macros;
 
@@ -87,6 +88,7 @@
 pub mod pipe;
 pub mod tty;
 pub mod signal;
+pub mod stream;
 
 /// XXX: Loop(*handle) is buggy with destructors. Normal structs
 /// with dtors may not be destructured, but tuple structs can,
@@ -218,7 +220,6 @@ fn native_handle(&self) -> *uvll::uv_loop_t {
 pub type NullCallback = ~fn();
 pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
 pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
-pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
 pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
 pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
 
@@ -231,7 +232,6 @@ struct WatcherData {
     connect_cb: Option<ConnectionCallback>,
     close_cb: Option<NullCallback>,
     alloc_cb: Option<AllocCallback>,
-    async_cb: Option<AsyncCallback>,
     udp_recv_cb: Option<UdpReceiveCallback>,
     udp_send_cb: Option<UdpSendCallback>,
 }
index 0b65c55636d40de9aa1cee2f81d3a559d0a2c3e5..f1936635a1839fb8f7670377cc5f5dc470f174ce 100644 (file)
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::libc;
 use std::c_str::CString;
+use std::cast;
+use std::libc;
+use std::rt::BlockedTask;
+use std::rt::io::IoError;
+use std::rt::local::Local;
+use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
+use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::tube::Tube;
 
-use super::{Loop, UvError, Watcher, NativeHandle, status_to_maybe_uv_error};
-use super::ConnectionCallback;
-use net;
+use stream::StreamWatcher;
+use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle};
+use uvio::HomingIO;
 use uvll;
 
-pub struct Pipe(*uvll::uv_pipe_t);
+pub struct PipeWatcher {
+    stream: StreamWatcher,
+    home: SchedHandle,
+}
+
+pub struct PipeListener {
+    home: SchedHandle,
+    pipe: *uvll::uv_pipe_t,
+    priv closing_task: Option<BlockedTask>,
+    priv outgoing: Tube<Result<~RtioPipe, IoError>>,
+}
+
+pub struct PipeAcceptor {
+    listener: ~PipeListener,
+    priv incoming: Tube<Result<~RtioPipe, IoError>>,
+}
 
-impl Watcher for Pipe {}
+// PipeWatcher implementation and traits
 
-impl Pipe {
-    pub fn new(loop_: &Loop, ipc: bool) -> Pipe {
+impl PipeWatcher {
+    pub fn new(pipe: *uvll::uv_pipe_t) -> PipeWatcher {
+        PipeWatcher {
+            stream: StreamWatcher::new(pipe),
+            home: get_handle_to_current_scheduler!(),
+        }
+    }
+
+    pub fn alloc(loop_: &Loop, ipc: bool) -> *uvll::uv_pipe_t {
         unsafe {
             let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
-            assert!(handle.is_not_null());
+            assert!(!handle.is_null());
             let ipc = ipc as libc::c_int;
             assert_eq!(uvll::uv_pipe_init(loop_.native_handle(), handle, ipc), 0);
-            let mut ret: Pipe =
-                    NativeHandle::from_native_handle(handle);
-            ret.install_watcher_data();
-            ret
+            handle
         }
     }
 
-    pub fn as_stream(&self) -> net::StreamWatcher {
-        net::StreamWatcher(**self as *uvll::uv_stream_t)
+    pub fn open(loop_: &Loop, file: libc::c_int) -> Result<PipeWatcher, UvError>
+    {
+        let handle = PipeWatcher::alloc(loop_, false);
+        match unsafe { uvll::uv_pipe_open(handle, file) } {
+            0 => Ok(PipeWatcher::new(handle)),
+            n => {
+                unsafe { uvll::uv_close(handle, pipe_close_cb) }
+                Err(UvError(n))
+            }
+        }
     }
 
-    #[fixed_stack_segment] #[inline(never)]
-    pub fn open(&mut self, file: libc::c_int) -> Result<(), UvError> {
-        match unsafe { uvll::uv_pipe_open(self.native_handle(), file) } {
-            0 => Ok(()),
-            n => Err(UvError(n))
+    pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
+    {
+        struct Ctx {
+            task: Option<BlockedTask>,
+            result: Option<Result<PipeWatcher, UvError>>,
         }
+        let mut cx = Ctx { task: None, result: None };
+        let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) };
+        unsafe { uvll::set_data_for_req(req, &cx as *Ctx) }
+
+        let sched: ~Scheduler = Local::take();
+        do sched.deschedule_running_task_and_then |_, task| {
+            cx.task = Some(task);
+            unsafe {
+                uvll::uv_pipe_connect(req,
+                                      PipeWatcher::alloc(loop_, false),
+                                      name.with_ref(|p| p),
+                                      connect_cb)
+            }
+        }
+        assert!(cx.task.is_none());
+        return cx.result.take().expect("pipe connect needs a result");
+
+        extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
+            unsafe {
+                let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
+                let stream = uvll::get_stream_handle_from_connect_req(req);
+                cx.result = Some(match status {
+                    0 => Ok(PipeWatcher::new(stream)),
+                    n => {
+                        uvll::free_handle(stream);
+                        Err(UvError(n))
+                    }
+                });
+                uvll::free_req(req);
+
+                let sched: ~Scheduler = Local::take();
+                sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+            }
+        }
+    }
+}
+
+impl RtioPipe for PipeWatcher {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+        let _m = self.fire_missiles();
+        self.stream.read(buf).map_err(uv_error_to_io_error)
+    }
+
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        self.stream.write(buf).map_err(uv_error_to_io_error)
+    }
+}
+
+impl HomingIO for PipeWatcher {
+    fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+}
+
+impl Drop for PipeWatcher {
+    fn drop(&mut self) {
+        let _m = self.fire_missiles();
+        self.stream.close(true); // close synchronously
     }
+}
+
+extern fn pipe_close_cb(handle: *uvll::uv_handle_t) {
+    unsafe { uvll::free_handle(handle) }
+}
 
-    #[fixed_stack_segment] #[inline(never)]
-    pub fn bind(&mut self, name: &CString) -> Result<(), UvError> {
-        do name.with_ref |name| {
-            match unsafe { uvll::uv_pipe_bind(self.native_handle(), name) } {
-                0 => Ok(()),
-                n => Err(UvError(n))
+// PipeListener implementation and traits
+
+impl PipeListener {
+    pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
+        let pipe = PipeWatcher::alloc(loop_, false);
+        match unsafe { uvll::uv_pipe_bind(pipe, name.with_ref(|p| p)) } {
+            0 => {
+                let p = ~PipeListener {
+                    home: get_handle_to_current_scheduler!(),
+                    pipe: pipe,
+                    closing_task: None,
+                    outgoing: Tube::new(),
+                };
+                Ok(p.install())
+            }
+            n => {
+                unsafe { uvll::free_handle(pipe) }
+                Err(UvError(n))
             }
         }
     }
+}
 
-    #[fixed_stack_segment] #[inline(never)]
-    pub fn connect(&mut self, name: &CString, cb: ConnectionCallback) {
-        {
-            let data = self.get_watcher_data();
-            assert!(data.connect_cb.is_none());
-            data.connect_cb = Some(cb);
+impl RtioUnixListener for PipeListener {
+    fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
+        // create the acceptor object from ourselves
+        let incoming = self.outgoing.clone();
+        let mut acceptor = ~PipeAcceptor {
+            listener: self,
+            incoming: incoming,
+        };
+
+        let _m = acceptor.fire_missiles();
+        // XXX: the 128 backlog should be configurable
+        match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
+            0 => Ok(acceptor as ~RtioUnixAcceptor),
+            n => Err(uv_error_to_io_error(UvError(n))),
         }
+    }
+}
 
-        let connect = net::ConnectRequest::new();
-        let name = do name.with_ref |p| { p };
+impl HomingIO for PipeListener {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
-        unsafe {
-            uvll::uv_pipe_connect(connect.native_handle(),
-                               self.native_handle(),
-                               name,
-                               connect_cb)
+impl UvHandle<uvll::uv_pipe_t> for PipeListener {
+    fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
+}
+
+extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
+    let msg = match status {
+        0 => {
+            let loop_ = NativeHandle::from_native_handle(unsafe {
+                uvll::get_loop_for_uv_handle(server)
+            });
+            let client = PipeWatcher::alloc(&loop_, false);
+            assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0);
+            Ok(~PipeWatcher::new(client) as ~RtioPipe)
         }
+        n => Err(uv_error_to_io_error(UvError(n)))
+    };
+
+    let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
+    pipe.outgoing.send(msg);
+}
 
-        extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
-            let connect_request: net::ConnectRequest =
-                    NativeHandle::from_native_handle(req);
-            let mut stream_watcher = connect_request.stream();
-            connect_request.delete();
+impl Drop for PipeListener {
+    fn drop(&mut self) {
+        let (_m, sched) = self.fire_missiles_sched();
 
-            let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
-            let status = status_to_maybe_uv_error(status);
-            cb(stream_watcher, status);
+        do sched.deschedule_running_task_and_then |_, task| {
+            self.closing_task = Some(task);
+            unsafe { uvll::uv_close(self.pipe, listener_close_cb) }
         }
     }
+}
+
+extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
+    let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) };
+    unsafe { uvll::free_handle(handle) }
 
+    let sched: ~Scheduler = Local::take();
+    sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap());
 }
 
-impl NativeHandle<*uvll::uv_pipe_t> for Pipe {
-    fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe {
-        Pipe(handle)
-    }
-    fn native_handle(&self) -> *uvll::uv_pipe_t {
-        match self { &Pipe(ptr) => ptr }
+// PipeAcceptor implementation and traits
+
+impl RtioUnixAcceptor for PipeAcceptor {
+    fn accept(&mut self) -> Result<~RtioPipe, IoError> {
+        let _m = self.fire_missiles();
+        self.incoming.recv()
     }
 }
+
+impl HomingIO for PipeAcceptor {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+}
index d143bc059e43729c23a9436d8b00d601e205c22e..50964d7a84c63aff6546cacb948b492ae990d40f 100644 (file)
@@ -21,8 +21,9 @@
 use std::vec;
 
 use super::{Loop, NativeHandle, UvHandle, UvError, uv_error_to_io_error};
-use uvio::{HomingIO, UvPipeStream, UvUnboundPipe};
+use uvio::HomingIO;
 use uvll;
+use pipe::PipeWatcher;
 
 pub struct Process {
     handle: *uvll::uv_process_t,
@@ -42,7 +43,7 @@ impl Process {
     /// Returns either the corresponding process object or an error which
     /// occurred.
     pub fn spawn(loop_: &Loop, config: ProcessConfig)
-                -> Result<(~Process, ~[Option<~UvPipeStream>]), UvError>
+                -> Result<(~Process, ~[Option<PipeWatcher>]), UvError>
     {
         let cwd = config.cwd.map(|s| s.to_c_str());
         let io = config.io;
@@ -121,7 +122,7 @@ pub fn spawn(loop_: &Loop, config: ProcessConfig)
 
 unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
                     io: &StdioContainer,
-                    loop_: &Loop) -> Option<~UvPipeStream> {
+                    loop_: &Loop) -> Option<PipeWatcher> {
     match *io {
         Ignored => {
             uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
@@ -140,11 +141,10 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
             if writable {
                 flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
             }
-            let pipe = UvUnboundPipe::new(loop_);
-            let handle = pipe.pipe.as_stream().native_handle();
+            let pipe_handle = PipeWatcher::alloc(loop_, false);
             uvll::set_stdio_container_flags(dst, flags);
-            uvll::set_stdio_container_stream(dst, handle);
-            Some(~UvPipeStream::new(pipe))
+            uvll::set_stdio_container_stream(dst, pipe_handle);
+            Some(PipeWatcher::new(pipe_handle))
         }
     }
 }
diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs
new file mode 100644 (file)
index 0000000..ad0deeb
--- /dev/null
@@ -0,0 +1,216 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::cast;
+use std::libc::{c_int, size_t, ssize_t, c_void};
+use std::ptr;
+use std::rt::BlockedTask;
+use std::rt::local::Local;
+use std::rt::sched::Scheduler;
+
+use super::{UvError, Buf, slice_to_uv_buf};
+use uvll;
+
+// This is a helper structure which is intended to get embedded into other
+// Watcher structures. This structure will retain a handle to the underlying
+// uv_stream_t instance, and all I/O operations assume that it's already located
+// on the appropriate scheduler.
+pub struct StreamWatcher {
+    handle: *uvll::uv_stream_t,
+
+    // Cache the last used uv_write_t so we don't have to allocate a new one on
+    // every call to uv_write(). Ideally this would be a stack-allocated
+    // structure, but currently we don't have mappings for all the structures
+    // defined in libuv, so we're foced to malloc this.
+    priv last_write_req: Option<*uvll::uv_write_t>,
+}
+
+struct ReadContext {
+    buf: Option<Buf>,
+    result: Option<Result<uint, UvError>>,
+    task: Option<BlockedTask>,
+}
+
+struct WriteContext {
+    result: Option<Result<(), UvError>>,
+    task: Option<BlockedTask>,
+}
+
+impl StreamWatcher {
+    // Creates a new helper structure which should be then embedded into another
+    // watcher. This provides the generic read/write methods on streams.
+    //
+    // This structure will *not* close the stream when it is dropped. It is up
+    // to the enclosure structure to be sure to call the close method (which
+    // will block the task). Note that this is also required to prevent memory
+    // leaks.
+    //
+    // It should also be noted that the `data` field of the underlying uv handle
+    // will be manipulated on each of the methods called on this watcher.
+    // Wrappers should ensure to always reset the field to an appropriate value
+    // if they rely on the field to perform an action.
+    pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher {
+        StreamWatcher {
+            handle: stream,
+            last_write_req: None,
+        }
+    }
+
+    pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
+        // Send off the read request, but don't block until we're sure that the
+        // read request is queued.
+        match unsafe {
+            uvll::uv_read_start(self.handle, alloc_cb, read_cb)
+        } {
+            0 => {
+                let mut rcx = ReadContext {
+                    buf: Some(slice_to_uv_buf(buf)),
+                    result: None,
+                    task: None,
+                };
+                unsafe {
+                    uvll::set_data_for_uv_handle(self.handle, &rcx)
+                }
+                let scheduler: ~Scheduler = Local::take();
+                do scheduler.deschedule_running_task_and_then |_sched, task| {
+                    rcx.task = Some(task);
+                }
+                rcx.result.take().expect("no result in read stream?")
+            }
+            n => Err(UvError(n))
+        }
+    }
+
+    pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
+        // Prepare the write request, either using a cached one or allocating a
+        // new one
+        let req = match self.last_write_req {
+            Some(req) => req,
+            None => unsafe { uvll::malloc_req(uvll::UV_WRITE) },
+        };
+        self.last_write_req = Some(req);
+        let mut wcx = WriteContext { result: None, task: None, };
+        unsafe { uvll::set_data_for_req(req, &wcx as *WriteContext) }
+
+        // Send off the request, but be careful to not block until we're sure
+        // that the write reqeust is queued. If the reqeust couldn't be queued,
+        // then we should return immediately with an error.
+        match unsafe {
+            uvll::uv_write(req, self.handle, [slice_to_uv_buf(buf)], write_cb)
+        } {
+            0 => {
+                let scheduler: ~Scheduler = Local::take();
+                do scheduler.deschedule_running_task_and_then |_sched, task| {
+                    wcx.task = Some(task);
+                }
+                assert!(wcx.task.is_none());
+                wcx.result.take().expect("no result in write stream?")
+            }
+            n => Err(UvError(n)),
+        }
+    }
+
+    // This will deallocate an internally used memory, along with closing the
+    // handle (and freeing it).
+    //
+    // The `synchronous` flag dictates whether this handle is closed
+    // synchronously (the task is blocked) or asynchronously (the task is not
+    // block, but the handle is still deallocated).
+    pub fn close(&mut self, synchronous: bool) {
+        // clean up the cached write request if we have one
+        match self.last_write_req {
+            Some(req) => unsafe { uvll::free_req(req) },
+            None => {}
+        }
+
+        if synchronous {
+            let mut closing_task = None;
+            unsafe {
+                uvll::set_data_for_uv_handle(self.handle, &closing_task);
+            }
+
+            // Wait for this stream to close because it possibly represents a remote
+            // connection which may have consequences if we close asynchronously.
+            let sched: ~Scheduler = Local::take();
+            do sched.deschedule_running_task_and_then |_, task| {
+                closing_task = Some(task);
+                unsafe { uvll::uv_close(self.handle, close_cb) }
+            }
+        } else {
+            unsafe {
+                uvll::set_data_for_uv_handle(self.handle, ptr::null::<u8>());
+                uvll::uv_close(self.handle, close_cb)
+            }
+        }
+
+        extern fn close_cb(handle: *uvll::uv_handle_t) {
+            let data: *c_void = unsafe { uvll::get_data_for_uv_handle(handle) };
+            unsafe { uvll::free_handle(handle) }
+            if data.is_null() { return }
+
+            let closing_task: &mut Option<BlockedTask> = unsafe {
+                cast::transmute(data)
+            };
+            let task = closing_task.take_unwrap();
+            let scheduler: ~Scheduler = Local::take();
+            scheduler.resume_blocked_task_immediately(task);
+        }
+    }
+}
+
+// This allocation callback expects to be invoked once and only once. It will
+// unwrap the buffer in the ReadContext stored in the stream and return it. This
+// will fail if it is called more than once.
+extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t) -> Buf {
+    let rcx: &mut ReadContext = unsafe {
+        cast::transmute(uvll::get_data_for_uv_handle(stream))
+    };
+    rcx.buf.take().expect("alloc_cb called more than once")
+}
+
+// When a stream has read some data, we will always forcibly stop reading and
+// return all the data read (even if it didn't fill the whole buffer).
+extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) {
+    let rcx: &mut ReadContext = unsafe {
+        cast::transmute(uvll::get_data_for_uv_handle(handle))
+    };
+    // Stop reading so that no read callbacks are
+    // triggered before the user calls `read` again.
+    // XXX: Is there a performance impact to calling
+    // stop here?
+    unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
+
+    assert!(rcx.result.is_none());
+    rcx.result = Some(match nread {
+        n if n < 0 => Err(UvError(n as c_int)),
+        n => Ok(n as uint),
+    });
+
+    let task = rcx.task.take().expect("read_cb needs a task");
+    let scheduler: ~Scheduler = Local::take();
+    scheduler.resume_blocked_task_immediately(task);
+}
+
+// Unlike reading, the WriteContext is stored in the uv_write_t request. Like
+// reading, however, all this does is wake up the blocked task after squirreling
+// away the error code as a result.
+extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
+    // Remember to not free the request because it is re-used between writes on
+    // the same stream.
+    unsafe {
+        let wcx: &mut WriteContext = cast::transmute(uvll::get_data_for_req(req));
+        wcx.result = Some(match status {
+            0 => Ok(()),
+            n => Err(UvError(n)),
+        });
+        let sched: ~Scheduler = Local::take();
+        sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
+    }
+}
index ad5f5043737f2ff97bad1bfa361e4ec5dceb00d4..316a817354db1a59f99f85d7d0a30908e5b13044 100644 (file)
 // except according to those terms.
 
 use std::libc;
+use std::rt::io::IoError;
+use std::rt::local::Local;
+use std::rt::rtio::RtioTTY;
+use std::rt::sched::{Scheduler, SchedHandle};
 
-use super::{Watcher, Loop, NativeHandle, UvError};
-use net;
+use stream::StreamWatcher;
+use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
+use uvio::HomingIO;
 use uvll;
 
-/// A process wraps the handle of the underlying uv_process_t.
-pub struct TTY(*uvll::uv_tty_t);
-
-impl Watcher for TTY {}
+pub struct TtyWatcher{
+    tty: *uvll::uv_tty_t,
+    stream: StreamWatcher,
+    home: SchedHandle,
+    fd: libc::c_int,
+}
 
-impl TTY {
-    #[fixed_stack_segment] #[inline(never)]
-    pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool) ->
-            Result<TTY, UvError>
+impl TtyWatcher {
+    pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool)
+        -> Result<TtyWatcher, UvError>
     {
-        let handle = unsafe { uvll::malloc_handle(uvll::UV_TTY) };
-        assert!(handle.is_not_null());
+        let handle = UvHandle::alloc(None::<TtyWatcher>, uvll::UV_TTY);
 
-        let ret = unsafe {
+        match unsafe {
             uvll::uv_tty_init(loop_.native_handle(), handle, fd as libc::c_int,
                               readable as libc::c_int)
-        };
-        match ret {
+        } {
             0 => {
-                let mut ret: TTY = NativeHandle::from_native_handle(handle);
-                ret.install_watcher_data();
-                Ok(ret)
+                Ok(TtyWatcher {
+                    tty: handle,
+                    stream: StreamWatcher::new(handle),
+                    home: get_handle_to_current_scheduler!(),
+                    fd: fd,
+                })
             }
             n => {
-                unsafe { uvll::free_handle(handle); }
+                unsafe { uvll::free_handle(handle) }
                 Err(UvError(n))
             }
         }
     }
+}
 
-    pub fn as_stream(&self) -> net::StreamWatcher {
-        net::StreamWatcher(**self as *uvll::uv_stream_t)
+impl RtioTTY for TtyWatcher {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+        let _m = self.fire_missiles();
+        self.stream.read(buf).map_err(uv_error_to_io_error)
     }
 
-    #[fixed_stack_segment] #[inline(never)]
-    pub fn set_mode(&self, raw: bool) -> Result<(), UvError> {
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        self.stream.write(buf).map_err(uv_error_to_io_error)
+    }
+
+    fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
         let raw = raw as libc::c_int;
-        match unsafe { uvll::uv_tty_set_mode(self.native_handle(), raw) } {
+        let _m = self.fire_missiles();
+        match unsafe { uvll::uv_tty_set_mode(self.tty, raw) } {
             0 => Ok(()),
-            n => Err(UvError(n))
+            n => Err(uv_error_to_io_error(UvError(n)))
         }
     }
 
-    #[fixed_stack_segment] #[inline(never)] #[allow(unused_mut)]
-    pub fn get_winsize(&self) -> Result<(int, int), UvError> {
+    #[allow(unused_mut)]
+    fn get_winsize(&mut self) -> Result<(int, int), IoError> {
         let mut width: libc::c_int = 0;
         let mut height: libc::c_int = 0;
         let widthptr: *libc::c_int = &width;
         let heightptr: *libc::c_int = &width;
 
-        match unsafe { uvll::uv_tty_get_winsize(self.native_handle(),
+        let _m = self.fire_missiles();
+        match unsafe { uvll::uv_tty_get_winsize(self.tty,
                                                 widthptr, heightptr) } {
             0 => Ok((width as int, height as int)),
-            n => Err(UvError(n))
+            n => Err(uv_error_to_io_error(UvError(n)))
         }
     }
-}
 
-impl NativeHandle<*uvll::uv_tty_t> for TTY {
-    fn from_native_handle(handle: *uvll::uv_tty_t) -> TTY {
-        TTY(handle)
-    }
-    fn native_handle(&self) -> *uvll::uv_tty_t {
-        match self { &TTY(ptr) => ptr }
+    fn isatty(&self) -> bool {
+        unsafe { uvll::uv_guess_handle(self.fd) == uvll::UV_TTY }
     }
 }
 
+impl UvHandle<uvll::uv_tty_t> for TtyWatcher {
+    fn uv_handle(&self) -> *uvll::uv_tty_t { self.tty }
+}
+
+impl HomingIO for TtyWatcher {
+    fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
+}
+
+impl Drop for TtyWatcher {
+    // TTY handles are used for the logger in a task, so this destructor is run
+    // when a task is destroyed. When a task is being destroyed, a local
+    // scheduler isn't available, so we can't do the normal "take the scheduler
+    // and resume once close is done". Instead close operations on a TTY are
+    // asynchronous.
+    fn drop(&mut self) {
+        let _m = self.fire_missiles();
+        self.stream.close(false);
+    }
+}
index e0ceb954e58cbfa40642d0d3dcb68632b4603ecd..592a23297cc3a8dba632137e29541de8ff488fb4 100644 (file)
@@ -49,6 +49,7 @@
 use idle::IdleWatcher;
 use net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
 use addrinfo::{GetAddrInfoRequest, accum_addrinfo};
+use pipe::PipeListener;
 
 // XXX we should not be calling uvll functions in here.
 
@@ -616,67 +617,38 @@ fn spawn(&mut self, config: ProcessConfig)
         match Process::spawn(self.uv_loop(), config) {
             Ok((p, io)) => {
                 Ok((p as ~RtioProcess,
-                    io.move_iter().map(|i| i.map(|p| p as ~RtioPipe)).collect()))
+                    io.move_iter().map(|i| i.map(|p| ~p as ~RtioPipe)).collect()))
             }
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
-    fn unix_bind(&mut self, path: &CString) ->
-        Result<~RtioUnixListener, IoError> {
-        let mut pipe = UvUnboundPipe::new(self.uv_loop());
-        match pipe.pipe.bind(path) {
-            Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener),
+    fn unix_bind(&mut self, path: &CString) -> Result<~RtioUnixListener, IoError>
+    {
+        match PipeListener::bind(self.uv_loop(), path) {
+            Ok(p) => Ok(p as ~RtioUnixListener),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
     fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
-        let pipe = UvUnboundPipe::new(self.uv_loop());
-        let mut rawpipe = pipe.pipe;
-
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell;
-        let pipe_cell = Cell::new(pipe);
-        let pipe_cell_ptr: *Cell<UvUnboundPipe> = &pipe_cell;
-
-        let scheduler: ~Scheduler = Local::take();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do rawpipe.connect(path) |_stream, err| {
-                let res = match err {
-                    None => {
-                        let pipe = unsafe { (*pipe_cell_ptr).take() };
-                        Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
-                    }
-                    Some(e) => Err(uv_error_to_io_error(e)),
-                };
-                unsafe { (*result_cell_ptr).put_back(res); }
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
+        match PipeWatcher::connect(self.uv_loop(), path) {
+            Ok(p) => Ok(~p as ~RtioPipe),
+            Err(e) => Err(uv_error_to_io_error(e)),
         }
-
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
     }
 
     fn tty_open(&mut self, fd: c_int, readable: bool)
             -> Result<~RtioTTY, IoError> {
-        match tty::TTY::new(self.uv_loop(), fd, readable) {
-            Ok(tty) => Ok(~UvTTY {
-                home: get_handle_to_current_scheduler!(),
-                tty: tty,
-                fd: fd,
-            } as ~RtioTTY),
+        match TtyWatcher::new(self.uv_loop(), fd, readable) {
+            Ok(tty) => Ok(~tty as ~RtioTTY),
             Err(e) => Err(uv_error_to_io_error(e))
         }
     }
 
     fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
-        let mut pipe = UvUnboundPipe::new(self.uv_loop());
-        match pipe.pipe.open(fd) {
-            Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe),
+        match PipeWatcher::open(self.uv_loop(), fd) {
+            Ok(s) => Ok(~s as ~RtioPipe),
             Err(e) => Err(uv_error_to_io_error(e))
         }
     }
@@ -865,60 +837,6 @@ fn write_stream(mut watcher: StreamWatcher,
     result_cell.take()
 }
 
-pub struct UvUnboundPipe {
-    pipe: Pipe,
-    priv home: SchedHandle,
-}
-
-impl UvUnboundPipe {
-    /// Creates a new unbound pipe homed to the current scheduler, placed on the
-    /// specified event loop
-    pub fn new(loop_: &Loop) -> UvUnboundPipe {
-        UvUnboundPipe {
-            pipe: Pipe::new(loop_, false),
-            home: get_handle_to_current_scheduler!(),
-        }
-    }
-}
-
-impl HomingIO for UvUnboundPipe {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvUnboundPipe {
-    fn drop(&mut self) {
-        let (_m, sched) = self.fire_homing_missile_sched();
-        do sched.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.pipe.close {
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-    }
-}
-
-pub struct UvPipeStream {
-    priv inner: UvUnboundPipe,
-}
-
-impl UvPipeStream {
-    pub fn new(inner: UvUnboundPipe) -> UvPipeStream {
-        UvPipeStream { inner: inner }
-    }
-}
-
-impl RtioPipe for UvPipeStream {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let (_m, scheduler) = self.inner.fire_homing_missile_sched();
-        read_stream(self.inner.pipe.as_stream(), scheduler, buf)
-    }
-    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let (_m, scheduler) = self.inner.fire_homing_missile_sched();
-        write_stream(self.inner.pipe.as_stream(), scheduler, buf)
-    }
-}
-
 pub struct UvTcpStream {
     priv watcher: TcpWatcher,
     priv home: SchedHandle,
@@ -1307,129 +1225,6 @@ fn truncate(&mut self, offset: i64) -> Result<(), IoError> {
     }
 }
 
-pub struct UvUnixListener {
-    priv inner: UvUnboundPipe
-}
-
-impl HomingIO for UvUnixListener {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() }
-}
-
-impl UvUnixListener {
-    fn new(pipe: UvUnboundPipe) -> UvUnixListener {
-        UvUnixListener { inner: pipe }
-    }
-}
-
-impl RtioUnixListener for UvUnixListener {
-    fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
-        let _m = self.fire_homing_missile();
-        let acceptor = ~UvUnixAcceptor::new(*self);
-        let incoming = Cell::new(acceptor.incoming.clone());
-        let mut stream = acceptor.listener.inner.pipe.as_stream();
-        let res = do stream.listen |mut server, status| {
-            do incoming.with_mut_ref |incoming| {
-                let inc = match status {
-                    Some(e) => Err(uv_error_to_io_error(e)),
-                    None => {
-                        let pipe = UvUnboundPipe::new(&server.event_loop());
-                        server.accept(pipe.pipe.as_stream());
-                        Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
-                    }
-                };
-                incoming.send(inc);
-            }
-        };
-        match res {
-            Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
-            Err(e) => Err(uv_error_to_io_error(e)),
-        }
-    }
-}
-
-pub struct UvTTY {
-    tty: tty::TTY,
-    home: SchedHandle,
-    fd: c_int,
-}
-
-impl HomingIO for UvTTY {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvTTY {
-    fn drop(&mut self) {
-        // TTY handles are used for the logger in a task, so this destructor is
-        // run when a task is destroyed. When a task is being destroyed, a local
-        // scheduler isn't available, so we can't do the normal "take the
-        // scheduler and resume once close is done". Instead close operations on
-        // a TTY are asynchronous.
-        self.tty.close_async();
-    }
-}
-
-impl RtioTTY for UvTTY {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        read_stream(self.tty.as_stream(), scheduler, buf)
-    }
-
-    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        write_stream(self.tty.as_stream(), scheduler, buf)
-    }
-
-    fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        match self.tty.set_mode(raw) {
-            Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
-        }
-    }
-
-    fn get_winsize(&mut self) -> Result<(int, int), IoError> {
-        let _m = self.fire_homing_missile();
-        match self.tty.get_winsize() {
-            Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
-        }
-    }
-
-    fn isatty(&self) -> bool {
-        unsafe { uvll::uv_guess_handle(self.fd) == uvll::UV_TTY }
-    }
-}
-
-pub struct UvUnixAcceptor {
-    listener: UvUnixListener,
-    incoming: Tube<Result<~RtioPipe, IoError>>,
-}
-
-impl HomingIO for UvUnixAcceptor {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
-}
-
-impl UvUnixAcceptor {
-    fn new(listener: UvUnixListener) -> UvUnixAcceptor {
-        UvUnixAcceptor { listener: listener, incoming: Tube::new() }
-    }
-}
-
-impl RtioUnixAcceptor for UvUnixAcceptor {
-    fn accept(&mut self) -> Result<~RtioPipe, IoError> {
-        let _m = self.fire_homing_missile();
-        self.incoming.recv()
-    }
-
-    fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        accept_simultaneously(self.listener.inner.pipe.as_stream(), 1)
-    }
-
-    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        accept_simultaneously(self.listener.inner.pipe.as_stream(), 0)
-    }
-}
-
 // this function is full of lies
 unsafe fn local_io() -> &'static mut IoFactory {
     do Local::borrow |sched: &mut Scheduler| {
index 8684537f4e4b5b376a90aaa787bb385447d786e1..f8b87abb9f67700f33f9425dae88cf3c3ed37a24 100644 (file)
@@ -213,8 +213,6 @@ pub trait RtioUnixListener {
 
 pub trait RtioUnixAcceptor {
     fn accept(&mut self) -> Result<~RtioPipe, IoError>;
-    fn accept_simultaneously(&mut self) -> Result<(), IoError>;
-    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
 }
 
 pub trait RtioTTY {