pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
pub use self::pipe::PipeStream;
-pub use self::pipe::UnboundPipeStream;
pub use self::process::Process;
// Some extension traits that all Readers and Writers get.
use prelude::*;
use super::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
-use rt::local::Local;
-use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory};
-use rt::rtio::RtioUnboundPipeObject;
+use rt::rtio::{RtioPipe, RtioPipeObject};
pub struct PipeStream {
priv obj: ~RtioPipeObject
}
-// This should not be a newtype, but rt::uv::process::set_stdio needs to reach
-// into the internals of this :(
-pub struct UnboundPipeStream(~RtioUnboundPipeObject);
-
impl PipeStream {
- /// Creates a new pipe initialized, but not bound to any particular
- /// source/destination
- pub fn new() -> Option<UnboundPipeStream> {
- let pipe = unsafe {
- let io: *mut IoFactoryObject = Local::unsafe_borrow();
- (*io).pipe_init(false)
- };
- match pipe {
- Ok(p) => Some(UnboundPipeStream(p)),
- Err(ioerr) => {
- io_error::cond.raise(ioerr);
- None
- }
- }
- }
-
pub fn new_bound(inner: ~RtioPipeObject) -> PipeStream {
PipeStream { obj: inner }
}
/// specified for.
InheritFd(libc::c_int),
- /// Creates a pipe for the specified file descriptor which will be directed
- /// into the previously-initialized pipe passed in.
+ /// Creates a pipe for the specified file descriptor which will be created
+ /// when the process is spawned.
///
/// The first boolean argument is whether the pipe is readable, and the
/// second is whether it is writable. These properties are from the view of
/// the *child* process, not the parent process.
- CreatePipe(io::UnboundPipeStream,
- bool /* readable */,
- bool /* writable */),
+ CreatePipe(bool /* readable */, bool /* writable */),
}
impl Process {
pub type RtioTimerObject = uvio::UvTimer;
pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
pub type RtioPipeObject = uvio::UvPipeStream;
-pub type RtioUnboundPipeObject = uvio::UvUnboundPipe;
pub type RtioProcessObject = uvio::UvProcess;
pub type RtioUnixListenerObject = uvio::UvUnixListener;
pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor;
fn fs_rmdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>;
fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) ->
Result<~[Path], IoError>;
- fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>;
fn spawn(&mut self, config: ProcessConfig)
-> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>;
use rt::io::process::*;
use rt::uv;
-use rt::uv::uvio::UvPipeStream;
+use rt::uv::uvio::{UvPipeStream, UvUnboundPipe};
use rt::uv::uvll;
/// A process wraps the handle of the underlying uv_process_t.
unsafe {
vec::raw::set_len(&mut stdio, io.len());
for (slot, other) in stdio.iter().zip(io.move_iter()) {
- let io = set_stdio(slot as *uvll::uv_stdio_container_t, other);
+ let io = set_stdio(slot as *uvll::uv_stdio_container_t, other,
+ loop_);
ret_io.push(io);
}
}
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
- io: StdioContainer) -> Option<~UvPipeStream> {
+ io: StdioContainer,
+ loop_: &uv::Loop) -> Option<~UvPipeStream> {
match io {
Ignored => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
uvll::set_stdio_container_fd(dst, fd);
None
}
- CreatePipe(pipe, readable, writable) => {
+ CreatePipe(readable, writable) => {
let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
if readable {
flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
if writable {
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
}
+ let pipe = UvUnboundPipe::new_fresh(loop_);
let handle = pipe.pipe.as_stream().native_handle();
uvll::set_stdio_container_flags(dst, flags);
uvll::set_stdio_container_stream(dst, handle);
- Some(~UvPipeStream::new(**pipe))
+ Some(~UvPipeStream::new(pipe))
}
}
}
return result_cell.take();
}
- fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> {
- let home = get_handle_to_current_scheduler!();
- Ok(~UvUnboundPipe::new(Pipe::new(self.uv_loop(), ipc), home))
- }
-
fn spawn(&mut self, config: ProcessConfig)
-> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>
{
}
impl UvUnboundPipe {
+ /// Takes ownership of an unbound pipe along with the scheduler that it is
+ /// homed on.
fn new(pipe: Pipe, home: SchedHandle) -> UvUnboundPipe {
UvUnboundPipe { pipe: pipe, home: home }
}
+
+ /// Creates a fresh new unbound pipe on the specified I/O loop
+ pub fn new_fresh(loop_: &Loop) -> UvUnboundPipe {
+ UvUnboundPipe {
+ pipe: Pipe::new(loop_, false),
+ home: get_handle_to_current_scheduler!(),
+ }
+ }
}
impl HomingIO for UvUnboundPipe {
use std::rt::io::process::{Process, ProcessConfig, CreatePipe, Ignored};
use std::rt::io::{Reader, Writer};
-use std::rt::io::pipe::PipeStream;
use std::str;
#[test]
#[test]
#[cfg(unix, not(target_os="android"))]
fn stdout_works() {
- let pipe = PipeStream::new().unwrap();
- let io = ~[Ignored, CreatePipe(pipe, false, true)];
+ let io = ~[Ignored, CreatePipe(false, true)];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"echo foobar"],
#[test]
#[cfg(unix, not(target_os="android"))]
fn set_cwd_works() {
- let pipe = PipeStream::new().unwrap();
- let io = ~[Ignored, CreatePipe(pipe, false, true)];
+ let io = ~[Ignored, CreatePipe(false, true)];
let cwd = Some("/");
let args = ProcessConfig {
program: "/bin/sh",
#[test]
#[cfg(unix, not(target_os="android"))]
fn stdin_works() {
- let input = PipeStream::new().unwrap();
- let output = PipeStream::new().unwrap();
- let io = ~[CreatePipe(input, true, false),
- CreatePipe(output, false, true)];
+ let io = ~[CreatePipe(true, false),
+ CreatePipe(false, true)];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"read line; echo $line"],