]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/uvio.rs
Revert "auto merge of #8645 : alexcrichton/rust/issue-6436-run-non-blocking, r=brson"
[rust.git] / src / libstd / rt / uv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use c_str::ToCStr;
12 use cast::transmute;
13 use cast;
14 use cell::Cell;
15 use clone::Clone;
16 use libc::{c_int, c_uint, c_void};
17 use ops::Drop;
18 use option::*;
19 use ptr;
20 use str;
21 use result::*;
22 use rt::io::IoError;
23 use rt::io::net::ip::{SocketAddr, IpAddr};
24 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
25 use rt::local::Local;
26 use rt::rtio::*;
27 use rt::sched::{Scheduler, SchedHandle};
28 use rt::tube::Tube;
29 use rt::uv::*;
30 use rt::uv::idle::IdleWatcher;
31 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
32 use unstable::sync::Exclusive;
33 use super::super::io::support::PathLike;
34 use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
35           S_IRUSR, S_IWUSR};
36 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
37             CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
38 use task;
39
40 #[cfg(test)] use container::Container;
41 #[cfg(test)] use unstable::run_in_bare_thread;
42 #[cfg(test)] use rt::test::{spawntask,
43                             next_test_ip4,
44                             run_in_newsched_task};
45 #[cfg(test)] use iterator::{Iterator, range};
46
47 // XXX we should not be calling uvll functions in here.
48
49 trait HomingIO {
50     fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
51     /* XXX This will move pinned tasks to do IO on the proper scheduler
52      * and then move them back to their home.
53      */
54     fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
55         use rt::sched::{PinnedTask, TaskFromFriend};
56         // go home
57         let old_home = Cell::new_empty();
58         let old_home_ptr = &old_home;
59         do task::unkillable { // FIXME(#8674)
60             let scheduler: ~Scheduler = Local::take();
61             do scheduler.deschedule_running_task_and_then |_, task| {
62                 // get the old home first
63                 do task.wake().map_move |mut task| {
64                     old_home_ptr.put_back(task.take_unwrap_home());
65                     self.home().send(PinnedTask(task));
66                 };
67             }
68         }
69
70         // do IO
71         let a = io(self);
72
73         // unhome home
74         do task::unkillable { // FIXME(#8674)
75             let scheduler: ~Scheduler = Local::take();
76             do scheduler.deschedule_running_task_and_then |scheduler, task| {
77                 do task.wake().map_move |mut task| {
78                     task.give_home(old_home.take());
79                     scheduler.make_handle().send(TaskFromFriend(task));
80                 };
81             }
82         }
83
84         // return the result of the IO
85         a
86     }
87
88     fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
89         use rt::sched::{PinnedTask, TaskFromFriend};
90
91         do task::unkillable { // FIXME(#8674)
92             // go home
93             let old_home = Cell::new_empty();
94             let old_home_ptr = &old_home;
95             let scheduler: ~Scheduler = Local::take();
96             do scheduler.deschedule_running_task_and_then |_, task| {
97                 // get the old home first
98                 do task.wake().map_move |mut task| {
99                     old_home_ptr.put_back(task.take_unwrap_home());
100                     self.home().send(PinnedTask(task));
101                 };
102             }
103
104             // do IO
105             let scheduler: ~Scheduler = Local::take();
106             let a = io_sched(self, scheduler);
107
108             // unhome home
109             let scheduler: ~Scheduler = Local::take();
110             do scheduler.deschedule_running_task_and_then |scheduler, task| {
111                 do task.wake().map_move |mut task| {
112                     task.give_home(old_home.take());
113                     scheduler.make_handle().send(TaskFromFriend(task));
114                 };
115             }
116
117             // return the result of the IO
118             a
119         }
120     }
121 }
122
123 // get a handle for the current scheduler
124 macro_rules! get_handle_to_current_scheduler(
125     () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
126 )
127
128 enum SocketNameKind {
129     TcpPeer,
130     Tcp,
131     Udp
132 }
133
134 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
135                                                  handle: U) -> Result<SocketAddr, IoError> {
136     let getsockname = match sk {
137         TcpPeer => uvll::tcp_getpeername,
138         Tcp     => uvll::tcp_getsockname,
139         Udp     => uvll::udp_getsockname,
140     };
141
142     // Allocate a sockaddr_storage
143     // since we don't know if it's ipv4 or ipv6
144     let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
145
146     let r = unsafe {
147         getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
148     };
149
150     if r != 0 {
151         let status = status_to_maybe_uv_error(handle, r);
152         return Err(uv_error_to_io_error(status.unwrap()));
153     }
154
155     let addr = unsafe {
156         if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
157             net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
158         } else {
159             net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
160         }
161     };
162
163     unsafe { uvll::free_sockaddr_storage(r_addr); }
164
165     Ok(addr)
166
167 }
168
169 // Obviously an Event Loop is always home.
170 pub struct UvEventLoop {
171     uvio: UvIoFactory
172 }
173
174 impl UvEventLoop {
175     pub fn new() -> UvEventLoop {
176         UvEventLoop {
177             uvio: UvIoFactory(Loop::new())
178         }
179     }
180 }
181
182 impl Drop for UvEventLoop {
183     fn drop(&self) {
184         // XXX: Need mutable finalizer
185         let this = unsafe {
186             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
187         };
188         this.uvio.uv_loop().close();
189     }
190 }
191
192 impl EventLoop for UvEventLoop {
193     fn run(&mut self) {
194         self.uvio.uv_loop().run();
195     }
196
197     fn callback(&mut self, f: ~fn()) {
198         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
199         do idle_watcher.start |mut idle_watcher, status| {
200             assert!(status.is_none());
201             idle_watcher.stop();
202             idle_watcher.close(||());
203             f();
204         }
205     }
206
207     fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
208         let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
209         return ~UvPausibleIdleCallback {
210             watcher: idle_watcher,
211             idle_flag: false,
212             closed: false
213         };
214     }
215
216     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
217         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
218         do timer.start(ms, 0) |timer, status| {
219             assert!(status.is_none());
220             timer.close(||());
221             f();
222         }
223     }
224
225     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
226         ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
227     }
228
229     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
230         Some(&mut self.uvio)
231     }
232 }
233
234 pub struct UvPausibleIdleCallback {
235     watcher: IdleWatcher,
236     idle_flag: bool,
237     closed: bool
238 }
239
240 impl UvPausibleIdleCallback {
241     #[inline]
242     pub fn start(&mut self, f: ~fn()) {
243         do self.watcher.start |_idle_watcher, _status| {
244             f();
245         };
246         self.idle_flag = true;
247     }
248     #[inline]
249     pub fn pause(&mut self) {
250         if self.idle_flag == true {
251             self.watcher.stop();
252             self.idle_flag = false;
253         }
254     }
255     #[inline]
256     pub fn resume(&mut self) {
257         if self.idle_flag == false {
258             self.watcher.restart();
259             self.idle_flag = true;
260         }
261     }
262     #[inline]
263     pub fn close(&mut self) {
264         self.pause();
265         if !self.closed {
266             self.closed = true;
267             self.watcher.close(||{});
268         }
269     }
270 }
271
272 #[test]
273 fn test_callback_run_once() {
274     do run_in_bare_thread {
275         let mut event_loop = UvEventLoop::new();
276         let mut count = 0;
277         let count_ptr: *mut int = &mut count;
278         do event_loop.callback {
279             unsafe { *count_ptr += 1 }
280         }
281         event_loop.run();
282         assert_eq!(count, 1);
283     }
284 }
285
286 // The entire point of async is to call into a loop from other threads so it does not need to home.
287 pub struct UvRemoteCallback {
288     // The uv async handle for triggering the callback
289     async: AsyncWatcher,
290     // A flag to tell the callback to exit, set from the dtor. This is
291     // almost never contested - only in rare races with the dtor.
292     exit_flag: Exclusive<bool>
293 }
294
295 impl UvRemoteCallback {
296     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
297         let exit_flag = Exclusive::new(false);
298         let exit_flag_clone = exit_flag.clone();
299         let async = do AsyncWatcher::new(loop_) |watcher, status| {
300             assert!(status.is_none());
301
302             // The synchronization logic here is subtle. To review,
303             // the uv async handle type promises that, after it is
304             // triggered the remote callback is definitely called at
305             // least once. UvRemoteCallback needs to maintain those
306             // semantics while also shutting down cleanly from the
307             // dtor. In our case that means that, when the
308             // UvRemoteCallback dtor calls `async.send()`, here `f` is
309             // always called later.
310
311             // In the dtor both the exit flag is set and the async
312             // callback fired under a lock.  Here, before calling `f`,
313             // we take the lock and check the flag. Because we are
314             // checking the flag before calling `f`, and the flag is
315             // set under the same lock as the send, then if the flag
316             // is set then we're guaranteed to call `f` after the
317             // final send.
318
319             // If the check was done after `f()` then there would be a
320             // period between that call and the check where the dtor
321             // could be called in the other thread, missing the final
322             // callback while still destroying the handle.
323
324             let should_exit = unsafe {
325                 exit_flag_clone.with_imm(|&should_exit| should_exit)
326             };
327
328             f();
329
330             if should_exit {
331                 watcher.close(||());
332             }
333
334         };
335         UvRemoteCallback {
336             async: async,
337             exit_flag: exit_flag
338         }
339     }
340 }
341
342 impl RemoteCallback for UvRemoteCallback {
343     fn fire(&mut self) { self.async.send() }
344 }
345
346 impl Drop for UvRemoteCallback {
347     fn drop(&self) {
348         unsafe {
349             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
350             do this.exit_flag.with |should_exit| {
351                 // NB: These two things need to happen atomically. Otherwise
352                 // the event handler could wake up due to a *previous*
353                 // signal and see the exit flag, destroying the handle
354                 // before the final send.
355                 *should_exit = true;
356                 this.async.send();
357             }
358         }
359     }
360 }
361
362 #[cfg(test)]
363 mod test_remote {
364     use cell::Cell;
365     use rt::test::*;
366     use rt::thread::Thread;
367     use rt::tube::Tube;
368     use rt::rtio::EventLoop;
369     use rt::local::Local;
370     use rt::sched::Scheduler;
371
372     #[test]
373     fn test_uv_remote() {
374         do run_in_newsched_task {
375             let mut tube = Tube::new();
376             let tube_clone = tube.clone();
377             let remote_cell = Cell::new_empty();
378             do Local::borrow |sched: &mut Scheduler| {
379                 let tube_clone = tube_clone.clone();
380                 let tube_clone_cell = Cell::new(tube_clone);
381                 let remote = do sched.event_loop.remote_callback {
382                     // This could be called multiple times
383                     if !tube_clone_cell.is_empty() {
384                         tube_clone_cell.take().send(1);
385                     }
386                 };
387                 remote_cell.put_back(remote);
388             }
389             let thread = do Thread::start {
390                 remote_cell.take().fire();
391             };
392
393             assert!(tube.recv() == 1);
394             thread.join();
395         }
396     }
397 }
398
399 pub struct UvIoFactory(Loop);
400
401 impl UvIoFactory {
402     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
403         match self { &UvIoFactory(ref mut ptr) => ptr }
404     }
405 }
406
407 impl IoFactory for UvIoFactory {
408     // Connect to an address and return a new stream
409     // NB: This blocks the task waiting on the connection.
410     // It would probably be better to return a future
411     fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
412         // Create a cell in the task to hold the result. We will fill
413         // the cell before resuming the task.
414         let result_cell = Cell::new_empty();
415         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
416
417         // Block this task and take ownership, switch to scheduler context
418         do task::unkillable { // FIXME(#8674)
419             let scheduler: ~Scheduler = Local::take();
420             do scheduler.deschedule_running_task_and_then |_, task| {
421
422                 let mut tcp = TcpWatcher::new(self.uv_loop());
423                 let task_cell = Cell::new(task);
424
425                 // Wait for a connection
426                 do tcp.connect(addr) |stream, status| {
427                     match status {
428                         None => {
429                             let tcp = NativeHandle::from_native_handle(stream.native_handle());
430                             let home = get_handle_to_current_scheduler!();
431                             let res = Ok(~UvTcpStream { watcher: tcp, home: home });
432
433                             // Store the stream in the task's stack
434                             unsafe { (*result_cell_ptr).put_back(res); }
435
436                             // Context switch
437                             let scheduler: ~Scheduler = Local::take();
438                             scheduler.resume_blocked_task_immediately(task_cell.take());
439                         }
440                         Some(_) => {
441                             let task_cell = Cell::new(task_cell.take());
442                             do stream.close {
443                                 let res = Err(uv_error_to_io_error(status.unwrap()));
444                                 unsafe { (*result_cell_ptr).put_back(res); }
445                                 let scheduler: ~Scheduler = Local::take();
446                                 scheduler.resume_blocked_task_immediately(task_cell.take());
447                             }
448                         }
449                     }
450                 }
451             }
452         }
453
454         assert!(!result_cell.is_empty());
455         return result_cell.take();
456     }
457
458     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
459         let mut watcher = TcpWatcher::new(self.uv_loop());
460         match watcher.bind(addr) {
461             Ok(_) => {
462                 let home = get_handle_to_current_scheduler!();
463                 Ok(~UvTcpListener::new(watcher, home))
464             }
465             Err(uverr) => {
466                 do task::unkillable { // FIXME(#8674)
467                     let scheduler: ~Scheduler = Local::take();
468                     do scheduler.deschedule_running_task_and_then |_, task| {
469                         let task_cell = Cell::new(task);
470                         do watcher.as_stream().close {
471                             let scheduler: ~Scheduler = Local::take();
472                             scheduler.resume_blocked_task_immediately(task_cell.take());
473                         }
474                     }
475                     Err(uv_error_to_io_error(uverr))
476                 }
477             }
478         }
479     }
480
481     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
482         let mut watcher = UdpWatcher::new(self.uv_loop());
483         match watcher.bind(addr) {
484             Ok(_) => {
485                 let home = get_handle_to_current_scheduler!();
486                 Ok(~UvUdpSocket { watcher: watcher, home: home })
487             }
488             Err(uverr) => {
489                 do task::unkillable { // FIXME(#8674)
490                     let scheduler: ~Scheduler = Local::take();
491                     do scheduler.deschedule_running_task_and_then |_, task| {
492                         let task_cell = Cell::new(task);
493                         do watcher.close {
494                             let scheduler: ~Scheduler = Local::take();
495                             scheduler.resume_blocked_task_immediately(task_cell.take());
496                         }
497                     }
498                     Err(uv_error_to_io_error(uverr))
499                 }
500             }
501         }
502     }
503
504     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
505         let watcher = TimerWatcher::new(self.uv_loop());
506         let home = get_handle_to_current_scheduler!();
507         Ok(~UvTimer::new(watcher, home))
508     }
509
510     fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
511         let loop_ = Loop {handle: self.uv_loop().native_handle()};
512         let fd = file::FileDescriptor(fd);
513         let home = get_handle_to_current_scheduler!();
514         ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
515     }
516
517     fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
518         -> Result<~RtioFileStream, IoError> {
519         let mut flags = match fm {
520             Open => 0,
521             Create => O_CREAT,
522             OpenOrCreate => O_CREAT,
523             Append => O_APPEND,
524             Truncate => O_TRUNC,
525             CreateOrTruncate => O_TRUNC | O_CREAT
526         };
527         flags = match fa {
528             Read => flags | O_RDONLY,
529             Write => flags | O_WRONLY,
530             ReadWrite => flags | O_RDWR
531         };
532         let create_mode = match fm {
533             Create|OpenOrCreate|CreateOrTruncate =>
534                 S_IRUSR | S_IWUSR,
535             _ => 0
536         };
537         let result_cell = Cell::new_empty();
538         let result_cell_ptr: *Cell<Result<~RtioFileStream,
539                                            IoError>> = &result_cell;
540         let path_cell = Cell::new(path);
541         do task::unkillable { // FIXME(#8674)
542             let scheduler: ~Scheduler = Local::take();
543             do scheduler.deschedule_running_task_and_then |_, task| {
544                 let task_cell = Cell::new(task);
545                 let path = path_cell.take();
546                 do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
547                       |req,err| {
548                     if err.is_none() {
549                         let loop_ = Loop {handle: req.get_loop().native_handle()};
550                         let home = get_handle_to_current_scheduler!();
551                         let fd = file::FileDescriptor(req.get_result());
552                         let fs = ~UvFileStream::new(
553                             loop_, fd, true, home) as ~RtioFileStream;
554                         let res = Ok(fs);
555                         unsafe { (*result_cell_ptr).put_back(res); }
556                         let scheduler: ~Scheduler = Local::take();
557                         scheduler.resume_blocked_task_immediately(task_cell.take());
558                     } else {
559                         let res = Err(uv_error_to_io_error(err.unwrap()));
560                         unsafe { (*result_cell_ptr).put_back(res); }
561                         let scheduler: ~Scheduler = Local::take();
562                         scheduler.resume_blocked_task_immediately(task_cell.take());
563                     }
564                 };
565             };
566         }
567         assert!(!result_cell.is_empty());
568         return result_cell.take();
569     }
570
571     fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
572         let result_cell = Cell::new_empty();
573         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
574         let path_cell = Cell::new(path);
575         do task::unkillable { // FIXME(#8674)
576             let scheduler: ~Scheduler = Local::take();
577             do scheduler.deschedule_running_task_and_then |_, task| {
578                 let task_cell = Cell::new(task);
579                 let path = path_cell.take();
580                 do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
581                     let res = match err {
582                         None => Ok(()),
583                         Some(err) => Err(uv_error_to_io_error(err))
584                     };
585                     unsafe { (*result_cell_ptr).put_back(res); }
586                     let scheduler: ~Scheduler = Local::take();
587                     scheduler.resume_blocked_task_immediately(task_cell.take());
588                 };
589             };
590         }
591         assert!(!result_cell.is_empty());
592         return result_cell.take();
593     }
594 }
595
596 pub struct UvTcpListener {
597     watcher: TcpWatcher,
598     listening: bool,
599     incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
600     home: SchedHandle,
601 }
602
603 impl HomingIO for UvTcpListener {
604     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
605 }
606
607 impl UvTcpListener {
608     fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
609         UvTcpListener {
610             watcher: watcher,
611             listening: false,
612             incoming_streams: Tube::new(),
613             home: home,
614         }
615     }
616
617     fn watcher(&self) -> TcpWatcher { self.watcher }
618 }
619
620 impl Drop for UvTcpListener {
621     fn drop(&self) {
622         // XXX need mutable finalizer
623         let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
624         do self_.home_for_io_with_sched |self_, scheduler| {
625             do scheduler.deschedule_running_task_and_then |_, task| {
626                 let task_cell = Cell::new(task);
627                 do self_.watcher().as_stream().close {
628                     let scheduler: ~Scheduler = Local::take();
629                     scheduler.resume_blocked_task_immediately(task_cell.take());
630                 }
631             }
632         }
633     }
634 }
635
636 impl RtioSocket for UvTcpListener {
637     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
638         do self.home_for_io |self_| {
639           socket_name(Tcp, self_.watcher)
640         }
641     }
642 }
643
644 impl RtioTcpListener for UvTcpListener {
645
646     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
647         do self.home_for_io |self_| {
648
649             if !self_.listening {
650                 self_.listening = true;
651
652                 let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
653
654                 do self_.watcher().listen |mut server, status| {
655                     let stream = match status {
656                         Some(_) => Err(standard_error(OtherIoError)),
657                         None => {
658                             let client = TcpWatcher::new(&server.event_loop());
659                             // XXX: needs to be surfaced in interface
660                             server.accept(client.as_stream());
661                             let home = get_handle_to_current_scheduler!();
662                             Ok(~UvTcpStream { watcher: client, home: home })
663                         }
664                     };
665
666                     let mut incoming_streams = incoming_streams_cell.take();
667                     incoming_streams.send(stream);
668                     incoming_streams_cell.put_back(incoming_streams);
669                 }
670
671             }
672             self_.incoming_streams.recv()
673         }
674     }
675
676     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
677         do self.home_for_io |self_| {
678             let r = unsafe {
679                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
680             };
681
682             match status_to_maybe_uv_error(self_.watcher(), r) {
683                 Some(err) => Err(uv_error_to_io_error(err)),
684                 None => Ok(())
685             }
686         }
687     }
688
689     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
690         do self.home_for_io |self_| {
691             let r = unsafe {
692                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
693             };
694
695             match status_to_maybe_uv_error(self_.watcher(), r) {
696                 Some(err) => Err(uv_error_to_io_error(err)),
697                 None => Ok(())
698             }
699         }
700     }
701 }
702
703 pub struct UvTcpStream {
704     watcher: TcpWatcher,
705     home: SchedHandle,
706 }
707
708 impl HomingIO for UvTcpStream {
709     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
710 }
711
712 impl Drop for UvTcpStream {
713     fn drop(&self) {
714         // XXX need mutable finalizer
715         let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
716         do this.home_for_io_with_sched |self_, scheduler| {
717             do scheduler.deschedule_running_task_and_then |_, task| {
718                 let task_cell = Cell::new(task);
719                 do self_.watcher.as_stream().close {
720                     let scheduler: ~Scheduler = Local::take();
721                     scheduler.resume_blocked_task_immediately(task_cell.take());
722                 }
723             }
724         }
725     }
726 }
727
728 impl RtioSocket for UvTcpStream {
729     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
730         do self.home_for_io |self_| {
731             socket_name(Tcp, self_.watcher)
732         }
733     }
734 }
735
736 impl RtioTcpStream for UvTcpStream {
737     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
738         do self.home_for_io_with_sched |self_, scheduler| {
739             let result_cell = Cell::new_empty();
740             let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
741
742             let buf_ptr: *&mut [u8] = &buf;
743             do scheduler.deschedule_running_task_and_then |_sched, task| {
744                 let task_cell = Cell::new(task);
745                 // XXX: We shouldn't reallocate these callbacks every
746                 // call to read
747                 let alloc: AllocCallback = |_| unsafe {
748                     slice_to_uv_buf(*buf_ptr)
749                 };
750                 let mut watcher = self_.watcher.as_stream();
751                 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
752
753                     // Stop reading so that no read callbacks are
754                     // triggered before the user calls `read` again.
755                     // XXX: Is there a performance impact to calling
756                     // stop here?
757                     watcher.read_stop();
758
759                     let result = if status.is_none() {
760                         assert!(nread >= 0);
761                         Ok(nread as uint)
762                     } else {
763                         Err(uv_error_to_io_error(status.unwrap()))
764                     };
765
766                     unsafe { (*result_cell_ptr).put_back(result); }
767
768                     let scheduler: ~Scheduler = Local::take();
769                     scheduler.resume_blocked_task_immediately(task_cell.take());
770                 }
771             }
772
773             assert!(!result_cell.is_empty());
774             result_cell.take()
775         }
776     }
777
778     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
779         do self.home_for_io_with_sched |self_, scheduler| {
780             let result_cell = Cell::new_empty();
781             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
782             let buf_ptr: *&[u8] = &buf;
783             do scheduler.deschedule_running_task_and_then |_, task| {
784                 let task_cell = Cell::new(task);
785                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
786                 let mut watcher = self_.watcher.as_stream();
787                 do watcher.write(buf) |_watcher, status| {
788                     let result = if status.is_none() {
789                         Ok(())
790                     } else {
791                         Err(uv_error_to_io_error(status.unwrap()))
792                     };
793
794                     unsafe { (*result_cell_ptr).put_back(result); }
795
796                     let scheduler: ~Scheduler = Local::take();
797                     scheduler.resume_blocked_task_immediately(task_cell.take());
798                 }
799             }
800
801             assert!(!result_cell.is_empty());
802             result_cell.take()
803         }
804     }
805
806     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
807         do self.home_for_io |self_| {
808             socket_name(TcpPeer, self_.watcher)
809         }
810     }
811
812     fn control_congestion(&mut self) -> Result<(), IoError> {
813         do self.home_for_io |self_| {
814             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
815
816             match status_to_maybe_uv_error(self_.watcher, r) {
817                 Some(err) => Err(uv_error_to_io_error(err)),
818                 None => Ok(())
819             }
820         }
821     }
822
823     fn nodelay(&mut self) -> Result<(), IoError> {
824         do self.home_for_io |self_| {
825             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
826
827             match status_to_maybe_uv_error(self_.watcher, r) {
828                 Some(err) => Err(uv_error_to_io_error(err)),
829                 None => Ok(())
830             }
831         }
832     }
833
834     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
835         do self.home_for_io |self_| {
836             let r = unsafe {
837                 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
838                                     delay_in_seconds as c_uint)
839             };
840
841             match status_to_maybe_uv_error(self_.watcher, r) {
842                 Some(err) => Err(uv_error_to_io_error(err)),
843                 None => Ok(())
844             }
845         }
846     }
847
848     fn letdie(&mut self) -> Result<(), IoError> {
849         do self.home_for_io |self_| {
850             let r = unsafe {
851                 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
852             };
853
854             match status_to_maybe_uv_error(self_.watcher, r) {
855                 Some(err) => Err(uv_error_to_io_error(err)),
856                 None => Ok(())
857             }
858         }
859     }
860 }
861
862 pub struct UvUdpSocket {
863     watcher: UdpWatcher,
864     home: SchedHandle,
865 }
866
867 impl HomingIO for UvUdpSocket {
868     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
869 }
870
871 impl Drop for UvUdpSocket {
872     fn drop(&self) {
873         // XXX need mutable finalizer
874         let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
875         do this.home_for_io_with_sched |self_, scheduler| {
876             do scheduler.deschedule_running_task_and_then |_, task| {
877                 let task_cell = Cell::new(task);
878                 do self_.watcher.close {
879                     let scheduler: ~Scheduler = Local::take();
880                     scheduler.resume_blocked_task_immediately(task_cell.take());
881                 }
882             }
883         }
884     }
885 }
886
887 impl RtioSocket for UvUdpSocket {
888     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
889         do self.home_for_io |self_| {
890             socket_name(Udp, self_.watcher)
891         }
892     }
893 }
894
895 impl RtioUdpSocket for UvUdpSocket {
896     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
897         do self.home_for_io_with_sched |self_, scheduler| {
898             let result_cell = Cell::new_empty();
899             let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
900
901             let buf_ptr: *&mut [u8] = &buf;
902             do scheduler.deschedule_running_task_and_then |_, task| {
903                 let task_cell = Cell::new(task);
904                 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
905                 do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
906                     let _ = flags; // /XXX add handling for partials?
907
908                     watcher.recv_stop();
909
910                     let result = match status {
911                         None => {
912                             assert!(nread >= 0);
913                             Ok((nread as uint, addr))
914                         }
915                         Some(err) => Err(uv_error_to_io_error(err)),
916                     };
917
918                     unsafe { (*result_cell_ptr).put_back(result); }
919
920                     let scheduler: ~Scheduler = Local::take();
921                     scheduler.resume_blocked_task_immediately(task_cell.take());
922                 }
923             }
924
925             assert!(!result_cell.is_empty());
926             result_cell.take()
927         }
928     }
929
930     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
931         do self.home_for_io_with_sched |self_, scheduler| {
932             let result_cell = Cell::new_empty();
933             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
934             let buf_ptr: *&[u8] = &buf;
935             do scheduler.deschedule_running_task_and_then |_, task| {
936                 let task_cell = Cell::new(task);
937                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
938                 do self_.watcher.send(buf, dst) |_watcher, status| {
939
940                     let result = match status {
941                         None => Ok(()),
942                         Some(err) => Err(uv_error_to_io_error(err)),
943                     };
944
945                     unsafe { (*result_cell_ptr).put_back(result); }
946
947                     let scheduler: ~Scheduler = Local::take();
948                     scheduler.resume_blocked_task_immediately(task_cell.take());
949                 }
950             }
951
952             assert!(!result_cell.is_empty());
953             result_cell.take()
954         }
955     }
956
957     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
958         do self.home_for_io |self_| {
959             let r = unsafe {
960                 do multi.to_str().with_c_str |m_addr| {
961                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
962                                              ptr::null(), uvll::UV_JOIN_GROUP)
963                 }
964             };
965
966             match status_to_maybe_uv_error(self_.watcher, r) {
967                 Some(err) => Err(uv_error_to_io_error(err)),
968                 None => Ok(())
969             }
970         }
971     }
972
973     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
974         do self.home_for_io |self_| {
975             let r = unsafe {
976                 do multi.to_str().with_c_str |m_addr| {
977                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
978                                              ptr::null(), uvll::UV_LEAVE_GROUP)
979                 }
980             };
981
982             match status_to_maybe_uv_error(self_.watcher, r) {
983                 Some(err) => Err(uv_error_to_io_error(err)),
984                 None => Ok(())
985             }
986         }
987     }
988
989     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
990         do self.home_for_io |self_| {
991
992             let r = unsafe {
993                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
994             };
995
996             match status_to_maybe_uv_error(self_.watcher, r) {
997                 Some(err) => Err(uv_error_to_io_error(err)),
998                 None => Ok(())
999             }
1000         }
1001     }
1002
1003     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
1004         do self.home_for_io |self_| {
1005
1006             let r = unsafe {
1007                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
1008             };
1009
1010             match status_to_maybe_uv_error(self_.watcher, r) {
1011                 Some(err) => Err(uv_error_to_io_error(err)),
1012                 None => Ok(())
1013             }
1014         }
1015     }
1016
1017     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1018         do self.home_for_io |self_| {
1019
1020             let r = unsafe {
1021                 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
1022             };
1023
1024             match status_to_maybe_uv_error(self_.watcher, r) {
1025                 Some(err) => Err(uv_error_to_io_error(err)),
1026                 None => Ok(())
1027             }
1028         }
1029     }
1030
1031     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1032         do self.home_for_io |self_| {
1033
1034             let r = unsafe {
1035                 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
1036             };
1037
1038             match status_to_maybe_uv_error(self_.watcher, r) {
1039                 Some(err) => Err(uv_error_to_io_error(err)),
1040                 None => Ok(())
1041             }
1042         }
1043     }
1044
1045     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
1046         do self.home_for_io |self_| {
1047
1048             let r = unsafe {
1049                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
1050             };
1051
1052             match status_to_maybe_uv_error(self_.watcher, r) {
1053                 Some(err) => Err(uv_error_to_io_error(err)),
1054                 None => Ok(())
1055             }
1056         }
1057     }
1058
1059     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
1060         do self.home_for_io |self_| {
1061
1062             let r = unsafe {
1063                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
1064             };
1065
1066             match status_to_maybe_uv_error(self_.watcher, r) {
1067                 Some(err) => Err(uv_error_to_io_error(err)),
1068                 None => Ok(())
1069             }
1070         }
1071     }
1072 }
1073
1074 pub struct UvTimer {
1075     watcher: timer::TimerWatcher,
1076     home: SchedHandle,
1077 }
1078
1079 impl HomingIO for UvTimer {
1080     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1081 }
1082
1083 impl UvTimer {
1084     fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1085         UvTimer { watcher: w, home: home }
1086     }
1087 }
1088
1089 impl Drop for UvTimer {
1090     fn drop(&self) {
1091         let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
1092         do self_.home_for_io_with_sched |self_, scheduler| {
1093             rtdebug!("closing UvTimer");
1094             do scheduler.deschedule_running_task_and_then |_, task| {
1095                 let task_cell = Cell::new(task);
1096                 do self_.watcher.close {
1097                     let scheduler: ~Scheduler = Local::take();
1098                     scheduler.resume_blocked_task_immediately(task_cell.take());
1099                 }
1100             }
1101         }
1102     }
1103 }
1104
1105 impl RtioTimer for UvTimer {
1106     fn sleep(&mut self, msecs: u64) {
1107         do self.home_for_io_with_sched |self_, scheduler| {
1108             do scheduler.deschedule_running_task_and_then |_sched, task| {
1109                 rtdebug!("sleep: entered scheduler context");
1110                 let task_cell = Cell::new(task);
1111                 do self_.watcher.start(msecs, 0) |_, status| {
1112                     assert!(status.is_none());
1113                     let scheduler: ~Scheduler = Local::take();
1114                     scheduler.resume_blocked_task_immediately(task_cell.take());
1115                 }
1116             }
1117             self_.watcher.stop();
1118         }
1119     }
1120 }
1121
1122 pub struct UvFileStream {
1123     loop_: Loop,
1124     fd: file::FileDescriptor,
1125     close_on_drop: bool,
1126     home: SchedHandle
1127 }
1128
1129 impl HomingIO for UvFileStream {
1130     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1131 }
1132
1133 impl UvFileStream {
1134     fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
1135            home: SchedHandle) -> UvFileStream {
1136         UvFileStream {
1137             loop_: loop_,
1138             fd: fd,
1139             close_on_drop: close_on_drop,
1140             home: home
1141         }
1142     }
1143     fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
1144         let result_cell = Cell::new_empty();
1145         let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
1146         let buf_ptr: *&mut [u8] = &buf;
1147         do self.home_for_io_with_sched |self_, scheduler| {
1148             do scheduler.deschedule_running_task_and_then |_, task| {
1149                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1150                 let task_cell = Cell::new(task);
1151                 do self_.fd.read(&self_.loop_, buf, offset) |req, uverr| {
1152                     let res = match uverr  {
1153                         None => Ok(req.get_result() as int),
1154                         Some(err) => Err(uv_error_to_io_error(err))
1155                     };
1156                     unsafe { (*result_cell_ptr).put_back(res); }
1157                     let scheduler: ~Scheduler = Local::take();
1158                     scheduler.resume_blocked_task_immediately(task_cell.take());
1159                 };
1160             };
1161         };
1162         result_cell.take()
1163     }
1164     fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
1165         let result_cell = Cell::new_empty();
1166         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1167         let buf_ptr: *&[u8] = &buf;
1168         do self.home_for_io_with_sched |self_, scheduler| {
1169             do scheduler.deschedule_running_task_and_then |_, task| {
1170                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1171                 let task_cell = Cell::new(task);
1172                 do self_.fd.write(&self_.loop_, buf, offset) |_, uverr| {
1173                     let res = match uverr  {
1174                         None => Ok(()),
1175                         Some(err) => Err(uv_error_to_io_error(err))
1176                     };
1177                     unsafe { (*result_cell_ptr).put_back(res); }
1178                     let scheduler: ~Scheduler = Local::take();
1179                     scheduler.resume_blocked_task_immediately(task_cell.take());
1180                 };
1181             };
1182         };
1183         result_cell.take()
1184     }
1185     fn seek_common(&mut self, pos: i64, whence: c_int) ->
1186         Result<u64, IoError>{
1187         #[fixed_stack_segment]; #[inline(never)];
1188         unsafe {
1189             match lseek((*self.fd), pos as off_t, whence) {
1190                 -1 => {
1191                     Err(IoError {
1192                         kind: OtherIoError,
1193                         desc: "Failed to lseek.",
1194                         detail: None
1195                     })
1196                 },
1197                 n => Ok(n as u64)
1198             }
1199         }
1200     }
1201 }
1202
1203 impl Drop for UvFileStream {
1204     fn drop(&self) {
1205         let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
1206         if self.close_on_drop {
1207             do self_.home_for_io_with_sched |self_, scheduler| {
1208                 do scheduler.deschedule_running_task_and_then |_, task| {
1209                     let task_cell = Cell::new(task);
1210                     do self_.fd.close(&self.loop_) |_,_| {
1211                         let scheduler: ~Scheduler = Local::take();
1212                         scheduler.resume_blocked_task_immediately(task_cell.take());
1213                     };
1214                 };
1215             }
1216         }
1217     }
1218 }
1219
1220 impl RtioFileStream for UvFileStream {
1221     fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
1222         self.base_read(buf, -1)
1223     }
1224     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
1225         self.base_write(buf, -1)
1226     }
1227     fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
1228         self.base_read(buf, offset as i64)
1229     }
1230     fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
1231         self.base_write(buf, offset as i64)
1232     }
1233     fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
1234         use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
1235         let whence = match whence {
1236             SeekSet => SEEK_SET,
1237             SeekCur => SEEK_CUR,
1238             SeekEnd => SEEK_END
1239         };
1240         self.seek_common(pos, whence)
1241     }
1242     fn tell(&self) -> Result<u64, IoError> {
1243         use libc::SEEK_CUR;
1244         // this is temporary
1245         let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1246         self_.seek_common(0, SEEK_CUR)
1247     }
1248     fn flush(&mut self) -> Result<(), IoError> {
1249         Ok(())
1250     }
1251 }
1252
1253 #[test]
1254 fn test_simple_io_no_connect() {
1255     do run_in_newsched_task {
1256         unsafe {
1257             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1258             let addr = next_test_ip4();
1259             let maybe_chan = (*io).tcp_connect(addr);
1260             assert!(maybe_chan.is_err());
1261         }
1262     }
1263 }
1264
1265 #[test]
1266 fn test_simple_udp_io_bind_only() {
1267     do run_in_newsched_task {
1268         unsafe {
1269             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1270             let addr = next_test_ip4();
1271             let maybe_socket = (*io).udp_bind(addr);
1272             assert!(maybe_socket.is_ok());
1273         }
1274     }
1275 }
1276
1277 #[test]
1278 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1279     use rt::sleeper_list::SleeperList;
1280     use rt::work_queue::WorkQueue;
1281     use rt::thread::Thread;
1282     use rt::task::Task;
1283     use rt::sched::{Shutdown, TaskFromFriend};
1284     do run_in_bare_thread {
1285         let sleepers = SleeperList::new();
1286         let work_queue1 = WorkQueue::new();
1287         let work_queue2 = WorkQueue::new();
1288         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1289
1290         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1291                                          sleepers.clone());
1292         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1293                                          sleepers.clone());
1294
1295         let handle1 = Cell::new(sched1.make_handle());
1296         let handle2 = Cell::new(sched2.make_handle());
1297         let tasksFriendHandle = Cell::new(sched2.make_handle());
1298
1299         let on_exit: ~fn(bool) = |exit_status| {
1300             handle1.take().send(Shutdown);
1301             handle2.take().send(Shutdown);
1302             rtassert!(exit_status);
1303         };
1304
1305         let test_function: ~fn() = || {
1306             let io: *mut IoFactoryObject = unsafe {
1307                 Local::unsafe_borrow()
1308             };
1309             let addr = next_test_ip4();
1310             let maybe_socket = unsafe { (*io).udp_bind(addr) };
1311             // this socket is bound to this event loop
1312             assert!(maybe_socket.is_ok());
1313
1314             // block self on sched1
1315             do task::unkillable { // FIXME(#8674)
1316                 let scheduler: ~Scheduler = Local::take();
1317                 do scheduler.deschedule_running_task_and_then |_, task| {
1318                     // unblock task
1319                     do task.wake().map_move |task| {
1320                       // send self to sched2
1321                       tasksFriendHandle.take().send(TaskFromFriend(task));
1322                     };
1323                     // sched1 should now sleep since it has nothing else to do
1324                 }
1325             }
1326             // sched2 will wake up and get the task
1327             // as we do nothing else, the function ends and the socket goes out of scope
1328             // sched2 will start to run the destructor
1329             // the destructor will first block the task, set it's home as sched1, then enqueue it
1330             // sched2 will dequeue the task, see that it has a home, and send it to sched1
1331             // sched1 will wake up, exec the close function on the correct loop, and then we're done
1332         };
1333
1334         let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
1335         main_task.death.on_exit = Some(on_exit);
1336         let main_task = Cell::new(main_task);
1337
1338         let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
1339
1340         let sched1 = Cell::new(sched1);
1341         let sched2 = Cell::new(sched2);
1342
1343         let thread1 = do Thread::start {
1344             sched1.take().bootstrap(main_task.take());
1345         };
1346         let thread2 = do Thread::start {
1347             sched2.take().bootstrap(null_task.take());
1348         };
1349
1350         thread1.join();
1351         thread2.join();
1352     }
1353 }
1354
1355 #[test]
1356 fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
1357     use rt::sleeper_list::SleeperList;
1358     use rt::work_queue::WorkQueue;
1359     use rt::thread::Thread;
1360     use rt::task::Task;
1361     use rt::comm::oneshot;
1362     use rt::sched::Shutdown;
1363     do run_in_bare_thread {
1364         let sleepers = SleeperList::new();
1365         let work_queue1 = WorkQueue::new();
1366         let work_queue2 = WorkQueue::new();
1367         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1368
1369         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1370                                          sleepers.clone());
1371         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1372                                          sleepers.clone());
1373
1374         let handle1 = Cell::new(sched1.make_handle());
1375         let handle2 = Cell::new(sched2.make_handle());
1376
1377         let (port, chan) = oneshot();
1378         let port = Cell::new(port);
1379         let chan = Cell::new(chan);
1380
1381         let body1: ~fn() = || {
1382             let io: *mut IoFactoryObject = unsafe {
1383                 Local::unsafe_borrow()
1384             };
1385             let addr = next_test_ip4();
1386             let socket = unsafe { (*io).udp_bind(addr) };
1387             assert!(socket.is_ok());
1388             chan.take().send(socket);
1389         };
1390
1391         let body2: ~fn() = || {
1392             let socket = port.take().recv();
1393             assert!(socket.is_ok());
1394             /* The socket goes out of scope and the destructor is called.
1395              * The destructor:
1396              *  - sends itself back to sched1
1397              *  - frees the socket
1398              *  - resets the home of the task to whatever it was previously
1399              */
1400         };
1401
1402         let on_exit: ~fn(bool) = |exit| {
1403             handle1.take().send(Shutdown);
1404             handle2.take().send(Shutdown);
1405             rtassert!(exit);
1406         };
1407
1408         let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
1409
1410         let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
1411         task2.death.on_exit = Some(on_exit);
1412         let task2 = Cell::new(task2);
1413
1414         let sched1 = Cell::new(sched1);
1415         let sched2 = Cell::new(sched2);
1416
1417         let thread1 = do Thread::start {
1418             sched1.take().bootstrap(task1.take());
1419         };
1420         let thread2 = do Thread::start {
1421             sched2.take().bootstrap(task2.take());
1422         };
1423
1424         thread1.join();
1425         thread2.join();
1426     }
1427 }
1428
1429 #[test]
1430 fn test_simple_tcp_server_and_client() {
1431     do run_in_newsched_task {
1432         let addr = next_test_ip4();
1433
1434         // Start the server first so it's listening when we connect
1435         do spawntask {
1436             unsafe {
1437                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1438                 let mut listener = (*io).tcp_bind(addr).unwrap();
1439                 let mut stream = listener.accept().unwrap();
1440                 let mut buf = [0, .. 2048];
1441                 let nread = stream.read(buf).unwrap();
1442                 assert_eq!(nread, 8);
1443                 for i in range(0u, nread) {
1444                     rtdebug!("%u", buf[i] as uint);
1445                     assert_eq!(buf[i], i as u8);
1446                 }
1447             }
1448         }
1449
1450         do spawntask {
1451             unsafe {
1452                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1453                 let mut stream = (*io).tcp_connect(addr).unwrap();
1454                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1455             }
1456         }
1457     }
1458 }
1459
1460 #[test]
1461 fn test_simple_tcp_server_and_client_on_diff_threads() {
1462     use rt::sleeper_list::SleeperList;
1463     use rt::work_queue::WorkQueue;
1464     use rt::thread::Thread;
1465     use rt::task::Task;
1466     use rt::sched::{Shutdown};
1467     do run_in_bare_thread {
1468         let sleepers = SleeperList::new();
1469
1470         let server_addr = next_test_ip4();
1471         let client_addr = server_addr.clone();
1472
1473         let server_work_queue = WorkQueue::new();
1474         let client_work_queue = WorkQueue::new();
1475         let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
1476
1477         let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
1478                                                queues.clone(), sleepers.clone());
1479         let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
1480                                                queues.clone(), sleepers.clone());
1481
1482         let server_handle = Cell::new(server_sched.make_handle());
1483         let client_handle = Cell::new(client_sched.make_handle());
1484
1485         let server_on_exit: ~fn(bool) = |exit_status| {
1486             server_handle.take().send(Shutdown);
1487             rtassert!(exit_status);
1488         };
1489
1490         let client_on_exit: ~fn(bool) = |exit_status| {
1491             client_handle.take().send(Shutdown);
1492             rtassert!(exit_status);
1493         };
1494
1495         let server_fn: ~fn() = || {
1496             let io: *mut IoFactoryObject = unsafe {
1497                 Local::unsafe_borrow()
1498             };
1499             let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
1500             let mut stream = listener.accept().unwrap();
1501             let mut buf = [0, .. 2048];
1502             let nread = stream.read(buf).unwrap();
1503             assert_eq!(nread, 8);
1504             for i in range(0u, nread) {
1505                 assert_eq!(buf[i], i as u8);
1506             }
1507         };
1508
1509         let client_fn: ~fn() = || {
1510             let io: *mut IoFactoryObject = unsafe {
1511                 Local::unsafe_borrow()
1512             };
1513             let mut stream = unsafe { (*io).tcp_connect(client_addr) };
1514             while stream.is_err() {
1515                 stream = unsafe { (*io).tcp_connect(client_addr) };
1516             }
1517             stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1518         };
1519
1520         let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
1521         server_task.death.on_exit = Some(server_on_exit);
1522         let server_task = Cell::new(server_task);
1523
1524         let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
1525         client_task.death.on_exit = Some(client_on_exit);
1526         let client_task = Cell::new(client_task);
1527
1528         let server_sched = Cell::new(server_sched);
1529         let client_sched = Cell::new(client_sched);
1530
1531         let server_thread = do Thread::start {
1532             server_sched.take().bootstrap(server_task.take());
1533         };
1534         let client_thread = do Thread::start {
1535             client_sched.take().bootstrap(client_task.take());
1536         };
1537
1538         server_thread.join();
1539         client_thread.join();
1540     }
1541 }
1542
1543 #[test]
1544 fn test_simple_udp_server_and_client() {
1545     do run_in_newsched_task {
1546         let server_addr = next_test_ip4();
1547         let client_addr = next_test_ip4();
1548
1549         do spawntask {
1550             unsafe {
1551                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1552                 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
1553                 let mut buf = [0, .. 2048];
1554                 let (nread,src) = server_socket.recvfrom(buf).unwrap();
1555                 assert_eq!(nread, 8);
1556                 for i in range(0u, nread) {
1557                     rtdebug!("%u", buf[i] as uint);
1558                     assert_eq!(buf[i], i as u8);
1559                 }
1560                 assert_eq!(src, client_addr);
1561             }
1562         }
1563
1564         do spawntask {
1565             unsafe {
1566                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1567                 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
1568                 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
1569             }
1570         }
1571     }
1572 }
1573
1574 #[test] #[ignore(reason = "busted")]
1575 fn test_read_and_block() {
1576     do run_in_newsched_task {
1577         let addr = next_test_ip4();
1578
1579         do spawntask {
1580             let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1581             let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1582             let mut stream = listener.accept().unwrap();
1583             let mut buf = [0, .. 2048];
1584
1585             let expected = 32;
1586             let mut current = 0;
1587             let mut reads = 0;
1588
1589             while current < expected {
1590                 let nread = stream.read(buf).unwrap();
1591                 for i in range(0u, nread) {
1592                     let val = buf[i] as uint;
1593                     assert_eq!(val, current % 8);
1594                     current += 1;
1595                 }
1596                 reads += 1;
1597
1598                 do task::unkillable { // FIXME(#8674)
1599                     let scheduler: ~Scheduler = Local::take();
1600                     // Yield to the other task in hopes that it
1601                     // will trigger a read callback while we are
1602                     // not ready for it
1603                     do scheduler.deschedule_running_task_and_then |sched, task| {
1604                         let task = Cell::new(task);
1605                         sched.enqueue_blocked_task(task.take());
1606                     }
1607                 }
1608             }
1609
1610             // Make sure we had multiple reads
1611             assert!(reads > 1);
1612         }
1613
1614         do spawntask {
1615             unsafe {
1616                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1617                 let mut stream = (*io).tcp_connect(addr).unwrap();
1618                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1619                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1620                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1621                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1622             }
1623         }
1624
1625     }
1626 }
1627
1628 #[test]
1629 fn test_read_read_read() {
1630     do run_in_newsched_task {
1631         let addr = next_test_ip4();
1632         static MAX: uint = 500000;
1633
1634         do spawntask {
1635             unsafe {
1636                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1637                 let mut listener = (*io).tcp_bind(addr).unwrap();
1638                 let mut stream = listener.accept().unwrap();
1639                 let buf = [1, .. 2048];
1640                 let mut total_bytes_written = 0;
1641                 while total_bytes_written < MAX {
1642                     stream.write(buf);
1643                     total_bytes_written += buf.len();
1644                 }
1645             }
1646         }
1647
1648         do spawntask {
1649             unsafe {
1650                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1651                 let mut stream = (*io).tcp_connect(addr).unwrap();
1652                 let mut buf = [0, .. 2048];
1653                 let mut total_bytes_read = 0;
1654                 while total_bytes_read < MAX {
1655                     let nread = stream.read(buf).unwrap();
1656                     rtdebug!("read %u bytes", nread as uint);
1657                     total_bytes_read += nread;
1658                     for i in range(0u, nread) {
1659                         assert_eq!(buf[i], 1);
1660                     }
1661                 }
1662                 rtdebug!("read %u bytes total", total_bytes_read as uint);
1663             }
1664         }
1665     }
1666 }
1667
1668 #[test]
1669 fn test_udp_twice() {
1670     do run_in_newsched_task {
1671         let server_addr = next_test_ip4();
1672         let client_addr = next_test_ip4();
1673
1674         do spawntask {
1675             unsafe {
1676                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1677                 let mut client = (*io).udp_bind(client_addr).unwrap();
1678                 assert!(client.sendto([1], server_addr).is_ok());
1679                 assert!(client.sendto([2], server_addr).is_ok());
1680             }
1681         }
1682
1683         do spawntask {
1684             unsafe {
1685                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1686                 let mut server = (*io).udp_bind(server_addr).unwrap();
1687                 let mut buf1 = [0];
1688                 let mut buf2 = [0];
1689                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1690                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1691                 assert_eq!(nread1, 1);
1692                 assert_eq!(nread2, 1);
1693                 assert_eq!(src1, client_addr);
1694                 assert_eq!(src2, client_addr);
1695                 assert_eq!(buf1[0], 1);
1696                 assert_eq!(buf2[0], 2);
1697             }
1698         }
1699     }
1700 }
1701
1702 #[test]
1703 fn test_udp_many_read() {
1704     do run_in_newsched_task {
1705         let server_out_addr = next_test_ip4();
1706         let server_in_addr = next_test_ip4();
1707         let client_out_addr = next_test_ip4();
1708         let client_in_addr = next_test_ip4();
1709         static MAX: uint = 500_000;
1710
1711         do spawntask {
1712             unsafe {
1713                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1714                 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1715                 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1716                 let msg = [1, .. 2048];
1717                 let mut total_bytes_sent = 0;
1718                 let mut buf = [1];
1719                 while buf[0] == 1 {
1720                     // send more data
1721                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
1722                     total_bytes_sent += msg.len();
1723                     // check if the client has received enough
1724                     let res = server_in.recvfrom(buf);
1725                     assert!(res.is_ok());
1726                     let (nread, src) = res.unwrap();
1727                     assert_eq!(nread, 1);
1728                     assert_eq!(src, client_out_addr);
1729                 }
1730                 assert!(total_bytes_sent >= MAX);
1731             }
1732         }
1733
1734         do spawntask {
1735             unsafe {
1736                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1737                 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1738                 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1739                 let mut total_bytes_recv = 0;
1740                 let mut buf = [0, .. 2048];
1741                 while total_bytes_recv < MAX {
1742                     // ask for more
1743                     assert!(client_out.sendto([1], server_in_addr).is_ok());
1744                     // wait for data
1745                     let res = client_in.recvfrom(buf);
1746                     assert!(res.is_ok());
1747                     let (nread, src) = res.unwrap();
1748                     assert_eq!(src, server_out_addr);
1749                     total_bytes_recv += nread;
1750                     for i in range(0u, nread) {
1751                         assert_eq!(buf[i], 1);
1752                     }
1753                 }
1754                 // tell the server we're done
1755                 assert!(client_out.sendto([0], server_in_addr).is_ok());
1756             }
1757         }
1758     }
1759 }
1760
1761 #[test]
1762 fn test_timer_sleep_simple() {
1763     do run_in_newsched_task {
1764         unsafe {
1765             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1766             let timer = (*io).timer_init();
1767             do timer.map_move |mut t| { t.sleep(1) };
1768         }
1769     }
1770 }
1771
1772 fn file_test_uvio_full_simple_impl() {
1773     use str::StrSlice; // why does this have to be explicitly imported to work?
1774                        // compiler was complaining about no trait for str that
1775                        // does .as_bytes() ..
1776     use path::Path;
1777     use rt::io::{Open, Create, ReadWrite, Read};
1778     unsafe {
1779         let io: *mut IoFactoryObject = Local::unsafe_borrow();
1780         let write_val = "hello uvio!";
1781         let path = "./tmp/file_test_uvio_full.txt";
1782         {
1783             let create_fm = Create;
1784             let create_fa = ReadWrite;
1785             let mut fd = (*io).fs_open(&Path(path), create_fm, create_fa).unwrap();
1786             let write_buf = write_val.as_bytes();
1787             fd.write(write_buf);
1788         }
1789         {
1790             let ro_fm = Open;
1791             let ro_fa = Read;
1792             let mut fd = (*io).fs_open(&Path(path), ro_fm, ro_fa).unwrap();
1793             let mut read_vec = [0, .. 1028];
1794             let nread = fd.read(read_vec).unwrap();
1795             let read_val = str::from_bytes(read_vec.slice(0, nread as uint));
1796             assert!(read_val == write_val.to_owned());
1797         }
1798         (*io).fs_unlink(&Path(path));
1799     }
1800 }
1801
1802 #[test]
1803 fn file_test_uvio_full_simple() {
1804     do run_in_newsched_task {
1805         file_test_uvio_full_simple_impl();
1806     }
1807 }
1808
1809 fn uvio_naive_print(input: &str) {
1810     use str::StrSlice;
1811     unsafe {
1812         use libc::{STDOUT_FILENO};
1813         let io: *mut IoFactoryObject = Local::unsafe_borrow();
1814         {
1815             let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
1816             let write_buf = input.as_bytes();
1817             fd.write(write_buf);
1818         }
1819     }
1820 }
1821
1822 #[test]
1823 fn file_test_uvio_write_to_stdout() {
1824     do run_in_newsched_task {
1825         uvio_naive_print("jubilation\n");
1826     }
1827 }