]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/net.rs
auto merge of #15999 : Kimundi/rust/fix_folder, r=nikomatsakis
[rust.git] / src / libnative / io / net.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::mem;
14 use std::rt::mutex;
15 use std::rt::rtio;
16 use std::rt::rtio::{IoResult, IoError};
17
18 use super::{retry, keep_going};
19 use super::c;
20 use super::util;
21
22 ////////////////////////////////////////////////////////////////////////////////
23 // sockaddr and misc bindings
24 ////////////////////////////////////////////////////////////////////////////////
25
26 #[cfg(windows)] pub type sock_t = libc::SOCKET;
27 #[cfg(unix)]    pub type sock_t = super::file::fd_t;
28
29 pub fn htons(u: u16) -> u16 {
30     u.to_be()
31 }
32 pub fn ntohs(u: u16) -> u16 {
33     Int::from_be(u)
34 }
35
36 enum InAddr {
37     InAddr(libc::in_addr),
38     In6Addr(libc::in6_addr),
39 }
40
41 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
42     match ip {
43         rtio::Ipv4Addr(a, b, c, d) => {
44             let ip = (a as u32 << 24) |
45                      (b as u32 << 16) |
46                      (c as u32 <<  8) |
47                      (d as u32 <<  0);
48             InAddr(libc::in_addr {
49                 s_addr: Int::from_be(ip)
50             })
51         }
52         rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
53             In6Addr(libc::in6_addr {
54                 s6_addr: [
55                     htons(a),
56                     htons(b),
57                     htons(c),
58                     htons(d),
59                     htons(e),
60                     htons(f),
61                     htons(g),
62                     htons(h),
63                 ]
64             })
65         }
66     }
67 }
68
69 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
70     unsafe {
71         let storage: libc::sockaddr_storage = mem::zeroed();
72         let len = match ip_to_inaddr(addr.ip) {
73             InAddr(inaddr) => {
74                 let storage: *mut libc::sockaddr_in = mem::transmute(&storage);
75                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
76                 (*storage).sin_port = htons(addr.port);
77                 (*storage).sin_addr = inaddr;
78                 mem::size_of::<libc::sockaddr_in>()
79             }
80             In6Addr(inaddr) => {
81                 let storage: *mut libc::sockaddr_in6 = mem::transmute(&storage);
82                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
83                 (*storage).sin6_port = htons(addr.port);
84                 (*storage).sin6_addr = inaddr;
85                 mem::size_of::<libc::sockaddr_in6>()
86             }
87         };
88         return (storage, len);
89     }
90 }
91
92 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
93     unsafe {
94         let fam = match addr.ip {
95             rtio::Ipv4Addr(..) => libc::AF_INET,
96             rtio::Ipv6Addr(..) => libc::AF_INET6,
97         };
98         match libc::socket(fam, ty, 0) {
99             -1 => Err(super::last_error()),
100             fd => Ok(fd),
101         }
102     }
103 }
104
105 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
106                  payload: T) -> IoResult<()> {
107     unsafe {
108         let payload = &payload as *const T as *const libc::c_void;
109         let ret = libc::setsockopt(fd, opt, val,
110                                    payload,
111                                    mem::size_of::<T>() as libc::socklen_t);
112         if ret != 0 {
113             Err(last_error())
114         } else {
115             Ok(())
116         }
117     }
118 }
119
120 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
121                            val: libc::c_int) -> IoResult<T> {
122     unsafe {
123         let mut slot: T = mem::zeroed();
124         let mut len = mem::size_of::<T>() as libc::socklen_t;
125         let ret = c::getsockopt(fd, opt, val,
126                                 &mut slot as *mut _ as *mut _,
127                                 &mut len);
128         if ret != 0 {
129             Err(last_error())
130         } else {
131             assert!(len as uint == mem::size_of::<T>());
132             Ok(slot)
133         }
134     }
135 }
136
137 #[cfg(windows)]
138 pub fn last_error() -> IoError {
139     use std::os;
140     let code = unsafe { c::WSAGetLastError() as uint };
141     IoError {
142         code: code,
143         extra: 0,
144         detail: Some(os::error_string(code)),
145     }
146 }
147
148 #[cfg(not(windows))]
149 fn last_error() -> IoError {
150     super::last_error()
151 }
152
153 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
154 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
155
156 fn sockname(fd: sock_t,
157             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
158                                          *mut libc::socklen_t) -> libc::c_int)
159     -> IoResult<rtio::SocketAddr>
160 {
161     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
162     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
163     unsafe {
164         let storage = &mut storage as *mut libc::sockaddr_storage;
165         let ret = f(fd,
166                     storage as *mut libc::sockaddr,
167                     &mut len as *mut libc::socklen_t);
168         if ret != 0 {
169             return Err(last_error())
170         }
171     }
172     return sockaddr_to_addr(&storage, len as uint);
173 }
174
175 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
176                         len: uint) -> IoResult<rtio::SocketAddr> {
177     match storage.ss_family as libc::c_int {
178         libc::AF_INET => {
179             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
180             let storage: &libc::sockaddr_in = unsafe {
181                 mem::transmute(storage)
182             };
183             let ip = (storage.sin_addr.s_addr as u32).to_be();
184             let a = (ip >> 24) as u8;
185             let b = (ip >> 16) as u8;
186             let c = (ip >>  8) as u8;
187             let d = (ip >>  0) as u8;
188             Ok(rtio::SocketAddr {
189                 ip: rtio::Ipv4Addr(a, b, c, d),
190                 port: ntohs(storage.sin_port),
191             })
192         }
193         libc::AF_INET6 => {
194             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
195             let storage: &libc::sockaddr_in6 = unsafe {
196                 mem::transmute(storage)
197             };
198             let a = ntohs(storage.sin6_addr.s6_addr[0]);
199             let b = ntohs(storage.sin6_addr.s6_addr[1]);
200             let c = ntohs(storage.sin6_addr.s6_addr[2]);
201             let d = ntohs(storage.sin6_addr.s6_addr[3]);
202             let e = ntohs(storage.sin6_addr.s6_addr[4]);
203             let f = ntohs(storage.sin6_addr.s6_addr[5]);
204             let g = ntohs(storage.sin6_addr.s6_addr[6]);
205             let h = ntohs(storage.sin6_addr.s6_addr[7]);
206             Ok(rtio::SocketAddr {
207                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
208                 port: ntohs(storage.sin6_port),
209             })
210         }
211         _ => {
212             #[cfg(unix)] use ERROR = libc::EINVAL;
213             #[cfg(windows)] use ERROR = libc::WSAEINVAL;
214             Err(IoError {
215                 code: ERROR as uint,
216                 extra: 0,
217                 detail: None,
218             })
219         }
220     }
221 }
222
223 #[cfg(unix)]
224 pub fn init() {}
225
226 #[cfg(windows)]
227 pub fn init() {
228
229     unsafe {
230         use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
231         static mut INITIALIZED: bool = false;
232         static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
233
234         let _guard = LOCK.lock();
235         if !INITIALIZED {
236             let mut data: c::WSADATA = mem::zeroed();
237             let ret = c::WSAStartup(0x202,      // version 2.2
238                                     &mut data);
239             assert_eq!(ret, 0);
240             INITIALIZED = true;
241         }
242     }
243 }
244
245 ////////////////////////////////////////////////////////////////////////////////
246 // TCP streams
247 ////////////////////////////////////////////////////////////////////////////////
248
249 pub struct TcpStream {
250     inner: Arc<Inner>,
251     read_deadline: u64,
252     write_deadline: u64,
253 }
254
255 struct Inner {
256     fd: sock_t,
257
258     // Unused on Linux, where this lock is not necessary.
259     #[allow(dead_code)]
260     lock: mutex::NativeMutex
261 }
262
263 pub struct Guard<'a> {
264     pub fd: sock_t,
265     pub guard: mutex::LockGuard<'a>,
266 }
267
268 impl Inner {
269     fn new(fd: sock_t) -> Inner {
270         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
271     }
272 }
273
274 impl TcpStream {
275     pub fn connect(addr: rtio::SocketAddr,
276                    timeout: Option<u64>) -> IoResult<TcpStream> {
277         let fd = try!(socket(addr, libc::SOCK_STREAM));
278         let ret = TcpStream::new(Inner::new(fd));
279
280         let (addr, len) = addr_to_sockaddr(addr);
281         let addrp = &addr as *const _ as *const libc::sockaddr;
282         let len = len as libc::socklen_t;
283
284         match timeout {
285             Some(timeout) => {
286                 try!(util::connect_timeout(fd, addrp, len, timeout));
287                 Ok(ret)
288             },
289             None => {
290                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
291                     -1 => Err(last_error()),
292                     _ => Ok(ret),
293                 }
294             }
295         }
296     }
297
298     fn new(inner: Inner) -> TcpStream {
299         TcpStream {
300             inner: Arc::new(inner),
301             read_deadline: 0,
302             write_deadline: 0,
303         }
304     }
305
306     pub fn fd(&self) -> sock_t { self.inner.fd }
307
308     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
309         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
310                    nodelay as libc::c_int)
311     }
312
313     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
314         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
315                              seconds.is_some() as libc::c_int);
316         match seconds {
317             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
318             None => ret,
319         }
320     }
321
322     #[cfg(target_os = "macos")]
323     #[cfg(target_os = "ios")]
324     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
325         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
326                    seconds as libc::c_int)
327     }
328     #[cfg(target_os = "freebsd")]
329     #[cfg(target_os = "dragonfly")]
330     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
331         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
332                    seconds as libc::c_int)
333     }
334     #[cfg(not(target_os = "macos"), not(target_os = "ios"), not(target_os = "freebsd"),
335       not(target_os = "dragonfly"))]
336     fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
337         Ok(())
338     }
339
340     #[cfg(target_os = "linux")]
341     fn lock_nonblocking(&self) {}
342
343     #[cfg(not(target_os = "linux"))]
344     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
345         let ret = Guard {
346             fd: self.fd(),
347             guard: unsafe { self.inner.lock.lock() },
348         };
349         assert!(util::set_nonblocking(self.fd(), true).is_ok());
350         ret
351     }
352 }
353
354 #[cfg(windows)] type wrlen = libc::c_int;
355 #[cfg(not(windows))] type wrlen = libc::size_t;
356
357 impl rtio::RtioTcpStream for TcpStream {
358     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
359         let fd = self.fd();
360         let dolock = || self.lock_nonblocking();
361         let doread = |nb| unsafe {
362             let flags = if nb {c::MSG_DONTWAIT} else {0};
363             libc::recv(fd,
364                        buf.as_mut_ptr() as *mut libc::c_void,
365                        buf.len() as wrlen,
366                        flags) as libc::c_int
367         };
368         read(fd, self.read_deadline, dolock, doread)
369     }
370
371     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
372         let fd = self.fd();
373         let dolock = || self.lock_nonblocking();
374         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
375             let flags = if nb {c::MSG_DONTWAIT} else {0};
376             libc::send(fd,
377                        buf as *mut libc::c_void,
378                        len as wrlen,
379                        flags) as i64
380         };
381         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
382             Ok(_) => Ok(()),
383             Err(e) => Err(e)
384         }
385     }
386     fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
387         sockname(self.fd(), libc::getpeername)
388     }
389     fn control_congestion(&mut self) -> IoResult<()> {
390         self.set_nodelay(false)
391     }
392     fn nodelay(&mut self) -> IoResult<()> {
393         self.set_nodelay(true)
394     }
395     fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
396         self.set_keepalive(Some(delay_in_seconds))
397     }
398     fn letdie(&mut self) -> IoResult<()> {
399         self.set_keepalive(None)
400     }
401
402     fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
403         box TcpStream {
404             inner: self.inner.clone(),
405             read_deadline: 0,
406             write_deadline: 0,
407         } as Box<rtio::RtioTcpStream + Send>
408     }
409
410     fn close_write(&mut self) -> IoResult<()> {
411         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
412     }
413     fn close_read(&mut self) -> IoResult<()> {
414         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
415     }
416
417     fn set_timeout(&mut self, timeout: Option<u64>) {
418         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
419         self.read_deadline = deadline;
420         self.write_deadline = deadline;
421     }
422     fn set_read_timeout(&mut self, timeout: Option<u64>) {
423         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
424     }
425     fn set_write_timeout(&mut self, timeout: Option<u64>) {
426         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
427     }
428 }
429
430 impl rtio::RtioSocket for TcpStream {
431     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
432         sockname(self.fd(), libc::getsockname)
433     }
434 }
435
436 impl Drop for Inner {
437     fn drop(&mut self) { unsafe { close(self.fd); } }
438 }
439
440 #[unsafe_destructor]
441 impl<'a> Drop for Guard<'a> {
442     fn drop(&mut self) {
443         assert!(util::set_nonblocking(self.fd, false).is_ok());
444     }
445 }
446
447 ////////////////////////////////////////////////////////////////////////////////
448 // TCP listeners
449 ////////////////////////////////////////////////////////////////////////////////
450
451 pub struct TcpListener {
452     inner: Inner,
453 }
454
455 impl TcpListener {
456     pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
457         let fd = try!(socket(addr, libc::SOCK_STREAM));
458         let ret = TcpListener { inner: Inner::new(fd) };
459
460         let (addr, len) = addr_to_sockaddr(addr);
461         let addrp = &addr as *const _ as *const libc::sockaddr;
462         let len = len as libc::socklen_t;
463
464         // On platforms with Berkeley-derived sockets, this allows
465         // to quickly rebind a socket, without needing to wait for
466         // the OS to clean up the previous one.
467         if cfg!(unix) {
468             try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
469                             1 as libc::c_int));
470         }
471
472         match unsafe { libc::bind(fd, addrp, len) } {
473             -1 => Err(last_error()),
474             _ => Ok(ret),
475         }
476     }
477
478     pub fn fd(&self) -> sock_t { self.inner.fd }
479
480     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
481         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
482             -1 => Err(last_error()),
483             _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
484         }
485     }
486 }
487
488 impl rtio::RtioTcpListener for TcpListener {
489     fn listen(self: Box<TcpListener>)
490               -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
491         self.native_listen(128).map(|a| {
492             box a as Box<rtio::RtioTcpAcceptor + Send>
493         })
494     }
495 }
496
497 impl rtio::RtioSocket for TcpListener {
498     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
499         sockname(self.fd(), libc::getsockname)
500     }
501 }
502
503 pub struct TcpAcceptor {
504     listener: TcpListener,
505     deadline: u64,
506 }
507
508 impl TcpAcceptor {
509     pub fn fd(&self) -> sock_t { self.listener.fd() }
510
511     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
512         if self.deadline != 0 {
513             try!(util::await(self.fd(), Some(self.deadline), util::Readable));
514         }
515         unsafe {
516             let mut storage: libc::sockaddr_storage = mem::zeroed();
517             let storagep = &mut storage as *mut libc::sockaddr_storage;
518             let size = mem::size_of::<libc::sockaddr_storage>();
519             let mut size = size as libc::socklen_t;
520             match retry(|| {
521                 libc::accept(self.fd(),
522                              storagep as *mut libc::sockaddr,
523                              &mut size as *mut libc::socklen_t) as libc::c_int
524             }) as sock_t {
525                 -1 => Err(last_error()),
526                 fd => Ok(TcpStream::new(Inner::new(fd))),
527             }
528         }
529     }
530 }
531
532 impl rtio::RtioSocket for TcpAcceptor {
533     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
534         sockname(self.fd(), libc::getsockname)
535     }
536 }
537
538 impl rtio::RtioTcpAcceptor for TcpAcceptor {
539     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
540         self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
541     }
542
543     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
544     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
545     fn set_timeout(&mut self, timeout: Option<u64>) {
546         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
547     }
548 }
549
550 ////////////////////////////////////////////////////////////////////////////////
551 // UDP
552 ////////////////////////////////////////////////////////////////////////////////
553
554 pub struct UdpSocket {
555     inner: Arc<Inner>,
556     read_deadline: u64,
557     write_deadline: u64,
558 }
559
560 impl UdpSocket {
561     pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
562         let fd = try!(socket(addr, libc::SOCK_DGRAM));
563         let ret = UdpSocket {
564             inner: Arc::new(Inner::new(fd)),
565             read_deadline: 0,
566             write_deadline: 0,
567         };
568
569         let (addr, len) = addr_to_sockaddr(addr);
570         let addrp = &addr as *const _ as *const libc::sockaddr;
571         let len = len as libc::socklen_t;
572
573         match unsafe { libc::bind(fd, addrp, len) } {
574             -1 => Err(last_error()),
575             _ => Ok(ret),
576         }
577     }
578
579     pub fn fd(&self) -> sock_t { self.inner.fd }
580
581     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
582         setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
583                    on as libc::c_int)
584     }
585
586     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
587         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
588                    on as libc::c_int)
589     }
590
591     pub fn set_membership(&mut self, addr: rtio::IpAddr,
592                           opt: libc::c_int) -> IoResult<()> {
593         match ip_to_inaddr(addr) {
594             InAddr(addr) => {
595                 let mreq = libc::ip_mreq {
596                     imr_multiaddr: addr,
597                     // interface == INADDR_ANY
598                     imr_interface: libc::in_addr { s_addr: 0x0 },
599                 };
600                 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
601             }
602             In6Addr(addr) => {
603                 let mreq = libc::ip6_mreq {
604                     ipv6mr_multiaddr: addr,
605                     ipv6mr_interface: 0,
606                 };
607                 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
608             }
609         }
610     }
611
612     #[cfg(target_os = "linux")]
613     fn lock_nonblocking(&self) {}
614
615     #[cfg(not(target_os = "linux"))]
616     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
617         let ret = Guard {
618             fd: self.fd(),
619             guard: unsafe { self.inner.lock.lock() },
620         };
621         assert!(util::set_nonblocking(self.fd(), true).is_ok());
622         ret
623     }
624 }
625
626 impl rtio::RtioSocket for UdpSocket {
627     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
628         sockname(self.fd(), libc::getsockname)
629     }
630 }
631
632 #[cfg(windows)] type msglen_t = libc::c_int;
633 #[cfg(unix)]    type msglen_t = libc::size_t;
634
635 impl rtio::RtioUdpSocket for UdpSocket {
636     fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
637         let fd = self.fd();
638         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
639         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
640         let mut addrlen: libc::socklen_t =
641                 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
642
643         let dolock = || self.lock_nonblocking();
644         let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
645             let flags = if nb {c::MSG_DONTWAIT} else {0};
646             libc::recvfrom(fd,
647                            buf.as_mut_ptr() as *mut libc::c_void,
648                            buf.len() as msglen_t,
649                            flags,
650                            storagep,
651                            &mut addrlen) as libc::c_int
652         }));
653         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
654             Ok((n as uint, addr))
655         })
656     }
657
658     fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
659         let (dst, dstlen) = addr_to_sockaddr(dst);
660         let dstp = &dst as *const _ as *const libc::sockaddr;
661         let dstlen = dstlen as libc::socklen_t;
662
663         let fd = self.fd();
664         let dolock = || self.lock_nonblocking();
665         let dowrite = |nb, buf: *const u8, len: uint| unsafe {
666             let flags = if nb {c::MSG_DONTWAIT} else {0};
667             libc::sendto(fd,
668                          buf as *const libc::c_void,
669                          len as msglen_t,
670                          flags,
671                          dstp,
672                          dstlen) as i64
673         };
674
675         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
676         if n != buf.len() {
677             Err(util::short_write(n, "couldn't send entire packet at once"))
678         } else {
679             Ok(())
680         }
681     }
682
683     fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
684         match multi {
685             rtio::Ipv4Addr(..) => {
686                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
687             }
688             rtio::Ipv6Addr(..) => {
689                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
690             }
691         }
692     }
693     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
694         match multi {
695             rtio::Ipv4Addr(..) => {
696                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
697             }
698             rtio::Ipv6Addr(..) => {
699                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
700             }
701         }
702     }
703
704     fn loop_multicast_locally(&mut self) -> IoResult<()> {
705         self.set_multicast_loop(true)
706     }
707     fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
708         self.set_multicast_loop(false)
709     }
710
711     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
712         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
713                    ttl as libc::c_int)
714     }
715     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
716         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
717     }
718
719     fn hear_broadcasts(&mut self) -> IoResult<()> {
720         self.set_broadcast(true)
721     }
722     fn ignore_broadcasts(&mut self) -> IoResult<()> {
723         self.set_broadcast(false)
724     }
725
726     fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
727         box UdpSocket {
728             inner: self.inner.clone(),
729             read_deadline: 0,
730             write_deadline: 0,
731         } as Box<rtio::RtioUdpSocket + Send>
732     }
733
734     fn set_timeout(&mut self, timeout: Option<u64>) {
735         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
736         self.read_deadline = deadline;
737         self.write_deadline = deadline;
738     }
739     fn set_read_timeout(&mut self, timeout: Option<u64>) {
740         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
741     }
742     fn set_write_timeout(&mut self, timeout: Option<u64>) {
743         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
744     }
745 }
746
747 ////////////////////////////////////////////////////////////////////////////////
748 // Timeout helpers
749 //
750 // The read/write functions below are the helpers for reading/writing a socket
751 // with a possible deadline specified. This is generally viewed as a timed out
752 // I/O operation.
753 //
754 // From the application's perspective, timeouts apply to the I/O object, not to
755 // the underlying file descriptor (it's one timeout per object). This means that
756 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
757 //
758 // The next idea to implement timeouts would be to use nonblocking I/O. An
759 // invocation of select() would wait (with a timeout) for a socket to be ready.
760 // Once its ready, we can perform the operation. Note that the operation *must*
761 // be nonblocking, even though select() says the socket is ready. This is
762 // because some other thread could have come and stolen our data (handles can be
763 // cloned).
764 //
765 // To implement nonblocking I/O, the first option we have is to use the
766 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
767 // I/O objects, so this was initially viewed as unwise.
768 //
769 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
770 // send/recv, but the niftiness wears off once you realize it only works well on
771 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
772 // operation on linux (no flag fidding, no affecting other objects), but not on
773 // other platforms.
774 //
775 // To work around this constraint on other platforms, we end up using the
776 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
777 // could cause other objects' blocking operations to suddenly become
778 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
779 // falls back to using the same code path as nonblocking operations, but with an
780 // infinite timeout (select + send/recv). This helps emulate blocking
781 // reads/writes despite the underlying descriptor being nonblocking, as well as
782 // optimizing the fast path of just hitting one syscall in the good case.
783 //
784 // As a final caveat, this implementation uses a mutex so only one thread is
785 // doing a nonblocking operation at at time. This is the operation that comes
786 // after the select() (at which point we think the socket is ready). This is
787 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
788 // expect (wouldn't want someone turning it on when it should be off!). All
789 // operations performed in the lock are *nonblocking* to avoid holding the mutex
790 // forever.
791 //
792 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
793 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
794 // reads/writes are still blocking.
795 //
796 // Fun, fun!
797 //
798 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
799 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
800
801 pub fn read<T>(fd: sock_t,
802                deadline: u64,
803                lock: || -> T,
804                read: |bool| -> libc::c_int) -> IoResult<uint> {
805     let mut ret = -1;
806     if deadline == 0 {
807         ret = retry(|| read(false));
808     }
809
810     if deadline != 0 || (ret == -1 && util::wouldblock()) {
811         let deadline = match deadline {
812             0 => None,
813             n => Some(n),
814         };
815         loop {
816             // With a timeout, first we wait for the socket to become
817             // readable using select(), specifying the relevant timeout for
818             // our previously set deadline.
819             try!(util::await(fd, deadline, util::Readable));
820
821             // At this point, we're still within the timeout, and we've
822             // determined that the socket is readable (as returned by
823             // select). We must still read the socket in *nonblocking* mode
824             // because some other thread could come steal our data. If we
825             // fail to read some data, we retry (hence the outer loop) and
826             // wait for the socket to become readable again.
827             let _guard = lock();
828             match retry(|| read(deadline.is_some())) {
829                 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
830                 -1 => return Err(last_error()),
831                n => { ret = n; break }
832             }
833         }
834     }
835
836     match ret {
837         0 => Err(util::eof()),
838         n if n < 0 => Err(last_error()),
839         n => Ok(n as uint)
840     }
841 }
842
843 pub fn write<T>(fd: sock_t,
844                 deadline: u64,
845                 buf: &[u8],
846                 write_everything: bool,
847                 lock: || -> T,
848                 write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
849     let mut ret = -1;
850     let mut written = 0;
851     if deadline == 0 {
852         if write_everything {
853             ret = keep_going(buf, |inner, len| {
854                 written = buf.len() - len;
855                 write(false, inner, len)
856             });
857         } else {
858             ret = retry(|| {
859                 write(false, buf.as_ptr(), buf.len()) as libc::c_int
860             }) as i64;
861             if ret > 0 { written = ret as uint; }
862         }
863     }
864
865     if deadline != 0 || (ret == -1 && util::wouldblock()) {
866         let deadline = match deadline {
867             0 => None,
868             n => Some(n),
869         };
870         while written < buf.len() && (write_everything || written == 0) {
871             // As with read(), first wait for the socket to be ready for
872             // the I/O operation.
873             match util::await(fd, deadline, util::Writable) {
874                 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
875                     assert!(deadline.is_some());
876                     return Err(util::short_write(written, "short write"))
877                 }
878                 Err(e) => return Err(e),
879                 Ok(()) => {}
880             }
881
882             // Also as with read(), we use MSG_DONTWAIT to guard ourselves
883             // against unforeseen circumstances.
884             let _guard = lock();
885             let ptr = buf.slice_from(written).as_ptr();
886             let len = buf.len() - written;
887             match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
888                 -1 if util::wouldblock() => {}
889                 -1 => return Err(last_error()),
890                 n => { written += n as uint; }
891             }
892         }
893         ret = 0;
894     }
895     if ret < 0 {
896         Err(last_error())
897     } else {
898         Ok(written)
899     }
900 }