]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/net.rs
auto merge of #14739 : zwarich/rust/mut-unique-path, r=nikomatsakis
[rust.git] / src / libnative / io / net.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::mem;
14 use std::rt::mutex;
15 use std::rt::rtio;
16 use std::rt::rtio::{IoResult, IoError};
17
18 use super::{retry, keep_going};
19 use super::c;
20 use super::util;
21
22 ////////////////////////////////////////////////////////////////////////////////
23 // sockaddr and misc bindings
24 ////////////////////////////////////////////////////////////////////////////////
25
26 #[cfg(windows)] pub type sock_t = libc::SOCKET;
27 #[cfg(unix)]    pub type sock_t = super::file::fd_t;
28
29 pub fn htons(u: u16) -> u16 {
30     mem::to_be16(u)
31 }
32 pub fn ntohs(u: u16) -> u16 {
33     mem::from_be16(u)
34 }
35
36 enum InAddr {
37     InAddr(libc::in_addr),
38     In6Addr(libc::in6_addr),
39 }
40
41 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
42     match ip {
43         rtio::Ipv4Addr(a, b, c, d) => {
44             let ip = (a as u32 << 24) |
45                      (b as u32 << 16) |
46                      (c as u32 <<  8) |
47                      (d as u32 <<  0);
48             InAddr(libc::in_addr {
49                 s_addr: mem::from_be32(ip)
50             })
51         }
52         rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
53             In6Addr(libc::in6_addr {
54                 s6_addr: [
55                     htons(a),
56                     htons(b),
57                     htons(c),
58                     htons(d),
59                     htons(e),
60                     htons(f),
61                     htons(g),
62                     htons(h),
63                 ]
64             })
65         }
66     }
67 }
68
69 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
70     unsafe {
71         let storage: libc::sockaddr_storage = mem::zeroed();
72         let len = match ip_to_inaddr(addr.ip) {
73             InAddr(inaddr) => {
74                 let storage: *mut libc::sockaddr_in = mem::transmute(&storage);
75                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
76                 (*storage).sin_port = htons(addr.port);
77                 (*storage).sin_addr = inaddr;
78                 mem::size_of::<libc::sockaddr_in>()
79             }
80             In6Addr(inaddr) => {
81                 let storage: *mut libc::sockaddr_in6 = mem::transmute(&storage);
82                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
83                 (*storage).sin6_port = htons(addr.port);
84                 (*storage).sin6_addr = inaddr;
85                 mem::size_of::<libc::sockaddr_in6>()
86             }
87         };
88         return (storage, len);
89     }
90 }
91
92 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
93     unsafe {
94         let fam = match addr.ip {
95             rtio::Ipv4Addr(..) => libc::AF_INET,
96             rtio::Ipv6Addr(..) => libc::AF_INET6,
97         };
98         match libc::socket(fam, ty, 0) {
99             -1 => Err(super::last_error()),
100             fd => Ok(fd),
101         }
102     }
103 }
104
105 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
106                  payload: T) -> IoResult<()> {
107     unsafe {
108         let payload = &payload as *T as *libc::c_void;
109         let ret = libc::setsockopt(fd, opt, val,
110                                    payload,
111                                    mem::size_of::<T>() as libc::socklen_t);
112         if ret != 0 {
113             Err(last_error())
114         } else {
115             Ok(())
116         }
117     }
118 }
119
120 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
121                            val: libc::c_int) -> IoResult<T> {
122     unsafe {
123         let mut slot: T = mem::zeroed();
124         let mut len = mem::size_of::<T>() as libc::socklen_t;
125         let ret = c::getsockopt(fd, opt, val,
126                                 &mut slot as *mut _ as *mut _,
127                                 &mut len);
128         if ret != 0 {
129             Err(last_error())
130         } else {
131             assert!(len as uint == mem::size_of::<T>());
132             Ok(slot)
133         }
134     }
135 }
136
137 #[cfg(windows)]
138 pub fn last_error() -> IoError {
139     use std::os;
140     let code = unsafe { c::WSAGetLastError() as uint };
141     IoError {
142         code: code,
143         extra: 0,
144         detail: Some(os::error_string(code)),
145     }
146 }
147
148 #[cfg(not(windows))]
149 fn last_error() -> IoError {
150     super::last_error()
151 }
152
153 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
154 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
155
156 fn sockname(fd: sock_t,
157             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
158                                          *mut libc::socklen_t) -> libc::c_int)
159     -> IoResult<rtio::SocketAddr>
160 {
161     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
162     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
163     unsafe {
164         let storage = &mut storage as *mut libc::sockaddr_storage;
165         let ret = f(fd,
166                     storage as *mut libc::sockaddr,
167                     &mut len as *mut libc::socklen_t);
168         if ret != 0 {
169             return Err(last_error())
170         }
171     }
172     return sockaddr_to_addr(&storage, len as uint);
173 }
174
175 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
176                         len: uint) -> IoResult<rtio::SocketAddr> {
177     match storage.ss_family as libc::c_int {
178         libc::AF_INET => {
179             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
180             let storage: &libc::sockaddr_in = unsafe {
181                 mem::transmute(storage)
182             };
183             let ip = mem::to_be32(storage.sin_addr.s_addr as u32);
184             let a = (ip >> 24) as u8;
185             let b = (ip >> 16) as u8;
186             let c = (ip >>  8) as u8;
187             let d = (ip >>  0) as u8;
188             Ok(rtio::SocketAddr {
189                 ip: rtio::Ipv4Addr(a, b, c, d),
190                 port: ntohs(storage.sin_port),
191             })
192         }
193         libc::AF_INET6 => {
194             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
195             let storage: &libc::sockaddr_in6 = unsafe {
196                 mem::transmute(storage)
197             };
198             let a = ntohs(storage.sin6_addr.s6_addr[0]);
199             let b = ntohs(storage.sin6_addr.s6_addr[1]);
200             let c = ntohs(storage.sin6_addr.s6_addr[2]);
201             let d = ntohs(storage.sin6_addr.s6_addr[3]);
202             let e = ntohs(storage.sin6_addr.s6_addr[4]);
203             let f = ntohs(storage.sin6_addr.s6_addr[5]);
204             let g = ntohs(storage.sin6_addr.s6_addr[6]);
205             let h = ntohs(storage.sin6_addr.s6_addr[7]);
206             Ok(rtio::SocketAddr {
207                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
208                 port: ntohs(storage.sin6_port),
209             })
210         }
211         _ => {
212             #[cfg(unix)] use ERROR = libc::EINVAL;
213             #[cfg(windows)] use ERROR = libc::WSAEINVAL;
214             Err(IoError {
215                 code: ERROR as uint,
216                 extra: 0,
217                 detail: None,
218             })
219         }
220     }
221 }
222
223 #[cfg(unix)]
224 pub fn init() {}
225
226 #[cfg(windows)]
227 pub fn init() {
228
229     unsafe {
230         use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
231         static mut INITIALIZED: bool = false;
232         static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
233
234         let _guard = LOCK.lock();
235         if !INITIALIZED {
236             let mut data: c::WSADATA = mem::zeroed();
237             let ret = c::WSAStartup(0x202,      // version 2.2
238                                     &mut data);
239             assert_eq!(ret, 0);
240             INITIALIZED = true;
241         }
242     }
243 }
244
245 ////////////////////////////////////////////////////////////////////////////////
246 // TCP streams
247 ////////////////////////////////////////////////////////////////////////////////
248
249 pub struct TcpStream {
250     inner: Arc<Inner>,
251     read_deadline: u64,
252     write_deadline: u64,
253 }
254
255 struct Inner {
256     fd: sock_t,
257
258     // Unused on Linux, where this lock is not necessary.
259     #[allow(dead_code)]
260     lock: mutex::NativeMutex
261 }
262
263 pub struct Guard<'a> {
264     pub fd: sock_t,
265     pub guard: mutex::LockGuard<'a>,
266 }
267
268 impl Inner {
269     fn new(fd: sock_t) -> Inner {
270         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
271     }
272 }
273
274 impl TcpStream {
275     pub fn connect(addr: rtio::SocketAddr,
276                    timeout: Option<u64>) -> IoResult<TcpStream> {
277         let fd = try!(socket(addr, libc::SOCK_STREAM));
278         let ret = TcpStream::new(Inner::new(fd));
279
280         let (addr, len) = addr_to_sockaddr(addr);
281         let addrp = &addr as *_ as *libc::sockaddr;
282         let len = len as libc::socklen_t;
283
284         match timeout {
285             Some(timeout) => {
286                 try!(util::connect_timeout(fd, addrp, len, timeout));
287                 Ok(ret)
288             },
289             None => {
290                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
291                     -1 => Err(last_error()),
292                     _ => Ok(ret),
293                 }
294             }
295         }
296     }
297
298     fn new(inner: Inner) -> TcpStream {
299         TcpStream {
300             inner: Arc::new(inner),
301             read_deadline: 0,
302             write_deadline: 0,
303         }
304     }
305
306     pub fn fd(&self) -> sock_t { self.inner.fd }
307
308     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
309         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
310                    nodelay as libc::c_int)
311     }
312
313     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
314         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
315                              seconds.is_some() as libc::c_int);
316         match seconds {
317             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
318             None => ret,
319         }
320     }
321
322     #[cfg(target_os = "macos")]
323     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
324         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
325                    seconds as libc::c_int)
326     }
327     #[cfg(target_os = "freebsd")]
328     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
329         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
330                    seconds as libc::c_int)
331     }
332     #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
333     fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
334         Ok(())
335     }
336
337     #[cfg(target_os = "linux")]
338     fn lock_nonblocking(&self) {}
339
340     #[cfg(not(target_os = "linux"))]
341     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
342         let ret = Guard {
343             fd: self.fd(),
344             guard: unsafe { self.inner.lock.lock() },
345         };
346         assert!(util::set_nonblocking(self.fd(), true).is_ok());
347         ret
348     }
349 }
350
351 #[cfg(windows)] type wrlen = libc::c_int;
352 #[cfg(not(windows))] type wrlen = libc::size_t;
353
354 impl rtio::RtioTcpStream for TcpStream {
355     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
356         let fd = self.fd();
357         let dolock = || self.lock_nonblocking();
358         let doread = |nb| unsafe {
359             let flags = if nb {c::MSG_DONTWAIT} else {0};
360             libc::recv(fd,
361                        buf.as_mut_ptr() as *mut libc::c_void,
362                        buf.len() as wrlen,
363                        flags) as libc::c_int
364         };
365         read(fd, self.read_deadline, dolock, doread)
366     }
367
368     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
369         let fd = self.fd();
370         let dolock = || self.lock_nonblocking();
371         let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
372             let flags = if nb {c::MSG_DONTWAIT} else {0};
373             libc::send(fd,
374                        buf as *mut libc::c_void,
375                        len as wrlen,
376                        flags) as i64
377         };
378         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
379             Ok(_) => Ok(()),
380             Err(e) => Err(e)
381         }
382     }
383     fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
384         sockname(self.fd(), libc::getpeername)
385     }
386     fn control_congestion(&mut self) -> IoResult<()> {
387         self.set_nodelay(false)
388     }
389     fn nodelay(&mut self) -> IoResult<()> {
390         self.set_nodelay(true)
391     }
392     fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
393         self.set_keepalive(Some(delay_in_seconds))
394     }
395     fn letdie(&mut self) -> IoResult<()> {
396         self.set_keepalive(None)
397     }
398
399     fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
400         box TcpStream {
401             inner: self.inner.clone(),
402             read_deadline: 0,
403             write_deadline: 0,
404         } as Box<rtio::RtioTcpStream:Send>
405     }
406
407     fn close_write(&mut self) -> IoResult<()> {
408         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
409     }
410     fn close_read(&mut self) -> IoResult<()> {
411         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
412     }
413
414     fn set_timeout(&mut self, timeout: Option<u64>) {
415         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
416         self.read_deadline = deadline;
417         self.write_deadline = deadline;
418     }
419     fn set_read_timeout(&mut self, timeout: Option<u64>) {
420         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
421     }
422     fn set_write_timeout(&mut self, timeout: Option<u64>) {
423         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
424     }
425 }
426
427 impl rtio::RtioSocket for TcpStream {
428     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
429         sockname(self.fd(), libc::getsockname)
430     }
431 }
432
433 impl Drop for Inner {
434     fn drop(&mut self) { unsafe { close(self.fd); } }
435 }
436
437 #[unsafe_destructor]
438 impl<'a> Drop for Guard<'a> {
439     fn drop(&mut self) {
440         assert!(util::set_nonblocking(self.fd, false).is_ok());
441     }
442 }
443
444 ////////////////////////////////////////////////////////////////////////////////
445 // TCP listeners
446 ////////////////////////////////////////////////////////////////////////////////
447
448 pub struct TcpListener {
449     inner: Inner,
450 }
451
452 impl TcpListener {
453     pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
454         let fd = try!(socket(addr, libc::SOCK_STREAM));
455         let ret = TcpListener { inner: Inner::new(fd) };
456
457         let (addr, len) = addr_to_sockaddr(addr);
458         let addrp = &addr as *_ as *libc::sockaddr;
459         let len = len as libc::socklen_t;
460
461         // On platforms with Berkeley-derived sockets, this allows
462         // to quickly rebind a socket, without needing to wait for
463         // the OS to clean up the previous one.
464         if cfg!(unix) {
465             try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
466                             1 as libc::c_int));
467         }
468
469         match unsafe { libc::bind(fd, addrp, len) } {
470             -1 => Err(last_error()),
471             _ => Ok(ret),
472         }
473     }
474
475     pub fn fd(&self) -> sock_t { self.inner.fd }
476
477     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
478         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
479             -1 => Err(last_error()),
480             _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
481         }
482     }
483 }
484
485 impl rtio::RtioTcpListener for TcpListener {
486     fn listen(~self) -> IoResult<Box<rtio::RtioTcpAcceptor:Send>> {
487         self.native_listen(128).map(|a| {
488             box a as Box<rtio::RtioTcpAcceptor:Send>
489         })
490     }
491 }
492
493 impl rtio::RtioSocket for TcpListener {
494     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
495         sockname(self.fd(), libc::getsockname)
496     }
497 }
498
499 pub struct TcpAcceptor {
500     listener: TcpListener,
501     deadline: u64,
502 }
503
504 impl TcpAcceptor {
505     pub fn fd(&self) -> sock_t { self.listener.fd() }
506
507     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
508         if self.deadline != 0 {
509             try!(util::await(self.fd(), Some(self.deadline), util::Readable));
510         }
511         unsafe {
512             let mut storage: libc::sockaddr_storage = mem::zeroed();
513             let storagep = &mut storage as *mut libc::sockaddr_storage;
514             let size = mem::size_of::<libc::sockaddr_storage>();
515             let mut size = size as libc::socklen_t;
516             match retry(|| {
517                 libc::accept(self.fd(),
518                              storagep as *mut libc::sockaddr,
519                              &mut size as *mut libc::socklen_t) as libc::c_int
520             }) as sock_t {
521                 -1 => Err(last_error()),
522                 fd => Ok(TcpStream::new(Inner::new(fd))),
523             }
524         }
525     }
526 }
527
528 impl rtio::RtioSocket for TcpAcceptor {
529     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
530         sockname(self.fd(), libc::getsockname)
531     }
532 }
533
534 impl rtio::RtioTcpAcceptor for TcpAcceptor {
535     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
536         self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream:Send>)
537     }
538
539     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
540     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
541     fn set_timeout(&mut self, timeout: Option<u64>) {
542         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
543     }
544 }
545
546 ////////////////////////////////////////////////////////////////////////////////
547 // UDP
548 ////////////////////////////////////////////////////////////////////////////////
549
550 pub struct UdpSocket {
551     inner: Arc<Inner>,
552     read_deadline: u64,
553     write_deadline: u64,
554 }
555
556 impl UdpSocket {
557     pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
558         let fd = try!(socket(addr, libc::SOCK_DGRAM));
559         let ret = UdpSocket {
560             inner: Arc::new(Inner::new(fd)),
561             read_deadline: 0,
562             write_deadline: 0,
563         };
564
565         let (addr, len) = addr_to_sockaddr(addr);
566         let addrp = &addr as *_ as *libc::sockaddr;
567         let len = len as libc::socklen_t;
568
569         match unsafe { libc::bind(fd, addrp, len) } {
570             -1 => Err(last_error()),
571             _ => Ok(ret),
572         }
573     }
574
575     pub fn fd(&self) -> sock_t { self.inner.fd }
576
577     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
578         setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
579                    on as libc::c_int)
580     }
581
582     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
583         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
584                    on as libc::c_int)
585     }
586
587     pub fn set_membership(&mut self, addr: rtio::IpAddr,
588                           opt: libc::c_int) -> IoResult<()> {
589         match ip_to_inaddr(addr) {
590             InAddr(addr) => {
591                 let mreq = libc::ip_mreq {
592                     imr_multiaddr: addr,
593                     // interface == INADDR_ANY
594                     imr_interface: libc::in_addr { s_addr: 0x0 },
595                 };
596                 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
597             }
598             In6Addr(addr) => {
599                 let mreq = libc::ip6_mreq {
600                     ipv6mr_multiaddr: addr,
601                     ipv6mr_interface: 0,
602                 };
603                 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
604             }
605         }
606     }
607
608     #[cfg(target_os = "linux")]
609     fn lock_nonblocking(&self) {}
610
611     #[cfg(not(target_os = "linux"))]
612     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
613         let ret = Guard {
614             fd: self.fd(),
615             guard: unsafe { self.inner.lock.lock() },
616         };
617         assert!(util::set_nonblocking(self.fd(), true).is_ok());
618         ret
619     }
620 }
621
622 impl rtio::RtioSocket for UdpSocket {
623     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
624         sockname(self.fd(), libc::getsockname)
625     }
626 }
627
628 #[cfg(windows)] type msglen_t = libc::c_int;
629 #[cfg(unix)]    type msglen_t = libc::size_t;
630
631 impl rtio::RtioUdpSocket for UdpSocket {
632     fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
633         let fd = self.fd();
634         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
635         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
636         let mut addrlen: libc::socklen_t =
637                 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
638
639         let dolock = || self.lock_nonblocking();
640         let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
641             let flags = if nb {c::MSG_DONTWAIT} else {0};
642             libc::recvfrom(fd,
643                            buf.as_mut_ptr() as *mut libc::c_void,
644                            buf.len() as msglen_t,
645                            flags,
646                            storagep,
647                            &mut addrlen) as libc::c_int
648         }));
649         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
650             Ok((n as uint, addr))
651         })
652     }
653
654     fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
655         let (dst, dstlen) = addr_to_sockaddr(dst);
656         let dstp = &dst as *_ as *libc::sockaddr;
657         let dstlen = dstlen as libc::socklen_t;
658
659         let fd = self.fd();
660         let dolock = || self.lock_nonblocking();
661         let dowrite = |nb, buf: *u8, len: uint| unsafe {
662             let flags = if nb {c::MSG_DONTWAIT} else {0};
663             libc::sendto(fd,
664                          buf as *libc::c_void,
665                          len as msglen_t,
666                          flags,
667                          dstp,
668                          dstlen) as i64
669         };
670
671         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
672         if n != buf.len() {
673             Err(util::short_write(n, "couldn't send entire packet at once"))
674         } else {
675             Ok(())
676         }
677     }
678
679     fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
680         match multi {
681             rtio::Ipv4Addr(..) => {
682                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
683             }
684             rtio::Ipv6Addr(..) => {
685                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
686             }
687         }
688     }
689     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
690         match multi {
691             rtio::Ipv4Addr(..) => {
692                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
693             }
694             rtio::Ipv6Addr(..) => {
695                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
696             }
697         }
698     }
699
700     fn loop_multicast_locally(&mut self) -> IoResult<()> {
701         self.set_multicast_loop(true)
702     }
703     fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
704         self.set_multicast_loop(false)
705     }
706
707     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
708         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
709                    ttl as libc::c_int)
710     }
711     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
712         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
713     }
714
715     fn hear_broadcasts(&mut self) -> IoResult<()> {
716         self.set_broadcast(true)
717     }
718     fn ignore_broadcasts(&mut self) -> IoResult<()> {
719         self.set_broadcast(false)
720     }
721
722     fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
723         box UdpSocket {
724             inner: self.inner.clone(),
725             read_deadline: 0,
726             write_deadline: 0,
727         } as Box<rtio::RtioUdpSocket:Send>
728     }
729
730     fn set_timeout(&mut self, timeout: Option<u64>) {
731         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
732         self.read_deadline = deadline;
733         self.write_deadline = deadline;
734     }
735     fn set_read_timeout(&mut self, timeout: Option<u64>) {
736         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
737     }
738     fn set_write_timeout(&mut self, timeout: Option<u64>) {
739         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
740     }
741 }
742
743 ////////////////////////////////////////////////////////////////////////////////
744 // Timeout helpers
745 //
746 // The read/write functions below are the helpers for reading/writing a socket
747 // with a possible deadline specified. This is generally viewed as a timed out
748 // I/O operation.
749 //
750 // From the application's perspective, timeouts apply to the I/O object, not to
751 // the underlying file descriptor (it's one timeout per object). This means that
752 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
753 //
754 // The next idea to implement timeouts would be to use nonblocking I/O. An
755 // invocation of select() would wait (with a timeout) for a socket to be ready.
756 // Once its ready, we can perform the operation. Note that the operation *must*
757 // be nonblocking, even though select() says the socket is ready. This is
758 // because some other thread could have come and stolen our data (handles can be
759 // cloned).
760 //
761 // To implement nonblocking I/O, the first option we have is to use the
762 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
763 // I/O objects, so this was initially viewed as unwise.
764 //
765 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
766 // send/recv, but the niftiness wears off once you realize it only works well on
767 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
768 // operation on linux (no flag fidding, no affecting other objects), but not on
769 // other platforms.
770 //
771 // To work around this constraint on other platforms, we end up using the
772 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
773 // could cause other objects' blocking operations to suddenly become
774 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
775 // falls back to using the same code path as nonblocking operations, but with an
776 // infinite timeout (select + send/recv). This helps emulate blocking
777 // reads/writes despite the underlying descriptor being nonblocking, as well as
778 // optimizing the fast path of just hitting one syscall in the good case.
779 //
780 // As a final caveat, this implementation uses a mutex so only one thread is
781 // doing a nonblocking operation at at time. This is the operation that comes
782 // after the select() (at which point we think the socket is ready). This is
783 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
784 // expect (wouldn't want someone turning it on when it should be off!). All
785 // operations performed in the lock are *nonblocking* to avoid holding the mutex
786 // forever.
787 //
788 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
789 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
790 // reads/writes are still blocking.
791 //
792 // Fun, fun!
793 //
794 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
795 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
796
797 pub fn read<T>(fd: sock_t,
798                deadline: u64,
799                lock: || -> T,
800                read: |bool| -> libc::c_int) -> IoResult<uint> {
801     let mut ret = -1;
802     if deadline == 0 {
803         ret = retry(|| read(false));
804     }
805
806     if deadline != 0 || (ret == -1 && util::wouldblock()) {
807         let deadline = match deadline {
808             0 => None,
809             n => Some(n),
810         };
811         loop {
812             // With a timeout, first we wait for the socket to become
813             // readable using select(), specifying the relevant timeout for
814             // our previously set deadline.
815             try!(util::await(fd, deadline, util::Readable));
816
817             // At this point, we're still within the timeout, and we've
818             // determined that the socket is readable (as returned by
819             // select). We must still read the socket in *nonblocking* mode
820             // because some other thread could come steal our data. If we
821             // fail to read some data, we retry (hence the outer loop) and
822             // wait for the socket to become readable again.
823             let _guard = lock();
824             match retry(|| read(deadline.is_some())) {
825                 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
826                 -1 => return Err(last_error()),
827                n => { ret = n; break }
828             }
829         }
830     }
831
832     match ret {
833         0 => Err(util::eof()),
834         n if n < 0 => Err(last_error()),
835         n => Ok(n as uint)
836     }
837 }
838
839 pub fn write<T>(fd: sock_t,
840                 deadline: u64,
841                 buf: &[u8],
842                 write_everything: bool,
843                 lock: || -> T,
844                 write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
845     let mut ret = -1;
846     let mut written = 0;
847     if deadline == 0 {
848         if write_everything {
849             ret = keep_going(buf, |inner, len| {
850                 written = buf.len() - len;
851                 write(false, inner, len)
852             });
853         } else {
854             ret = retry(|| {
855                 write(false, buf.as_ptr(), buf.len()) as libc::c_int
856             }) as i64;
857             if ret > 0 { written = ret as uint; }
858         }
859     }
860
861     if deadline != 0 || (ret == -1 && util::wouldblock()) {
862         let deadline = match deadline {
863             0 => None,
864             n => Some(n),
865         };
866         while written < buf.len() && (write_everything || written == 0) {
867             // As with read(), first wait for the socket to be ready for
868             // the I/O operation.
869             match util::await(fd, deadline, util::Writable) {
870                 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
871                     assert!(deadline.is_some());
872                     return Err(util::short_write(written, "short write"))
873                 }
874                 Err(e) => return Err(e),
875                 Ok(()) => {}
876             }
877
878             // Also as with read(), we use MSG_DONTWAIT to guard ourselves
879             // against unforseen circumstances.
880             let _guard = lock();
881             let ptr = buf.slice_from(written).as_ptr();
882             let len = buf.len() - written;
883             match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
884                 -1 if util::wouldblock() => {}
885                 -1 => return Err(last_error()),
886                 n => { written += n as uint; }
887             }
888         }
889         ret = 0;
890     }
891     if ret < 0 {
892         Err(last_error())
893     } else {
894         Ok(written)
895     }
896 }