]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/net.rs
libs: Fix miscellaneous fallout of librustrt
[rust.git] / src / libnative / io / net.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::mem;
14 use std::rt::mutex;
15 use std::rt::rtio;
16 use std::rt::rtio::{IoResult, IoError};
17
18 use super::{retry, keep_going};
19 use super::c;
20 use super::util;
21
22 ////////////////////////////////////////////////////////////////////////////////
23 // sockaddr and misc bindings
24 ////////////////////////////////////////////////////////////////////////////////
25
26 #[cfg(windows)] pub type sock_t = libc::SOCKET;
27 #[cfg(unix)]    pub type sock_t = super::file::fd_t;
28
29 pub fn htons(u: u16) -> u16 {
30     mem::to_be16(u)
31 }
32 pub fn ntohs(u: u16) -> u16 {
33     mem::from_be16(u)
34 }
35
36 enum InAddr {
37     InAddr(libc::in_addr),
38     In6Addr(libc::in6_addr),
39 }
40
41 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
42     match ip {
43         rtio::Ipv4Addr(a, b, c, d) => {
44             let ip = (a as u32 << 24) |
45                      (b as u32 << 16) |
46                      (c as u32 <<  8) |
47                      (d as u32 <<  0);
48             InAddr(libc::in_addr {
49                 s_addr: mem::from_be32(ip)
50             })
51         }
52         rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
53             In6Addr(libc::in6_addr {
54                 s6_addr: [
55                     htons(a),
56                     htons(b),
57                     htons(c),
58                     htons(d),
59                     htons(e),
60                     htons(f),
61                     htons(g),
62                     htons(h),
63                 ]
64             })
65         }
66     }
67 }
68
69 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
70     unsafe {
71         let storage: libc::sockaddr_storage = mem::zeroed();
72         let len = match ip_to_inaddr(addr.ip) {
73             InAddr(inaddr) => {
74                 let storage: *mut libc::sockaddr_in = mem::transmute(&storage);
75                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
76                 (*storage).sin_port = htons(addr.port);
77                 (*storage).sin_addr = inaddr;
78                 mem::size_of::<libc::sockaddr_in>()
79             }
80             In6Addr(inaddr) => {
81                 let storage: *mut libc::sockaddr_in6 = mem::transmute(&storage);
82                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
83                 (*storage).sin6_port = htons(addr.port);
84                 (*storage).sin6_addr = inaddr;
85                 mem::size_of::<libc::sockaddr_in6>()
86             }
87         };
88         return (storage, len);
89     }
90 }
91
92 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
93     unsafe {
94         let fam = match addr.ip {
95             rtio::Ipv4Addr(..) => libc::AF_INET,
96             rtio::Ipv6Addr(..) => libc::AF_INET6,
97         };
98         match libc::socket(fam, ty, 0) {
99             -1 => Err(super::last_error()),
100             fd => Ok(fd),
101         }
102     }
103 }
104
105 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
106                  payload: T) -> IoResult<()> {
107     unsafe {
108         let payload = &payload as *T as *libc::c_void;
109         let ret = libc::setsockopt(fd, opt, val,
110                                    payload,
111                                    mem::size_of::<T>() as libc::socklen_t);
112         if ret != 0 {
113             Err(last_error())
114         } else {
115             Ok(())
116         }
117     }
118 }
119
120 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
121                            val: libc::c_int) -> IoResult<T> {
122     unsafe {
123         let mut slot: T = mem::zeroed();
124         let mut len = mem::size_of::<T>() as libc::socklen_t;
125         let ret = c::getsockopt(fd, opt, val,
126                                 &mut slot as *mut _ as *mut _,
127                                 &mut len);
128         if ret != 0 {
129             Err(last_error())
130         } else {
131             assert!(len as uint == mem::size_of::<T>());
132             Ok(slot)
133         }
134     }
135 }
136
137 #[cfg(windows)]
138 pub fn last_error() -> IoError {
139     use std::os;
140     let code = unsafe { c::WSAGetLastError() as uint };
141     IoError {
142         code: code,
143         extra: 0,
144         detail: Some(os::error_string(code)),
145     }
146 }
147
148 #[cfg(not(windows))]
149 fn last_error() -> IoError {
150     super::last_error()
151 }
152
153 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
154 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
155
156 fn sockname(fd: sock_t,
157             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
158                                          *mut libc::socklen_t) -> libc::c_int)
159     -> IoResult<rtio::SocketAddr>
160 {
161     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
162     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
163     unsafe {
164         let storage = &mut storage as *mut libc::sockaddr_storage;
165         let ret = f(fd,
166                     storage as *mut libc::sockaddr,
167                     &mut len as *mut libc::socklen_t);
168         if ret != 0 {
169             return Err(last_error())
170         }
171     }
172     return sockaddr_to_addr(&storage, len as uint);
173 }
174
175 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
176                         len: uint) -> IoResult<rtio::SocketAddr> {
177     match storage.ss_family as libc::c_int {
178         libc::AF_INET => {
179             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
180             let storage: &libc::sockaddr_in = unsafe {
181                 mem::transmute(storage)
182             };
183             let ip = mem::to_be32(storage.sin_addr.s_addr as u32);
184             let a = (ip >> 24) as u8;
185             let b = (ip >> 16) as u8;
186             let c = (ip >>  8) as u8;
187             let d = (ip >>  0) as u8;
188             Ok(rtio::SocketAddr {
189                 ip: rtio::Ipv4Addr(a, b, c, d),
190                 port: ntohs(storage.sin_port),
191             })
192         }
193         libc::AF_INET6 => {
194             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
195             let storage: &libc::sockaddr_in6 = unsafe {
196                 mem::transmute(storage)
197             };
198             let a = ntohs(storage.sin6_addr.s6_addr[0]);
199             let b = ntohs(storage.sin6_addr.s6_addr[1]);
200             let c = ntohs(storage.sin6_addr.s6_addr[2]);
201             let d = ntohs(storage.sin6_addr.s6_addr[3]);
202             let e = ntohs(storage.sin6_addr.s6_addr[4]);
203             let f = ntohs(storage.sin6_addr.s6_addr[5]);
204             let g = ntohs(storage.sin6_addr.s6_addr[6]);
205             let h = ntohs(storage.sin6_addr.s6_addr[7]);
206             Ok(rtio::SocketAddr {
207                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
208                 port: ntohs(storage.sin6_port),
209             })
210         }
211         _ => {
212             #[cfg(unix)] use ERROR = libc::EINVAL;
213             #[cfg(windows)] use ERROR = libc::WSAEINVAL;
214             Err(IoError {
215                 code: ERROR as uint,
216                 extra: 0,
217                 detail: None,
218             })
219         }
220     }
221 }
222
223 #[cfg(unix)]
224 pub fn init() {}
225
226 #[cfg(windows)]
227 pub fn init() {
228
229     unsafe {
230         use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
231         static mut INITIALIZED: bool = false;
232         static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
233
234         let _guard = LOCK.lock();
235         if !INITIALIZED {
236             let mut data: c::WSADATA = mem::zeroed();
237             let ret = c::WSAStartup(0x202,      // version 2.2
238                                     &mut data);
239             assert_eq!(ret, 0);
240             INITIALIZED = true;
241         }
242     }
243 }
244
245 ////////////////////////////////////////////////////////////////////////////////
246 // TCP streams
247 ////////////////////////////////////////////////////////////////////////////////
248
249 pub struct TcpStream {
250     inner: Arc<Inner>,
251     read_deadline: u64,
252     write_deadline: u64,
253 }
254
255 struct Inner {
256     fd: sock_t,
257     lock: mutex::NativeMutex,
258 }
259
260 pub struct Guard<'a> {
261     pub fd: sock_t,
262     pub guard: mutex::LockGuard<'a>,
263 }
264
265 impl Inner {
266     fn new(fd: sock_t) -> Inner {
267         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
268     }
269 }
270
271 impl TcpStream {
272     pub fn connect(addr: rtio::SocketAddr,
273                    timeout: Option<u64>) -> IoResult<TcpStream> {
274         let fd = try!(socket(addr, libc::SOCK_STREAM));
275         let ret = TcpStream::new(Inner::new(fd));
276
277         let (addr, len) = addr_to_sockaddr(addr);
278         let addrp = &addr as *_ as *libc::sockaddr;
279         let len = len as libc::socklen_t;
280
281         match timeout {
282             Some(timeout) => {
283                 try!(util::connect_timeout(fd, addrp, len, timeout));
284                 Ok(ret)
285             },
286             None => {
287                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
288                     -1 => Err(last_error()),
289                     _ => Ok(ret),
290                 }
291             }
292         }
293     }
294
295     fn new(inner: Inner) -> TcpStream {
296         TcpStream {
297             inner: Arc::new(inner),
298             read_deadline: 0,
299             write_deadline: 0,
300         }
301     }
302
303     pub fn fd(&self) -> sock_t { self.inner.fd }
304
305     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
306         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
307                    nodelay as libc::c_int)
308     }
309
310     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
311         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
312                              seconds.is_some() as libc::c_int);
313         match seconds {
314             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
315             None => ret,
316         }
317     }
318
319     #[cfg(target_os = "macos")]
320     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
321         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
322                    seconds as libc::c_int)
323     }
324     #[cfg(target_os = "freebsd")]
325     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
326         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
327                    seconds as libc::c_int)
328     }
329     #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
330     fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
331         Ok(())
332     }
333
334     #[cfg(target_os = "linux")]
335     fn lock_nonblocking(&self) {}
336
337     #[cfg(not(target_os = "linux"))]
338     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
339         let ret = Guard {
340             fd: self.fd(),
341             guard: unsafe { self.inner.lock.lock() },
342         };
343         assert!(util::set_nonblocking(self.fd(), true).is_ok());
344         ret
345     }
346 }
347
348 #[cfg(windows)] type wrlen = libc::c_int;
349 #[cfg(not(windows))] type wrlen = libc::size_t;
350
351 impl rtio::RtioTcpStream for TcpStream {
352     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
353         let fd = self.fd();
354         let dolock = || self.lock_nonblocking();
355         let doread = |nb| unsafe {
356             let flags = if nb {c::MSG_DONTWAIT} else {0};
357             libc::recv(fd,
358                        buf.as_mut_ptr() as *mut libc::c_void,
359                        buf.len() as wrlen,
360                        flags) as libc::c_int
361         };
362         read(fd, self.read_deadline, dolock, doread)
363     }
364
365     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
366         let fd = self.fd();
367         let dolock = || self.lock_nonblocking();
368         let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
369             let flags = if nb {c::MSG_DONTWAIT} else {0};
370             libc::send(fd,
371                        buf as *mut libc::c_void,
372                        len as wrlen,
373                        flags) as i64
374         };
375         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
376             Ok(_) => Ok(()),
377             Err(e) => Err(e)
378         }
379     }
380     fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
381         sockname(self.fd(), libc::getpeername)
382     }
383     fn control_congestion(&mut self) -> IoResult<()> {
384         self.set_nodelay(false)
385     }
386     fn nodelay(&mut self) -> IoResult<()> {
387         self.set_nodelay(true)
388     }
389     fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
390         self.set_keepalive(Some(delay_in_seconds))
391     }
392     fn letdie(&mut self) -> IoResult<()> {
393         self.set_keepalive(None)
394     }
395
396     fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
397         box TcpStream {
398             inner: self.inner.clone(),
399             read_deadline: 0,
400             write_deadline: 0,
401         } as Box<rtio::RtioTcpStream:Send>
402     }
403
404     fn close_write(&mut self) -> IoResult<()> {
405         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
406     }
407     fn close_read(&mut self) -> IoResult<()> {
408         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
409     }
410
411     fn set_timeout(&mut self, timeout: Option<u64>) {
412         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
413         self.read_deadline = deadline;
414         self.write_deadline = deadline;
415     }
416     fn set_read_timeout(&mut self, timeout: Option<u64>) {
417         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
418     }
419     fn set_write_timeout(&mut self, timeout: Option<u64>) {
420         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
421     }
422 }
423
424 impl rtio::RtioSocket for TcpStream {
425     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
426         sockname(self.fd(), libc::getsockname)
427     }
428 }
429
430 impl Drop for Inner {
431     fn drop(&mut self) { unsafe { close(self.fd); } }
432 }
433
434 #[unsafe_destructor]
435 impl<'a> Drop for Guard<'a> {
436     fn drop(&mut self) {
437         assert!(util::set_nonblocking(self.fd, false).is_ok());
438     }
439 }
440
441 ////////////////////////////////////////////////////////////////////////////////
442 // TCP listeners
443 ////////////////////////////////////////////////////////////////////////////////
444
445 pub struct TcpListener {
446     inner: Inner,
447 }
448
449 impl TcpListener {
450     pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
451         let fd = try!(socket(addr, libc::SOCK_STREAM));
452         let ret = TcpListener { inner: Inner::new(fd) };
453
454         let (addr, len) = addr_to_sockaddr(addr);
455         let addrp = &addr as *_ as *libc::sockaddr;
456         let len = len as libc::socklen_t;
457
458         // On platforms with Berkeley-derived sockets, this allows
459         // to quickly rebind a socket, without needing to wait for
460         // the OS to clean up the previous one.
461         if cfg!(unix) {
462             try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
463                             1 as libc::c_int));
464         }
465
466         match unsafe { libc::bind(fd, addrp, len) } {
467             -1 => Err(last_error()),
468             _ => Ok(ret),
469         }
470     }
471
472     pub fn fd(&self) -> sock_t { self.inner.fd }
473
474     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
475         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
476             -1 => Err(last_error()),
477             _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
478         }
479     }
480 }
481
482 impl rtio::RtioTcpListener for TcpListener {
483     fn listen(~self) -> IoResult<Box<rtio::RtioTcpAcceptor:Send>> {
484         self.native_listen(128).map(|a| {
485             box a as Box<rtio::RtioTcpAcceptor:Send>
486         })
487     }
488 }
489
490 impl rtio::RtioSocket for TcpListener {
491     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
492         sockname(self.fd(), libc::getsockname)
493     }
494 }
495
496 pub struct TcpAcceptor {
497     listener: TcpListener,
498     deadline: u64,
499 }
500
501 impl TcpAcceptor {
502     pub fn fd(&self) -> sock_t { self.listener.fd() }
503
504     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
505         if self.deadline != 0 {
506             try!(util::await(self.fd(), Some(self.deadline), util::Readable));
507         }
508         unsafe {
509             let mut storage: libc::sockaddr_storage = mem::zeroed();
510             let storagep = &mut storage as *mut libc::sockaddr_storage;
511             let size = mem::size_of::<libc::sockaddr_storage>();
512             let mut size = size as libc::socklen_t;
513             match retry(|| {
514                 libc::accept(self.fd(),
515                              storagep as *mut libc::sockaddr,
516                              &mut size as *mut libc::socklen_t) as libc::c_int
517             }) as sock_t {
518                 -1 => Err(last_error()),
519                 fd => Ok(TcpStream::new(Inner::new(fd))),
520             }
521         }
522     }
523 }
524
525 impl rtio::RtioSocket for TcpAcceptor {
526     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
527         sockname(self.fd(), libc::getsockname)
528     }
529 }
530
531 impl rtio::RtioTcpAcceptor for TcpAcceptor {
532     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
533         self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream:Send>)
534     }
535
536     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
537     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
538     fn set_timeout(&mut self, timeout: Option<u64>) {
539         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
540     }
541 }
542
543 ////////////////////////////////////////////////////////////////////////////////
544 // UDP
545 ////////////////////////////////////////////////////////////////////////////////
546
547 pub struct UdpSocket {
548     inner: Arc<Inner>,
549     read_deadline: u64,
550     write_deadline: u64,
551 }
552
553 impl UdpSocket {
554     pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
555         let fd = try!(socket(addr, libc::SOCK_DGRAM));
556         let ret = UdpSocket {
557             inner: Arc::new(Inner::new(fd)),
558             read_deadline: 0,
559             write_deadline: 0,
560         };
561
562         let (addr, len) = addr_to_sockaddr(addr);
563         let addrp = &addr as *_ as *libc::sockaddr;
564         let len = len as libc::socklen_t;
565
566         match unsafe { libc::bind(fd, addrp, len) } {
567             -1 => Err(last_error()),
568             _ => Ok(ret),
569         }
570     }
571
572     pub fn fd(&self) -> sock_t { self.inner.fd }
573
574     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
575         setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
576                    on as libc::c_int)
577     }
578
579     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
580         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
581                    on as libc::c_int)
582     }
583
584     pub fn set_membership(&mut self, addr: rtio::IpAddr,
585                           opt: libc::c_int) -> IoResult<()> {
586         match ip_to_inaddr(addr) {
587             InAddr(addr) => {
588                 let mreq = libc::ip_mreq {
589                     imr_multiaddr: addr,
590                     // interface == INADDR_ANY
591                     imr_interface: libc::in_addr { s_addr: 0x0 },
592                 };
593                 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
594             }
595             In6Addr(addr) => {
596                 let mreq = libc::ip6_mreq {
597                     ipv6mr_multiaddr: addr,
598                     ipv6mr_interface: 0,
599                 };
600                 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
601             }
602         }
603     }
604
605     #[cfg(target_os = "linux")]
606     fn lock_nonblocking(&self) {}
607
608     #[cfg(not(target_os = "linux"))]
609     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
610         let ret = Guard {
611             fd: self.fd(),
612             guard: unsafe { self.inner.lock.lock() },
613         };
614         assert!(util::set_nonblocking(self.fd(), true).is_ok());
615         ret
616     }
617 }
618
619 impl rtio::RtioSocket for UdpSocket {
620     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
621         sockname(self.fd(), libc::getsockname)
622     }
623 }
624
625 #[cfg(windows)] type msglen_t = libc::c_int;
626 #[cfg(unix)]    type msglen_t = libc::size_t;
627
628 impl rtio::RtioUdpSocket for UdpSocket {
629     fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
630         let fd = self.fd();
631         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
632         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
633         let mut addrlen: libc::socklen_t =
634                 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
635
636         let dolock = || self.lock_nonblocking();
637         let doread = |nb| unsafe {
638             let flags = if nb {c::MSG_DONTWAIT} else {0};
639             libc::recvfrom(fd,
640                            buf.as_mut_ptr() as *mut libc::c_void,
641                            buf.len() as msglen_t,
642                            flags,
643                            storagep,
644                            &mut addrlen) as libc::c_int
645         };
646         let n = try!(read(fd, self.read_deadline, dolock, doread));
647         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
648             Ok((n as uint, addr))
649         })
650     }
651
652     fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
653         let (dst, dstlen) = addr_to_sockaddr(dst);
654         let dstp = &dst as *_ as *libc::sockaddr;
655         let dstlen = dstlen as libc::socklen_t;
656
657         let fd = self.fd();
658         let dolock = || self.lock_nonblocking();
659         let dowrite = |nb, buf: *u8, len: uint| unsafe {
660             let flags = if nb {c::MSG_DONTWAIT} else {0};
661             libc::sendto(fd,
662                          buf as *libc::c_void,
663                          len as msglen_t,
664                          flags,
665                          dstp,
666                          dstlen) as i64
667         };
668
669         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
670         if n != buf.len() {
671             Err(util::short_write(n, "couldn't send entire packet at once"))
672         } else {
673             Ok(())
674         }
675     }
676
677     fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
678         match multi {
679             rtio::Ipv4Addr(..) => {
680                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
681             }
682             rtio::Ipv6Addr(..) => {
683                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
684             }
685         }
686     }
687     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
688         match multi {
689             rtio::Ipv4Addr(..) => {
690                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
691             }
692             rtio::Ipv6Addr(..) => {
693                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
694             }
695         }
696     }
697
698     fn loop_multicast_locally(&mut self) -> IoResult<()> {
699         self.set_multicast_loop(true)
700     }
701     fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
702         self.set_multicast_loop(false)
703     }
704
705     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
706         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
707                    ttl as libc::c_int)
708     }
709     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
710         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
711     }
712
713     fn hear_broadcasts(&mut self) -> IoResult<()> {
714         self.set_broadcast(true)
715     }
716     fn ignore_broadcasts(&mut self) -> IoResult<()> {
717         self.set_broadcast(false)
718     }
719
720     fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
721         box UdpSocket {
722             inner: self.inner.clone(),
723             read_deadline: 0,
724             write_deadline: 0,
725         } as Box<rtio::RtioUdpSocket:Send>
726     }
727
728     fn set_timeout(&mut self, timeout: Option<u64>) {
729         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
730         self.read_deadline = deadline;
731         self.write_deadline = deadline;
732     }
733     fn set_read_timeout(&mut self, timeout: Option<u64>) {
734         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
735     }
736     fn set_write_timeout(&mut self, timeout: Option<u64>) {
737         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
738     }
739 }
740
741 ////////////////////////////////////////////////////////////////////////////////
742 // Timeout helpers
743 //
744 // The read/write functions below are the helpers for reading/writing a socket
745 // with a possible deadline specified. This is generally viewed as a timed out
746 // I/O operation.
747 //
748 // From the application's perspective, timeouts apply to the I/O object, not to
749 // the underlying file descriptor (it's one timeout per object). This means that
750 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
751 //
752 // The next idea to implement timeouts would be to use nonblocking I/O. An
753 // invocation of select() would wait (with a timeout) for a socket to be ready.
754 // Once its ready, we can perform the operation. Note that the operation *must*
755 // be nonblocking, even though select() says the socket is ready. This is
756 // because some other thread could have come and stolen our data (handles can be
757 // cloned).
758 //
759 // To implement nonblocking I/O, the first option we have is to use the
760 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
761 // I/O objects, so this was initially viewed as unwise.
762 //
763 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
764 // send/recv, but the niftiness wears off once you realize it only works well on
765 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
766 // operation on linux (no flag fidding, no affecting other objects), but not on
767 // other platforms.
768 //
769 // To work around this constraint on other platforms, we end up using the
770 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
771 // could cause other objects' blocking operations to suddenly become
772 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
773 // falls back to using the same code path as nonblocking operations, but with an
774 // infinite timeout (select + send/recv). This helps emulate blocking
775 // reads/writes despite the underlying descriptor being nonblocking, as well as
776 // optimizing the fast path of just hitting one syscall in the good case.
777 //
778 // As a final caveat, this implementation uses a mutex so only one thread is
779 // doing a nonblocking operation at at time. This is the operation that comes
780 // after the select() (at which point we think the socket is ready). This is
781 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
782 // expect (wouldn't want someone turning it on when it should be off!). All
783 // operations performed in the lock are *nonblocking* to avoid holding the mutex
784 // forever.
785 //
786 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
787 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
788 // reads/writes are still blocking.
789 //
790 // Fun, fun!
791 //
792 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
793 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
794
795 pub fn read<T>(fd: sock_t,
796                deadline: u64,
797                lock: || -> T,
798                read: |bool| -> libc::c_int) -> IoResult<uint> {
799     let mut ret = -1;
800     if deadline == 0 {
801         ret = retry(|| read(false));
802     }
803
804     if deadline != 0 || (ret == -1 && util::wouldblock()) {
805         let deadline = match deadline {
806             0 => None,
807             n => Some(n),
808         };
809         loop {
810             // With a timeout, first we wait for the socket to become
811             // readable using select(), specifying the relevant timeout for
812             // our previously set deadline.
813             try!(util::await(fd, deadline, util::Readable));
814
815             // At this point, we're still within the timeout, and we've
816             // determined that the socket is readable (as returned by
817             // select). We must still read the socket in *nonblocking* mode
818             // because some other thread could come steal our data. If we
819             // fail to read some data, we retry (hence the outer loop) and
820             // wait for the socket to become readable again.
821             let _guard = lock();
822             match retry(|| read(deadline.is_some())) {
823                 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
824                 -1 => return Err(last_error()),
825                n => { ret = n; break }
826             }
827         }
828     }
829
830     match ret {
831         0 => Err(util::eof()),
832         n if n < 0 => Err(last_error()),
833         n => Ok(n as uint)
834     }
835 }
836
837 pub fn write<T>(fd: sock_t,
838                 deadline: u64,
839                 buf: &[u8],
840                 write_everything: bool,
841                 lock: || -> T,
842                 write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
843     let mut ret = -1;
844     let mut written = 0;
845     if deadline == 0 {
846         if write_everything {
847             ret = keep_going(buf, |inner, len| {
848                 written = buf.len() - len;
849                 write(false, inner, len)
850             });
851         } else {
852             ret = retry(|| {
853                 write(false, buf.as_ptr(), buf.len()) as libc::c_int
854             }) as i64;
855             if ret > 0 { written = ret as uint; }
856         }
857     }
858
859     if deadline != 0 || (ret == -1 && util::wouldblock()) {
860         let deadline = match deadline {
861             0 => None,
862             n => Some(n),
863         };
864         while written < buf.len() && (write_everything || written == 0) {
865             // As with read(), first wait for the socket to be ready for
866             // the I/O operation.
867             match util::await(fd, deadline, util::Writable) {
868                 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
869                     assert!(deadline.is_some());
870                     return Err(util::short_write(written, "short write"))
871                 }
872                 Err(e) => return Err(e),
873                 Ok(()) => {}
874             }
875
876             // Also as with read(), we use MSG_DONTWAIT to guard ourselves
877             // against unforseen circumstances.
878             let _guard = lock();
879             let ptr = buf.slice_from(written).as_ptr();
880             let len = buf.len() - written;
881             match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
882                 -1 if util::wouldblock() => {}
883                 -1 => return Err(last_error()),
884                 n => { written += n as uint; }
885             }
886         }
887         ret = 0;
888     }
889     if ret < 0 {
890         Err(last_error())
891     } else {
892         Ok(written)
893     }
894 }