]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/net.rs
auto merge of #17654 : gereeter/rust/no-unnecessary-cell, r=alexcrichton
[rust.git] / src / libnative / io / net.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::mem;
14 use std::ptr;
15 use std::rt::mutex;
16 use std::rt::rtio::{mod, IoResult, IoError};
17 use std::sync::atomic;
18
19 use super::{retry, keep_going};
20 use super::c;
21 use super::util;
22
23 #[cfg(unix)] use super::process;
24 #[cfg(unix)] use super::file::FileDesc;
25
26 pub use self::os::{init, sock_t, last_error};
27
28 ////////////////////////////////////////////////////////////////////////////////
29 // sockaddr and misc bindings
30 ////////////////////////////////////////////////////////////////////////////////
31
32 pub fn htons(u: u16) -> u16 {
33     u.to_be()
34 }
35 pub fn ntohs(u: u16) -> u16 {
36     Int::from_be(u)
37 }
38
39 enum InAddr {
40     In4Addr(libc::in_addr),
41     In6Addr(libc::in6_addr),
42 }
43
44 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
45     match ip {
46         rtio::Ipv4Addr(a, b, c, d) => {
47             let ip = (a as u32 << 24) |
48                      (b as u32 << 16) |
49                      (c as u32 <<  8) |
50                      (d as u32 <<  0);
51             In4Addr(libc::in_addr {
52                 s_addr: Int::from_be(ip)
53             })
54         }
55         rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
56             In6Addr(libc::in6_addr {
57                 s6_addr: [
58                     htons(a),
59                     htons(b),
60                     htons(c),
61                     htons(d),
62                     htons(e),
63                     htons(f),
64                     htons(g),
65                     htons(h),
66                 ]
67             })
68         }
69     }
70 }
71
72 fn addr_to_sockaddr(addr: rtio::SocketAddr,
73                     storage: &mut libc::sockaddr_storage)
74                     -> libc::socklen_t {
75     unsafe {
76         let len = match ip_to_inaddr(addr.ip) {
77             In4Addr(inaddr) => {
78                 let storage = storage as *mut _ as *mut libc::sockaddr_in;
79                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
80                 (*storage).sin_port = htons(addr.port);
81                 (*storage).sin_addr = inaddr;
82                 mem::size_of::<libc::sockaddr_in>()
83             }
84             In6Addr(inaddr) => {
85                 let storage = storage as *mut _ as *mut libc::sockaddr_in6;
86                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
87                 (*storage).sin6_port = htons(addr.port);
88                 (*storage).sin6_addr = inaddr;
89                 mem::size_of::<libc::sockaddr_in6>()
90             }
91         };
92         return len as libc::socklen_t;
93     }
94 }
95
96 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
97     unsafe {
98         let fam = match addr.ip {
99             rtio::Ipv4Addr(..) => libc::AF_INET,
100             rtio::Ipv6Addr(..) => libc::AF_INET6,
101         };
102         match libc::socket(fam, ty, 0) {
103             -1 => Err(os::last_error()),
104             fd => Ok(fd),
105         }
106     }
107 }
108
109 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
110                  payload: T) -> IoResult<()> {
111     unsafe {
112         let payload = &payload as *const T as *const libc::c_void;
113         let ret = libc::setsockopt(fd, opt, val,
114                                    payload,
115                                    mem::size_of::<T>() as libc::socklen_t);
116         if ret != 0 {
117             Err(os::last_error())
118         } else {
119             Ok(())
120         }
121     }
122 }
123
124 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
125                            val: libc::c_int) -> IoResult<T> {
126     unsafe {
127         let mut slot: T = mem::zeroed();
128         let mut len = mem::size_of::<T>() as libc::socklen_t;
129         let ret = c::getsockopt(fd, opt, val,
130                                 &mut slot as *mut _ as *mut _,
131                                 &mut len);
132         if ret != 0 {
133             Err(os::last_error())
134         } else {
135             assert!(len as uint == mem::size_of::<T>());
136             Ok(slot)
137         }
138     }
139 }
140
141 fn sockname(fd: sock_t,
142             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
143                                          *mut libc::socklen_t) -> libc::c_int)
144     -> IoResult<rtio::SocketAddr>
145 {
146     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
147     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
148     unsafe {
149         let storage = &mut storage as *mut libc::sockaddr_storage;
150         let ret = f(fd,
151                     storage as *mut libc::sockaddr,
152                     &mut len as *mut libc::socklen_t);
153         if ret != 0 {
154             return Err(os::last_error())
155         }
156     }
157     return sockaddr_to_addr(&storage, len as uint);
158 }
159
160 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
161                         len: uint) -> IoResult<rtio::SocketAddr> {
162     match storage.ss_family as libc::c_int {
163         libc::AF_INET => {
164             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
165             let storage: &libc::sockaddr_in = unsafe {
166                 mem::transmute(storage)
167             };
168             let ip = (storage.sin_addr.s_addr as u32).to_be();
169             let a = (ip >> 24) as u8;
170             let b = (ip >> 16) as u8;
171             let c = (ip >>  8) as u8;
172             let d = (ip >>  0) as u8;
173             Ok(rtio::SocketAddr {
174                 ip: rtio::Ipv4Addr(a, b, c, d),
175                 port: ntohs(storage.sin_port),
176             })
177         }
178         libc::AF_INET6 => {
179             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
180             let storage: &libc::sockaddr_in6 = unsafe {
181                 mem::transmute(storage)
182             };
183             let a = ntohs(storage.sin6_addr.s6_addr[0]);
184             let b = ntohs(storage.sin6_addr.s6_addr[1]);
185             let c = ntohs(storage.sin6_addr.s6_addr[2]);
186             let d = ntohs(storage.sin6_addr.s6_addr[3]);
187             let e = ntohs(storage.sin6_addr.s6_addr[4]);
188             let f = ntohs(storage.sin6_addr.s6_addr[5]);
189             let g = ntohs(storage.sin6_addr.s6_addr[6]);
190             let h = ntohs(storage.sin6_addr.s6_addr[7]);
191             Ok(rtio::SocketAddr {
192                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
193                 port: ntohs(storage.sin6_port),
194             })
195         }
196         _ => {
197             #[cfg(unix)] use libc::EINVAL as ERROR;
198             #[cfg(windows)] use libc::WSAEINVAL as ERROR;
199             Err(IoError {
200                 code: ERROR as uint,
201                 extra: 0,
202                 detail: None,
203             })
204         }
205     }
206 }
207
208 ////////////////////////////////////////////////////////////////////////////////
209 // TCP streams
210 ////////////////////////////////////////////////////////////////////////////////
211
212 pub struct TcpStream {
213     inner: Arc<Inner>,
214     read_deadline: u64,
215     write_deadline: u64,
216 }
217
218 struct Inner {
219     fd: sock_t,
220
221     // Unused on Linux, where this lock is not necessary.
222     #[allow(dead_code)]
223     lock: mutex::NativeMutex
224 }
225
226 pub struct Guard<'a> {
227     pub fd: sock_t,
228     pub guard: mutex::LockGuard<'a>,
229 }
230
231 impl Inner {
232     fn new(fd: sock_t) -> Inner {
233         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
234     }
235 }
236
237 impl TcpStream {
238     pub fn connect(addr: rtio::SocketAddr,
239                    timeout: Option<u64>) -> IoResult<TcpStream> {
240         let fd = try!(socket(addr, libc::SOCK_STREAM));
241         let ret = TcpStream::new(Inner::new(fd));
242
243         let mut storage = unsafe { mem::zeroed() };
244         let len = addr_to_sockaddr(addr, &mut storage);
245         let addrp = &storage as *const _ as *const libc::sockaddr;
246
247         match timeout {
248             Some(timeout) => {
249                 try!(util::connect_timeout(fd, addrp, len, timeout));
250                 Ok(ret)
251             },
252             None => {
253                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
254                     -1 => Err(os::last_error()),
255                     _ => Ok(ret),
256                 }
257             }
258         }
259     }
260
261     fn new(inner: Inner) -> TcpStream {
262         TcpStream {
263             inner: Arc::new(inner),
264             read_deadline: 0,
265             write_deadline: 0,
266         }
267     }
268
269     pub fn fd(&self) -> sock_t { self.inner.fd }
270
271     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
272         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
273                    nodelay as libc::c_int)
274     }
275
276     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
277         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
278                              seconds.is_some() as libc::c_int);
279         match seconds {
280             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
281             None => ret,
282         }
283     }
284
285     #[cfg(any(target_os = "macos", target_os = "ios"))]
286     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
287         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
288                    seconds as libc::c_int)
289     }
290     #[cfg(any(target_os = "freebsd", target_os = "dragonfly"))]
291     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
292         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
293                    seconds as libc::c_int)
294     }
295     #[cfg(not(any(target_os = "macos",
296                   target_os = "ios",
297                   target_os = "freebsd",
298                   target_os = "dragonfly")))]
299     fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
300         Ok(())
301     }
302
303     #[cfg(target_os = "linux")]
304     fn lock_nonblocking(&self) {}
305
306     #[cfg(not(target_os = "linux"))]
307     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
308         let ret = Guard {
309             fd: self.fd(),
310             guard: unsafe { self.inner.lock.lock() },
311         };
312         assert!(util::set_nonblocking(self.fd(), true).is_ok());
313         ret
314     }
315 }
316
317 #[cfg(windows)] type wrlen = libc::c_int;
318 #[cfg(not(windows))] type wrlen = libc::size_t;
319
320 impl rtio::RtioTcpStream for TcpStream {
321     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
322         let fd = self.fd();
323         let dolock = || self.lock_nonblocking();
324         let doread = |nb| unsafe {
325             let flags = if nb {c::MSG_DONTWAIT} else {0};
326             libc::recv(fd,
327                        buf.as_mut_ptr() as *mut libc::c_void,
328                        buf.len() as wrlen,
329                        flags) as libc::c_int
330         };
331         read(fd, self.read_deadline, dolock, doread)
332     }
333
334     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
335         let fd = self.fd();
336         let dolock = || self.lock_nonblocking();
337         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
338             let flags = if nb {c::MSG_DONTWAIT} else {0};
339             libc::send(fd,
340                        buf as *const _,
341                        len as wrlen,
342                        flags) as i64
343         };
344         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
345             Ok(_) => Ok(()),
346             Err(e) => Err(e)
347         }
348     }
349     fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
350         sockname(self.fd(), libc::getpeername)
351     }
352     fn control_congestion(&mut self) -> IoResult<()> {
353         self.set_nodelay(false)
354     }
355     fn nodelay(&mut self) -> IoResult<()> {
356         self.set_nodelay(true)
357     }
358     fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
359         self.set_keepalive(Some(delay_in_seconds))
360     }
361     fn letdie(&mut self) -> IoResult<()> {
362         self.set_keepalive(None)
363     }
364
365     fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
366         box TcpStream {
367             inner: self.inner.clone(),
368             read_deadline: 0,
369             write_deadline: 0,
370         } as Box<rtio::RtioTcpStream + Send>
371     }
372
373     fn close_write(&mut self) -> IoResult<()> {
374         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
375     }
376     fn close_read(&mut self) -> IoResult<()> {
377         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
378     }
379
380     fn set_timeout(&mut self, timeout: Option<u64>) {
381         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
382         self.read_deadline = deadline;
383         self.write_deadline = deadline;
384     }
385     fn set_read_timeout(&mut self, timeout: Option<u64>) {
386         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
387     }
388     fn set_write_timeout(&mut self, timeout: Option<u64>) {
389         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
390     }
391 }
392
393 impl rtio::RtioSocket for TcpStream {
394     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
395         sockname(self.fd(), libc::getsockname)
396     }
397 }
398
399 impl Drop for Inner {
400     fn drop(&mut self) { unsafe { os::close(self.fd); } }
401 }
402
403 #[unsafe_destructor]
404 impl<'a> Drop for Guard<'a> {
405     fn drop(&mut self) {
406         assert!(util::set_nonblocking(self.fd, false).is_ok());
407     }
408 }
409
410 ////////////////////////////////////////////////////////////////////////////////
411 // TCP listeners
412 ////////////////////////////////////////////////////////////////////////////////
413
414 pub struct TcpListener {
415     inner: Inner,
416 }
417
418 impl TcpListener {
419     pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
420         let fd = try!(socket(addr, libc::SOCK_STREAM));
421         let ret = TcpListener { inner: Inner::new(fd) };
422
423         let mut storage = unsafe { mem::zeroed() };
424         let len = addr_to_sockaddr(addr, &mut storage);
425         let addrp = &storage as *const _ as *const libc::sockaddr;
426
427         // On platforms with Berkeley-derived sockets, this allows
428         // to quickly rebind a socket, without needing to wait for
429         // the OS to clean up the previous one.
430         if cfg!(unix) {
431             try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
432                             1 as libc::c_int));
433         }
434
435         match unsafe { libc::bind(fd, addrp, len) } {
436             -1 => Err(os::last_error()),
437             _ => Ok(ret),
438         }
439     }
440
441     pub fn fd(&self) -> sock_t { self.inner.fd }
442
443     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
444         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
445             -1 => Err(os::last_error()),
446
447             #[cfg(unix)]
448             _ => {
449                 let (reader, writer) = try!(process::pipe());
450                 try!(util::set_nonblocking(reader.fd(), true));
451                 try!(util::set_nonblocking(writer.fd(), true));
452                 try!(util::set_nonblocking(self.fd(), true));
453                 Ok(TcpAcceptor {
454                     inner: Arc::new(AcceptorInner {
455                         listener: self,
456                         reader: reader,
457                         writer: writer,
458                         closed: atomic::AtomicBool::new(false),
459                     }),
460                     deadline: 0,
461                 })
462             }
463
464             #[cfg(windows)]
465             _ => {
466                 let accept = try!(os::Event::new());
467                 let ret = unsafe {
468                     c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
469                 };
470                 if ret != 0 {
471                     return Err(os::last_error())
472                 }
473                 Ok(TcpAcceptor {
474                     inner: Arc::new(AcceptorInner {
475                         listener: self,
476                         abort: try!(os::Event::new()),
477                         accept: accept,
478                         closed: atomic::AtomicBool::new(false),
479                     }),
480                     deadline: 0,
481                 })
482             }
483         }
484     }
485 }
486
487 impl rtio::RtioTcpListener for TcpListener {
488     fn listen(self: Box<TcpListener>)
489               -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
490         self.native_listen(128).map(|a| {
491             box a as Box<rtio::RtioTcpAcceptor + Send>
492         })
493     }
494 }
495
496 impl rtio::RtioSocket for TcpListener {
497     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
498         sockname(self.fd(), libc::getsockname)
499     }
500 }
501
502 pub struct TcpAcceptor {
503     inner: Arc<AcceptorInner>,
504     deadline: u64,
505 }
506
507 #[cfg(unix)]
508 struct AcceptorInner {
509     listener: TcpListener,
510     reader: FileDesc,
511     writer: FileDesc,
512     closed: atomic::AtomicBool,
513 }
514
515 #[cfg(windows)]
516 struct AcceptorInner {
517     listener: TcpListener,
518     abort: os::Event,
519     accept: os::Event,
520     closed: atomic::AtomicBool,
521 }
522
523 impl TcpAcceptor {
524     pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
525
526     #[cfg(unix)]
527     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
528         // In implementing accept, the two main concerns are dealing with
529         // close_accept() and timeouts. The unix implementation is based on a
530         // nonblocking accept plus a call to select(). Windows ends up having
531         // an entirely separate implementation than unix, which is explained
532         // below.
533         //
534         // To implement timeouts, all blocking is done via select() instead of
535         // accept() by putting the socket in non-blocking mode. Because
536         // select() takes a timeout argument, we just pass through the timeout
537         // to select().
538         //
539         // To implement close_accept(), we have a self-pipe to ourselves which
540         // is passed to select() along with the socket being accepted on. The
541         // self-pipe is never written to unless close_accept() is called.
542         let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
543
544         while !self.inner.closed.load(atomic::SeqCst) {
545             match retry(|| unsafe {
546                 libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
547             }) {
548                 -1 if util::wouldblock() => {}
549                 -1 => return Err(os::last_error()),
550                 fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))),
551             }
552             try!(util::await([self.fd(), self.inner.reader.fd()],
553                              deadline, util::Readable));
554         }
555
556         Err(util::eof())
557     }
558
559     #[cfg(windows)]
560     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
561         // Unlink unix, windows cannot invoke `select` on arbitrary file
562         // descriptors like pipes, only sockets. Consequently, windows cannot
563         // use the same implementation as unix for accept() when close_accept()
564         // is considered.
565         //
566         // In order to implement close_accept() and timeouts, windows uses
567         // event handles. An acceptor-specific abort event is created which
568         // will only get set in close_accept(), and it will never be un-set.
569         // Additionally, another acceptor-specific event is associated with the
570         // FD_ACCEPT network event.
571         //
572         // These two events are then passed to WaitForMultipleEvents to see
573         // which one triggers first, and the timeout passed to this function is
574         // the local timeout for the acceptor.
575         //
576         // If the wait times out, then the accept timed out. If the wait
577         // succeeds with the abort event, then we were closed, and if the wait
578         // succeeds otherwise, then we do a nonblocking poll via `accept` to
579         // see if we can accept a connection. The connection is candidate to be
580         // stolen, so we do all of this in a loop as well.
581         let events = [self.inner.abort.handle(), self.inner.accept.handle()];
582
583         while !self.inner.closed.load(atomic::SeqCst) {
584             let ms = if self.deadline == 0 {
585                 c::WSA_INFINITE as u64
586             } else {
587                 let now = ::io::timer::now();
588                 if self.deadline < now {0} else {self.deadline - now}
589             };
590             let ret = unsafe {
591                 c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
592                                             ms as libc::DWORD, libc::FALSE)
593             };
594             match ret {
595                 c::WSA_WAIT_TIMEOUT => {
596                     return Err(util::timeout("accept timed out"))
597                 }
598                 c::WSA_WAIT_FAILED => return Err(os::last_error()),
599                 c::WSA_WAIT_EVENT_0 => break,
600                 n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
601             }
602
603             let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
604             let ret = unsafe {
605                 c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
606             };
607             if ret != 0 { return Err(os::last_error()) }
608
609             if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
610             match unsafe {
611                 libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
612             } {
613                 -1 if util::wouldblock() => {}
614                 -1 => return Err(os::last_error()),
615
616                 // Accepted sockets inherit the same properties as the caller,
617                 // so we need to deregister our event and switch the socket back
618                 // to blocking mode
619                 fd => {
620                     let stream = TcpStream::new(Inner::new(fd));
621                     let ret = unsafe {
622                         c::WSAEventSelect(fd, events[1], 0)
623                     };
624                     if ret != 0 { return Err(os::last_error()) }
625                     try!(util::set_nonblocking(fd, false));
626                     return Ok(stream)
627                 }
628             }
629         }
630
631         Err(util::eof())
632     }
633 }
634
635 impl rtio::RtioSocket for TcpAcceptor {
636     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
637         sockname(self.fd(), libc::getsockname)
638     }
639 }
640
641 impl rtio::RtioTcpAcceptor for TcpAcceptor {
642     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
643         self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
644     }
645
646     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
647     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
648     fn set_timeout(&mut self, timeout: Option<u64>) {
649         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
650     }
651
652     fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
653         box TcpAcceptor {
654             inner: self.inner.clone(),
655             deadline: 0,
656         } as Box<rtio::RtioTcpAcceptor + Send>
657     }
658
659     #[cfg(unix)]
660     fn close_accept(&mut self) -> IoResult<()> {
661         self.inner.closed.store(true, atomic::SeqCst);
662         let mut fd = FileDesc::new(self.inner.writer.fd(), false);
663         match fd.inner_write([0]) {
664             Ok(..) => Ok(()),
665             Err(..) if util::wouldblock() => Ok(()),
666             Err(e) => Err(e),
667         }
668     }
669
670     #[cfg(windows)]
671     fn close_accept(&mut self) -> IoResult<()> {
672         self.inner.closed.store(true, atomic::SeqCst);
673         let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
674         if ret == libc::TRUE {
675             Ok(())
676         } else {
677             Err(os::last_error())
678         }
679     }
680 }
681
682 ////////////////////////////////////////////////////////////////////////////////
683 // UDP
684 ////////////////////////////////////////////////////////////////////////////////
685
686 pub struct UdpSocket {
687     inner: Arc<Inner>,
688     read_deadline: u64,
689     write_deadline: u64,
690 }
691
692 impl UdpSocket {
693     pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
694         let fd = try!(socket(addr, libc::SOCK_DGRAM));
695         let ret = UdpSocket {
696             inner: Arc::new(Inner::new(fd)),
697             read_deadline: 0,
698             write_deadline: 0,
699         };
700
701         let mut storage = unsafe { mem::zeroed() };
702         let len = addr_to_sockaddr(addr, &mut storage);
703         let addrp = &storage as *const _ as *const libc::sockaddr;
704
705         match unsafe { libc::bind(fd, addrp, len) } {
706             -1 => Err(os::last_error()),
707             _ => Ok(ret),
708         }
709     }
710
711     pub fn fd(&self) -> sock_t { self.inner.fd }
712
713     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
714         setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
715                    on as libc::c_int)
716     }
717
718     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
719         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
720                    on as libc::c_int)
721     }
722
723     pub fn set_membership(&mut self, addr: rtio::IpAddr,
724                           opt: libc::c_int) -> IoResult<()> {
725         match ip_to_inaddr(addr) {
726             In4Addr(addr) => {
727                 let mreq = libc::ip_mreq {
728                     imr_multiaddr: addr,
729                     // interface == INADDR_ANY
730                     imr_interface: libc::in_addr { s_addr: 0x0 },
731                 };
732                 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
733             }
734             In6Addr(addr) => {
735                 let mreq = libc::ip6_mreq {
736                     ipv6mr_multiaddr: addr,
737                     ipv6mr_interface: 0,
738                 };
739                 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
740             }
741         }
742     }
743
744     #[cfg(target_os = "linux")]
745     fn lock_nonblocking(&self) {}
746
747     #[cfg(not(target_os = "linux"))]
748     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
749         let ret = Guard {
750             fd: self.fd(),
751             guard: unsafe { self.inner.lock.lock() },
752         };
753         assert!(util::set_nonblocking(self.fd(), true).is_ok());
754         ret
755     }
756 }
757
758 impl rtio::RtioSocket for UdpSocket {
759     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
760         sockname(self.fd(), libc::getsockname)
761     }
762 }
763
764 #[cfg(windows)] type msglen_t = libc::c_int;
765 #[cfg(unix)]    type msglen_t = libc::size_t;
766
767 impl rtio::RtioUdpSocket for UdpSocket {
768     fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
769         let fd = self.fd();
770         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
771         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
772         let mut addrlen: libc::socklen_t =
773                 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
774
775         let dolock = || self.lock_nonblocking();
776         let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
777             let flags = if nb {c::MSG_DONTWAIT} else {0};
778             libc::recvfrom(fd,
779                            buf.as_mut_ptr() as *mut libc::c_void,
780                            buf.len() as msglen_t,
781                            flags,
782                            storagep,
783                            &mut addrlen) as libc::c_int
784         }));
785         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
786             Ok((n as uint, addr))
787         })
788     }
789
790     fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
791         let mut storage = unsafe { mem::zeroed() };
792         let dstlen = addr_to_sockaddr(dst, &mut storage);
793         let dstp = &storage as *const _ as *const libc::sockaddr;
794
795         let fd = self.fd();
796         let dolock = || self.lock_nonblocking();
797         let dowrite = |nb, buf: *const u8, len: uint| unsafe {
798             let flags = if nb {c::MSG_DONTWAIT} else {0};
799             libc::sendto(fd,
800                          buf as *const libc::c_void,
801                          len as msglen_t,
802                          flags,
803                          dstp,
804                          dstlen) as i64
805         };
806
807         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
808         if n != buf.len() {
809             Err(util::short_write(n, "couldn't send entire packet at once"))
810         } else {
811             Ok(())
812         }
813     }
814
815     fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
816         match multi {
817             rtio::Ipv4Addr(..) => {
818                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
819             }
820             rtio::Ipv6Addr(..) => {
821                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
822             }
823         }
824     }
825     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
826         match multi {
827             rtio::Ipv4Addr(..) => {
828                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
829             }
830             rtio::Ipv6Addr(..) => {
831                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
832             }
833         }
834     }
835
836     fn loop_multicast_locally(&mut self) -> IoResult<()> {
837         self.set_multicast_loop(true)
838     }
839     fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
840         self.set_multicast_loop(false)
841     }
842
843     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
844         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
845                    ttl as libc::c_int)
846     }
847     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
848         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
849     }
850
851     fn hear_broadcasts(&mut self) -> IoResult<()> {
852         self.set_broadcast(true)
853     }
854     fn ignore_broadcasts(&mut self) -> IoResult<()> {
855         self.set_broadcast(false)
856     }
857
858     fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
859         box UdpSocket {
860             inner: self.inner.clone(),
861             read_deadline: 0,
862             write_deadline: 0,
863         } as Box<rtio::RtioUdpSocket + Send>
864     }
865
866     fn set_timeout(&mut self, timeout: Option<u64>) {
867         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
868         self.read_deadline = deadline;
869         self.write_deadline = deadline;
870     }
871     fn set_read_timeout(&mut self, timeout: Option<u64>) {
872         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
873     }
874     fn set_write_timeout(&mut self, timeout: Option<u64>) {
875         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
876     }
877 }
878
879 ////////////////////////////////////////////////////////////////////////////////
880 // Timeout helpers
881 //
882 // The read/write functions below are the helpers for reading/writing a socket
883 // with a possible deadline specified. This is generally viewed as a timed out
884 // I/O operation.
885 //
886 // From the application's perspective, timeouts apply to the I/O object, not to
887 // the underlying file descriptor (it's one timeout per object). This means that
888 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
889 //
890 // The next idea to implement timeouts would be to use nonblocking I/O. An
891 // invocation of select() would wait (with a timeout) for a socket to be ready.
892 // Once its ready, we can perform the operation. Note that the operation *must*
893 // be nonblocking, even though select() says the socket is ready. This is
894 // because some other thread could have come and stolen our data (handles can be
895 // cloned).
896 //
897 // To implement nonblocking I/O, the first option we have is to use the
898 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
899 // I/O objects, so this was initially viewed as unwise.
900 //
901 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
902 // send/recv, but the niftiness wears off once you realize it only works well on
903 // Linux [1] [2]. This means that it's pretty easy to get a nonblocking
904 // operation on Linux (no flag fiddling, no affecting other objects), but not on
905 // other platforms.
906 //
907 // To work around this constraint on other platforms, we end up using the
908 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
909 // could cause other objects' blocking operations to suddenly become
910 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
911 // falls back to using the same code path as nonblocking operations, but with an
912 // infinite timeout (select + send/recv). This helps emulate blocking
913 // reads/writes despite the underlying descriptor being nonblocking, as well as
914 // optimizing the fast path of just hitting one syscall in the good case.
915 //
916 // As a final caveat, this implementation uses a mutex so only one thread is
917 // doing a nonblocking operation at at time. This is the operation that comes
918 // after the select() (at which point we think the socket is ready). This is
919 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
920 // expect (wouldn't want someone turning it on when it should be off!). All
921 // operations performed in the lock are *nonblocking* to avoid holding the mutex
922 // forever.
923 //
924 // So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
925 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
926 // reads/writes are still blocking.
927 //
928 // Fun, fun!
929 //
930 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
931 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
932
933 pub fn read<T>(fd: sock_t,
934                deadline: u64,
935                lock: || -> T,
936                read: |bool| -> libc::c_int) -> IoResult<uint> {
937     let mut ret = -1;
938     if deadline == 0 {
939         ret = retry(|| read(false));
940     }
941
942     if deadline != 0 || (ret == -1 && util::wouldblock()) {
943         let deadline = match deadline {
944             0 => None,
945             n => Some(n),
946         };
947         loop {
948             // With a timeout, first we wait for the socket to become
949             // readable using select(), specifying the relevant timeout for
950             // our previously set deadline.
951             try!(util::await([fd], deadline, util::Readable));
952
953             // At this point, we're still within the timeout, and we've
954             // determined that the socket is readable (as returned by
955             // select). We must still read the socket in *nonblocking* mode
956             // because some other thread could come steal our data. If we
957             // fail to read some data, we retry (hence the outer loop) and
958             // wait for the socket to become readable again.
959             let _guard = lock();
960             match retry(|| read(deadline.is_some())) {
961                 -1 if util::wouldblock() => {}
962                 -1 => return Err(os::last_error()),
963                n => { ret = n; break }
964             }
965         }
966     }
967
968     match ret {
969         0 => Err(util::eof()),
970         n if n < 0 => Err(os::last_error()),
971         n => Ok(n as uint)
972     }
973 }
974
975 pub fn write<T>(fd: sock_t,
976                 deadline: u64,
977                 buf: &[u8],
978                 write_everything: bool,
979                 lock: || -> T,
980                 write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
981     let mut ret = -1;
982     let mut written = 0;
983     if deadline == 0 {
984         if write_everything {
985             ret = keep_going(buf, |inner, len| {
986                 written = buf.len() - len;
987                 write(false, inner, len)
988             });
989         } else {
990             ret = retry(|| { write(false, buf.as_ptr(), buf.len()) });
991             if ret > 0 { written = ret as uint; }
992         }
993     }
994
995     if deadline != 0 || (ret == -1 && util::wouldblock()) {
996         let deadline = match deadline {
997             0 => None,
998             n => Some(n),
999         };
1000         while written < buf.len() && (write_everything || written == 0) {
1001             // As with read(), first wait for the socket to be ready for
1002             // the I/O operation.
1003             match util::await([fd], deadline, util::Writable) {
1004                 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
1005                     assert!(deadline.is_some());
1006                     return Err(util::short_write(written, "short write"))
1007                 }
1008                 Err(e) => return Err(e),
1009                 Ok(()) => {}
1010             }
1011
1012             // Also as with read(), we use MSG_DONTWAIT to guard ourselves
1013             // against unforeseen circumstances.
1014             let _guard = lock();
1015             let ptr = buf.slice_from(written).as_ptr();
1016             let len = buf.len() - written;
1017             match retry(|| write(deadline.is_some(), ptr, len)) {
1018                 -1 if util::wouldblock() => {}
1019                 -1 => return Err(os::last_error()),
1020                 n => { written += n as uint; }
1021             }
1022         }
1023         ret = 0;
1024     }
1025     if ret < 0 {
1026         Err(os::last_error())
1027     } else {
1028         Ok(written)
1029     }
1030 }
1031
1032 #[cfg(windows)]
1033 mod os {
1034     use libc;
1035     use std::mem;
1036     use std::rt::rtio::{IoError, IoResult};
1037
1038     use io::c;
1039
1040     pub type sock_t = libc::SOCKET;
1041     pub struct Event(c::WSAEVENT);
1042
1043     impl Event {
1044         pub fn new() -> IoResult<Event> {
1045             let event = unsafe { c::WSACreateEvent() };
1046             if event == c::WSA_INVALID_EVENT {
1047                 Err(last_error())
1048             } else {
1049                 Ok(Event(event))
1050             }
1051         }
1052
1053         pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
1054     }
1055
1056     impl Drop for Event {
1057         fn drop(&mut self) {
1058             unsafe { let _ = c::WSACloseEvent(self.handle()); }
1059         }
1060     }
1061
1062     pub fn init() {
1063         unsafe {
1064             use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
1065             static mut INITIALIZED: bool = false;
1066             static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
1067
1068             let _guard = LOCK.lock();
1069             if !INITIALIZED {
1070                 let mut data: c::WSADATA = mem::zeroed();
1071                 let ret = c::WSAStartup(0x202,      // version 2.2
1072                                         &mut data);
1073                 assert_eq!(ret, 0);
1074                 INITIALIZED = true;
1075             }
1076         }
1077     }
1078
1079     pub fn last_error() -> IoError {
1080         use std::os;
1081         let code = unsafe { c::WSAGetLastError() as uint };
1082         IoError {
1083             code: code,
1084             extra: 0,
1085             detail: Some(os::error_string(code)),
1086         }
1087     }
1088
1089     pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
1090 }
1091
1092 #[cfg(unix)]
1093 mod os {
1094     use libc;
1095     use std::rt::rtio::IoError;
1096     use io;
1097
1098     pub type sock_t = io::file::fd_t;
1099
1100     pub fn init() {}
1101     pub fn last_error() -> IoError { io::last_error() }
1102     pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
1103 }