]> git.lizzy.rs Git - rust.git/commitdiff
Added home_for_io_with_sched variant. Temporarily making IO unkillable.
authorEric Reed <ecreed@cs.washington.edu>
Tue, 20 Aug 2013 20:20:50 +0000 (13:20 -0700)
committerEric Reed <ecreed@cs.washington.edu>
Tue, 20 Aug 2013 20:27:33 +0000 (13:27 -0700)
src/libstd/rt/uv/uvio.rs

index f794c0a2bec5ff84e955972824add2b279a3bd44..6e79a78e061516360c582dacec753b962599de0f 100644 (file)
@@ -35,6 +35,7 @@
           S_IRUSR, S_IWUSR};
 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
             CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
+use task;
 
 #[cfg(test)] use container::Container;
 #[cfg(test)] use unstable::run_in_bare_thread;
@@ -55,30 +56,68 @@ fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
         // go home
         let old_home = Cell::new_empty();
         let old_home_ptr = &old_home;
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            // get the old home first
-            do task.wake().map_move |mut task| {
-                old_home_ptr.put_back(task.take_unwrap_home());
-                self.home().send(PinnedTask(task));
-            };
+        do task::unkillable { // FIXME(#8674)
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                // get the old home first
+                do task.wake().map_move |mut task| {
+                    old_home_ptr.put_back(task.take_unwrap_home());
+                    self.home().send(PinnedTask(task));
+                };
+            }
         }
 
         // do IO
         let a = io(self);
 
         // unhome home
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |scheduler, task| {
-            do task.wake().map_move |mut task| {
-                task.give_home(old_home.take());
-                scheduler.make_handle().send(TaskFromFriend(task));
-            };
+        do task::unkillable { // FIXME(#8674)
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |scheduler, task| {
+                do task.wake().map_move |mut task| {
+                    task.give_home(old_home.take());
+                    scheduler.make_handle().send(TaskFromFriend(task));
+                };
+            }
         }
 
         // return the result of the IO
         a
     }
+
+    fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
+        use rt::sched::{PinnedTask, TaskFromFriend};
+
+        do task::unkillable { // FIXME(#8674)
+            // go home
+            let old_home = Cell::new_empty();
+            let old_home_ptr = &old_home;
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                // get the old home first
+                do task.wake().map_move |mut task| {
+                    old_home_ptr.put_back(task.take_unwrap_home());
+                    self.home().send(PinnedTask(task));
+                };
+            }
+
+            // do IO
+            let scheduler = Local::take::<Scheduler>();
+            let a = io_sched(self, scheduler);
+
+            // unhome home
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |scheduler, task| {
+                do task.wake().map_move |mut task| {
+                    task.give_home(old_home.take());
+                    scheduler.make_handle().send(TaskFromFriend(task));
+                };
+            }
+
+            // return the result of the IO
+            a
+        }
+    }
 }
 
 // get a handle for the current scheduler
@@ -376,35 +415,37 @@ fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoEr
         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
 
         // Block this task and take ownership, switch to scheduler context
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-
-            let mut tcp = TcpWatcher::new(self.uv_loop());
-            let task_cell = Cell::new(task);
+        do task::unkillable { // FIXME(#8674)
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
 
-            // Wait for a connection
-            do tcp.connect(addr) |stream, status| {
-                match status {
-                    None => {
-                        let tcp = NativeHandle::from_native_handle(stream.native_handle());
-                        let home = get_handle_to_current_scheduler!();
-                        let res = Ok(~UvTcpStream { watcher: tcp, home: home });
+                let mut tcp = TcpWatcher::new(self.uv_loop());
+                let task_cell = Cell::new(task);
 
-                        // Store the stream in the task's stack
-                        unsafe { (*result_cell_ptr).put_back(res); }
+                // Wait for a connection
+                do tcp.connect(addr) |stream, status| {
+                    match status {
+                        None => {
+                            let tcp = NativeHandle::from_native_handle(stream.native_handle());
+                            let home = get_handle_to_current_scheduler!();
+                            let res = Ok(~UvTcpStream { watcher: tcp, home: home });
 
-                        // Context switch
-                        let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_blocked_task_immediately(task_cell.take());
-                    }
-                    Some(_) => {
-                        let task_cell = Cell::new(task_cell.take());
-                        do stream.close {
-                            let res = Err(uv_error_to_io_error(status.unwrap()));
+                            // Store the stream in the task's stack
                             unsafe { (*result_cell_ptr).put_back(res); }
+
+                            // Context switch
                             let scheduler = Local::take::<Scheduler>();
                             scheduler.resume_blocked_task_immediately(task_cell.take());
                         }
+                        Some(_) => {
+                            let task_cell = Cell::new(task_cell.take());
+                            do stream.close {
+                                let res = Err(uv_error_to_io_error(status.unwrap()));
+                                unsafe { (*result_cell_ptr).put_back(res); }
+                                let scheduler = Local::take::<Scheduler>();
+                                scheduler.resume_blocked_task_immediately(task_cell.take());
+                            }
+                        }
                     }
                 }
             }
@@ -422,15 +463,17 @@ fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoErr
                 Ok(~UvTcpListener::new(watcher, home))
             }
             Err(uverr) => {
-                let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    let task_cell = Cell::new(task);
-                    do watcher.as_stream().close {
-                        let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_blocked_task_immediately(task_cell.take());
+                do task::unkillable { // FIXME(#8674)
+                    let scheduler = Local::take::<Scheduler>();
+                    do scheduler.deschedule_running_task_and_then |_, task| {
+                        let task_cell = Cell::new(task);
+                        do watcher.as_stream().close {
+                            let scheduler = Local::take::<Scheduler>();
+                            scheduler.resume_blocked_task_immediately(task_cell.take());
+                        }
                     }
+                    Err(uv_error_to_io_error(uverr))
                 }
-                Err(uv_error_to_io_error(uverr))
             }
         }
     }
@@ -443,15 +486,17 @@ fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError
                 Ok(~UvUdpSocket { watcher: watcher, home: home })
             }
             Err(uverr) => {
-                let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    let task_cell = Cell::new(task);
-                    do watcher.close {
-                        let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_blocked_task_immediately(task_cell.take());
+                do task::unkillable { // FIXME(#8674)
+                    let scheduler = Local::take::<Scheduler>();
+                    do scheduler.deschedule_running_task_and_then |_, task| {
+                        let task_cell = Cell::new(task);
+                        do watcher.close {
+                            let scheduler = Local::take::<Scheduler>();
+                            scheduler.resume_blocked_task_immediately(task_cell.take());
+                        }
                     }
+                    Err(uv_error_to_io_error(uverr))
                 }
-                Err(uv_error_to_io_error(uverr))
             }
         }
     }
@@ -493,30 +538,32 @@ fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
         let result_cell_ptr: *Cell<Result<~RtioFileStream,
                                            IoError>> = &result_cell;
         let path_cell = Cell::new(path);
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let path = path_cell.take();
-            do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
-                  |req,err| {
-                if err.is_none() {
-                    let loop_ = Loop {handle: req.get_loop().native_handle()};
-                    let home = get_handle_to_current_scheduler!();
-                    let fd = file::FileDescriptor(req.get_result());
-                    let fs = ~UvFileStream::new(
-                        loop_, fd, true, home) as ~RtioFileStream;
-                    let res = Ok(fs);
-                    unsafe { (*result_cell_ptr).put_back(res); }
-                    let scheduler = Local::take::<Scheduler>();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                } else {
-                    let res = Err(uv_error_to_io_error(err.unwrap()));
-                    unsafe { (*result_cell_ptr).put_back(res); }
-                    let scheduler = Local::take::<Scheduler>();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
+        do task::unkillable { // FIXME(#8674)
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let path = path_cell.take();
+                do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
+                      |req,err| {
+                    if err.is_none() {
+                        let loop_ = Loop {handle: req.get_loop().native_handle()};
+                        let home = get_handle_to_current_scheduler!();
+                        let fd = file::FileDescriptor(req.get_result());
+                        let fs = ~UvFileStream::new(
+                            loop_, fd, true, home) as ~RtioFileStream;
+                        let res = Ok(fs);
+                        unsafe { (*result_cell_ptr).put_back(res); }
+                        let scheduler = Local::take::<Scheduler>();
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
+                    } else {
+                        let res = Err(uv_error_to_io_error(err.unwrap()));
+                        unsafe { (*result_cell_ptr).put_back(res); }
+                        let scheduler = Local::take::<Scheduler>();
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
+                    }
+                };
             };
-        };
+        }
         assert!(!result_cell.is_empty());
         return result_cell.take();
     }
@@ -525,20 +572,22 @@ fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
         let path_cell = Cell::new(path);
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let path = path_cell.take();
-            do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
-                let res = match err {
-                    None => Ok(()),
-                    Some(err) => Err(uv_error_to_io_error(err))
+        do task::unkillable { // FIXME(#8674)
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let path = path_cell.take();
+                do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
+                    let res = match err {
+                        None => Ok(()),
+                        Some(err) => Err(uv_error_to_io_error(err))
+                    };
+                    unsafe { (*result_cell_ptr).put_back(res); }
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
                 };
-                unsafe { (*result_cell_ptr).put_back(res); }
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
             };
-        };
+        }
         assert!(!result_cell.is_empty());
         return result_cell.take();
     }
@@ -572,8 +621,7 @@ impl Drop for UvTcpListener {
     fn drop(&self) {
         // XXX need mutable finalizer
         let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
-        do self_.home_for_io |self_| {
-            let scheduler = Local::take::<Scheduler>();
+        do self_.home_for_io_with_sched |self_, scheduler| {
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
                 do self_.watcher().as_stream().close {
@@ -665,8 +713,7 @@ impl Drop for UvTcpStream {
     fn drop(&self) {
         // XXX need mutable finalizer
         let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
-        do this.home_for_io |self_| {
-            let scheduler = Local::take::<Scheduler>();
+        do this.home_for_io_with_sched |self_, scheduler| {
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
                 do self_.watcher.as_stream().close {
@@ -688,11 +735,10 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
 
 impl RtioTcpStream for UvTcpStream {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        do self.home_for_io |self_| {
+        do self.home_for_io_with_sched |self_, scheduler| {
             let result_cell = Cell::new_empty();
             let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
 
-            let scheduler = Local::take::<Scheduler>();
             let buf_ptr: *&mut [u8] = &buf;
             do scheduler.deschedule_running_task_and_then |_sched, task| {
                 let task_cell = Cell::new(task);
@@ -730,10 +776,9 @@ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
+        do self.home_for_io_with_sched |self_, scheduler| {
             let result_cell = Cell::new_empty();
             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-            let scheduler = Local::take::<Scheduler>();
             let buf_ptr: *&[u8] = &buf;
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
@@ -827,11 +872,10 @@ impl Drop for UvUdpSocket {
     fn drop(&self) {
         // XXX need mutable finalizer
         let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
-        do this.home_for_io |_| {
-            let scheduler = Local::take::<Scheduler>();
+        do this.home_for_io_with_sched |self_, scheduler| {
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
-                do this.watcher.close {
+                do self_.watcher.close {
                     let scheduler = Local::take::<Scheduler>();
                     scheduler.resume_blocked_task_immediately(task_cell.take());
                 }
@@ -850,11 +894,10 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
 
 impl RtioUdpSocket for UvUdpSocket {
     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
-        do self.home_for_io |self_| {
+        do self.home_for_io_with_sched |self_, scheduler| {
             let result_cell = Cell::new_empty();
             let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
 
-            let scheduler = Local::take::<Scheduler>();
             let buf_ptr: *&mut [u8] = &buf;
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
@@ -885,10 +928,9 @@ fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
     }
 
     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
-        do self.home_for_io |self_| {
+        do self.home_for_io_with_sched |self_, scheduler| {
             let result_cell = Cell::new_empty();
             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-            let scheduler = Local::take::<Scheduler>();
             let buf_ptr: *&[u8] = &buf;
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
@@ -1047,9 +1089,8 @@ fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
 impl Drop for UvTimer {
     fn drop(&self) {
         let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
-        do self_.home_for_io |self_| {
+        do self_.home_for_io_with_sched |self_, scheduler| {
             rtdebug!("closing UvTimer");
-            let scheduler = Local::take::<Scheduler>();
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
                 do self_.watcher.close {
@@ -1063,8 +1104,7 @@ fn drop(&self) {
 
 impl RtioTimer for UvTimer {
     fn sleep(&mut self, msecs: u64) {
-        do self.home_for_io |self_| {
-            let scheduler = Local::take::<Scheduler>();
+        do self.home_for_io_with_sched |self_, scheduler| {
             do scheduler.deschedule_running_task_and_then |_sched, task| {
                 rtdebug!("sleep: entered scheduler context");
                 let task_cell = Cell::new(task);
@@ -1104,8 +1144,7 @@ fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
         let buf_ptr: *&mut [u8] = &buf;
-        do self.home_for_io |self_| {
-            let scheduler = Local::take::<Scheduler>();
+        do self.home_for_io_with_sched |self_, scheduler| {
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
                 let task_cell = Cell::new(task);
@@ -1126,8 +1165,7 @@ fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
         let buf_ptr: *&[u8] = &buf;
-        do self.home_for_io |self_| {
-            let scheduler = Local::take::<Scheduler>();
+        do self.home_for_io_with_sched |self_, scheduler| {
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
                 let task_cell = Cell::new(task);
@@ -1166,8 +1204,7 @@ impl Drop for UvFileStream {
     fn drop(&self) {
         let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
         if self.close_on_drop {
-            do self_.home_for_io |self_| {
-                let scheduler = Local::take::<Scheduler>();
+            do self_.home_for_io_with_sched |self_, scheduler| {
                 do scheduler.deschedule_running_task_and_then |_, task| {
                     let task_cell = Cell::new(task);
                     do self_.fd.close(&self.loop_) |_,_| {
@@ -1273,14 +1310,16 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
             assert!(maybe_socket.is_ok());
 
             // block self on sched1
-            let scheduler = Local::take::<Scheduler>();
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                // unblock task
-                do task.wake().map_move |task| {
-                  // send self to sched2
-                  tasksFriendHandle.take().send(TaskFromFriend(task));
-                };
-                // sched1 should now sleep since it has nothing else to do
+            do task::unkillable { // FIXME(#8674)
+                let scheduler = Local::take::<Scheduler>();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    // unblock task
+                    do task.wake().map_move |task| {
+                      // send self to sched2
+                      tasksFriendHandle.take().send(TaskFromFriend(task));
+                    };
+                    // sched1 should now sleep since it has nothing else to do
+                }
             }
             // sched2 will wake up and get the task
             // as we do nothing else, the function ends and the socket goes out of scope
@@ -1548,13 +1587,15 @@ fn test_read_and_block() {
                 }
                 reads += 1;
 
-                let scheduler = Local::take::<Scheduler>();
-                // Yield to the other task in hopes that it
-                // will trigger a read callback while we are
-                // not ready for it
-                do scheduler.deschedule_running_task_and_then |sched, task| {
-                    let task = Cell::new(task);
-                    sched.enqueue_blocked_task(task.take());
+                do task::unkillable { // FIXME(#8674)
+                    let scheduler = Local::take::<Scheduler>();
+                    // Yield to the other task in hopes that it
+                    // will trigger a read callback while we are
+                    // not ready for it
+                    do scheduler.deschedule_running_task_and_then |sched, task| {
+                        let task = Cell::new(task);
+                        sched.enqueue_blocked_task(task.take());
+                    }
                 }
             }