]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/net.rs
auto merge of #14788 : Sawyer47/rust/issue-13214, r=huonw
[rust.git] / src / libnative / io / net.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::mem;
14 use std::rt::mutex;
15 use std::rt::rtio;
16 use std::rt::rtio::{IoResult, IoError};
17
18 use super::{retry, keep_going};
19 use super::c;
20 use super::util;
21
22 ////////////////////////////////////////////////////////////////////////////////
23 // sockaddr and misc bindings
24 ////////////////////////////////////////////////////////////////////////////////
25
26 #[cfg(windows)] pub type sock_t = libc::SOCKET;
27 #[cfg(unix)]    pub type sock_t = super::file::fd_t;
28
29 pub fn htons(u: u16) -> u16 {
30     mem::to_be16(u)
31 }
32 pub fn ntohs(u: u16) -> u16 {
33     mem::from_be16(u)
34 }
35
36 enum InAddr {
37     InAddr(libc::in_addr),
38     In6Addr(libc::in6_addr),
39 }
40
41 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
42     match ip {
43         rtio::Ipv4Addr(a, b, c, d) => {
44             let ip = (a as u32 << 24) |
45                      (b as u32 << 16) |
46                      (c as u32 <<  8) |
47                      (d as u32 <<  0);
48             InAddr(libc::in_addr {
49                 s_addr: mem::from_be32(ip)
50             })
51         }
52         rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
53             In6Addr(libc::in6_addr {
54                 s6_addr: [
55                     htons(a),
56                     htons(b),
57                     htons(c),
58                     htons(d),
59                     htons(e),
60                     htons(f),
61                     htons(g),
62                     htons(h),
63                 ]
64             })
65         }
66     }
67 }
68
69 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
70     unsafe {
71         let storage: libc::sockaddr_storage = mem::zeroed();
72         let len = match ip_to_inaddr(addr.ip) {
73             InAddr(inaddr) => {
74                 let storage: *mut libc::sockaddr_in = mem::transmute(&storage);
75                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
76                 (*storage).sin_port = htons(addr.port);
77                 (*storage).sin_addr = inaddr;
78                 mem::size_of::<libc::sockaddr_in>()
79             }
80             In6Addr(inaddr) => {
81                 let storage: *mut libc::sockaddr_in6 = mem::transmute(&storage);
82                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
83                 (*storage).sin6_port = htons(addr.port);
84                 (*storage).sin6_addr = inaddr;
85                 mem::size_of::<libc::sockaddr_in6>()
86             }
87         };
88         return (storage, len);
89     }
90 }
91
92 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
93     unsafe {
94         let fam = match addr.ip {
95             rtio::Ipv4Addr(..) => libc::AF_INET,
96             rtio::Ipv6Addr(..) => libc::AF_INET6,
97         };
98         match libc::socket(fam, ty, 0) {
99             -1 => Err(super::last_error()),
100             fd => Ok(fd),
101         }
102     }
103 }
104
105 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
106                  payload: T) -> IoResult<()> {
107     unsafe {
108         let payload = &payload as *T as *libc::c_void;
109         let ret = libc::setsockopt(fd, opt, val,
110                                    payload,
111                                    mem::size_of::<T>() as libc::socklen_t);
112         if ret != 0 {
113             Err(last_error())
114         } else {
115             Ok(())
116         }
117     }
118 }
119
120 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
121                            val: libc::c_int) -> IoResult<T> {
122     unsafe {
123         let mut slot: T = mem::zeroed();
124         let mut len = mem::size_of::<T>() as libc::socklen_t;
125         let ret = c::getsockopt(fd, opt, val,
126                                 &mut slot as *mut _ as *mut _,
127                                 &mut len);
128         if ret != 0 {
129             Err(last_error())
130         } else {
131             assert!(len as uint == mem::size_of::<T>());
132             Ok(slot)
133         }
134     }
135 }
136
137 #[cfg(windows)]
138 pub fn last_error() -> IoError {
139     use std::os;
140     let code = unsafe { c::WSAGetLastError() as uint };
141     IoError {
142         code: code,
143         extra: 0,
144         detail: Some(os::error_string(code)),
145     }
146 }
147
148 #[cfg(not(windows))]
149 fn last_error() -> IoError {
150     super::last_error()
151 }
152
153 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
154 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
155
156 fn sockname(fd: sock_t,
157             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
158                                          *mut libc::socklen_t) -> libc::c_int)
159     -> IoResult<rtio::SocketAddr>
160 {
161     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
162     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
163     unsafe {
164         let storage = &mut storage as *mut libc::sockaddr_storage;
165         let ret = f(fd,
166                     storage as *mut libc::sockaddr,
167                     &mut len as *mut libc::socklen_t);
168         if ret != 0 {
169             return Err(last_error())
170         }
171     }
172     return sockaddr_to_addr(&storage, len as uint);
173 }
174
175 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
176                         len: uint) -> IoResult<rtio::SocketAddr> {
177     match storage.ss_family as libc::c_int {
178         libc::AF_INET => {
179             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
180             let storage: &libc::sockaddr_in = unsafe {
181                 mem::transmute(storage)
182             };
183             let ip = mem::to_be32(storage.sin_addr.s_addr as u32);
184             let a = (ip >> 24) as u8;
185             let b = (ip >> 16) as u8;
186             let c = (ip >>  8) as u8;
187             let d = (ip >>  0) as u8;
188             Ok(rtio::SocketAddr {
189                 ip: rtio::Ipv4Addr(a, b, c, d),
190                 port: ntohs(storage.sin_port),
191             })
192         }
193         libc::AF_INET6 => {
194             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
195             let storage: &libc::sockaddr_in6 = unsafe {
196                 mem::transmute(storage)
197             };
198             let a = ntohs(storage.sin6_addr.s6_addr[0]);
199             let b = ntohs(storage.sin6_addr.s6_addr[1]);
200             let c = ntohs(storage.sin6_addr.s6_addr[2]);
201             let d = ntohs(storage.sin6_addr.s6_addr[3]);
202             let e = ntohs(storage.sin6_addr.s6_addr[4]);
203             let f = ntohs(storage.sin6_addr.s6_addr[5]);
204             let g = ntohs(storage.sin6_addr.s6_addr[6]);
205             let h = ntohs(storage.sin6_addr.s6_addr[7]);
206             Ok(rtio::SocketAddr {
207                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
208                 port: ntohs(storage.sin6_port),
209             })
210         }
211         _ => {
212             #[cfg(unix)] use ERROR = libc::EINVAL;
213             #[cfg(windows)] use ERROR = libc::WSAEINVAL;
214             Err(IoError {
215                 code: ERROR as uint,
216                 extra: 0,
217                 detail: None,
218             })
219         }
220     }
221 }
222
223 #[cfg(unix)]
224 pub fn init() {}
225
226 #[cfg(windows)]
227 pub fn init() {
228
229     unsafe {
230         use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
231         static mut INITIALIZED: bool = false;
232         static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
233
234         let _guard = LOCK.lock();
235         if !INITIALIZED {
236             let mut data: c::WSADATA = mem::zeroed();
237             let ret = c::WSAStartup(0x202,      // version 2.2
238                                     &mut data);
239             assert_eq!(ret, 0);
240             INITIALIZED = true;
241         }
242     }
243 }
244
245 ////////////////////////////////////////////////////////////////////////////////
246 // TCP streams
247 ////////////////////////////////////////////////////////////////////////////////
248
249 pub struct TcpStream {
250     inner: Arc<Inner>,
251     read_deadline: u64,
252     write_deadline: u64,
253 }
254
255 struct Inner {
256     fd: sock_t,
257
258     // Unused on Linux, where this lock is not necessary.
259     #[allow(dead_code)]
260     lock: mutex::NativeMutex
261 }
262
263 pub struct Guard<'a> {
264     pub fd: sock_t,
265     pub guard: mutex::LockGuard<'a>,
266 }
267
268 impl Inner {
269     fn new(fd: sock_t) -> Inner {
270         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
271     }
272 }
273
274 impl TcpStream {
275     pub fn connect(addr: rtio::SocketAddr,
276                    timeout: Option<u64>) -> IoResult<TcpStream> {
277         let fd = try!(socket(addr, libc::SOCK_STREAM));
278         let ret = TcpStream::new(Inner::new(fd));
279
280         let (addr, len) = addr_to_sockaddr(addr);
281         let addrp = &addr as *_ as *libc::sockaddr;
282         let len = len as libc::socklen_t;
283
284         match timeout {
285             Some(timeout) => {
286                 try!(util::connect_timeout(fd, addrp, len, timeout));
287                 Ok(ret)
288             },
289             None => {
290                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
291                     -1 => Err(last_error()),
292                     _ => Ok(ret),
293                 }
294             }
295         }
296     }
297
298     fn new(inner: Inner) -> TcpStream {
299         TcpStream {
300             inner: Arc::new(inner),
301             read_deadline: 0,
302             write_deadline: 0,
303         }
304     }
305
306     pub fn fd(&self) -> sock_t { self.inner.fd }
307
308     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
309         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
310                    nodelay as libc::c_int)
311     }
312
313     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
314         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
315                              seconds.is_some() as libc::c_int);
316         match seconds {
317             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
318             None => ret,
319         }
320     }
321
322     #[cfg(target_os = "macos")]
323     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
324         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
325                    seconds as libc::c_int)
326     }
327     #[cfg(target_os = "freebsd")]
328     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
329         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
330                    seconds as libc::c_int)
331     }
332     #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
333     fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
334         Ok(())
335     }
336
337     #[cfg(target_os = "linux")]
338     fn lock_nonblocking(&self) {}
339
340     #[cfg(not(target_os = "linux"))]
341     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
342         let ret = Guard {
343             fd: self.fd(),
344             guard: unsafe { self.inner.lock.lock() },
345         };
346         assert!(util::set_nonblocking(self.fd(), true).is_ok());
347         ret
348     }
349 }
350
351 #[cfg(windows)] type wrlen = libc::c_int;
352 #[cfg(not(windows))] type wrlen = libc::size_t;
353
354 impl rtio::RtioTcpStream for TcpStream {
355     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
356         let fd = self.fd();
357         let dolock = || self.lock_nonblocking();
358         let doread = |nb| unsafe {
359             let flags = if nb {c::MSG_DONTWAIT} else {0};
360             libc::recv(fd,
361                        buf.as_mut_ptr() as *mut libc::c_void,
362                        buf.len() as wrlen,
363                        flags) as libc::c_int
364         };
365         read(fd, self.read_deadline, dolock, doread)
366     }
367
368     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
369         let fd = self.fd();
370         let dolock = || self.lock_nonblocking();
371         let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
372             let flags = if nb {c::MSG_DONTWAIT} else {0};
373             libc::send(fd,
374                        buf as *mut libc::c_void,
375                        len as wrlen,
376                        flags) as i64
377         };
378         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
379             Ok(_) => Ok(()),
380             Err(e) => Err(e)
381         }
382     }
383     fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
384         sockname(self.fd(), libc::getpeername)
385     }
386     fn control_congestion(&mut self) -> IoResult<()> {
387         self.set_nodelay(false)
388     }
389     fn nodelay(&mut self) -> IoResult<()> {
390         self.set_nodelay(true)
391     }
392     fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
393         self.set_keepalive(Some(delay_in_seconds))
394     }
395     fn letdie(&mut self) -> IoResult<()> {
396         self.set_keepalive(None)
397     }
398
399     fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
400         box TcpStream {
401             inner: self.inner.clone(),
402             read_deadline: 0,
403             write_deadline: 0,
404         } as Box<rtio::RtioTcpStream:Send>
405     }
406
407     fn close_write(&mut self) -> IoResult<()> {
408         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
409     }
410     fn close_read(&mut self) -> IoResult<()> {
411         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
412     }
413
414     fn set_timeout(&mut self, timeout: Option<u64>) {
415         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
416         self.read_deadline = deadline;
417         self.write_deadline = deadline;
418     }
419     fn set_read_timeout(&mut self, timeout: Option<u64>) {
420         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
421     }
422     fn set_write_timeout(&mut self, timeout: Option<u64>) {
423         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
424     }
425 }
426
427 impl rtio::RtioSocket for TcpStream {
428     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
429         sockname(self.fd(), libc::getsockname)
430     }
431 }
432
433 impl Drop for Inner {
434     fn drop(&mut self) { unsafe { close(self.fd); } }
435 }
436
437 #[unsafe_destructor]
438 impl<'a> Drop for Guard<'a> {
439     fn drop(&mut self) {
440         assert!(util::set_nonblocking(self.fd, false).is_ok());
441     }
442 }
443
444 ////////////////////////////////////////////////////////////////////////////////
445 // TCP listeners
446 ////////////////////////////////////////////////////////////////////////////////
447
448 pub struct TcpListener {
449     inner: Inner,
450 }
451
452 impl TcpListener {
453     pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
454         let fd = try!(socket(addr, libc::SOCK_STREAM));
455         let ret = TcpListener { inner: Inner::new(fd) };
456
457         let (addr, len) = addr_to_sockaddr(addr);
458         let addrp = &addr as *_ as *libc::sockaddr;
459         let len = len as libc::socklen_t;
460
461         // On platforms with Berkeley-derived sockets, this allows
462         // to quickly rebind a socket, without needing to wait for
463         // the OS to clean up the previous one.
464         if cfg!(unix) {
465             try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
466                             1 as libc::c_int));
467         }
468
469         match unsafe { libc::bind(fd, addrp, len) } {
470             -1 => Err(last_error()),
471             _ => Ok(ret),
472         }
473     }
474
475     pub fn fd(&self) -> sock_t { self.inner.fd }
476
477     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
478         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
479             -1 => Err(last_error()),
480             _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
481         }
482     }
483 }
484
485 impl rtio::RtioTcpListener for TcpListener {
486     fn listen(~self) -> IoResult<Box<rtio::RtioTcpAcceptor:Send>> {
487         self.native_listen(128).map(|a| {
488             box a as Box<rtio::RtioTcpAcceptor:Send>
489         })
490     }
491 }
492
493 impl rtio::RtioSocket for TcpListener {
494     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
495         sockname(self.fd(), libc::getsockname)
496     }
497 }
498
499 pub struct TcpAcceptor {
500     listener: TcpListener,
501     deadline: u64,
502 }
503
504 impl TcpAcceptor {
505     pub fn fd(&self) -> sock_t { self.listener.fd() }
506
507     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
508         if self.deadline != 0 {
509             try!(util::await(self.fd(), Some(self.deadline), util::Readable));
510         }
511         unsafe {
512             let mut storage: libc::sockaddr_storage = mem::zeroed();
513             let storagep = &mut storage as *mut libc::sockaddr_storage;
514             let size = mem::size_of::<libc::sockaddr_storage>();
515             let mut size = size as libc::socklen_t;
516             match retry(|| {
517                 libc::accept(self.fd(),
518                              storagep as *mut libc::sockaddr,
519                              &mut size as *mut libc::socklen_t) as libc::c_int
520             }) as sock_t {
521                 -1 => Err(last_error()),
522                 fd => Ok(TcpStream::new(Inner::new(fd))),
523             }
524         }
525     }
526 }
527
528 impl rtio::RtioSocket for TcpAcceptor {
529     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
530         sockname(self.fd(), libc::getsockname)
531     }
532 }
533
534 impl rtio::RtioTcpAcceptor for TcpAcceptor {
535     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
536         self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream:Send>)
537     }
538
539     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
540     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
541     fn set_timeout(&mut self, timeout: Option<u64>) {
542         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
543     }
544 }
545
546 ////////////////////////////////////////////////////////////////////////////////
547 // UDP
548 ////////////////////////////////////////////////////////////////////////////////
549
550 pub struct UdpSocket {
551     inner: Arc<Inner>,
552     read_deadline: u64,
553     write_deadline: u64,
554 }
555
556 impl UdpSocket {
557     pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
558         let fd = try!(socket(addr, libc::SOCK_DGRAM));
559         let ret = UdpSocket {
560             inner: Arc::new(Inner::new(fd)),
561             read_deadline: 0,
562             write_deadline: 0,
563         };
564
565         let (addr, len) = addr_to_sockaddr(addr);
566         let addrp = &addr as *_ as *libc::sockaddr;
567         let len = len as libc::socklen_t;
568
569         match unsafe { libc::bind(fd, addrp, len) } {
570             -1 => Err(last_error()),
571             _ => Ok(ret),
572         }
573     }
574
575     pub fn fd(&self) -> sock_t { self.inner.fd }
576
577     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
578         setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
579                    on as libc::c_int)
580     }
581
582     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
583         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
584                    on as libc::c_int)
585     }
586
587     pub fn set_membership(&mut self, addr: rtio::IpAddr,
588                           opt: libc::c_int) -> IoResult<()> {
589         match ip_to_inaddr(addr) {
590             InAddr(addr) => {
591                 let mreq = libc::ip_mreq {
592                     imr_multiaddr: addr,
593                     // interface == INADDR_ANY
594                     imr_interface: libc::in_addr { s_addr: 0x0 },
595                 };
596                 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
597             }
598             In6Addr(addr) => {
599                 let mreq = libc::ip6_mreq {
600                     ipv6mr_multiaddr: addr,
601                     ipv6mr_interface: 0,
602                 };
603                 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
604             }
605         }
606     }
607
608     #[cfg(target_os = "linux")]
609     fn lock_nonblocking(&self) {}
610
611     #[cfg(not(target_os = "linux"))]
612     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
613         let ret = Guard {
614             fd: self.fd(),
615             guard: unsafe { self.inner.lock.lock() },
616         };
617         assert!(util::set_nonblocking(self.fd(), true).is_ok());
618         ret
619     }
620 }
621
622 impl rtio::RtioSocket for UdpSocket {
623     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
624         sockname(self.fd(), libc::getsockname)
625     }
626 }
627
628 #[cfg(windows)] type msglen_t = libc::c_int;
629 #[cfg(unix)]    type msglen_t = libc::size_t;
630
631 impl rtio::RtioUdpSocket for UdpSocket {
632     fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
633         let fd = self.fd();
634         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
635         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
636         let mut addrlen: libc::socklen_t =
637                 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
638
639         let dolock = || self.lock_nonblocking();
640         let doread = |nb| unsafe {
641             let flags = if nb {c::MSG_DONTWAIT} else {0};
642             libc::recvfrom(fd,
643                            buf.as_mut_ptr() as *mut libc::c_void,
644                            buf.len() as msglen_t,
645                            flags,
646                            storagep,
647                            &mut addrlen) as libc::c_int
648         };
649         let n = try!(read(fd, self.read_deadline, dolock, doread));
650         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
651             Ok((n as uint, addr))
652         })
653     }
654
655     fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
656         let (dst, dstlen) = addr_to_sockaddr(dst);
657         let dstp = &dst as *_ as *libc::sockaddr;
658         let dstlen = dstlen as libc::socklen_t;
659
660         let fd = self.fd();
661         let dolock = || self.lock_nonblocking();
662         let dowrite = |nb, buf: *u8, len: uint| unsafe {
663             let flags = if nb {c::MSG_DONTWAIT} else {0};
664             libc::sendto(fd,
665                          buf as *libc::c_void,
666                          len as msglen_t,
667                          flags,
668                          dstp,
669                          dstlen) as i64
670         };
671
672         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
673         if n != buf.len() {
674             Err(util::short_write(n, "couldn't send entire packet at once"))
675         } else {
676             Ok(())
677         }
678     }
679
680     fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
681         match multi {
682             rtio::Ipv4Addr(..) => {
683                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
684             }
685             rtio::Ipv6Addr(..) => {
686                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
687             }
688         }
689     }
690     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
691         match multi {
692             rtio::Ipv4Addr(..) => {
693                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
694             }
695             rtio::Ipv6Addr(..) => {
696                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
697             }
698         }
699     }
700
701     fn loop_multicast_locally(&mut self) -> IoResult<()> {
702         self.set_multicast_loop(true)
703     }
704     fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
705         self.set_multicast_loop(false)
706     }
707
708     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
709         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
710                    ttl as libc::c_int)
711     }
712     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
713         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
714     }
715
716     fn hear_broadcasts(&mut self) -> IoResult<()> {
717         self.set_broadcast(true)
718     }
719     fn ignore_broadcasts(&mut self) -> IoResult<()> {
720         self.set_broadcast(false)
721     }
722
723     fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
724         box UdpSocket {
725             inner: self.inner.clone(),
726             read_deadline: 0,
727             write_deadline: 0,
728         } as Box<rtio::RtioUdpSocket:Send>
729     }
730
731     fn set_timeout(&mut self, timeout: Option<u64>) {
732         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
733         self.read_deadline = deadline;
734         self.write_deadline = deadline;
735     }
736     fn set_read_timeout(&mut self, timeout: Option<u64>) {
737         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
738     }
739     fn set_write_timeout(&mut self, timeout: Option<u64>) {
740         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
741     }
742 }
743
744 ////////////////////////////////////////////////////////////////////////////////
745 // Timeout helpers
746 //
747 // The read/write functions below are the helpers for reading/writing a socket
748 // with a possible deadline specified. This is generally viewed as a timed out
749 // I/O operation.
750 //
751 // From the application's perspective, timeouts apply to the I/O object, not to
752 // the underlying file descriptor (it's one timeout per object). This means that
753 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
754 //
755 // The next idea to implement timeouts would be to use nonblocking I/O. An
756 // invocation of select() would wait (with a timeout) for a socket to be ready.
757 // Once its ready, we can perform the operation. Note that the operation *must*
758 // be nonblocking, even though select() says the socket is ready. This is
759 // because some other thread could have come and stolen our data (handles can be
760 // cloned).
761 //
762 // To implement nonblocking I/O, the first option we have is to use the
763 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
764 // I/O objects, so this was initially viewed as unwise.
765 //
766 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
767 // send/recv, but the niftiness wears off once you realize it only works well on
768 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
769 // operation on linux (no flag fidding, no affecting other objects), but not on
770 // other platforms.
771 //
772 // To work around this constraint on other platforms, we end up using the
773 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
774 // could cause other objects' blocking operations to suddenly become
775 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
776 // falls back to using the same code path as nonblocking operations, but with an
777 // infinite timeout (select + send/recv). This helps emulate blocking
778 // reads/writes despite the underlying descriptor being nonblocking, as well as
779 // optimizing the fast path of just hitting one syscall in the good case.
780 //
781 // As a final caveat, this implementation uses a mutex so only one thread is
782 // doing a nonblocking operation at at time. This is the operation that comes
783 // after the select() (at which point we think the socket is ready). This is
784 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
785 // expect (wouldn't want someone turning it on when it should be off!). All
786 // operations performed in the lock are *nonblocking* to avoid holding the mutex
787 // forever.
788 //
789 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
790 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
791 // reads/writes are still blocking.
792 //
793 // Fun, fun!
794 //
795 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
796 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
797
798 pub fn read<T>(fd: sock_t,
799                deadline: u64,
800                lock: || -> T,
801                read: |bool| -> libc::c_int) -> IoResult<uint> {
802     let mut ret = -1;
803     if deadline == 0 {
804         ret = retry(|| read(false));
805     }
806
807     if deadline != 0 || (ret == -1 && util::wouldblock()) {
808         let deadline = match deadline {
809             0 => None,
810             n => Some(n),
811         };
812         loop {
813             // With a timeout, first we wait for the socket to become
814             // readable using select(), specifying the relevant timeout for
815             // our previously set deadline.
816             try!(util::await(fd, deadline, util::Readable));
817
818             // At this point, we're still within the timeout, and we've
819             // determined that the socket is readable (as returned by
820             // select). We must still read the socket in *nonblocking* mode
821             // because some other thread could come steal our data. If we
822             // fail to read some data, we retry (hence the outer loop) and
823             // wait for the socket to become readable again.
824             let _guard = lock();
825             match retry(|| read(deadline.is_some())) {
826                 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
827                 -1 => return Err(last_error()),
828                n => { ret = n; break }
829             }
830         }
831     }
832
833     match ret {
834         0 => Err(util::eof()),
835         n if n < 0 => Err(last_error()),
836         n => Ok(n as uint)
837     }
838 }
839
840 pub fn write<T>(fd: sock_t,
841                 deadline: u64,
842                 buf: &[u8],
843                 write_everything: bool,
844                 lock: || -> T,
845                 write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
846     let mut ret = -1;
847     let mut written = 0;
848     if deadline == 0 {
849         if write_everything {
850             ret = keep_going(buf, |inner, len| {
851                 written = buf.len() - len;
852                 write(false, inner, len)
853             });
854         } else {
855             ret = retry(|| {
856                 write(false, buf.as_ptr(), buf.len()) as libc::c_int
857             }) as i64;
858             if ret > 0 { written = ret as uint; }
859         }
860     }
861
862     if deadline != 0 || (ret == -1 && util::wouldblock()) {
863         let deadline = match deadline {
864             0 => None,
865             n => Some(n),
866         };
867         while written < buf.len() && (write_everything || written == 0) {
868             // As with read(), first wait for the socket to be ready for
869             // the I/O operation.
870             match util::await(fd, deadline, util::Writable) {
871                 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
872                     assert!(deadline.is_some());
873                     return Err(util::short_write(written, "short write"))
874                 }
875                 Err(e) => return Err(e),
876                 Ok(()) => {}
877             }
878
879             // Also as with read(), we use MSG_DONTWAIT to guard ourselves
880             // against unforseen circumstances.
881             let _guard = lock();
882             let ptr = buf.slice_from(written).as_ptr();
883             let len = buf.len() - written;
884             match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
885                 -1 if util::wouldblock() => {}
886                 -1 => return Err(last_error()),
887                 n => { written += n as uint; }
888             }
889         }
890         ret = 0;
891     }
892     if ret < 0 {
893         Err(last_error())
894     } else {
895         Ok(written)
896     }
897 }