]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/uvio.rs
9b96c8717346d26186d80dd0719c4b6c413e4522
[rust.git] / src / libstd / rt / uv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use option::*;
12 use result::*;
13 use ops::Drop;
14 use cell::Cell;
15 use cast;
16 use cast::transmute;
17 use clone::Clone;
18 use rt::io::IoError;
19 use rt::io::net::ip::IpAddr;
20 use rt::uv::*;
21 use rt::uv::idle::IdleWatcher;
22 use rt::rtio::*;
23 use rt::sched::Scheduler;
24 use rt::io::{standard_error, OtherIoError};
25 use rt::tube::Tube;
26 use rt::local::Local;
27 use unstable::sync::{Exclusive, exclusive};
28
29 #[cfg(test)] use container::Container;
30 #[cfg(test)] use uint;
31 #[cfg(test)] use unstable::run_in_bare_thread;
32 #[cfg(test)] use rt::test::{spawntask_immediately,
33                             next_test_ip4,
34                             run_in_newsched_task};
35
36
37 pub struct UvEventLoop {
38     uvio: UvIoFactory
39 }
40
41 impl UvEventLoop {
42     pub fn new() -> UvEventLoop {
43         UvEventLoop {
44             uvio: UvIoFactory(Loop::new())
45         }
46     }
47 }
48
49 impl Drop for UvEventLoop {
50     fn drop(&self) {
51         // XXX: Need mutable finalizer
52         let this = unsafe {
53             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
54         };
55         this.uvio.uv_loop().close();
56     }
57 }
58
59 impl EventLoop for UvEventLoop {
60     fn run(&mut self) {
61         self.uvio.uv_loop().run();
62     }
63
64     fn callback(&mut self, f: ~fn()) {
65         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
66         do idle_watcher.start |mut idle_watcher, status| {
67             assert!(status.is_none());
68             idle_watcher.stop();
69             idle_watcher.close(||());
70             f();
71         }
72     }
73
74     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
75         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
76         do timer.start(ms, 0) |timer, status| {
77             assert!(status.is_none());
78             timer.close(||());
79             f();
80         }
81     }
82
83     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
84         ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
85     }
86
87     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
88         Some(&mut self.uvio)
89     }
90 }
91
92 #[test]
93 fn test_callback_run_once() {
94     do run_in_bare_thread {
95         let mut event_loop = UvEventLoop::new();
96         let mut count = 0;
97         let count_ptr: *mut int = &mut count;
98         do event_loop.callback {
99             unsafe { *count_ptr += 1 }
100         }
101         event_loop.run();
102         assert_eq!(count, 1);
103     }
104 }
105
106 pub struct UvRemoteCallback {
107     // The uv async handle for triggering the callback
108     async: AsyncWatcher,
109     // A flag to tell the callback to exit, set from the dtor. This is
110     // almost never contested - only in rare races with the dtor.
111     exit_flag: Exclusive<bool>
112 }
113
114 impl UvRemoteCallback {
115     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
116         let exit_flag = exclusive(false);
117         let exit_flag_clone = exit_flag.clone();
118         let async = do AsyncWatcher::new(loop_) |watcher, status| {
119             assert!(status.is_none());
120             f();
121             unsafe {
122                 do exit_flag_clone.with_imm |&should_exit| {
123                     if should_exit {
124                         watcher.close(||());
125                     }
126                 }
127             }
128         };
129         UvRemoteCallback {
130             async: async,
131             exit_flag: exit_flag
132         }
133     }
134 }
135
136 impl RemoteCallback for UvRemoteCallback {
137     fn fire(&mut self) { self.async.send() }
138 }
139
140 impl Drop for UvRemoteCallback {
141     fn drop(&self) {
142         unsafe {
143             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
144             do this.exit_flag.with |should_exit| {
145                 // NB: These two things need to happen atomically. Otherwise
146                 // the event handler could wake up due to a *previous*
147                 // signal and see the exit flag, destroying the handle
148                 // before the final send.
149                 *should_exit = true;
150                 this.async.send();
151             }
152         }
153     }
154 }
155
156 #[cfg(test)]
157 mod test_remote {
158     use cell::Cell;
159     use rt::test::*;
160     use rt::thread::Thread;
161     use rt::tube::Tube;
162     use rt::rtio::EventLoop;
163     use rt::local::Local;
164     use rt::sched::Scheduler;
165
166     #[test]
167     fn test_uv_remote() {
168         do run_in_newsched_task {
169             let mut tube = Tube::new();
170             let tube_clone = tube.clone();
171             let remote_cell = Cell::new_empty();
172             do Local::borrow::<Scheduler, ()>() |sched| {
173                 let tube_clone = tube_clone.clone();
174                 let tube_clone_cell = Cell::new(tube_clone);
175                 let remote = do sched.event_loop.remote_callback {
176                     tube_clone_cell.take().send(1);
177                 };
178                 remote_cell.put_back(remote);
179             }
180             let _thread = do Thread::start {
181                 remote_cell.take().fire();
182             };
183
184             assert!(tube.recv() == 1);
185         }
186     }
187 }
188
189 pub struct UvIoFactory(Loop);
190
191 impl UvIoFactory {
192     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
193         match self { &UvIoFactory(ref mut ptr) => ptr }
194     }
195 }
196
197 impl IoFactory for UvIoFactory {
198     // Connect to an address and return a new stream
199     // NB: This blocks the task waiting on the connection.
200     // It would probably be better to return a future
201     fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
202         // Create a cell in the task to hold the result. We will fill
203         // the cell before resuming the task.
204         let result_cell = Cell::new_empty();
205         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
206
207         let scheduler = Local::take::<Scheduler>();
208         assert!(scheduler.in_task_context());
209
210         // Block this task and take ownership, switch to scheduler context
211         do scheduler.deschedule_running_task_and_then |sched, task| {
212
213             rtdebug!("connect: entered scheduler context");
214             assert!(!sched.in_task_context());
215             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
216             let task_cell = Cell::new(task);
217
218             // Wait for a connection
219             do tcp_watcher.connect(addr) |stream_watcher, status| {
220                 rtdebug!("connect: in connect callback");
221                 if status.is_none() {
222                     rtdebug!("status is none");
223                     let res = Ok(~UvTcpStream(stream_watcher));
224
225                     // Store the stream in the task's stack
226                     unsafe { (*result_cell_ptr).put_back(res); }
227
228                     // Context switch
229                     let scheduler = Local::take::<Scheduler>();
230                     scheduler.resume_blocked_task_immediately(task_cell.take());
231                 } else {
232                     rtdebug!("status is some");
233                     let task_cell = Cell::new(task_cell.take());
234                     do stream_watcher.close {
235                         let res = Err(uv_error_to_io_error(status.get()));
236                         unsafe { (*result_cell_ptr).put_back(res); }
237                         let scheduler = Local::take::<Scheduler>();
238                         scheduler.resume_blocked_task_immediately(task_cell.take());
239                     }
240                 };
241             }
242         }
243
244         assert!(!result_cell.is_empty());
245         return result_cell.take();
246     }
247
248     fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
249         let mut watcher = TcpWatcher::new(self.uv_loop());
250         match watcher.bind(addr) {
251             Ok(_) => Ok(~UvTcpListener::new(watcher)),
252             Err(uverr) => {
253                 let scheduler = Local::take::<Scheduler>();
254                 do scheduler.deschedule_running_task_and_then |_, task| {
255                     let task_cell = Cell::new(task);
256                     do watcher.as_stream().close {
257                         let scheduler = Local::take::<Scheduler>();
258                         scheduler.resume_blocked_task_immediately(task_cell.take());
259                     }
260                 }
261                 Err(uv_error_to_io_error(uverr))
262             }
263         }
264     }
265
266     fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError> {
267         let mut watcher = UdpWatcher::new(self.uv_loop());
268         match watcher.bind(addr) {
269             Ok(_) => Ok(~UvUdpSocket(watcher)),
270             Err(uverr) => {
271                 let scheduler = Local::take::<Scheduler>();
272                 do scheduler.deschedule_running_task_and_then |_, task| {
273                     let task_cell = Cell::new(task);
274                     do watcher.close {
275                         let scheduler = Local::take::<Scheduler>();
276                         scheduler.resume_blocked_task_immediately(task_cell.take());
277                     }
278                 }
279                 Err(uv_error_to_io_error(uverr))
280             }
281         }
282     }
283 }
284
285 // FIXME #6090: Prefer newtype structs but Drop doesn't work
286 pub struct UvTcpListener {
287     watcher: TcpWatcher,
288     listening: bool,
289     incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
290 }
291
292 impl UvTcpListener {
293     fn new(watcher: TcpWatcher) -> UvTcpListener {
294         UvTcpListener {
295             watcher: watcher,
296             listening: false,
297             incoming_streams: Tube::new()
298         }
299     }
300
301     fn watcher(&self) -> TcpWatcher { self.watcher }
302 }
303
304 impl Drop for UvTcpListener {
305     fn drop(&self) {
306         let watcher = self.watcher();
307         let scheduler = Local::take::<Scheduler>();
308         do scheduler.deschedule_running_task_and_then |_, task| {
309             let task_cell = Cell::new(task);
310             do watcher.as_stream().close {
311                 let scheduler = Local::take::<Scheduler>();
312                 scheduler.resume_blocked_task_immediately(task_cell.take());
313             }
314         }
315     }
316 }
317
318 impl RtioSocket for UvTcpListener {
319     // XXX implement
320     fn socket_name(&mut self) -> IpAddr { fail!(); }
321 }
322
323 impl RtioTcpListener for UvTcpListener {
324
325     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
326         rtdebug!("entering listen");
327
328         if self.listening {
329             return self.incoming_streams.recv();
330         }
331
332         self.listening = true;
333
334         let server_tcp_watcher = self.watcher();
335         let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
336
337         let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
338         let mut server_tcp_watcher = server_tcp_watcher;
339         do server_tcp_watcher.listen |mut server_stream_watcher, status| {
340             let maybe_stream = if status.is_none() {
341                 let mut loop_ = server_stream_watcher.event_loop();
342                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
343                 let client_tcp_watcher = client_tcp_watcher.as_stream();
344                 // XXX: Need's to be surfaced in interface
345                 server_stream_watcher.accept(client_tcp_watcher);
346                 Ok(~UvTcpStream(client_tcp_watcher))
347             } else {
348                 Err(standard_error(OtherIoError))
349             };
350
351             let mut incoming_streams = incoming_streams_cell.take();
352             incoming_streams.send(maybe_stream);
353             incoming_streams_cell.put_back(incoming_streams);
354         }
355
356         return self.incoming_streams.recv();
357     }
358
359     // XXX implement
360     fn accept_simultaneously(&mut self) { fail!(); }
361     fn dont_accept_simultaneously(&mut self) { fail!(); }
362 }
363
364 // FIXME #6090: Prefer newtype structs but Drop doesn't work
365 pub struct UvTcpStream(StreamWatcher);
366
367 impl Drop for UvTcpStream {
368     fn drop(&self) {
369         rtdebug!("closing tcp stream");
370         let scheduler = Local::take::<Scheduler>();
371         do scheduler.deschedule_running_task_and_then |_, task| {
372             let task_cell = Cell::new(task);
373             do self.close {
374                 let scheduler = Local::take::<Scheduler>();
375                 scheduler.resume_blocked_task_immediately(task_cell.take());
376             }
377         }
378     }
379 }
380
381 impl RtioSocket for UvTcpStream {
382     // XXX implement
383     fn socket_name(&mut self) -> IpAddr { fail!(); }
384 }
385
386 impl RtioTcpStream for UvTcpStream {
387     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
388         let result_cell = Cell::new_empty();
389         let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
390
391         let scheduler = Local::take::<Scheduler>();
392         assert!(scheduler.in_task_context());
393         let buf_ptr: *&mut [u8] = &buf;
394         do scheduler.deschedule_running_task_and_then |sched, task| {
395             rtdebug!("read: entered scheduler context");
396             assert!(!sched.in_task_context());
397             let task_cell = Cell::new(task);
398             // XXX: We shouldn't reallocate these callbacks every
399             // call to read
400             let alloc: AllocCallback = |_| unsafe {
401                 slice_to_uv_buf(*buf_ptr)
402             };
403             let mut watcher = **self;
404             do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
405
406                 // Stop reading so that no read callbacks are
407                 // triggered before the user calls `read` again.
408                 // XXX: Is there a performance impact to calling
409                 // stop here?
410                 watcher.read_stop();
411
412                 let result = if status.is_none() {
413                     assert!(nread >= 0);
414                     Ok(nread as uint)
415                 } else {
416                     Err(uv_error_to_io_error(status.unwrap()))
417                 };
418
419                 unsafe { (*result_cell_ptr).put_back(result); }
420
421                 let scheduler = Local::take::<Scheduler>();
422                 scheduler.resume_blocked_task_immediately(task_cell.take());
423             }
424         }
425
426         assert!(!result_cell.is_empty());
427         return result_cell.take();
428     }
429
430     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
431         let result_cell = Cell::new_empty();
432         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
433         let scheduler = Local::take::<Scheduler>();
434         assert!(scheduler.in_task_context());
435         let buf_ptr: *&[u8] = &buf;
436         do scheduler.deschedule_running_task_and_then |_, task| {
437             let task_cell = Cell::new(task);
438             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
439             let mut watcher = **self;
440             do watcher.write(buf) |_watcher, status| {
441                 let result = if status.is_none() {
442                     Ok(())
443                 } else {
444                     Err(uv_error_to_io_error(status.unwrap()))
445                 };
446
447                 unsafe { (*result_cell_ptr).put_back(result); }
448
449                 let scheduler = Local::take::<Scheduler>();
450                 scheduler.resume_blocked_task_immediately(task_cell.take());
451             }
452         }
453
454         assert!(!result_cell.is_empty());
455         return result_cell.take();
456     }
457
458     // XXX implement
459     fn peer_name(&mut self) -> IpAddr { fail!(); }
460     fn control_congestion(&mut self) { fail!(); }
461     fn nodelay(&mut self) { fail!(); }
462     fn keepalive(&mut self, _delay_in_seconds: uint) { fail!(); }
463     fn letdie(&mut self) { fail!(); }
464 }
465
466 pub struct UvUdpSocket(UdpWatcher);
467
468 impl Drop for UvUdpSocket {
469     fn drop(&self) {
470         rtdebug!("closing udp socket");
471         let scheduler = Local::take::<Scheduler>();
472         do scheduler.deschedule_running_task_and_then |_, task| {
473             let task_cell = Cell::new(task);
474             do self.close {
475                 let scheduler = Local::take::<Scheduler>();
476                 scheduler.resume_blocked_task_immediately(task_cell.take());
477             }
478         }
479     }
480 }
481
482 impl RtioSocket for UvUdpSocket {
483     // XXX implement
484     fn socket_name(&mut self) -> IpAddr { fail!(); }
485 }
486
487 impl RtioUdpSocket for UvUdpSocket {
488     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> {
489         let result_cell = Cell::new_empty();
490         let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;
491
492         let scheduler = Local::take::<Scheduler>();
493         assert!(scheduler.in_task_context());
494         let buf_ptr: *&mut [u8] = &buf;
495         do scheduler.deschedule_running_task_and_then |sched, task| {
496             rtdebug!("recvfrom: entered scheduler context");
497             assert!(!sched.in_task_context());
498             let task_cell = Cell::new(task);
499             let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
500             do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
501                 let _ = flags; // XXX add handling for partials?
502
503                 watcher.recv_stop();
504
505                 let result = match status {
506                     None => {
507                         assert!(nread >= 0);
508                         Ok((nread as uint, addr))
509                     }
510                     Some(err) => Err(uv_error_to_io_error(err))
511                 };
512
513                 unsafe { (*result_cell_ptr).put_back(result); }
514
515                 let scheduler = Local::take::<Scheduler>();
516                 scheduler.resume_blocked_task_immediately(task_cell.take());
517             }
518         }
519
520         assert!(!result_cell.is_empty());
521         return result_cell.take();
522     }
523
524     fn sendto(&mut self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> {
525         let result_cell = Cell::new_empty();
526         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
527         let scheduler = Local::take::<Scheduler>();
528         assert!(scheduler.in_task_context());
529         let buf_ptr: *&[u8] = &buf;
530         do scheduler.deschedule_running_task_and_then |_, task| {
531             let task_cell = Cell::new(task);
532             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
533             do self.send(buf, dst) |_watcher, status| {
534
535                 let result = match status {
536                     None => Ok(()),
537                     Some(err) => Err(uv_error_to_io_error(err)),
538                 };
539
540                 unsafe { (*result_cell_ptr).put_back(result); }
541
542                 let scheduler = Local::take::<Scheduler>();
543                 scheduler.resume_blocked_task_immediately(task_cell.take());
544             }
545         }
546
547         assert!(!result_cell.is_empty());
548         return result_cell.take();
549     }
550
551     // XXX implement
552     fn join_multicast(&mut self, _multi: IpAddr) { fail!(); }
553     fn leave_multicast(&mut self, _multi: IpAddr) { fail!(); }
554
555     fn loop_multicast_locally(&mut self) { fail!(); }
556     fn dont_loop_multicast_locally(&mut self) { fail!(); }
557
558     fn multicast_time_to_live(&mut self, _ttl: int) { fail!(); }
559     fn time_to_live(&mut self, _ttl: int) { fail!(); }
560
561     fn hear_broadcasts(&mut self) { fail!(); }
562     fn ignore_broadcasts(&mut self) { fail!(); }
563 }
564
565 #[test]
566 fn test_simple_io_no_connect() {
567     do run_in_newsched_task {
568         unsafe {
569             let io = Local::unsafe_borrow::<IoFactoryObject>();
570             let addr = next_test_ip4();
571             let maybe_chan = (*io).tcp_connect(addr);
572             assert!(maybe_chan.is_err());
573         }
574     }
575 }
576
577 #[test]
578 fn test_simple_udp_io_bind_only() {
579     do run_in_newsched_task {
580         unsafe {
581             let io = Local::unsafe_borrow::<IoFactoryObject>();
582             let addr = next_test_ip4();
583             let maybe_socket = (*io).udp_bind(addr);
584             assert!(maybe_socket.is_ok());
585         }
586     }
587 }
588
589 #[test]
590 fn test_simple_tcp_server_and_client() {
591     do run_in_newsched_task {
592         let addr = next_test_ip4();
593
594         // Start the server first so it's listening when we connect
595         do spawntask_immediately {
596             unsafe {
597                 let io = Local::unsafe_borrow::<IoFactoryObject>();
598                 let mut listener = (*io).tcp_bind(addr).unwrap();
599                 let mut stream = listener.accept().unwrap();
600                 let mut buf = [0, .. 2048];
601                 let nread = stream.read(buf).unwrap();
602                 assert_eq!(nread, 8);
603                 for uint::range(0, nread) |i| {
604                     rtdebug!("%u", buf[i] as uint);
605                     assert_eq!(buf[i], i as u8);
606                 }
607             }
608         }
609
610         do spawntask_immediately {
611             unsafe {
612                 let io = Local::unsafe_borrow::<IoFactoryObject>();
613                 let mut stream = (*io).tcp_connect(addr).unwrap();
614                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
615             }
616         }
617     }
618 }
619
620 #[test]
621 fn test_simple_udp_server_and_client() {
622     do run_in_newsched_task {
623         let server_addr = next_test_ip4();
624         let client_addr = next_test_ip4();
625
626         do spawntask_immediately {
627             unsafe {
628                 let io = Local::unsafe_borrow::<IoFactoryObject>();
629                 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
630                 let mut buf = [0, .. 2048];
631                 let (nread,src) = server_socket.recvfrom(buf).unwrap();
632                 assert_eq!(nread, 8);
633                 for uint::range(0, nread) |i| {
634                     rtdebug!("%u", buf[i] as uint);
635                     assert_eq!(buf[i], i as u8);
636                 }
637                 assert_eq!(src, client_addr);
638             }
639         }
640
641         do spawntask_immediately {
642             unsafe {
643                 let io = Local::unsafe_borrow::<IoFactoryObject>();
644                 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
645                 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
646             }
647         }
648     }
649 }
650
651 #[test] #[ignore(reason = "busted")]
652 fn test_read_and_block() {
653     do run_in_newsched_task {
654         let addr = next_test_ip4();
655
656         do spawntask_immediately {
657             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
658             let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
659             let mut stream = listener.accept().unwrap();
660             let mut buf = [0, .. 2048];
661
662             let expected = 32;
663             let mut current = 0;
664             let mut reads = 0;
665
666             while current < expected {
667                 let nread = stream.read(buf).unwrap();
668                 for uint::range(0, nread) |i| {
669                     let val = buf[i] as uint;
670                     assert_eq!(val, current % 8);
671                     current += 1;
672                 }
673                 reads += 1;
674
675                 let scheduler = Local::take::<Scheduler>();
676                 // Yield to the other task in hopes that it
677                 // will trigger a read callback while we are
678                 // not ready for it
679                 do scheduler.deschedule_running_task_and_then |sched, task| {
680                     let task = Cell::new(task);
681                     sched.enqueue_blocked_task(task.take());
682                 }
683             }
684
685             // Make sure we had multiple reads
686             assert!(reads > 1);
687         }
688
689         do spawntask_immediately {
690             unsafe {
691                 let io = Local::unsafe_borrow::<IoFactoryObject>();
692                 let mut stream = (*io).tcp_connect(addr).unwrap();
693                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
694                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
695                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
696                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
697             }
698         }
699
700     }
701 }
702
703 #[test]
704 fn test_read_read_read() {
705     do run_in_newsched_task {
706         let addr = next_test_ip4();
707         static MAX: uint = 500000;
708
709         do spawntask_immediately {
710             unsafe {
711                 let io = Local::unsafe_borrow::<IoFactoryObject>();
712                 let mut listener = (*io).tcp_bind(addr).unwrap();
713                 let mut stream = listener.accept().unwrap();
714                 let buf = [1, .. 2048];
715                 let mut total_bytes_written = 0;
716                 while total_bytes_written < MAX {
717                     stream.write(buf);
718                     total_bytes_written += buf.len();
719                 }
720             }
721         }
722
723         do spawntask_immediately {
724             unsafe {
725                 let io = Local::unsafe_borrow::<IoFactoryObject>();
726                 let mut stream = (*io).tcp_connect(addr).unwrap();
727                 let mut buf = [0, .. 2048];
728                 let mut total_bytes_read = 0;
729                 while total_bytes_read < MAX {
730                     let nread = stream.read(buf).unwrap();
731                     rtdebug!("read %u bytes", nread as uint);
732                     total_bytes_read += nread;
733                     for uint::range(0, nread) |i| {
734                         assert_eq!(buf[i], 1);
735                     }
736                 }
737                 rtdebug!("read %u bytes total", total_bytes_read as uint);
738             }
739         }
740     }
741 }
742
743 #[test]
744 fn test_udp_twice() {
745     do run_in_newsched_task {
746         let server_addr = next_test_ip4();
747         let client_addr = next_test_ip4();
748
749         do spawntask_immediately {
750             unsafe {
751                 let io = Local::unsafe_borrow::<IoFactoryObject>();
752                 let mut client = (*io).udp_bind(client_addr).unwrap();
753                 assert!(client.sendto([1], server_addr).is_ok());
754                 assert!(client.sendto([2], server_addr).is_ok());
755             }
756         }
757
758         do spawntask_immediately {
759             unsafe {
760                 let io = Local::unsafe_borrow::<IoFactoryObject>();
761                 let mut server = (*io).udp_bind(server_addr).unwrap();
762                 let mut buf1 = [0];
763                 let mut buf2 = [0];
764                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
765                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
766                 assert_eq!(nread1, 1);
767                 assert_eq!(nread2, 1);
768                 assert_eq!(src1, client_addr);
769                 assert_eq!(src2, client_addr);
770                 assert_eq!(buf1[0], 1);
771                 assert_eq!(buf2[0], 2);
772             }
773         }
774     }
775 }
776
777 #[test]
778 fn test_udp_many_read() {
779     do run_in_newsched_task {
780         let server_out_addr = next_test_ip4();
781         let server_in_addr = next_test_ip4();
782         let client_out_addr = next_test_ip4();
783         let client_in_addr = next_test_ip4();
784         static MAX: uint = 500_000;
785
786         do spawntask_immediately {
787             unsafe {
788                 let io = Local::unsafe_borrow::<IoFactoryObject>();
789                 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
790                 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
791                 let msg = [1, .. 2048];
792                 let mut total_bytes_sent = 0;
793                 let mut buf = [1];
794                 while buf[0] == 1 {
795                     // send more data
796                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
797                     total_bytes_sent += msg.len();
798                     // check if the client has received enough
799                     let res = server_in.recvfrom(buf);
800                     assert!(res.is_ok());
801                     let (nread, src) = res.unwrap();
802                     assert_eq!(nread, 1);
803                     assert_eq!(src, client_out_addr);
804                 }
805                 assert!(total_bytes_sent >= MAX);
806             }
807         }
808
809         do spawntask_immediately {
810             unsafe {
811                 let io = Local::unsafe_borrow::<IoFactoryObject>();
812                 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
813                 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
814                 let mut total_bytes_recv = 0;
815                 let mut buf = [0, .. 2048];
816                 while total_bytes_recv < MAX {
817                     // ask for more
818                     assert!(client_out.sendto([1], server_in_addr).is_ok());
819                     // wait for data
820                     let res = client_in.recvfrom(buf);
821                     assert!(res.is_ok());
822                     let (nread, src) = res.unwrap();
823                     assert_eq!(src, server_out_addr);
824                     total_bytes_recv += nread;
825                     for uint::range(0, nread) |i| {
826                         assert_eq!(buf[i], 1);
827                     }
828                 }
829                 // tell the server we're done
830                 assert!(client_out.sendto([0], server_in_addr).is_ok());
831             }
832         }
833     }
834 }