// option. This file may not be copied, modified, or distributed
// except according to those terms.
+/*!
+
+Named pipes
+
+This module contains the ability to communicate over named pipes with
+synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
+while on Unix it corresponds to UNIX domain sockets.
+
+These pipes are similar to TCP in the sense that you can have both a stream to a
+server and a server itself. The server provided accepts other `UnixStream`
+instances as clients.
+
+*/
+
use prelude::*;
-use super::super::*;
+
use super::super::support::PathLike;
+use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListenerObject};
+use rt::rtio::{RtioUnixAcceptorObject, RtioPipeObject, RtioUnixListener};
+use rt::rtio::RtioUnixAcceptor;
+use rt::io::pipe::PipeStream;
+use rt::io::{io_error, Listener, Acceptor, Reader, Writer};
+use rt::local::Local;
-pub struct UnixStream;
+/// A stream which communicates over a named pipe.
+pub struct UnixStream {
+ priv obj: PipeStream,
+}
impl UnixStream {
- pub fn connect<P: PathLike>(_path: &P) -> Option<UnixStream> {
- fail!()
+ fn new(obj: ~RtioPipeObject) -> UnixStream {
+ UnixStream { obj: PipeStream::new_bound(obj) }
+ }
+
+ /// Connect to a pipe named by `path`. This will attempt to open a
+ /// connection to the underlying socket.
+ ///
+ /// The returned stream will be closed when the object falls out of scope.
+ ///
+ /// # Failure
+ ///
+ /// This function will raise on the `io_error` condition if the connection
+ /// could not be made.
+ ///
+ /// # Example
+ ///
+ /// use std::rt::io::net::unix::UnixStream;
+ ///
+ /// let server = Path("path/to/my/socket");
+ /// let mut stream = UnixStream::connect(&server);
+ /// stream.write([1, 2, 3]);
+ ///
+ pub fn connect<P: PathLike>(path: &P) -> Option<UnixStream> {
+ let pipe = unsafe {
+ let io: *mut IoFactoryObject = Local::unsafe_borrow();
+ (*io).unix_connect(path)
+ };
+
+ match pipe {
+ Ok(s) => Some(UnixStream::new(s)),
+ Err(ioerr) => {
+ io_error::cond.raise(ioerr);
+ None
+ }
+ }
}
}
impl Reader for UnixStream {
- fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
-
- fn eof(&mut self) -> bool { fail!() }
+ fn read(&mut self, buf: &mut [u8]) -> Option<uint> { self.obj.read(buf) }
+ fn eof(&mut self) -> bool { self.obj.eof() }
}
impl Writer for UnixStream {
- fn write(&mut self, _v: &[u8]) { fail!() }
-
- fn flush(&mut self) { fail!() }
+ fn write(&mut self, buf: &[u8]) { self.obj.write(buf) }
+ fn flush(&mut self) { self.obj.flush() }
}
-pub struct UnixListener;
+pub struct UnixListener {
+ priv obj: ~RtioUnixListenerObject,
+}
impl UnixListener {
- pub fn bind<P: PathLike>(_path: &P) -> Option<UnixListener> {
- fail!()
+
+ /// Creates a new listener, ready to receive incoming connections on the
+ /// specified socket. The server will be named by `path`.
+ ///
+ /// This listener will be closed when it falls out of scope.
+ ///
+ /// # Failure
+ ///
+ /// This function will raise on the `io_error` condition if the specified
+ /// path could not be bound.
+ ///
+ /// # Example
+ ///
+ /// use std::rt::io::net::unix::UnixListener;
+ ///
+ /// let server = Path("path/to/my/socket");
+ /// let mut stream = UnixListener::bind(&server);
+ /// for client in stream.incoming() {
+ /// let mut client = client;
+ /// client.write([1, 2, 3, 4]);
+ /// }
+ ///
+ pub fn bind<P: PathLike>(path: &P) -> Option<UnixListener> {
+ let listener = unsafe {
+ let io: *mut IoFactoryObject = Local::unsafe_borrow();
+ (*io).unix_bind(path)
+ };
+ match listener {
+ Ok(s) => Some(UnixListener{ obj: s }),
+ Err(ioerr) => {
+ io_error::cond.raise(ioerr);
+ None
+ }
+ }
}
}
impl Listener<UnixStream, UnixAcceptor> for UnixListener {
- fn listen(self) -> Option<UnixAcceptor> { fail!() }
+ fn listen(self) -> Option<UnixAcceptor> {
+ match self.obj.listen() {
+ Ok(acceptor) => Some(UnixAcceptor { obj: acceptor }),
+ Err(ioerr) => {
+ io_error::cond.raise(ioerr);
+ None
+ }
+ }
+ }
}
-pub struct UnixAcceptor;
+pub struct UnixAcceptor {
+ priv obj: ~RtioUnixAcceptorObject,
+}
impl Acceptor<UnixStream> for UnixAcceptor {
- fn accept(&mut self) -> Option<UnixStream> { fail!() }
+ fn accept(&mut self) -> Option<UnixStream> {
+ match self.obj.accept() {
+ Ok(s) => Some(UnixStream::new(s)),
+ Err(ioerr) => {
+ io_error::cond.raise(ioerr);
+ None
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use prelude::*;
+ use super::*;
+ use cell::Cell;
+ use rt::test::*;
+ use rt::io::*;
+ use rt::comm::oneshot;
+ use os;
+
+ fn smalltest(server: ~fn(UnixStream), client: ~fn(UnixStream)) {
+ let server = Cell::new(server);
+ let client = Cell::new(client);
+ do run_in_mt_newsched_task {
+ let server = Cell::new(server.take());
+ let client = Cell::new(client.take());
+ let path1 = next_test_unix();
+ let path2 = path1.clone();
+ let (port, chan) = oneshot();
+ let port = Cell::new(port);
+ let chan = Cell::new(chan);
+
+ do spawntask {
+ let mut acceptor = UnixListener::bind(&path1).listen();
+ chan.take().send(());
+ server.take()(acceptor.accept().unwrap());
+ }
+
+ do spawntask {
+ port.take().recv();
+ client.take()(UnixStream::connect(&path2).unwrap());
+ }
+ }
+ }
+
+ #[test]
+ fn bind_error() {
+ do run_in_mt_newsched_task {
+ let mut called = false;
+ do io_error::cond.trap(|e| {
+ assert!(e.kind == PermissionDenied);
+ called = true;
+ }).inside {
+ let listener = UnixListener::bind(&("path/to/nowhere"));
+ assert!(listener.is_none());
+ }
+ assert!(called);
+ }
+ }
+
+ #[test]
+ fn connect_error() {
+ do run_in_mt_newsched_task {
+ let mut called = false;
+ do io_error::cond.trap(|e| {
+ assert_eq!(e.kind, OtherIoError);
+ called = true;
+ }).inside {
+ let stream = UnixStream::connect(&("path/to/nowhere"));
+ assert!(stream.is_none());
+ }
+ assert!(called);
+ }
+ }
+
+ #[test]
+ fn smoke() {
+ smalltest(|mut server| {
+ let mut buf = [0];
+ server.read(buf);
+ assert!(buf[0] == 99);
+ }, |mut client| {
+ client.write([99]);
+ })
+ }
+
+ #[test]
+ fn read_eof() {
+ smalltest(|mut server| {
+ let mut buf = [0];
+ assert!(server.read(buf).is_none());
+ assert!(server.read(buf).is_none());
+ }, |_client| {
+ // drop the client
+ })
+ }
+
+ #[test]
+ fn write_begone() {
+ smalltest(|mut server| {
+ let buf = [0];
+ let mut stop = false;
+ while !stop{
+ do io_error::cond.trap(|e| {
+ assert_eq!(e.kind, BrokenPipe);
+ stop = true;
+ }).inside {
+ server.write(buf);
+ }
+ }
+ }, |_client| {
+ // drop the client
+ })
+ }
+
+ #[test]
+ fn accept_lots() {
+ do run_in_mt_newsched_task {
+ let times = 10;
+ let path1 = next_test_unix();
+ let path2 = path1.clone();
+ let (port, chan) = oneshot();
+ let port = Cell::new(port);
+ let chan = Cell::new(chan);
+
+ do spawntask {
+ let mut acceptor = UnixListener::bind(&path1).listen();
+ chan.take().send(());
+ do times.times {
+ let mut client = acceptor.accept();
+ let mut buf = [0];
+ client.read(buf);
+ assert_eq!(buf[0], 100);
+ }
+ }
+
+ do spawntask {
+ port.take().recv();
+ do times.times {
+ let mut stream = UnixStream::connect(&path2);
+ stream.write([100]);
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn path_exists() {
+ do run_in_mt_newsched_task {
+ let path = next_test_unix();
+ let _acceptor = UnixListener::bind(&path).listen();
+ assert!(os::path_exists(&path));
+ }
+ }
}
use rt::rtio::RtioUnboundPipeObject;
pub struct PipeStream {
- priv obj: RtioPipeObject
+ priv obj: ~RtioPipeObject
}
// This should not be a newtype, but rt::uv::process::set_stdio needs to reach
}
}
- pub fn bind(inner: RtioPipeObject) -> PipeStream {
+ pub fn new_bound(inner: ~RtioPipeObject) -> PipeStream {
PipeStream { obj: inner }
}
}
Ok((p, io)) => Some(Process{
handle: p,
io: io.move_iter().map(|p|
- p.map(|p| io::PipeStream::bind(p))
+ p.map(|p| io::PipeStream::new_bound(p))
).collect()
}),
Err(ioerr) => {
pub type RtioPipeObject = uvio::UvPipeStream;
pub type RtioUnboundPipeObject = uvio::UvUnboundPipe;
pub type RtioProcessObject = uvio::UvProcess;
+pub type RtioUnixListenerObject = uvio::UvUnixListener;
+pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor;
pub trait EventLoop {
fn run(&mut self);
Result<~[Path], IoError>;
fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>;
fn spawn(&mut self, config: ProcessConfig)
- -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>;
+ -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>;
+
+ fn unix_bind<P: PathLike>(&mut self, path: &P) ->
+ Result<~RtioUnixListenerObject, IoError>;
+ fn unix_connect<P: PathLike>(&mut self, path: &P) ->
+ Result<~RtioPipeObject, IoError>;
}
pub trait RtioTcpListener : RtioSocket {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
}
+
+pub trait RtioUnixListener {
+ fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError>;
+}
+
+pub trait RtioUnixAcceptor {
+ fn accept(&mut self) -> Result<~RtioPipeObject, IoError>;
+ fn accept_simultaneously(&mut self) -> Result<(), IoError>;
+ fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
+}
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+use rand;
+use rand::Rng;
+use os;
use libc;
use option::{Some, None};
+use path::{Path, GenericPath};
use cell::Cell;
use clone::Clone;
use container::Container;
}
}
+/// Get a temporary path which could be the location of a unix socket
+#[fixed_stack_segment] #[inline(never)]
+pub fn next_test_unix() -> Path {
+ os::tmpdir().push(rand::task_rng().gen_ascii_str(20))
+}
+
/// Get a unique IPv4 localhost:port pair starting at 9600
pub fn next_test_ip4() -> SocketAddr {
SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: next_test_port() }
}
}
- pub fn accept(&mut self, stream: StreamWatcher) {
- let self_handle = self.native_handle() as *c_void;
- let stream_handle = stream.native_handle() as *c_void;
- assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
- }
-
pub fn close(self, cb: NullCallback) {
{
let mut this = self;
cb();
}
}
+
+ pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
+ {
+ let data = self.get_watcher_data();
+ assert!(data.connect_cb.is_none());
+ data.connect_cb = Some(cb);
+ }
+
+ unsafe {
+ static BACKLOG: c_int = 128; // XXX should be configurable
+ match uvll::listen(self.native_handle(), BACKLOG, connection_cb) {
+ 0 => Ok(()),
+ n => Err(UvError(n))
+ }
+ }
+
+ extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
+ rtdebug!("connection_cb");
+ let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
+ let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
+ let status = status_to_maybe_uv_error(status);
+ (*cb)(stream_watcher, status);
+ }
+ }
+
+ pub fn accept(&mut self, stream: StreamWatcher) {
+ let self_handle = self.native_handle() as *c_void;
+ let stream_handle = stream.native_handle() as *c_void;
+ assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
+ }
}
impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
}
}
- pub fn listen(&mut self, cb: ConnectionCallback) {
- {
- let data = self.get_watcher_data();
- assert!(data.connect_cb.is_none());
- data.connect_cb = Some(cb);
- }
-
- unsafe {
- static BACKLOG: c_int = 128; // XXX should be configurable
- // XXX: This can probably fail
- assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
- }
-
- extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
- rtdebug!("connection_cb");
- let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
- let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
- let status = status_to_maybe_uv_error(status);
- (*cb)(stream_watcher, status);
- }
- }
-
pub fn as_stream(&self) -> StreamWatcher {
NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
}
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
- do server_tcp_watcher.listen |mut server_stream_watcher, status| {
+ let mut stream = server_tcp_watcher.as_stream();
+ let res = do stream.listen |mut server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut loop_ = loop_;
}
count_cell.put_back(count);
}
- }
+ };
+
+ assert!(res.is_ok());
let client_thread = do Thread::start {
rtdebug!("starting client thread");
loop_.run();
loop_.close();
client_thread.join();
- }
+ };
}
#[test]
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
- do server_tcp_watcher.listen |mut server_stream_watcher, status| {
+ let mut stream = server_tcp_watcher.as_stream();
+ let res = do stream.listen |mut server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut loop_ = loop_;
}
count_cell.put_back(count);
}
- }
+ };
+ assert!(res.is_ok());
let client_thread = do Thread::start {
rtdebug!("starting client thread");
use prelude::*;
use libc;
+use c_str::CString;
use rt::uv;
use rt::uv::net;
net::StreamWatcher(**self as *uvll::uv_stream_t)
}
+ #[fixed_stack_segment] #[inline(never)]
+ pub fn open(&mut self, file: libc::c_int) -> Result<(), uv::UvError> {
+ match unsafe { uvll::uv_pipe_open(self.native_handle(), file) } {
+ 0 => Ok(()),
+ n => Err(uv::UvError(n))
+ }
+ }
+
+ #[fixed_stack_segment] #[inline(never)]
+ pub fn bind(&mut self, name: &CString) -> Result<(), uv::UvError> {
+ do name.with_ref |name| {
+ match unsafe { uvll::uv_pipe_bind(self.native_handle(), name) } {
+ 0 => Ok(()),
+ n => Err(uv::UvError(n))
+ }
+ }
+ }
+
+ #[fixed_stack_segment] #[inline(never)]
+ pub fn connect(&mut self, name: &CString, cb: uv::ConnectionCallback) {
+ {
+ let data = self.get_watcher_data();
+ assert!(data.connect_cb.is_none());
+ data.connect_cb = Some(cb);
+ }
+
+ let connect = net::ConnectRequest::new();
+ let name = do name.with_ref |p| { p };
+
+ unsafe {
+ uvll::uv_pipe_connect(connect.native_handle(),
+ self.native_handle(),
+ name,
+ connect_cb)
+ }
+
+ extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
+ let connect_request: net::ConnectRequest =
+ uv::NativeHandle::from_native_handle(req);
+ let mut stream_watcher = connect_request.stream();
+ connect_request.delete();
+
+ let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
+ let status = uv::status_to_maybe_uv_error(status);
+ cb(stream_watcher, status);
+ }
+ }
+
pub fn close(self, cb: uv::NullCallback) {
{
let mut this = self;
unsafe { uvll::close(self.native_handle(), close_cb); }
- extern fn close_cb(handle: *uvll::uv_pipe_t) {
+ extern "C" fn close_cb(handle: *uvll::uv_pipe_t) {
let mut process: Pipe = uv::NativeHandle::from_native_handle(handle);
process.get_watcher_data().close_cb.take_unwrap()();
process.drop_watcher_data();
/// occurred.
pub fn spawn(&mut self, loop_: &uv::Loop, mut config: ProcessConfig,
exit_cb: uv::ExitCallback)
- -> Result<~[Option<UvPipeStream>], uv::UvError>
+ -> Result<~[Option<~UvPipeStream>], uv::UvError>
{
let cwd = config.cwd.map(|s| s.to_c_str());
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
- io: StdioContainer) -> Option<UvPipeStream> {
+ io: StdioContainer) -> Option<~UvPipeStream> {
match io {
Ignored => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
let handle = pipe.pipe.as_stream().native_handle();
uvll::set_stdio_container_flags(dst, flags);
uvll::set_stdio_container_stream(dst, handle);
- Some(pipe.bind())
+ Some(~UvPipeStream::new(**pipe))
}
}
}
fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> {
let home = get_handle_to_current_scheduler!();
- Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
+ Ok(~UvUnboundPipe::new(Pipe::new(self.uv_loop(), ipc), home))
}
fn spawn(&mut self, config: ProcessConfig)
- -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>
+ -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>
{
// Sadly, we must create the UvProcess before we actually call uv_spawn
// so that the exit_cb can close over it and notify it when the process
}
}
}
+
+ fn unix_bind<P: PathLike>(&mut self, path: &P) ->
+ Result<~RtioUnixListenerObject, IoError> {
+ let mut pipe = Pipe::new(self.uv_loop(), false);
+ match pipe.bind(&path.path_as_str(|s| s.to_c_str())) {
+ Ok(()) => {
+ let handle = get_handle_to_current_scheduler!();
+ let pipe = UvUnboundPipe::new(pipe, handle);
+ Ok(~UvUnixListener::new(pipe))
+ }
+ Err(e) => {
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do pipe.close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(
+ task_cell.take());
+ }
+ }
+ Err(uv_error_to_io_error(e))
+ }
+ }
+ }
+
+ fn unix_connect<P: PathLike>(&mut self, path: &P) ->
+ Result<~RtioPipeObject, IoError>
+ {
+ let scheduler: ~Scheduler = Local::take();
+ let mut pipe = Pipe::new(self.uv_loop(), false);
+ let result_cell = Cell::new_empty();
+ let result_cell_ptr: *Cell<Result<~RtioPipeObject, IoError>> = &result_cell;
+
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ let cstr = do path.path_as_str |s| { s.to_c_str() };
+ do pipe.connect(&cstr) |stream, err| {
+ let res = match err {
+ None => {
+ let handle = stream.native_handle();
+ let pipe = NativeHandle::from_native_handle(
+ handle as *uvll::uv_pipe_t);
+ let home = get_handle_to_current_scheduler!();
+ let pipe = UvUnboundPipe::new(pipe, home);
+ Ok(~UvPipeStream::new(pipe))
+ }
+ Some(e) => { Err(uv_error_to_io_error(e)) }
+ };
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ }
+ }
+
+ assert!(!result_cell.is_empty());
+ let ret = result_cell.take();
+ if ret.is_err() {
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do pipe.close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ }
+ }
+ }
+ return ret;
+ }
}
pub struct UvTcpListener {
impl RtioTcpListener for UvTcpListener {
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
do self.home_for_io_consume |self_| {
- let mut acceptor = ~UvTcpAcceptor::new(self_);
+ let acceptor = ~UvTcpAcceptor::new(self_);
let incoming = Cell::new(acceptor.incoming.clone());
- do acceptor.listener.watcher.listen |mut server, status| {
+ let mut stream = acceptor.listener.watcher.as_stream();
+ let res = do stream.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(_) => Err(standard_error(OtherIoError)),
incoming.send(inc);
}
};
- Ok(acceptor)
+ match res {
+ Ok(()) => Ok(acceptor),
+ Err(e) => Err(uv_error_to_io_error(e)),
+ }
}
}
}
}
}
+fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
+ let r = unsafe {
+ uvll::tcp_simultaneous_accepts(stream.native_handle(), a as c_int)
+ };
+
+ match status_to_maybe_uv_error(r) {
+ Some(err) => Err(uv_error_to_io_error(err)),
+ None => Ok(())
+ }
+}
+
impl RtioTcpAcceptor for UvTcpAcceptor {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
do self.home_for_io |self_| {
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
- let r = unsafe {
- uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
- };
-
- match status_to_maybe_uv_error(r) {
- Some(err) => Err(uv_error_to_io_error(err)),
- None => Ok(())
- }
+ accept_simultaneously(self_.listener.watcher.as_stream(), 1)
}
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
- let r = unsafe {
- uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
- };
-
- match status_to_maybe_uv_error(r) {
- Some(err) => Err(uv_error_to_io_error(err)),
- None => Ok(())
- }
+ accept_simultaneously(self_.listener.watcher.as_stream(), 0)
}
}
}
priv home: SchedHandle,
}
+impl UvUnboundPipe {
+ fn new(pipe: Pipe, home: SchedHandle) -> UvUnboundPipe {
+ UvUnboundPipe { pipe: pipe, home: home }
+ }
+}
+
impl HomingIO for UvUnboundPipe {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
}
}
-impl UvUnboundPipe {
- pub unsafe fn bind(~self) -> UvPipeStream {
- UvPipeStream { inner: self }
- }
-}
-
pub struct UvPipeStream {
- priv inner: ~UvUnboundPipe,
+ priv inner: UvUnboundPipe,
}
impl UvPipeStream {
- pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
+ pub fn new(inner: UvUnboundPipe) -> UvPipeStream {
UvPipeStream { inner: inner }
}
}
}
}
+pub struct UvUnixListener {
+ priv inner: UvUnboundPipe
+}
+
+impl HomingIO for UvUnixListener {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() }
+}
+
+impl UvUnixListener {
+ fn new(pipe: UvUnboundPipe) -> UvUnixListener {
+ UvUnixListener { inner: pipe }
+ }
+}
+
+impl RtioUnixListener for UvUnixListener {
+ fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError> {
+ do self.home_for_io_consume |self_| {
+ let acceptor = ~UvUnixAcceptor::new(self_);
+ let incoming = Cell::new(acceptor.incoming.clone());
+ let mut stream = acceptor.listener.inner.pipe.as_stream();
+ let res = do stream.listen |mut server, status| {
+ do incoming.with_mut_ref |incoming| {
+ let inc = match status {
+ Some(e) => Err(uv_error_to_io_error(e)),
+ None => {
+ let inc = Pipe::new(&server.event_loop(), false);
+ server.accept(inc.as_stream());
+ let home = get_handle_to_current_scheduler!();
+ let pipe = UvUnboundPipe::new(inc, home);
+ Ok(~UvPipeStream::new(pipe))
+ }
+ };
+ incoming.send(inc);
+ }
+ };
+ match res {
+ Ok(()) => Ok(acceptor),
+ Err(e) => Err(uv_error_to_io_error(e)),
+ }
+ }
+ }
+}
+
+pub struct UvUnixAcceptor {
+ listener: UvUnixListener,
+ incoming: Tube<Result<~RtioPipeObject, IoError>>,
+}
+
+impl HomingIO for UvUnixAcceptor {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+}
+
+impl UvUnixAcceptor {
+ fn new(listener: UvUnixListener) -> UvUnixAcceptor {
+ UvUnixAcceptor { listener: listener, incoming: Tube::new() }
+ }
+}
+
+impl RtioUnixAcceptor for UvUnixAcceptor {
+ fn accept(&mut self) -> Result<~RtioPipeObject, IoError> {
+ do self.home_for_io |self_| {
+ self_.incoming.recv()
+ }
+ }
+
+ fn accept_simultaneously(&mut self) -> Result<(), IoError> {
+ do self.home_for_io |self_| {
+ accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1)
+ }
+ }
+
+ fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
+ do self.home_for_io |self_| {
+ accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0)
+ }
+ }
+}
+
#[test]
fn test_simple_io_no_connect() {
do run_in_mt_newsched_task {
fn rust_set_stdio_container_stream(c: *uv_stdio_container_t,
stream: *uv_stream_t);
fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int;
+
+ pub fn uv_pipe_open(pipe: *uv_pipe_t, file: c_int) -> c_int;
+ pub fn uv_pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int;
+ pub fn uv_pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t,
+ name: *c_char, cb: uv_connect_cb);
+
+ // These should all really be constants...
+ #[rust_stack] pub fn rust_SOCK_STREAM() -> c_int;
+ #[rust_stack] pub fn rust_SOCK_DGRAM() -> c_int;
+ #[rust_stack] pub fn rust_SOCK_RAW() -> c_int;
+ #[rust_stack] pub fn rust_IPPROTO_UDP() -> c_int;
+ #[rust_stack] pub fn rust_IPPROTO_TCP() -> c_int;
+ #[rust_stack] pub fn rust_AI_ADDRCONFIG() -> c_int;
+ #[rust_stack] pub fn rust_AI_ALL() -> c_int;
+ #[rust_stack] pub fn rust_AI_CANONNAME() -> c_int;
+ #[rust_stack] pub fn rust_AI_NUMERICHOST() -> c_int;
+ #[rust_stack] pub fn rust_AI_NUMERICSERV() -> c_int;
+ #[rust_stack] pub fn rust_AI_PASSIVE() -> c_int;
+ #[rust_stack] pub fn rust_AI_V4MAPPED() -> c_int;
}
bufnew
rust_take_dlerror_lock
rust_drop_dlerror_lock
+rust_SOCK_STREAM
+rust_SOCK_DGRAM
+rust_SOCK_RAW
+rust_IPPROTO_UDP
+rust_IPPROTO_TCP
+rust_AI_ADDRCONFIG
+rust_AI_ALL
+rust_AI_CANONNAME
+rust_AI_NUMERICHOST
+rust_AI_NUMERICSERV
+rust_AI_PASSIVE
+rust_AI_V4MAPPED
+uv_pipe_open
+uv_pipe_bind
+uv_pipe_connect