]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/uvio.rs
c771f93cef5e0de13b9b80e62b47fad4411662c2
[rust.git] / src / libstd / rt / uv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use c_str::ToCStr;
12 use cast::transmute;
13 use cast;
14 use cell::Cell;
15 use clone::Clone;
16 use libc::{c_int, c_uint, c_void, pid_t};
17 use ops::Drop;
18 use option::*;
19 use ptr;
20 use str;
21 use result::*;
22 use rt::io::IoError;
23 use rt::io::net::ip::{SocketAddr, IpAddr};
24 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
25 use rt::kill::BlockedTask;
26 use rt::local::Local;
27 use rt::rtio::*;
28 use rt::sched::{Scheduler, SchedHandle};
29 use rt::tube::Tube;
30 use rt::uv::*;
31 use rt::uv::idle::IdleWatcher;
32 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
33 use unstable::sync::Exclusive;
34 use super::super::io::support::PathLike;
35 use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
36           S_IRUSR, S_IWUSR};
37 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
38             CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
39 use task;
40
41 #[cfg(test)] use container::Container;
42 #[cfg(test)] use unstable::run_in_bare_thread;
43 #[cfg(test)] use rt::test::{spawntask,
44                             next_test_ip4,
45                             run_in_newsched_task};
46 #[cfg(test)] use iterator::{Iterator, range};
47
48 // XXX we should not be calling uvll functions in here.
49
50 trait HomingIO {
51     fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
52     /* XXX This will move pinned tasks to do IO on the proper scheduler
53      * and then move them back to their home.
54      */
55     fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
56         use rt::sched::{PinnedTask, TaskFromFriend};
57         // go home
58         let old_home = Cell::new_empty();
59         let old_home_ptr = &old_home;
60         do task::unkillable { // FIXME(#8674)
61             let scheduler: ~Scheduler = Local::take();
62             do scheduler.deschedule_running_task_and_then |_, task| {
63                 // get the old home first
64                 do task.wake().map_move |mut task| {
65                     old_home_ptr.put_back(task.take_unwrap_home());
66                     self.home().send(PinnedTask(task));
67                 };
68             }
69         }
70
71         // do IO
72         let a = io(self);
73
74         // unhome home
75         do task::unkillable { // FIXME(#8674)
76             let scheduler: ~Scheduler = Local::take();
77             do scheduler.deschedule_running_task_and_then |scheduler, task| {
78                 do task.wake().map_move |mut task| {
79                     task.give_home(old_home.take());
80                     scheduler.make_handle().send(TaskFromFriend(task));
81                 };
82             }
83         }
84
85         // return the result of the IO
86         a
87     }
88
89     fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
90         use rt::sched::{PinnedTask, TaskFromFriend};
91
92         do task::unkillable { // FIXME(#8674)
93             // go home
94             let old_home = Cell::new_empty();
95             let old_home_ptr = &old_home;
96             let scheduler: ~Scheduler = Local::take();
97             do scheduler.deschedule_running_task_and_then |_, task| {
98                 // get the old home first
99                 do task.wake().map_move |mut task| {
100                     old_home_ptr.put_back(task.take_unwrap_home());
101                     self.home().send(PinnedTask(task));
102                 };
103             }
104
105             // do IO
106             let scheduler: ~Scheduler = Local::take();
107             let a = io_sched(self, scheduler);
108
109             // unhome home
110             let scheduler: ~Scheduler = Local::take();
111             do scheduler.deschedule_running_task_and_then |scheduler, task| {
112                 do task.wake().map_move |mut task| {
113                     task.give_home(old_home.take());
114                     scheduler.make_handle().send(TaskFromFriend(task));
115                 };
116             }
117
118             // return the result of the IO
119             a
120         }
121     }
122 }
123
124 // get a handle for the current scheduler
125 macro_rules! get_handle_to_current_scheduler(
126     () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
127 )
128
129 enum SocketNameKind {
130     TcpPeer,
131     Tcp,
132     Udp
133 }
134
135 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
136                                                  handle: U) -> Result<SocketAddr, IoError> {
137     let getsockname = match sk {
138         TcpPeer => uvll::tcp_getpeername,
139         Tcp     => uvll::tcp_getsockname,
140         Udp     => uvll::udp_getsockname,
141     };
142
143     // Allocate a sockaddr_storage
144     // since we don't know if it's ipv4 or ipv6
145     let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
146
147     let r = unsafe {
148         getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
149     };
150
151     if r != 0 {
152         let status = status_to_maybe_uv_error(r);
153         return Err(uv_error_to_io_error(status.unwrap()));
154     }
155
156     let addr = unsafe {
157         if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
158             net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
159         } else {
160             net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
161         }
162     };
163
164     unsafe { uvll::free_sockaddr_storage(r_addr); }
165
166     Ok(addr)
167
168 }
169
170 // Obviously an Event Loop is always home.
171 pub struct UvEventLoop {
172     uvio: UvIoFactory
173 }
174
175 impl UvEventLoop {
176     pub fn new() -> UvEventLoop {
177         UvEventLoop {
178             uvio: UvIoFactory(Loop::new())
179         }
180     }
181 }
182
183 impl Drop for UvEventLoop {
184     fn drop(&self) {
185         // XXX: Need mutable finalizer
186         let this = unsafe {
187             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
188         };
189         this.uvio.uv_loop().close();
190     }
191 }
192
193 impl EventLoop for UvEventLoop {
194     fn run(&mut self) {
195         self.uvio.uv_loop().run();
196     }
197
198     fn callback(&mut self, f: ~fn()) {
199         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
200         do idle_watcher.start |mut idle_watcher, status| {
201             assert!(status.is_none());
202             idle_watcher.stop();
203             idle_watcher.close(||());
204             f();
205         }
206     }
207
208     fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
209         let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
210         return ~UvPausibleIdleCallback {
211             watcher: idle_watcher,
212             idle_flag: false,
213             closed: false
214         };
215     }
216
217     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
218         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
219         do timer.start(ms, 0) |timer, status| {
220             assert!(status.is_none());
221             timer.close(||());
222             f();
223         }
224     }
225
226     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
227         ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
228     }
229
230     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
231         Some(&mut self.uvio)
232     }
233 }
234
235 pub struct UvPausibleIdleCallback {
236     watcher: IdleWatcher,
237     idle_flag: bool,
238     closed: bool
239 }
240
241 impl UvPausibleIdleCallback {
242     #[inline]
243     pub fn start(&mut self, f: ~fn()) {
244         do self.watcher.start |_idle_watcher, _status| {
245             f();
246         };
247         self.idle_flag = true;
248     }
249     #[inline]
250     pub fn pause(&mut self) {
251         if self.idle_flag == true {
252             self.watcher.stop();
253             self.idle_flag = false;
254         }
255     }
256     #[inline]
257     pub fn resume(&mut self) {
258         if self.idle_flag == false {
259             self.watcher.restart();
260             self.idle_flag = true;
261         }
262     }
263     #[inline]
264     pub fn close(&mut self) {
265         self.pause();
266         if !self.closed {
267             self.closed = true;
268             self.watcher.close(||{});
269         }
270     }
271 }
272
273 #[test]
274 fn test_callback_run_once() {
275     do run_in_bare_thread {
276         let mut event_loop = UvEventLoop::new();
277         let mut count = 0;
278         let count_ptr: *mut int = &mut count;
279         do event_loop.callback {
280             unsafe { *count_ptr += 1 }
281         }
282         event_loop.run();
283         assert_eq!(count, 1);
284     }
285 }
286
287 // The entire point of async is to call into a loop from other threads so it does not need to home.
288 pub struct UvRemoteCallback {
289     // The uv async handle for triggering the callback
290     async: AsyncWatcher,
291     // A flag to tell the callback to exit, set from the dtor. This is
292     // almost never contested - only in rare races with the dtor.
293     exit_flag: Exclusive<bool>
294 }
295
296 impl UvRemoteCallback {
297     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
298         let exit_flag = Exclusive::new(false);
299         let exit_flag_clone = exit_flag.clone();
300         let async = do AsyncWatcher::new(loop_) |watcher, status| {
301             assert!(status.is_none());
302
303             // The synchronization logic here is subtle. To review,
304             // the uv async handle type promises that, after it is
305             // triggered the remote callback is definitely called at
306             // least once. UvRemoteCallback needs to maintain those
307             // semantics while also shutting down cleanly from the
308             // dtor. In our case that means that, when the
309             // UvRemoteCallback dtor calls `async.send()`, here `f` is
310             // always called later.
311
312             // In the dtor both the exit flag is set and the async
313             // callback fired under a lock.  Here, before calling `f`,
314             // we take the lock and check the flag. Because we are
315             // checking the flag before calling `f`, and the flag is
316             // set under the same lock as the send, then if the flag
317             // is set then we're guaranteed to call `f` after the
318             // final send.
319
320             // If the check was done after `f()` then there would be a
321             // period between that call and the check where the dtor
322             // could be called in the other thread, missing the final
323             // callback while still destroying the handle.
324
325             let should_exit = unsafe {
326                 exit_flag_clone.with_imm(|&should_exit| should_exit)
327             };
328
329             f();
330
331             if should_exit {
332                 watcher.close(||());
333             }
334
335         };
336         UvRemoteCallback {
337             async: async,
338             exit_flag: exit_flag
339         }
340     }
341 }
342
343 impl RemoteCallback for UvRemoteCallback {
344     fn fire(&mut self) { self.async.send() }
345 }
346
347 impl Drop for UvRemoteCallback {
348     fn drop(&self) {
349         unsafe {
350             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
351             do this.exit_flag.with |should_exit| {
352                 // NB: These two things need to happen atomically. Otherwise
353                 // the event handler could wake up due to a *previous*
354                 // signal and see the exit flag, destroying the handle
355                 // before the final send.
356                 *should_exit = true;
357                 this.async.send();
358             }
359         }
360     }
361 }
362
363 #[cfg(test)]
364 mod test_remote {
365     use cell::Cell;
366     use rt::test::*;
367     use rt::thread::Thread;
368     use rt::tube::Tube;
369     use rt::rtio::EventLoop;
370     use rt::local::Local;
371     use rt::sched::Scheduler;
372
373     #[test]
374     fn test_uv_remote() {
375         do run_in_newsched_task {
376             let mut tube = Tube::new();
377             let tube_clone = tube.clone();
378             let remote_cell = Cell::new_empty();
379             do Local::borrow |sched: &mut Scheduler| {
380                 let tube_clone = tube_clone.clone();
381                 let tube_clone_cell = Cell::new(tube_clone);
382                 let remote = do sched.event_loop.remote_callback {
383                     // This could be called multiple times
384                     if !tube_clone_cell.is_empty() {
385                         tube_clone_cell.take().send(1);
386                     }
387                 };
388                 remote_cell.put_back(remote);
389             }
390             let thread = do Thread::start {
391                 remote_cell.take().fire();
392             };
393
394             assert!(tube.recv() == 1);
395             thread.join();
396         }
397     }
398 }
399
400 pub struct UvIoFactory(Loop);
401
402 impl UvIoFactory {
403     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
404         match self { &UvIoFactory(ref mut ptr) => ptr }
405     }
406 }
407
408 impl IoFactory for UvIoFactory {
409     // Connect to an address and return a new stream
410     // NB: This blocks the task waiting on the connection.
411     // It would probably be better to return a future
412     fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
413         // Create a cell in the task to hold the result. We will fill
414         // the cell before resuming the task.
415         let result_cell = Cell::new_empty();
416         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
417
418         // Block this task and take ownership, switch to scheduler context
419         do task::unkillable { // FIXME(#8674)
420             let scheduler: ~Scheduler = Local::take();
421             do scheduler.deschedule_running_task_and_then |_, task| {
422
423                 let mut tcp = TcpWatcher::new(self.uv_loop());
424                 let task_cell = Cell::new(task);
425
426                 // Wait for a connection
427                 do tcp.connect(addr) |stream, status| {
428                     match status {
429                         None => {
430                             let tcp = NativeHandle::from_native_handle(stream.native_handle());
431                             let home = get_handle_to_current_scheduler!();
432                             let res = Ok(~UvTcpStream { watcher: tcp, home: home });
433
434                             // Store the stream in the task's stack
435                             unsafe { (*result_cell_ptr).put_back(res); }
436
437                             // Context switch
438                             let scheduler: ~Scheduler = Local::take();
439                             scheduler.resume_blocked_task_immediately(task_cell.take());
440                         }
441                         Some(_) => {
442                             let task_cell = Cell::new(task_cell.take());
443                             do stream.close {
444                                 let res = Err(uv_error_to_io_error(status.unwrap()));
445                                 unsafe { (*result_cell_ptr).put_back(res); }
446                                 let scheduler: ~Scheduler = Local::take();
447                                 scheduler.resume_blocked_task_immediately(task_cell.take());
448                             }
449                         }
450                     }
451                 }
452             }
453         }
454
455         assert!(!result_cell.is_empty());
456         return result_cell.take();
457     }
458
459     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
460         let mut watcher = TcpWatcher::new(self.uv_loop());
461         match watcher.bind(addr) {
462             Ok(_) => {
463                 let home = get_handle_to_current_scheduler!();
464                 Ok(~UvTcpListener::new(watcher, home))
465             }
466             Err(uverr) => {
467                 do task::unkillable { // FIXME(#8674)
468                     let scheduler: ~Scheduler = Local::take();
469                     do scheduler.deschedule_running_task_and_then |_, task| {
470                         let task_cell = Cell::new(task);
471                         do watcher.as_stream().close {
472                             let scheduler: ~Scheduler = Local::take();
473                             scheduler.resume_blocked_task_immediately(task_cell.take());
474                         }
475                     }
476                     Err(uv_error_to_io_error(uverr))
477                 }
478             }
479         }
480     }
481
482     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
483         let mut watcher = UdpWatcher::new(self.uv_loop());
484         match watcher.bind(addr) {
485             Ok(_) => {
486                 let home = get_handle_to_current_scheduler!();
487                 Ok(~UvUdpSocket { watcher: watcher, home: home })
488             }
489             Err(uverr) => {
490                 do task::unkillable { // FIXME(#8674)
491                     let scheduler: ~Scheduler = Local::take();
492                     do scheduler.deschedule_running_task_and_then |_, task| {
493                         let task_cell = Cell::new(task);
494                         do watcher.close {
495                             let scheduler: ~Scheduler = Local::take();
496                             scheduler.resume_blocked_task_immediately(task_cell.take());
497                         }
498                     }
499                     Err(uv_error_to_io_error(uverr))
500                 }
501             }
502         }
503     }
504
505     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
506         let watcher = TimerWatcher::new(self.uv_loop());
507         let home = get_handle_to_current_scheduler!();
508         Ok(~UvTimer::new(watcher, home))
509     }
510
511     fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
512         let loop_ = Loop {handle: self.uv_loop().native_handle()};
513         let fd = file::FileDescriptor(fd);
514         let home = get_handle_to_current_scheduler!();
515         ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
516     }
517
518     fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
519         -> Result<~RtioFileStream, IoError> {
520         let mut flags = match fm {
521             Open => 0,
522             Create => O_CREAT,
523             OpenOrCreate => O_CREAT,
524             Append => O_APPEND,
525             Truncate => O_TRUNC,
526             CreateOrTruncate => O_TRUNC | O_CREAT
527         };
528         flags = match fa {
529             Read => flags | O_RDONLY,
530             Write => flags | O_WRONLY,
531             ReadWrite => flags | O_RDWR
532         };
533         let create_mode = match fm {
534             Create|OpenOrCreate|CreateOrTruncate =>
535                 S_IRUSR | S_IWUSR,
536             _ => 0
537         };
538         let result_cell = Cell::new_empty();
539         let result_cell_ptr: *Cell<Result<~RtioFileStream,
540                                            IoError>> = &result_cell;
541         let path_cell = Cell::new(path);
542         do task::unkillable { // FIXME(#8674)
543             let scheduler: ~Scheduler = Local::take();
544             do scheduler.deschedule_running_task_and_then |_, task| {
545                 let task_cell = Cell::new(task);
546                 let path = path_cell.take();
547                 do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
548                       |req,err| {
549                     if err.is_none() {
550                         let loop_ = Loop {handle: req.get_loop().native_handle()};
551                         let home = get_handle_to_current_scheduler!();
552                         let fd = file::FileDescriptor(req.get_result());
553                         let fs = ~UvFileStream::new(
554                             loop_, fd, true, home) as ~RtioFileStream;
555                         let res = Ok(fs);
556                         unsafe { (*result_cell_ptr).put_back(res); }
557                         let scheduler: ~Scheduler = Local::take();
558                         scheduler.resume_blocked_task_immediately(task_cell.take());
559                     } else {
560                         let res = Err(uv_error_to_io_error(err.unwrap()));
561                         unsafe { (*result_cell_ptr).put_back(res); }
562                         let scheduler: ~Scheduler = Local::take();
563                         scheduler.resume_blocked_task_immediately(task_cell.take());
564                     }
565                 };
566             };
567         }
568         assert!(!result_cell.is_empty());
569         return result_cell.take();
570     }
571
572     fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
573         let result_cell = Cell::new_empty();
574         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
575         let path_cell = Cell::new(path);
576         do task::unkillable { // FIXME(#8674)
577             let scheduler: ~Scheduler = Local::take();
578             do scheduler.deschedule_running_task_and_then |_, task| {
579                 let task_cell = Cell::new(task);
580                 let path = path_cell.take();
581                 do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
582                     let res = match err {
583                         None => Ok(()),
584                         Some(err) => Err(uv_error_to_io_error(err))
585                     };
586                     unsafe { (*result_cell_ptr).put_back(res); }
587                     let scheduler: ~Scheduler = Local::take();
588                     scheduler.resume_blocked_task_immediately(task_cell.take());
589                 };
590             };
591         }
592         assert!(!result_cell.is_empty());
593         return result_cell.take();
594     }
595
596     fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> {
597         let home = get_handle_to_current_scheduler!();
598         Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
599     }
600
601     fn spawn(&mut self,
602              config: &process::Config) -> Result<~RtioProcessObject, IoError> {
603         // Sadly, we must create the UvProcess before we actually call uv_spawn
604         // so that the exit_cb can close over it and notify it when the process
605         // has exited.
606         let mut ret = ~UvProcess {
607             process: Process::new(),
608             home: None,
609             exit_status: None,
610             term_signal: None,
611             exit_error: None,
612             descheduled: None,
613         };
614         let ret_ptr = unsafe {
615             *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
616         };
617
618         // The purpose of this exit callback is to record the data about the
619         // exit and then wake up the task which may be waiting for the process
620         // to exit. This is all performed in the current io-loop, and the
621         // implementation of UvProcess ensures that reading these fields always
622         // occurs on the current io-loop.
623         let exit_cb: ExitCallback = |_, exit_status, term_signal, error| {
624             unsafe {
625                 assert!((*ret_ptr).exit_status.is_none());
626                 (*ret_ptr).exit_status = Some(exit_status);
627                 (*ret_ptr).term_signal = Some(term_signal);
628                 (*ret_ptr).exit_error = error;
629                 match (*ret_ptr).descheduled.take() {
630                     Some(task) => {
631                         let scheduler: ~Scheduler = Local::take();
632                         scheduler.resume_blocked_task_immediately(task);
633                     }
634                     None => {}
635                 }
636             }
637         };
638
639         match ret.process.spawn(self.uv_loop(), config, exit_cb) {
640             Ok(()) => {
641                 // Only now do we actually get a handle to this scheduler.
642                 ret.home = Some(get_handle_to_current_scheduler!());
643                 Ok(ret)
644             }
645             Err(uverr) => {
646                 // We still need to close the process handle we created, but
647                 // that's taken care for us in the destructor of UvProcess
648                 Err(uv_error_to_io_error(uverr))
649             }
650         }
651     }
652 }
653
654 pub struct UvTcpListener {
655     watcher: TcpWatcher,
656     listening: bool,
657     incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
658     home: SchedHandle,
659 }
660
661 impl HomingIO for UvTcpListener {
662     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
663 }
664
665 impl UvTcpListener {
666     fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
667         UvTcpListener {
668             watcher: watcher,
669             listening: false,
670             incoming_streams: Tube::new(),
671             home: home,
672         }
673     }
674
675     fn watcher(&self) -> TcpWatcher { self.watcher }
676 }
677
678 impl Drop for UvTcpListener {
679     fn drop(&self) {
680         // XXX need mutable finalizer
681         let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
682         do self_.home_for_io_with_sched |self_, scheduler| {
683             do scheduler.deschedule_running_task_and_then |_, task| {
684                 let task_cell = Cell::new(task);
685                 do self_.watcher().as_stream().close {
686                     let scheduler: ~Scheduler = Local::take();
687                     scheduler.resume_blocked_task_immediately(task_cell.take());
688                 }
689             }
690         }
691     }
692 }
693
694 impl RtioSocket for UvTcpListener {
695     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
696         do self.home_for_io |self_| {
697           socket_name(Tcp, self_.watcher)
698         }
699     }
700 }
701
702 impl RtioTcpListener for UvTcpListener {
703
704     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
705         do self.home_for_io |self_| {
706
707             if !self_.listening {
708                 self_.listening = true;
709
710                 let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
711
712                 do self_.watcher().listen |mut server, status| {
713                     let stream = match status {
714                         Some(_) => Err(standard_error(OtherIoError)),
715                         None => {
716                             let client = TcpWatcher::new(&server.event_loop());
717                             // XXX: needs to be surfaced in interface
718                             server.accept(client.as_stream());
719                             let home = get_handle_to_current_scheduler!();
720                             Ok(~UvTcpStream { watcher: client, home: home })
721                         }
722                     };
723
724                     let mut incoming_streams = incoming_streams_cell.take();
725                     incoming_streams.send(stream);
726                     incoming_streams_cell.put_back(incoming_streams);
727                 }
728
729             }
730             self_.incoming_streams.recv()
731         }
732     }
733
734     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
735         do self.home_for_io |self_| {
736             let r = unsafe {
737                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
738             };
739
740             match status_to_maybe_uv_error(r) {
741                 Some(err) => Err(uv_error_to_io_error(err)),
742                 None => Ok(())
743             }
744         }
745     }
746
747     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
748         do self.home_for_io |self_| {
749             let r = unsafe {
750                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
751             };
752
753             match status_to_maybe_uv_error(r) {
754                 Some(err) => Err(uv_error_to_io_error(err)),
755                 None => Ok(())
756             }
757         }
758     }
759 }
760
761 trait UvStream: HomingIO {
762     fn as_stream(&mut self) -> StreamWatcher;
763 }
764
765 // FIXME(#3429) I would rather this be `impl<T: UvStream> RtioStream for T` but
766 //              that has conflicts with other traits that also have methods
767 //              called `read` and `write`
768 macro_rules! rtiostream(($t:ident) => {
769 impl RtioStream for $t {
770     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
771         do self.home_for_io_with_sched |self_, scheduler| {
772             let result_cell = Cell::new_empty();
773             let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
774
775             let buf_ptr: *&mut [u8] = &buf;
776             do scheduler.deschedule_running_task_and_then |_sched, task| {
777                 let task_cell = Cell::new(task);
778                 // XXX: We shouldn't reallocate these callbacks every
779                 // call to read
780                 let alloc: AllocCallback = |_| unsafe {
781                     slice_to_uv_buf(*buf_ptr)
782                 };
783                 let mut watcher = self_.as_stream();
784                 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
785
786                     // Stop reading so that no read callbacks are
787                     // triggered before the user calls `read` again.
788                     // XXX: Is there a performance impact to calling
789                     // stop here?
790                     watcher.read_stop();
791
792                     let result = if status.is_none() {
793                         assert!(nread >= 0);
794                         Ok(nread as uint)
795                     } else {
796                         Err(uv_error_to_io_error(status.unwrap()))
797                     };
798
799                     unsafe { (*result_cell_ptr).put_back(result); }
800
801                     let scheduler: ~Scheduler = Local::take();
802                     scheduler.resume_blocked_task_immediately(task_cell.take());
803                 }
804             }
805
806             assert!(!result_cell.is_empty());
807             result_cell.take()
808         }
809     }
810
811     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
812         do self.home_for_io_with_sched |self_, scheduler| {
813             let result_cell = Cell::new_empty();
814             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
815             let buf_ptr: *&[u8] = &buf;
816             do scheduler.deschedule_running_task_and_then |_, task| {
817                 let task_cell = Cell::new(task);
818                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
819                 let mut watcher = self_.as_stream();
820                 do watcher.write(buf) |_watcher, status| {
821                     let result = if status.is_none() {
822                         Ok(())
823                     } else {
824                         Err(uv_error_to_io_error(status.unwrap()))
825                     };
826
827                     unsafe { (*result_cell_ptr).put_back(result); }
828
829                     let scheduler: ~Scheduler = Local::take();
830                     scheduler.resume_blocked_task_immediately(task_cell.take());
831                 }
832             }
833
834             assert!(!result_cell.is_empty());
835             result_cell.take()
836         }
837     }
838 }
839 })
840
841 rtiostream!(UvPipeStream)
842 rtiostream!(UvTcpStream)
843
844 pub struct UvPipeStream {
845     pipe: Pipe,
846     home: SchedHandle,
847 }
848
849 impl UvStream for UvPipeStream {
850     fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() }
851 }
852
853 impl HomingIO for UvPipeStream {
854     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
855 }
856
857 impl Drop for UvPipeStream {
858     fn drop(&self) {
859         // FIXME(#4330): should not need a transmute
860         let this = unsafe { cast::transmute_mut(self) };
861         do this.home_for_io |self_| {
862             let scheduler: ~Scheduler = Local::take();
863             do scheduler.deschedule_running_task_and_then |_, task| {
864                 let task_cell = Cell::new(task);
865                 do self_.pipe.close {
866                     let scheduler: ~Scheduler = Local::take();
867                     scheduler.resume_blocked_task_immediately(task_cell.take());
868                 }
869             }
870         }
871     }
872 }
873
874 impl UvPipeStream {
875     pub fn uv_pipe(&self) -> Pipe { self.pipe }
876 }
877
878 pub struct UvTcpStream {
879     watcher: TcpWatcher,
880     home: SchedHandle,
881 }
882
883 impl HomingIO for UvTcpStream {
884     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
885 }
886
887 impl Drop for UvTcpStream {
888     fn drop(&self) {
889         // FIXME(#4330): should not need a transmute
890         let this = unsafe { cast::transmute_mut(self) };
891         do this.home_for_io |self_| {
892             let scheduler: ~Scheduler = Local::take();
893             do scheduler.deschedule_running_task_and_then |_, task| {
894                 let task_cell = Cell::new(task);
895                 do self_.watcher.as_stream().close {
896                     let scheduler: ~Scheduler = Local::take();
897                     scheduler.resume_blocked_task_immediately(task_cell.take());
898                 }
899             }
900         }
901     }
902 }
903
904 impl UvStream for UvTcpStream {
905     fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() }
906 }
907
908 impl RtioSocket for UvTcpStream {
909     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
910         do self.home_for_io |self_| {
911             socket_name(Tcp, self_.watcher)
912         }
913     }
914 }
915
916 impl RtioTcpStream for UvTcpStream {
917     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
918         do self.home_for_io |self_| {
919             socket_name(TcpPeer, self_.watcher)
920         }
921     }
922
923     fn control_congestion(&mut self) -> Result<(), IoError> {
924         do self.home_for_io |self_| {
925             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
926
927             match status_to_maybe_uv_error(r) {
928                 Some(err) => Err(uv_error_to_io_error(err)),
929                 None => Ok(())
930             }
931         }
932     }
933
934     fn nodelay(&mut self) -> Result<(), IoError> {
935         do self.home_for_io |self_| {
936             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
937
938             match status_to_maybe_uv_error(r) {
939                 Some(err) => Err(uv_error_to_io_error(err)),
940                 None => Ok(())
941             }
942         }
943     }
944
945     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
946         do self.home_for_io |self_| {
947             let r = unsafe {
948                 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
949                                     delay_in_seconds as c_uint)
950             };
951
952             match status_to_maybe_uv_error(r) {
953                 Some(err) => Err(uv_error_to_io_error(err)),
954                 None => Ok(())
955             }
956         }
957     }
958
959     fn letdie(&mut self) -> Result<(), IoError> {
960         do self.home_for_io |self_| {
961             let r = unsafe {
962                 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
963             };
964
965             match status_to_maybe_uv_error(r) {
966                 Some(err) => Err(uv_error_to_io_error(err)),
967                 None => Ok(())
968             }
969         }
970     }
971 }
972
973 pub struct UvUdpSocket {
974     watcher: UdpWatcher,
975     home: SchedHandle,
976 }
977
978 impl HomingIO for UvUdpSocket {
979     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
980 }
981
982 impl Drop for UvUdpSocket {
983     fn drop(&self) {
984         // XXX need mutable finalizer
985         let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
986         do this.home_for_io_with_sched |self_, scheduler| {
987             do scheduler.deschedule_running_task_and_then |_, task| {
988                 let task_cell = Cell::new(task);
989                 do self_.watcher.close {
990                     let scheduler: ~Scheduler = Local::take();
991                     scheduler.resume_blocked_task_immediately(task_cell.take());
992                 }
993             }
994         }
995     }
996 }
997
998 impl RtioSocket for UvUdpSocket {
999     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
1000         do self.home_for_io |self_| {
1001             socket_name(Udp, self_.watcher)
1002         }
1003     }
1004 }
1005
1006 impl RtioUdpSocket for UvUdpSocket {
1007     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
1008         do self.home_for_io_with_sched |self_, scheduler| {
1009             let result_cell = Cell::new_empty();
1010             let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
1011
1012             let buf_ptr: *&mut [u8] = &buf;
1013             do scheduler.deschedule_running_task_and_then |_, task| {
1014                 let task_cell = Cell::new(task);
1015                 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
1016                 do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
1017                     let _ = flags; // /XXX add handling for partials?
1018
1019                     watcher.recv_stop();
1020
1021                     let result = match status {
1022                         None => {
1023                             assert!(nread >= 0);
1024                             Ok((nread as uint, addr))
1025                         }
1026                         Some(err) => Err(uv_error_to_io_error(err)),
1027                     };
1028
1029                     unsafe { (*result_cell_ptr).put_back(result); }
1030
1031                     let scheduler: ~Scheduler = Local::take();
1032                     scheduler.resume_blocked_task_immediately(task_cell.take());
1033                 }
1034             }
1035
1036             assert!(!result_cell.is_empty());
1037             result_cell.take()
1038         }
1039     }
1040
1041     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
1042         do self.home_for_io_with_sched |self_, scheduler| {
1043             let result_cell = Cell::new_empty();
1044             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1045             let buf_ptr: *&[u8] = &buf;
1046             do scheduler.deschedule_running_task_and_then |_, task| {
1047                 let task_cell = Cell::new(task);
1048                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1049                 do self_.watcher.send(buf, dst) |_watcher, status| {
1050
1051                     let result = match status {
1052                         None => Ok(()),
1053                         Some(err) => Err(uv_error_to_io_error(err)),
1054                     };
1055
1056                     unsafe { (*result_cell_ptr).put_back(result); }
1057
1058                     let scheduler: ~Scheduler = Local::take();
1059                     scheduler.resume_blocked_task_immediately(task_cell.take());
1060                 }
1061             }
1062
1063             assert!(!result_cell.is_empty());
1064             result_cell.take()
1065         }
1066     }
1067
1068     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1069         do self.home_for_io |self_| {
1070             let r = unsafe {
1071                 do multi.to_str().with_c_str |m_addr| {
1072                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1073                                              ptr::null(), uvll::UV_JOIN_GROUP)
1074                 }
1075             };
1076
1077             match status_to_maybe_uv_error(r) {
1078                 Some(err) => Err(uv_error_to_io_error(err)),
1079                 None => Ok(())
1080             }
1081         }
1082     }
1083
1084     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1085         do self.home_for_io |self_| {
1086             let r = unsafe {
1087                 do multi.to_str().with_c_str |m_addr| {
1088                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1089                                              ptr::null(), uvll::UV_LEAVE_GROUP)
1090                 }
1091             };
1092
1093             match status_to_maybe_uv_error(r) {
1094                 Some(err) => Err(uv_error_to_io_error(err)),
1095                 None => Ok(())
1096             }
1097         }
1098     }
1099
1100     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
1101         do self.home_for_io |self_| {
1102
1103             let r = unsafe {
1104                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
1105             };
1106
1107             match status_to_maybe_uv_error(r) {
1108                 Some(err) => Err(uv_error_to_io_error(err)),
1109                 None => Ok(())
1110             }
1111         }
1112     }
1113
1114     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
1115         do self.home_for_io |self_| {
1116
1117             let r = unsafe {
1118                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
1119             };
1120
1121             match status_to_maybe_uv_error(r) {
1122                 Some(err) => Err(uv_error_to_io_error(err)),
1123                 None => Ok(())
1124             }
1125         }
1126     }
1127
1128     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1129         do self.home_for_io |self_| {
1130
1131             let r = unsafe {
1132                 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
1133             };
1134
1135             match status_to_maybe_uv_error(r) {
1136                 Some(err) => Err(uv_error_to_io_error(err)),
1137                 None => Ok(())
1138             }
1139         }
1140     }
1141
1142     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1143         do self.home_for_io |self_| {
1144
1145             let r = unsafe {
1146                 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
1147             };
1148
1149             match status_to_maybe_uv_error(r) {
1150                 Some(err) => Err(uv_error_to_io_error(err)),
1151                 None => Ok(())
1152             }
1153         }
1154     }
1155
1156     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
1157         do self.home_for_io |self_| {
1158
1159             let r = unsafe {
1160                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
1161             };
1162
1163             match status_to_maybe_uv_error(r) {
1164                 Some(err) => Err(uv_error_to_io_error(err)),
1165                 None => Ok(())
1166             }
1167         }
1168     }
1169
1170     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
1171         do self.home_for_io |self_| {
1172
1173             let r = unsafe {
1174                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
1175             };
1176
1177             match status_to_maybe_uv_error(r) {
1178                 Some(err) => Err(uv_error_to_io_error(err)),
1179                 None => Ok(())
1180             }
1181         }
1182     }
1183 }
1184
1185 pub struct UvTimer {
1186     watcher: timer::TimerWatcher,
1187     home: SchedHandle,
1188 }
1189
1190 impl HomingIO for UvTimer {
1191     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1192 }
1193
1194 impl UvTimer {
1195     fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1196         UvTimer { watcher: w, home: home }
1197     }
1198 }
1199
1200 impl Drop for UvTimer {
1201     fn drop(&self) {
1202         let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
1203         do self_.home_for_io_with_sched |self_, scheduler| {
1204             rtdebug!("closing UvTimer");
1205             do scheduler.deschedule_running_task_and_then |_, task| {
1206                 let task_cell = Cell::new(task);
1207                 do self_.watcher.close {
1208                     let scheduler: ~Scheduler = Local::take();
1209                     scheduler.resume_blocked_task_immediately(task_cell.take());
1210                 }
1211             }
1212         }
1213     }
1214 }
1215
1216 impl RtioTimer for UvTimer {
1217     fn sleep(&mut self, msecs: u64) {
1218         do self.home_for_io_with_sched |self_, scheduler| {
1219             do scheduler.deschedule_running_task_and_then |_sched, task| {
1220                 rtdebug!("sleep: entered scheduler context");
1221                 let task_cell = Cell::new(task);
1222                 do self_.watcher.start(msecs, 0) |_, status| {
1223                     assert!(status.is_none());
1224                     let scheduler: ~Scheduler = Local::take();
1225                     scheduler.resume_blocked_task_immediately(task_cell.take());
1226                 }
1227             }
1228             self_.watcher.stop();
1229         }
1230     }
1231 }
1232
1233 pub struct UvFileStream {
1234     loop_: Loop,
1235     fd: file::FileDescriptor,
1236     close_on_drop: bool,
1237     home: SchedHandle
1238 }
1239
1240 impl HomingIO for UvFileStream {
1241     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1242 }
1243
1244 impl UvFileStream {
1245     fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
1246            home: SchedHandle) -> UvFileStream {
1247         UvFileStream {
1248             loop_: loop_,
1249             fd: fd,
1250             close_on_drop: close_on_drop,
1251             home: home
1252         }
1253     }
1254     fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
1255         let result_cell = Cell::new_empty();
1256         let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
1257         let buf_ptr: *&mut [u8] = &buf;
1258         do self.home_for_io_with_sched |self_, scheduler| {
1259             do scheduler.deschedule_running_task_and_then |_, task| {
1260                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1261                 let task_cell = Cell::new(task);
1262                 do self_.fd.read(&self_.loop_, buf, offset) |req, uverr| {
1263                     let res = match uverr  {
1264                         None => Ok(req.get_result() as int),
1265                         Some(err) => Err(uv_error_to_io_error(err))
1266                     };
1267                     unsafe { (*result_cell_ptr).put_back(res); }
1268                     let scheduler: ~Scheduler = Local::take();
1269                     scheduler.resume_blocked_task_immediately(task_cell.take());
1270                 };
1271             };
1272         };
1273         result_cell.take()
1274     }
1275     fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
1276         let result_cell = Cell::new_empty();
1277         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1278         let buf_ptr: *&[u8] = &buf;
1279         do self.home_for_io_with_sched |self_, scheduler| {
1280             do scheduler.deschedule_running_task_and_then |_, task| {
1281                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1282                 let task_cell = Cell::new(task);
1283                 do self_.fd.write(&self_.loop_, buf, offset) |_, uverr| {
1284                     let res = match uverr  {
1285                         None => Ok(()),
1286                         Some(err) => Err(uv_error_to_io_error(err))
1287                     };
1288                     unsafe { (*result_cell_ptr).put_back(res); }
1289                     let scheduler: ~Scheduler = Local::take();
1290                     scheduler.resume_blocked_task_immediately(task_cell.take());
1291                 };
1292             };
1293         };
1294         result_cell.take()
1295     }
1296     fn seek_common(&mut self, pos: i64, whence: c_int) ->
1297         Result<u64, IoError>{
1298         #[fixed_stack_segment]; #[inline(never)];
1299         unsafe {
1300             match lseek((*self.fd), pos as off_t, whence) {
1301                 -1 => {
1302                     Err(IoError {
1303                         kind: OtherIoError,
1304                         desc: "Failed to lseek.",
1305                         detail: None
1306                     })
1307                 },
1308                 n => Ok(n as u64)
1309             }
1310         }
1311     }
1312 }
1313
1314 impl Drop for UvFileStream {
1315     fn drop(&self) {
1316         let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
1317         if self.close_on_drop {
1318             do self_.home_for_io_with_sched |self_, scheduler| {
1319                 do scheduler.deschedule_running_task_and_then |_, task| {
1320                     let task_cell = Cell::new(task);
1321                     do self_.fd.close(&self.loop_) |_,_| {
1322                         let scheduler: ~Scheduler = Local::take();
1323                         scheduler.resume_blocked_task_immediately(task_cell.take());
1324                     };
1325                 };
1326             }
1327         }
1328     }
1329 }
1330
1331 impl RtioFileStream for UvFileStream {
1332     fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
1333         self.base_read(buf, -1)
1334     }
1335     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
1336         self.base_write(buf, -1)
1337     }
1338     fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
1339         self.base_read(buf, offset as i64)
1340     }
1341     fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
1342         self.base_write(buf, offset as i64)
1343     }
1344     fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
1345         use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
1346         let whence = match whence {
1347             SeekSet => SEEK_SET,
1348             SeekCur => SEEK_CUR,
1349             SeekEnd => SEEK_END
1350         };
1351         self.seek_common(pos, whence)
1352     }
1353     fn tell(&self) -> Result<u64, IoError> {
1354         use libc::SEEK_CUR;
1355         // this is temporary
1356         let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1357         self_.seek_common(0, SEEK_CUR)
1358     }
1359     fn flush(&mut self) -> Result<(), IoError> {
1360         Ok(())
1361     }
1362 }
1363
1364 pub struct UvProcess {
1365     process: process::Process,
1366
1367     // Sadly, this structure must be created before we return it, so in that
1368     // brief interim the `home` is None.
1369     home: Option<SchedHandle>,
1370
1371     // All None until the process exits (exit_error may stay None)
1372     priv exit_status: Option<int>,
1373     priv term_signal: Option<int>,
1374     priv exit_error: Option<UvError>,
1375
1376     // Used to store which task to wake up from the exit_cb
1377     priv descheduled: Option<BlockedTask>,
1378 }
1379
1380 impl HomingIO for UvProcess {
1381     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
1382 }
1383
1384 impl Drop for UvProcess {
1385     fn drop(&self) {
1386         // FIXME(#4330): should not need a transmute
1387         let this = unsafe { cast::transmute_mut(self) };
1388
1389         let close = |self_: &mut UvProcess| {
1390             let scheduler: ~Scheduler = Local::take();
1391             do scheduler.deschedule_running_task_and_then |_, task| {
1392                 let task = Cell::new(task);
1393                 do self_.process.close {
1394                     let scheduler: ~Scheduler = Local::take();
1395                     scheduler.resume_blocked_task_immediately(task.take());
1396                 }
1397             }
1398         };
1399
1400         // If home is none, then this process never actually successfully
1401         // spawned, so there's no need to switch event loops
1402         if this.home.is_none() {
1403             close(this)
1404         } else {
1405             this.home_for_io(close)
1406         }
1407     }
1408 }
1409
1410 impl RtioProcess for UvProcess {
1411     fn id(&self) -> pid_t {
1412         self.process.pid()
1413     }
1414
1415     fn kill(&mut self, signal: int) -> Result<(), IoError> {
1416         do self.home_for_io |self_| {
1417             match self_.process.kill(signal) {
1418                 Ok(()) => Ok(()),
1419                 Err(uverr) => Err(uv_error_to_io_error(uverr))
1420             }
1421         }
1422     }
1423
1424     fn wait(&mut self) -> int {
1425         // Make sure (on the home scheduler) that we have an exit status listed
1426         do self.home_for_io |self_| {
1427             match self_.exit_status {
1428                 Some(*) => {}
1429                 None => {
1430                     // If there's no exit code previously listed, then the
1431                     // process's exit callback has yet to be invoked. We just
1432                     // need to deschedule ourselves and wait to be reawoken.
1433                     let scheduler: ~Scheduler = Local::take();
1434                     do scheduler.deschedule_running_task_and_then |_, task| {
1435                         assert!(self_.descheduled.is_none());
1436                         self_.descheduled = Some(task);
1437                     }
1438                     assert!(self_.exit_status.is_some());
1439                 }
1440             }
1441         }
1442
1443         self.exit_status.unwrap()
1444     }
1445 }
1446
1447 #[test]
1448 fn test_simple_io_no_connect() {
1449     do run_in_newsched_task {
1450         unsafe {
1451             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1452             let addr = next_test_ip4();
1453             let maybe_chan = (*io).tcp_connect(addr);
1454             assert!(maybe_chan.is_err());
1455         }
1456     }
1457 }
1458
1459 #[test]
1460 fn test_simple_udp_io_bind_only() {
1461     do run_in_newsched_task {
1462         unsafe {
1463             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1464             let addr = next_test_ip4();
1465             let maybe_socket = (*io).udp_bind(addr);
1466             assert!(maybe_socket.is_ok());
1467         }
1468     }
1469 }
1470
1471 #[test]
1472 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1473     use rt::sleeper_list::SleeperList;
1474     use rt::work_queue::WorkQueue;
1475     use rt::thread::Thread;
1476     use rt::task::Task;
1477     use rt::sched::{Shutdown, TaskFromFriend};
1478     do run_in_bare_thread {
1479         let sleepers = SleeperList::new();
1480         let work_queue1 = WorkQueue::new();
1481         let work_queue2 = WorkQueue::new();
1482         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1483
1484         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1485                                          sleepers.clone());
1486         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1487                                          sleepers.clone());
1488
1489         let handle1 = Cell::new(sched1.make_handle());
1490         let handle2 = Cell::new(sched2.make_handle());
1491         let tasksFriendHandle = Cell::new(sched2.make_handle());
1492
1493         let on_exit: ~fn(bool) = |exit_status| {
1494             handle1.take().send(Shutdown);
1495             handle2.take().send(Shutdown);
1496             rtassert!(exit_status);
1497         };
1498
1499         let test_function: ~fn() = || {
1500             let io: *mut IoFactoryObject = unsafe {
1501                 Local::unsafe_borrow()
1502             };
1503             let addr = next_test_ip4();
1504             let maybe_socket = unsafe { (*io).udp_bind(addr) };
1505             // this socket is bound to this event loop
1506             assert!(maybe_socket.is_ok());
1507
1508             // block self on sched1
1509             do task::unkillable { // FIXME(#8674)
1510                 let scheduler: ~Scheduler = Local::take();
1511                 do scheduler.deschedule_running_task_and_then |_, task| {
1512                     // unblock task
1513                     do task.wake().map_move |task| {
1514                       // send self to sched2
1515                       tasksFriendHandle.take().send(TaskFromFriend(task));
1516                     };
1517                     // sched1 should now sleep since it has nothing else to do
1518                 }
1519             }
1520             // sched2 will wake up and get the task
1521             // as we do nothing else, the function ends and the socket goes out of scope
1522             // sched2 will start to run the destructor
1523             // the destructor will first block the task, set it's home as sched1, then enqueue it
1524             // sched2 will dequeue the task, see that it has a home, and send it to sched1
1525             // sched1 will wake up, exec the close function on the correct loop, and then we're done
1526         };
1527
1528         let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
1529         main_task.death.on_exit = Some(on_exit);
1530         let main_task = Cell::new(main_task);
1531
1532         let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
1533
1534         let sched1 = Cell::new(sched1);
1535         let sched2 = Cell::new(sched2);
1536
1537         let thread1 = do Thread::start {
1538             sched1.take().bootstrap(main_task.take());
1539         };
1540         let thread2 = do Thread::start {
1541             sched2.take().bootstrap(null_task.take());
1542         };
1543
1544         thread1.join();
1545         thread2.join();
1546     }
1547 }
1548
1549 #[test]
1550 fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
1551     use rt::sleeper_list::SleeperList;
1552     use rt::work_queue::WorkQueue;
1553     use rt::thread::Thread;
1554     use rt::task::Task;
1555     use rt::comm::oneshot;
1556     use rt::sched::Shutdown;
1557     do run_in_bare_thread {
1558         let sleepers = SleeperList::new();
1559         let work_queue1 = WorkQueue::new();
1560         let work_queue2 = WorkQueue::new();
1561         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1562
1563         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1564                                          sleepers.clone());
1565         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1566                                          sleepers.clone());
1567
1568         let handle1 = Cell::new(sched1.make_handle());
1569         let handle2 = Cell::new(sched2.make_handle());
1570
1571         let (port, chan) = oneshot();
1572         let port = Cell::new(port);
1573         let chan = Cell::new(chan);
1574
1575         let body1: ~fn() = || {
1576             let io: *mut IoFactoryObject = unsafe {
1577                 Local::unsafe_borrow()
1578             };
1579             let addr = next_test_ip4();
1580             let socket = unsafe { (*io).udp_bind(addr) };
1581             assert!(socket.is_ok());
1582             chan.take().send(socket);
1583         };
1584
1585         let body2: ~fn() = || {
1586             let socket = port.take().recv();
1587             assert!(socket.is_ok());
1588             /* The socket goes out of scope and the destructor is called.
1589              * The destructor:
1590              *  - sends itself back to sched1
1591              *  - frees the socket
1592              *  - resets the home of the task to whatever it was previously
1593              */
1594         };
1595
1596         let on_exit: ~fn(bool) = |exit| {
1597             handle1.take().send(Shutdown);
1598             handle2.take().send(Shutdown);
1599             rtassert!(exit);
1600         };
1601
1602         let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
1603
1604         let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
1605         task2.death.on_exit = Some(on_exit);
1606         let task2 = Cell::new(task2);
1607
1608         let sched1 = Cell::new(sched1);
1609         let sched2 = Cell::new(sched2);
1610
1611         let thread1 = do Thread::start {
1612             sched1.take().bootstrap(task1.take());
1613         };
1614         let thread2 = do Thread::start {
1615             sched2.take().bootstrap(task2.take());
1616         };
1617
1618         thread1.join();
1619         thread2.join();
1620     }
1621 }
1622
1623 #[test]
1624 fn test_simple_tcp_server_and_client() {
1625     do run_in_newsched_task {
1626         let addr = next_test_ip4();
1627
1628         // Start the server first so it's listening when we connect
1629         do spawntask {
1630             unsafe {
1631                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1632                 let mut listener = (*io).tcp_bind(addr).unwrap();
1633                 let mut stream = listener.accept().unwrap();
1634                 let mut buf = [0, .. 2048];
1635                 let nread = stream.read(buf).unwrap();
1636                 assert_eq!(nread, 8);
1637                 for i in range(0u, nread) {
1638                     rtdebug!("%u", buf[i] as uint);
1639                     assert_eq!(buf[i], i as u8);
1640                 }
1641             }
1642         }
1643
1644         do spawntask {
1645             unsafe {
1646                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1647                 let mut stream = (*io).tcp_connect(addr).unwrap();
1648                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1649             }
1650         }
1651     }
1652 }
1653
1654 #[test]
1655 fn test_simple_tcp_server_and_client_on_diff_threads() {
1656     use rt::sleeper_list::SleeperList;
1657     use rt::work_queue::WorkQueue;
1658     use rt::thread::Thread;
1659     use rt::task::Task;
1660     use rt::sched::{Shutdown};
1661     do run_in_bare_thread {
1662         let sleepers = SleeperList::new();
1663
1664         let server_addr = next_test_ip4();
1665         let client_addr = server_addr.clone();
1666
1667         let server_work_queue = WorkQueue::new();
1668         let client_work_queue = WorkQueue::new();
1669         let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
1670
1671         let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
1672                                                queues.clone(), sleepers.clone());
1673         let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
1674                                                queues.clone(), sleepers.clone());
1675
1676         let server_handle = Cell::new(server_sched.make_handle());
1677         let client_handle = Cell::new(client_sched.make_handle());
1678
1679         let server_on_exit: ~fn(bool) = |exit_status| {
1680             server_handle.take().send(Shutdown);
1681             rtassert!(exit_status);
1682         };
1683
1684         let client_on_exit: ~fn(bool) = |exit_status| {
1685             client_handle.take().send(Shutdown);
1686             rtassert!(exit_status);
1687         };
1688
1689         let server_fn: ~fn() = || {
1690             let io: *mut IoFactoryObject = unsafe {
1691                 Local::unsafe_borrow()
1692             };
1693             let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
1694             let mut stream = listener.accept().unwrap();
1695             let mut buf = [0, .. 2048];
1696             let nread = stream.read(buf).unwrap();
1697             assert_eq!(nread, 8);
1698             for i in range(0u, nread) {
1699                 assert_eq!(buf[i], i as u8);
1700             }
1701         };
1702
1703         let client_fn: ~fn() = || {
1704             let io: *mut IoFactoryObject = unsafe {
1705                 Local::unsafe_borrow()
1706             };
1707             let mut stream = unsafe { (*io).tcp_connect(client_addr) };
1708             while stream.is_err() {
1709                 stream = unsafe { (*io).tcp_connect(client_addr) };
1710             }
1711             stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1712         };
1713
1714         let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
1715         server_task.death.on_exit = Some(server_on_exit);
1716         let server_task = Cell::new(server_task);
1717
1718         let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
1719         client_task.death.on_exit = Some(client_on_exit);
1720         let client_task = Cell::new(client_task);
1721
1722         let server_sched = Cell::new(server_sched);
1723         let client_sched = Cell::new(client_sched);
1724
1725         let server_thread = do Thread::start {
1726             server_sched.take().bootstrap(server_task.take());
1727         };
1728         let client_thread = do Thread::start {
1729             client_sched.take().bootstrap(client_task.take());
1730         };
1731
1732         server_thread.join();
1733         client_thread.join();
1734     }
1735 }
1736
1737 #[test]
1738 fn test_simple_udp_server_and_client() {
1739     do run_in_newsched_task {
1740         let server_addr = next_test_ip4();
1741         let client_addr = next_test_ip4();
1742
1743         do spawntask {
1744             unsafe {
1745                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1746                 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
1747                 let mut buf = [0, .. 2048];
1748                 let (nread,src) = server_socket.recvfrom(buf).unwrap();
1749                 assert_eq!(nread, 8);
1750                 for i in range(0u, nread) {
1751                     rtdebug!("%u", buf[i] as uint);
1752                     assert_eq!(buf[i], i as u8);
1753                 }
1754                 assert_eq!(src, client_addr);
1755             }
1756         }
1757
1758         do spawntask {
1759             unsafe {
1760                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1761                 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
1762                 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
1763             }
1764         }
1765     }
1766 }
1767
1768 #[test] #[ignore(reason = "busted")]
1769 fn test_read_and_block() {
1770     do run_in_newsched_task {
1771         let addr = next_test_ip4();
1772
1773         do spawntask {
1774             let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1775             let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1776             let mut stream = listener.accept().unwrap();
1777             let mut buf = [0, .. 2048];
1778
1779             let expected = 32;
1780             let mut current = 0;
1781             let mut reads = 0;
1782
1783             while current < expected {
1784                 let nread = stream.read(buf).unwrap();
1785                 for i in range(0u, nread) {
1786                     let val = buf[i] as uint;
1787                     assert_eq!(val, current % 8);
1788                     current += 1;
1789                 }
1790                 reads += 1;
1791
1792                 do task::unkillable { // FIXME(#8674)
1793                     let scheduler: ~Scheduler = Local::take();
1794                     // Yield to the other task in hopes that it
1795                     // will trigger a read callback while we are
1796                     // not ready for it
1797                     do scheduler.deschedule_running_task_and_then |sched, task| {
1798                         let task = Cell::new(task);
1799                         sched.enqueue_blocked_task(task.take());
1800                     }
1801                 }
1802             }
1803
1804             // Make sure we had multiple reads
1805             assert!(reads > 1);
1806         }
1807
1808         do spawntask {
1809             unsafe {
1810                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1811                 let mut stream = (*io).tcp_connect(addr).unwrap();
1812                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1813                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1814                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1815                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1816             }
1817         }
1818
1819     }
1820 }
1821
1822 #[test]
1823 fn test_read_read_read() {
1824     do run_in_newsched_task {
1825         let addr = next_test_ip4();
1826         static MAX: uint = 500000;
1827
1828         do spawntask {
1829             unsafe {
1830                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1831                 let mut listener = (*io).tcp_bind(addr).unwrap();
1832                 let mut stream = listener.accept().unwrap();
1833                 let buf = [1, .. 2048];
1834                 let mut total_bytes_written = 0;
1835                 while total_bytes_written < MAX {
1836                     stream.write(buf);
1837                     total_bytes_written += buf.len();
1838                 }
1839             }
1840         }
1841
1842         do spawntask {
1843             unsafe {
1844                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1845                 let mut stream = (*io).tcp_connect(addr).unwrap();
1846                 let mut buf = [0, .. 2048];
1847                 let mut total_bytes_read = 0;
1848                 while total_bytes_read < MAX {
1849                     let nread = stream.read(buf).unwrap();
1850                     rtdebug!("read %u bytes", nread as uint);
1851                     total_bytes_read += nread;
1852                     for i in range(0u, nread) {
1853                         assert_eq!(buf[i], 1);
1854                     }
1855                 }
1856                 rtdebug!("read %u bytes total", total_bytes_read as uint);
1857             }
1858         }
1859     }
1860 }
1861
1862 #[test]
1863 fn test_udp_twice() {
1864     do run_in_newsched_task {
1865         let server_addr = next_test_ip4();
1866         let client_addr = next_test_ip4();
1867
1868         do spawntask {
1869             unsafe {
1870                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1871                 let mut client = (*io).udp_bind(client_addr).unwrap();
1872                 assert!(client.sendto([1], server_addr).is_ok());
1873                 assert!(client.sendto([2], server_addr).is_ok());
1874             }
1875         }
1876
1877         do spawntask {
1878             unsafe {
1879                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1880                 let mut server = (*io).udp_bind(server_addr).unwrap();
1881                 let mut buf1 = [0];
1882                 let mut buf2 = [0];
1883                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1884                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1885                 assert_eq!(nread1, 1);
1886                 assert_eq!(nread2, 1);
1887                 assert_eq!(src1, client_addr);
1888                 assert_eq!(src2, client_addr);
1889                 assert_eq!(buf1[0], 1);
1890                 assert_eq!(buf2[0], 2);
1891             }
1892         }
1893     }
1894 }
1895
1896 #[test]
1897 fn test_udp_many_read() {
1898     do run_in_newsched_task {
1899         let server_out_addr = next_test_ip4();
1900         let server_in_addr = next_test_ip4();
1901         let client_out_addr = next_test_ip4();
1902         let client_in_addr = next_test_ip4();
1903         static MAX: uint = 500_000;
1904
1905         do spawntask {
1906             unsafe {
1907                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1908                 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1909                 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1910                 let msg = [1, .. 2048];
1911                 let mut total_bytes_sent = 0;
1912                 let mut buf = [1];
1913                 while buf[0] == 1 {
1914                     // send more data
1915                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
1916                     total_bytes_sent += msg.len();
1917                     // check if the client has received enough
1918                     let res = server_in.recvfrom(buf);
1919                     assert!(res.is_ok());
1920                     let (nread, src) = res.unwrap();
1921                     assert_eq!(nread, 1);
1922                     assert_eq!(src, client_out_addr);
1923                 }
1924                 assert!(total_bytes_sent >= MAX);
1925             }
1926         }
1927
1928         do spawntask {
1929             unsafe {
1930                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1931                 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1932                 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1933                 let mut total_bytes_recv = 0;
1934                 let mut buf = [0, .. 2048];
1935                 while total_bytes_recv < MAX {
1936                     // ask for more
1937                     assert!(client_out.sendto([1], server_in_addr).is_ok());
1938                     // wait for data
1939                     let res = client_in.recvfrom(buf);
1940                     assert!(res.is_ok());
1941                     let (nread, src) = res.unwrap();
1942                     assert_eq!(src, server_out_addr);
1943                     total_bytes_recv += nread;
1944                     for i in range(0u, nread) {
1945                         assert_eq!(buf[i], 1);
1946                     }
1947                 }
1948                 // tell the server we're done
1949                 assert!(client_out.sendto([0], server_in_addr).is_ok());
1950             }
1951         }
1952     }
1953 }
1954
1955 #[test]
1956 fn test_timer_sleep_simple() {
1957     do run_in_newsched_task {
1958         unsafe {
1959             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1960             let timer = (*io).timer_init();
1961             do timer.map_move |mut t| { t.sleep(1) };
1962         }
1963     }
1964 }
1965
1966 fn file_test_uvio_full_simple_impl() {
1967     use str::StrSlice; // why does this have to be explicitly imported to work?
1968                        // compiler was complaining about no trait for str that
1969                        // does .as_bytes() ..
1970     use path::Path;
1971     use rt::io::{Open, Create, ReadWrite, Read};
1972     unsafe {
1973         let io: *mut IoFactoryObject = Local::unsafe_borrow();
1974         let write_val = "hello uvio!";
1975         let path = "./tmp/file_test_uvio_full.txt";
1976         {
1977             let create_fm = Create;
1978             let create_fa = ReadWrite;
1979             let mut fd = (*io).fs_open(&Path(path), create_fm, create_fa).unwrap();
1980             let write_buf = write_val.as_bytes();
1981             fd.write(write_buf);
1982         }
1983         {
1984             let ro_fm = Open;
1985             let ro_fa = Read;
1986             let mut fd = (*io).fs_open(&Path(path), ro_fm, ro_fa).unwrap();
1987             let mut read_vec = [0, .. 1028];
1988             let nread = fd.read(read_vec).unwrap();
1989             let read_val = str::from_bytes(read_vec.slice(0, nread as uint));
1990             assert!(read_val == write_val.to_owned());
1991         }
1992         (*io).fs_unlink(&Path(path));
1993     }
1994 }
1995
1996 #[test]
1997 fn file_test_uvio_full_simple() {
1998     do run_in_newsched_task {
1999         file_test_uvio_full_simple_impl();
2000     }
2001 }
2002
2003 fn uvio_naive_print(input: &str) {
2004     use str::StrSlice;
2005     unsafe {
2006         use libc::{STDOUT_FILENO};
2007         let io: *mut IoFactoryObject = Local::unsafe_borrow();
2008         {
2009             let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
2010             let write_buf = input.as_bytes();
2011             fd.write(write_buf);
2012         }
2013     }
2014 }
2015
2016 #[test]
2017 fn file_test_uvio_write_to_stdout() {
2018     do run_in_newsched_task {
2019         uvio_naive_print("jubilation\n");
2020     }
2021 }