]> git.lizzy.rs Git - rust.git/commitdiff
uv: Remove closure-based home_for_io for raii
authorAlex Crichton <alex@alexcrichton.com>
Sun, 3 Nov 2013 19:26:08 +0000 (11:26 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Sun, 10 Nov 2013 09:37:10 +0000 (01:37 -0800)
Using an raii wrapper means that there's no need for a '_self' variant and we
can greatly reduce the amount of 'self_'-named variables.

src/librustuv/uvio.rs

index 1dbc7d71543e1aa7a25cfe50ef32a39d25c112f1..bf8358070dcdf9e7572f9a909fd59d2f2893fe91 100644 (file)
@@ -86,12 +86,39 @@ fn go_to_IO_home(&mut self) -> uint {
         self.home().sched_id
     }
 
-    // XXX: dummy self parameter
-    fn restore_original_home(_: Option<Self>, io_home: uint) {
+    /// Fires a single homing missile, returning another missile targeted back
+    /// at the original home of this task. In other words, this function will
+    /// move the local task to its I/O scheduler and then return an RAII wrapper
+    /// which will return the task home.
+    fn fire_homing_missile(&mut self) -> HomingMissile {
+        HomingMissile { io_home: self.go_to_IO_home() }
+    }
+
+    /// Same as `fire_homing_missile`, but returns the local I/O scheduler as
+    /// well (the one that was homed to).
+    fn fire_homing_missile_sched(&mut self) -> (HomingMissile, ~Scheduler) {
+        // First, transplant ourselves to the home I/O scheduler
+        let missile = self.fire_homing_missile();
+        // Next (must happen next), grab the local I/O scheduler
+        let io_sched: ~Scheduler = Local::take();
+
+        (missile, io_sched)
+    }
+}
+
+/// After a homing operation has been completed, this will return the current
+/// task back to its appropriate home (if applicable). The field is used to
+/// assert that we are where we think we are.
+struct HomingMissile {
+    priv io_home: uint,
+}
+
+impl Drop for HomingMissile {
+    fn drop(&mut self) {
         // It would truly be a sad day if we had moved off the home I/O
         // scheduler while we were doing I/O.
         assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()),
-                   io_home);
+                   self.io_home);
 
         // If we were a homed task, then we must send ourselves back to the
         // original scheduler. Otherwise, we can just return and keep running
@@ -106,30 +133,6 @@ fn restore_original_home(_: Option<Self>, io_home: uint) {
             }
         }
     }
-
-    fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
-        let home = self.go_to_IO_home();
-        let a = io(self); // do IO
-        HomingIO::restore_original_home(None::<Self>, home);
-        a // return the result of the IO
-    }
-
-    fn home_for_io_consume<A>(mut self, io: &fn(Self) -> A) -> A {
-        let home = self.go_to_IO_home();
-        let a = io(self); // do IO
-        HomingIO::restore_original_home(None::<Self>, home);
-        a // return the result of the IO
-    }
-
-    fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
-        let home = self.go_to_IO_home();
-        let a = do task::unkillable { // FIXME(#8674)
-            let scheduler: ~Scheduler = Local::take();
-            io_sched(self, scheduler) // do IO and scheduling action
-        };
-        HomingIO::restore_original_home(None::<Self>, home);
-        a // return result of IO
-    }
 }
 
 // get a handle for the current scheduler
@@ -915,13 +918,12 @@ fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
 
 impl Drop for UvTcpListener {
     fn drop(&mut self) {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task = Cell::new(task);
-                do self_.watcher.as_stream().close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task.take());
-                }
+        let (_m, sched) = self.fire_homing_missile_sched();
+        do sched.deschedule_running_task_and_then |_, task| {
+            let task = Cell::new(task);
+            do self.watcher.as_stream().close {
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task.take());
             }
         }
     }
@@ -929,38 +931,36 @@ fn drop(&mut self) {
 
 impl RtioSocket for UvTcpListener {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        do self.home_for_io |self_| {
-            socket_name(Tcp, self_.watcher)
-        }
+        let _m = self.fire_homing_missile();
+        socket_name(Tcp, self.watcher)
     }
 }
 
 impl RtioTcpListener for UvTcpListener {
-    fn listen(~self) -> Result<~RtioTcpAcceptor, IoError> {
-        do self.home_for_io_consume |self_| {
-            let acceptor = ~UvTcpAcceptor::new(self_);
-            let incoming = Cell::new(acceptor.incoming.clone());
-            let mut stream = acceptor.listener.watcher.as_stream();
-            let res = do stream.listen |mut server, status| {
-                do incoming.with_mut_ref |incoming| {
-                    let inc = match status {
-                        Some(_) => Err(standard_error(OtherIoError)),
-                        None => {
-                            let inc = TcpWatcher::new(&server.event_loop());
-                            // first accept call in the callback guarenteed to succeed
-                            server.accept(inc.as_stream());
-                            let home = get_handle_to_current_scheduler!();
-                            Ok(~UvTcpStream { watcher: inc, home: home }
-                                    as ~RtioTcpStream)
-                        }
-                    };
-                    incoming.send(inc);
-                }
-            };
-            match res {
-                Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
-                Err(e) => Err(uv_error_to_io_error(e)),
+    fn listen(mut ~self) -> Result<~RtioTcpAcceptor, IoError> {
+        let _m = self.fire_homing_missile();
+        let acceptor = ~UvTcpAcceptor::new(*self);
+        let incoming = Cell::new(acceptor.incoming.clone());
+        let mut stream = acceptor.listener.watcher.as_stream();
+        let res = do stream.listen |mut server, status| {
+            do incoming.with_mut_ref |incoming| {
+                let inc = match status {
+                    Some(_) => Err(standard_error(OtherIoError)),
+                    None => {
+                        let inc = TcpWatcher::new(&server.event_loop());
+                        // first accept call in the callback guarenteed to succeed
+                        server.accept(inc.as_stream());
+                        let home = get_handle_to_current_scheduler!();
+                        Ok(~UvTcpStream { watcher: inc, home: home }
+                                as ~RtioTcpStream)
+                    }
+                };
+                incoming.send(inc);
             }
+        };
+        match res {
+            Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
+            Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 }
@@ -982,9 +982,8 @@ fn new(listener: UvTcpListener) -> UvTcpAcceptor {
 
 impl RtioSocket for UvTcpAcceptor {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        do self.home_for_io |self_| {
-            socket_name(Tcp, self_.listener.watcher)
-        }
+        let _m = self.fire_homing_missile();
+        socket_name(Tcp, self.listener.watcher)
     }
 }
 
@@ -997,21 +996,18 @@ fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
 
 impl RtioTcpAcceptor for UvTcpAcceptor {
     fn accept(&mut self) -> Result<~RtioTcpStream, IoError> {
-        do self.home_for_io |self_| {
-            self_.incoming.recv()
-        }
+        let _m = self.fire_homing_missile();
+        self.incoming.recv()
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            accept_simultaneously(self_.listener.watcher.as_stream(), 1)
-        }
+        let _m = self.fire_homing_missile();
+        accept_simultaneously(self.listener.watcher.as_stream(), 1)
     }
 
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            accept_simultaneously(self_.listener.watcher.as_stream(), 0)
-        }
+        let _m = self.fire_homing_missile();
+        accept_simultaneously(self.listener.watcher.as_stream(), 0)
     }
 }
 
@@ -1102,14 +1098,12 @@ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 
 impl Drop for UvUnboundPipe {
     fn drop(&mut self) {
-        do self.home_for_io |self_| {
-            let scheduler: ~Scheduler = Local::take();
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                do self_.pipe.close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        let (_m, sched) = self.fire_homing_missile_sched();
+        do sched.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            do self.pipe.close {
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }
@@ -1127,14 +1121,12 @@ pub fn new(inner: UvUnboundPipe) -> UvPipeStream {
 
 impl RtioPipe for UvPipeStream {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        do self.inner.home_for_io_with_sched |self_, scheduler| {
-            read_stream(self_.pipe.as_stream(), scheduler, buf)
-        }
+        let (_m, scheduler) = self.inner.fire_homing_missile_sched();
+        read_stream(self.inner.pipe.as_stream(), scheduler, buf)
     }
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        do self.inner.home_for_io_with_sched |self_, scheduler| {
-            write_stream(self_.pipe.as_stream(), scheduler, buf)
-        }
+        let (_m, scheduler) = self.inner.fire_homing_missile_sched();
+        write_stream(self.inner.pipe.as_stream(), scheduler, buf)
     }
 }
 
@@ -1149,13 +1141,12 @@ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 
 impl Drop for UvTcpStream {
     fn drop(&mut self) {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                do self_.watcher.as_stream().close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        let (_m, sched) = self.fire_homing_missile_sched();
+        do sched.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            do self.watcher.as_stream().close {
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }
@@ -1163,67 +1154,55 @@ fn drop(&mut self) {
 
 impl RtioSocket for UvTcpStream {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        do self.home_for_io |self_| {
-            socket_name(Tcp, self_.watcher)
-        }
+        let _m = self.fire_homing_missile();
+        socket_name(Tcp, self.watcher)
     }
 }
 
 impl RtioTcpStream for UvTcpStream {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            read_stream(self_.watcher.as_stream(), scheduler, buf)
-        }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        read_stream(self.watcher.as_stream(), scheduler, buf)
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            write_stream(self_.watcher.as_stream(), scheduler, buf)
-        }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        write_stream(self.watcher.as_stream(), scheduler, buf)
     }
 
     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
-        do self.home_for_io |self_| {
-            socket_name(TcpPeer, self_.watcher)
-        }
+        let _m = self.fire_homing_missile();
+        socket_name(TcpPeer, self.watcher)
     }
 
     fn control_congestion(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_tcp_nodelay(self_.watcher.native_handle(), 0 as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_nodelay(self.watcher.native_handle(), 0 as c_int)
+        })
     }
 
     fn nodelay(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_tcp_nodelay(self_.watcher.native_handle(), 1 as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_nodelay(self.watcher.native_handle(), 1 as c_int)
+        })
     }
 
     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
-                                       delay_in_seconds as c_uint)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_keepalive(self.watcher.native_handle(), 1 as c_int,
+                                   delay_in_seconds as c_uint)
+        })
     }
 
     fn letdie(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_tcp_keepalive(self_.watcher.native_handle(),
-                                       0 as c_int, 0 as c_uint)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_keepalive(self.watcher.native_handle(),
+                                   0 as c_int, 0 as c_uint)
+        })
     }
 }
 
@@ -1238,13 +1217,12 @@ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 
 impl Drop for UvUdpSocket {
     fn drop(&mut self) {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                do self_.watcher.close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            do self.watcher.close {
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }
@@ -1252,156 +1230,138 @@ fn drop(&mut self) {
 
 impl RtioSocket for UvUdpSocket {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        do self.home_for_io |self_| {
-            socket_name(Udp, self_.watcher)
-        }
+        let _m = self.fire_homing_missile();
+        socket_name(Udp, self.watcher)
     }
 }
 
 impl RtioUdpSocket for UvUdpSocket {
     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            let result_cell = Cell::new_empty();
-            let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
-            let uv_buf = slice_to_uv_buf(buf);
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                let alloc: AllocCallback = |_| uv_buf;
-                do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
-                    let _ = flags; // /XXX add handling for partials?
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        let result_cell = Cell::new_empty();
+        let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
 
-                    watcher.recv_stop();
+        let buf_ptr: *&mut [u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
+            do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
+                let _ = flags; // /XXX add handling for partials?
 
-                    let result = match status {
-                        None => {
-                            assert!(nread >= 0);
-                            Ok((nread as uint, addr))
-                        }
-                        Some(err) => Err(uv_error_to_io_error(err)),
-                    };
+                watcher.recv_stop();
 
-                    unsafe { (*result_cell_ptr).put_back(result); }
+                let result = match status {
+                    None => {
+                        assert!(nread >= 0);
+                        Ok((nread as uint, addr))
+                    }
+                    Some(err) => Err(uv_error_to_io_error(err)),
+                };
 
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
-            }
+                unsafe { (*result_cell_ptr).put_back(result); }
 
-            assert!(!result_cell.is_empty());
-            result_cell.take()
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
+            }
         }
+
+        assert!(!result_cell.is_empty());
+        result_cell.take()
     }
 
     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            let result_cell = Cell::new_empty();
-            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-            let buf_ptr: *&[u8] = &buf;
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-                do self_.watcher.send(buf, dst) |_watcher, status| {
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        let result_cell = Cell::new_empty();
+        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+        let buf_ptr: *&[u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+            do self.watcher.send(buf, dst) |_watcher, status| {
 
-                    let result = match status {
-                        None => Ok(()),
-                        Some(err) => Err(uv_error_to_io_error(err)),
-                    };
+                let result = match status {
+                    None => Ok(()),
+                    Some(err) => Err(uv_error_to_io_error(err)),
+                };
 
-                    unsafe { (*result_cell_ptr).put_back(result); }
+                unsafe { (*result_cell_ptr).put_back(result); }
 
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
-
-            assert!(!result_cell.is_empty());
-            result_cell.take()
         }
+
+        assert!(!result_cell.is_empty());
+        result_cell.take()
     }
 
     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                do multi.to_str().with_c_str |m_addr| {
-                    uvll::uv_udp_set_membership(self_.watcher.native_handle(),
-                                                m_addr, ptr::null(),
-                                                uvll::UV_JOIN_GROUP)
-                }
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            do multi.to_str().with_c_str |m_addr| {
+                uvll::uv_udp_set_membership(self.watcher.native_handle(),
+                                            m_addr, ptr::null(),
+                                            uvll::UV_JOIN_GROUP)
+            }
+        })
     }
 
     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                do multi.to_str().with_c_str |m_addr| {
-                    uvll::uv_udp_set_membership(self_.watcher.native_handle(),
-                                                m_addr, ptr::null(),
-                                                uvll::UV_LEAVE_GROUP)
-                }
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            do multi.to_str().with_c_str |m_addr| {
+                uvll::uv_udp_set_membership(self.watcher.native_handle(),
+                                            m_addr, ptr::null(),
+                                            uvll::UV_LEAVE_GROUP)
+            }
+        })
     }
 
     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_udp_set_multicast_loop(self_.watcher.native_handle(),
-                                                1 as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
+                                            1 as c_int)
+        })
     }
 
     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_udp_set_multicast_loop(self_.watcher.native_handle(),
-                                                0 as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
+                                            0 as c_int)
+        })
     }
 
     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_udp_set_multicast_ttl(self_.watcher.native_handle(),
-                                               ttl as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_multicast_ttl(self.watcher.native_handle(),
+                                           ttl as c_int)
+        })
     }
 
     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
+        })
     }
 
     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_udp_set_broadcast(self_.watcher.native_handle(),
-                                           1 as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
+                                       1 as c_int)
+        })
     }
 
     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            let r = unsafe {
-                uvll::uv_udp_set_broadcast(self_.watcher.native_handle(),
-                                           0 as c_int)
-            };
-            status_to_io_result(r)
-        }
+        let _m = self.fire_homing_missile();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
+                                       0 as c_int)
+        })
     }
 }
 
@@ -1422,14 +1382,13 @@ fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
 
 impl Drop for UvTimer {
     fn drop(&mut self) {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            uvdebug!("closing UvTimer");
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                do self_.watcher.close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        uvdebug!("closing UvTimer");
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            do self.watcher.close {
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }
@@ -1437,18 +1396,17 @@ fn drop(&mut self) {
 
 impl RtioTimer for UvTimer {
     fn sleep(&mut self, msecs: u64) {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            do scheduler.deschedule_running_task_and_then |_sched, task| {
-                uvdebug!("sleep: entered scheduler context");
-                let task_cell = Cell::new(task);
-                do self_.watcher.start(msecs, 0) |_, status| {
-                    assert!(status.is_none());
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        do scheduler.deschedule_running_task_and_then |_sched, task| {
+            uvdebug!("sleep: entered scheduler context");
+            let task_cell = Cell::new(task);
+            do self.watcher.start(msecs, 0) |_, status| {
+                assert!(status.is_none());
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
-            self_.watcher.stop();
         }
+        self.watcher.stop();
     }
 
     fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
@@ -1456,13 +1414,11 @@ fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
 
         let (port, chan) = oneshot();
         let chan = Cell::new(chan);
-        do self.home_for_io |self_| {
-            let chan = Cell::new(chan.take());
-            do self_.watcher.start(msecs, 0) |_, status| {
-                assert!(status.is_none());
-                assert!(!chan.is_empty());
-                chan.take().send_deferred(());
-            }
+        let _m = self.fire_homing_missile();
+        do self.watcher.start(msecs, 0) |_, status| {
+            assert!(status.is_none());
+            assert!(!chan.is_empty());
+            chan.take().send_deferred(());
         }
 
         return port;
@@ -1473,13 +1429,11 @@ fn period(&mut self, msecs: u64) -> Port<()> {
 
         let (port, chan) = stream();
         let chan = Cell::new(chan);
-        do self.home_for_io |self_| {
-            let chan = Cell::new(chan.take());
-            do self_.watcher.start(msecs, msecs) |_, status| {
-                assert!(status.is_none());
-                do chan.with_ref |chan| {
-                    chan.send_deferred(());
-                }
+        let _m = self.fire_homing_missile();
+        do self.watcher.start(msecs, msecs) |_, status| {
+            assert!(status.is_none());
+            do chan.with_ref |chan| {
+                chan.send_deferred(());
             }
         }
 
@@ -1512,20 +1466,19 @@ fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
         let buf_ptr: *&mut [u8] = &buf;
-        do self.home_for_io_with_sched |self_, scheduler| {
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-                let task_cell = Cell::new(task);
-                let read_req = file::FsRequest::new();
-                do read_req.read(&self_.loop_, self_.fd, buf, offset) |req, uverr| {
-                    let res = match uverr  {
-                        None => Ok(req.get_result() as int),
-                        Some(err) => Err(uv_error_to_io_error(err))
-                    };
-                    unsafe { (*result_cell_ptr).put_back(res); }
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+            let task_cell = Cell::new(task);
+            let read_req = file::FsRequest::new();
+            do read_req.read(&self.loop_, self.fd, buf, offset) |req, uverr| {
+                let res = match uverr  {
+                    None => Ok(req.get_result() as int),
+                    Some(err) => Err(uv_error_to_io_error(err))
+                };
+                unsafe { (*result_cell_ptr).put_back(res); }
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
         result_cell.take()
@@ -1555,19 +1508,18 @@ fn nop_req(&mut self, f: &fn(&mut UvFileStream, file::FsRequest, FsCallback))
             -> Result<(), IoError> {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        do self.home_for_io_with_sched |self_, sched| {
-            do sched.deschedule_running_task_and_then |_, task| {
-                let task = Cell::new(task);
-                let req = file::FsRequest::new();
-                do f(self_, req) |_, uverr| {
-                    let res = match uverr  {
-                        None => Ok(()),
-                        Some(err) => Err(uv_error_to_io_error(err))
-                    };
-                    unsafe { (*result_cell_ptr).put_back(res); }
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task.take());
-                }
+        let (_m, sched) = self.fire_homing_missile_sched();
+        do sched.deschedule_running_task_and_then |_, task| {
+            let task = Cell::new(task);
+            let req = file::FsRequest::new();
+            do f(self_, req) |_, uverr| {
+                let res = match uverr  {
+                    None => Ok(()),
+                    Some(err) => Err(uv_error_to_io_error(err))
+                };
+                unsafe { (*result_cell_ptr).put_back(res); }
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task.take());
             }
         }
         result_cell.take()
@@ -1583,14 +1535,13 @@ fn drop(&mut self) {
                 do close_req.close(&self.loop_, self.fd) |_,_| {}
             }
             CloseSynchronously => {
-                do self.home_for_io_with_sched |self_, scheduler| {
-                    do scheduler.deschedule_running_task_and_then |_, task| {
-                        let task_cell = Cell::new(task);
-                        let close_req = file::FsRequest::new();
-                        do close_req.close(&self_.loop_, self_.fd) |_,_| {
-                            let scheduler: ~Scheduler = Local::take();
-                            scheduler.resume_blocked_task_immediately(task_cell.take());
-                        }
+                let (_m, scheduler) = self.fire_homing_missile_sched();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    let task_cell = Cell::new(task);
+                    let close_req = file::FsRequest::new();
+                    do close_req.close(&self.loop_, self.fd) |_,_| {
+                        let scheduler: ~Scheduler = Local::take();
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
                     }
                 }
             }
@@ -1623,7 +1574,7 @@ fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
     fn tell(&self) -> Result<u64, IoError> {
         use std::libc::SEEK_CUR;
         // this is temporary
-        let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
+        let self_ = unsafe { cast::transmute_mut(self) };
         self_.seek_common(0, SEEK_CUR)
     }
     fn fsync(&mut self) -> Result<(), IoError> {
@@ -1681,7 +1632,8 @@ fn drop(&mut self) {
         if self.home.is_none() {
             close(self)
         } else {
-            self.home_for_io(close)
+            let _m = self.fire_homing_missile();
+            close(self)
         }
     }
 }
@@ -1692,30 +1644,28 @@ fn id(&self) -> pid_t {
     }
 
     fn kill(&mut self, signal: int) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            match self_.process.kill(signal) {
-                Ok(()) => Ok(()),
-                Err(uverr) => Err(uv_error_to_io_error(uverr))
-            }
+        let _m = self.fire_homing_missile();
+        match self.process.kill(signal) {
+            Ok(()) => Ok(()),
+            Err(uverr) => Err(uv_error_to_io_error(uverr))
         }
     }
 
     fn wait(&mut self) -> int {
         // Make sure (on the home scheduler) that we have an exit status listed
-        do self.home_for_io |self_| {
-            match self_.exit_status {
-                Some(*) => {}
-                None => {
-                    // If there's no exit code previously listed, then the
-                    // process's exit callback has yet to be invoked. We just
-                    // need to deschedule ourselves and wait to be reawoken.
-                    let scheduler: ~Scheduler = Local::take();
-                    do scheduler.deschedule_running_task_and_then |_, task| {
-                        assert!(self_.descheduled.is_none());
-                        self_.descheduled = Some(task);
-                    }
-                    assert!(self_.exit_status.is_some());
+        let _m = self.fire_homing_missile();
+        match self.exit_status {
+            Some(*) => {}
+            None => {
+                // If there's no exit code previously listed, then the
+                // process's exit callback has yet to be invoked. We just
+                // need to deschedule ourselves and wait to be reawoken.
+                let scheduler: ~Scheduler = Local::take();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    assert!(self.descheduled.is_none());
+                    self.descheduled = Some(task);
                 }
+                assert!(self.exit_status.is_some());
             }
         }
 
@@ -1738,28 +1688,27 @@ fn new(pipe: UvUnboundPipe) -> UvUnixListener {
 }
 
 impl RtioUnixListener for UvUnixListener {
-    fn listen(~self) -> Result<~RtioUnixAcceptor, IoError> {
-        do self.home_for_io_consume |self_| {
-            let acceptor = ~UvUnixAcceptor::new(self_);
-            let incoming = Cell::new(acceptor.incoming.clone());
-            let mut stream = acceptor.listener.inner.pipe.as_stream();
-            let res = do stream.listen |mut server, status| {
-                do incoming.with_mut_ref |incoming| {
-                    let inc = match status {
-                        Some(e) => Err(uv_error_to_io_error(e)),
-                        None => {
-                            let pipe = UvUnboundPipe::new(&server.event_loop());
-                            server.accept(pipe.pipe.as_stream());
-                            Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
-                        }
-                    };
-                    incoming.send(inc);
-                }
-            };
-            match res {
-                Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
-                Err(e) => Err(uv_error_to_io_error(e)),
+    fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
+        let _m = self.fire_homing_missile();
+        let acceptor = ~UvUnixAcceptor::new(*self);
+        let incoming = Cell::new(acceptor.incoming.clone());
+        let mut stream = acceptor.listener.inner.pipe.as_stream();
+        let res = do stream.listen |mut server, status| {
+            do incoming.with_mut_ref |incoming| {
+                let inc = match status {
+                    Some(e) => Err(uv_error_to_io_error(e)),
+                    None => {
+                        let pipe = UvUnboundPipe::new(&server.event_loop());
+                        server.accept(pipe.pipe.as_stream());
+                        Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
+                    }
+                };
+                incoming.send(inc);
             }
+        };
+        match res {
+            Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
+            Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 }
@@ -1787,30 +1736,26 @@ fn drop(&mut self) {
 
 impl RtioTTY for UvTTY {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            read_stream(self_.tty.as_stream(), scheduler, buf)
-        }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        read_stream(self.tty.as_stream(), scheduler, buf)
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            write_stream(self_.tty.as_stream(), scheduler, buf)
-        }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        write_stream(self.tty.as_stream(), scheduler, buf)
     }
 
     fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            match self_.tty.set_mode(raw) {
-                Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
-            }
+        let _m = self.fire_homing_missile();
+        match self.tty.set_mode(raw) {
+            Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
         }
     }
 
     fn get_winsize(&mut self) -> Result<(int, int), IoError> {
-        do self.home_for_io |self_| {
-            match self_.tty.get_winsize() {
-                Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
-            }
+        let _m = self.fire_homing_missile();
+        match self.tty.get_winsize() {
+            Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
         }
     }
 
@@ -1836,21 +1781,18 @@ fn new(listener: UvUnixListener) -> UvUnixAcceptor {
 
 impl RtioUnixAcceptor for UvUnixAcceptor {
     fn accept(&mut self) -> Result<~RtioPipe, IoError> {
-        do self.home_for_io |self_| {
-            self_.incoming.recv()
-        }
+        let _m = self.fire_homing_missile();
+        self.incoming.recv()
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1)
-        }
+        let _m = self.fire_homing_missile();
+        accept_simultaneously(self.listener.inner.pipe.as_stream(), 1)
     }
 
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
-            accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0)
-        }
+        let _m = self.fire_homing_missile();
+        accept_simultaneously(self.listener.inner.pipe.as_stream(), 0)
     }
 }
 
@@ -1873,14 +1815,13 @@ impl RtioSignal for UvSignal {}
 
 impl Drop for UvSignal {
     fn drop(&mut self) {
-        do self.home_for_io_with_sched |self_, scheduler| {
-            uvdebug!("closing UvSignal");
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                do self_.watcher.close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        let (_m, scheduler) = self.fire_homing_missile_sched();
+        uvdebug!("closing UvSignal");
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            do self.watcher.close {
+                let scheduler: ~Scheduler = Local::take();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }