]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/uvio.rs
auto merge of #8656 : toddaaro/rust/idle-opt+cleaning, r=brson
[rust.git] / src / libstd / rt / uv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use c_str::ToCStr;
12 use cast::transmute;
13 use cast;
14 use cell::Cell;
15 use clone::Clone;
16 use libc::{c_int, c_uint, c_void};
17 use ops::Drop;
18 use option::*;
19 use ptr;
20 use result::*;
21 use rt::io::IoError;
22 use rt::io::net::ip::{SocketAddr, IpAddr};
23 use rt::io::{standard_error, OtherIoError};
24 use rt::local::Local;
25 use rt::rtio::*;
26 use rt::sched::{Scheduler, SchedHandle};
27 use rt::tube::Tube;
28 use rt::uv::*;
29 use rt::uv::idle::IdleWatcher;
30 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
31 use unstable::sync::Exclusive;
32
33 #[cfg(test)] use container::Container;
34 #[cfg(test)] use unstable::run_in_bare_thread;
35 #[cfg(test)] use rt::test::{spawntask,
36                             next_test_ip4,
37                             run_in_newsched_task};
38 #[cfg(test)] use iterator::{Iterator, range};
39
40 // XXX we should not be calling uvll functions in here.
41
42 trait HomingIO {
43     fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
44     /* XXX This will move pinned tasks to do IO on the proper scheduler
45      * and then move them back to their home.
46      */
47     fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
48         use rt::sched::{PinnedTask, TaskFromFriend};
49         // go home
50         let old_home = Cell::new_empty();
51         let old_home_ptr = &old_home;
52         let scheduler = Local::take::<Scheduler>();
53         do scheduler.deschedule_running_task_and_then |_, task| {
54             // get the old home first
55             do task.wake().map_move |mut task| {
56                 old_home_ptr.put_back(task.take_unwrap_home());
57                 self.home().send(PinnedTask(task));
58             };
59         }
60
61         // do IO
62         let a = io(self);
63
64         // unhome home
65         let scheduler = Local::take::<Scheduler>();
66         do scheduler.deschedule_running_task_and_then |scheduler, task| {
67             do task.wake().map_move |mut task| {
68                 task.give_home(old_home.take());
69                 scheduler.make_handle().send(TaskFromFriend(task));
70             };
71         }
72
73         // return the result of the IO
74         a
75     }
76 }
77
78 // get a handle for the current scheduler
79 macro_rules! get_handle_to_current_scheduler(
80     () => (do Local::borrow::<Scheduler, SchedHandle> |sched| { sched.make_handle() })
81 )
82
83 enum SocketNameKind {
84     TcpPeer,
85     Tcp,
86     Udp
87 }
88
89 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
90                                                  handle: U) -> Result<SocketAddr, IoError> {
91     let getsockname = match sk {
92         TcpPeer => uvll::tcp_getpeername,
93         Tcp     => uvll::tcp_getsockname,
94         Udp     => uvll::udp_getsockname,
95     };
96
97     // Allocate a sockaddr_storage
98     // since we don't know if it's ipv4 or ipv6
99     let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
100
101     let r = unsafe {
102         getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
103     };
104
105     if r != 0 {
106         let status = status_to_maybe_uv_error(handle, r);
107         return Err(uv_error_to_io_error(status.unwrap()));
108     }
109
110     let addr = unsafe {
111         if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
112             net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
113         } else {
114             net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
115         }
116     };
117
118     unsafe { uvll::free_sockaddr_storage(r_addr); }
119
120     Ok(addr)
121
122 }
123
124 // Obviously an Event Loop is always home.
125 pub struct UvEventLoop {
126     uvio: UvIoFactory
127 }
128
129 impl UvEventLoop {
130     pub fn new() -> UvEventLoop {
131         UvEventLoop {
132             uvio: UvIoFactory(Loop::new())
133         }
134     }
135 }
136
137 impl Drop for UvEventLoop {
138     fn drop(&self) {
139         // XXX: Need mutable finalizer
140         let this = unsafe {
141             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
142         };
143         this.uvio.uv_loop().close();
144     }
145 }
146
147 impl EventLoop for UvEventLoop {
148     fn run(&mut self) {
149         self.uvio.uv_loop().run();
150     }
151
152     fn callback(&mut self, f: ~fn()) {
153         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
154         do idle_watcher.start |mut idle_watcher, status| {
155             assert!(status.is_none());
156             idle_watcher.stop();
157             idle_watcher.close(||());
158             f();
159         }
160     }
161
162     fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
163         let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
164         return ~UvPausibleIdleCallback {
165             watcher: idle_watcher,
166             idle_flag: false,
167             closed: false
168         };
169     }
170
171     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
172         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
173         do timer.start(ms, 0) |timer, status| {
174             assert!(status.is_none());
175             timer.close(||());
176             f();
177         }
178     }
179
180     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
181         ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
182     }
183
184     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
185         Some(&mut self.uvio)
186     }
187 }
188
189 pub struct UvPausibleIdleCallback {
190     watcher: IdleWatcher,
191     idle_flag: bool,
192     closed: bool
193 }
194
195 impl UvPausibleIdleCallback {
196     #[inline]
197     pub fn start(&mut self, f: ~fn()) {
198         do self.watcher.start |_idle_watcher, _status| {
199             f();
200         };
201         self.idle_flag = true;
202     }
203     #[inline]
204     pub fn pause(&mut self) {
205         if self.idle_flag == true {
206             self.watcher.stop();
207             self.idle_flag = false;
208         }
209     }
210     #[inline]
211     pub fn resume(&mut self) {
212         if self.idle_flag == false {
213             self.watcher.restart();
214             self.idle_flag = true;
215         }
216     }
217     #[inline]
218     pub fn close(&mut self) {
219         self.pause();
220         if !self.closed {
221             self.closed = true;
222             self.watcher.close(||{});
223         }
224     }
225 }
226
227 #[test]
228 fn test_callback_run_once() {
229     do run_in_bare_thread {
230         let mut event_loop = UvEventLoop::new();
231         let mut count = 0;
232         let count_ptr: *mut int = &mut count;
233         do event_loop.callback {
234             unsafe { *count_ptr += 1 }
235         }
236         event_loop.run();
237         assert_eq!(count, 1);
238     }
239 }
240
241 // The entire point of async is to call into a loop from other threads so it does not need to home.
242 pub struct UvRemoteCallback {
243     // The uv async handle for triggering the callback
244     async: AsyncWatcher,
245     // A flag to tell the callback to exit, set from the dtor. This is
246     // almost never contested - only in rare races with the dtor.
247     exit_flag: Exclusive<bool>
248 }
249
250 impl UvRemoteCallback {
251     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
252         let exit_flag = Exclusive::new(false);
253         let exit_flag_clone = exit_flag.clone();
254         let async = do AsyncWatcher::new(loop_) |watcher, status| {
255             assert!(status.is_none());
256
257             // The synchronization logic here is subtle. To review,
258             // the uv async handle type promises that, after it is
259             // triggered the remote callback is definitely called at
260             // least once. UvRemoteCallback needs to maintain those
261             // semantics while also shutting down cleanly from the
262             // dtor. In our case that means that, when the
263             // UvRemoteCallback dtor calls `async.send()`, here `f` is
264             // always called later.
265
266             // In the dtor both the exit flag is set and the async
267             // callback fired under a lock.  Here, before calling `f`,
268             // we take the lock and check the flag. Because we are
269             // checking the flag before calling `f`, and the flag is
270             // set under the same lock as the send, then if the flag
271             // is set then we're guaranteed to call `f` after the
272             // final send.
273
274             // If the check was done after `f()` then there would be a
275             // period between that call and the check where the dtor
276             // could be called in the other thread, missing the final
277             // callback while still destroying the handle.
278
279             let should_exit = unsafe {
280                 exit_flag_clone.with_imm(|&should_exit| should_exit)
281             };
282
283             f();
284
285             if should_exit {
286                 watcher.close(||());
287             }
288
289         };
290         UvRemoteCallback {
291             async: async,
292             exit_flag: exit_flag
293         }
294     }
295 }
296
297 impl RemoteCallback for UvRemoteCallback {
298     fn fire(&mut self) { self.async.send() }
299 }
300
301 impl Drop for UvRemoteCallback {
302     fn drop(&self) {
303         unsafe {
304             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
305             do this.exit_flag.with |should_exit| {
306                 // NB: These two things need to happen atomically. Otherwise
307                 // the event handler could wake up due to a *previous*
308                 // signal and see the exit flag, destroying the handle
309                 // before the final send.
310                 *should_exit = true;
311                 this.async.send();
312             }
313         }
314     }
315 }
316
317 #[cfg(test)]
318 mod test_remote {
319     use cell::Cell;
320     use rt::test::*;
321     use rt::thread::Thread;
322     use rt::tube::Tube;
323     use rt::rtio::EventLoop;
324     use rt::local::Local;
325     use rt::sched::Scheduler;
326
327     #[test]
328     fn test_uv_remote() {
329         do run_in_newsched_task {
330             let mut tube = Tube::new();
331             let tube_clone = tube.clone();
332             let remote_cell = Cell::new_empty();
333             do Local::borrow::<Scheduler, ()>() |sched| {
334                 let tube_clone = tube_clone.clone();
335                 let tube_clone_cell = Cell::new(tube_clone);
336                 let remote = do sched.event_loop.remote_callback {
337                     // This could be called multiple times
338                     if !tube_clone_cell.is_empty() {
339                         tube_clone_cell.take().send(1);
340                     }
341                 };
342                 remote_cell.put_back(remote);
343             }
344             let thread = do Thread::start {
345                 remote_cell.take().fire();
346             };
347
348             assert!(tube.recv() == 1);
349             thread.join();
350         }
351     }
352 }
353
354 pub struct UvIoFactory(Loop);
355
356 impl UvIoFactory {
357     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
358         match self { &UvIoFactory(ref mut ptr) => ptr }
359     }
360 }
361
362 impl IoFactory for UvIoFactory {
363     // Connect to an address and return a new stream
364     // NB: This blocks the task waiting on the connection.
365     // It would probably be better to return a future
366     fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
367         // Create a cell in the task to hold the result. We will fill
368         // the cell before resuming the task.
369         let result_cell = Cell::new_empty();
370         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
371
372         // Block this task and take ownership, switch to scheduler context
373         let scheduler = Local::take::<Scheduler>();
374         do scheduler.deschedule_running_task_and_then |_, task| {
375
376             let mut tcp = TcpWatcher::new(self.uv_loop());
377             let task_cell = Cell::new(task);
378
379             // Wait for a connection
380             do tcp.connect(addr) |stream, status| {
381                 match status {
382                     None => {
383                         let tcp = NativeHandle::from_native_handle(stream.native_handle());
384                         let home = get_handle_to_current_scheduler!();
385                         let res = Ok(~UvTcpStream { watcher: tcp, home: home });
386
387                         // Store the stream in the task's stack
388                         unsafe { (*result_cell_ptr).put_back(res); }
389
390                         // Context switch
391                         let scheduler = Local::take::<Scheduler>();
392                         scheduler.resume_blocked_task_immediately(task_cell.take());
393                     }
394                     Some(_) => {
395                         let task_cell = Cell::new(task_cell.take());
396                         do stream.close {
397                             let res = Err(uv_error_to_io_error(status.unwrap()));
398                             unsafe { (*result_cell_ptr).put_back(res); }
399                             let scheduler = Local::take::<Scheduler>();
400                             scheduler.resume_blocked_task_immediately(task_cell.take());
401                         }
402                     }
403                 }
404             }
405         }
406
407         assert!(!result_cell.is_empty());
408         return result_cell.take();
409     }
410
411     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
412         let mut watcher = TcpWatcher::new(self.uv_loop());
413         match watcher.bind(addr) {
414             Ok(_) => {
415                 let home = get_handle_to_current_scheduler!();
416                 Ok(~UvTcpListener::new(watcher, home))
417             }
418             Err(uverr) => {
419                 let scheduler = Local::take::<Scheduler>();
420                 do scheduler.deschedule_running_task_and_then |_, task| {
421                     let task_cell = Cell::new(task);
422                     do watcher.as_stream().close {
423                         let scheduler = Local::take::<Scheduler>();
424                         scheduler.resume_blocked_task_immediately(task_cell.take());
425                     }
426                 }
427                 Err(uv_error_to_io_error(uverr))
428             }
429         }
430     }
431
432     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
433         let mut watcher = UdpWatcher::new(self.uv_loop());
434         match watcher.bind(addr) {
435             Ok(_) => {
436                 let home = get_handle_to_current_scheduler!();
437                 Ok(~UvUdpSocket { watcher: watcher, home: home })
438             }
439             Err(uverr) => {
440                 let scheduler = Local::take::<Scheduler>();
441                 do scheduler.deschedule_running_task_and_then |_, task| {
442                     let task_cell = Cell::new(task);
443                     do watcher.close {
444                         let scheduler = Local::take::<Scheduler>();
445                         scheduler.resume_blocked_task_immediately(task_cell.take());
446                     }
447                 }
448                 Err(uv_error_to_io_error(uverr))
449             }
450         }
451     }
452
453     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
454         let watcher = TimerWatcher::new(self.uv_loop());
455         let home = get_handle_to_current_scheduler!();
456         Ok(~UvTimer::new(watcher, home))
457     }
458 }
459
460 pub struct UvTcpListener {
461     watcher: TcpWatcher,
462     listening: bool,
463     incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
464     home: SchedHandle,
465 }
466
467 impl HomingIO for UvTcpListener {
468     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
469 }
470
471 impl UvTcpListener {
472     fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
473         UvTcpListener {
474             watcher: watcher,
475             listening: false,
476             incoming_streams: Tube::new(),
477             home: home,
478         }
479     }
480
481     fn watcher(&self) -> TcpWatcher { self.watcher }
482 }
483
484 impl Drop for UvTcpListener {
485     fn drop(&self) {
486         // XXX need mutable finalizer
487         let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
488         do self_.home_for_io |self_| {
489             let scheduler = Local::take::<Scheduler>();
490             do scheduler.deschedule_running_task_and_then |_, task| {
491                 let task_cell = Cell::new(task);
492                 do self_.watcher().as_stream().close {
493                     let scheduler = Local::take::<Scheduler>();
494                     scheduler.resume_blocked_task_immediately(task_cell.take());
495                 }
496             }
497         }
498     }
499 }
500
501 impl RtioSocket for UvTcpListener {
502     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
503         do self.home_for_io |self_| {
504           socket_name(Tcp, self_.watcher)
505         }
506     }
507 }
508
509 impl RtioTcpListener for UvTcpListener {
510
511     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
512         do self.home_for_io |self_| {
513
514             if !self_.listening {
515                 self_.listening = true;
516
517                 let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
518
519                 do self_.watcher().listen |mut server, status| {
520                     let stream = match status {
521                         Some(_) => Err(standard_error(OtherIoError)),
522                         None => {
523                             let client = TcpWatcher::new(&server.event_loop());
524                             // XXX: needs to be surfaced in interface
525                             server.accept(client.as_stream());
526                             let home = get_handle_to_current_scheduler!();
527                             Ok(~UvTcpStream { watcher: client, home: home })
528                         }
529                     };
530
531                     let mut incoming_streams = incoming_streams_cell.take();
532                     incoming_streams.send(stream);
533                     incoming_streams_cell.put_back(incoming_streams);
534                 }
535
536             }
537             self_.incoming_streams.recv()
538         }
539     }
540
541     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
542         do self.home_for_io |self_| {
543             let r = unsafe {
544                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
545             };
546
547             match status_to_maybe_uv_error(self_.watcher(), r) {
548                 Some(err) => Err(uv_error_to_io_error(err)),
549                 None => Ok(())
550             }
551         }
552     }
553
554     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
555         do self.home_for_io |self_| {
556             let r = unsafe {
557                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
558             };
559
560             match status_to_maybe_uv_error(self_.watcher(), r) {
561                 Some(err) => Err(uv_error_to_io_error(err)),
562                 None => Ok(())
563             }
564         }
565     }
566 }
567
568 pub struct UvTcpStream {
569     watcher: TcpWatcher,
570     home: SchedHandle,
571 }
572
573 impl HomingIO for UvTcpStream {
574     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
575 }
576
577 impl Drop for UvTcpStream {
578     fn drop(&self) {
579         // XXX need mutable finalizer
580         let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
581         do this.home_for_io |self_| {
582             let scheduler = Local::take::<Scheduler>();
583             do scheduler.deschedule_running_task_and_then |_, task| {
584                 let task_cell = Cell::new(task);
585                 do self_.watcher.as_stream().close {
586                     let scheduler = Local::take::<Scheduler>();
587                     scheduler.resume_blocked_task_immediately(task_cell.take());
588                 }
589             }
590         }
591     }
592 }
593
594 impl RtioSocket for UvTcpStream {
595     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
596         do self.home_for_io |self_| {
597             socket_name(Tcp, self_.watcher)
598         }
599     }
600 }
601
602 impl RtioTcpStream for UvTcpStream {
603     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
604         do self.home_for_io |self_| {
605             let result_cell = Cell::new_empty();
606             let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
607
608             let scheduler = Local::take::<Scheduler>();
609             let buf_ptr: *&mut [u8] = &buf;
610             do scheduler.deschedule_running_task_and_then |_sched, task| {
611                 let task_cell = Cell::new(task);
612                 // XXX: We shouldn't reallocate these callbacks every
613                 // call to read
614                 let alloc: AllocCallback = |_| unsafe {
615                     slice_to_uv_buf(*buf_ptr)
616                 };
617                 let mut watcher = self_.watcher.as_stream();
618                 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
619
620                     // Stop reading so that no read callbacks are
621                     // triggered before the user calls `read` again.
622                     // XXX: Is there a performance impact to calling
623                     // stop here?
624                     watcher.read_stop();
625
626                     let result = if status.is_none() {
627                         assert!(nread >= 0);
628                         Ok(nread as uint)
629                     } else {
630                         Err(uv_error_to_io_error(status.unwrap()))
631                     };
632
633                     unsafe { (*result_cell_ptr).put_back(result); }
634
635                     let scheduler = Local::take::<Scheduler>();
636                     scheduler.resume_blocked_task_immediately(task_cell.take());
637                 }
638             }
639
640             assert!(!result_cell.is_empty());
641             result_cell.take()
642         }
643     }
644
645     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
646         do self.home_for_io |self_| {
647             let result_cell = Cell::new_empty();
648             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
649             let scheduler = Local::take::<Scheduler>();
650             let buf_ptr: *&[u8] = &buf;
651             do scheduler.deschedule_running_task_and_then |_, task| {
652                 let task_cell = Cell::new(task);
653                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
654                 let mut watcher = self_.watcher.as_stream();
655                 do watcher.write(buf) |_watcher, status| {
656                     let result = if status.is_none() {
657                         Ok(())
658                     } else {
659                         Err(uv_error_to_io_error(status.unwrap()))
660                     };
661
662                     unsafe { (*result_cell_ptr).put_back(result); }
663
664                     let scheduler = Local::take::<Scheduler>();
665                     scheduler.resume_blocked_task_immediately(task_cell.take());
666                 }
667             }
668
669             assert!(!result_cell.is_empty());
670             result_cell.take()
671         }
672     }
673
674     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
675         do self.home_for_io |self_| {
676             socket_name(TcpPeer, self_.watcher)
677         }
678     }
679
680     fn control_congestion(&mut self) -> Result<(), IoError> {
681         do self.home_for_io |self_| {
682             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
683
684             match status_to_maybe_uv_error(self_.watcher, r) {
685                 Some(err) => Err(uv_error_to_io_error(err)),
686                 None => Ok(())
687             }
688         }
689     }
690
691     fn nodelay(&mut self) -> Result<(), IoError> {
692         do self.home_for_io |self_| {
693             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
694
695             match status_to_maybe_uv_error(self_.watcher, r) {
696                 Some(err) => Err(uv_error_to_io_error(err)),
697                 None => Ok(())
698             }
699         }
700     }
701
702     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
703         do self.home_for_io |self_| {
704             let r = unsafe {
705                 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
706                                     delay_in_seconds as c_uint)
707             };
708
709             match status_to_maybe_uv_error(self_.watcher, r) {
710                 Some(err) => Err(uv_error_to_io_error(err)),
711                 None => Ok(())
712             }
713         }
714     }
715
716     fn letdie(&mut self) -> Result<(), IoError> {
717         do self.home_for_io |self_| {
718             let r = unsafe {
719                 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
720             };
721
722             match status_to_maybe_uv_error(self_.watcher, r) {
723                 Some(err) => Err(uv_error_to_io_error(err)),
724                 None => Ok(())
725             }
726         }
727     }
728 }
729
730 pub struct UvUdpSocket {
731     watcher: UdpWatcher,
732     home: SchedHandle,
733 }
734
735 impl HomingIO for UvUdpSocket {
736     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
737 }
738
739 impl Drop for UvUdpSocket {
740     fn drop(&self) {
741         // XXX need mutable finalizer
742         let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
743         do this.home_for_io |_| {
744             let scheduler = Local::take::<Scheduler>();
745             do scheduler.deschedule_running_task_and_then |_, task| {
746                 let task_cell = Cell::new(task);
747                 do this.watcher.close {
748                     let scheduler = Local::take::<Scheduler>();
749                     scheduler.resume_blocked_task_immediately(task_cell.take());
750                 }
751             }
752         }
753     }
754 }
755
756 impl RtioSocket for UvUdpSocket {
757     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
758         do self.home_for_io |self_| {
759             socket_name(Udp, self_.watcher)
760         }
761     }
762 }
763
764 impl RtioUdpSocket for UvUdpSocket {
765     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
766         do self.home_for_io |self_| {
767             let result_cell = Cell::new_empty();
768             let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
769
770             let scheduler = Local::take::<Scheduler>();
771             let buf_ptr: *&mut [u8] = &buf;
772             do scheduler.deschedule_running_task_and_then |_, task| {
773                 let task_cell = Cell::new(task);
774                 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
775                 do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
776                     let _ = flags; // /XXX add handling for partials?
777
778                     watcher.recv_stop();
779
780                     let result = match status {
781                         None => {
782                             assert!(nread >= 0);
783                             Ok((nread as uint, addr))
784                         }
785                         Some(err) => Err(uv_error_to_io_error(err)),
786                     };
787
788                     unsafe { (*result_cell_ptr).put_back(result); }
789
790                     let scheduler = Local::take::<Scheduler>();
791                     scheduler.resume_blocked_task_immediately(task_cell.take());
792                 }
793             }
794
795             assert!(!result_cell.is_empty());
796             result_cell.take()
797         }
798     }
799
800     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
801         do self.home_for_io |self_| {
802             let result_cell = Cell::new_empty();
803             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
804             let scheduler = Local::take::<Scheduler>();
805             let buf_ptr: *&[u8] = &buf;
806             do scheduler.deschedule_running_task_and_then |_, task| {
807                 let task_cell = Cell::new(task);
808                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
809                 do self_.watcher.send(buf, dst) |_watcher, status| {
810
811                     let result = match status {
812                         None => Ok(()),
813                         Some(err) => Err(uv_error_to_io_error(err)),
814                     };
815
816                     unsafe { (*result_cell_ptr).put_back(result); }
817
818                     let scheduler = Local::take::<Scheduler>();
819                     scheduler.resume_blocked_task_immediately(task_cell.take());
820                 }
821             }
822
823             assert!(!result_cell.is_empty());
824             result_cell.take()
825         }
826     }
827
828     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
829         do self.home_for_io |self_| {
830             let r = unsafe {
831                 do multi.to_str().with_c_str |m_addr| {
832                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
833                                              ptr::null(), uvll::UV_JOIN_GROUP)
834                 }
835             };
836
837             match status_to_maybe_uv_error(self_.watcher, r) {
838                 Some(err) => Err(uv_error_to_io_error(err)),
839                 None => Ok(())
840             }
841         }
842     }
843
844     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
845         do self.home_for_io |self_| {
846             let r = unsafe {
847                 do multi.to_str().with_c_str |m_addr| {
848                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
849                                              ptr::null(), uvll::UV_LEAVE_GROUP)
850                 }
851             };
852
853             match status_to_maybe_uv_error(self_.watcher, r) {
854                 Some(err) => Err(uv_error_to_io_error(err)),
855                 None => Ok(())
856             }
857         }
858     }
859
860     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
861         do self.home_for_io |self_| {
862
863             let r = unsafe {
864                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
865             };
866
867             match status_to_maybe_uv_error(self_.watcher, r) {
868                 Some(err) => Err(uv_error_to_io_error(err)),
869                 None => Ok(())
870             }
871         }
872     }
873
874     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
875         do self.home_for_io |self_| {
876
877             let r = unsafe {
878                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
879             };
880
881             match status_to_maybe_uv_error(self_.watcher, r) {
882                 Some(err) => Err(uv_error_to_io_error(err)),
883                 None => Ok(())
884             }
885         }
886     }
887
888     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
889         do self.home_for_io |self_| {
890
891             let r = unsafe {
892                 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
893             };
894
895             match status_to_maybe_uv_error(self_.watcher, r) {
896                 Some(err) => Err(uv_error_to_io_error(err)),
897                 None => Ok(())
898             }
899         }
900     }
901
902     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
903         do self.home_for_io |self_| {
904
905             let r = unsafe {
906                 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
907             };
908
909             match status_to_maybe_uv_error(self_.watcher, r) {
910                 Some(err) => Err(uv_error_to_io_error(err)),
911                 None => Ok(())
912             }
913         }
914     }
915
916     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
917         do self.home_for_io |self_| {
918
919             let r = unsafe {
920                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
921             };
922
923             match status_to_maybe_uv_error(self_.watcher, r) {
924                 Some(err) => Err(uv_error_to_io_error(err)),
925                 None => Ok(())
926             }
927         }
928     }
929
930     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
931         do self.home_for_io |self_| {
932
933             let r = unsafe {
934                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
935             };
936
937             match status_to_maybe_uv_error(self_.watcher, r) {
938                 Some(err) => Err(uv_error_to_io_error(err)),
939                 None => Ok(())
940             }
941         }
942     }
943 }
944
945 pub struct UvTimer {
946     watcher: timer::TimerWatcher,
947     home: SchedHandle,
948 }
949
950 impl HomingIO for UvTimer {
951     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
952 }
953
954 impl UvTimer {
955     fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
956         UvTimer { watcher: w, home: home }
957     }
958 }
959
960 impl Drop for UvTimer {
961     fn drop(&self) {
962         let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
963         do self_.home_for_io |self_| {
964             rtdebug!("closing UvTimer");
965             let scheduler = Local::take::<Scheduler>();
966             do scheduler.deschedule_running_task_and_then |_, task| {
967                 let task_cell = Cell::new(task);
968                 do self_.watcher.close {
969                     let scheduler = Local::take::<Scheduler>();
970                     scheduler.resume_blocked_task_immediately(task_cell.take());
971                 }
972             }
973         }
974     }
975 }
976
977 impl RtioTimer for UvTimer {
978     fn sleep(&mut self, msecs: u64) {
979         do self.home_for_io |self_| {
980             let scheduler = Local::take::<Scheduler>();
981             do scheduler.deschedule_running_task_and_then |_sched, task| {
982                 rtdebug!("sleep: entered scheduler context");
983                 let task_cell = Cell::new(task);
984                 do self_.watcher.start(msecs, 0) |_, status| {
985                     assert!(status.is_none());
986                     let scheduler = Local::take::<Scheduler>();
987                     scheduler.resume_blocked_task_immediately(task_cell.take());
988                 }
989             }
990             self_.watcher.stop();
991         }
992     }
993 }
994
995 #[test]
996 fn test_simple_io_no_connect() {
997     do run_in_newsched_task {
998         unsafe {
999             let io = Local::unsafe_borrow::<IoFactoryObject>();
1000             let addr = next_test_ip4();
1001             let maybe_chan = (*io).tcp_connect(addr);
1002             assert!(maybe_chan.is_err());
1003         }
1004     }
1005 }
1006
1007 #[test]
1008 fn test_simple_udp_io_bind_only() {
1009     do run_in_newsched_task {
1010         unsafe {
1011             let io = Local::unsafe_borrow::<IoFactoryObject>();
1012             let addr = next_test_ip4();
1013             let maybe_socket = (*io).udp_bind(addr);
1014             assert!(maybe_socket.is_ok());
1015         }
1016     }
1017 }
1018
1019 #[test]
1020 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1021     use rt::sleeper_list::SleeperList;
1022     use rt::work_queue::WorkQueue;
1023     use rt::thread::Thread;
1024     use rt::task::Task;
1025     use rt::sched::{Shutdown, TaskFromFriend};
1026     do run_in_bare_thread {
1027         let sleepers = SleeperList::new();
1028         let work_queue1 = WorkQueue::new();
1029         let work_queue2 = WorkQueue::new();
1030         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1031
1032         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1033                                          sleepers.clone());
1034         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1035                                          sleepers.clone());
1036
1037         let handle1 = Cell::new(sched1.make_handle());
1038         let handle2 = Cell::new(sched2.make_handle());
1039         let tasksFriendHandle = Cell::new(sched2.make_handle());
1040
1041         let on_exit: ~fn(bool) = |exit_status| {
1042             handle1.take().send(Shutdown);
1043             handle2.take().send(Shutdown);
1044             rtassert!(exit_status);
1045         };
1046
1047         let test_function: ~fn() = || {
1048             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
1049             let addr = next_test_ip4();
1050             let maybe_socket = unsafe { (*io).udp_bind(addr) };
1051             // this socket is bound to this event loop
1052             assert!(maybe_socket.is_ok());
1053
1054             // block self on sched1
1055             let scheduler = Local::take::<Scheduler>();
1056             do scheduler.deschedule_running_task_and_then |_, task| {
1057                 // unblock task
1058                 do task.wake().map_move |task| {
1059                   // send self to sched2
1060                   tasksFriendHandle.take().send(TaskFromFriend(task));
1061                 };
1062                 // sched1 should now sleep since it has nothing else to do
1063             }
1064             // sched2 will wake up and get the task
1065             // as we do nothing else, the function ends and the socket goes out of scope
1066             // sched2 will start to run the destructor
1067             // the destructor will first block the task, set it's home as sched1, then enqueue it
1068             // sched2 will dequeue the task, see that it has a home, and send it to sched1
1069             // sched1 will wake up, exec the close function on the correct loop, and then we're done
1070         };
1071
1072         let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
1073         main_task.death.on_exit = Some(on_exit);
1074         let main_task = Cell::new(main_task);
1075
1076         let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
1077
1078         let sched1 = Cell::new(sched1);
1079         let sched2 = Cell::new(sched2);
1080
1081         let thread1 = do Thread::start {
1082             sched1.take().bootstrap(main_task.take());
1083         };
1084         let thread2 = do Thread::start {
1085             sched2.take().bootstrap(null_task.take());
1086         };
1087
1088         thread1.join();
1089         thread2.join();
1090     }
1091 }
1092
1093 #[test]
1094 fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
1095     use rt::sleeper_list::SleeperList;
1096     use rt::work_queue::WorkQueue;
1097     use rt::thread::Thread;
1098     use rt::task::Task;
1099     use rt::comm::oneshot;
1100     use rt::sched::Shutdown;
1101     do run_in_bare_thread {
1102         let sleepers = SleeperList::new();
1103         let work_queue1 = WorkQueue::new();
1104         let work_queue2 = WorkQueue::new();
1105         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1106
1107         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1108                                          sleepers.clone());
1109         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1110                                          sleepers.clone());
1111
1112         let handle1 = Cell::new(sched1.make_handle());
1113         let handle2 = Cell::new(sched2.make_handle());
1114
1115         let (port, chan) = oneshot();
1116         let port = Cell::new(port);
1117         let chan = Cell::new(chan);
1118
1119         let body1: ~fn() = || {
1120             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
1121             let addr = next_test_ip4();
1122             let socket = unsafe { (*io).udp_bind(addr) };
1123             assert!(socket.is_ok());
1124             chan.take().send(socket);
1125         };
1126
1127         let body2: ~fn() = || {
1128             let socket = port.take().recv();
1129             assert!(socket.is_ok());
1130             /* The socket goes out of scope and the destructor is called.
1131              * The destructor:
1132              *  - sends itself back to sched1
1133              *  - frees the socket
1134              *  - resets the home of the task to whatever it was previously
1135              */
1136         };
1137
1138         let on_exit: ~fn(bool) = |exit| {
1139             handle1.take().send(Shutdown);
1140             handle2.take().send(Shutdown);
1141             rtassert!(exit);
1142         };
1143
1144         let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
1145
1146         let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
1147         task2.death.on_exit = Some(on_exit);
1148         let task2 = Cell::new(task2);
1149
1150         let sched1 = Cell::new(sched1);
1151         let sched2 = Cell::new(sched2);
1152
1153         let thread1 = do Thread::start {
1154             sched1.take().bootstrap(task1.take());
1155         };
1156         let thread2 = do Thread::start {
1157             sched2.take().bootstrap(task2.take());
1158         };
1159
1160         thread1.join();
1161         thread2.join();
1162     }
1163 }
1164
1165 #[test]
1166 fn test_simple_tcp_server_and_client() {
1167     do run_in_newsched_task {
1168         let addr = next_test_ip4();
1169
1170         // Start the server first so it's listening when we connect
1171         do spawntask {
1172             unsafe {
1173                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1174                 let mut listener = (*io).tcp_bind(addr).unwrap();
1175                 let mut stream = listener.accept().unwrap();
1176                 let mut buf = [0, .. 2048];
1177                 let nread = stream.read(buf).unwrap();
1178                 assert_eq!(nread, 8);
1179                 for i in range(0u, nread) {
1180                     rtdebug!("%u", buf[i] as uint);
1181                     assert_eq!(buf[i], i as u8);
1182                 }
1183             }
1184         }
1185
1186         do spawntask {
1187             unsafe {
1188                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1189                 let mut stream = (*io).tcp_connect(addr).unwrap();
1190                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1191             }
1192         }
1193     }
1194 }
1195
1196 #[test]
1197 fn test_simple_tcp_server_and_client_on_diff_threads() {
1198     use rt::sleeper_list::SleeperList;
1199     use rt::work_queue::WorkQueue;
1200     use rt::thread::Thread;
1201     use rt::task::Task;
1202     use rt::sched::{Shutdown};
1203     do run_in_bare_thread {
1204         let sleepers = SleeperList::new();
1205
1206         let server_addr = next_test_ip4();
1207         let client_addr = server_addr.clone();
1208
1209         let server_work_queue = WorkQueue::new();
1210         let client_work_queue = WorkQueue::new();
1211         let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
1212
1213         let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
1214                                                queues.clone(), sleepers.clone());
1215         let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
1216                                                queues.clone(), sleepers.clone());
1217
1218         let server_handle = Cell::new(server_sched.make_handle());
1219         let client_handle = Cell::new(client_sched.make_handle());
1220
1221         let server_on_exit: ~fn(bool) = |exit_status| {
1222             server_handle.take().send(Shutdown);
1223             rtassert!(exit_status);
1224         };
1225
1226         let client_on_exit: ~fn(bool) = |exit_status| {
1227             client_handle.take().send(Shutdown);
1228             rtassert!(exit_status);
1229         };
1230
1231         let server_fn: ~fn() = || {
1232             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
1233             let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
1234             let mut stream = listener.accept().unwrap();
1235             let mut buf = [0, .. 2048];
1236             let nread = stream.read(buf).unwrap();
1237             assert_eq!(nread, 8);
1238             for i in range(0u, nread) {
1239                 assert_eq!(buf[i], i as u8);
1240             }
1241         };
1242
1243         let client_fn: ~fn() = || {
1244             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
1245             let mut stream = unsafe { (*io).tcp_connect(client_addr) };
1246             while stream.is_err() {
1247                 stream = unsafe { (*io).tcp_connect(client_addr) };
1248             }
1249             stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1250         };
1251
1252         let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
1253         server_task.death.on_exit = Some(server_on_exit);
1254         let server_task = Cell::new(server_task);
1255
1256         let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
1257         client_task.death.on_exit = Some(client_on_exit);
1258         let client_task = Cell::new(client_task);
1259
1260         let server_sched = Cell::new(server_sched);
1261         let client_sched = Cell::new(client_sched);
1262
1263         let server_thread = do Thread::start {
1264             server_sched.take().bootstrap(server_task.take());
1265         };
1266         let client_thread = do Thread::start {
1267             client_sched.take().bootstrap(client_task.take());
1268         };
1269
1270         server_thread.join();
1271         client_thread.join();
1272     }
1273 }
1274
1275 #[test]
1276 fn test_simple_udp_server_and_client() {
1277     do run_in_newsched_task {
1278         let server_addr = next_test_ip4();
1279         let client_addr = next_test_ip4();
1280
1281         do spawntask {
1282             unsafe {
1283                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1284                 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
1285                 let mut buf = [0, .. 2048];
1286                 let (nread,src) = server_socket.recvfrom(buf).unwrap();
1287                 assert_eq!(nread, 8);
1288                 for i in range(0u, nread) {
1289                     rtdebug!("%u", buf[i] as uint);
1290                     assert_eq!(buf[i], i as u8);
1291                 }
1292                 assert_eq!(src, client_addr);
1293             }
1294         }
1295
1296         do spawntask {
1297             unsafe {
1298                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1299                 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
1300                 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
1301             }
1302         }
1303     }
1304 }
1305
1306 #[test] #[ignore(reason = "busted")]
1307 fn test_read_and_block() {
1308     do run_in_newsched_task {
1309         let addr = next_test_ip4();
1310
1311         do spawntask {
1312             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
1313             let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1314             let mut stream = listener.accept().unwrap();
1315             let mut buf = [0, .. 2048];
1316
1317             let expected = 32;
1318             let mut current = 0;
1319             let mut reads = 0;
1320
1321             while current < expected {
1322                 let nread = stream.read(buf).unwrap();
1323                 for i in range(0u, nread) {
1324                     let val = buf[i] as uint;
1325                     assert_eq!(val, current % 8);
1326                     current += 1;
1327                 }
1328                 reads += 1;
1329
1330                 let scheduler = Local::take::<Scheduler>();
1331                 // Yield to the other task in hopes that it
1332                 // will trigger a read callback while we are
1333                 // not ready for it
1334                 do scheduler.deschedule_running_task_and_then |sched, task| {
1335                     let task = Cell::new(task);
1336                     sched.enqueue_blocked_task(task.take());
1337                 }
1338             }
1339
1340             // Make sure we had multiple reads
1341             assert!(reads > 1);
1342         }
1343
1344         do spawntask {
1345             unsafe {
1346                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1347                 let mut stream = (*io).tcp_connect(addr).unwrap();
1348                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1349                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1350                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1351                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1352             }
1353         }
1354
1355     }
1356 }
1357
1358 #[test]
1359 fn test_read_read_read() {
1360     do run_in_newsched_task {
1361         let addr = next_test_ip4();
1362         static MAX: uint = 500000;
1363
1364         do spawntask {
1365             unsafe {
1366                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1367                 let mut listener = (*io).tcp_bind(addr).unwrap();
1368                 let mut stream = listener.accept().unwrap();
1369                 let buf = [1, .. 2048];
1370                 let mut total_bytes_written = 0;
1371                 while total_bytes_written < MAX {
1372                     stream.write(buf);
1373                     total_bytes_written += buf.len();
1374                 }
1375             }
1376         }
1377
1378         do spawntask {
1379             unsafe {
1380                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1381                 let mut stream = (*io).tcp_connect(addr).unwrap();
1382                 let mut buf = [0, .. 2048];
1383                 let mut total_bytes_read = 0;
1384                 while total_bytes_read < MAX {
1385                     let nread = stream.read(buf).unwrap();
1386                     rtdebug!("read %u bytes", nread as uint);
1387                     total_bytes_read += nread;
1388                     for i in range(0u, nread) {
1389                         assert_eq!(buf[i], 1);
1390                     }
1391                 }
1392                 rtdebug!("read %u bytes total", total_bytes_read as uint);
1393             }
1394         }
1395     }
1396 }
1397
1398 #[test]
1399 fn test_udp_twice() {
1400     do run_in_newsched_task {
1401         let server_addr = next_test_ip4();
1402         let client_addr = next_test_ip4();
1403
1404         do spawntask {
1405             unsafe {
1406                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1407                 let mut client = (*io).udp_bind(client_addr).unwrap();
1408                 assert!(client.sendto([1], server_addr).is_ok());
1409                 assert!(client.sendto([2], server_addr).is_ok());
1410             }
1411         }
1412
1413         do spawntask {
1414             unsafe {
1415                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1416                 let mut server = (*io).udp_bind(server_addr).unwrap();
1417                 let mut buf1 = [0];
1418                 let mut buf2 = [0];
1419                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1420                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1421                 assert_eq!(nread1, 1);
1422                 assert_eq!(nread2, 1);
1423                 assert_eq!(src1, client_addr);
1424                 assert_eq!(src2, client_addr);
1425                 assert_eq!(buf1[0], 1);
1426                 assert_eq!(buf2[0], 2);
1427             }
1428         }
1429     }
1430 }
1431
1432 #[test]
1433 fn test_udp_many_read() {
1434     do run_in_newsched_task {
1435         let server_out_addr = next_test_ip4();
1436         let server_in_addr = next_test_ip4();
1437         let client_out_addr = next_test_ip4();
1438         let client_in_addr = next_test_ip4();
1439         static MAX: uint = 500_000;
1440
1441         do spawntask {
1442             unsafe {
1443                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1444                 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1445                 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1446                 let msg = [1, .. 2048];
1447                 let mut total_bytes_sent = 0;
1448                 let mut buf = [1];
1449                 while buf[0] == 1 {
1450                     // send more data
1451                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
1452                     total_bytes_sent += msg.len();
1453                     // check if the client has received enough
1454                     let res = server_in.recvfrom(buf);
1455                     assert!(res.is_ok());
1456                     let (nread, src) = res.unwrap();
1457                     assert_eq!(nread, 1);
1458                     assert_eq!(src, client_out_addr);
1459                 }
1460                 assert!(total_bytes_sent >= MAX);
1461             }
1462         }
1463
1464         do spawntask {
1465             unsafe {
1466                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1467                 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1468                 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1469                 let mut total_bytes_recv = 0;
1470                 let mut buf = [0, .. 2048];
1471                 while total_bytes_recv < MAX {
1472                     // ask for more
1473                     assert!(client_out.sendto([1], server_in_addr).is_ok());
1474                     // wait for data
1475                     let res = client_in.recvfrom(buf);
1476                     assert!(res.is_ok());
1477                     let (nread, src) = res.unwrap();
1478                     assert_eq!(src, server_out_addr);
1479                     total_bytes_recv += nread;
1480                     for i in range(0u, nread) {
1481                         assert_eq!(buf[i], 1);
1482                     }
1483                 }
1484                 // tell the server we're done
1485                 assert!(client_out.sendto([0], server_in_addr).is_ok());
1486             }
1487         }
1488     }
1489 }
1490
1491 #[test]
1492 fn test_timer_sleep_simple() {
1493     do run_in_newsched_task {
1494         unsafe {
1495             let io = Local::unsafe_borrow::<IoFactoryObject>();
1496             let timer = (*io).timer_init();
1497             do timer.map_move |mut t| { t.sleep(1) };
1498         }
1499     }
1500 }