S_IRUSR, S_IWUSR};
use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
+use task;
#[cfg(test)] use container::Container;
#[cfg(test)] use unstable::run_in_bare_thread;
// go home
let old_home = Cell::new_empty();
let old_home_ptr = &old_home;
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |_, task| {
- // get the old home first
- do task.wake().map_move |mut task| {
- old_home_ptr.put_back(task.take_unwrap_home());
- self.home().send(PinnedTask(task));
- };
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ // get the old home first
+ do task.wake().map_move |mut task| {
+ old_home_ptr.put_back(task.take_unwrap_home());
+ self.home().send(PinnedTask(task));
+ };
+ }
}
// do IO
let a = io(self);
// unhome home
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |scheduler, task| {
- do task.wake().map_move |mut task| {
- task.give_home(old_home.take());
- scheduler.make_handle().send(TaskFromFriend(task));
- };
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |scheduler, task| {
+ do task.wake().map_move |mut task| {
+ task.give_home(old_home.take());
+ scheduler.make_handle().send(TaskFromFriend(task));
+ };
+ }
}
// return the result of the IO
a
}
+
+ fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
+ use rt::sched::{PinnedTask, TaskFromFriend};
+
+ do task::unkillable { // FIXME(#8674)
+ // go home
+ let old_home = Cell::new_empty();
+ let old_home_ptr = &old_home;
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ // get the old home first
+ do task.wake().map_move |mut task| {
+ old_home_ptr.put_back(task.take_unwrap_home());
+ self.home().send(PinnedTask(task));
+ };
+ }
+
+ // do IO
+ let scheduler = Local::take::<Scheduler>();
+ let a = io_sched(self, scheduler);
+
+ // unhome home
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |scheduler, task| {
+ do task.wake().map_move |mut task| {
+ task.give_home(old_home.take());
+ scheduler.make_handle().send(TaskFromFriend(task));
+ };
+ }
+
+ // return the result of the IO
+ a
+ }
+ }
}
// get a handle for the current scheduler
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
// Block this task and take ownership, switch to scheduler context
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |_, task| {
-
- let mut tcp = TcpWatcher::new(self.uv_loop());
- let task_cell = Cell::new(task);
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
- // Wait for a connection
- do tcp.connect(addr) |stream, status| {
- match status {
- None => {
- let tcp = NativeHandle::from_native_handle(stream.native_handle());
- let home = get_handle_to_current_scheduler!();
- let res = Ok(~UvTcpStream { watcher: tcp, home: home });
+ let mut tcp = TcpWatcher::new(self.uv_loop());
+ let task_cell = Cell::new(task);
- // Store the stream in the task's stack
- unsafe { (*result_cell_ptr).put_back(res); }
+ // Wait for a connection
+ do tcp.connect(addr) |stream, status| {
+ match status {
+ None => {
+ let tcp = NativeHandle::from_native_handle(stream.native_handle());
+ let home = get_handle_to_current_scheduler!();
+ let res = Ok(~UvTcpStream { watcher: tcp, home: home });
- // Context switch
- let scheduler = Local::take::<Scheduler>();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- Some(_) => {
- let task_cell = Cell::new(task_cell.take());
- do stream.close {
- let res = Err(uv_error_to_io_error(status.unwrap()));
+ // Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
+
+ // Context switch
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
+ Some(_) => {
+ let task_cell = Cell::new(task_cell.take());
+ do stream.close {
+ let res = Err(uv_error_to_io_error(status.unwrap()));
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler = Local::take::<Scheduler>();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ }
+ }
}
}
}
Ok(~UvTcpListener::new(watcher, home))
}
Err(uverr) => {
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do watcher.as_stream().close {
- let scheduler = Local::take::<Scheduler>();
- scheduler.resume_blocked_task_immediately(task_cell.take());
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do watcher.as_stream().close {
+ let scheduler = Local::take::<Scheduler>();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ }
}
+ Err(uv_error_to_io_error(uverr))
}
- Err(uv_error_to_io_error(uverr))
}
}
}
Ok(~UvUdpSocket { watcher: watcher, home: home })
}
Err(uverr) => {
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do watcher.close {
- let scheduler = Local::take::<Scheduler>();
- scheduler.resume_blocked_task_immediately(task_cell.take());
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ do watcher.close {
+ let scheduler = Local::take::<Scheduler>();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ }
}
+ Err(uv_error_to_io_error(uverr))
}
- Err(uv_error_to_io_error(uverr))
}
}
}
let result_cell_ptr: *Cell<Result<~RtioFileStream,
IoError>> = &result_cell;
let path_cell = Cell::new(path);
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let path = path_cell.take();
- do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
- |req,err| {
- if err.is_none() {
- let loop_ = Loop {handle: req.get_loop().native_handle()};
- let home = get_handle_to_current_scheduler!();
- let fd = file::FileDescriptor(req.get_result());
- let fs = ~UvFileStream::new(
- loop_, fd, true, home) as ~RtioFileStream;
- let res = Ok(fs);
- unsafe { (*result_cell_ptr).put_back(res); }
- let scheduler = Local::take::<Scheduler>();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- } else {
- let res = Err(uv_error_to_io_error(err.unwrap()));
- unsafe { (*result_cell_ptr).put_back(res); }
- let scheduler = Local::take::<Scheduler>();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ let path = path_cell.take();
+ do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
+ |req,err| {
+ if err.is_none() {
+ let loop_ = Loop {handle: req.get_loop().native_handle()};
+ let home = get_handle_to_current_scheduler!();
+ let fd = file::FileDescriptor(req.get_result());
+ let fs = ~UvFileStream::new(
+ loop_, fd, true, home) as ~RtioFileStream;
+ let res = Ok(fs);
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler = Local::take::<Scheduler>();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ } else {
+ let res = Err(uv_error_to_io_error(err.unwrap()));
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler = Local::take::<Scheduler>();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
+ }
+ };
};
- };
+ }
assert!(!result_cell.is_empty());
return result_cell.take();
}
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let path_cell = Cell::new(path);
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let path = path_cell.take();
- do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
- let res = match err {
- None => Ok(()),
- Some(err) => Err(uv_error_to_io_error(err))
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ let task_cell = Cell::new(task);
+ let path = path_cell.take();
+ do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
+ let res = match err {
+ None => Ok(()),
+ Some(err) => Err(uv_error_to_io_error(err))
+ };
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler = Local::take::<Scheduler>();
+ scheduler.resume_blocked_task_immediately(task_cell.take());
};
- unsafe { (*result_cell_ptr).put_back(res); }
- let scheduler = Local::take::<Scheduler>();
- scheduler.resume_blocked_task_immediately(task_cell.take());
};
- };
+ }
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn drop(&self) {
// XXX need mutable finalizer
let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
- do self_.home_for_io |self_| {
- let scheduler = Local::take::<Scheduler>();
+ do self_.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher().as_stream().close {
fn drop(&self) {
// XXX need mutable finalizer
let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
- do this.home_for_io |self_| {
- let scheduler = Local::take::<Scheduler>();
+ do this.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.as_stream().close {
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
- do self.home_for_io |self_| {
+ do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
- let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_sched, task| {
let task_cell = Cell::new(task);
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
- do self.home_for_io |self_| {
+ do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
- let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
fn drop(&self) {
// XXX need mutable finalizer
let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
- do this.home_for_io |_| {
- let scheduler = Local::take::<Scheduler>();
+ do this.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
- do this.watcher.close {
+ do self_.watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
impl RtioUdpSocket for UvUdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
- do self.home_for_io |self_| {
+ do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
- let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
}
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
- do self.home_for_io |self_| {
+ do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
- let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
impl Drop for UvTimer {
fn drop(&self) {
let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
- do self_.home_for_io |self_| {
+ do self_.home_for_io_with_sched |self_, scheduler| {
rtdebug!("closing UvTimer");
- let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.close {
impl RtioTimer for UvTimer {
fn sleep(&mut self, msecs: u64) {
- do self.home_for_io |self_| {
- let scheduler = Local::take::<Scheduler>();
+ do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rtdebug!("sleep: entered scheduler context");
let task_cell = Cell::new(task);
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
- do self.home_for_io |self_| {
- let scheduler = Local::take::<Scheduler>();
+ do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
- do self.home_for_io |self_| {
- let scheduler = Local::take::<Scheduler>();
+ do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
fn drop(&self) {
let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
if self.close_on_drop {
- do self_.home_for_io |self_| {
- let scheduler = Local::take::<Scheduler>();
+ do self_.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.fd.close(&self.loop_) |_,_| {
assert!(maybe_socket.is_ok());
// block self on sched1
- let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |_, task| {
- // unblock task
- do task.wake().map_move |task| {
- // send self to sched2
- tasksFriendHandle.take().send(TaskFromFriend(task));
- };
- // sched1 should now sleep since it has nothing else to do
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ // unblock task
+ do task.wake().map_move |task| {
+ // send self to sched2
+ tasksFriendHandle.take().send(TaskFromFriend(task));
+ };
+ // sched1 should now sleep since it has nothing else to do
+ }
}
// sched2 will wake up and get the task
// as we do nothing else, the function ends and the socket goes out of scope
}
reads += 1;
- let scheduler = Local::take::<Scheduler>();
- // Yield to the other task in hopes that it
- // will trigger a read callback while we are
- // not ready for it
- do scheduler.deschedule_running_task_and_then |sched, task| {
- let task = Cell::new(task);
- sched.enqueue_blocked_task(task.take());
+ do task::unkillable { // FIXME(#8674)
+ let scheduler = Local::take::<Scheduler>();
+ // Yield to the other task in hopes that it
+ // will trigger a read callback while we are
+ // not ready for it
+ do scheduler.deschedule_running_task_and_then |sched, task| {
+ let task = Cell::new(task);
+ sched.enqueue_blocked_task(task.take());
+ }
}
}