]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/uvio.rs
std: add RtioTimer and UvTimer impl atop rt::uv
[rust.git] / src / libstd / rt / uv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use option::*;
12 use result::*;
13 use ops::Drop;
14 use cell::Cell;
15 use cast;
16 use cast::transmute;
17 use clone::Clone;
18 use rt::io::IoError;
19 use rt::io::net::ip::IpAddr;
20 use rt::uv::*;
21 use rt::uv::idle::IdleWatcher;
22 use rt::rtio::*;
23 use rt::sched::Scheduler;
24 use rt::io::{standard_error, OtherIoError};
25 use rt::tube::Tube;
26 use rt::local::Local;
27 use unstable::sync::{Exclusive, exclusive};
28
29 #[cfg(test)] use container::Container;
30 #[cfg(test)] use uint;
31 #[cfg(test)] use unstable::run_in_bare_thread;
32 #[cfg(test)] use rt::test::{spawntask_immediately,
33                             next_test_ip4,
34                             run_in_newsched_task};
35
36
37 pub struct UvEventLoop {
38     uvio: UvIoFactory
39 }
40
41 impl UvEventLoop {
42     pub fn new() -> UvEventLoop {
43         UvEventLoop {
44             uvio: UvIoFactory(Loop::new())
45         }
46     }
47 }
48
49 impl Drop for UvEventLoop {
50     fn drop(&self) {
51         // XXX: Need mutable finalizer
52         let this = unsafe {
53             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
54         };
55         this.uvio.uv_loop().close();
56     }
57 }
58
59 impl EventLoop for UvEventLoop {
60     fn run(&mut self) {
61         self.uvio.uv_loop().run();
62     }
63
64     fn callback(&mut self, f: ~fn()) {
65         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
66         do idle_watcher.start |mut idle_watcher, status| {
67             assert!(status.is_none());
68             idle_watcher.stop();
69             idle_watcher.close(||());
70             f();
71         }
72     }
73
74     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
75         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
76         do timer.start(ms, 0) |timer, status| {
77             assert!(status.is_none());
78             timer.close(||());
79             f();
80         }
81     }
82
83     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
84         ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
85     }
86
87     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
88         Some(&mut self.uvio)
89     }
90 }
91
92 #[test]
93 fn test_callback_run_once() {
94     do run_in_bare_thread {
95         let mut event_loop = UvEventLoop::new();
96         let mut count = 0;
97         let count_ptr: *mut int = &mut count;
98         do event_loop.callback {
99             unsafe { *count_ptr += 1 }
100         }
101         event_loop.run();
102         assert_eq!(count, 1);
103     }
104 }
105
106 pub struct UvRemoteCallback {
107     // The uv async handle for triggering the callback
108     async: AsyncWatcher,
109     // A flag to tell the callback to exit, set from the dtor. This is
110     // almost never contested - only in rare races with the dtor.
111     exit_flag: Exclusive<bool>
112 }
113
114 impl UvRemoteCallback {
115     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
116         let exit_flag = exclusive(false);
117         let exit_flag_clone = exit_flag.clone();
118         let async = do AsyncWatcher::new(loop_) |watcher, status| {
119             assert!(status.is_none());
120             f();
121             unsafe {
122                 do exit_flag_clone.with_imm |&should_exit| {
123                     if should_exit {
124                         watcher.close(||());
125                     }
126                 }
127             }
128         };
129         UvRemoteCallback {
130             async: async,
131             exit_flag: exit_flag
132         }
133     }
134 }
135
136 impl RemoteCallback for UvRemoteCallback {
137     fn fire(&mut self) { self.async.send() }
138 }
139
140 impl Drop for UvRemoteCallback {
141     fn drop(&self) {
142         unsafe {
143             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
144             do this.exit_flag.with |should_exit| {
145                 // NB: These two things need to happen atomically. Otherwise
146                 // the event handler could wake up due to a *previous*
147                 // signal and see the exit flag, destroying the handle
148                 // before the final send.
149                 *should_exit = true;
150                 this.async.send();
151             }
152         }
153     }
154 }
155
156 #[cfg(test)]
157 mod test_remote {
158     use cell::Cell;
159     use rt::test::*;
160     use rt::thread::Thread;
161     use rt::tube::Tube;
162     use rt::rtio::EventLoop;
163     use rt::local::Local;
164     use rt::sched::Scheduler;
165
166     #[test]
167     fn test_uv_remote() {
168         do run_in_newsched_task {
169             let mut tube = Tube::new();
170             let tube_clone = tube.clone();
171             let remote_cell = Cell::new_empty();
172             do Local::borrow::<Scheduler, ()>() |sched| {
173                 let tube_clone = tube_clone.clone();
174                 let tube_clone_cell = Cell::new(tube_clone);
175                 let remote = do sched.event_loop.remote_callback {
176                     tube_clone_cell.take().send(1);
177                 };
178                 remote_cell.put_back(remote);
179             }
180             let _thread = do Thread::start {
181                 remote_cell.take().fire();
182             };
183
184             assert!(tube.recv() == 1);
185         }
186     }
187 }
188
189 pub struct UvIoFactory(Loop);
190
191 impl UvIoFactory {
192     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
193         match self { &UvIoFactory(ref mut ptr) => ptr }
194     }
195 }
196
197 impl IoFactory for UvIoFactory {
198     // Connect to an address and return a new stream
199     // NB: This blocks the task waiting on the connection.
200     // It would probably be better to return a future
201     fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
202         // Create a cell in the task to hold the result. We will fill
203         // the cell before resuming the task.
204         let result_cell = Cell::new_empty();
205         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
206
207         let scheduler = Local::take::<Scheduler>();
208         assert!(scheduler.in_task_context());
209
210         // Block this task and take ownership, switch to scheduler context
211         do scheduler.deschedule_running_task_and_then |sched, task| {
212
213             rtdebug!("connect: entered scheduler context");
214             assert!(!sched.in_task_context());
215             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
216             let task_cell = Cell::new(task);
217
218             // Wait for a connection
219             do tcp_watcher.connect(addr) |stream_watcher, status| {
220                 rtdebug!("connect: in connect callback");
221                 if status.is_none() {
222                     rtdebug!("status is none");
223                     let res = Ok(~UvTcpStream(stream_watcher));
224
225                     // Store the stream in the task's stack
226                     unsafe { (*result_cell_ptr).put_back(res); }
227
228                     // Context switch
229                     let scheduler = Local::take::<Scheduler>();
230                     scheduler.resume_blocked_task_immediately(task_cell.take());
231                 } else {
232                     rtdebug!("status is some");
233                     let task_cell = Cell::new(task_cell.take());
234                     do stream_watcher.close {
235                         let res = Err(uv_error_to_io_error(status.get()));
236                         unsafe { (*result_cell_ptr).put_back(res); }
237                         let scheduler = Local::take::<Scheduler>();
238                         scheduler.resume_blocked_task_immediately(task_cell.take());
239                     }
240                 };
241             }
242         }
243
244         assert!(!result_cell.is_empty());
245         return result_cell.take();
246     }
247
248     fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
249         let mut watcher = TcpWatcher::new(self.uv_loop());
250         match watcher.bind(addr) {
251             Ok(_) => Ok(~UvTcpListener::new(watcher)),
252             Err(uverr) => {
253                 let scheduler = Local::take::<Scheduler>();
254                 do scheduler.deschedule_running_task_and_then |_, task| {
255                     let task_cell = Cell::new(task);
256                     do watcher.as_stream().close {
257                         let scheduler = Local::take::<Scheduler>();
258                         scheduler.resume_blocked_task_immediately(task_cell.take());
259                     }
260                 }
261                 Err(uv_error_to_io_error(uverr))
262             }
263         }
264     }
265
266     fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError> {
267         let mut watcher = UdpWatcher::new(self.uv_loop());
268         match watcher.bind(addr) {
269             Ok(_) => Ok(~UvUdpSocket(watcher)),
270             Err(uverr) => {
271                 let scheduler = Local::take::<Scheduler>();
272                 do scheduler.deschedule_running_task_and_then |_, task| {
273                     let task_cell = Cell::new(task);
274                     do watcher.close {
275                         let scheduler = Local::take::<Scheduler>();
276                         scheduler.resume_blocked_task_immediately(task_cell.take());
277                     }
278                 }
279                 Err(uv_error_to_io_error(uverr))
280             }
281         }
282     }
283
284     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
285         Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
286     }
287 }
288
289 // FIXME #6090: Prefer newtype structs but Drop doesn't work
290 pub struct UvTcpListener {
291     watcher: TcpWatcher,
292     listening: bool,
293     incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
294 }
295
296 impl UvTcpListener {
297     fn new(watcher: TcpWatcher) -> UvTcpListener {
298         UvTcpListener {
299             watcher: watcher,
300             listening: false,
301             incoming_streams: Tube::new()
302         }
303     }
304
305     fn watcher(&self) -> TcpWatcher { self.watcher }
306 }
307
308 impl Drop for UvTcpListener {
309     fn drop(&self) {
310         let watcher = self.watcher();
311         let scheduler = Local::take::<Scheduler>();
312         do scheduler.deschedule_running_task_and_then |_, task| {
313             let task_cell = Cell::new(task);
314             do watcher.as_stream().close {
315                 let scheduler = Local::take::<Scheduler>();
316                 scheduler.resume_blocked_task_immediately(task_cell.take());
317             }
318         }
319     }
320 }
321
322 impl RtioSocket for UvTcpListener {
323     // XXX implement
324     fn socket_name(&mut self) -> IpAddr { fail!(); }
325 }
326
327 impl RtioTcpListener for UvTcpListener {
328
329     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
330         rtdebug!("entering listen");
331
332         if self.listening {
333             return self.incoming_streams.recv();
334         }
335
336         self.listening = true;
337
338         let server_tcp_watcher = self.watcher();
339         let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
340
341         let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
342         let mut server_tcp_watcher = server_tcp_watcher;
343         do server_tcp_watcher.listen |mut server_stream_watcher, status| {
344             let maybe_stream = if status.is_none() {
345                 let mut loop_ = server_stream_watcher.event_loop();
346                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
347                 let client_tcp_watcher = client_tcp_watcher.as_stream();
348                 // XXX: Need's to be surfaced in interface
349                 server_stream_watcher.accept(client_tcp_watcher);
350                 Ok(~UvTcpStream(client_tcp_watcher))
351             } else {
352                 Err(standard_error(OtherIoError))
353             };
354
355             let mut incoming_streams = incoming_streams_cell.take();
356             incoming_streams.send(maybe_stream);
357             incoming_streams_cell.put_back(incoming_streams);
358         }
359
360         return self.incoming_streams.recv();
361     }
362
363     // XXX implement
364     fn accept_simultaneously(&mut self) { fail!(); }
365     fn dont_accept_simultaneously(&mut self) { fail!(); }
366 }
367
368 // FIXME #6090: Prefer newtype structs but Drop doesn't work
369 pub struct UvTcpStream(StreamWatcher);
370
371 impl Drop for UvTcpStream {
372     fn drop(&self) {
373         rtdebug!("closing tcp stream");
374         let scheduler = Local::take::<Scheduler>();
375         do scheduler.deschedule_running_task_and_then |_, task| {
376             let task_cell = Cell::new(task);
377             do self.close {
378                 let scheduler = Local::take::<Scheduler>();
379                 scheduler.resume_blocked_task_immediately(task_cell.take());
380             }
381         }
382     }
383 }
384
385 impl RtioSocket for UvTcpStream {
386     // XXX implement
387     fn socket_name(&mut self) -> IpAddr { fail!(); }
388 }
389
390 impl RtioTcpStream for UvTcpStream {
391     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
392         let result_cell = Cell::new_empty();
393         let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
394
395         let scheduler = Local::take::<Scheduler>();
396         assert!(scheduler.in_task_context());
397         let buf_ptr: *&mut [u8] = &buf;
398         do scheduler.deschedule_running_task_and_then |sched, task| {
399             rtdebug!("read: entered scheduler context");
400             assert!(!sched.in_task_context());
401             let task_cell = Cell::new(task);
402             // XXX: We shouldn't reallocate these callbacks every
403             // call to read
404             let alloc: AllocCallback = |_| unsafe {
405                 slice_to_uv_buf(*buf_ptr)
406             };
407             let mut watcher = **self;
408             do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
409
410                 // Stop reading so that no read callbacks are
411                 // triggered before the user calls `read` again.
412                 // XXX: Is there a performance impact to calling
413                 // stop here?
414                 watcher.read_stop();
415
416                 let result = if status.is_none() {
417                     assert!(nread >= 0);
418                     Ok(nread as uint)
419                 } else {
420                     Err(uv_error_to_io_error(status.unwrap()))
421                 };
422
423                 unsafe { (*result_cell_ptr).put_back(result); }
424
425                 let scheduler = Local::take::<Scheduler>();
426                 scheduler.resume_blocked_task_immediately(task_cell.take());
427             }
428         }
429
430         assert!(!result_cell.is_empty());
431         return result_cell.take();
432     }
433
434     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
435         let result_cell = Cell::new_empty();
436         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
437         let scheduler = Local::take::<Scheduler>();
438         assert!(scheduler.in_task_context());
439         let buf_ptr: *&[u8] = &buf;
440         do scheduler.deschedule_running_task_and_then |_, task| {
441             let task_cell = Cell::new(task);
442             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
443             let mut watcher = **self;
444             do watcher.write(buf) |_watcher, status| {
445                 let result = if status.is_none() {
446                     Ok(())
447                 } else {
448                     Err(uv_error_to_io_error(status.unwrap()))
449                 };
450
451                 unsafe { (*result_cell_ptr).put_back(result); }
452
453                 let scheduler = Local::take::<Scheduler>();
454                 scheduler.resume_blocked_task_immediately(task_cell.take());
455             }
456         }
457
458         assert!(!result_cell.is_empty());
459         return result_cell.take();
460     }
461
462     // XXX implement
463     fn peer_name(&mut self) -> IpAddr { fail!(); }
464     fn control_congestion(&mut self) { fail!(); }
465     fn nodelay(&mut self) { fail!(); }
466     fn keepalive(&mut self, _delay_in_seconds: uint) { fail!(); }
467     fn letdie(&mut self) { fail!(); }
468 }
469
470 pub struct UvUdpSocket(UdpWatcher);
471
472 impl Drop for UvUdpSocket {
473     fn drop(&self) {
474         rtdebug!("closing udp socket");
475         let scheduler = Local::take::<Scheduler>();
476         do scheduler.deschedule_running_task_and_then |_, task| {
477             let task_cell = Cell::new(task);
478             do self.close {
479                 let scheduler = Local::take::<Scheduler>();
480                 scheduler.resume_blocked_task_immediately(task_cell.take());
481             }
482         }
483     }
484 }
485
486 impl RtioSocket for UvUdpSocket {
487     // XXX implement
488     fn socket_name(&mut self) -> IpAddr { fail!(); }
489 }
490
491 impl RtioUdpSocket for UvUdpSocket {
492     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> {
493         let result_cell = Cell::new_empty();
494         let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;
495
496         let scheduler = Local::take::<Scheduler>();
497         assert!(scheduler.in_task_context());
498         let buf_ptr: *&mut [u8] = &buf;
499         do scheduler.deschedule_running_task_and_then |sched, task| {
500             rtdebug!("recvfrom: entered scheduler context");
501             assert!(!sched.in_task_context());
502             let task_cell = Cell::new(task);
503             let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
504             do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
505                 let _ = flags; // XXX add handling for partials?
506
507                 watcher.recv_stop();
508
509                 let result = match status {
510                     None => {
511                         assert!(nread >= 0);
512                         Ok((nread as uint, addr))
513                     }
514                     Some(err) => Err(uv_error_to_io_error(err))
515                 };
516
517                 unsafe { (*result_cell_ptr).put_back(result); }
518
519                 let scheduler = Local::take::<Scheduler>();
520                 scheduler.resume_blocked_task_immediately(task_cell.take());
521             }
522         }
523
524         assert!(!result_cell.is_empty());
525         return result_cell.take();
526     }
527
528     fn sendto(&mut self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> {
529         let result_cell = Cell::new_empty();
530         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
531         let scheduler = Local::take::<Scheduler>();
532         assert!(scheduler.in_task_context());
533         let buf_ptr: *&[u8] = &buf;
534         do scheduler.deschedule_running_task_and_then |_, task| {
535             let task_cell = Cell::new(task);
536             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
537             do self.send(buf, dst) |_watcher, status| {
538
539                 let result = match status {
540                     None => Ok(()),
541                     Some(err) => Err(uv_error_to_io_error(err)),
542                 };
543
544                 unsafe { (*result_cell_ptr).put_back(result); }
545
546                 let scheduler = Local::take::<Scheduler>();
547                 scheduler.resume_blocked_task_immediately(task_cell.take());
548             }
549         }
550
551         assert!(!result_cell.is_empty());
552         return result_cell.take();
553     }
554
555     // XXX implement
556     fn join_multicast(&mut self, _multi: IpAddr) { fail!(); }
557     fn leave_multicast(&mut self, _multi: IpAddr) { fail!(); }
558
559     fn loop_multicast_locally(&mut self) { fail!(); }
560     fn dont_loop_multicast_locally(&mut self) { fail!(); }
561
562     fn multicast_time_to_live(&mut self, _ttl: int) { fail!(); }
563     fn time_to_live(&mut self, _ttl: int) { fail!(); }
564
565     fn hear_broadcasts(&mut self) { fail!(); }
566     fn ignore_broadcasts(&mut self) { fail!(); }
567 }
568
569 pub struct UvTimer(timer::TimerWatcher);
570
571 impl UvTimer {
572     fn new(w: timer::TimerWatcher) -> UvTimer {
573         UvTimer(w)
574     }
575 }
576
577 impl Drop for UvTimer {
578     fn drop(&self) {
579         rtdebug!("closing UvTimer");
580         let scheduler = Local::take::<Scheduler>();
581         do scheduler.deschedule_running_task_and_then |_, task| {
582             let task_cell = Cell::new(task);
583             do self.close {
584                 let scheduler = Local::take::<Scheduler>();
585                 scheduler.resume_task_immediately(task_cell.take());
586             }
587         }
588     }
589 }
590
591 impl RtioTimer for UvTimer {
592     fn sleep(&self, msecs: u64) {
593         let scheduler = Local::take::<Scheduler>();
594         assert!(scheduler.in_task_context());
595         do scheduler.deschedule_running_task_and_then |sched, task| {
596             rtdebug!("sleep: entered scheduler context");
597             assert!(!sched.in_task_context());
598             let task_cell = Cell::new(task);
599             let mut watcher = **self;
600             do watcher.start(msecs, 0) |_, status| {
601                 assert!(status.is_none());
602                 let scheduler = Local::take::<Scheduler>();
603                 scheduler.resume_task_immediately(task_cell.take());
604             }
605         }
606         let mut w = **self;
607         w.stop();
608     }
609 }
610
611 #[test]
612 fn test_simple_io_no_connect() {
613     do run_in_newsched_task {
614         unsafe {
615             let io = Local::unsafe_borrow::<IoFactoryObject>();
616             let addr = next_test_ip4();
617             let maybe_chan = (*io).tcp_connect(addr);
618             assert!(maybe_chan.is_err());
619         }
620     }
621 }
622
623 #[test]
624 fn test_simple_udp_io_bind_only() {
625     do run_in_newsched_task {
626         unsafe {
627             let io = Local::unsafe_borrow::<IoFactoryObject>();
628             let addr = next_test_ip4();
629             let maybe_socket = (*io).udp_bind(addr);
630             assert!(maybe_socket.is_ok());
631         }
632     }
633 }
634
635 #[test]
636 fn test_simple_tcp_server_and_client() {
637     do run_in_newsched_task {
638         let addr = next_test_ip4();
639
640         // Start the server first so it's listening when we connect
641         do spawntask_immediately {
642             unsafe {
643                 let io = Local::unsafe_borrow::<IoFactoryObject>();
644                 let mut listener = (*io).tcp_bind(addr).unwrap();
645                 let mut stream = listener.accept().unwrap();
646                 let mut buf = [0, .. 2048];
647                 let nread = stream.read(buf).unwrap();
648                 assert_eq!(nread, 8);
649                 for uint::range(0, nread) |i| {
650                     rtdebug!("%u", buf[i] as uint);
651                     assert_eq!(buf[i], i as u8);
652                 }
653             }
654         }
655
656         do spawntask_immediately {
657             unsafe {
658                 let io = Local::unsafe_borrow::<IoFactoryObject>();
659                 let mut stream = (*io).tcp_connect(addr).unwrap();
660                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
661             }
662         }
663     }
664 }
665
666 #[test]
667 fn test_simple_udp_server_and_client() {
668     do run_in_newsched_task {
669         let server_addr = next_test_ip4();
670         let client_addr = next_test_ip4();
671
672         do spawntask_immediately {
673             unsafe {
674                 let io = Local::unsafe_borrow::<IoFactoryObject>();
675                 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
676                 let mut buf = [0, .. 2048];
677                 let (nread,src) = server_socket.recvfrom(buf).unwrap();
678                 assert_eq!(nread, 8);
679                 for uint::range(0, nread) |i| {
680                     rtdebug!("%u", buf[i] as uint);
681                     assert_eq!(buf[i], i as u8);
682                 }
683                 assert_eq!(src, client_addr);
684             }
685         }
686
687         do spawntask_immediately {
688             unsafe {
689                 let io = Local::unsafe_borrow::<IoFactoryObject>();
690                 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
691                 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
692             }
693         }
694     }
695 }
696
697 #[test] #[ignore(reason = "busted")]
698 fn test_read_and_block() {
699     do run_in_newsched_task {
700         let addr = next_test_ip4();
701
702         do spawntask_immediately {
703             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
704             let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
705             let mut stream = listener.accept().unwrap();
706             let mut buf = [0, .. 2048];
707
708             let expected = 32;
709             let mut current = 0;
710             let mut reads = 0;
711
712             while current < expected {
713                 let nread = stream.read(buf).unwrap();
714                 for uint::range(0, nread) |i| {
715                     let val = buf[i] as uint;
716                     assert_eq!(val, current % 8);
717                     current += 1;
718                 }
719                 reads += 1;
720
721                 let scheduler = Local::take::<Scheduler>();
722                 // Yield to the other task in hopes that it
723                 // will trigger a read callback while we are
724                 // not ready for it
725                 do scheduler.deschedule_running_task_and_then |sched, task| {
726                     let task = Cell::new(task);
727                     sched.enqueue_blocked_task(task.take());
728                 }
729             }
730
731             // Make sure we had multiple reads
732             assert!(reads > 1);
733         }
734
735         do spawntask_immediately {
736             unsafe {
737                 let io = Local::unsafe_borrow::<IoFactoryObject>();
738                 let mut stream = (*io).tcp_connect(addr).unwrap();
739                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
740                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
741                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
742                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
743             }
744         }
745
746     }
747 }
748
749 #[test]
750 fn test_read_read_read() {
751     do run_in_newsched_task {
752         let addr = next_test_ip4();
753         static MAX: uint = 500000;
754
755         do spawntask_immediately {
756             unsafe {
757                 let io = Local::unsafe_borrow::<IoFactoryObject>();
758                 let mut listener = (*io).tcp_bind(addr).unwrap();
759                 let mut stream = listener.accept().unwrap();
760                 let buf = [1, .. 2048];
761                 let mut total_bytes_written = 0;
762                 while total_bytes_written < MAX {
763                     stream.write(buf);
764                     total_bytes_written += buf.len();
765                 }
766             }
767         }
768
769         do spawntask_immediately {
770             unsafe {
771                 let io = Local::unsafe_borrow::<IoFactoryObject>();
772                 let mut stream = (*io).tcp_connect(addr).unwrap();
773                 let mut buf = [0, .. 2048];
774                 let mut total_bytes_read = 0;
775                 while total_bytes_read < MAX {
776                     let nread = stream.read(buf).unwrap();
777                     rtdebug!("read %u bytes", nread as uint);
778                     total_bytes_read += nread;
779                     for uint::range(0, nread) |i| {
780                         assert_eq!(buf[i], 1);
781                     }
782                 }
783                 rtdebug!("read %u bytes total", total_bytes_read as uint);
784             }
785         }
786     }
787 }
788
789 #[test]
790 fn test_udp_twice() {
791     do run_in_newsched_task {
792         let server_addr = next_test_ip4();
793         let client_addr = next_test_ip4();
794
795         do spawntask_immediately {
796             unsafe {
797                 let io = Local::unsafe_borrow::<IoFactoryObject>();
798                 let mut client = (*io).udp_bind(client_addr).unwrap();
799                 assert!(client.sendto([1], server_addr).is_ok());
800                 assert!(client.sendto([2], server_addr).is_ok());
801             }
802         }
803
804         do spawntask_immediately {
805             unsafe {
806                 let io = Local::unsafe_borrow::<IoFactoryObject>();
807                 let mut server = (*io).udp_bind(server_addr).unwrap();
808                 let mut buf1 = [0];
809                 let mut buf2 = [0];
810                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
811                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
812                 assert_eq!(nread1, 1);
813                 assert_eq!(nread2, 1);
814                 assert_eq!(src1, client_addr);
815                 assert_eq!(src2, client_addr);
816                 assert_eq!(buf1[0], 1);
817                 assert_eq!(buf2[0], 2);
818             }
819         }
820     }
821 }
822
823 #[test]
824 fn test_udp_many_read() {
825     do run_in_newsched_task {
826         let server_out_addr = next_test_ip4();
827         let server_in_addr = next_test_ip4();
828         let client_out_addr = next_test_ip4();
829         let client_in_addr = next_test_ip4();
830         static MAX: uint = 500_000;
831
832         do spawntask_immediately {
833             unsafe {
834                 let io = Local::unsafe_borrow::<IoFactoryObject>();
835                 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
836                 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
837                 let msg = [1, .. 2048];
838                 let mut total_bytes_sent = 0;
839                 let mut buf = [1];
840                 while buf[0] == 1 {
841                     // send more data
842                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
843                     total_bytes_sent += msg.len();
844                     // check if the client has received enough
845                     let res = server_in.recvfrom(buf);
846                     assert!(res.is_ok());
847                     let (nread, src) = res.unwrap();
848                     assert_eq!(nread, 1);
849                     assert_eq!(src, client_out_addr);
850                 }
851                 assert!(total_bytes_sent >= MAX);
852             }
853         }
854
855         do spawntask_immediately {
856             unsafe {
857                 let io = Local::unsafe_borrow::<IoFactoryObject>();
858                 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
859                 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
860                 let mut total_bytes_recv = 0;
861                 let mut buf = [0, .. 2048];
862                 while total_bytes_recv < MAX {
863                     // ask for more
864                     assert!(client_out.sendto([1], server_in_addr).is_ok());
865                     // wait for data
866                     let res = client_in.recvfrom(buf);
867                     assert!(res.is_ok());
868                     let (nread, src) = res.unwrap();
869                     assert_eq!(src, server_out_addr);
870                     total_bytes_recv += nread;
871                     for uint::range(0, nread) |i| {
872                         assert_eq!(buf[i], 1);
873                     }
874                 }
875                 // tell the server we're done
876                 assert!(client_out.sendto([0], server_in_addr).is_ok());
877             }
878         }
879     }
880 }
881
882 fn test_timer_sleep_simple_impl() {
883     unsafe {
884         let io = Local::unsafe_borrow::<IoFactoryObject>();
885         let timer = (*io).timer_init();
886         match timer {
887             Ok(t) => t.sleep(1),
888             Err(_) => assert!(false)
889         }
890     }
891 }
892 #[test]
893 fn test_timer_sleep_simple() {
894     do run_in_newsched_task {
895         test_timer_sleep_simple_impl();
896     }
897 }