]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/net.rs
368b5914444ac9509f12fca2e28f23258b674cca
[rust.git] / src / libnative / io / net.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::mem;
14 use std::ptr;
15 use std::rt::mutex;
16 use std::rt::rtio;
17 use std::rt::rtio::{IoResult, IoError};
18 use std::sync::atomic;
19
20 use super::{retry, keep_going};
21 use super::c;
22 use super::util;
23
24 #[cfg(unix)] use super::process;
25 #[cfg(unix)] use super::file::FileDesc;
26
27 pub use self::os::{init, sock_t, last_error};
28
29 ////////////////////////////////////////////////////////////////////////////////
30 // sockaddr and misc bindings
31 ////////////////////////////////////////////////////////////////////////////////
32
33 pub fn htons(u: u16) -> u16 {
34     u.to_be()
35 }
36 pub fn ntohs(u: u16) -> u16 {
37     Int::from_be(u)
38 }
39
40 enum InAddr {
41     InAddr(libc::in_addr),
42     In6Addr(libc::in6_addr),
43 }
44
45 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
46     match ip {
47         rtio::Ipv4Addr(a, b, c, d) => {
48             let ip = (a as u32 << 24) |
49                      (b as u32 << 16) |
50                      (c as u32 <<  8) |
51                      (d as u32 <<  0);
52             InAddr(libc::in_addr {
53                 s_addr: Int::from_be(ip)
54             })
55         }
56         rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
57             In6Addr(libc::in6_addr {
58                 s6_addr: [
59                     htons(a),
60                     htons(b),
61                     htons(c),
62                     htons(d),
63                     htons(e),
64                     htons(f),
65                     htons(g),
66                     htons(h),
67                 ]
68             })
69         }
70     }
71 }
72
73 fn addr_to_sockaddr(addr: rtio::SocketAddr,
74                     storage: &mut libc::sockaddr_storage)
75                     -> libc::socklen_t {
76     unsafe {
77         let len = match ip_to_inaddr(addr.ip) {
78             InAddr(inaddr) => {
79                 let storage = storage as *mut _ as *mut libc::sockaddr_in;
80                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
81                 (*storage).sin_port = htons(addr.port);
82                 (*storage).sin_addr = inaddr;
83                 mem::size_of::<libc::sockaddr_in>()
84             }
85             In6Addr(inaddr) => {
86                 let storage = storage as *mut _ as *mut libc::sockaddr_in6;
87                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
88                 (*storage).sin6_port = htons(addr.port);
89                 (*storage).sin6_addr = inaddr;
90                 mem::size_of::<libc::sockaddr_in6>()
91             }
92         };
93         return len as libc::socklen_t;
94     }
95 }
96
97 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
98     unsafe {
99         let fam = match addr.ip {
100             rtio::Ipv4Addr(..) => libc::AF_INET,
101             rtio::Ipv6Addr(..) => libc::AF_INET6,
102         };
103         match libc::socket(fam, ty, 0) {
104             -1 => Err(os::last_error()),
105             fd => Ok(fd),
106         }
107     }
108 }
109
110 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
111                  payload: T) -> IoResult<()> {
112     unsafe {
113         let payload = &payload as *const T as *const libc::c_void;
114         let ret = libc::setsockopt(fd, opt, val,
115                                    payload,
116                                    mem::size_of::<T>() as libc::socklen_t);
117         if ret != 0 {
118             Err(os::last_error())
119         } else {
120             Ok(())
121         }
122     }
123 }
124
125 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
126                            val: libc::c_int) -> IoResult<T> {
127     unsafe {
128         let mut slot: T = mem::zeroed();
129         let mut len = mem::size_of::<T>() as libc::socklen_t;
130         let ret = c::getsockopt(fd, opt, val,
131                                 &mut slot as *mut _ as *mut _,
132                                 &mut len);
133         if ret != 0 {
134             Err(os::last_error())
135         } else {
136             assert!(len as uint == mem::size_of::<T>());
137             Ok(slot)
138         }
139     }
140 }
141
142 fn sockname(fd: sock_t,
143             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
144                                          *mut libc::socklen_t) -> libc::c_int)
145     -> IoResult<rtio::SocketAddr>
146 {
147     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
148     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
149     unsafe {
150         let storage = &mut storage as *mut libc::sockaddr_storage;
151         let ret = f(fd,
152                     storage as *mut libc::sockaddr,
153                     &mut len as *mut libc::socklen_t);
154         if ret != 0 {
155             return Err(os::last_error())
156         }
157     }
158     return sockaddr_to_addr(&storage, len as uint);
159 }
160
161 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
162                         len: uint) -> IoResult<rtio::SocketAddr> {
163     match storage.ss_family as libc::c_int {
164         libc::AF_INET => {
165             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
166             let storage: &libc::sockaddr_in = unsafe {
167                 mem::transmute(storage)
168             };
169             let ip = (storage.sin_addr.s_addr as u32).to_be();
170             let a = (ip >> 24) as u8;
171             let b = (ip >> 16) as u8;
172             let c = (ip >>  8) as u8;
173             let d = (ip >>  0) as u8;
174             Ok(rtio::SocketAddr {
175                 ip: rtio::Ipv4Addr(a, b, c, d),
176                 port: ntohs(storage.sin_port),
177             })
178         }
179         libc::AF_INET6 => {
180             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
181             let storage: &libc::sockaddr_in6 = unsafe {
182                 mem::transmute(storage)
183             };
184             let a = ntohs(storage.sin6_addr.s6_addr[0]);
185             let b = ntohs(storage.sin6_addr.s6_addr[1]);
186             let c = ntohs(storage.sin6_addr.s6_addr[2]);
187             let d = ntohs(storage.sin6_addr.s6_addr[3]);
188             let e = ntohs(storage.sin6_addr.s6_addr[4]);
189             let f = ntohs(storage.sin6_addr.s6_addr[5]);
190             let g = ntohs(storage.sin6_addr.s6_addr[6]);
191             let h = ntohs(storage.sin6_addr.s6_addr[7]);
192             Ok(rtio::SocketAddr {
193                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
194                 port: ntohs(storage.sin6_port),
195             })
196         }
197         _ => {
198             #[cfg(unix)] use libc::EINVAL as ERROR;
199             #[cfg(windows)] use libc::WSAEINVAL as ERROR;
200             Err(IoError {
201                 code: ERROR as uint,
202                 extra: 0,
203                 detail: None,
204             })
205         }
206     }
207 }
208
209 ////////////////////////////////////////////////////////////////////////////////
210 // TCP streams
211 ////////////////////////////////////////////////////////////////////////////////
212
213 pub struct TcpStream {
214     inner: Arc<Inner>,
215     read_deadline: u64,
216     write_deadline: u64,
217 }
218
219 struct Inner {
220     fd: sock_t,
221
222     // Unused on Linux, where this lock is not necessary.
223     #[allow(dead_code)]
224     lock: mutex::NativeMutex
225 }
226
227 pub struct Guard<'a> {
228     pub fd: sock_t,
229     pub guard: mutex::LockGuard<'a>,
230 }
231
232 impl Inner {
233     fn new(fd: sock_t) -> Inner {
234         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
235     }
236 }
237
238 impl TcpStream {
239     pub fn connect(addr: rtio::SocketAddr,
240                    timeout: Option<u64>) -> IoResult<TcpStream> {
241         let fd = try!(socket(addr, libc::SOCK_STREAM));
242         let ret = TcpStream::new(Inner::new(fd));
243
244         let mut storage = unsafe { mem::zeroed() };
245         let len = addr_to_sockaddr(addr, &mut storage);
246         let addrp = &storage as *const _ as *const libc::sockaddr;
247
248         match timeout {
249             Some(timeout) => {
250                 try!(util::connect_timeout(fd, addrp, len, timeout));
251                 Ok(ret)
252             },
253             None => {
254                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
255                     -1 => Err(os::last_error()),
256                     _ => Ok(ret),
257                 }
258             }
259         }
260     }
261
262     fn new(inner: Inner) -> TcpStream {
263         TcpStream {
264             inner: Arc::new(inner),
265             read_deadline: 0,
266             write_deadline: 0,
267         }
268     }
269
270     pub fn fd(&self) -> sock_t { self.inner.fd }
271
272     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
273         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
274                    nodelay as libc::c_int)
275     }
276
277     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
278         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
279                              seconds.is_some() as libc::c_int);
280         match seconds {
281             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
282             None => ret,
283         }
284     }
285
286     #[cfg(target_os = "macos")]
287     #[cfg(target_os = "ios")]
288     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
289         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
290                    seconds as libc::c_int)
291     }
292     #[cfg(target_os = "freebsd")]
293     #[cfg(target_os = "dragonfly")]
294     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
295         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
296                    seconds as libc::c_int)
297     }
298     #[cfg(not(target_os = "macos"), not(target_os = "ios"), not(target_os = "freebsd"),
299       not(target_os = "dragonfly"))]
300     fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
301         Ok(())
302     }
303
304     #[cfg(target_os = "linux")]
305     fn lock_nonblocking(&self) {}
306
307     #[cfg(not(target_os = "linux"))]
308     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
309         let ret = Guard {
310             fd: self.fd(),
311             guard: unsafe { self.inner.lock.lock() },
312         };
313         assert!(util::set_nonblocking(self.fd(), true).is_ok());
314         ret
315     }
316 }
317
318 #[cfg(windows)] type wrlen = libc::c_int;
319 #[cfg(not(windows))] type wrlen = libc::size_t;
320
321 impl rtio::RtioTcpStream for TcpStream {
322     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
323         let fd = self.fd();
324         let dolock = || self.lock_nonblocking();
325         let doread = |nb| unsafe {
326             let flags = if nb {c::MSG_DONTWAIT} else {0};
327             libc::recv(fd,
328                        buf.as_mut_ptr() as *mut libc::c_void,
329                        buf.len() as wrlen,
330                        flags) as libc::c_int
331         };
332         read(fd, self.read_deadline, dolock, doread)
333     }
334
335     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
336         let fd = self.fd();
337         let dolock = || self.lock_nonblocking();
338         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
339             let flags = if nb {c::MSG_DONTWAIT} else {0};
340             libc::send(fd,
341                        buf as *mut libc::c_void,
342                        len as wrlen,
343                        flags) as i64
344         };
345         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
346             Ok(_) => Ok(()),
347             Err(e) => Err(e)
348         }
349     }
350     fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
351         sockname(self.fd(), libc::getpeername)
352     }
353     fn control_congestion(&mut self) -> IoResult<()> {
354         self.set_nodelay(false)
355     }
356     fn nodelay(&mut self) -> IoResult<()> {
357         self.set_nodelay(true)
358     }
359     fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
360         self.set_keepalive(Some(delay_in_seconds))
361     }
362     fn letdie(&mut self) -> IoResult<()> {
363         self.set_keepalive(None)
364     }
365
366     fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
367         box TcpStream {
368             inner: self.inner.clone(),
369             read_deadline: 0,
370             write_deadline: 0,
371         } as Box<rtio::RtioTcpStream + Send>
372     }
373
374     fn close_write(&mut self) -> IoResult<()> {
375         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
376     }
377     fn close_read(&mut self) -> IoResult<()> {
378         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
379     }
380
381     fn set_timeout(&mut self, timeout: Option<u64>) {
382         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
383         self.read_deadline = deadline;
384         self.write_deadline = deadline;
385     }
386     fn set_read_timeout(&mut self, timeout: Option<u64>) {
387         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
388     }
389     fn set_write_timeout(&mut self, timeout: Option<u64>) {
390         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
391     }
392 }
393
394 impl rtio::RtioSocket for TcpStream {
395     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
396         sockname(self.fd(), libc::getsockname)
397     }
398 }
399
400 impl Drop for Inner {
401     fn drop(&mut self) { unsafe { os::close(self.fd); } }
402 }
403
404 #[unsafe_destructor]
405 impl<'a> Drop for Guard<'a> {
406     fn drop(&mut self) {
407         assert!(util::set_nonblocking(self.fd, false).is_ok());
408     }
409 }
410
411 ////////////////////////////////////////////////////////////////////////////////
412 // TCP listeners
413 ////////////////////////////////////////////////////////////////////////////////
414
415 pub struct TcpListener {
416     inner: Inner,
417 }
418
419 impl TcpListener {
420     pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
421         let fd = try!(socket(addr, libc::SOCK_STREAM));
422         let ret = TcpListener { inner: Inner::new(fd) };
423
424         let mut storage = unsafe { mem::zeroed() };
425         let len = addr_to_sockaddr(addr, &mut storage);
426         let addrp = &storage as *const _ as *const libc::sockaddr;
427
428         // On platforms with Berkeley-derived sockets, this allows
429         // to quickly rebind a socket, without needing to wait for
430         // the OS to clean up the previous one.
431         if cfg!(unix) {
432             try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
433                             1 as libc::c_int));
434         }
435
436         match unsafe { libc::bind(fd, addrp, len) } {
437             -1 => Err(os::last_error()),
438             _ => Ok(ret),
439         }
440     }
441
442     pub fn fd(&self) -> sock_t { self.inner.fd }
443
444     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
445         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
446             -1 => Err(os::last_error()),
447
448             #[cfg(unix)]
449             _ => {
450                 let (reader, writer) = try!(process::pipe());
451                 try!(util::set_nonblocking(reader.fd(), true));
452                 try!(util::set_nonblocking(writer.fd(), true));
453                 try!(util::set_nonblocking(self.fd(), true));
454                 Ok(TcpAcceptor {
455                     inner: Arc::new(AcceptorInner {
456                         listener: self,
457                         reader: reader,
458                         writer: writer,
459                         closed: atomic::AtomicBool::new(false),
460                     }),
461                     deadline: 0,
462                 })
463             }
464
465             #[cfg(windows)]
466             _ => {
467                 let accept = try!(os::Event::new());
468                 let ret = unsafe {
469                     c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
470                 };
471                 if ret != 0 {
472                     return Err(os::last_error())
473                 }
474                 Ok(TcpAcceptor {
475                     inner: Arc::new(AcceptorInner {
476                         listener: self,
477                         abort: try!(os::Event::new()),
478                         accept: accept,
479                         closed: atomic::AtomicBool::new(false),
480                     }),
481                     deadline: 0,
482                 })
483             }
484         }
485     }
486 }
487
488 impl rtio::RtioTcpListener for TcpListener {
489     fn listen(self: Box<TcpListener>)
490               -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
491         self.native_listen(128).map(|a| {
492             box a as Box<rtio::RtioTcpAcceptor + Send>
493         })
494     }
495 }
496
497 impl rtio::RtioSocket for TcpListener {
498     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
499         sockname(self.fd(), libc::getsockname)
500     }
501 }
502
503 pub struct TcpAcceptor {
504     inner: Arc<AcceptorInner>,
505     deadline: u64,
506 }
507
508 #[cfg(unix)]
509 struct AcceptorInner {
510     listener: TcpListener,
511     reader: FileDesc,
512     writer: FileDesc,
513     closed: atomic::AtomicBool,
514 }
515
516 #[cfg(windows)]
517 struct AcceptorInner {
518     listener: TcpListener,
519     abort: os::Event,
520     accept: os::Event,
521     closed: atomic::AtomicBool,
522 }
523
524 impl TcpAcceptor {
525     pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
526
527     #[cfg(unix)]
528     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
529         // In implementing accept, the two main concerns are dealing with
530         // close_accept() and timeouts. The unix implementation is based on a
531         // nonblocking accept plus a call to select(). Windows ends up having
532         // an entirely separate implementation than unix, which is explained
533         // below.
534         //
535         // To implement timeouts, all blocking is done via select() instead of
536         // accept() by putting the socket in non-blocking mode. Because
537         // select() takes a timeout argument, we just pass through the timeout
538         // to select().
539         //
540         // To implement close_accept(), we have a self-pipe to ourselves which
541         // is passed to select() along with the socket being accepted on. The
542         // self-pipe is never written to unless close_accept() is called.
543         let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
544
545         while !self.inner.closed.load(atomic::SeqCst) {
546             match retry(|| unsafe {
547                 libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
548             }) {
549                 -1 if util::wouldblock() => {}
550                 -1 => return Err(os::last_error()),
551                 fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))),
552             }
553             try!(util::await([self.fd(), self.inner.reader.fd()],
554                              deadline, util::Readable));
555         }
556
557         Err(util::eof())
558     }
559
560     #[cfg(windows)]
561     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
562         // Unlink unix, windows cannot invoke `select` on arbitrary file
563         // descriptors like pipes, only sockets. Consequently, windows cannot
564         // use the same implementation as unix for accept() when close_accept()
565         // is considered.
566         //
567         // In order to implement close_accept() and timeouts, windows uses
568         // event handles. An acceptor-specific abort event is created which
569         // will only get set in close_accept(), and it will never be un-set.
570         // Additionally, another acceptor-specific event is associated with the
571         // FD_ACCEPT network event.
572         //
573         // These two events are then passed to WaitForMultipleEvents to see
574         // which one triggers first, and the timeout passed to this function is
575         // the local timeout for the acceptor.
576         //
577         // If the wait times out, then the accept timed out. If the wait
578         // succeeds with the abort event, then we were closed, and if the wait
579         // succeeds otherwise, then we do a nonblocking poll via `accept` to
580         // see if we can accept a connection. The connection is candidate to be
581         // stolen, so we do all of this in a loop as well.
582         let events = [self.inner.abort.handle(), self.inner.accept.handle()];
583
584         while !self.inner.closed.load(atomic::SeqCst) {
585             let ms = if self.deadline == 0 {
586                 c::WSA_INFINITE as u64
587             } else {
588                 let now = ::io::timer::now();
589                 if self.deadline < now {0} else {self.deadline - now}
590             };
591             let ret = unsafe {
592                 c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
593                                             ms as libc::DWORD, libc::FALSE)
594             };
595             match ret {
596                 c::WSA_WAIT_TIMEOUT => {
597                     return Err(util::timeout("accept timed out"))
598                 }
599                 c::WSA_WAIT_FAILED => return Err(os::last_error()),
600                 c::WSA_WAIT_EVENT_0 => break,
601                 n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
602             }
603
604             let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
605             let ret = unsafe {
606                 c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
607             };
608             if ret != 0 { return Err(os::last_error()) }
609
610             if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
611             match unsafe {
612                 libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
613             } {
614                 -1 if util::wouldblock() => {}
615                 -1 => return Err(os::last_error()),
616
617                 // Accepted sockets inherit the same properties as the caller,
618                 // so we need to deregister our event and switch the socket back
619                 // to blocking mode
620                 fd => {
621                     let stream = TcpStream::new(Inner::new(fd));
622                     let ret = unsafe {
623                         c::WSAEventSelect(fd, events[1], 0)
624                     };
625                     if ret != 0 { return Err(os::last_error()) }
626                     try!(util::set_nonblocking(fd, false));
627                     return Ok(stream)
628                 }
629             }
630         }
631
632         Err(util::eof())
633     }
634 }
635
636 impl rtio::RtioSocket for TcpAcceptor {
637     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
638         sockname(self.fd(), libc::getsockname)
639     }
640 }
641
642 impl rtio::RtioTcpAcceptor for TcpAcceptor {
643     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
644         self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
645     }
646
647     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
648     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
649     fn set_timeout(&mut self, timeout: Option<u64>) {
650         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
651     }
652
653     fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
654         box TcpAcceptor {
655             inner: self.inner.clone(),
656             deadline: 0,
657         } as Box<rtio::RtioTcpAcceptor + Send>
658     }
659
660     #[cfg(unix)]
661     fn close_accept(&mut self) -> IoResult<()> {
662         self.inner.closed.store(true, atomic::SeqCst);
663         let mut fd = FileDesc::new(self.inner.writer.fd(), false);
664         match fd.inner_write([0]) {
665             Ok(..) => Ok(()),
666             Err(..) if util::wouldblock() => Ok(()),
667             Err(e) => Err(e),
668         }
669     }
670
671     #[cfg(windows)]
672     fn close_accept(&mut self) -> IoResult<()> {
673         self.inner.closed.store(true, atomic::SeqCst);
674         let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
675         if ret == libc::TRUE {
676             Ok(())
677         } else {
678             Err(os::last_error())
679         }
680     }
681 }
682
683 ////////////////////////////////////////////////////////////////////////////////
684 // UDP
685 ////////////////////////////////////////////////////////////////////////////////
686
687 pub struct UdpSocket {
688     inner: Arc<Inner>,
689     read_deadline: u64,
690     write_deadline: u64,
691 }
692
693 impl UdpSocket {
694     pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
695         let fd = try!(socket(addr, libc::SOCK_DGRAM));
696         let ret = UdpSocket {
697             inner: Arc::new(Inner::new(fd)),
698             read_deadline: 0,
699             write_deadline: 0,
700         };
701
702         let mut storage = unsafe { mem::zeroed() };
703         let len = addr_to_sockaddr(addr, &mut storage);
704         let addrp = &storage as *const _ as *const libc::sockaddr;
705
706         match unsafe { libc::bind(fd, addrp, len) } {
707             -1 => Err(os::last_error()),
708             _ => Ok(ret),
709         }
710     }
711
712     pub fn fd(&self) -> sock_t { self.inner.fd }
713
714     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
715         setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
716                    on as libc::c_int)
717     }
718
719     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
720         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
721                    on as libc::c_int)
722     }
723
724     pub fn set_membership(&mut self, addr: rtio::IpAddr,
725                           opt: libc::c_int) -> IoResult<()> {
726         match ip_to_inaddr(addr) {
727             InAddr(addr) => {
728                 let mreq = libc::ip_mreq {
729                     imr_multiaddr: addr,
730                     // interface == INADDR_ANY
731                     imr_interface: libc::in_addr { s_addr: 0x0 },
732                 };
733                 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
734             }
735             In6Addr(addr) => {
736                 let mreq = libc::ip6_mreq {
737                     ipv6mr_multiaddr: addr,
738                     ipv6mr_interface: 0,
739                 };
740                 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
741             }
742         }
743     }
744
745     #[cfg(target_os = "linux")]
746     fn lock_nonblocking(&self) {}
747
748     #[cfg(not(target_os = "linux"))]
749     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
750         let ret = Guard {
751             fd: self.fd(),
752             guard: unsafe { self.inner.lock.lock() },
753         };
754         assert!(util::set_nonblocking(self.fd(), true).is_ok());
755         ret
756     }
757 }
758
759 impl rtio::RtioSocket for UdpSocket {
760     fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
761         sockname(self.fd(), libc::getsockname)
762     }
763 }
764
765 #[cfg(windows)] type msglen_t = libc::c_int;
766 #[cfg(unix)]    type msglen_t = libc::size_t;
767
768 impl rtio::RtioUdpSocket for UdpSocket {
769     fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
770         let fd = self.fd();
771         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
772         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
773         let mut addrlen: libc::socklen_t =
774                 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
775
776         let dolock = || self.lock_nonblocking();
777         let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
778             let flags = if nb {c::MSG_DONTWAIT} else {0};
779             libc::recvfrom(fd,
780                            buf.as_mut_ptr() as *mut libc::c_void,
781                            buf.len() as msglen_t,
782                            flags,
783                            storagep,
784                            &mut addrlen) as libc::c_int
785         }));
786         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
787             Ok((n as uint, addr))
788         })
789     }
790
791     fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
792         let mut storage = unsafe { mem::zeroed() };
793         let dstlen = addr_to_sockaddr(dst, &mut storage);
794         let dstp = &storage as *const _ as *const libc::sockaddr;
795
796         let fd = self.fd();
797         let dolock = || self.lock_nonblocking();
798         let dowrite = |nb, buf: *const u8, len: uint| unsafe {
799             let flags = if nb {c::MSG_DONTWAIT} else {0};
800             libc::sendto(fd,
801                          buf as *const libc::c_void,
802                          len as msglen_t,
803                          flags,
804                          dstp,
805                          dstlen) as i64
806         };
807
808         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
809         if n != buf.len() {
810             Err(util::short_write(n, "couldn't send entire packet at once"))
811         } else {
812             Ok(())
813         }
814     }
815
816     fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
817         match multi {
818             rtio::Ipv4Addr(..) => {
819                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
820             }
821             rtio::Ipv6Addr(..) => {
822                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
823             }
824         }
825     }
826     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
827         match multi {
828             rtio::Ipv4Addr(..) => {
829                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
830             }
831             rtio::Ipv6Addr(..) => {
832                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
833             }
834         }
835     }
836
837     fn loop_multicast_locally(&mut self) -> IoResult<()> {
838         self.set_multicast_loop(true)
839     }
840     fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
841         self.set_multicast_loop(false)
842     }
843
844     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
845         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
846                    ttl as libc::c_int)
847     }
848     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
849         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
850     }
851
852     fn hear_broadcasts(&mut self) -> IoResult<()> {
853         self.set_broadcast(true)
854     }
855     fn ignore_broadcasts(&mut self) -> IoResult<()> {
856         self.set_broadcast(false)
857     }
858
859     fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
860         box UdpSocket {
861             inner: self.inner.clone(),
862             read_deadline: 0,
863             write_deadline: 0,
864         } as Box<rtio::RtioUdpSocket + Send>
865     }
866
867     fn set_timeout(&mut self, timeout: Option<u64>) {
868         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
869         self.read_deadline = deadline;
870         self.write_deadline = deadline;
871     }
872     fn set_read_timeout(&mut self, timeout: Option<u64>) {
873         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
874     }
875     fn set_write_timeout(&mut self, timeout: Option<u64>) {
876         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
877     }
878 }
879
880 ////////////////////////////////////////////////////////////////////////////////
881 // Timeout helpers
882 //
883 // The read/write functions below are the helpers for reading/writing a socket
884 // with a possible deadline specified. This is generally viewed as a timed out
885 // I/O operation.
886 //
887 // From the application's perspective, timeouts apply to the I/O object, not to
888 // the underlying file descriptor (it's one timeout per object). This means that
889 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
890 //
891 // The next idea to implement timeouts would be to use nonblocking I/O. An
892 // invocation of select() would wait (with a timeout) for a socket to be ready.
893 // Once its ready, we can perform the operation. Note that the operation *must*
894 // be nonblocking, even though select() says the socket is ready. This is
895 // because some other thread could have come and stolen our data (handles can be
896 // cloned).
897 //
898 // To implement nonblocking I/O, the first option we have is to use the
899 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
900 // I/O objects, so this was initially viewed as unwise.
901 //
902 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
903 // send/recv, but the niftiness wears off once you realize it only works well on
904 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
905 // operation on linux (no flag fiddling, no affecting other objects), but not on
906 // other platforms.
907 //
908 // To work around this constraint on other platforms, we end up using the
909 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
910 // could cause other objects' blocking operations to suddenly become
911 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
912 // falls back to using the same code path as nonblocking operations, but with an
913 // infinite timeout (select + send/recv). This helps emulate blocking
914 // reads/writes despite the underlying descriptor being nonblocking, as well as
915 // optimizing the fast path of just hitting one syscall in the good case.
916 //
917 // As a final caveat, this implementation uses a mutex so only one thread is
918 // doing a nonblocking operation at at time. This is the operation that comes
919 // after the select() (at which point we think the socket is ready). This is
920 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
921 // expect (wouldn't want someone turning it on when it should be off!). All
922 // operations performed in the lock are *nonblocking* to avoid holding the mutex
923 // forever.
924 //
925 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
926 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
927 // reads/writes are still blocking.
928 //
929 // Fun, fun!
930 //
931 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
932 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
933
934 pub fn read<T>(fd: sock_t,
935                deadline: u64,
936                lock: || -> T,
937                read: |bool| -> libc::c_int) -> IoResult<uint> {
938     let mut ret = -1;
939     if deadline == 0 {
940         ret = retry(|| read(false));
941     }
942
943     if deadline != 0 || (ret == -1 && util::wouldblock()) {
944         let deadline = match deadline {
945             0 => None,
946             n => Some(n),
947         };
948         loop {
949             // With a timeout, first we wait for the socket to become
950             // readable using select(), specifying the relevant timeout for
951             // our previously set deadline.
952             try!(util::await([fd], deadline, util::Readable));
953
954             // At this point, we're still within the timeout, and we've
955             // determined that the socket is readable (as returned by
956             // select). We must still read the socket in *nonblocking* mode
957             // because some other thread could come steal our data. If we
958             // fail to read some data, we retry (hence the outer loop) and
959             // wait for the socket to become readable again.
960             let _guard = lock();
961             match retry(|| read(deadline.is_some())) {
962                 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
963                 -1 => return Err(os::last_error()),
964                n => { ret = n; break }
965             }
966         }
967     }
968
969     match ret {
970         0 => Err(util::eof()),
971         n if n < 0 => Err(os::last_error()),
972         n => Ok(n as uint)
973     }
974 }
975
976 pub fn write<T>(fd: sock_t,
977                 deadline: u64,
978                 buf: &[u8],
979                 write_everything: bool,
980                 lock: || -> T,
981                 write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
982     let mut ret = -1;
983     let mut written = 0;
984     if deadline == 0 {
985         if write_everything {
986             ret = keep_going(buf, |inner, len| {
987                 written = buf.len() - len;
988                 write(false, inner, len)
989             });
990         } else {
991             ret = retry(|| {
992                 write(false, buf.as_ptr(), buf.len()) as libc::c_int
993             }) as i64;
994             if ret > 0 { written = ret as uint; }
995         }
996     }
997
998     if deadline != 0 || (ret == -1 && util::wouldblock()) {
999         let deadline = match deadline {
1000             0 => None,
1001             n => Some(n),
1002         };
1003         while written < buf.len() && (write_everything || written == 0) {
1004             // As with read(), first wait for the socket to be ready for
1005             // the I/O operation.
1006             match util::await([fd], deadline, util::Writable) {
1007                 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
1008                     assert!(deadline.is_some());
1009                     return Err(util::short_write(written, "short write"))
1010                 }
1011                 Err(e) => return Err(e),
1012                 Ok(()) => {}
1013             }
1014
1015             // Also as with read(), we use MSG_DONTWAIT to guard ourselves
1016             // against unforeseen circumstances.
1017             let _guard = lock();
1018             let ptr = buf.slice_from(written).as_ptr();
1019             let len = buf.len() - written;
1020             match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
1021                 -1 if util::wouldblock() => {}
1022                 -1 => return Err(os::last_error()),
1023                 n => { written += n as uint; }
1024             }
1025         }
1026         ret = 0;
1027     }
1028     if ret < 0 {
1029         Err(os::last_error())
1030     } else {
1031         Ok(written)
1032     }
1033 }
1034
1035 #[cfg(windows)]
1036 mod os {
1037     use libc;
1038     use std::mem;
1039     use std::rt::rtio::{IoError, IoResult};
1040
1041     use io::c;
1042
1043     pub type sock_t = libc::SOCKET;
1044     pub struct Event(c::WSAEVENT);
1045
1046     impl Event {
1047         pub fn new() -> IoResult<Event> {
1048             let event = unsafe { c::WSACreateEvent() };
1049             if event == c::WSA_INVALID_EVENT {
1050                 Err(last_error())
1051             } else {
1052                 Ok(Event(event))
1053             }
1054         }
1055
1056         pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
1057     }
1058
1059     impl Drop for Event {
1060         fn drop(&mut self) {
1061             unsafe { let _ = c::WSACloseEvent(self.handle()); }
1062         }
1063     }
1064
1065     pub fn init() {
1066         unsafe {
1067             use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
1068             static mut INITIALIZED: bool = false;
1069             static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
1070
1071             let _guard = LOCK.lock();
1072             if !INITIALIZED {
1073                 let mut data: c::WSADATA = mem::zeroed();
1074                 let ret = c::WSAStartup(0x202,      // version 2.2
1075                                         &mut data);
1076                 assert_eq!(ret, 0);
1077                 INITIALIZED = true;
1078             }
1079         }
1080     }
1081
1082     pub fn last_error() -> IoError {
1083         use std::os;
1084         let code = unsafe { c::WSAGetLastError() as uint };
1085         IoError {
1086             code: code,
1087             extra: 0,
1088             detail: Some(os::error_string(code)),
1089         }
1090     }
1091
1092     pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
1093 }
1094
1095 #[cfg(unix)]
1096 mod os {
1097     use libc;
1098     use std::rt::rtio::IoError;
1099     use io;
1100
1101     pub type sock_t = io::file::fd_t;
1102
1103     pub fn init() {}
1104     pub fn last_error() -> IoError { io::last_error() }
1105     pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
1106 }