We get a little more functionality from libuv for these kinds of streams (things
like terminal dimentions), and it also appears to more gracefully handle the
stream being a window. Beforehand, if you used stdio and hit CTRL+d on a
process, libuv would continually return 0-length successful reads instead of
interpreting that the stream was closed.
I was hoping to be able to write tests for this, but currently the testing
infrastructure doesn't allow tests with a stdin and a stdout, but this has been
manually tested! (not that it means much)
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::local::Local;
-use rt::rtio::{RtioFileStream, IoFactoryObject, IoFactory};
+use rt::rtio::{IoFactoryObject, IoFactory, RtioTTYObject, RtioTTY};
use super::{Reader, Writer, io_error};
/// Creates a new non-blocking handle to the stdin of the current process.
pub fn stdin() -> StdReader {
let stream = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
- (*io).fs_from_raw_fd(libc::STDIN_FILENO, false)
- };
+ (*io).tty_open(libc::STDIN_FILENO, true, false)
+ }.unwrap();
StdReader { inner: stream }
}
pub fn stdout() -> StdWriter {
let stream = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
- (*io).fs_from_raw_fd(libc::STDOUT_FILENO, false)
- };
+ (*io).tty_open(libc::STDOUT_FILENO, false, false)
+ }.unwrap();
StdWriter { inner: stream }
}
pub fn stderr() -> StdWriter {
let stream = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
- (*io).fs_from_raw_fd(libc::STDERR_FILENO, false)
- };
+ (*io).tty_open(libc::STDERR_FILENO, false, false)
+ }.unwrap();
StdWriter { inner: stream }
}
/// Representation of a reader of a standard input stream
pub struct StdReader {
- priv inner: ~RtioFileStream
+ priv inner: ~RtioTTYObject
+}
+
+impl StdReader {
+ /// Controls whether this output stream is a "raw stream" or simply a normal
+ /// stream.
+ ///
+ /// # Failure
+ ///
+ /// This function will raise on the `io_error` condition if an error
+ /// happens.
+ pub fn set_raw(&mut self, raw: bool) {
+ match self.inner.set_raw(raw) {
+ Ok(()) => {},
+ Err(e) => io_error::cond.raise(e),
+ }
+ }
+
+ /// Resets the mode of this stream back to its original state.
+ ///
+ /// # Failure
+ ///
+ /// This function cannot fail.
+ pub fn reset_mode(&mut self) { self.inner.reset_mode(); }
}
impl Reader for StdReader {
/// Representation of a writer to a standard output stream
pub struct StdWriter {
- priv inner: ~RtioFileStream
+ priv inner: ~RtioTTYObject
+}
+
+impl StdWriter {
+ /// Gets the size of this output window, if possible. This is typically used
+ /// when the writer is attached to something like a terminal, this is used
+ /// to fetch the dimensions of the terminal.
+ ///
+ /// If successful, returns Some((width, height)).
+ ///
+ /// # Failure
+ ///
+ /// This function will raise on the `io_error` condition if an error
+ /// happens.
+ pub fn winsize(&mut self) -> Option<(int, int)> {
+ match self.inner.get_winsize() {
+ Ok(p) => Some(p),
+ Err(e) => {
+ io_error::cond.raise(e);
+ None
+ }
+ }
+ }
+
+ /// Controls whether this output stream is a "raw stream" or simply a normal
+ /// stream.
+ ///
+ /// # Failure
+ ///
+ /// This function will raise on the `io_error` condition if an error
+ /// happens.
+ pub fn set_raw(&mut self, raw: bool) {
+ match self.inner.set_raw(raw) {
+ Ok(()) => {},
+ Err(e) => io_error::cond.raise(e),
+ }
+ }
+
+ /// Resets the mode of this stream back to its original state.
+ ///
+ /// # Failure
+ ///
+ /// This function cannot fail.
+ pub fn reset_mode(&mut self) { self.inner.reset_mode(); }
}
impl Writer for StdWriter {
}
}
- fn flush(&mut self) {
- match self.inner.flush() {
- Ok(()) => {}
- Err(e) => io_error::cond.raise(e)
- }
+ fn flush(&mut self) { /* nothing to do */ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn smoke() {
+ // Just make sure we can acquire handles
+ stdin();
+ stdout();
+ stderr();
}
}
pub type RtioProcessObject = uvio::UvProcess;
pub type RtioUnixListenerObject = uvio::UvUnixListener;
pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor;
+pub type RtioTTYObject = uvio::UvTTY;
pub trait EventLoop {
fn run(&mut self);
Result<~RtioUnixListenerObject, IoError>;
fn unix_connect<P: PathLike>(&mut self, path: &P) ->
Result<~RtioPipeObject, IoError>;
+ fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool)
+ -> Result<~RtioTTYObject, IoError>;
}
pub trait RtioTcpListener : RtioSocket {
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
}
+
+pub trait RtioTTY {
+ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
+ fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
+ fn set_raw(&mut self, raw: bool) -> Result<(), IoError>;
+ fn reset_mode(&mut self);
+ fn get_winsize(&mut self) -> Result<(int, int), IoError>;
+}
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use libc::{c_int, c_void};
+use libc::c_int;
use option::Some;
use rt::uv::uvll;
use rt::uv::uvll::UV_ASYNC;
-use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback};
+use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback};
use rt::uv::WatcherInterop;
use rt::uv::status_to_maybe_uv_error;
uvll::async_send(handle);
}
}
-
- pub fn close(self, cb: NullCallback) {
- let mut this = self;
- let data = this.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
-
- unsafe {
- uvll::close(self.native_handle(), close_cb);
- }
-
- extern fn close_cb(handle: *uvll::uv_stream_t) {
- let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
- {
- let data = watcher.get_watcher_data();
- data.close_cb.take_unwrap()();
- }
- watcher.drop_watcher_data();
- unsafe { uvll::free_handle(handle as *c_void); }
- }
- }
}
impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher {
use libc::c_int;
use option::Some;
use rt::uv::uvll;
-use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
+use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct IdleWatcher(*uvll::uv_idle_t);
assert!(0 == uvll::idle_stop(self.native_handle()));
}
}
-
- pub fn close(self, cb: NullCallback) {
- {
- let mut this = self;
- let data = this.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
- }
-
- unsafe { uvll::close(self.native_handle(), close_cb) };
-
- extern fn close_cb(handle: *uvll::uv_idle_t) {
- unsafe {
- let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
- {
- let data = idle_watcher.get_watcher_data();
- data.close_cb.take_unwrap()();
- }
- idle_watcher.drop_watcher_data();
- uvll::idle_delete(handle);
- }
- }
- }
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
pub mod addrinfo;
pub mod process;
pub mod pipe;
+pub mod tty;
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
priv handle: *uvll::uv_loop_t
}
+pub struct Handle(*uvll::uv_handle_t);
+
+impl Watcher for Handle {}
+impl NativeHandle<*uvll::uv_handle_t> for Handle {
+ fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) }
+ fn native_handle(&self) -> *uvll::uv_handle_t { **self }
+}
+
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
/// safe - there may be multiple instances for a single underlying
fn install_watcher_data(&mut self);
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
fn drop_watcher_data(&mut self);
+ fn close(self, cb: NullCallback);
}
impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
}
}
+
+ fn close(self, cb: NullCallback) {
+ let mut this = self;
+ {
+ let data = this.get_watcher_data();
+ assert!(data.close_cb.is_none());
+ data.close_cb = Some(cb);
+ }
+
+ unsafe { uvll::close(this.native_handle(), close_cb); }
+
+ extern fn close_cb(handle: *uvll::uv_handle_t) {
+ let mut h: Handle = NativeHandle::from_native_handle(handle);
+ h.get_watcher_data().close_cb.take_unwrap()();
+ h.drop_watcher_data();
+ unsafe { uvll::free_handle(handle as *c_void) }
+ }
+ }
}
// XXX: Need to define the error constants like EOF so they can be
use rt::uv::uvll;
use rt::uv::uvll::*;
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
-use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
+use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle,
status_to_maybe_uv_error, vec_to_uv_buf};
use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
use vec;
}
}
- pub fn close(self, cb: NullCallback) {
- {
- let mut this = self;
- let data = this.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
- }
-
- unsafe { uvll::close(self.native_handle(), close_cb); }
-
- extern fn close_cb(handle: *uvll::uv_stream_t) {
- let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
- let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
- stream_watcher.drop_watcher_data();
- unsafe { free_handle(handle as *c_void) }
- cb();
- }
- }
pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
{
cb(udp_watcher, status);
}
}
-
- pub fn close(self, cb: NullCallback) {
- {
- let mut this = self;
- let data = this.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
- }
-
- unsafe { uvll::close(self.native_handle(), close_cb); }
-
- extern fn close_cb(handle: *uvll::uv_udp_t) {
- let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
- let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
- udp_watcher.drop_watcher_data();
- unsafe { free_handle(handle as *c_void) }
- cb();
- }
- }
}
impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
}
}
- pub fn close(self, cb: uv::NullCallback) {
- {
- let mut this = self;
- let data = this.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
- }
-
- unsafe { uvll::close(self.native_handle(), close_cb); }
-
- extern "C" fn close_cb(handle: *uvll::uv_pipe_t) {
- let mut process: Pipe = uv::NativeHandle::from_native_handle(handle);
- process.get_watcher_data().close_cb.take_unwrap()();
- process.drop_watcher_data();
- unsafe { uvll::free_handle(handle as *libc::c_void) }
- }
- }
}
impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe {
pub fn pid(&self) -> libc::pid_t {
unsafe { uvll::process_pid(**self) as libc::pid_t }
}
-
- /// Closes this handle, invoking the specified callback once closed
- pub fn close(self, cb: uv::NullCallback) {
- {
- let mut this = self;
- let data = this.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
- }
-
- unsafe { uvll::close(self.native_handle(), close_cb); }
-
- extern fn close_cb(handle: *uvll::uv_process_t) {
- let mut process: Process = uv::NativeHandle::from_native_handle(handle);
- process.get_watcher_data().close_cb.take_unwrap()();
- process.drop_watcher_data();
- unsafe { uvll::free_handle(handle as *libc::c_void) }
- }
- }
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use libc::{c_void, c_int};
+use libc::c_int;
use option::Some;
use rt::uv::uvll;
-use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback, NullCallback};
+use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct TimerWatcher(*uvll::uv_timer_t);
uvll::timer_stop(self.native_handle());
}
}
-
- pub fn close(self, cb: NullCallback) {
- let mut watcher = self;
- {
- let data = watcher.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
- }
-
- unsafe {
- uvll::close(watcher.native_handle(), close_cb);
- }
-
- extern fn close_cb(handle: *uvll::uv_timer_t) {
- let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
- {
- let data = watcher.get_watcher_data();
- data.close_cb.take_unwrap()();
- }
- watcher.drop_watcher_data();
- unsafe {
- uvll::free_handle(handle as *c_void);
- }
- }
- }
}
impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher {
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use prelude::*;
+use libc;
+
+use rt::uv;
+use rt::uv::net;
+use rt::uv::uvll;
+
+/// A process wraps the handle of the underlying uv_process_t.
+pub struct TTY(*uvll::uv_tty_t);
+
+impl uv::Watcher for TTY {}
+
+impl TTY {
+ #[fixed_stack_segment] #[inline(never)]
+ pub fn new(loop_: &uv::Loop, fd: libc::c_int, readable: bool) ->
+ Result<TTY, uv::UvError>
+ {
+ let handle = unsafe { uvll::malloc_handle(uvll::UV_TTY) };
+ assert!(handle.is_not_null());
+
+ let ret = unsafe {
+ uvll::uv_tty_init(loop_.native_handle(), handle, fd as libc::c_int,
+ readable as libc::c_int)
+ };
+ match ret {
+ 0 => {
+ let mut ret: TTY = uv::NativeHandle::from_native_handle(handle);
+ ret.install_watcher_data();
+ Ok(ret)
+ }
+ n => {
+ unsafe { uvll::free_handle(handle); }
+ Err(uv::UvError(n))
+ }
+ }
+ }
+
+ pub fn as_stream(&self) -> net::StreamWatcher {
+ net::StreamWatcher(**self as *uvll::uv_stream_t)
+ }
+
+ #[fixed_stack_segment] #[inline(never)]
+ pub fn set_mode(&self, raw: bool) -> Result<(), uv::UvError> {
+ let raw = raw as libc::c_int;
+ match unsafe { uvll::uv_tty_set_mode(self.native_handle(), raw) } {
+ 0 => Ok(()),
+ n => Err(uv::UvError(n))
+ }
+ }
+
+ #[fixed_stack_segment] #[inline(never)]
+ pub fn reset_mode(&self) {
+ unsafe { uvll::uv_tty_reset_mode(self.native_handle()) }
+ }
+
+ #[fixed_stack_segment] #[inline(never)] #[allow(unused_mut)]
+ pub fn get_winsize(&self) -> Result<(int, int), uv::UvError> {
+ let mut width: libc::c_int = 0;
+ let mut height: libc::c_int = 0;
+ let widthptr: *libc::c_int = &width;
+ let heightptr: *libc::c_int = &width;
+
+ match unsafe { uvll::uv_tty_get_winsize(self.native_handle(),
+ widthptr, heightptr) } {
+ 0 => Ok((width as int, height as int)),
+ n => Err(uv::UvError(n))
+ }
+ }
+}
+
+impl uv::NativeHandle<*uvll::uv_tty_t> for TTY {
+ fn from_native_handle(handle: *uvll::uv_tty_t) -> TTY {
+ TTY(handle)
+ }
+ fn native_handle(&self) -> *uvll::uv_tty_t {
+ match self { &TTY(ptr) => ptr }
+ }
+}
+
}
return ret;
}
+
+ fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool)
+ -> Result<~RtioTTYObject, IoError> {
+ match tty::TTY::new(self.uv_loop(), fd, readable) {
+ Ok(tty) => Ok(~UvTTY {
+ home: get_handle_to_current_scheduler!(),
+ tty: tty,
+ close_on_drop: close_on_drop,
+ }),
+ Err(e) => Err(uv_error_to_io_error(e))
+ }
+ }
}
pub struct UvTcpListener {
}
}
+pub struct UvTTY {
+ tty: tty::TTY,
+ home: SchedHandle,
+ close_on_drop: bool,
+}
+
+impl HomingIO for UvTTY {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl Drop for UvTTY {
+ fn drop(&mut self) {
+ if self.close_on_drop {
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task = Cell::new(task);
+ do self.tty.close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task.take());
+ }
+ }
+ } else {
+ self.tty.drop_watcher_data();
+ unsafe { uvll::free_handle(self.tty.native_handle()) }
+ }
+ }
+}
+
pub struct UvUnixAcceptor {
listener: UvUnixListener,
incoming: Tube<Result<~RtioPipeObject, IoError>>,
}
}
+impl RtioTTY for UvTTY {
+ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+ do self.home_for_io_with_sched |self_, scheduler| {
+ read_stream(self_.tty.as_stream(), scheduler, buf)
+ }
+ }
+
+ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+ do self.home_for_io_with_sched |self_, scheduler| {
+ write_stream(self_.tty.as_stream(), scheduler, buf)
+ }
+ }
+
+ fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
+ do self.home_for_io |self_| {
+ match self_.tty.set_mode(raw) {
+ Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
+ }
+ }
+ }
+
+ fn reset_mode(&mut self) {
+ do self.home_for_io |self_| { self_.tty.reset_mode() }
+ }
+
+ fn get_winsize(&mut self) -> Result<(int, int), IoError> {
+ do self.home_for_io |self_| {
+ match self_.tty.get_winsize() {
+ Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
+ }
+ }
+ }
+}
+
#[test]
fn test_simple_io_no_connect() {
do run_in_mt_newsched_task {
pub type uv_getaddrinfo_t = c_void;
pub type uv_process_t = c_void;
pub type uv_pipe_t = c_void;
+pub type uv_tty_t = c_void;
pub struct uv_timespec_t {
tv_sec: libc::c_long,
pub fn uv_pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int;
pub fn uv_pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t,
name: *c_char, cb: uv_connect_cb);
+ pub fn uv_tty_init(loop_ptr: *uv_loop_t, tty: *uv_tty_t, fd: c_int,
+ readable: c_int) -> c_int;
+ pub fn uv_tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int;
+ pub fn uv_tty_reset_mode(tty: *uv_tty_t);
+ pub fn uv_tty_get_winsize(tty: *uv_tty_t, width: *c_int,
+ height: *c_int) -> c_int;
// These should all really be constants...
#[rust_stack] pub fn rust_SOCK_STREAM() -> c_int;
uv_pipe_open
uv_pipe_bind
uv_pipe_connect
+uv_tty_init
+uv_tty_set_mode
+uv_tty_reset_mode
+uv_tty_get_winsize