]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
Update to the latest libuv
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::libc;
13 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
14 use std::ptr;
15 use std::rt::BlockedTask;
16 use std::rt::io::IoError;
17 use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr};
18 use std::rt::local::Local;
19 use std::rt::rtio;
20 use std::rt::sched::{Scheduler, SchedHandle};
21 use std::rt::tube::Tube;
22 use std::str;
23 use std::task;
24 use std::vec;
25
26 use stream::StreamWatcher;
27 use super::{Loop, Request, UvError, Buf, status_to_io_result,
28             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
29             wait_until_woken_after};
30 use uvio::HomingIO;
31 use uvll;
32 use uvll::sockaddr;
33
34 ////////////////////////////////////////////////////////////////////////////////
35 /// Generic functions related to dealing with sockaddr things
36 ////////////////////////////////////////////////////////////////////////////////
37
38 #[fixed_stack_segment]
39 fn socket_addr_as_sockaddr<T>(addr: SocketAddr, f: &fn(*sockaddr) -> T) -> T {
40     let malloc = match addr.ip {
41         Ipv4Addr(*) => uvll::rust_malloc_ip4_addr,
42         Ipv6Addr(*) => uvll::rust_malloc_ip6_addr,
43     };
44
45     let ip = addr.ip.to_str();
46     let addr = ip.with_c_str(|p| unsafe { malloc(p, addr.port as c_int) });
47     do (|| {
48         f(addr)
49     }).finally {
50         unsafe { libc::free(addr) };
51     }
52 }
53
54 #[fixed_stack_segment]
55 pub fn sockaddr_to_socket_addr(addr: *sockaddr) -> SocketAddr {
56     unsafe {
57         let ip_size = if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
58             4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/
59         } else if uvll::rust_is_ipv6_sockaddr(addr) == 1 {
60             8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/
61         } else {
62             fail!("unknown address?");
63         };
64         let ip_name = {
65             let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
66             let buf_ptr = vec::raw::to_ptr(buf);
67             if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
68                 uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t);
69             } else {
70                 uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t);
71             }
72             buf
73         };
74         let ip_port = {
75             let port = if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
76                 uvll::rust_ip4_port(addr)
77             } else {
78                 uvll::rust_ip6_port(addr)
79             };
80             port as u16
81         };
82         let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
83         let ip_addr = FromStr::from_str(ip_str).unwrap();
84
85         SocketAddr { ip: ip_addr, port: ip_port }
86     }
87 }
88
89 #[cfg(test)]
90 #[test]
91 fn test_ip4_conversion() {
92     use std::rt;
93     let ip4 = rt::test::next_test_ip4();
94     do socket_addr_as_sockaddr(ip4) |addr| {
95         assert_eq!(ip4, sockaddr_to_socket_addr(addr));
96     }
97 }
98
99 #[cfg(test)]
100 #[test]
101 fn test_ip6_conversion() {
102     use std::rt;
103     let ip6 = rt::test::next_test_ip6();
104     do socket_addr_as_sockaddr(ip6) |addr| {
105         assert_eq!(ip6, sockaddr_to_socket_addr(addr));
106     }
107 }
108
109 enum SocketNameKind {
110     TcpPeer,
111     Tcp,
112     Udp
113 }
114
115 #[fixed_stack_segment]
116 fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
117     unsafe {
118         let getsockname = match sk {
119             TcpPeer => uvll::uv_tcp_getpeername,
120             Tcp     => uvll::uv_tcp_getsockname,
121             Udp     => uvll::uv_udp_getsockname,
122         };
123
124         // Allocate a sockaddr_storage
125         // since we don't know if it's ipv4 or ipv6
126         let size = uvll::rust_sockaddr_size();
127         let name = libc::malloc(size as size_t);
128         assert!(!name.is_null());
129         let mut namelen = size;
130
131         let ret = match getsockname(handle, name, &mut namelen) {
132             0 => Ok(sockaddr_to_socket_addr(name)),
133             n => Err(uv_error_to_io_error(UvError(n)))
134         };
135         libc::free(name);
136         ret
137     }
138 }
139
140 ////////////////////////////////////////////////////////////////////////////////
141 /// TCP implementation
142 ////////////////////////////////////////////////////////////////////////////////
143
144 pub struct TcpWatcher {
145     handle: *uvll::uv_tcp_t,
146     stream: StreamWatcher,
147     home: SchedHandle,
148 }
149
150 pub struct TcpListener {
151     home: SchedHandle,
152     handle: *uvll::uv_pipe_t,
153     priv closing_task: Option<BlockedTask>,
154     priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
155 }
156
157 pub struct TcpAcceptor {
158     listener: ~TcpListener,
159     priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
160 }
161
162 // TCP watchers (clients/streams)
163
164 impl TcpWatcher {
165     pub fn new(loop_: &Loop) -> TcpWatcher {
166         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
167         assert_eq!(unsafe {
168             uvll::uv_tcp_init(loop_.handle, handle)
169         }, 0);
170         TcpWatcher {
171             home: get_handle_to_current_scheduler!(),
172             handle: handle,
173             stream: StreamWatcher::new(handle),
174         }
175     }
176
177     pub fn connect(loop_: &mut Loop, address: SocketAddr)
178         -> Result<TcpWatcher, UvError>
179     {
180         struct Ctx { status: c_int, task: Option<BlockedTask> }
181
182         return do task::unkillable {
183             let tcp = TcpWatcher::new(loop_);
184             let ret = do socket_addr_as_sockaddr(address) |addr| {
185                 let mut req = Request::new(uvll::UV_CONNECT);
186                 let result = unsafe {
187                     uvll::uv_tcp_connect(req.handle, tcp.handle, addr,
188                                          connect_cb)
189                 };
190                 match result {
191                     0 => {
192                         req.defuse(); // uv callback now owns this request
193                         let mut cx = Ctx { status: 0, task: None };
194                         do wait_until_woken_after(&mut cx.task) {
195                             req.set_data(&cx);
196                         }
197                         match cx.status {
198                             0 => Ok(()),
199                             n => Err(UvError(n)),
200                         }
201                     }
202                     n => Err(UvError(n))
203                 }
204             };
205
206             match ret {
207                 Ok(()) => Ok(tcp),
208                 Err(e) => Err(e),
209             }
210         };
211
212         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
213             let req = Request::wrap(req);
214             assert!(status != uvll::ECANCELED);
215             let cx: &mut Ctx = unsafe { req.get_data() };
216             cx.status = status;
217             let scheduler: ~Scheduler = Local::take();
218             scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
219         }
220     }
221 }
222
223 impl HomingIO for TcpWatcher {
224     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
225 }
226
227 impl rtio::RtioSocket for TcpWatcher {
228     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
229         let _m = self.fire_homing_missile();
230         socket_name(Tcp, self.handle)
231     }
232 }
233
234 impl rtio::RtioTcpStream for TcpWatcher {
235     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
236         let _m = self.fire_homing_missile();
237         self.stream.read(buf).map_err(uv_error_to_io_error)
238     }
239
240     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
241         let _m = self.fire_homing_missile();
242         self.stream.write(buf).map_err(uv_error_to_io_error)
243     }
244
245     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
246         let _m = self.fire_homing_missile();
247         socket_name(TcpPeer, self.handle)
248     }
249
250     fn control_congestion(&mut self) -> Result<(), IoError> {
251         let _m = self.fire_homing_missile();
252         status_to_io_result(unsafe {
253             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
254         })
255     }
256
257     fn nodelay(&mut self) -> Result<(), IoError> {
258         let _m = self.fire_homing_missile();
259         status_to_io_result(unsafe {
260             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
261         })
262     }
263
264     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
265         let _m = self.fire_homing_missile();
266         status_to_io_result(unsafe {
267             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
268                                    delay_in_seconds as c_uint)
269         })
270     }
271
272     fn letdie(&mut self) -> Result<(), IoError> {
273         let _m = self.fire_homing_missile();
274         status_to_io_result(unsafe {
275             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
276         })
277     }
278 }
279
280 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
281     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
282 }
283
284 impl Drop for TcpWatcher {
285     fn drop(&mut self) {
286         let _m = self.fire_homing_missile();
287         self.close();
288     }
289 }
290
291 // TCP listeners (unbound servers)
292
293 impl TcpListener {
294     pub fn bind(loop_: &mut Loop, address: SocketAddr)
295         -> Result<~TcpListener, UvError>
296     {
297         do task::unkillable {
298             let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
299             assert_eq!(unsafe {
300                 uvll::uv_tcp_init(loop_.handle, handle)
301             }, 0);
302             let l = ~TcpListener {
303                 home: get_handle_to_current_scheduler!(),
304                 handle: handle,
305                 closing_task: None,
306                 outgoing: Tube::new(),
307             };
308             let res = socket_addr_as_sockaddr(address, |addr| unsafe {
309                 uvll::uv_tcp_bind(l.handle, addr)
310             });
311             match res {
312                 0 => Ok(l.install()),
313                 n => Err(UvError(n))
314             }
315         }
316     }
317 }
318
319 impl HomingIO for TcpListener {
320     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
321 }
322
323 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
324     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
325 }
326
327 impl rtio::RtioSocket for TcpListener {
328     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
329         let _m = self.fire_homing_missile();
330         socket_name(Tcp, self.handle)
331     }
332 }
333
334 impl rtio::RtioTcpListener for TcpListener {
335     fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
336         // create the acceptor object from ourselves
337         let incoming = self.outgoing.clone();
338         let mut acceptor = ~TcpAcceptor {
339             listener: self,
340             incoming: incoming,
341         };
342
343         let _m = acceptor.fire_homing_missile();
344         // XXX: the 128 backlog should be configurable
345         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
346             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
347             n => Err(uv_error_to_io_error(UvError(n))),
348         }
349     }
350 }
351
352 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
353     assert!(status != uvll::ECANCELED);
354     let msg = match status {
355         0 => {
356             let loop_ = Loop::wrap(unsafe {
357                 uvll::get_loop_for_uv_handle(server)
358             });
359             let client = TcpWatcher::new(&loop_);
360             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
361             Ok(~client as ~rtio::RtioTcpStream)
362         }
363         n => Err(uv_error_to_io_error(UvError(n)))
364     };
365
366     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
367     tcp.outgoing.send(msg);
368 }
369
370 impl Drop for TcpListener {
371     fn drop(&mut self) {
372         let _m = self.fire_homing_missile();
373         self.close();
374     }
375 }
376
377 extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
378     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
379     unsafe { uvll::free_handle(handle) }
380
381     let sched: ~Scheduler = Local::take();
382     sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
383 }
384
385 // TCP acceptors (bound servers)
386
387 impl HomingIO for TcpAcceptor {
388     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
389 }
390
391 impl rtio::RtioSocket for TcpAcceptor {
392     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
393         let _m = self.fire_homing_missile();
394         socket_name(Tcp, self.listener.handle)
395     }
396 }
397
398 impl rtio::RtioTcpAcceptor for TcpAcceptor {
399     fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
400         let _m = self.fire_homing_missile();
401         self.incoming.recv()
402     }
403
404     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
405         let _m = self.fire_homing_missile();
406         status_to_io_result(unsafe {
407             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
408         })
409     }
410
411     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
412         let _m = self.fire_homing_missile();
413         status_to_io_result(unsafe {
414             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
415         })
416     }
417 }
418
419 ////////////////////////////////////////////////////////////////////////////////
420 /// UDP implementation
421 ////////////////////////////////////////////////////////////////////////////////
422
423 pub struct UdpWatcher {
424     handle: *uvll::uv_udp_t,
425     home: SchedHandle,
426 }
427
428 impl UdpWatcher {
429     pub fn bind(loop_: &Loop, address: SocketAddr)
430         -> Result<UdpWatcher, UvError>
431     {
432         do task::unkillable {
433             let udp = UdpWatcher {
434                 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
435                 home: get_handle_to_current_scheduler!(),
436             };
437             assert_eq!(unsafe {
438                 uvll::uv_udp_init(loop_.handle, udp.handle)
439             }, 0);
440             let result = socket_addr_as_sockaddr(address, |addr| unsafe {
441                 uvll::uv_udp_bind(udp.handle, addr, 0u32)
442             });
443             match result {
444                 0 => Ok(udp),
445                 n => Err(UvError(n)),
446             }
447         }
448     }
449 }
450
451 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
452     fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
453 }
454
455 impl HomingIO for UdpWatcher {
456     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
457 }
458
459 impl rtio::RtioSocket for UdpWatcher {
460     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
461         let _m = self.fire_homing_missile();
462         socket_name(Udp, self.handle)
463     }
464 }
465
466 impl rtio::RtioUdpSocket for UdpWatcher {
467     fn recvfrom(&mut self, buf: &mut [u8])
468         -> Result<(uint, SocketAddr), IoError>
469     {
470         struct Ctx {
471             task: Option<BlockedTask>,
472             buf: Option<Buf>,
473             result: Option<(ssize_t, Option<SocketAddr>)>,
474         }
475         let _m = self.fire_homing_missile();
476
477         let a = match unsafe {
478             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
479         } {
480             0 => {
481                 let mut cx = Ctx {
482                     task: None,
483                     buf: Some(slice_to_uv_buf(buf)),
484                     result: None,
485                 };
486                 do wait_until_woken_after(&mut cx.task) {
487                     unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
488                 }
489                 match cx.result.take_unwrap() {
490                     (n, _) if n < 0 =>
491                         Err(uv_error_to_io_error(UvError(n as c_int))),
492                     (n, addr) => Ok((n as uint, addr.unwrap()))
493                 }
494             }
495             n => Err(uv_error_to_io_error(UvError(n)))
496         };
497         return a;
498
499         extern fn alloc_cb(handle: *uvll::uv_udp_t,
500                            _suggested_size: size_t,
501                            buf: *mut Buf) {
502             unsafe {
503                 let cx: &mut Ctx =
504                     cast::transmute(uvll::get_data_for_uv_handle(handle));
505                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
506             }
507         }
508
509         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
510                           addr: *uvll::sockaddr, _flags: c_uint) {
511             assert!(nread != uvll::ECANCELED as ssize_t);
512             let cx: &mut Ctx = unsafe {
513                 cast::transmute(uvll::get_data_for_uv_handle(handle))
514             };
515
516             // When there's no data to read the recv callback can be a no-op.
517             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
518             // this we just drop back to kqueue and wait for the next callback.
519             if nread == 0 {
520                 cx.buf = Some(unsafe { *buf });
521                 return
522             }
523
524             unsafe {
525                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
526             }
527
528             let cx: &mut Ctx = unsafe {
529                 cast::transmute(uvll::get_data_for_uv_handle(handle))
530             };
531             let addr = if addr == ptr::null() {
532                 None
533             } else {
534                 Some(sockaddr_to_socket_addr(addr))
535             };
536             cx.result = Some((nread, addr));
537
538             let sched: ~Scheduler = Local::take();
539             sched.resume_blocked_task_immediately(cx.task.take_unwrap());
540         }
541     }
542
543     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
544         struct Ctx { task: Option<BlockedTask>, result: c_int }
545
546         let _m = self.fire_homing_missile();
547
548         let mut req = Request::new(uvll::UV_UDP_SEND);
549         let buf = slice_to_uv_buf(buf);
550         let result = socket_addr_as_sockaddr(dst, |dst| unsafe {
551             uvll::uv_udp_send(req.handle, self.handle, [buf], dst, send_cb)
552         });
553
554         return match result {
555             0 => {
556                 req.defuse(); // uv callback now owns this request
557                 let mut cx = Ctx { task: None, result: 0 };
558                 do wait_until_woken_after(&mut cx.task) {
559                     req.set_data(&cx);
560                 }
561                 match cx.result {
562                     0 => Ok(()),
563                     n => Err(uv_error_to_io_error(UvError(n)))
564                 }
565             }
566             n => Err(uv_error_to_io_error(UvError(n)))
567         };
568
569         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
570             let req = Request::wrap(req);
571             assert!(status != uvll::ECANCELED);
572             let cx: &mut Ctx = unsafe { req.get_data() };
573             cx.result = status;
574
575             let sched: ~Scheduler = Local::take();
576             sched.resume_blocked_task_immediately(cx.task.take_unwrap());
577         }
578     }
579
580     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
581         let _m = self.fire_homing_missile();
582         status_to_io_result(unsafe {
583             do multi.to_str().with_c_str |m_addr| {
584                 uvll::uv_udp_set_membership(self.handle,
585                                             m_addr, ptr::null(),
586                                             uvll::UV_JOIN_GROUP)
587             }
588         })
589     }
590
591     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
592         let _m = self.fire_homing_missile();
593         status_to_io_result(unsafe {
594             do multi.to_str().with_c_str |m_addr| {
595                 uvll::uv_udp_set_membership(self.handle,
596                                             m_addr, ptr::null(),
597                                             uvll::UV_LEAVE_GROUP)
598             }
599         })
600     }
601
602     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
603         let _m = self.fire_homing_missile();
604         status_to_io_result(unsafe {
605             uvll::uv_udp_set_multicast_loop(self.handle,
606                                             1 as c_int)
607         })
608     }
609
610     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
611         let _m = self.fire_homing_missile();
612         status_to_io_result(unsafe {
613             uvll::uv_udp_set_multicast_loop(self.handle,
614                                             0 as c_int)
615         })
616     }
617
618     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
619         let _m = self.fire_homing_missile();
620         status_to_io_result(unsafe {
621             uvll::uv_udp_set_multicast_ttl(self.handle,
622                                            ttl as c_int)
623         })
624     }
625
626     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
627         let _m = self.fire_homing_missile();
628         status_to_io_result(unsafe {
629             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
630         })
631     }
632
633     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
634         let _m = self.fire_homing_missile();
635         status_to_io_result(unsafe {
636             uvll::uv_udp_set_broadcast(self.handle,
637                                        1 as c_int)
638         })
639     }
640
641     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
642         let _m = self.fire_homing_missile();
643         status_to_io_result(unsafe {
644             uvll::uv_udp_set_broadcast(self.handle,
645                                        0 as c_int)
646         })
647     }
648 }
649
650 impl Drop for UdpWatcher {
651     fn drop(&mut self) {
652         // Send ourselves home to close this handle (blocking while doing so).
653         let _m = self.fire_homing_missile();
654         self.close();
655     }
656 }
657
658 ////////////////////////////////////////////////////////////////////////////////
659 /// UV request support
660 ////////////////////////////////////////////////////////////////////////////////
661
662 #[cfg(test)]
663 mod test {
664     use std::cell::Cell;
665     use std::comm::oneshot;
666     use std::rt::test::*;
667     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
668                         RtioUdpSocket};
669     use std::task;
670
671     use super::*;
672     use super::super::local_loop;
673
674     #[test]
675     fn connect_close_ip4() {
676         match TcpWatcher::connect(local_loop(), next_test_ip4()) {
677             Ok(*) => fail!(),
678             Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
679         }
680     }
681
682     #[test]
683     fn connect_close_ip6() {
684         match TcpWatcher::connect(local_loop(), next_test_ip6()) {
685             Ok(*) => fail!(),
686             Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
687         }
688     }
689
690     #[test]
691     fn udp_bind_close_ip4() {
692         match UdpWatcher::bind(local_loop(), next_test_ip4()) {
693             Ok(*) => {}
694             Err(*) => fail!()
695         }
696     }
697
698     #[test]
699     fn udp_bind_close_ip6() {
700         match UdpWatcher::bind(local_loop(), next_test_ip6()) {
701             Ok(*) => {}
702             Err(*) => fail!()
703         }
704     }
705
706     #[test]
707     fn listen_ip4() {
708         let (port, chan) = oneshot();
709         let chan = Cell::new(chan);
710         let addr = next_test_ip4();
711
712         do spawn {
713             let w = match TcpListener::bind(local_loop(), addr) {
714                 Ok(w) => w, Err(e) => fail!("{:?}", e)
715             };
716             let mut w = match w.listen() {
717                 Ok(w) => w, Err(e) => fail!("{:?}", e),
718             };
719             chan.take().send(());
720             match w.accept() {
721                 Ok(mut stream) => {
722                     let mut buf = [0u8, ..10];
723                     match stream.read(buf) {
724                         Ok(10) => {} e => fail!("{:?}", e),
725                     }
726                     for i in range(0, 10u8) {
727                         assert_eq!(buf[i], i + 1);
728                     }
729                 }
730                 Err(e) => fail!("{:?}", e)
731             }
732         }
733
734         port.recv();
735         let mut w = match TcpWatcher::connect(local_loop(), addr) {
736             Ok(w) => w, Err(e) => fail!("{:?}", e)
737         };
738         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
739             Ok(()) => {}, Err(e) => fail!("{:?}", e)
740         }
741     }
742
743     #[test]
744     fn listen_ip6() {
745         let (port, chan) = oneshot();
746         let chan = Cell::new(chan);
747         let addr = next_test_ip6();
748
749         do spawn {
750             let w = match TcpListener::bind(local_loop(), addr) {
751                 Ok(w) => w, Err(e) => fail!("{:?}", e)
752             };
753             let mut w = match w.listen() {
754                 Ok(w) => w, Err(e) => fail!("{:?}", e),
755             };
756             chan.take().send(());
757             match w.accept() {
758                 Ok(mut stream) => {
759                     let mut buf = [0u8, ..10];
760                     match stream.read(buf) {
761                         Ok(10) => {} e => fail!("{:?}", e),
762                     }
763                     for i in range(0, 10u8) {
764                         assert_eq!(buf[i], i + 1);
765                     }
766                 }
767                 Err(e) => fail!("{:?}", e)
768             }
769         }
770
771         port.recv();
772         let mut w = match TcpWatcher::connect(local_loop(), addr) {
773             Ok(w) => w, Err(e) => fail!("{:?}", e)
774         };
775         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
776             Ok(()) => {}, Err(e) => fail!("{:?}", e)
777         }
778     }
779
780     #[test]
781     fn udp_recv_ip4() {
782         let (port, chan) = oneshot();
783         let chan = Cell::new(chan);
784         let client = next_test_ip4();
785         let server = next_test_ip4();
786
787         do spawn {
788             match UdpWatcher::bind(local_loop(), server) {
789                 Ok(mut w) => {
790                     chan.take().send(());
791                     let mut buf = [0u8, ..10];
792                     match w.recvfrom(buf) {
793                         Ok((10, addr)) => assert_eq!(addr, client),
794                         e => fail!("{:?}", e),
795                     }
796                     for i in range(0, 10u8) {
797                         assert_eq!(buf[i], i + 1);
798                     }
799                 }
800                 Err(e) => fail!("{:?}", e)
801             }
802         }
803
804         port.recv();
805         let mut w = match UdpWatcher::bind(local_loop(), client) {
806             Ok(w) => w, Err(e) => fail!("{:?}", e)
807         };
808         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
809             Ok(()) => {}, Err(e) => fail!("{:?}", e)
810         }
811     }
812
813     #[test]
814     fn udp_recv_ip6() {
815         let (port, chan) = oneshot();
816         let chan = Cell::new(chan);
817         let client = next_test_ip6();
818         let server = next_test_ip6();
819
820         do spawn {
821             match UdpWatcher::bind(local_loop(), server) {
822                 Ok(mut w) => {
823                     chan.take().send(());
824                     let mut buf = [0u8, ..10];
825                     match w.recvfrom(buf) {
826                         Ok((10, addr)) => assert_eq!(addr, client),
827                         e => fail!("{:?}", e),
828                     }
829                     for i in range(0, 10u8) {
830                         assert_eq!(buf[i], i + 1);
831                     }
832                 }
833                 Err(e) => fail!("{:?}", e)
834             }
835         }
836
837         port.recv();
838         let mut w = match UdpWatcher::bind(local_loop(), client) {
839             Ok(w) => w, Err(e) => fail!("{:?}", e)
840         };
841         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
842             Ok(()) => {}, Err(e) => fail!("{:?}", e)
843         }
844     }
845
846     #[test]
847     fn test_read_read_read() {
848         use std::rt::rtio::*;
849         let addr = next_test_ip4();
850         static MAX: uint = 5000;
851         let (port, chan) = oneshot();
852         let port = Cell::new(port);
853         let chan = Cell::new(chan);
854
855         do spawn {
856             let listener = TcpListener::bind(local_loop(), addr).unwrap();
857             let mut acceptor = listener.listen().unwrap();
858             chan.take().send(());
859             let mut stream = acceptor.accept().unwrap();
860             let buf = [1, .. 2048];
861             let mut total_bytes_written = 0;
862             while total_bytes_written < MAX {
863                 assert!(stream.write(buf).is_ok());
864                 uvdebug!("wrote bytes");
865                 total_bytes_written += buf.len();
866             }
867         }
868
869         do spawn {
870             port.take().recv();
871             let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
872             let mut buf = [0, .. 2048];
873             let mut total_bytes_read = 0;
874             while total_bytes_read < MAX {
875                 let nread = stream.read(buf).unwrap();
876                 total_bytes_read += nread;
877                 for i in range(0u, nread) {
878                     assert_eq!(buf[i], 1);
879                 }
880             }
881             uvdebug!("read {} bytes total", total_bytes_read);
882         }
883     }
884
885     #[test]
886     fn test_udp_twice() {
887         let server_addr = next_test_ip4();
888         let client_addr = next_test_ip4();
889         let (port, chan) = oneshot();
890         let port = Cell::new(port);
891         let chan = Cell::new(chan);
892
893         do spawn {
894             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
895             port.take().recv();
896             assert!(client.sendto([1], server_addr).is_ok());
897             assert!(client.sendto([2], server_addr).is_ok());
898         }
899
900         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
901         chan.take().send(());
902         let mut buf1 = [0];
903         let mut buf2 = [0];
904         let (nread1, src1) = server.recvfrom(buf1).unwrap();
905         let (nread2, src2) = server.recvfrom(buf2).unwrap();
906         assert_eq!(nread1, 1);
907         assert_eq!(nread2, 1);
908         assert_eq!(src1, client_addr);
909         assert_eq!(src2, client_addr);
910         assert_eq!(buf1[0], 1);
911         assert_eq!(buf2[0], 2);
912     }
913
914     #[test]
915     fn test_udp_many_read() {
916         let server_out_addr = next_test_ip4();
917         let server_in_addr = next_test_ip4();
918         let client_out_addr = next_test_ip4();
919         let client_in_addr = next_test_ip4();
920         static MAX: uint = 500_000;
921
922         let (p1, c1) = oneshot();
923         let (p2, c2) = oneshot();
924
925         let first = Cell::new((p1, c2));
926         let second = Cell::new((p2, c1));
927
928         do spawn {
929             let l = local_loop();
930             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
931             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
932             let (port, chan) = first.take();
933             chan.send(());
934             port.recv();
935             let msg = [1, .. 2048];
936             let mut total_bytes_sent = 0;
937             let mut buf = [1];
938             while buf[0] == 1 {
939                 // send more data
940                 assert!(server_out.sendto(msg, client_in_addr).is_ok());
941                 total_bytes_sent += msg.len();
942                 // check if the client has received enough
943                 let res = server_in.recvfrom(buf);
944                 assert!(res.is_ok());
945                 let (nread, src) = res.unwrap();
946                 assert_eq!(nread, 1);
947                 assert_eq!(src, client_out_addr);
948             }
949             assert!(total_bytes_sent >= MAX);
950         }
951
952         do spawn {
953             let l = local_loop();
954             let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
955             let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
956             let (port, chan) = second.take();
957             port.recv();
958             chan.send(());
959             let mut total_bytes_recv = 0;
960             let mut buf = [0, .. 2048];
961             while total_bytes_recv < MAX {
962                 // ask for more
963                 assert!(client_out.sendto([1], server_in_addr).is_ok());
964                 // wait for data
965                 let res = client_in.recvfrom(buf);
966                 assert!(res.is_ok());
967                 let (nread, src) = res.unwrap();
968                 assert_eq!(src, server_out_addr);
969                 total_bytes_recv += nread;
970                 for i in range(0u, nread) {
971                     assert_eq!(buf[i], 1);
972                 }
973             }
974             // tell the server we're done
975             assert!(client_out.sendto([0], server_in_addr).is_ok());
976         }
977     }
978
979     #[test]
980     fn test_read_and_block() {
981         let addr = next_test_ip4();
982         let (port, chan) = oneshot();
983         let port = Cell::new(port);
984         let chan = Cell::new(chan);
985
986         do spawn {
987             let listener = TcpListener::bind(local_loop(), addr).unwrap();
988             let mut acceptor = listener.listen().unwrap();
989             let (port2, chan2) = stream();
990             chan.take().send(port2);
991             let mut stream = acceptor.accept().unwrap();
992             let mut buf = [0, .. 2048];
993
994             let expected = 32;
995             let mut current = 0;
996             let mut reads = 0;
997
998             while current < expected {
999                 let nread = stream.read(buf).unwrap();
1000                 for i in range(0u, nread) {
1001                     let val = buf[i] as uint;
1002                     assert_eq!(val, current % 8);
1003                     current += 1;
1004                 }
1005                 reads += 1;
1006
1007                 chan2.send(());
1008             }
1009
1010             // Make sure we had multiple reads
1011             assert!(reads > 1);
1012         }
1013
1014         do spawn {
1015             let port2 = port.take().recv();
1016             let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
1017             stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1018             stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1019             port2.recv();
1020             stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1021             stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1022             port2.recv();
1023         }
1024     }
1025
1026     #[test]
1027     fn test_simple_tcp_server_and_client_on_diff_threads() {
1028         let addr = next_test_ip4();
1029
1030         do task::spawn_sched(task::SingleThreaded) {
1031             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1032             let mut acceptor = listener.listen().unwrap();
1033             let mut stream = acceptor.accept().unwrap();
1034             let mut buf = [0, .. 2048];
1035             let nread = stream.read(buf).unwrap();
1036             assert_eq!(nread, 8);
1037             for i in range(0u, nread) {
1038                 assert_eq!(buf[i], i as u8);
1039             }
1040         }
1041
1042         do task::spawn_sched(task::SingleThreaded) {
1043             let mut stream = TcpWatcher::connect(local_loop(), addr);
1044             while stream.is_err() {
1045                 stream = TcpWatcher::connect(local_loop(), addr);
1046             }
1047             stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1048         }
1049     }
1050
1051     // On one thread, create a udp socket. Then send that socket to another
1052     // thread and destroy the socket on the remote thread. This should make sure
1053     // that homing kicks in for the socket to go back home to the original
1054     // thread, close itself, and then come back to the last thread.
1055     #[test]
1056     fn test_homing_closes_correctly() {
1057         let (port, chan) = oneshot();
1058         let port = Cell::new(port);
1059         let chan = Cell::new(chan);
1060
1061         do task::spawn_sched(task::SingleThreaded) {
1062             let chan = Cell::new(chan.take());
1063             let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
1064             chan.take().send(listener);
1065         }
1066
1067         do task::spawn_sched(task::SingleThreaded) {
1068             let port = Cell::new(port.take());
1069             port.take().recv();
1070         }
1071     }
1072
1073     // This is a bit of a crufty old test, but it has its uses.
1074     #[test]
1075     fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1076         use std::cast;
1077         use std::rt::local::Local;
1078         use std::rt::rtio::{EventLoop, IoFactory};
1079         use std::rt::sched::Scheduler;
1080         use std::rt::sched::{Shutdown, TaskFromFriend};
1081         use std::rt::sleeper_list::SleeperList;
1082         use std::rt::task::Task;
1083         use std::rt::task::UnwindResult;
1084         use std::rt::thread::Thread;
1085         use std::rt::work_queue::WorkQueue;
1086         use std::unstable::run_in_bare_thread;
1087         use uvio::UvEventLoop;
1088
1089         do run_in_bare_thread {
1090             let sleepers = SleeperList::new();
1091             let work_queue1 = WorkQueue::new();
1092             let work_queue2 = WorkQueue::new();
1093             let queues = ~[work_queue1.clone(), work_queue2.clone()];
1094
1095             let loop1 = ~UvEventLoop::new() as ~EventLoop;
1096             let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
1097                                              sleepers.clone());
1098             let loop2 = ~UvEventLoop::new() as ~EventLoop;
1099             let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
1100                                              sleepers.clone());
1101
1102             let handle1 = Cell::new(sched1.make_handle());
1103             let handle2 = Cell::new(sched2.make_handle());
1104             let tasksFriendHandle = Cell::new(sched2.make_handle());
1105
1106             let on_exit: ~fn(UnwindResult) = |exit_status| {
1107                 handle1.take().send(Shutdown);
1108                 handle2.take().send(Shutdown);
1109                 assert!(exit_status.is_success());
1110             };
1111
1112             unsafe fn local_io() -> &'static mut IoFactory {
1113                 do Local::borrow |sched: &mut Scheduler| {
1114                     let mut io = None;
1115                     sched.event_loop.io(|i| io = Some(i));
1116                     cast::transmute(io.unwrap())
1117                 }
1118             }
1119
1120             let test_function: ~fn() = || {
1121                 let io = unsafe { local_io() };
1122                 let addr = next_test_ip4();
1123                 let maybe_socket = io.udp_bind(addr);
1124                 // this socket is bound to this event loop
1125                 assert!(maybe_socket.is_ok());
1126
1127                 // block self on sched1
1128                 do task::unkillable { // FIXME(#8674)
1129                     let scheduler: ~Scheduler = Local::take();
1130                     do scheduler.deschedule_running_task_and_then |_, task| {
1131                         // unblock task
1132                         do task.wake().map |task| {
1133                             // send self to sched2
1134                             tasksFriendHandle.take().send(TaskFromFriend(task));
1135                         };
1136                         // sched1 should now sleep since it has nothing else to do
1137                     }
1138                 }
1139                 // sched2 will wake up and get the task as we do nothing else,
1140                 // the function ends and the socket goes out of scope sched2
1141                 // will start to run the destructor the destructor will first
1142                 // block the task, set it's home as sched1, then enqueue it
1143                 // sched2 will dequeue the task, see that it has a home, and
1144                 // send it to sched1 sched1 will wake up, exec the close
1145                 // function on the correct loop, and then we're done
1146             };
1147
1148             let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
1149                                                 test_function);
1150             main_task.death.on_exit = Some(on_exit);
1151             let main_task = Cell::new(main_task);
1152
1153             let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool,
1154                                                          None) || {});
1155
1156             let sched1 = Cell::new(sched1);
1157             let sched2 = Cell::new(sched2);
1158
1159             let thread1 = do Thread::start {
1160                 sched1.take().bootstrap(main_task.take());
1161             };
1162             let thread2 = do Thread::start {
1163                 sched2.take().bootstrap(null_task.take());
1164             };
1165
1166             thread1.join();
1167             thread2.join();
1168         }
1169     }
1170
1171     #[should_fail]
1172     #[test]
1173     #[ignore(reason = "linked failure")]
1174     fn linked_failure1() {
1175         let (port, chan) = oneshot();
1176         let chan = Cell::new(chan);
1177         let addr = next_test_ip4();
1178
1179         do spawn {
1180             let w = TcpListener::bind(local_loop(), addr).unwrap();
1181             let mut w = w.listen().unwrap();
1182             chan.take().send(());
1183             w.accept();
1184         }
1185
1186         port.recv();
1187         fail!();
1188     }
1189
1190     #[should_fail]
1191     #[test]
1192     #[ignore(reason = "linked failure")]
1193     fn linked_failure2() {
1194         let (port, chan) = oneshot();
1195         let chan = Cell::new(chan);
1196         let addr = next_test_ip4();
1197
1198         do spawn {
1199             let w = TcpListener::bind(local_loop(), addr).unwrap();
1200             let mut w = w.listen().unwrap();
1201             chan.take().send(());
1202             let mut buf = [0];
1203             w.accept().unwrap().read(buf);
1204         }
1205
1206         port.recv();
1207         let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
1208
1209         fail!();
1210     }
1211
1212     #[should_fail]
1213     #[test]
1214     #[ignore(reason = "linked failure")]
1215     fn linked_failure3() {
1216         let (port, chan) = stream();
1217         let chan = Cell::new(chan);
1218         let addr = next_test_ip4();
1219
1220         do spawn {
1221             let chan = chan.take();
1222             let w = TcpListener::bind(local_loop(), addr).unwrap();
1223             let mut w = w.listen().unwrap();
1224             chan.send(());
1225             let mut conn = w.accept().unwrap();
1226             chan.send(());
1227             let buf = [0, ..65536];
1228             conn.write(buf);
1229         }
1230
1231         port.recv();
1232         let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
1233         port.recv();
1234         fail!();
1235     }
1236 }