self.home().sched_id
}
- // XXX: dummy self parameter
- fn restore_original_home(_: Option<Self>, io_home: uint) {
+ /// Fires a single homing missile, returning another missile targeted back
+ /// at the original home of this task. In other words, this function will
+ /// move the local task to its I/O scheduler and then return an RAII wrapper
+ /// which will return the task home.
+ fn fire_homing_missile(&mut self) -> HomingMissile {
+ HomingMissile { io_home: self.go_to_IO_home() }
+ }
+
+ /// Same as `fire_homing_missile`, but returns the local I/O scheduler as
+ /// well (the one that was homed to).
+ fn fire_homing_missile_sched(&mut self) -> (HomingMissile, ~Scheduler) {
+ // First, transplant ourselves to the home I/O scheduler
+ let missile = self.fire_homing_missile();
+ // Next (must happen next), grab the local I/O scheduler
+ let io_sched: ~Scheduler = Local::take();
+
+ (missile, io_sched)
+ }
+}
+
+/// After a homing operation has been completed, this will return the current
+/// task back to its appropriate home (if applicable). The field is used to
+/// assert that we are where we think we are.
+struct HomingMissile {
+ priv io_home: uint,
+}
+
+impl Drop for HomingMissile {
+ fn drop(&mut self) {
// It would truly be a sad day if we had moved off the home I/O
// scheduler while we were doing I/O.
assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()),
- io_home);
+ self.io_home);
// If we were a homed task, then we must send ourselves back to the
// original scheduler. Otherwise, we can just return and keep running
}
}
}
-
- fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
- let home = self.go_to_IO_home();
- let a = io(self); // do IO
- HomingIO::restore_original_home(None::<Self>, home);
- a // return the result of the IO
- }
-
- fn home_for_io_consume<A>(mut self, io: &fn(Self) -> A) -> A {
- let home = self.go_to_IO_home();
- let a = io(self); // do IO
- HomingIO::restore_original_home(None::<Self>, home);
- a // return the result of the IO
- }
-
- fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
- let home = self.go_to_IO_home();
- let a = do task::unkillable { // FIXME(#8674)
- let scheduler: ~Scheduler = Local::take();
- io_sched(self, scheduler) // do IO and scheduling action
- };
- HomingIO::restore_original_home(None::<Self>, home);
- a // return result of IO
- }
}
// get a handle for the current scheduler
impl Drop for UvTcpListener {
fn drop(&mut self) {
- do self.home_for_io_with_sched |self_, scheduler| {
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task = Cell::new(task);
- do self_.watcher.as_stream().close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task.take());
- }
+ let (_m, sched) = self.fire_homing_missile_sched();
+ do sched.deschedule_running_task_and_then |_, task| {
+ let task = Cell::new(task);
+ do self.watcher.as_stream().close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task.take());
}
}
}
impl RtioSocket for UvTcpListener {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- do self.home_for_io |self_| {
- socket_name(Tcp, self_.watcher)
- }
+ let _m = self.fire_homing_missile();
+ socket_name(Tcp, self.watcher)
}
}
impl RtioTcpListener for UvTcpListener {
- fn listen(~self) -> Result<~RtioTcpAcceptor, IoError> {
- do self.home_for_io_consume |self_| {
- let acceptor = ~UvTcpAcceptor::new(self_);
- let incoming = Cell::new(acceptor.incoming.clone());
- let mut stream = acceptor.listener.watcher.as_stream();
- let res = do stream.listen |mut server, status| {
- do incoming.with_mut_ref |incoming| {
- let inc = match status {
- Some(_) => Err(standard_error(OtherIoError)),
- None => {
- let inc = TcpWatcher::new(&server.event_loop());
- // first accept call in the callback guarenteed to succeed
- server.accept(inc.as_stream());
- let home = get_handle_to_current_scheduler!();
- Ok(~UvTcpStream { watcher: inc, home: home }
- as ~RtioTcpStream)
- }
- };
- incoming.send(inc);
- }
- };
- match res {
- Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
- Err(e) => Err(uv_error_to_io_error(e)),
+ fn listen(mut ~self) -> Result<~RtioTcpAcceptor, IoError> {
+ let _m = self.fire_homing_missile();
+ let acceptor = ~UvTcpAcceptor::new(*self);
+ let incoming = Cell::new(acceptor.incoming.clone());
+ let mut stream = acceptor.listener.watcher.as_stream();
+ let res = do stream.listen |mut server, status| {
+ do incoming.with_mut_ref |incoming| {
+ let inc = match status {
+ Some(_) => Err(standard_error(OtherIoError)),
+ None => {
+ let inc = TcpWatcher::new(&server.event_loop());
+ // first accept call in the callback guarenteed to succeed
+ server.accept(inc.as_stream());
+ let home = get_handle_to_current_scheduler!();
+ Ok(~UvTcpStream { watcher: inc, home: home }
+ as ~RtioTcpStream)
+ }
+ };
+ incoming.send(inc);
}
+ };
+ match res {
+ Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
+ Err(e) => Err(uv_error_to_io_error(e)),
}
}
}
impl RtioSocket for UvTcpAcceptor {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- do self.home_for_io |self_| {
- socket_name(Tcp, self_.listener.watcher)
- }
+ let _m = self.fire_homing_missile();
+ socket_name(Tcp, self.listener.watcher)
}
}
impl RtioTcpAcceptor for UvTcpAcceptor {
fn accept(&mut self) -> Result<~RtioTcpStream, IoError> {
- do self.home_for_io |self_| {
- self_.incoming.recv()
- }
+ let _m = self.fire_homing_missile();
+ self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- accept_simultaneously(self_.listener.watcher.as_stream(), 1)
- }
+ let _m = self.fire_homing_missile();
+ accept_simultaneously(self.listener.watcher.as_stream(), 1)
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- accept_simultaneously(self_.listener.watcher.as_stream(), 0)
- }
+ let _m = self.fire_homing_missile();
+ accept_simultaneously(self.listener.watcher.as_stream(), 0)
}
}
impl Drop for UvUnboundPipe {
fn drop(&mut self) {
- do self.home_for_io |self_| {
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self_.pipe.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, sched) = self.fire_homing_missile_sched();
+ do sched.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do self.pipe.close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
impl RtioPipe for UvPipeStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
- do self.inner.home_for_io_with_sched |self_, scheduler| {
- read_stream(self_.pipe.as_stream(), scheduler, buf)
- }
+ let (_m, scheduler) = self.inner.fire_homing_missile_sched();
+ read_stream(self.inner.pipe.as_stream(), scheduler, buf)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
- do self.inner.home_for_io_with_sched |self_, scheduler| {
- write_stream(self_.pipe.as_stream(), scheduler, buf)
- }
+ let (_m, scheduler) = self.inner.fire_homing_missile_sched();
+ write_stream(self.inner.pipe.as_stream(), scheduler, buf)
}
}
impl Drop for UvTcpStream {
fn drop(&mut self) {
- do self.home_for_io_with_sched |self_, scheduler| {
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self_.watcher.as_stream().close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, sched) = self.fire_homing_missile_sched();
+ do sched.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do self.watcher.as_stream().close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
impl RtioSocket for UvTcpStream {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- do self.home_for_io |self_| {
- socket_name(Tcp, self_.watcher)
- }
+ let _m = self.fire_homing_missile();
+ socket_name(Tcp, self.watcher)
}
}
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
- do self.home_for_io_with_sched |self_, scheduler| {
- read_stream(self_.watcher.as_stream(), scheduler, buf)
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ read_stream(self.watcher.as_stream(), scheduler, buf)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
- do self.home_for_io_with_sched |self_, scheduler| {
- write_stream(self_.watcher.as_stream(), scheduler, buf)
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ write_stream(self.watcher.as_stream(), scheduler, buf)
}
fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
- do self.home_for_io |self_| {
- socket_name(TcpPeer, self_.watcher)
- }
+ let _m = self.fire_homing_missile();
+ socket_name(TcpPeer, self.watcher)
}
fn control_congestion(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_tcp_nodelay(self_.watcher.native_handle(), 0 as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_nodelay(self.watcher.native_handle(), 0 as c_int)
+ })
}
fn nodelay(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_tcp_nodelay(self_.watcher.native_handle(), 1 as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_nodelay(self.watcher.native_handle(), 1 as c_int)
+ })
}
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
- delay_in_seconds as c_uint)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_keepalive(self.watcher.native_handle(), 1 as c_int,
+ delay_in_seconds as c_uint)
+ })
}
fn letdie(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_tcp_keepalive(self_.watcher.native_handle(),
- 0 as c_int, 0 as c_uint)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_keepalive(self.watcher.native_handle(),
+ 0 as c_int, 0 as c_uint)
+ })
}
}
impl Drop for UvUdpSocket {
fn drop(&mut self) {
- do self.home_for_io_with_sched |self_, scheduler| {
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self_.watcher.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do self.watcher.close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
impl RtioSocket for UvUdpSocket {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- do self.home_for_io |self_| {
- socket_name(Udp, self_.watcher)
- }
+ let _m = self.fire_homing_missile();
+ socket_name(Udp, self.watcher)
}
}
impl RtioUdpSocket for UvUdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
- do self.home_for_io_with_sched |self_, scheduler| {
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
- let uv_buf = slice_to_uv_buf(buf);
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let alloc: AllocCallback = |_| uv_buf;
- do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
- let _ = flags; // /XXX add handling for partials?
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ let result_cell = Cell::new_empty();
+ let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
- watcher.recv_stop();
+ let buf_ptr: *&mut [u8] = &buf;
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
+ do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
+ let _ = flags; // /XXX add handling for partials?
- let result = match status {
- None => {
- assert!(nread >= 0);
- Ok((nread as uint, addr))
- }
- Some(err) => Err(uv_error_to_io_error(err)),
- };
+ watcher.recv_stop();
- unsafe { (*result_cell_ptr).put_back(result); }
+ let result = match status {
+ None => {
+ assert!(nread >= 0);
+ Ok((nread as uint, addr))
+ }
+ Some(err) => Err(uv_error_to_io_error(err)),
+ };
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
+ unsafe { (*result_cell_ptr).put_back(result); }
- assert!(!result_cell.is_empty());
- result_cell.take()
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ }
}
+
+ assert!(!result_cell.is_empty());
+ result_cell.take()
}
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
- do self.home_for_io_with_sched |self_, scheduler| {
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
- let buf_ptr: *&[u8] = &buf;
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
- do self_.watcher.send(buf, dst) |_watcher, status| {
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ let result_cell = Cell::new_empty();
+ let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+ let buf_ptr: *&[u8] = &buf;
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+ do self.watcher.send(buf, dst) |_watcher, status| {
- let result = match status {
- None => Ok(()),
- Some(err) => Err(uv_error_to_io_error(err)),
- };
+ let result = match status {
+ None => Ok(()),
+ Some(err) => Err(uv_error_to_io_error(err)),
+ };
- unsafe { (*result_cell_ptr).put_back(result); }
+ unsafe { (*result_cell_ptr).put_back(result); }
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
-
- assert!(!result_cell.is_empty());
- result_cell.take()
}
+
+ assert!(!result_cell.is_empty());
+ result_cell.take()
}
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- do multi.to_str().with_c_str |m_addr| {
- uvll::uv_udp_set_membership(self_.watcher.native_handle(),
- m_addr, ptr::null(),
- uvll::UV_JOIN_GROUP)
- }
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ do multi.to_str().with_c_str |m_addr| {
+ uvll::uv_udp_set_membership(self.watcher.native_handle(),
+ m_addr, ptr::null(),
+ uvll::UV_JOIN_GROUP)
+ }
+ })
}
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- do multi.to_str().with_c_str |m_addr| {
- uvll::uv_udp_set_membership(self_.watcher.native_handle(),
- m_addr, ptr::null(),
- uvll::UV_LEAVE_GROUP)
- }
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ do multi.to_str().with_c_str |m_addr| {
+ uvll::uv_udp_set_membership(self.watcher.native_handle(),
+ m_addr, ptr::null(),
+ uvll::UV_LEAVE_GROUP)
+ }
+ })
}
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_udp_set_multicast_loop(self_.watcher.native_handle(),
- 1 as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
+ 1 as c_int)
+ })
}
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_udp_set_multicast_loop(self_.watcher.native_handle(),
- 0 as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
+ 0 as c_int)
+ })
}
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_udp_set_multicast_ttl(self_.watcher.native_handle(),
- ttl as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_multicast_ttl(self.watcher.native_handle(),
+ ttl as c_int)
+ })
}
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
+ })
}
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_udp_set_broadcast(self_.watcher.native_handle(),
- 1 as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
+ 1 as c_int)
+ })
}
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- let r = unsafe {
- uvll::uv_udp_set_broadcast(self_.watcher.native_handle(),
- 0 as c_int)
- };
- status_to_io_result(r)
- }
+ let _m = self.fire_homing_missile();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
+ 0 as c_int)
+ })
}
}
impl Drop for UvTimer {
fn drop(&mut self) {
- do self.home_for_io_with_sched |self_, scheduler| {
- uvdebug!("closing UvTimer");
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self_.watcher.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ uvdebug!("closing UvTimer");
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do self.watcher.close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
impl RtioTimer for UvTimer {
fn sleep(&mut self, msecs: u64) {
- do self.home_for_io_with_sched |self_, scheduler| {
- do scheduler.deschedule_running_task_and_then |_sched, task| {
- uvdebug!("sleep: entered scheduler context");
- let task_cell = Cell::new(task);
- do self_.watcher.start(msecs, 0) |_, status| {
- assert!(status.is_none());
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ do scheduler.deschedule_running_task_and_then |_sched, task| {
+ uvdebug!("sleep: entered scheduler context");
+ let task_cell = Cell::new(task);
+ do self.watcher.start(msecs, 0) |_, status| {
+ assert!(status.is_none());
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
- self_.watcher.stop();
}
+ self.watcher.stop();
}
fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
let (port, chan) = oneshot();
let chan = Cell::new(chan);
- do self.home_for_io |self_| {
- let chan = Cell::new(chan.take());
- do self_.watcher.start(msecs, 0) |_, status| {
- assert!(status.is_none());
- assert!(!chan.is_empty());
- chan.take().send_deferred(());
- }
+ let _m = self.fire_homing_missile();
+ do self.watcher.start(msecs, 0) |_, status| {
+ assert!(status.is_none());
+ assert!(!chan.is_empty());
+ chan.take().send_deferred(());
}
return port;
let (port, chan) = stream();
let chan = Cell::new(chan);
- do self.home_for_io |self_| {
- let chan = Cell::new(chan.take());
- do self_.watcher.start(msecs, msecs) |_, status| {
- assert!(status.is_none());
- do chan.with_ref |chan| {
- chan.send_deferred(());
- }
+ let _m = self.fire_homing_missile();
+ do self.watcher.start(msecs, msecs) |_, status| {
+ assert!(status.is_none());
+ do chan.with_ref |chan| {
+ chan.send_deferred(());
}
}
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
- do self.home_for_io_with_sched |self_, scheduler| {
- do scheduler.deschedule_running_task_and_then |_, task| {
- let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
- let task_cell = Cell::new(task);
- let read_req = file::FsRequest::new();
- do read_req.read(&self_.loop_, self_.fd, buf, offset) |req, uverr| {
- let res = match uverr {
- None => Ok(req.get_result() as int),
- Some(err) => Err(uv_error_to_io_error(err))
- };
- unsafe { (*result_cell_ptr).put_back(res); }
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+ let task_cell = Cell::new(task);
+ let read_req = file::FsRequest::new();
+ do read_req.read(&self.loop_, self.fd, buf, offset) |req, uverr| {
+ let res = match uverr {
+ None => Ok(req.get_result() as int),
+ Some(err) => Err(uv_error_to_io_error(err))
+ };
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
result_cell.take()
-> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
- do self.home_for_io_with_sched |self_, sched| {
- do sched.deschedule_running_task_and_then |_, task| {
- let task = Cell::new(task);
- let req = file::FsRequest::new();
- do f(self_, req) |_, uverr| {
- let res = match uverr {
- None => Ok(()),
- Some(err) => Err(uv_error_to_io_error(err))
- };
- unsafe { (*result_cell_ptr).put_back(res); }
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task.take());
- }
+ let (_m, sched) = self.fire_homing_missile_sched();
+ do sched.deschedule_running_task_and_then |_, task| {
+ let task = Cell::new(task);
+ let req = file::FsRequest::new();
+ do f(self_, req) |_, uverr| {
+ let res = match uverr {
+ None => Ok(()),
+ Some(err) => Err(uv_error_to_io_error(err))
+ };
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task.take());
}
}
result_cell.take()
do close_req.close(&self.loop_, self.fd) |_,_| {}
}
CloseSynchronously => {
- do self.home_for_io_with_sched |self_, scheduler| {
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let close_req = file::FsRequest::new();
- do close_req.close(&self_.loop_, self_.fd) |_,_| {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ let close_req = file::FsRequest::new();
+ do close_req.close(&self.loop_, self.fd) |_,_| {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
fn tell(&self) -> Result<u64, IoError> {
use std::libc::SEEK_CUR;
// this is temporary
- let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
+ let self_ = unsafe { cast::transmute_mut(self) };
self_.seek_common(0, SEEK_CUR)
}
fn fsync(&mut self) -> Result<(), IoError> {
if self.home.is_none() {
close(self)
} else {
- self.home_for_io(close)
+ let _m = self.fire_homing_missile();
+ close(self)
}
}
}
}
fn kill(&mut self, signal: int) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- match self_.process.kill(signal) {
- Ok(()) => Ok(()),
- Err(uverr) => Err(uv_error_to_io_error(uverr))
- }
+ let _m = self.fire_homing_missile();
+ match self.process.kill(signal) {
+ Ok(()) => Ok(()),
+ Err(uverr) => Err(uv_error_to_io_error(uverr))
}
}
fn wait(&mut self) -> int {
// Make sure (on the home scheduler) that we have an exit status listed
- do self.home_for_io |self_| {
- match self_.exit_status {
- Some(*) => {}
- None => {
- // If there's no exit code previously listed, then the
- // process's exit callback has yet to be invoked. We just
- // need to deschedule ourselves and wait to be reawoken.
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- assert!(self_.descheduled.is_none());
- self_.descheduled = Some(task);
- }
- assert!(self_.exit_status.is_some());
+ let _m = self.fire_homing_missile();
+ match self.exit_status {
+ Some(*) => {}
+ None => {
+ // If there's no exit code previously listed, then the
+ // process's exit callback has yet to be invoked. We just
+ // need to deschedule ourselves and wait to be reawoken.
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ assert!(self.descheduled.is_none());
+ self.descheduled = Some(task);
}
+ assert!(self.exit_status.is_some());
}
}
}
impl RtioUnixListener for UvUnixListener {
- fn listen(~self) -> Result<~RtioUnixAcceptor, IoError> {
- do self.home_for_io_consume |self_| {
- let acceptor = ~UvUnixAcceptor::new(self_);
- let incoming = Cell::new(acceptor.incoming.clone());
- let mut stream = acceptor.listener.inner.pipe.as_stream();
- let res = do stream.listen |mut server, status| {
- do incoming.with_mut_ref |incoming| {
- let inc = match status {
- Some(e) => Err(uv_error_to_io_error(e)),
- None => {
- let pipe = UvUnboundPipe::new(&server.event_loop());
- server.accept(pipe.pipe.as_stream());
- Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
- }
- };
- incoming.send(inc);
- }
- };
- match res {
- Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
- Err(e) => Err(uv_error_to_io_error(e)),
+ fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
+ let _m = self.fire_homing_missile();
+ let acceptor = ~UvUnixAcceptor::new(*self);
+ let incoming = Cell::new(acceptor.incoming.clone());
+ let mut stream = acceptor.listener.inner.pipe.as_stream();
+ let res = do stream.listen |mut server, status| {
+ do incoming.with_mut_ref |incoming| {
+ let inc = match status {
+ Some(e) => Err(uv_error_to_io_error(e)),
+ None => {
+ let pipe = UvUnboundPipe::new(&server.event_loop());
+ server.accept(pipe.pipe.as_stream());
+ Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
+ }
+ };
+ incoming.send(inc);
}
+ };
+ match res {
+ Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
+ Err(e) => Err(uv_error_to_io_error(e)),
}
}
}
impl RtioTTY for UvTTY {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
- do self.home_for_io_with_sched |self_, scheduler| {
- read_stream(self_.tty.as_stream(), scheduler, buf)
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ read_stream(self.tty.as_stream(), scheduler, buf)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
- do self.home_for_io_with_sched |self_, scheduler| {
- write_stream(self_.tty.as_stream(), scheduler, buf)
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ write_stream(self.tty.as_stream(), scheduler, buf)
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- match self_.tty.set_mode(raw) {
- Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
- }
+ let _m = self.fire_homing_missile();
+ match self.tty.set_mode(raw) {
+ Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
fn get_winsize(&mut self) -> Result<(int, int), IoError> {
- do self.home_for_io |self_| {
- match self_.tty.get_winsize() {
- Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
- }
+ let _m = self.fire_homing_missile();
+ match self.tty.get_winsize() {
+ Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
impl RtioUnixAcceptor for UvUnixAcceptor {
fn accept(&mut self) -> Result<~RtioPipe, IoError> {
- do self.home_for_io |self_| {
- self_.incoming.recv()
- }
+ let _m = self.fire_homing_missile();
+ self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1)
- }
+ let _m = self.fire_homing_missile();
+ accept_simultaneously(self.listener.inner.pipe.as_stream(), 1)
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
- do self.home_for_io |self_| {
- accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0)
- }
+ let _m = self.fire_homing_missile();
+ accept_simultaneously(self.listener.inner.pipe.as_stream(), 0)
}
}
impl Drop for UvSignal {
fn drop(&mut self) {
- do self.home_for_io_with_sched |self_, scheduler| {
- uvdebug!("closing UvSignal");
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self_.watcher.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ let (_m, scheduler) = self.fire_homing_missile_sched();
+ uvdebug!("closing UvSignal");
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do self.watcher.close {
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}