]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
aa7fab2565d8cf6ae910ba88d8f5d47a759b219e
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
12 use libc;
13 use std::mem;
14 use std::ptr;
15 use std::rt::rtio;
16 use std::rt::rtio::IoError;
17 use std::rt::task::BlockedTask;
18
19 use homing::{HomingIO, HomeHandle};
20 use rc::Refcount;
21 use stream::StreamWatcher;
22 use super::{Loop, Request, UvError, Buf, status_to_io_result,
23             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
24             wait_until_woken_after, wakeup};
25 use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
26 use uvio::UvIoFactory;
27 use uvll;
28
29 ////////////////////////////////////////////////////////////////////////////////
30 /// Generic functions related to dealing with sockaddr things
31 ////////////////////////////////////////////////////////////////////////////////
32
33 pub fn htons(u: u16) -> u16 { u.to_big_endian() }
34 pub fn ntohs(u: u16) -> u16 { Int::from_big_endian(u) }
35
36 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
37                         len: uint) -> rtio::SocketAddr {
38     match storage.ss_family as c_int {
39         libc::AF_INET => {
40             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
41             let storage: &libc::sockaddr_in = unsafe {
42                 mem::transmute(storage)
43             };
44             let ip = (storage.sin_addr.s_addr as u32).to_big_endian();
45             let a = (ip >> 24) as u8;
46             let b = (ip >> 16) as u8;
47             let c = (ip >>  8) as u8;
48             let d = (ip >>  0) as u8;
49             rtio::SocketAddr {
50                 ip: rtio::Ipv4Addr(a, b, c, d),
51                 port: ntohs(storage.sin_port),
52             }
53         }
54         libc::AF_INET6 => {
55             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
56             let storage: &libc::sockaddr_in6 = unsafe {
57                 mem::transmute(storage)
58             };
59             let a = ntohs(storage.sin6_addr.s6_addr[0]);
60             let b = ntohs(storage.sin6_addr.s6_addr[1]);
61             let c = ntohs(storage.sin6_addr.s6_addr[2]);
62             let d = ntohs(storage.sin6_addr.s6_addr[3]);
63             let e = ntohs(storage.sin6_addr.s6_addr[4]);
64             let f = ntohs(storage.sin6_addr.s6_addr[5]);
65             let g = ntohs(storage.sin6_addr.s6_addr[6]);
66             let h = ntohs(storage.sin6_addr.s6_addr[7]);
67             rtio::SocketAddr {
68                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
69                 port: ntohs(storage.sin6_port),
70             }
71         }
72         n => {
73             fail!("unknown family {}", n);
74         }
75     }
76 }
77
78 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
79     unsafe {
80         let mut storage: libc::sockaddr_storage = mem::zeroed();
81         let len = match addr.ip {
82             rtio::Ipv4Addr(a, b, c, d) => {
83                 let ip = (a as u32 << 24) |
84                          (b as u32 << 16) |
85                          (c as u32 <<  8) |
86                          (d as u32 <<  0);
87                 let storage: &mut libc::sockaddr_in =
88                     mem::transmute(&mut storage);
89                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
90                 (*storage).sin_port = htons(addr.port);
91                 (*storage).sin_addr = libc::in_addr {
92                     s_addr: Int::from_big_endian(ip),
93
94                 };
95                 mem::size_of::<libc::sockaddr_in>()
96             }
97             rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
98                 let storage: &mut libc::sockaddr_in6 =
99                     mem::transmute(&mut storage);
100                 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
101                 storage.sin6_port = htons(addr.port);
102                 storage.sin6_addr = libc::in6_addr {
103                     s6_addr: [
104                         htons(a),
105                         htons(b),
106                         htons(c),
107                         htons(d),
108                         htons(e),
109                         htons(f),
110                         htons(g),
111                         htons(h),
112                     ]
113                 };
114                 mem::size_of::<libc::sockaddr_in6>()
115             }
116         };
117         return (storage, len);
118     }
119 }
120
121 enum SocketNameKind {
122     TcpPeer,
123     Tcp,
124     Udp
125 }
126
127 fn socket_name(sk: SocketNameKind,
128                handle: *c_void) -> Result<rtio::SocketAddr, IoError> {
129     let getsockname = match sk {
130         TcpPeer => uvll::uv_tcp_getpeername,
131         Tcp     => uvll::uv_tcp_getsockname,
132         Udp     => uvll::uv_udp_getsockname,
133     };
134
135     // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
136     let mut sockaddr: libc::sockaddr_storage = unsafe { mem::zeroed() };
137     let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
138
139     let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
140     match unsafe {
141         getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
142     } {
143         0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
144         n => Err(uv_error_to_io_error(UvError(n)))
145     }
146 }
147
148 ////////////////////////////////////////////////////////////////////////////////
149 /// TCP implementation
150 ////////////////////////////////////////////////////////////////////////////////
151
152 pub struct TcpWatcher {
153     handle: *uvll::uv_tcp_t,
154     stream: StreamWatcher,
155     home: HomeHandle,
156     refcount: Refcount,
157
158     // libuv can't support concurrent reads and concurrent writes of the same
159     // stream object, so we use these access guards in order to arbitrate among
160     // multiple concurrent reads and writes. Note that libuv *can* read and
161     // write simultaneously, it just can't read and read simultaneously.
162     read_access: AccessTimeout,
163     write_access: AccessTimeout,
164 }
165
166 pub struct TcpListener {
167     home: HomeHandle,
168     handle: *uvll::uv_pipe_t,
169     outgoing: Sender<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
170     incoming: Receiver<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
171 }
172
173 pub struct TcpAcceptor {
174     listener: Box<TcpListener>,
175     timeout: AcceptTimeout,
176 }
177
178 // TCP watchers (clients/streams)
179
180 impl TcpWatcher {
181     pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
182         let handle = io.make_handle();
183         TcpWatcher::new_home(&io.loop_, handle)
184     }
185
186     fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
187         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
188         assert_eq!(unsafe {
189             uvll::uv_tcp_init(loop_.handle, handle)
190         }, 0);
191         TcpWatcher {
192             home: home,
193             handle: handle,
194             stream: StreamWatcher::new(handle),
195             refcount: Refcount::new(),
196             read_access: AccessTimeout::new(),
197             write_access: AccessTimeout::new(),
198         }
199     }
200
201     pub fn connect(io: &mut UvIoFactory,
202                    address: rtio::SocketAddr,
203                    timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
204         let tcp = TcpWatcher::new(io);
205         let cx = ConnectCtx { status: -1, task: None, timer: None };
206         let (addr, _len) = addr_to_sockaddr(address);
207         let addr_p = &addr as *_ as *libc::sockaddr;
208         cx.connect(tcp, timeout, io, |req, tcp, cb| {
209             unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
210         })
211     }
212 }
213
214 impl HomingIO for TcpWatcher {
215     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
216 }
217
218 impl rtio::RtioSocket for TcpWatcher {
219     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
220         let _m = self.fire_homing_missile();
221         socket_name(Tcp, self.handle)
222     }
223 }
224
225 impl rtio::RtioTcpStream for TcpWatcher {
226     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
227         let m = self.fire_homing_missile();
228         let guard = try!(self.read_access.grant(m));
229
230         // see comments in close_read about this check
231         if guard.access.is_closed() {
232             return Err(uv_error_to_io_error(UvError(uvll::EOF)))
233         }
234
235         self.stream.read(buf).map_err(uv_error_to_io_error)
236     }
237
238     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
239         let m = self.fire_homing_missile();
240         let guard = try!(self.write_access.grant(m));
241         self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
242     }
243
244     fn peer_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
245         let _m = self.fire_homing_missile();
246         socket_name(TcpPeer, self.handle)
247     }
248
249     fn control_congestion(&mut self) -> Result<(), IoError> {
250         let _m = self.fire_homing_missile();
251         status_to_io_result(unsafe {
252             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
253         })
254     }
255
256     fn nodelay(&mut self) -> Result<(), IoError> {
257         let _m = self.fire_homing_missile();
258         status_to_io_result(unsafe {
259             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
260         })
261     }
262
263     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
264         let _m = self.fire_homing_missile();
265         status_to_io_result(unsafe {
266             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
267                                    delay_in_seconds as c_uint)
268         })
269     }
270
271     fn letdie(&mut self) -> Result<(), IoError> {
272         let _m = self.fire_homing_missile();
273         status_to_io_result(unsafe {
274             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
275         })
276     }
277
278     fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
279         box TcpWatcher {
280             handle: self.handle,
281             stream: StreamWatcher::new(self.handle),
282             home: self.home.clone(),
283             refcount: self.refcount.clone(),
284             read_access: self.read_access.clone(),
285             write_access: self.write_access.clone(),
286         } as Box<rtio::RtioTcpStream + Send>
287     }
288
289     fn close_read(&mut self) -> Result<(), IoError> {
290         // see comments in PipeWatcher::close_read
291         let task = {
292             let m = self.fire_homing_missile();
293             self.read_access.access.close(&m);
294     self.stream.cancel_read(uvll::EOF as libc::ssize_t)
295         };
296         let _ = task.map(|t| t.reawaken());
297         Ok(())
298     }
299
300     fn close_write(&mut self) -> Result<(), IoError> {
301         let _m = self.fire_homing_missile();
302         shutdown(self.handle, &self.uv_loop())
303     }
304
305     fn set_timeout(&mut self, timeout: Option<u64>) {
306         self.set_read_timeout(timeout);
307         self.set_write_timeout(timeout);
308     }
309
310     fn set_read_timeout(&mut self, ms: Option<u64>) {
311         let _m = self.fire_homing_missile();
312         let loop_ = self.uv_loop();
313         self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
314                                      &self.stream as *_ as uint);
315
316         fn cancel_read(stream: uint) -> Option<BlockedTask> {
317             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
318             stream.cancel_read(uvll::ECANCELED as ssize_t)
319         }
320     }
321
322     fn set_write_timeout(&mut self, ms: Option<u64>) {
323         let _m = self.fire_homing_missile();
324         let loop_ = self.uv_loop();
325         self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
326                                       &self.stream as *_ as uint);
327
328         fn cancel_write(stream: uint) -> Option<BlockedTask> {
329             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
330             stream.cancel_write()
331         }
332     }
333 }
334
335 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
336     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
337 }
338
339 impl Drop for TcpWatcher {
340     fn drop(&mut self) {
341         let _m = self.fire_homing_missile();
342         if self.refcount.decrement() {
343             self.close();
344         }
345     }
346 }
347
348 // TCP listeners (unbound servers)
349
350 impl TcpListener {
351     pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
352                 -> Result<Box<TcpListener>, UvError> {
353         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
354         assert_eq!(unsafe {
355             uvll::uv_tcp_init(io.uv_loop(), handle)
356         }, 0);
357         let (tx, rx) = channel();
358         let l = box TcpListener {
359             home: io.make_handle(),
360             handle: handle,
361             outgoing: tx,
362             incoming: rx,
363         };
364         let (addr, _len) = addr_to_sockaddr(address);
365         let res = unsafe {
366             let addr_p = &addr as *libc::sockaddr_storage;
367             uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
368         };
369         return match res {
370             0 => Ok(l.install()),
371             n => Err(UvError(n))
372         };
373     }
374 }
375
376 impl HomingIO for TcpListener {
377     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
378 }
379
380 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
381     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
382 }
383
384 impl rtio::RtioSocket for TcpListener {
385     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
386         let _m = self.fire_homing_missile();
387         socket_name(Tcp, self.handle)
388     }
389 }
390
391 impl rtio::RtioTcpListener for TcpListener {
392     fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
393         // create the acceptor object from ourselves
394         let mut acceptor = box TcpAcceptor {
395             listener: self,
396             timeout: AcceptTimeout::new(),
397         };
398
399         let _m = acceptor.fire_homing_missile();
400         // FIXME: the 128 backlog should be configurable
401         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
402             0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor + Send>),
403             n => Err(uv_error_to_io_error(UvError(n))),
404         }
405     }
406 }
407
408 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
409     assert!(status != uvll::ECANCELED);
410     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
411     let msg = match status {
412         0 => {
413             let loop_ = Loop::wrap(unsafe {
414                 uvll::get_loop_for_uv_handle(server)
415             });
416             let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
417             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
418             Ok(box client as Box<rtio::RtioTcpStream + Send>)
419         }
420         n => Err(uv_error_to_io_error(UvError(n)))
421     };
422     tcp.outgoing.send(msg);
423 }
424
425 impl Drop for TcpListener {
426     fn drop(&mut self) {
427         let _m = self.fire_homing_missile();
428         self.close();
429     }
430 }
431
432 // TCP acceptors (bound servers)
433
434 impl HomingIO for TcpAcceptor {
435     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
436 }
437
438 impl rtio::RtioSocket for TcpAcceptor {
439     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
440         let _m = self.fire_homing_missile();
441         socket_name(Tcp, self.listener.handle)
442     }
443 }
444
445 impl rtio::RtioTcpAcceptor for TcpAcceptor {
446     fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream + Send>, IoError> {
447         self.timeout.accept(&self.listener.incoming)
448     }
449
450     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
451         let _m = self.fire_homing_missile();
452         status_to_io_result(unsafe {
453             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
454         })
455     }
456
457     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
458         let _m = self.fire_homing_missile();
459         status_to_io_result(unsafe {
460             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
461         })
462     }
463
464     fn set_timeout(&mut self, ms: Option<u64>) {
465         let _m = self.fire_homing_missile();
466         match ms {
467             None => self.timeout.clear(),
468             Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
469         }
470     }
471 }
472
473 ////////////////////////////////////////////////////////////////////////////////
474 /// UDP implementation
475 ////////////////////////////////////////////////////////////////////////////////
476
477 pub struct UdpWatcher {
478     handle: *uvll::uv_udp_t,
479     home: HomeHandle,
480
481     // See above for what these fields are
482     refcount: Refcount,
483     read_access: AccessTimeout,
484     write_access: AccessTimeout,
485
486     blocked_sender: Option<BlockedTask>,
487 }
488
489 struct UdpRecvCtx {
490     task: Option<BlockedTask>,
491     buf: Option<Buf>,
492     result: Option<(ssize_t, Option<rtio::SocketAddr>)>,
493 }
494
495 struct UdpSendCtx {
496     result: c_int,
497     data: Option<Vec<u8>>,
498     udp: *mut UdpWatcher,
499 }
500
501 impl UdpWatcher {
502     pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
503                 -> Result<UdpWatcher, UvError> {
504         let udp = UdpWatcher {
505             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
506             home: io.make_handle(),
507             refcount: Refcount::new(),
508             read_access: AccessTimeout::new(),
509             write_access: AccessTimeout::new(),
510             blocked_sender: None,
511         };
512         assert_eq!(unsafe {
513             uvll::uv_udp_init(io.uv_loop(), udp.handle)
514         }, 0);
515         let (addr, _len) = addr_to_sockaddr(address);
516         let result = unsafe {
517             let addr_p = &addr as *libc::sockaddr_storage;
518             uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
519         };
520         return match result {
521             0 => Ok(udp),
522             n => Err(UvError(n)),
523         };
524     }
525 }
526
527 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
528     fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
529 }
530
531 impl HomingIO for UdpWatcher {
532     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
533 }
534
535 impl rtio::RtioSocket for UdpWatcher {
536     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
537         let _m = self.fire_homing_missile();
538         socket_name(Udp, self.handle)
539     }
540 }
541
542 impl rtio::RtioUdpSocket for UdpWatcher {
543     fn recvfrom(&mut self, buf: &mut [u8])
544         -> Result<(uint, rtio::SocketAddr), IoError>
545     {
546         let loop_ = self.uv_loop();
547         let m = self.fire_homing_missile();
548         let _guard = try!(self.read_access.grant(m));
549
550         return match unsafe {
551             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
552         } {
553             0 => {
554                 let mut cx = UdpRecvCtx {
555                     task: None,
556                     buf: Some(slice_to_uv_buf(buf)),
557                     result: None,
558                 };
559                 let handle = self.handle;
560                 wait_until_woken_after(&mut cx.task, &loop_, || {
561                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
562                 });
563                 match cx.result.take_unwrap() {
564                     (n, _) if n < 0 =>
565                         Err(uv_error_to_io_error(UvError(n as c_int))),
566                     (n, addr) => Ok((n as uint, addr.unwrap()))
567                 }
568             }
569             n => Err(uv_error_to_io_error(UvError(n)))
570         };
571
572         extern fn alloc_cb(handle: *uvll::uv_udp_t,
573                            _suggested_size: size_t,
574                            buf: *mut Buf) {
575             unsafe {
576                 let cx = uvll::get_data_for_uv_handle(handle);
577                 let cx = &mut *(cx as *mut UdpRecvCtx);
578                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
579             }
580         }
581
582         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
583                           addr: *libc::sockaddr, _flags: c_uint) {
584             assert!(nread != uvll::ECANCELED as ssize_t);
585             let cx = unsafe {
586                 &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
587             };
588
589             // When there's no data to read the recv callback can be a no-op.
590             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
591             // this we just drop back to kqueue and wait for the next callback.
592             if nread == 0 {
593                 cx.buf = Some(unsafe { *buf });
594                 return
595             }
596
597             unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
598             let addr = if addr == ptr::null() {
599                 None
600             } else {
601                 let len = mem::size_of::<libc::sockaddr_storage>();
602                 Some(sockaddr_to_addr(unsafe { mem::transmute(addr) }, len))
603             };
604             cx.result = Some((nread, addr));
605             wakeup(&mut cx.task);
606         }
607     }
608
609     fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> Result<(), IoError> {
610         let m = self.fire_homing_missile();
611         let loop_ = self.uv_loop();
612         let guard = try!(self.write_access.grant(m));
613
614         let mut req = Request::new(uvll::UV_UDP_SEND);
615         let (addr, _len) = addr_to_sockaddr(dst);
616         let addr_p = &addr as *_ as *libc::sockaddr;
617
618         // see comments in StreamWatcher::write for why we may allocate a buffer
619         // here.
620         let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
621         let uv_buf = if guard.can_timeout {
622             slice_to_uv_buf(data.get_ref().as_slice())
623         } else {
624             slice_to_uv_buf(buf)
625         };
626
627         return match unsafe {
628             uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
629         } {
630             0 => {
631                 req.defuse(); // uv callback now owns this request
632                 let mut cx = UdpSendCtx {
633                     result: uvll::ECANCELED, data: data, udp: self as *mut _
634                 };
635                 wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
636                     req.set_data(&cx);
637                 });
638
639                 if cx.result != uvll::ECANCELED {
640                     return match cx.result {
641                         0 => Ok(()),
642                         n => Err(uv_error_to_io_error(UvError(n)))
643                     }
644                 }
645                 let new_cx = box UdpSendCtx {
646                     result: 0,
647                     udp: 0 as *mut UdpWatcher,
648                     data: cx.data.take(),
649                 };
650                 unsafe {
651                     req.set_data(&*new_cx);
652                     mem::forget(new_cx);
653                 }
654                 Err(uv_error_to_io_error(UvError(cx.result)))
655             }
656             n => Err(uv_error_to_io_error(UvError(n)))
657         };
658
659         // This function is the same as stream::write_cb, but adapted for udp
660         // instead of streams.
661         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
662             let req = Request::wrap(req);
663             let cx: &mut UdpSendCtx = unsafe { req.get_data() };
664             cx.result = status;
665
666             if cx.udp as uint != 0 {
667                 let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
668                 wakeup(&mut udp.blocked_sender);
669             } else {
670                 let _cx: Box<UdpSendCtx> = unsafe { mem::transmute(cx) };
671             }
672         }
673     }
674
675     fn join_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
676         let _m = self.fire_homing_missile();
677         status_to_io_result(unsafe {
678             multi.to_str().with_c_str(|m_addr| {
679                 uvll::uv_udp_set_membership(self.handle,
680                                             m_addr, ptr::null(),
681                                             uvll::UV_JOIN_GROUP)
682             })
683         })
684     }
685
686     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
687         let _m = self.fire_homing_missile();
688         status_to_io_result(unsafe {
689             multi.to_str().with_c_str(|m_addr| {
690                 uvll::uv_udp_set_membership(self.handle,
691                                             m_addr, ptr::null(),
692                                             uvll::UV_LEAVE_GROUP)
693             })
694         })
695     }
696
697     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
698         let _m = self.fire_homing_missile();
699         status_to_io_result(unsafe {
700             uvll::uv_udp_set_multicast_loop(self.handle,
701                                             1 as c_int)
702         })
703     }
704
705     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
706         let _m = self.fire_homing_missile();
707         status_to_io_result(unsafe {
708             uvll::uv_udp_set_multicast_loop(self.handle,
709                                             0 as c_int)
710         })
711     }
712
713     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
714         let _m = self.fire_homing_missile();
715         status_to_io_result(unsafe {
716             uvll::uv_udp_set_multicast_ttl(self.handle,
717                                            ttl as c_int)
718         })
719     }
720
721     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
722         let _m = self.fire_homing_missile();
723         status_to_io_result(unsafe {
724             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
725         })
726     }
727
728     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
729         let _m = self.fire_homing_missile();
730         status_to_io_result(unsafe {
731             uvll::uv_udp_set_broadcast(self.handle,
732                                        1 as c_int)
733         })
734     }
735
736     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
737         let _m = self.fire_homing_missile();
738         status_to_io_result(unsafe {
739             uvll::uv_udp_set_broadcast(self.handle,
740                                        0 as c_int)
741         })
742     }
743
744     fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
745         box UdpWatcher {
746             handle: self.handle,
747             home: self.home.clone(),
748             refcount: self.refcount.clone(),
749             write_access: self.write_access.clone(),
750             read_access: self.read_access.clone(),
751             blocked_sender: None,
752         } as Box<rtio::RtioUdpSocket + Send>
753     }
754
755     fn set_timeout(&mut self, timeout: Option<u64>) {
756         self.set_read_timeout(timeout);
757         self.set_write_timeout(timeout);
758     }
759
760     fn set_read_timeout(&mut self, ms: Option<u64>) {
761         let _m = self.fire_homing_missile();
762         let loop_ = self.uv_loop();
763         self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
764                                      self.handle as uint);
765
766         fn cancel_read(stream: uint) -> Option<BlockedTask> {
767             // This method is quite similar to StreamWatcher::cancel_read, see
768             // there for more information
769             let handle = stream as *uvll::uv_udp_t;
770             assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
771             let data = unsafe {
772                 let data = uvll::get_data_for_uv_handle(handle);
773                 if data.is_null() { return None }
774                 uvll::set_data_for_uv_handle(handle, 0 as *int);
775                 &mut *(data as *mut UdpRecvCtx)
776             };
777             data.result = Some((uvll::ECANCELED as ssize_t, None));
778             data.task.take()
779         }
780     }
781
782     fn set_write_timeout(&mut self, ms: Option<u64>) {
783         let _m = self.fire_homing_missile();
784         let loop_ = self.uv_loop();
785         self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
786                                       self as *mut _ as uint);
787
788         fn cancel_write(stream: uint) -> Option<BlockedTask> {
789             let stream: &mut UdpWatcher = unsafe { mem::transmute(stream) };
790             stream.blocked_sender.take()
791         }
792     }
793 }
794
795 impl Drop for UdpWatcher {
796     fn drop(&mut self) {
797         // Send ourselves home to close this handle (blocking while doing so).
798         let _m = self.fire_homing_missile();
799         if self.refcount.decrement() {
800             self.close();
801         }
802     }
803 }
804
805 ////////////////////////////////////////////////////////////////////////////////
806 // Shutdown helper
807 ////////////////////////////////////////////////////////////////////////////////
808
809 pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
810     struct Ctx {
811         slot: Option<BlockedTask>,
812         status: c_int,
813     }
814     let mut req = Request::new(uvll::UV_SHUTDOWN);
815
816     return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
817         0 => {
818             req.defuse(); // uv callback now owns this request
819             let mut cx = Ctx { slot: None, status: 0 };
820
821             wait_until_woken_after(&mut cx.slot, loop_, || {
822                 req.set_data(&cx);
823             });
824
825             status_to_io_result(cx.status)
826         }
827         n => Err(uv_error_to_io_error(UvError(n)))
828     };
829
830     extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
831         let req = Request::wrap(req);
832         assert!(status != uvll::ECANCELED);
833         let cx: &mut Ctx = unsafe { req.get_data() };
834         cx.status = status;
835         wakeup(&mut cx.slot);
836     }
837 }
838
839 #[cfg(test)]
840 mod test {
841     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
842                         RtioUdpSocket};
843
844     use super::{UdpWatcher, TcpWatcher, TcpListener};
845     use super::super::local_loop;
846
847     #[test]
848     fn connect_close_ip4() {
849         match TcpWatcher::connect(local_loop(), ::next_test_ip4(), None) {
850             Ok(..) => fail!(),
851             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
852         }
853     }
854
855     #[test]
856     fn connect_close_ip6() {
857         match TcpWatcher::connect(local_loop(), ::next_test_ip6(), None) {
858             Ok(..) => fail!(),
859             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
860         }
861     }
862
863     #[test]
864     fn udp_bind_close_ip4() {
865         match UdpWatcher::bind(local_loop(), ::next_test_ip4()) {
866             Ok(..) => {}
867             Err(..) => fail!()
868         }
869     }
870
871     #[test]
872     fn udp_bind_close_ip6() {
873         match UdpWatcher::bind(local_loop(), ::next_test_ip6()) {
874             Ok(..) => {}
875             Err(..) => fail!()
876         }
877     }
878
879     #[test]
880     fn listen_ip4() {
881         let (tx, rx) = channel();
882         let addr = ::next_test_ip4();
883
884         spawn(proc() {
885             let w = match TcpListener::bind(local_loop(), addr) {
886                 Ok(w) => w, Err(e) => fail!("{:?}", e)
887             };
888             let mut w = match w.listen() {
889                 Ok(w) => w, Err(e) => fail!("{:?}", e),
890             };
891             tx.send(());
892             match w.accept() {
893                 Ok(mut stream) => {
894                     let mut buf = [0u8, ..10];
895                     match stream.read(buf) {
896                         Ok(10) => {} e => fail!("{:?}", e),
897                     }
898                     for i in range(0, 10u8) {
899                         assert_eq!(buf[i as uint], i + 1);
900                     }
901                 }
902                 Err(e) => fail!("{:?}", e)
903             }
904         });
905
906         rx.recv();
907         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
908             Ok(w) => w, Err(e) => fail!("{:?}", e)
909         };
910         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
911             Ok(()) => {}, Err(e) => fail!("{:?}", e)
912         }
913     }
914
915     #[test]
916     fn listen_ip6() {
917         let (tx, rx) = channel();
918         let addr = ::next_test_ip6();
919
920         spawn(proc() {
921             let w = match TcpListener::bind(local_loop(), addr) {
922                 Ok(w) => w, Err(e) => fail!("{:?}", e)
923             };
924             let mut w = match w.listen() {
925                 Ok(w) => w, Err(e) => fail!("{:?}", e),
926             };
927             tx.send(());
928             match w.accept() {
929                 Ok(mut stream) => {
930                     let mut buf = [0u8, ..10];
931                     match stream.read(buf) {
932                         Ok(10) => {} e => fail!("{:?}", e),
933                     }
934                     for i in range(0, 10u8) {
935                         assert_eq!(buf[i as uint], i + 1);
936                     }
937                 }
938                 Err(e) => fail!("{:?}", e)
939             }
940         });
941
942         rx.recv();
943         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
944             Ok(w) => w, Err(e) => fail!("{:?}", e)
945         };
946         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
947             Ok(()) => {}, Err(e) => fail!("{:?}", e)
948         }
949     }
950
951     #[test]
952     fn udp_recv_ip4() {
953         let (tx, rx) = channel();
954         let client = ::next_test_ip4();
955         let server = ::next_test_ip4();
956
957         spawn(proc() {
958             match UdpWatcher::bind(local_loop(), server) {
959                 Ok(mut w) => {
960                     tx.send(());
961                     let mut buf = [0u8, ..10];
962                     match w.recvfrom(buf) {
963                         Ok((10, addr)) => assert!(addr == client),
964                         e => fail!("{:?}", e),
965                     }
966                     for i in range(0, 10u8) {
967                         assert_eq!(buf[i as uint], i + 1);
968                     }
969                 }
970                 Err(e) => fail!("{:?}", e)
971             }
972         });
973
974         rx.recv();
975         let mut w = match UdpWatcher::bind(local_loop(), client) {
976             Ok(w) => w, Err(e) => fail!("{:?}", e)
977         };
978         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
979             Ok(()) => {}, Err(e) => fail!("{:?}", e)
980         }
981     }
982
983     #[test]
984     fn udp_recv_ip6() {
985         let (tx, rx) = channel();
986         let client = ::next_test_ip6();
987         let server = ::next_test_ip6();
988
989         spawn(proc() {
990             match UdpWatcher::bind(local_loop(), server) {
991                 Ok(mut w) => {
992                     tx.send(());
993                     let mut buf = [0u8, ..10];
994                     match w.recvfrom(buf) {
995                         Ok((10, addr)) => assert!(addr == client),
996                         e => fail!("{:?}", e),
997                     }
998                     for i in range(0, 10u8) {
999                         assert_eq!(buf[i as uint], i + 1);
1000                     }
1001                 }
1002                 Err(e) => fail!("{:?}", e)
1003             }
1004         });
1005
1006         rx.recv();
1007         let mut w = match UdpWatcher::bind(local_loop(), client) {
1008             Ok(w) => w, Err(e) => fail!("{:?}", e)
1009         };
1010         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1011             Ok(()) => {}, Err(e) => fail!("{:?}", e)
1012         }
1013     }
1014
1015     #[test]
1016     fn test_read_read_read() {
1017         let addr = ::next_test_ip4();
1018         static MAX: uint = 5000;
1019         let (tx, rx) = channel();
1020
1021         spawn(proc() {
1022             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1023             let mut acceptor = listener.listen().ok().unwrap();
1024             tx.send(());
1025             let mut stream = acceptor.accept().ok().unwrap();
1026             let buf = [1, .. 2048];
1027             let mut total_bytes_written = 0;
1028             while total_bytes_written < MAX {
1029                 assert!(stream.write(buf).is_ok());
1030                 uvdebug!("wrote bytes");
1031                 total_bytes_written += buf.len();
1032             }
1033         });
1034
1035         rx.recv();
1036         let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1037         let mut buf = [0, .. 2048];
1038         let mut total_bytes_read = 0;
1039         while total_bytes_read < MAX {
1040             let nread = stream.read(buf).ok().unwrap();
1041             total_bytes_read += nread;
1042             for i in range(0u, nread) {
1043                 assert_eq!(buf[i], 1);
1044             }
1045         }
1046         uvdebug!("read {} bytes total", total_bytes_read);
1047     }
1048
1049     #[test]
1050     #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1051     fn test_udp_twice() {
1052         let server_addr = ::next_test_ip4();
1053         let client_addr = ::next_test_ip4();
1054         let (tx, rx) = channel();
1055
1056         spawn(proc() {
1057             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1058             rx.recv();
1059             assert!(client.sendto([1], server_addr).is_ok());
1060             assert!(client.sendto([2], server_addr).is_ok());
1061         });
1062
1063         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1064         tx.send(());
1065         let mut buf1 = [0];
1066         let mut buf2 = [0];
1067         let (nread1, src1) = server.recvfrom(buf1).ok().unwrap();
1068         let (nread2, src2) = server.recvfrom(buf2).ok().unwrap();
1069         assert_eq!(nread1, 1);
1070         assert_eq!(nread2, 1);
1071         assert!(src1 == client_addr);
1072         assert!(src2 == client_addr);
1073         assert_eq!(buf1[0], 1);
1074         assert_eq!(buf2[0], 2);
1075     }
1076
1077     #[test]
1078     fn test_udp_many_read() {
1079         let server_out_addr = ::next_test_ip4();
1080         let server_in_addr = ::next_test_ip4();
1081         let client_out_addr = ::next_test_ip4();
1082         let client_in_addr = ::next_test_ip4();
1083         static MAX: uint = 500_000;
1084
1085         let (tx1, rx1) = channel::<()>();
1086         let (tx2, rx2) = channel::<()>();
1087
1088         spawn(proc() {
1089             let l = local_loop();
1090             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1091             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1092             let (tx, rx) = (tx2, rx1);
1093             tx.send(());
1094             rx.recv();
1095             let msg = [1, .. 2048];
1096             let mut total_bytes_sent = 0;
1097             let mut buf = [1];
1098             while buf[0] == 1 {
1099                 // send more data
1100                 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1101                 total_bytes_sent += msg.len();
1102                 // check if the client has received enough
1103                 let res = server_in.recvfrom(buf);
1104                 assert!(res.is_ok());
1105                 let (nread, src) = res.ok().unwrap();
1106                 assert_eq!(nread, 1);
1107                 assert!(src == client_out_addr);
1108             }
1109             assert!(total_bytes_sent >= MAX);
1110         });
1111
1112         let l = local_loop();
1113         let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1114         let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1115         let (tx, rx) = (tx1, rx2);
1116         rx.recv();
1117         tx.send(());
1118         let mut total_bytes_recv = 0;
1119         let mut buf = [0, .. 2048];
1120         while total_bytes_recv < MAX {
1121             // ask for more
1122             assert!(client_out.sendto([1], server_in_addr).is_ok());
1123             // wait for data
1124             let res = client_in.recvfrom(buf);
1125             assert!(res.is_ok());
1126             let (nread, src) = res.ok().unwrap();
1127             assert!(src == server_out_addr);
1128             total_bytes_recv += nread;
1129             for i in range(0u, nread) {
1130                 assert_eq!(buf[i], 1);
1131             }
1132         }
1133         // tell the server we're done
1134         assert!(client_out.sendto([0], server_in_addr).is_ok());
1135     }
1136
1137     #[test]
1138     fn test_read_and_block() {
1139         let addr = ::next_test_ip4();
1140         let (tx, rx) = channel::<Receiver<()>>();
1141
1142         spawn(proc() {
1143             let rx = rx.recv();
1144             let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1145             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1146             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1147             rx.recv();
1148             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1149             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1150             rx.recv();
1151         });
1152
1153         let listener = TcpListener::bind(local_loop(), addr).unwrap();
1154         let mut acceptor = listener.listen().ok().unwrap();
1155         let (tx2, rx2) = channel();
1156         tx.send(rx2);
1157         let mut stream = acceptor.accept().ok().unwrap();
1158         let mut buf = [0, .. 2048];
1159
1160         let expected = 32;
1161         let mut current = 0;
1162         let mut reads = 0;
1163
1164         while current < expected {
1165             let nread = stream.read(buf).ok().unwrap();
1166             for i in range(0u, nread) {
1167                 let val = buf[i] as uint;
1168                 assert_eq!(val, current % 8);
1169                 current += 1;
1170             }
1171             reads += 1;
1172
1173             let _ = tx2.send_opt(());
1174         }
1175
1176         // Make sure we had multiple reads
1177         assert!(reads > 1);
1178     }
1179
1180     #[test]
1181     fn test_simple_tcp_server_and_client_on_diff_threads() {
1182         let addr = ::next_test_ip4();
1183
1184         spawn(proc() {
1185             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1186             let mut acceptor = listener.listen().ok().unwrap();
1187             let mut stream = acceptor.accept().ok().unwrap();
1188             let mut buf = [0, .. 2048];
1189             let nread = stream.read(buf).ok().unwrap();
1190             assert_eq!(nread, 8);
1191             for i in range(0u, nread) {
1192                 assert_eq!(buf[i], i as u8);
1193             }
1194         });
1195
1196         let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1197         while stream.is_err() {
1198             stream = TcpWatcher::connect(local_loop(), addr, None);
1199         }
1200         stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1201     }
1202
1203     #[should_fail] #[test]
1204     fn tcp_listener_fail_cleanup() {
1205         let addr = ::next_test_ip4();
1206         let w = TcpListener::bind(local_loop(), addr).unwrap();
1207         let _w = w.listen().ok().unwrap();
1208         fail!();
1209     }
1210
1211     #[should_fail] #[test]
1212     fn tcp_stream_fail_cleanup() {
1213         let (tx, rx) = channel();
1214         let addr = ::next_test_ip4();
1215
1216         spawn(proc() {
1217             let w = TcpListener::bind(local_loop(), addr).unwrap();
1218             let mut w = w.listen().ok().unwrap();
1219             tx.send(());
1220             drop(w.accept().ok().unwrap());
1221         });
1222         rx.recv();
1223         let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1224         fail!();
1225     }
1226
1227     #[should_fail] #[test]
1228     fn udp_listener_fail_cleanup() {
1229         let addr = ::next_test_ip4();
1230         let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1231         fail!();
1232     }
1233
1234     #[should_fail] #[test]
1235     fn udp_fail_other_task() {
1236         let addr = ::next_test_ip4();
1237         let (tx, rx) = channel();
1238
1239         // force the handle to be created on a different scheduler, failure in
1240         // the original task will force a homing operation back to this
1241         // scheduler.
1242         spawn(proc() {
1243             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1244             tx.send(w);
1245         });
1246
1247         let _w = rx.recv();
1248         fail!();
1249     }
1250 }