]> git.lizzy.rs Git - rust.git/commitdiff
Implement io::net::unix
authorAlex Crichton <alex@alexcrichton.com>
Wed, 16 Oct 2013 02:44:08 +0000 (19:44 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Thu, 24 Oct 2013 21:21:56 +0000 (14:21 -0700)
src/libstd/rt/io/net/unix.rs
src/libstd/rt/io/pipe.rs
src/libstd/rt/io/process.rs
src/libstd/rt/rtio.rs
src/libstd/rt/test.rs
src/libstd/rt/uv/net.rs
src/libstd/rt/uv/pipe.rs
src/libstd/rt/uv/process.rs
src/libstd/rt/uv/uvio.rs
src/libstd/rt/uv/uvll.rs
src/rt/rustrt.def.in

index 1771a963ba78cdf6dacb6b49aa3bf905e37eed84..9428c1f800d158976ac75996d444ec63e9528bc6 100644 (file)
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+/*!
+
+Named pipes
+
+This module contains the ability to communicate over named pipes with
+synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
+while on Unix it corresponds to UNIX domain sockets.
+
+These pipes are similar to TCP in the sense that you can have both a stream to a
+server and a server itself. The server provided accepts other `UnixStream`
+instances as clients.
+
+*/
+
 use prelude::*;
-use super::super::*;
+
 use super::super::support::PathLike;
+use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListenerObject};
+use rt::rtio::{RtioUnixAcceptorObject, RtioPipeObject, RtioUnixListener};
+use rt::rtio::RtioUnixAcceptor;
+use rt::io::pipe::PipeStream;
+use rt::io::{io_error, Listener, Acceptor, Reader, Writer};
+use rt::local::Local;
 
-pub struct UnixStream;
+/// A stream which communicates over a named pipe.
+pub struct UnixStream {
+    priv obj: PipeStream,
+}
 
 impl UnixStream {
-    pub fn connect<P: PathLike>(_path: &P) -> Option<UnixStream> {
-        fail!()
+    fn new(obj: ~RtioPipeObject) -> UnixStream {
+        UnixStream { obj: PipeStream::new_bound(obj) }
+    }
+
+    /// Connect to a pipe named by `path`. This will attempt to open a
+    /// connection to the underlying socket.
+    ///
+    /// The returned stream will be closed when the object falls out of scope.
+    ///
+    /// # Failure
+    ///
+    /// This function will raise on the `io_error` condition if the connection
+    /// could not be made.
+    ///
+    /// # Example
+    ///
+    ///     use std::rt::io::net::unix::UnixStream;
+    ///
+    ///     let server = Path("path/to/my/socket");
+    ///     let mut stream = UnixStream::connect(&server);
+    ///     stream.write([1, 2, 3]);
+    ///
+    pub fn connect<P: PathLike>(path: &P) -> Option<UnixStream> {
+        let pipe = unsafe {
+            let io: *mut IoFactoryObject = Local::unsafe_borrow();
+            (*io).unix_connect(path)
+        };
+
+        match pipe {
+            Ok(s) => Some(UnixStream::new(s)),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
     }
 }
 
 impl Reader for UnixStream {
-    fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
-
-    fn eof(&mut self) -> bool { fail!() }
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> { self.obj.read(buf) }
+    fn eof(&mut self) -> bool { self.obj.eof() }
 }
 
 impl Writer for UnixStream {
-    fn write(&mut self, _v: &[u8]) { fail!() }
-
-    fn flush(&mut self) { fail!() }
+    fn write(&mut self, buf: &[u8]) { self.obj.write(buf) }
+    fn flush(&mut self) { self.obj.flush() }
 }
 
-pub struct UnixListener;
+pub struct UnixListener {
+    priv obj: ~RtioUnixListenerObject,
+}
 
 impl UnixListener {
-    pub fn bind<P: PathLike>(_path: &P) -> Option<UnixListener> {
-        fail!()
+
+    /// Creates a new listener, ready to receive incoming connections on the
+    /// specified socket. The server will be named by `path`.
+    ///
+    /// This listener will be closed when it falls out of scope.
+    ///
+    /// # Failure
+    ///
+    /// This function will raise on the `io_error` condition if the specified
+    /// path could not be bound.
+    ///
+    /// # Example
+    ///
+    ///     use std::rt::io::net::unix::UnixListener;
+    ///
+    ///     let server = Path("path/to/my/socket");
+    ///     let mut stream = UnixListener::bind(&server);
+    ///     for client in stream.incoming() {
+    ///         let mut client = client;
+    ///         client.write([1, 2, 3, 4]);
+    ///     }
+    ///
+    pub fn bind<P: PathLike>(path: &P) -> Option<UnixListener> {
+        let listener = unsafe {
+            let io: *mut IoFactoryObject = Local::unsafe_borrow();
+            (*io).unix_bind(path)
+        };
+        match listener {
+            Ok(s) => Some(UnixListener{ obj: s }),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
     }
 }
 
 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
-    fn listen(self) -> Option<UnixAcceptor> { fail!() }
+    fn listen(self) -> Option<UnixAcceptor> {
+        match self.obj.listen() {
+            Ok(acceptor) => Some(UnixAcceptor { obj: acceptor }),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
 }
 
-pub struct UnixAcceptor;
+pub struct UnixAcceptor {
+    priv obj: ~RtioUnixAcceptorObject,
+}
 
 impl Acceptor<UnixStream> for UnixAcceptor {
-    fn accept(&mut self) -> Option<UnixStream> { fail!() }
+    fn accept(&mut self) -> Option<UnixStream> {
+        match self.obj.accept() {
+            Ok(s) => Some(UnixStream::new(s)),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+    use super::*;
+    use cell::Cell;
+    use rt::test::*;
+    use rt::io::*;
+    use rt::comm::oneshot;
+    use os;
+
+    fn smalltest(server: ~fn(UnixStream), client: ~fn(UnixStream)) {
+        let server = Cell::new(server);
+        let client = Cell::new(client);
+        do run_in_mt_newsched_task {
+            let server = Cell::new(server.take());
+            let client = Cell::new(client.take());
+            let path1 = next_test_unix();
+            let path2 = path1.clone();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = UnixListener::bind(&path1).listen();
+                chan.take().send(());
+                server.take()(acceptor.accept().unwrap());
+            }
+
+            do spawntask {
+                port.take().recv();
+                client.take()(UnixStream::connect(&path2).unwrap());
+            }
+        }
+    }
+
+    #[test]
+    fn bind_error() {
+        do run_in_mt_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == PermissionDenied);
+                called = true;
+            }).inside {
+                let listener = UnixListener::bind(&("path/to/nowhere"));
+                assert!(listener.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
+    fn connect_error() {
+        do run_in_mt_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert_eq!(e.kind, OtherIoError);
+                called = true;
+            }).inside {
+                let stream = UnixStream::connect(&("path/to/nowhere"));
+                assert!(stream.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
+    fn smoke() {
+        smalltest(|mut server| {
+            let mut buf = [0];
+            server.read(buf);
+            assert!(buf[0] == 99);
+        }, |mut client| {
+            client.write([99]);
+        })
+    }
+
+    #[test]
+    fn read_eof() {
+        smalltest(|mut server| {
+            let mut buf = [0];
+            assert!(server.read(buf).is_none());
+            assert!(server.read(buf).is_none());
+        }, |_client| {
+            // drop the client
+        })
+    }
+
+    #[test]
+    fn write_begone() {
+        smalltest(|mut server| {
+            let buf = [0];
+            let mut stop = false;
+            while !stop{
+                do io_error::cond.trap(|e| {
+                    assert_eq!(e.kind, BrokenPipe);
+                    stop = true;
+                }).inside {
+                    server.write(buf);
+                }
+            }
+        }, |_client| {
+            // drop the client
+        })
+    }
+
+    #[test]
+    fn accept_lots() {
+        do run_in_mt_newsched_task {
+            let times = 10;
+            let path1 = next_test_unix();
+            let path2 = path1.clone();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = UnixListener::bind(&path1).listen();
+                chan.take().send(());
+                do times.times {
+                    let mut client = acceptor.accept();
+                    let mut buf = [0];
+                    client.read(buf);
+                    assert_eq!(buf[0], 100);
+                }
+            }
+
+            do spawntask {
+                port.take().recv();
+                do times.times {
+                    let mut stream = UnixStream::connect(&path2);
+                    stream.write([100]);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn path_exists() {
+        do run_in_mt_newsched_task {
+            let path = next_test_unix();
+            let _acceptor = UnixListener::bind(&path).listen();
+            assert!(os::path_exists(&path));
+        }
+    }
 }
index d2cd531ed266fd99c631e6e216844f958c016b4e..ff1bd55d594b71d3af353974fca134205a48c5d1 100644 (file)
@@ -21,7 +21,7 @@
 use rt::rtio::RtioUnboundPipeObject;
 
 pub struct PipeStream {
-    priv obj: RtioPipeObject
+    priv obj: ~RtioPipeObject
 }
 
 // This should not be a newtype, but rt::uv::process::set_stdio needs to reach
@@ -45,7 +45,7 @@ pub fn new() -> Option<UnboundPipeStream> {
         }
     }
 
-    pub fn bind(inner: RtioPipeObject) -> PipeStream {
+    pub fn new_bound(inner: ~RtioPipeObject) -> PipeStream {
         PipeStream { obj: inner }
     }
 }
index 5f2453852ee7fe9a82704d64ecde3ae9efbf1274..e0ffa82b59fe317f9ccaf9bf2ba69e0171cf8d26 100644 (file)
@@ -100,7 +100,7 @@ pub fn new(config: ProcessConfig) -> Option<Process> {
             Ok((p, io)) => Some(Process{
                 handle: p,
                 io: io.move_iter().map(|p|
-                    p.map(|p| io::PipeStream::bind(p))
+                    p.map(|p| io::PipeStream::new_bound(p))
                 ).collect()
             }),
             Err(ioerr) => {
index 501def8b0607903f82f7fc33d1346e224a9dad00..0964f94d6d5281026804edca282af882356e064d 100644 (file)
@@ -36,6 +36,8 @@
 pub type RtioPipeObject = uvio::UvPipeStream;
 pub type RtioUnboundPipeObject = uvio::UvUnboundPipe;
 pub type RtioProcessObject = uvio::UvProcess;
+pub type RtioUnixListenerObject = uvio::UvUnixListener;
+pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor;
 
 pub trait EventLoop {
     fn run(&mut self);
@@ -86,7 +88,12 @@ fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) ->
         Result<~[Path], IoError>;
     fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>;
     fn spawn(&mut self, config: ProcessConfig)
-            -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>;
+            -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>;
+
+    fn unix_bind<P: PathLike>(&mut self, path: &P) ->
+        Result<~RtioUnixListenerObject, IoError>;
+    fn unix_connect<P: PathLike>(&mut self, path: &P) ->
+        Result<~RtioPipeObject, IoError>;
 }
 
 pub trait RtioTcpListener : RtioSocket {
@@ -154,3 +161,13 @@ pub trait RtioPipe {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
 }
+
+pub trait RtioUnixListener {
+    fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError>;
+}
+
+pub trait RtioUnixAcceptor {
+    fn accept(&mut self) -> Result<~RtioPipeObject, IoError>;
+    fn accept_simultaneously(&mut self) -> Result<(), IoError>;
+    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
+}
index 4f7ebb4a72195954e71d0ecb30f97be9a2d2725f..759550e5cbd039cdef69ccf52810acd22ded529f 100644 (file)
@@ -8,8 +8,12 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use rand;
+use rand::Rng;
+use os;
 use libc;
 use option::{Some, None};
+use path::{Path, GenericPath};
 use cell::Cell;
 use clone::Clone;
 use container::Container;
@@ -327,6 +331,12 @@ pub fn next_test_port() -> u16 {
     }
 }
 
+/// Get a temporary path which could be the location of a unix socket
+#[fixed_stack_segment] #[inline(never)]
+pub fn next_test_unix() -> Path {
+    os::tmpdir().push(rand::task_rng().gen_ascii_str(20))
+}
+
 /// Get a unique IPv4 localhost:port pair starting at 9600
 pub fn next_test_ip4() -> SocketAddr {
     SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: next_test_port() }
index a2608bf6b2406be50c1c5a0c1a020b7d43b6781b..2e85900a3f23ca1d1b2ebe106146be9311b80bee 100644 (file)
@@ -206,12 +206,6 @@ pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
         }
     }
 
-    pub fn accept(&mut self, stream: StreamWatcher) {
-        let self_handle = self.native_handle() as *c_void;
-        let stream_handle = stream.native_handle() as *c_void;
-        assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
-    }
-
     pub fn close(self, cb: NullCallback) {
         {
             let mut this = self;
@@ -230,6 +224,36 @@ pub fn close(self, cb: NullCallback) {
             cb();
         }
     }
+
+    pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
+        {
+            let data = self.get_watcher_data();
+            assert!(data.connect_cb.is_none());
+            data.connect_cb = Some(cb);
+        }
+
+        unsafe {
+            static BACKLOG: c_int = 128; // XXX should be configurable
+            match uvll::listen(self.native_handle(), BACKLOG, connection_cb) {
+                0 => Ok(()),
+                n => Err(UvError(n))
+            }
+        }
+
+        extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
+            rtdebug!("connection_cb");
+            let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
+            let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
+            let status = status_to_maybe_uv_error(status);
+            (*cb)(stream_watcher, status);
+        }
+    }
+
+    pub fn accept(&mut self, stream: StreamWatcher) {
+        let self_handle = self.native_handle() as *c_void;
+        let stream_handle = stream.native_handle() as *c_void;
+        assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
+    }
 }
 
 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
@@ -300,28 +324,6 @@ pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
         }
     }
 
-    pub fn listen(&mut self, cb: ConnectionCallback) {
-        {
-            let data = self.get_watcher_data();
-            assert!(data.connect_cb.is_none());
-            data.connect_cb = Some(cb);
-        }
-
-        unsafe {
-            static BACKLOG: c_int = 128; // XXX should be configurable
-            // XXX: This can probably fail
-            assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
-        }
-
-        extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
-            rtdebug!("connection_cb");
-            let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
-            let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
-            let status = status_to_maybe_uv_error(status);
-            (*cb)(stream_watcher, status);
-        }
-    }
-
     pub fn as_stream(&self) -> StreamWatcher {
         NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
     }
@@ -644,7 +646,8 @@ fn listen_ip4() {
             server_tcp_watcher.bind(addr);
             let loop_ = loop_;
             rtdebug!("listening");
-            do server_tcp_watcher.listen |mut server_stream_watcher, status| {
+            let mut stream = server_tcp_watcher.as_stream();
+            let res = do stream.listen |mut server_stream_watcher, status| {
                 rtdebug!("listened!");
                 assert!(status.is_none());
                 let mut loop_ = loop_;
@@ -678,7 +681,9 @@ fn listen_ip4() {
                     }
                     count_cell.put_back(count);
                 }
-            }
+            };
+
+            assert!(res.is_ok());
 
             let client_thread = do Thread::start {
                 rtdebug!("starting client thread");
@@ -705,7 +710,7 @@ fn listen_ip4() {
             loop_.run();
             loop_.close();
             client_thread.join();
-        }
+        };
     }
 
     #[test]
@@ -718,7 +723,8 @@ fn listen_ip6() {
             server_tcp_watcher.bind(addr);
             let loop_ = loop_;
             rtdebug!("listening");
-            do server_tcp_watcher.listen |mut server_stream_watcher, status| {
+            let mut stream = server_tcp_watcher.as_stream();
+            let res = do stream.listen |mut server_stream_watcher, status| {
                 rtdebug!("listened!");
                 assert!(status.is_none());
                 let mut loop_ = loop_;
@@ -754,7 +760,8 @@ fn listen_ip6() {
                     }
                     count_cell.put_back(count);
                 }
-            }
+            };
+            assert!(res.is_ok());
 
             let client_thread = do Thread::start {
                 rtdebug!("starting client thread");
index 1147c731a60c5e312a1d055bf915cec8ad4bbd4c..1cb86d4df2ca28f0abefb6ae7eeda0d3e988016e 100644 (file)
@@ -10,6 +10,7 @@
 
 use prelude::*;
 use libc;
+use c_str::CString;
 
 use rt::uv;
 use rt::uv::net;
@@ -37,6 +38,54 @@ pub fn as_stream(&self) -> net::StreamWatcher {
         net::StreamWatcher(**self as *uvll::uv_stream_t)
     }
 
+    #[fixed_stack_segment] #[inline(never)]
+    pub fn open(&mut self, file: libc::c_int) -> Result<(), uv::UvError> {
+        match unsafe { uvll::uv_pipe_open(self.native_handle(), file) } {
+            0 => Ok(()),
+            n => Err(uv::UvError(n))
+        }
+    }
+
+    #[fixed_stack_segment] #[inline(never)]
+    pub fn bind(&mut self, name: &CString) -> Result<(), uv::UvError> {
+        do name.with_ref |name| {
+            match unsafe { uvll::uv_pipe_bind(self.native_handle(), name) } {
+                0 => Ok(()),
+                n => Err(uv::UvError(n))
+            }
+        }
+    }
+
+    #[fixed_stack_segment] #[inline(never)]
+    pub fn connect(&mut self, name: &CString, cb: uv::ConnectionCallback) {
+        {
+            let data = self.get_watcher_data();
+            assert!(data.connect_cb.is_none());
+            data.connect_cb = Some(cb);
+        }
+
+        let connect = net::ConnectRequest::new();
+        let name = do name.with_ref |p| { p };
+
+        unsafe {
+            uvll::uv_pipe_connect(connect.native_handle(),
+                                  self.native_handle(),
+                                  name,
+                                  connect_cb)
+        }
+
+        extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
+            let connect_request: net::ConnectRequest =
+                    uv::NativeHandle::from_native_handle(req);
+            let mut stream_watcher = connect_request.stream();
+            connect_request.delete();
+
+            let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
+            let status = uv::status_to_maybe_uv_error(status);
+            cb(stream_watcher, status);
+        }
+    }
+
     pub fn close(self, cb: uv::NullCallback) {
         {
             let mut this = self;
@@ -47,7 +96,7 @@ pub fn close(self, cb: uv::NullCallback) {
 
         unsafe { uvll::close(self.native_handle(), close_cb); }
 
-        extern fn close_cb(handle: *uvll::uv_pipe_t) {
+        extern "C" fn close_cb(handle: *uvll::uv_pipe_t) {
             let mut process: Pipe = uv::NativeHandle::from_native_handle(handle);
             process.get_watcher_data().close_cb.take_unwrap()();
             process.drop_watcher_data();
index 176754de8f74529961bf6c571ec6496f494446a2..3c629a783cf7a1ae4cfdc67d1c0ccfc4c73c4686 100644 (file)
@@ -44,7 +44,7 @@ pub fn new() -> Process {
     /// occurred.
     pub fn spawn(&mut self, loop_: &uv::Loop, mut config: ProcessConfig,
                  exit_cb: uv::ExitCallback)
-                    -> Result<~[Option<UvPipeStream>], uv::UvError>
+                    -> Result<~[Option<~UvPipeStream>], uv::UvError>
     {
         let cwd = config.cwd.map(|s| s.to_c_str());
 
@@ -144,7 +144,7 @@ pub fn close(self, cb: uv::NullCallback) {
 }
 
 unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
-                    io: StdioContainer) -> Option<UvPipeStream> {
+                    io: StdioContainer) -> Option<~UvPipeStream> {
     match io {
         Ignored => {
             uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
@@ -166,7 +166,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
             let handle = pipe.pipe.as_stream().native_handle();
             uvll::set_stdio_container_flags(dst, flags);
             uvll::set_stdio_container_stream(dst, handle);
-            Some(pipe.bind())
+            Some(~UvPipeStream::new(**pipe))
         }
     }
 }
index 8dd0f8a6b106e85ceb4484251688c979abcb89cf..6888aa23e99e7d2980be47794533e96f429b5860 100644 (file)
@@ -746,11 +746,11 @@ fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) ->
 
     fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> {
         let home = get_handle_to_current_scheduler!();
-        Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
+        Ok(~UvUnboundPipe::new(Pipe::new(self.uv_loop(), ipc), home))
     }
 
     fn spawn(&mut self, config: ProcessConfig)
-            -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>
+            -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>
     {
         // Sadly, we must create the UvProcess before we actually call uv_spawn
         // so that the exit_cb can close over it and notify it when the process
@@ -801,6 +801,74 @@ fn spawn(&mut self, config: ProcessConfig)
             }
         }
     }
+
+    fn unix_bind<P: PathLike>(&mut self, path: &P) ->
+        Result<~RtioUnixListenerObject, IoError> {
+        let mut pipe = Pipe::new(self.uv_loop(), false);
+        match pipe.bind(&path.path_as_str(|s| s.to_c_str())) {
+            Ok(()) => {
+                let handle = get_handle_to_current_scheduler!();
+                let pipe = UvUnboundPipe::new(pipe, handle);
+                Ok(~UvUnixListener::new(pipe))
+            }
+            Err(e) => {
+                let scheduler: ~Scheduler = Local::take();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    let task_cell = Cell::new(task);
+                    do pipe.close {
+                        let scheduler: ~Scheduler = Local::take();
+                        scheduler.resume_blocked_task_immediately(
+                            task_cell.take());
+                    }
+                }
+                Err(uv_error_to_io_error(e))
+            }
+        }
+    }
+
+    fn unix_connect<P: PathLike>(&mut self, path: &P) ->
+        Result<~RtioPipeObject, IoError>
+    {
+        let scheduler: ~Scheduler = Local::take();
+        let mut pipe = Pipe::new(self.uv_loop(), false);
+        let result_cell = Cell::new_empty();
+        let result_cell_ptr: *Cell<Result<~RtioPipeObject, IoError>> = &result_cell;
+
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            let cstr = do path.path_as_str |s| { s.to_c_str() };
+            do pipe.connect(&cstr) |stream, err| {
+                let res = match err {
+                    None => {
+                        let handle = stream.native_handle();
+                        let pipe = NativeHandle::from_native_handle(
+                                        handle as *uvll::uv_pipe_t);
+                        let home = get_handle_to_current_scheduler!();
+                        let pipe = UvUnboundPipe::new(pipe, home);
+                        Ok(~UvPipeStream::new(pipe))
+                    }
+                    Some(e) => { Err(uv_error_to_io_error(e)) }
+                };
+                unsafe { (*result_cell_ptr).put_back(res); }
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
+            }
+        }
+
+        assert!(!result_cell.is_empty());
+        let ret = result_cell.take();
+        if ret.is_err() {
+            let scheduler: ~Scheduler = Local::take();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do pipe.close {
+                    let scheduler: ~Scheduler = Local::take();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
+            }
+        }
+        return ret;
+    }
 }
 
 pub struct UvTcpListener {
@@ -843,9 +911,10 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
 impl RtioTcpListener for UvTcpListener {
     fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
         do self.home_for_io_consume |self_| {
-            let mut acceptor = ~UvTcpAcceptor::new(self_);
+            let acceptor = ~UvTcpAcceptor::new(self_);
             let incoming = Cell::new(acceptor.incoming.clone());
-            do acceptor.listener.watcher.listen |mut server, status| {
+            let mut stream = acceptor.listener.watcher.as_stream();
+            let res = do stream.listen |mut server, status| {
                 do incoming.with_mut_ref |incoming| {
                     let inc = match status {
                         Some(_) => Err(standard_error(OtherIoError)),
@@ -860,7 +929,10 @@ fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
                     incoming.send(inc);
                 }
             };
-            Ok(acceptor)
+            match res {
+                Ok(()) => Ok(acceptor),
+                Err(e) => Err(uv_error_to_io_error(e)),
+            }
         }
     }
 }
@@ -888,6 +960,17 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
     }
 }
 
+fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
+    let r = unsafe {
+        uvll::tcp_simultaneous_accepts(stream.native_handle(), a as c_int)
+    };
+
+    match status_to_maybe_uv_error(r) {
+        Some(err) => Err(uv_error_to_io_error(err)),
+        None => Ok(())
+    }
+}
+
 impl RtioTcpAcceptor for UvTcpAcceptor {
     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
         do self.home_for_io |self_| {
@@ -897,27 +980,13 @@ fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
         do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
-            };
-
-            match status_to_maybe_uv_error(r) {
-                Some(err) => Err(uv_error_to_io_error(err)),
-                None => Ok(())
-            }
+            accept_simultaneously(self_.listener.watcher.as_stream(), 1)
         }
     }
 
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
         do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
-            };
-
-            match status_to_maybe_uv_error(r) {
-                Some(err) => Err(uv_error_to_io_error(err)),
-                None => Ok(())
-            }
+            accept_simultaneously(self_.listener.watcher.as_stream(), 0)
         }
     }
 }
@@ -994,6 +1063,12 @@ pub struct UvUnboundPipe {
     priv home: SchedHandle,
 }
 
+impl UvUnboundPipe {
+    fn new(pipe: Pipe, home: SchedHandle) -> UvUnboundPipe {
+        UvUnboundPipe { pipe: pipe, home: home }
+    }
+}
+
 impl HomingIO for UvUnboundPipe {
     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 }
@@ -1013,18 +1088,12 @@ fn drop(&mut self) {
     }
 }
 
-impl UvUnboundPipe {
-    pub unsafe fn bind(~self) -> UvPipeStream {
-        UvPipeStream { inner: self }
-    }
-}
-
 pub struct UvPipeStream {
-    priv inner: ~UvUnboundPipe,
+    priv inner: UvUnboundPipe,
 }
 
 impl UvPipeStream {
-    pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
+    pub fn new(inner: UvUnboundPipe) -> UvPipeStream {
         UvPipeStream { inner: inner }
     }
 }
@@ -1612,6 +1681,84 @@ fn wait(&mut self) -> int {
     }
 }
 
+pub struct UvUnixListener {
+    priv inner: UvUnboundPipe
+}
+
+impl HomingIO for UvUnixListener {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() }
+}
+
+impl UvUnixListener {
+    fn new(pipe: UvUnboundPipe) -> UvUnixListener {
+        UvUnixListener { inner: pipe }
+    }
+}
+
+impl RtioUnixListener for UvUnixListener {
+    fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError> {
+        do self.home_for_io_consume |self_| {
+            let acceptor = ~UvUnixAcceptor::new(self_);
+            let incoming = Cell::new(acceptor.incoming.clone());
+            let mut stream = acceptor.listener.inner.pipe.as_stream();
+            let res = do stream.listen |mut server, status| {
+                do incoming.with_mut_ref |incoming| {
+                    let inc = match status {
+                        Some(e) => Err(uv_error_to_io_error(e)),
+                        None => {
+                            let inc = Pipe::new(&server.event_loop(), false);
+                            server.accept(inc.as_stream());
+                            let home = get_handle_to_current_scheduler!();
+                            let pipe = UvUnboundPipe::new(inc, home);
+                            Ok(~UvPipeStream::new(pipe))
+                        }
+                    };
+                    incoming.send(inc);
+                }
+            };
+            match res {
+                Ok(()) => Ok(acceptor),
+                Err(e) => Err(uv_error_to_io_error(e)),
+            }
+        }
+    }
+}
+
+pub struct UvUnixAcceptor {
+    listener: UvUnixListener,
+    incoming: Tube<Result<~RtioPipeObject, IoError>>,
+}
+
+impl HomingIO for UvUnixAcceptor {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+}
+
+impl UvUnixAcceptor {
+    fn new(listener: UvUnixListener) -> UvUnixAcceptor {
+        UvUnixAcceptor { listener: listener, incoming: Tube::new() }
+    }
+}
+
+impl RtioUnixAcceptor for UvUnixAcceptor {
+    fn accept(&mut self) -> Result<~RtioPipeObject, IoError> {
+        do self.home_for_io |self_| {
+            self_.incoming.recv()
+        }
+    }
+
+    fn accept_simultaneously(&mut self) -> Result<(), IoError> {
+        do self.home_for_io |self_| {
+            accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1)
+        }
+    }
+
+    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
+        do self.home_for_io |self_| {
+            accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0)
+        }
+    }
+}
+
 #[test]
 fn test_simple_io_no_connect() {
     do run_in_mt_newsched_task {
index 367585b0f0ee40f6bc43fefc992818843db6b554..eb770d0807074ba23eb4d54204643470d815c137 100644 (file)
@@ -1102,4 +1102,23 @@ fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t,
     fn rust_set_stdio_container_stream(c: *uv_stdio_container_t,
                                        stream: *uv_stream_t);
     fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int;
+
+    pub fn uv_pipe_open(pipe: *uv_pipe_t, file: c_int) -> c_int;
+    pub fn uv_pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int;
+    pub fn uv_pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t,
+                           name: *c_char, cb: uv_connect_cb);
+
+    // These should all really be constants...
+    #[rust_stack] pub fn rust_SOCK_STREAM() -> c_int;
+    #[rust_stack] pub fn rust_SOCK_DGRAM() -> c_int;
+    #[rust_stack] pub fn rust_SOCK_RAW() -> c_int;
+    #[rust_stack] pub fn rust_IPPROTO_UDP() -> c_int;
+    #[rust_stack] pub fn rust_IPPROTO_TCP() -> c_int;
+    #[rust_stack] pub fn rust_AI_ADDRCONFIG() -> c_int;
+    #[rust_stack] pub fn rust_AI_ALL() -> c_int;
+    #[rust_stack] pub fn rust_AI_CANONNAME() -> c_int;
+    #[rust_stack] pub fn rust_AI_NUMERICHOST() -> c_int;
+    #[rust_stack] pub fn rust_AI_NUMERICSERV() -> c_int;
+    #[rust_stack] pub fn rust_AI_PASSIVE() -> c_int;
+    #[rust_stack] pub fn rust_AI_V4MAPPED() -> c_int;
 }
index 7323397508e2e32181867e0fcc078b5218519b52..b144820c4424636262367a2f49ece50bc41b55ae 100644 (file)
@@ -199,3 +199,18 @@ bufrelease
 bufnew
 rust_take_dlerror_lock
 rust_drop_dlerror_lock
+rust_SOCK_STREAM
+rust_SOCK_DGRAM
+rust_SOCK_RAW
+rust_IPPROTO_UDP
+rust_IPPROTO_TCP
+rust_AI_ADDRCONFIG
+rust_AI_ALL
+rust_AI_CANONNAME
+rust_AI_NUMERICHOST
+rust_AI_NUMERICSERV
+rust_AI_PASSIVE
+rust_AI_V4MAPPED
+uv_pipe_open
+uv_pipe_bind
+uv_pipe_connect