]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/net.rs
auto merge of #14451 : alexcrichton/rust/issue-14442, r=brson
[rust.git] / src / libnative / io / net.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::io::net::ip;
14 use std::io;
15 use std::mem;
16 use std::rt::rtio;
17 use std::unstable::mutex;
18
19 use super::{IoResult, retry, keep_going};
20 use super::c;
21 use super::util;
22
23 ////////////////////////////////////////////////////////////////////////////////
24 // sockaddr and misc bindings
25 ////////////////////////////////////////////////////////////////////////////////
26
27 #[cfg(windows)] pub type sock_t = libc::SOCKET;
28 #[cfg(unix)]    pub type sock_t = super::file::fd_t;
29
30 pub fn htons(u: u16) -> u16 {
31     mem::to_be16(u)
32 }
33 pub fn ntohs(u: u16) -> u16 {
34     mem::from_be16(u)
35 }
36
37 enum InAddr {
38     InAddr(libc::in_addr),
39     In6Addr(libc::in6_addr),
40 }
41
42 fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr {
43     match ip {
44         ip::Ipv4Addr(a, b, c, d) => {
45             let ip = (a as u32 << 24) |
46                      (b as u32 << 16) |
47                      (c as u32 <<  8) |
48                      (d as u32 <<  0);
49             InAddr(libc::in_addr {
50                 s_addr: mem::from_be32(ip)
51             })
52         }
53         ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
54             In6Addr(libc::in6_addr {
55                 s6_addr: [
56                     htons(a),
57                     htons(b),
58                     htons(c),
59                     htons(d),
60                     htons(e),
61                     htons(f),
62                     htons(g),
63                     htons(h),
64                 ]
65             })
66         }
67     }
68 }
69
70 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
71     unsafe {
72         let storage: libc::sockaddr_storage = mem::zeroed();
73         let len = match ip_to_inaddr(addr.ip) {
74             InAddr(inaddr) => {
75                 let storage: *mut libc::sockaddr_in = mem::transmute(&storage);
76                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
77                 (*storage).sin_port = htons(addr.port);
78                 (*storage).sin_addr = inaddr;
79                 mem::size_of::<libc::sockaddr_in>()
80             }
81             In6Addr(inaddr) => {
82                 let storage: *mut libc::sockaddr_in6 = mem::transmute(&storage);
83                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
84                 (*storage).sin6_port = htons(addr.port);
85                 (*storage).sin6_addr = inaddr;
86                 mem::size_of::<libc::sockaddr_in6>()
87             }
88         };
89         return (storage, len);
90     }
91 }
92
93 fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
94     unsafe {
95         let fam = match addr.ip {
96             ip::Ipv4Addr(..) => libc::AF_INET,
97             ip::Ipv6Addr(..) => libc::AF_INET6,
98         };
99         match libc::socket(fam, ty, 0) {
100             -1 => Err(super::last_error()),
101             fd => Ok(fd),
102         }
103     }
104 }
105
106 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
107                  payload: T) -> IoResult<()> {
108     unsafe {
109         let payload = &payload as *T as *libc::c_void;
110         let ret = libc::setsockopt(fd, opt, val,
111                                    payload,
112                                    mem::size_of::<T>() as libc::socklen_t);
113         if ret != 0 {
114             Err(last_error())
115         } else {
116             Ok(())
117         }
118     }
119 }
120
121 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
122                            val: libc::c_int) -> IoResult<T> {
123     unsafe {
124         let mut slot: T = mem::zeroed();
125         let mut len = mem::size_of::<T>() as libc::socklen_t;
126         let ret = c::getsockopt(fd, opt, val,
127                                 &mut slot as *mut _ as *mut _,
128                                 &mut len);
129         if ret != 0 {
130             Err(last_error())
131         } else {
132             assert!(len as uint == mem::size_of::<T>());
133             Ok(slot)
134         }
135     }
136 }
137
138 #[cfg(windows)]
139 fn last_error() -> io::IoError {
140     io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
141 }
142
143 #[cfg(not(windows))]
144 fn last_error() -> io::IoError {
145     super::last_error()
146 }
147
148 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
149 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
150
151 fn sockname(fd: sock_t,
152             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
153                                          *mut libc::socklen_t) -> libc::c_int)
154     -> IoResult<ip::SocketAddr>
155 {
156     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
157     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
158     unsafe {
159         let storage = &mut storage as *mut libc::sockaddr_storage;
160         let ret = f(fd,
161                     storage as *mut libc::sockaddr,
162                     &mut len as *mut libc::socklen_t);
163         if ret != 0 {
164             return Err(last_error())
165         }
166     }
167     return sockaddr_to_addr(&storage, len as uint);
168 }
169
170 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
171                         len: uint) -> IoResult<ip::SocketAddr> {
172     match storage.ss_family as libc::c_int {
173         libc::AF_INET => {
174             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
175             let storage: &libc::sockaddr_in = unsafe {
176                 mem::transmute(storage)
177             };
178             let ip = mem::to_be32(storage.sin_addr.s_addr as u32);
179             let a = (ip >> 24) as u8;
180             let b = (ip >> 16) as u8;
181             let c = (ip >>  8) as u8;
182             let d = (ip >>  0) as u8;
183             Ok(ip::SocketAddr {
184                 ip: ip::Ipv4Addr(a, b, c, d),
185                 port: ntohs(storage.sin_port),
186             })
187         }
188         libc::AF_INET6 => {
189             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
190             let storage: &libc::sockaddr_in6 = unsafe {
191                 mem::transmute(storage)
192             };
193             let a = ntohs(storage.sin6_addr.s6_addr[0]);
194             let b = ntohs(storage.sin6_addr.s6_addr[1]);
195             let c = ntohs(storage.sin6_addr.s6_addr[2]);
196             let d = ntohs(storage.sin6_addr.s6_addr[3]);
197             let e = ntohs(storage.sin6_addr.s6_addr[4]);
198             let f = ntohs(storage.sin6_addr.s6_addr[5]);
199             let g = ntohs(storage.sin6_addr.s6_addr[6]);
200             let h = ntohs(storage.sin6_addr.s6_addr[7]);
201             Ok(ip::SocketAddr {
202                 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
203                 port: ntohs(storage.sin6_port),
204             })
205         }
206         _ => {
207             Err(io::standard_error(io::OtherIoError))
208         }
209     }
210 }
211
212 #[cfg(unix)]
213 pub fn init() {}
214
215 #[cfg(windows)]
216 pub fn init() {
217
218     unsafe {
219         use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
220         static mut INITIALIZED: bool = false;
221         static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
222
223         let _guard = LOCK.lock();
224         if !INITIALIZED {
225             let mut data: c::WSADATA = mem::zeroed();
226             let ret = c::WSAStartup(0x202,      // version 2.2
227                                     &mut data);
228             assert_eq!(ret, 0);
229             INITIALIZED = true;
230         }
231     }
232 }
233
234 ////////////////////////////////////////////////////////////////////////////////
235 // TCP streams
236 ////////////////////////////////////////////////////////////////////////////////
237
238 pub struct TcpStream {
239     inner: Arc<Inner>,
240     read_deadline: u64,
241     write_deadline: u64,
242 }
243
244 struct Inner {
245     fd: sock_t,
246     lock: mutex::NativeMutex,
247 }
248
249 pub struct Guard<'a> {
250     pub fd: sock_t,
251     pub guard: mutex::LockGuard<'a>,
252 }
253
254 impl Inner {
255     fn new(fd: sock_t) -> Inner {
256         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
257     }
258 }
259
260 impl TcpStream {
261     pub fn connect(addr: ip::SocketAddr,
262                    timeout: Option<u64>) -> IoResult<TcpStream> {
263         let fd = try!(socket(addr, libc::SOCK_STREAM));
264         let ret = TcpStream::new(Inner::new(fd));
265
266         let (addr, len) = addr_to_sockaddr(addr);
267         let addrp = &addr as *_ as *libc::sockaddr;
268         let len = len as libc::socklen_t;
269
270         match timeout {
271             Some(timeout) => {
272                 try!(util::connect_timeout(fd, addrp, len, timeout));
273                 Ok(ret)
274             },
275             None => {
276                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
277                     -1 => Err(last_error()),
278                     _ => Ok(ret),
279                 }
280             }
281         }
282     }
283
284     fn new(inner: Inner) -> TcpStream {
285         TcpStream {
286             inner: Arc::new(inner),
287             read_deadline: 0,
288             write_deadline: 0,
289         }
290     }
291
292     pub fn fd(&self) -> sock_t { self.inner.fd }
293
294     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
295         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
296                    nodelay as libc::c_int)
297     }
298
299     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
300         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
301                              seconds.is_some() as libc::c_int);
302         match seconds {
303             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
304             None => ret,
305         }
306     }
307
308     #[cfg(target_os = "macos")]
309     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
310         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
311                    seconds as libc::c_int)
312     }
313     #[cfg(target_os = "freebsd")]
314     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
315         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
316                    seconds as libc::c_int)
317     }
318     #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
319     fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
320         Ok(())
321     }
322
323     #[cfg(target_os = "linux")]
324     fn lock_nonblocking(&self) {}
325
326     #[cfg(not(target_os = "linux"))]
327     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
328         let ret = Guard {
329             fd: self.fd(),
330             guard: unsafe { self.inner.lock.lock() },
331         };
332         assert!(util::set_nonblocking(self.fd(), true).is_ok());
333         ret
334     }
335 }
336
337 #[cfg(windows)] type wrlen = libc::c_int;
338 #[cfg(not(windows))] type wrlen = libc::size_t;
339
340 impl rtio::RtioTcpStream for TcpStream {
341     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
342         let fd = self.fd();
343         let dolock = || self.lock_nonblocking();
344         let doread = |nb| unsafe {
345             let flags = if nb {c::MSG_DONTWAIT} else {0};
346             libc::recv(fd,
347                        buf.as_mut_ptr() as *mut libc::c_void,
348                        buf.len() as wrlen,
349                        flags) as libc::c_int
350         };
351         read(fd, self.read_deadline, dolock, doread)
352     }
353
354     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
355         let fd = self.fd();
356         let dolock = || self.lock_nonblocking();
357         let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
358             let flags = if nb {c::MSG_DONTWAIT} else {0};
359             libc::send(fd,
360                        buf as *mut libc::c_void,
361                        len as wrlen,
362                        flags) as i64
363         };
364         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
365             Ok(_) => Ok(()),
366             Err(e) => Err(e)
367         }
368     }
369     fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
370         sockname(self.fd(), libc::getpeername)
371     }
372     fn control_congestion(&mut self) -> IoResult<()> {
373         self.set_nodelay(false)
374     }
375     fn nodelay(&mut self) -> IoResult<()> {
376         self.set_nodelay(true)
377     }
378     fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
379         self.set_keepalive(Some(delay_in_seconds))
380     }
381     fn letdie(&mut self) -> IoResult<()> {
382         self.set_keepalive(None)
383     }
384
385     fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
386         box TcpStream {
387             inner: self.inner.clone(),
388             read_deadline: 0,
389             write_deadline: 0,
390         } as Box<rtio::RtioTcpStream:Send>
391     }
392
393     fn close_write(&mut self) -> IoResult<()> {
394         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
395     }
396     fn close_read(&mut self) -> IoResult<()> {
397         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
398     }
399
400     fn set_timeout(&mut self, timeout: Option<u64>) {
401         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
402         self.read_deadline = deadline;
403         self.write_deadline = deadline;
404     }
405     fn set_read_timeout(&mut self, timeout: Option<u64>) {
406         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
407     }
408     fn set_write_timeout(&mut self, timeout: Option<u64>) {
409         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
410     }
411 }
412
413 impl rtio::RtioSocket for TcpStream {
414     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
415         sockname(self.fd(), libc::getsockname)
416     }
417 }
418
419 impl Drop for Inner {
420     fn drop(&mut self) { unsafe { close(self.fd); } }
421 }
422
423 #[unsafe_destructor]
424 impl<'a> Drop for Guard<'a> {
425     fn drop(&mut self) {
426         assert!(util::set_nonblocking(self.fd, false).is_ok());
427     }
428 }
429
430 ////////////////////////////////////////////////////////////////////////////////
431 // TCP listeners
432 ////////////////////////////////////////////////////////////////////////////////
433
434 pub struct TcpListener {
435     inner: Inner,
436 }
437
438 impl TcpListener {
439     pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
440         let fd = try!(socket(addr, libc::SOCK_STREAM));
441         let ret = TcpListener { inner: Inner::new(fd) };
442
443         let (addr, len) = addr_to_sockaddr(addr);
444         let addrp = &addr as *_ as *libc::sockaddr;
445         let len = len as libc::socklen_t;
446
447         // On platforms with Berkeley-derived sockets, this allows
448         // to quickly rebind a socket, without needing to wait for
449         // the OS to clean up the previous one.
450         if cfg!(unix) {
451             try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
452                             1 as libc::c_int));
453         }
454
455         match unsafe { libc::bind(fd, addrp, len) } {
456             -1 => Err(last_error()),
457             _ => Ok(ret),
458         }
459     }
460
461     pub fn fd(&self) -> sock_t { self.inner.fd }
462
463     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
464         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
465             -1 => Err(last_error()),
466             _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
467         }
468     }
469 }
470
471 impl rtio::RtioTcpListener for TcpListener {
472     fn listen(~self) -> IoResult<Box<rtio::RtioTcpAcceptor:Send>> {
473         self.native_listen(128).map(|a| {
474             box a as Box<rtio::RtioTcpAcceptor:Send>
475         })
476     }
477 }
478
479 impl rtio::RtioSocket for TcpListener {
480     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
481         sockname(self.fd(), libc::getsockname)
482     }
483 }
484
485 pub struct TcpAcceptor {
486     listener: TcpListener,
487     deadline: u64,
488 }
489
490 impl TcpAcceptor {
491     pub fn fd(&self) -> sock_t { self.listener.fd() }
492
493     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
494         if self.deadline != 0 {
495             try!(util::await(self.fd(), Some(self.deadline), util::Readable));
496         }
497         unsafe {
498             let mut storage: libc::sockaddr_storage = mem::zeroed();
499             let storagep = &mut storage as *mut libc::sockaddr_storage;
500             let size = mem::size_of::<libc::sockaddr_storage>();
501             let mut size = size as libc::socklen_t;
502             match retry(|| {
503                 libc::accept(self.fd(),
504                              storagep as *mut libc::sockaddr,
505                              &mut size as *mut libc::socklen_t) as libc::c_int
506             }) as sock_t {
507                 -1 => Err(last_error()),
508                 fd => Ok(TcpStream::new(Inner::new(fd))),
509             }
510         }
511     }
512 }
513
514 impl rtio::RtioSocket for TcpAcceptor {
515     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
516         sockname(self.fd(), libc::getsockname)
517     }
518 }
519
520 impl rtio::RtioTcpAcceptor for TcpAcceptor {
521     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
522         self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream:Send>)
523     }
524
525     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
526     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
527     fn set_timeout(&mut self, timeout: Option<u64>) {
528         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
529     }
530 }
531
532 ////////////////////////////////////////////////////////////////////////////////
533 // UDP
534 ////////////////////////////////////////////////////////////////////////////////
535
536 pub struct UdpSocket {
537     inner: Arc<Inner>,
538     read_deadline: u64,
539     write_deadline: u64,
540 }
541
542 impl UdpSocket {
543     pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
544         let fd = try!(socket(addr, libc::SOCK_DGRAM));
545         let ret = UdpSocket {
546             inner: Arc::new(Inner::new(fd)),
547             read_deadline: 0,
548             write_deadline: 0,
549         };
550
551         let (addr, len) = addr_to_sockaddr(addr);
552         let addrp = &addr as *_ as *libc::sockaddr;
553         let len = len as libc::socklen_t;
554
555         match unsafe { libc::bind(fd, addrp, len) } {
556             -1 => Err(last_error()),
557             _ => Ok(ret),
558         }
559     }
560
561     pub fn fd(&self) -> sock_t { self.inner.fd }
562
563     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
564         setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
565                    on as libc::c_int)
566     }
567
568     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
569         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
570                    on as libc::c_int)
571     }
572
573     pub fn set_membership(&mut self, addr: ip::IpAddr,
574                           opt: libc::c_int) -> IoResult<()> {
575         match ip_to_inaddr(addr) {
576             InAddr(addr) => {
577                 let mreq = libc::ip_mreq {
578                     imr_multiaddr: addr,
579                     // interface == INADDR_ANY
580                     imr_interface: libc::in_addr { s_addr: 0x0 },
581                 };
582                 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
583             }
584             In6Addr(addr) => {
585                 let mreq = libc::ip6_mreq {
586                     ipv6mr_multiaddr: addr,
587                     ipv6mr_interface: 0,
588                 };
589                 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
590             }
591         }
592     }
593
594     #[cfg(target_os = "linux")]
595     fn lock_nonblocking(&self) {}
596
597     #[cfg(not(target_os = "linux"))]
598     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
599         let ret = Guard {
600             fd: self.fd(),
601             guard: unsafe { self.inner.lock.lock() },
602         };
603         assert!(util::set_nonblocking(self.fd(), true).is_ok());
604         ret
605     }
606 }
607
608 impl rtio::RtioSocket for UdpSocket {
609     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
610         sockname(self.fd(), libc::getsockname)
611     }
612 }
613
614 #[cfg(windows)] type msglen_t = libc::c_int;
615 #[cfg(unix)]    type msglen_t = libc::size_t;
616
617 impl rtio::RtioUdpSocket for UdpSocket {
618     fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
619         let fd = self.fd();
620         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
621         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
622         let mut addrlen: libc::socklen_t =
623                 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
624
625         let dolock = || self.lock_nonblocking();
626         let doread = |nb| unsafe {
627             let flags = if nb {c::MSG_DONTWAIT} else {0};
628             libc::recvfrom(fd,
629                            buf.as_mut_ptr() as *mut libc::c_void,
630                            buf.len() as msglen_t,
631                            flags,
632                            storagep,
633                            &mut addrlen) as libc::c_int
634         };
635         let n = try!(read(fd, self.read_deadline, dolock, doread));
636         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
637             Ok((n as uint, addr))
638         })
639     }
640
641     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
642         let (dst, dstlen) = addr_to_sockaddr(dst);
643         let dstp = &dst as *_ as *libc::sockaddr;
644         let dstlen = dstlen as libc::socklen_t;
645
646         let fd = self.fd();
647         let dolock = || self.lock_nonblocking();
648         let dowrite = |nb, buf: *u8, len: uint| unsafe {
649             let flags = if nb {c::MSG_DONTWAIT} else {0};
650             libc::sendto(fd,
651                          buf as *libc::c_void,
652                          len as msglen_t,
653                          flags,
654                          dstp,
655                          dstlen) as i64
656         };
657
658         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
659         if n != buf.len() {
660             Err(io::IoError {
661                 kind: io::ShortWrite(n),
662                 desc: "couldn't send entire packet at once",
663                 detail: None,
664             })
665         } else {
666             Ok(())
667         }
668     }
669
670     fn join_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
671         match multi {
672             ip::Ipv4Addr(..) => {
673                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
674             }
675             ip::Ipv6Addr(..) => {
676                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
677             }
678         }
679     }
680     fn leave_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
681         match multi {
682             ip::Ipv4Addr(..) => {
683                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
684             }
685             ip::Ipv6Addr(..) => {
686                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
687             }
688         }
689     }
690
691     fn loop_multicast_locally(&mut self) -> IoResult<()> {
692         self.set_multicast_loop(true)
693     }
694     fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
695         self.set_multicast_loop(false)
696     }
697
698     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
699         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
700                    ttl as libc::c_int)
701     }
702     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
703         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
704     }
705
706     fn hear_broadcasts(&mut self) -> IoResult<()> {
707         self.set_broadcast(true)
708     }
709     fn ignore_broadcasts(&mut self) -> IoResult<()> {
710         self.set_broadcast(false)
711     }
712
713     fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
714         box UdpSocket {
715             inner: self.inner.clone(),
716             read_deadline: 0,
717             write_deadline: 0,
718         } as Box<rtio::RtioUdpSocket:Send>
719     }
720
721     fn set_timeout(&mut self, timeout: Option<u64>) {
722         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
723         self.read_deadline = deadline;
724         self.write_deadline = deadline;
725     }
726     fn set_read_timeout(&mut self, timeout: Option<u64>) {
727         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
728     }
729     fn set_write_timeout(&mut self, timeout: Option<u64>) {
730         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
731     }
732 }
733
734 ////////////////////////////////////////////////////////////////////////////////
735 // Timeout helpers
736 //
737 // The read/write functions below are the helpers for reading/writing a socket
738 // with a possible deadline specified. This is generally viewed as a timed out
739 // I/O operation.
740 //
741 // From the application's perspective, timeouts apply to the I/O object, not to
742 // the underlying file descriptor (it's one timeout per object). This means that
743 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
744 //
745 // The next idea to implement timeouts would be to use nonblocking I/O. An
746 // invocation of select() would wait (with a timeout) for a socket to be ready.
747 // Once its ready, we can perform the operation. Note that the operation *must*
748 // be nonblocking, even though select() says the socket is ready. This is
749 // because some other thread could have come and stolen our data (handles can be
750 // cloned).
751 //
752 // To implement nonblocking I/O, the first option we have is to use the
753 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
754 // I/O objects, so this was initially viewed as unwise.
755 //
756 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
757 // send/recv, but the niftiness wears off once you realize it only works well on
758 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
759 // operation on linux (no flag fidding, no affecting other objects), but not on
760 // other platforms.
761 //
762 // To work around this constraint on other platforms, we end up using the
763 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
764 // could cause other objects' blocking operations to suddenly become
765 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
766 // falls back to using the same code path as nonblocking operations, but with an
767 // infinite timeout (select + send/recv). This helps emulate blocking
768 // reads/writes despite the underlying descriptor being nonblocking, as well as
769 // optimizing the fast path of just hitting one syscall in the good case.
770 //
771 // As a final caveat, this implementation uses a mutex so only one thread is
772 // doing a nonblocking operation at at time. This is the operation that comes
773 // after the select() (at which point we think the socket is ready). This is
774 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
775 // expect (wouldn't want someone turning it on when it should be off!). All
776 // operations performed in the lock are *nonblocking* to avoid holding the mutex
777 // forever.
778 //
779 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
780 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
781 // reads/writes are still blocking.
782 //
783 // Fun, fun!
784 //
785 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
786 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
787
788 pub fn read<T>(fd: sock_t,
789                deadline: u64,
790                lock: || -> T,
791                read: |bool| -> libc::c_int) -> IoResult<uint> {
792     let mut ret = -1;
793     if deadline == 0 {
794         ret = retry(|| read(false));
795     }
796
797     if deadline != 0 || (ret == -1 && util::wouldblock()) {
798         let deadline = match deadline {
799             0 => None,
800             n => Some(n),
801         };
802         loop {
803             // With a timeout, first we wait for the socket to become
804             // readable using select(), specifying the relevant timeout for
805             // our previously set deadline.
806             try!(util::await(fd, deadline, util::Readable));
807
808             // At this point, we're still within the timeout, and we've
809             // determined that the socket is readable (as returned by
810             // select). We must still read the socket in *nonblocking* mode
811             // because some other thread could come steal our data. If we
812             // fail to read some data, we retry (hence the outer loop) and
813             // wait for the socket to become readable again.
814             let _guard = lock();
815             match retry(|| read(deadline.is_some())) {
816                 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
817                 -1 => return Err(last_error()),
818                n => { ret = n; break }
819             }
820         }
821     }
822
823     match ret {
824         0 => Err(io::standard_error(io::EndOfFile)),
825         n if n < 0 => Err(last_error()),
826         n => Ok(n as uint)
827     }
828 }
829
830 pub fn write<T>(fd: sock_t,
831                 deadline: u64,
832                 buf: &[u8],
833                 write_everything: bool,
834                 lock: || -> T,
835                 write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
836     let mut ret = -1;
837     let mut written = 0;
838     if deadline == 0 {
839         if write_everything {
840             ret = keep_going(buf, |inner, len| {
841                 written = buf.len() - len;
842                 write(false, inner, len)
843             });
844         } else {
845             ret = retry(|| {
846                 write(false, buf.as_ptr(), buf.len()) as libc::c_int
847             }) as i64;
848             if ret > 0 { written = ret as uint; }
849         }
850     }
851
852     if deadline != 0 || (ret == -1 && util::wouldblock()) {
853         let deadline = match deadline {
854             0 => None,
855             n => Some(n),
856         };
857         while written < buf.len() && (write_everything || written == 0) {
858             // As with read(), first wait for the socket to be ready for
859             // the I/O operation.
860             match util::await(fd, deadline, util::Writable) {
861                 Err(ref e) if e.kind == io::TimedOut && written > 0 => {
862                     assert!(deadline.is_some());
863                     return Err(io::IoError {
864                         kind: io::ShortWrite(written),
865                         desc: "short write",
866                         detail: None,
867                     })
868                 }
869                 Err(e) => return Err(e),
870                 Ok(()) => {}
871             }
872
873             // Also as with read(), we use MSG_DONTWAIT to guard ourselves
874             // against unforseen circumstances.
875             let _guard = lock();
876             let ptr = buf.slice_from(written).as_ptr();
877             let len = buf.len() - written;
878             match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
879                 -1 if util::wouldblock() => {}
880                 -1 => return Err(last_error()),
881                 n => { written += n as uint; }
882             }
883         }
884         ret = 0;
885     }
886     if ret < 0 {
887         Err(last_error())
888     } else {
889         Ok(written)
890     }
891 }