]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/uvio.rs
038ebad3540aecd29657474dffaacaaf8a2ab365
[rust.git] / src / libstd / rt / uv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use c_str::ToCStr;
12 use cast::transmute;
13 use cast;
14 use cell::Cell;
15 use clone::Clone;
16 use libc::{c_int, c_uint, c_void};
17 use ops::Drop;
18 use option::*;
19 use ptr;
20 use result::*;
21 use rt::io::IoError;
22 use rt::io::net::ip::{SocketAddr, IpAddr};
23 use rt::io::{standard_error, OtherIoError};
24 use rt::local::Local;
25 use rt::rtio::*;
26 use rt::sched::Scheduler;
27 use rt::tube::Tube;
28 use rt::uv::*;
29 use rt::uv::idle::IdleWatcher;
30 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
31 use unstable::sync::Exclusive;
32
33 #[cfg(test)] use container::Container;
34 #[cfg(test)] use unstable::run_in_bare_thread;
35 #[cfg(test)] use rt::test::{spawntask,
36                             next_test_ip4,
37                             run_in_newsched_task};
38 #[cfg(test)] use iterator::{Iterator, range};
39
40 enum SocketNameKind {
41     TcpPeer,
42     Tcp,
43     Udp
44 }
45
46 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
47                                                  handle: U) -> Result<SocketAddr, IoError> {
48
49     let getsockname = match sk {
50         TcpPeer => uvll::rust_uv_tcp_getpeername,
51         Tcp     => uvll::rust_uv_tcp_getsockname,
52         Udp     => uvll::rust_uv_udp_getsockname
53     };
54
55     // Allocate a sockaddr_storage
56     // since we don't know if it's ipv4 or ipv6
57     let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
58
59     let r = unsafe {
60         getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
61     };
62
63     if r != 0 {
64         let status = status_to_maybe_uv_error(handle, r);
65         return Err(uv_error_to_io_error(status.unwrap()));
66     }
67
68     let addr = unsafe {
69         if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
70             net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
71         } else {
72             net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
73         }
74     };
75
76     unsafe { uvll::free_sockaddr_storage(r_addr); }
77
78     Ok(addr)
79
80 }
81
82 pub struct UvEventLoop {
83     uvio: UvIoFactory
84 }
85
86 impl UvEventLoop {
87     pub fn new() -> UvEventLoop {
88         UvEventLoop {
89             uvio: UvIoFactory(Loop::new())
90         }
91     }
92 }
93
94 impl Drop for UvEventLoop {
95     fn drop(&self) {
96         // XXX: Need mutable finalizer
97         let this = unsafe {
98             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
99         };
100         this.uvio.uv_loop().close();
101     }
102 }
103
104 impl EventLoop for UvEventLoop {
105     fn run(&mut self) {
106         self.uvio.uv_loop().run();
107     }
108
109     fn callback(&mut self, f: ~fn()) {
110         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
111         do idle_watcher.start |mut idle_watcher, status| {
112             assert!(status.is_none());
113             idle_watcher.stop();
114             idle_watcher.close(||());
115             f();
116         }
117     }
118
119     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
120         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
121         do timer.start(ms, 0) |timer, status| {
122             assert!(status.is_none());
123             timer.close(||());
124             f();
125         }
126     }
127
128     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
129         ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
130     }
131
132     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
133         Some(&mut self.uvio)
134     }
135 }
136
137 #[test]
138 fn test_callback_run_once() {
139     do run_in_bare_thread {
140         let mut event_loop = UvEventLoop::new();
141         let mut count = 0;
142         let count_ptr: *mut int = &mut count;
143         do event_loop.callback {
144             unsafe { *count_ptr += 1 }
145         }
146         event_loop.run();
147         assert_eq!(count, 1);
148     }
149 }
150
151 pub struct UvRemoteCallback {
152     // The uv async handle for triggering the callback
153     async: AsyncWatcher,
154     // A flag to tell the callback to exit, set from the dtor. This is
155     // almost never contested - only in rare races with the dtor.
156     exit_flag: Exclusive<bool>
157 }
158
159 impl UvRemoteCallback {
160     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
161         let exit_flag = Exclusive::new(false);
162         let exit_flag_clone = exit_flag.clone();
163         let async = do AsyncWatcher::new(loop_) |watcher, status| {
164             assert!(status.is_none());
165             f();
166             unsafe {
167                 do exit_flag_clone.with_imm |&should_exit| {
168                     if should_exit {
169                         watcher.close(||());
170                     }
171                 }
172             }
173         };
174         UvRemoteCallback {
175             async: async,
176             exit_flag: exit_flag
177         }
178     }
179 }
180
181 impl RemoteCallback for UvRemoteCallback {
182     fn fire(&mut self) { self.async.send() }
183 }
184
185 impl Drop for UvRemoteCallback {
186     fn drop(&self) {
187         unsafe {
188             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
189             do this.exit_flag.with |should_exit| {
190                 // NB: These two things need to happen atomically. Otherwise
191                 // the event handler could wake up due to a *previous*
192                 // signal and see the exit flag, destroying the handle
193                 // before the final send.
194                 *should_exit = true;
195                 this.async.send();
196             }
197         }
198     }
199 }
200
201 #[cfg(test)]
202 mod test_remote {
203     use cell::Cell;
204     use rt::test::*;
205     use rt::thread::Thread;
206     use rt::tube::Tube;
207     use rt::rtio::EventLoop;
208     use rt::local::Local;
209     use rt::sched::Scheduler;
210
211     #[test]
212     fn test_uv_remote() {
213         do run_in_newsched_task {
214             let mut tube = Tube::new();
215             let tube_clone = tube.clone();
216             let remote_cell = Cell::new_empty();
217             do Local::borrow::<Scheduler, ()>() |sched| {
218                 let tube_clone = tube_clone.clone();
219                 let tube_clone_cell = Cell::new(tube_clone);
220                 let remote = do sched.event_loop.remote_callback {
221                     tube_clone_cell.take().send(1);
222                 };
223                 remote_cell.put_back(remote);
224             }
225             let thread = do Thread::start {
226                 remote_cell.take().fire();
227             };
228
229             assert!(tube.recv() == 1);
230             thread.join();
231         }
232     }
233 }
234
235 pub struct UvIoFactory(Loop);
236
237 impl UvIoFactory {
238     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
239         match self { &UvIoFactory(ref mut ptr) => ptr }
240     }
241 }
242
243 impl IoFactory for UvIoFactory {
244     // Connect to an address and return a new stream
245     // NB: This blocks the task waiting on the connection.
246     // It would probably be better to return a future
247     fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
248         // Create a cell in the task to hold the result. We will fill
249         // the cell before resuming the task.
250         let result_cell = Cell::new_empty();
251         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
252
253         let scheduler = Local::take::<Scheduler>();
254
255         // Block this task and take ownership, switch to scheduler context
256         do scheduler.deschedule_running_task_and_then |_, task| {
257
258             rtdebug!("connect: entered scheduler context");
259             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
260             let task_cell = Cell::new(task);
261
262             // Wait for a connection
263             do tcp_watcher.connect(addr) |stream_watcher, status| {
264                 rtdebug!("connect: in connect callback");
265                 if status.is_none() {
266                     rtdebug!("status is none");
267                     let tcp_watcher =
268                         NativeHandle::from_native_handle(stream_watcher.native_handle());
269                     let res = Ok(~UvTcpStream(tcp_watcher));
270
271                     // Store the stream in the task's stack
272                     unsafe { (*result_cell_ptr).put_back(res); }
273
274                     // Context switch
275                     let scheduler = Local::take::<Scheduler>();
276                     scheduler.resume_blocked_task_immediately(task_cell.take());
277                 } else {
278                     rtdebug!("status is some");
279                     let task_cell = Cell::new(task_cell.take());
280                     do stream_watcher.close {
281                         let res = Err(uv_error_to_io_error(status.unwrap()));
282                         unsafe { (*result_cell_ptr).put_back(res); }
283                         let scheduler = Local::take::<Scheduler>();
284                         scheduler.resume_blocked_task_immediately(task_cell.take());
285                     }
286                 };
287             }
288         }
289
290         assert!(!result_cell.is_empty());
291         return result_cell.take();
292     }
293
294     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
295         let mut watcher = TcpWatcher::new(self.uv_loop());
296         match watcher.bind(addr) {
297             Ok(_) => Ok(~UvTcpListener::new(watcher)),
298             Err(uverr) => {
299                 let scheduler = Local::take::<Scheduler>();
300                 do scheduler.deschedule_running_task_and_then |_, task| {
301                     let task_cell = Cell::new(task);
302                     do watcher.as_stream().close {
303                         let scheduler = Local::take::<Scheduler>();
304                         scheduler.resume_blocked_task_immediately(task_cell.take());
305                     }
306                 }
307                 Err(uv_error_to_io_error(uverr))
308             }
309         }
310     }
311
312     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
313         let mut watcher = UdpWatcher::new(self.uv_loop());
314         match watcher.bind(addr) {
315             Ok(_) => Ok(~UvUdpSocket(watcher)),
316             Err(uverr) => {
317                 let scheduler = Local::take::<Scheduler>();
318                 do scheduler.deschedule_running_task_and_then |_, task| {
319                     let task_cell = Cell::new(task);
320                     do watcher.close {
321                         let scheduler = Local::take::<Scheduler>();
322                         scheduler.resume_blocked_task_immediately(task_cell.take());
323                     }
324                 }
325                 Err(uv_error_to_io_error(uverr))
326             }
327         }
328     }
329
330     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
331         Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
332     }
333 }
334
335 pub struct UvTcpListener {
336     watcher: TcpWatcher,
337     listening: bool,
338     incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
339 }
340
341 impl UvTcpListener {
342     fn new(watcher: TcpWatcher) -> UvTcpListener {
343         UvTcpListener {
344             watcher: watcher,
345             listening: false,
346             incoming_streams: Tube::new()
347         }
348     }
349
350     fn watcher(&self) -> TcpWatcher { self.watcher }
351 }
352
353 impl Drop for UvTcpListener {
354     fn drop(&self) {
355         let watcher = self.watcher();
356         let scheduler = Local::take::<Scheduler>();
357         do scheduler.deschedule_running_task_and_then |_, task| {
358             let task_cell = Cell::new(task);
359             do watcher.as_stream().close {
360                 let scheduler = Local::take::<Scheduler>();
361                 scheduler.resume_blocked_task_immediately(task_cell.take());
362             }
363         }
364     }
365 }
366
367 impl RtioSocket for UvTcpListener {
368     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
369         socket_name(Tcp, self.watcher)
370     }
371 }
372
373 impl RtioTcpListener for UvTcpListener {
374
375     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
376         rtdebug!("entering listen");
377
378         if self.listening {
379             return self.incoming_streams.recv();
380         }
381
382         self.listening = true;
383
384         let server_tcp_watcher = self.watcher();
385         let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
386
387         let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
388         let mut server_tcp_watcher = server_tcp_watcher;
389         do server_tcp_watcher.listen |mut server_stream_watcher, status| {
390             let maybe_stream = if status.is_none() {
391                 let mut loop_ = server_stream_watcher.event_loop();
392                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
393                 // XXX: Need's to be surfaced in interface
394                 server_stream_watcher.accept(client_tcp_watcher.as_stream());
395                 Ok(~UvTcpStream(client_tcp_watcher))
396             } else {
397                 Err(standard_error(OtherIoError))
398             };
399
400             let mut incoming_streams = incoming_streams_cell.take();
401             incoming_streams.send(maybe_stream);
402             incoming_streams_cell.put_back(incoming_streams);
403         }
404
405         return self.incoming_streams.recv();
406     }
407
408     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
409         let r = unsafe {
410             uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int)
411         };
412
413         match status_to_maybe_uv_error(self.watcher, r) {
414             Some(err) => Err(uv_error_to_io_error(err)),
415             None => Ok(())
416         }
417     }
418
419     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
420         let r = unsafe {
421             uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int)
422         };
423
424         match status_to_maybe_uv_error(self.watcher, r) {
425             Some(err) => Err(uv_error_to_io_error(err)),
426             None => Ok(())
427         }
428     }
429 }
430
431 pub struct UvTcpStream(TcpWatcher);
432
433 impl Drop for UvTcpStream {
434     fn drop(&self) {
435         rtdebug!("closing tcp stream");
436         let scheduler = Local::take::<Scheduler>();
437         do scheduler.deschedule_running_task_and_then |_, task| {
438             let task_cell = Cell::new(task);
439             do self.as_stream().close {
440                 let scheduler = Local::take::<Scheduler>();
441                 scheduler.resume_blocked_task_immediately(task_cell.take());
442             }
443         }
444     }
445 }
446
447 impl RtioSocket for UvTcpStream {
448     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
449         socket_name(Tcp, **self)
450     }
451 }
452
453 impl RtioTcpStream for UvTcpStream {
454     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
455         let result_cell = Cell::new_empty();
456         let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
457
458         let scheduler = Local::take::<Scheduler>();
459         let buf_ptr: *&mut [u8] = &buf;
460         do scheduler.deschedule_running_task_and_then |_sched, task| {
461             rtdebug!("read: entered scheduler context");
462             let task_cell = Cell::new(task);
463             // XXX: We shouldn't reallocate these callbacks every
464             // call to read
465             let alloc: AllocCallback = |_| unsafe {
466                 slice_to_uv_buf(*buf_ptr)
467             };
468             let mut watcher = self.as_stream();
469             do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
470
471                 // Stop reading so that no read callbacks are
472                 // triggered before the user calls `read` again.
473                 // XXX: Is there a performance impact to calling
474                 // stop here?
475                 watcher.read_stop();
476
477                 let result = if status.is_none() {
478                     assert!(nread >= 0);
479                     Ok(nread as uint)
480                 } else {
481                     Err(uv_error_to_io_error(status.unwrap()))
482                 };
483
484                 unsafe { (*result_cell_ptr).put_back(result); }
485
486                 let scheduler = Local::take::<Scheduler>();
487                 scheduler.resume_blocked_task_immediately(task_cell.take());
488             }
489         }
490
491         assert!(!result_cell.is_empty());
492         return result_cell.take();
493     }
494
495     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
496         let result_cell = Cell::new_empty();
497         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
498         let scheduler = Local::take::<Scheduler>();
499         let buf_ptr: *&[u8] = &buf;
500         do scheduler.deschedule_running_task_and_then |_, task| {
501             let task_cell = Cell::new(task);
502             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
503             let mut watcher = self.as_stream();
504             do watcher.write(buf) |_watcher, status| {
505                 let result = if status.is_none() {
506                     Ok(())
507                 } else {
508                     Err(uv_error_to_io_error(status.unwrap()))
509                 };
510
511                 unsafe { (*result_cell_ptr).put_back(result); }
512
513                 let scheduler = Local::take::<Scheduler>();
514                 scheduler.resume_blocked_task_immediately(task_cell.take());
515             }
516         }
517
518         assert!(!result_cell.is_empty());
519         return result_cell.take();
520     }
521
522     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
523         socket_name(TcpPeer, **self)
524     }
525
526     fn control_congestion(&mut self) -> Result<(), IoError> {
527         let r = unsafe {
528             uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int)
529         };
530
531         match status_to_maybe_uv_error(**self, r) {
532             Some(err) => Err(uv_error_to_io_error(err)),
533             None => Ok(())
534         }
535     }
536
537     fn nodelay(&mut self) -> Result<(), IoError> {
538         let r = unsafe {
539             uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int)
540         };
541
542         match status_to_maybe_uv_error(**self, r) {
543             Some(err) => Err(uv_error_to_io_error(err)),
544             None => Ok(())
545         }
546     }
547
548     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
549         let r = unsafe {
550             uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int,
551                                         delay_in_seconds as c_uint)
552         };
553
554         match status_to_maybe_uv_error(**self, r) {
555             Some(err) => Err(uv_error_to_io_error(err)),
556             None => Ok(())
557         }
558     }
559
560     fn letdie(&mut self) -> Result<(), IoError> {
561         let r = unsafe {
562             uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint)
563         };
564
565         match status_to_maybe_uv_error(**self, r) {
566             Some(err) => Err(uv_error_to_io_error(err)),
567             None => Ok(())
568         }
569     }
570 }
571
572 pub struct UvUdpSocket(UdpWatcher);
573
574 impl Drop for UvUdpSocket {
575     fn drop(&self) {
576         rtdebug!("closing udp socket");
577         let scheduler = Local::take::<Scheduler>();
578         do scheduler.deschedule_running_task_and_then |_, task| {
579             let task_cell = Cell::new(task);
580             do self.close {
581                 let scheduler = Local::take::<Scheduler>();
582                 scheduler.resume_blocked_task_immediately(task_cell.take());
583             }
584         }
585     }
586 }
587
588 impl RtioSocket for UvUdpSocket {
589     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
590         socket_name(Udp, **self)
591     }
592 }
593
594 impl RtioUdpSocket for UvUdpSocket {
595     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
596         let result_cell = Cell::new_empty();
597         let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
598
599         let scheduler = Local::take::<Scheduler>();
600         let buf_ptr: *&mut [u8] = &buf;
601         do scheduler.deschedule_running_task_and_then |_sched, task| {
602             rtdebug!("recvfrom: entered scheduler context");
603             let task_cell = Cell::new(task);
604             let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
605             do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
606                 let _ = flags; // XXX add handling for partials?
607
608                 watcher.recv_stop();
609
610                 let result = match status {
611                     None => {
612                         assert!(nread >= 0);
613                         Ok((nread as uint, addr))
614                     }
615                     Some(err) => Err(uv_error_to_io_error(err))
616                 };
617
618                 unsafe { (*result_cell_ptr).put_back(result); }
619
620                 let scheduler = Local::take::<Scheduler>();
621                 scheduler.resume_blocked_task_immediately(task_cell.take());
622             }
623         }
624
625         assert!(!result_cell.is_empty());
626         return result_cell.take();
627     }
628
629     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
630         let result_cell = Cell::new_empty();
631         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
632         let scheduler = Local::take::<Scheduler>();
633         let buf_ptr: *&[u8] = &buf;
634         do scheduler.deschedule_running_task_and_then |_, task| {
635             let task_cell = Cell::new(task);
636             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
637             do self.send(buf, dst) |_watcher, status| {
638
639                 let result = match status {
640                     None => Ok(()),
641                     Some(err) => Err(uv_error_to_io_error(err)),
642                 };
643
644                 unsafe { (*result_cell_ptr).put_back(result); }
645
646                 let scheduler = Local::take::<Scheduler>();
647                 scheduler.resume_blocked_task_immediately(task_cell.take());
648             }
649         }
650
651         assert!(!result_cell.is_empty());
652         return result_cell.take();
653     }
654
655     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
656         let r = unsafe {
657             do multi.to_str().to_c_str().with_ref |m_addr| {
658                 uvll::udp_set_membership(self.native_handle(), m_addr,
659                                          ptr::null(), uvll::UV_JOIN_GROUP)
660             }
661         };
662
663         match status_to_maybe_uv_error(**self, r) {
664             Some(err) => Err(uv_error_to_io_error(err)),
665             None => Ok(())
666         }
667     }
668
669     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
670         let r = unsafe {
671             do multi.to_str().to_c_str().with_ref |m_addr| {
672                 uvll::udp_set_membership(self.native_handle(), m_addr,
673                                          ptr::null(), uvll::UV_LEAVE_GROUP)
674             }
675         };
676
677         match status_to_maybe_uv_error(**self, r) {
678             Some(err) => Err(uv_error_to_io_error(err)),
679             None => Ok(())
680         }
681     }
682
683     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
684         let r = unsafe {
685             uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int)
686         };
687
688         match status_to_maybe_uv_error(**self, r) {
689             Some(err) => Err(uv_error_to_io_error(err)),
690             None => Ok(())
691         }
692     }
693
694     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
695         let r = unsafe {
696             uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int)
697         };
698
699         match status_to_maybe_uv_error(**self, r) {
700             Some(err) => Err(uv_error_to_io_error(err)),
701             None => Ok(())
702         }
703     }
704
705     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
706         let r = unsafe {
707             uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int)
708         };
709
710         match status_to_maybe_uv_error(**self, r) {
711             Some(err) => Err(uv_error_to_io_error(err)),
712             None => Ok(())
713         }
714     }
715
716     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
717         let r = unsafe {
718             uvll::udp_set_ttl(self.native_handle(), ttl as c_int)
719         };
720
721         match status_to_maybe_uv_error(**self, r) {
722             Some(err) => Err(uv_error_to_io_error(err)),
723             None => Ok(())
724         }
725     }
726
727     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
728         let r = unsafe {
729             uvll::udp_set_broadcast(self.native_handle(), 1 as c_int)
730         };
731
732         match status_to_maybe_uv_error(**self, r) {
733             Some(err) => Err(uv_error_to_io_error(err)),
734             None => Ok(())
735         }
736     }
737
738     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
739         let r = unsafe {
740             uvll::udp_set_broadcast(self.native_handle(), 0 as c_int)
741         };
742
743         match status_to_maybe_uv_error(**self, r) {
744             Some(err) => Err(uv_error_to_io_error(err)),
745             None => Ok(())
746         }
747     }
748 }
749
750 pub struct UvTimer(timer::TimerWatcher);
751
752 impl UvTimer {
753     fn new(w: timer::TimerWatcher) -> UvTimer {
754         UvTimer(w)
755     }
756 }
757
758 impl Drop for UvTimer {
759     fn drop(&self) {
760         rtdebug!("closing UvTimer");
761         let scheduler = Local::take::<Scheduler>();
762         do scheduler.deschedule_running_task_and_then |_, task| {
763             let task_cell = Cell::new(task);
764             do self.close {
765                 let scheduler = Local::take::<Scheduler>();
766                 scheduler.resume_blocked_task_immediately(task_cell.take());
767             }
768         }
769     }
770 }
771
772 impl RtioTimer for UvTimer {
773     fn sleep(&self, msecs: u64) {
774         let scheduler = Local::take::<Scheduler>();
775         do scheduler.deschedule_running_task_and_then |_sched, task| {
776             rtdebug!("sleep: entered scheduler context");
777             let task_cell = Cell::new(task);
778             let mut watcher = **self;
779             do watcher.start(msecs, 0) |_, status| {
780                 assert!(status.is_none());
781                 let scheduler = Local::take::<Scheduler>();
782                 scheduler.resume_blocked_task_immediately(task_cell.take());
783             }
784         }
785         let mut w = **self;
786         w.stop();
787     }
788 }
789
790 #[test]
791 fn test_simple_io_no_connect() {
792     do run_in_newsched_task {
793         unsafe {
794             let io = Local::unsafe_borrow::<IoFactoryObject>();
795             let addr = next_test_ip4();
796             let maybe_chan = (*io).tcp_connect(addr);
797             assert!(maybe_chan.is_err());
798         }
799     }
800 }
801
802 #[test]
803 fn test_simple_udp_io_bind_only() {
804     do run_in_newsched_task {
805         unsafe {
806             let io = Local::unsafe_borrow::<IoFactoryObject>();
807             let addr = next_test_ip4();
808             let maybe_socket = (*io).udp_bind(addr);
809             assert!(maybe_socket.is_ok());
810         }
811     }
812 }
813
814 #[test]
815 fn test_simple_tcp_server_and_client() {
816     do run_in_newsched_task {
817         let addr = next_test_ip4();
818
819         // Start the server first so it's listening when we connect
820         do spawntask {
821             unsafe {
822                 let io = Local::unsafe_borrow::<IoFactoryObject>();
823                 let mut listener = (*io).tcp_bind(addr).unwrap();
824                 let mut stream = listener.accept().unwrap();
825                 let mut buf = [0, .. 2048];
826                 let nread = stream.read(buf).unwrap();
827                 assert_eq!(nread, 8);
828                 for i in range(0u, nread) {
829                     rtdebug!("%u", buf[i] as uint);
830                     assert_eq!(buf[i], i as u8);
831                 }
832             }
833         }
834
835         do spawntask {
836             unsafe {
837                 let io = Local::unsafe_borrow::<IoFactoryObject>();
838                 let mut stream = (*io).tcp_connect(addr).unwrap();
839                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
840             }
841         }
842     }
843 }
844
845 #[test]
846 fn test_simple_udp_server_and_client() {
847     do run_in_newsched_task {
848         let server_addr = next_test_ip4();
849         let client_addr = next_test_ip4();
850
851         do spawntask {
852             unsafe {
853                 let io = Local::unsafe_borrow::<IoFactoryObject>();
854                 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
855                 let mut buf = [0, .. 2048];
856                 let (nread,src) = server_socket.recvfrom(buf).unwrap();
857                 assert_eq!(nread, 8);
858                 for i in range(0u, nread) {
859                     rtdebug!("%u", buf[i] as uint);
860                     assert_eq!(buf[i], i as u8);
861                 }
862                 assert_eq!(src, client_addr);
863             }
864         }
865
866         do spawntask {
867             unsafe {
868                 let io = Local::unsafe_borrow::<IoFactoryObject>();
869                 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
870                 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
871             }
872         }
873     }
874 }
875
876 #[test] #[ignore(reason = "busted")]
877 fn test_read_and_block() {
878     do run_in_newsched_task {
879         let addr = next_test_ip4();
880
881         do spawntask {
882             let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
883             let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
884             let mut stream = listener.accept().unwrap();
885             let mut buf = [0, .. 2048];
886
887             let expected = 32;
888             let mut current = 0;
889             let mut reads = 0;
890
891             while current < expected {
892                 let nread = stream.read(buf).unwrap();
893                 for i in range(0u, nread) {
894                     let val = buf[i] as uint;
895                     assert_eq!(val, current % 8);
896                     current += 1;
897                 }
898                 reads += 1;
899
900                 let scheduler = Local::take::<Scheduler>();
901                 // Yield to the other task in hopes that it
902                 // will trigger a read callback while we are
903                 // not ready for it
904                 do scheduler.deschedule_running_task_and_then |sched, task| {
905                     let task = Cell::new(task);
906                     sched.enqueue_blocked_task(task.take());
907                 }
908             }
909
910             // Make sure we had multiple reads
911             assert!(reads > 1);
912         }
913
914         do spawntask {
915             unsafe {
916                 let io = Local::unsafe_borrow::<IoFactoryObject>();
917                 let mut stream = (*io).tcp_connect(addr).unwrap();
918                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
919                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
920                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
921                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
922             }
923         }
924
925     }
926 }
927
928 #[test]
929 fn test_read_read_read() {
930     do run_in_newsched_task {
931         let addr = next_test_ip4();
932         static MAX: uint = 500000;
933
934         do spawntask {
935             unsafe {
936                 let io = Local::unsafe_borrow::<IoFactoryObject>();
937                 let mut listener = (*io).tcp_bind(addr).unwrap();
938                 let mut stream = listener.accept().unwrap();
939                 let buf = [1, .. 2048];
940                 let mut total_bytes_written = 0;
941                 while total_bytes_written < MAX {
942                     stream.write(buf);
943                     total_bytes_written += buf.len();
944                 }
945             }
946         }
947
948         do spawntask {
949             unsafe {
950                 let io = Local::unsafe_borrow::<IoFactoryObject>();
951                 let mut stream = (*io).tcp_connect(addr).unwrap();
952                 let mut buf = [0, .. 2048];
953                 let mut total_bytes_read = 0;
954                 while total_bytes_read < MAX {
955                     let nread = stream.read(buf).unwrap();
956                     rtdebug!("read %u bytes", nread as uint);
957                     total_bytes_read += nread;
958                     for i in range(0u, nread) {
959                         assert_eq!(buf[i], 1);
960                     }
961                 }
962                 rtdebug!("read %u bytes total", total_bytes_read as uint);
963             }
964         }
965     }
966 }
967
968 #[test]
969 fn test_udp_twice() {
970     do run_in_newsched_task {
971         let server_addr = next_test_ip4();
972         let client_addr = next_test_ip4();
973
974         do spawntask {
975             unsafe {
976                 let io = Local::unsafe_borrow::<IoFactoryObject>();
977                 let mut client = (*io).udp_bind(client_addr).unwrap();
978                 assert!(client.sendto([1], server_addr).is_ok());
979                 assert!(client.sendto([2], server_addr).is_ok());
980             }
981         }
982
983         do spawntask {
984             unsafe {
985                 let io = Local::unsafe_borrow::<IoFactoryObject>();
986                 let mut server = (*io).udp_bind(server_addr).unwrap();
987                 let mut buf1 = [0];
988                 let mut buf2 = [0];
989                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
990                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
991                 assert_eq!(nread1, 1);
992                 assert_eq!(nread2, 1);
993                 assert_eq!(src1, client_addr);
994                 assert_eq!(src2, client_addr);
995                 assert_eq!(buf1[0], 1);
996                 assert_eq!(buf2[0], 2);
997             }
998         }
999     }
1000 }
1001
1002 #[test]
1003 fn test_udp_many_read() {
1004     do run_in_newsched_task {
1005         let server_out_addr = next_test_ip4();
1006         let server_in_addr = next_test_ip4();
1007         let client_out_addr = next_test_ip4();
1008         let client_in_addr = next_test_ip4();
1009         static MAX: uint = 500_000;
1010
1011         do spawntask {
1012             unsafe {
1013                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1014                 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1015                 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1016                 let msg = [1, .. 2048];
1017                 let mut total_bytes_sent = 0;
1018                 let mut buf = [1];
1019                 while buf[0] == 1 {
1020                     // send more data
1021                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
1022                     total_bytes_sent += msg.len();
1023                     // check if the client has received enough
1024                     let res = server_in.recvfrom(buf);
1025                     assert!(res.is_ok());
1026                     let (nread, src) = res.unwrap();
1027                     assert_eq!(nread, 1);
1028                     assert_eq!(src, client_out_addr);
1029                 }
1030                 assert!(total_bytes_sent >= MAX);
1031             }
1032         }
1033
1034         do spawntask {
1035             unsafe {
1036                 let io = Local::unsafe_borrow::<IoFactoryObject>();
1037                 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1038                 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1039                 let mut total_bytes_recv = 0;
1040                 let mut buf = [0, .. 2048];
1041                 while total_bytes_recv < MAX {
1042                     // ask for more
1043                     assert!(client_out.sendto([1], server_in_addr).is_ok());
1044                     // wait for data
1045                     let res = client_in.recvfrom(buf);
1046                     assert!(res.is_ok());
1047                     let (nread, src) = res.unwrap();
1048                     assert_eq!(src, server_out_addr);
1049                     total_bytes_recv += nread;
1050                     for i in range(0u, nread) {
1051                         assert_eq!(buf[i], 1);
1052                     }
1053                 }
1054                 // tell the server we're done
1055                 assert!(client_out.sendto([0], server_in_addr).is_ok());
1056             }
1057         }
1058     }
1059 }
1060
1061 fn test_timer_sleep_simple_impl() {
1062     unsafe {
1063         let io = Local::unsafe_borrow::<IoFactoryObject>();
1064         let timer = (*io).timer_init();
1065         match timer {
1066             Ok(t) => t.sleep(1),
1067             Err(_) => assert!(false)
1068         }
1069     }
1070 }
1071 #[test]
1072 fn test_timer_sleep_simple() {
1073     do run_in_newsched_task {
1074         test_timer_sleep_simple_impl();
1075     }
1076 }