TcpWatcher {
home: home,
handle: handle,
- stream: StreamWatcher::new(handle),
+ stream: StreamWatcher::new(handle, true),
refcount: Refcount::new(),
read_access: AccessTimeout::new(),
write_access: AccessTimeout::new(),
fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
box TcpWatcher {
handle: self.handle,
- stream: StreamWatcher::new(self.handle),
+ stream: StreamWatcher::new(self.handle, false),
home: self.home.clone(),
refcount: self.refcount.clone(),
read_access: self.read_access.clone(),
handle
};
PipeWatcher {
- stream: StreamWatcher::new(handle),
+ stream: StreamWatcher::new(handle, true),
home: home,
defused: false,
refcount: Refcount::new(),
fn clone(&self) -> Box<rtio::RtioPipe + Send> {
box PipeWatcher {
- stream: StreamWatcher::new(self.stream.handle),
+ stream: StreamWatcher::new(self.stream.handle, false),
defused: false,
home: self.home.clone(),
refcount: self.refcount.clone(),
// will be manipulated on each of the methods called on this watcher.
// Wrappers should ensure to always reset the field to an appropriate value
// if they rely on the field to perform an action.
- pub fn new(stream: *mut uvll::uv_stream_t) -> StreamWatcher {
- unsafe { uvll::set_data_for_uv_handle(stream, 0 as *mut int) }
+ pub fn new(stream: *mut uvll::uv_stream_t,
+ init: bool) -> StreamWatcher {
+ if init {
+ unsafe { uvll::set_data_for_uv_handle(stream, 0 as *mut int) }
+ }
StreamWatcher {
handle: stream,
last_write_req: None,
let handle = UvHandle::alloc(None::<TtyWatcher>, uvll::UV_TTY);
let mut watcher = TtyWatcher {
tty: handle,
- stream: StreamWatcher::new(handle),
+ stream: StreamWatcher::new(handle, true),
home: io.make_handle(),
fd: fd,
};
rx2.recv();
})
+
+ iotest!(fn clone_while_reading() {
+ let addr = next_test_ip6();
+ let listen = TcpListener::bind(addr.ip.to_str().as_slice(), addr.port);
+ let mut accept = listen.listen().unwrap();
+
+ // Enqueue a task to write to a socket
+ let (tx, rx) = channel();
+ let (txdone, rxdone) = channel();
+ let txdone2 = txdone.clone();
+ spawn(proc() {
+ let mut tcp = TcpStream::connect(addr.ip.to_str().as_slice(),
+ addr.port).unwrap();
+ rx.recv();
+ tcp.write_u8(0).unwrap();
+ txdone2.send(());
+ });
+
+ // Spawn off a reading clone
+ let tcp = accept.accept().unwrap();
+ let tcp2 = tcp.clone();
+ let txdone3 = txdone.clone();
+ spawn(proc() {
+ let mut tcp2 = tcp2;
+ tcp2.read_u8().unwrap();
+ txdone3.send(());
+ });
+
+ // Try to ensure that the reading clone is indeed reading
+ for _ in range(0i, 50) {
+ ::task::deschedule();
+ }
+
+ // clone the handle again while it's reading, then let it finish the
+ // read.
+ let _ = tcp.clone();
+ tx.send(());
+ rxdone.recv();
+ rxdone.recv();
+ })
}
use std::io::timer;
use mem;
- let mut tb = TaskBuilder::new();
+ let tb = TaskBuilder::new();
let rx = tb.try_future(proc() {});
mem::drop(rx);
timer::sleep(1000);