1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
17 use std::rt::rtio::{IoResult, IoError};
18 use std::sync::atomic;
20 use super::{retry, keep_going};
24 #[cfg(unix)] use super::process;
25 #[cfg(unix)] use super::file::FileDesc;
27 pub use self::os::{init, sock_t, last_error};
29 ////////////////////////////////////////////////////////////////////////////////
30 // sockaddr and misc bindings
31 ////////////////////////////////////////////////////////////////////////////////
33 pub fn htons(u: u16) -> u16 {
36 pub fn ntohs(u: u16) -> u16 {
41 InAddr(libc::in_addr),
42 In6Addr(libc::in6_addr),
45 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
47 rtio::Ipv4Addr(a, b, c, d) => {
48 let ip = (a as u32 << 24) |
52 InAddr(libc::in_addr {
53 s_addr: Int::from_be(ip)
56 rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
57 In6Addr(libc::in6_addr {
73 fn addr_to_sockaddr(addr: rtio::SocketAddr,
74 storage: &mut libc::sockaddr_storage)
77 let len = match ip_to_inaddr(addr.ip) {
79 let storage = storage as *mut _ as *mut libc::sockaddr_in;
80 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
81 (*storage).sin_port = htons(addr.port);
82 (*storage).sin_addr = inaddr;
83 mem::size_of::<libc::sockaddr_in>()
86 let storage = storage as *mut _ as *mut libc::sockaddr_in6;
87 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
88 (*storage).sin6_port = htons(addr.port);
89 (*storage).sin6_addr = inaddr;
90 mem::size_of::<libc::sockaddr_in6>()
93 return len as libc::socklen_t;
97 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
99 let fam = match addr.ip {
100 rtio::Ipv4Addr(..) => libc::AF_INET,
101 rtio::Ipv6Addr(..) => libc::AF_INET6,
103 match libc::socket(fam, ty, 0) {
104 -1 => Err(os::last_error()),
110 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
111 payload: T) -> IoResult<()> {
113 let payload = &payload as *const T as *const libc::c_void;
114 let ret = libc::setsockopt(fd, opt, val,
116 mem::size_of::<T>() as libc::socklen_t);
118 Err(os::last_error())
125 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
126 val: libc::c_int) -> IoResult<T> {
128 let mut slot: T = mem::zeroed();
129 let mut len = mem::size_of::<T>() as libc::socklen_t;
130 let ret = c::getsockopt(fd, opt, val,
131 &mut slot as *mut _ as *mut _,
134 Err(os::last_error())
136 assert!(len as uint == mem::size_of::<T>());
142 fn sockname(fd: sock_t,
143 f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
144 *mut libc::socklen_t) -> libc::c_int)
145 -> IoResult<rtio::SocketAddr>
147 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
148 let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
150 let storage = &mut storage as *mut libc::sockaddr_storage;
152 storage as *mut libc::sockaddr,
153 &mut len as *mut libc::socklen_t);
155 return Err(os::last_error())
158 return sockaddr_to_addr(&storage, len as uint);
161 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
162 len: uint) -> IoResult<rtio::SocketAddr> {
163 match storage.ss_family as libc::c_int {
165 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
166 let storage: &libc::sockaddr_in = unsafe {
167 mem::transmute(storage)
169 let ip = (storage.sin_addr.s_addr as u32).to_be();
170 let a = (ip >> 24) as u8;
171 let b = (ip >> 16) as u8;
172 let c = (ip >> 8) as u8;
173 let d = (ip >> 0) as u8;
174 Ok(rtio::SocketAddr {
175 ip: rtio::Ipv4Addr(a, b, c, d),
176 port: ntohs(storage.sin_port),
180 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
181 let storage: &libc::sockaddr_in6 = unsafe {
182 mem::transmute(storage)
184 let a = ntohs(storage.sin6_addr.s6_addr[0]);
185 let b = ntohs(storage.sin6_addr.s6_addr[1]);
186 let c = ntohs(storage.sin6_addr.s6_addr[2]);
187 let d = ntohs(storage.sin6_addr.s6_addr[3]);
188 let e = ntohs(storage.sin6_addr.s6_addr[4]);
189 let f = ntohs(storage.sin6_addr.s6_addr[5]);
190 let g = ntohs(storage.sin6_addr.s6_addr[6]);
191 let h = ntohs(storage.sin6_addr.s6_addr[7]);
192 Ok(rtio::SocketAddr {
193 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
194 port: ntohs(storage.sin6_port),
198 #[cfg(unix)] use libc::EINVAL as ERROR;
199 #[cfg(windows)] use libc::WSAEINVAL as ERROR;
209 ////////////////////////////////////////////////////////////////////////////////
211 ////////////////////////////////////////////////////////////////////////////////
213 pub struct TcpStream {
222 // Unused on Linux, where this lock is not necessary.
224 lock: mutex::NativeMutex
227 pub struct Guard<'a> {
229 pub guard: mutex::LockGuard<'a>,
233 fn new(fd: sock_t) -> Inner {
234 Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
239 pub fn connect(addr: rtio::SocketAddr,
240 timeout: Option<u64>) -> IoResult<TcpStream> {
241 let fd = try!(socket(addr, libc::SOCK_STREAM));
242 let ret = TcpStream::new(Inner::new(fd));
244 let mut storage = unsafe { mem::zeroed() };
245 let len = addr_to_sockaddr(addr, &mut storage);
246 let addrp = &storage as *const _ as *const libc::sockaddr;
250 try!(util::connect_timeout(fd, addrp, len, timeout));
254 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
255 -1 => Err(os::last_error()),
262 fn new(inner: Inner) -> TcpStream {
264 inner: Arc::new(inner),
270 pub fn fd(&self) -> sock_t { self.inner.fd }
272 fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
273 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
274 nodelay as libc::c_int)
277 fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
278 let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
279 seconds.is_some() as libc::c_int);
281 Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
286 #[cfg(target_os = "macos")]
287 #[cfg(target_os = "ios")]
288 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
289 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
290 seconds as libc::c_int)
292 #[cfg(target_os = "freebsd")]
293 #[cfg(target_os = "dragonfly")]
294 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
295 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
296 seconds as libc::c_int)
298 #[cfg(not(target_os = "macos"), not(target_os = "ios"), not(target_os = "freebsd"),
299 not(target_os = "dragonfly"))]
300 fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
304 #[cfg(target_os = "linux")]
305 fn lock_nonblocking(&self) {}
307 #[cfg(not(target_os = "linux"))]
308 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
311 guard: unsafe { self.inner.lock.lock() },
313 assert!(util::set_nonblocking(self.fd(), true).is_ok());
318 #[cfg(windows)] type wrlen = libc::c_int;
319 #[cfg(not(windows))] type wrlen = libc::size_t;
321 impl rtio::RtioTcpStream for TcpStream {
322 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
324 let dolock = || self.lock_nonblocking();
325 let doread = |nb| unsafe {
326 let flags = if nb {c::MSG_DONTWAIT} else {0};
328 buf.as_mut_ptr() as *mut libc::c_void,
330 flags) as libc::c_int
332 read(fd, self.read_deadline, dolock, doread)
335 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
337 let dolock = || self.lock_nonblocking();
338 let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
339 let flags = if nb {c::MSG_DONTWAIT} else {0};
341 buf as *mut libc::c_void,
345 match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
350 fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
351 sockname(self.fd(), libc::getpeername)
353 fn control_congestion(&mut self) -> IoResult<()> {
354 self.set_nodelay(false)
356 fn nodelay(&mut self) -> IoResult<()> {
357 self.set_nodelay(true)
359 fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
360 self.set_keepalive(Some(delay_in_seconds))
362 fn letdie(&mut self) -> IoResult<()> {
363 self.set_keepalive(None)
366 fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
368 inner: self.inner.clone(),
371 } as Box<rtio::RtioTcpStream + Send>
374 fn close_write(&mut self) -> IoResult<()> {
375 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
377 fn close_read(&mut self) -> IoResult<()> {
378 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
381 fn set_timeout(&mut self, timeout: Option<u64>) {
382 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
383 self.read_deadline = deadline;
384 self.write_deadline = deadline;
386 fn set_read_timeout(&mut self, timeout: Option<u64>) {
387 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
389 fn set_write_timeout(&mut self, timeout: Option<u64>) {
390 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
394 impl rtio::RtioSocket for TcpStream {
395 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
396 sockname(self.fd(), libc::getsockname)
400 impl Drop for Inner {
401 fn drop(&mut self) { unsafe { os::close(self.fd); } }
405 impl<'a> Drop for Guard<'a> {
407 assert!(util::set_nonblocking(self.fd, false).is_ok());
411 ////////////////////////////////////////////////////////////////////////////////
413 ////////////////////////////////////////////////////////////////////////////////
415 pub struct TcpListener {
420 pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
421 let fd = try!(socket(addr, libc::SOCK_STREAM));
422 let ret = TcpListener { inner: Inner::new(fd) };
424 let mut storage = unsafe { mem::zeroed() };
425 let len = addr_to_sockaddr(addr, &mut storage);
426 let addrp = &storage as *const _ as *const libc::sockaddr;
428 // On platforms with Berkeley-derived sockets, this allows
429 // to quickly rebind a socket, without needing to wait for
430 // the OS to clean up the previous one.
432 try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
436 match unsafe { libc::bind(fd, addrp, len) } {
437 -1 => Err(os::last_error()),
442 pub fn fd(&self) -> sock_t { self.inner.fd }
444 pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
445 match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
446 -1 => Err(os::last_error()),
450 let (reader, writer) = try!(process::pipe());
451 try!(util::set_nonblocking(reader.fd(), true));
452 try!(util::set_nonblocking(writer.fd(), true));
453 try!(util::set_nonblocking(self.fd(), true));
455 inner: Arc::new(AcceptorInner {
459 closed: atomic::AtomicBool::new(false),
467 let accept = try!(os::Event::new());
469 c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
472 return Err(os::last_error())
475 inner: Arc::new(AcceptorInner {
477 abort: try!(os::Event::new()),
479 closed: atomic::AtomicBool::new(false),
488 impl rtio::RtioTcpListener for TcpListener {
489 fn listen(self: Box<TcpListener>)
490 -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
491 self.native_listen(128).map(|a| {
492 box a as Box<rtio::RtioTcpAcceptor + Send>
497 impl rtio::RtioSocket for TcpListener {
498 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
499 sockname(self.fd(), libc::getsockname)
503 pub struct TcpAcceptor {
504 inner: Arc<AcceptorInner>,
509 struct AcceptorInner {
510 listener: TcpListener,
513 closed: atomic::AtomicBool,
517 struct AcceptorInner {
518 listener: TcpListener,
521 closed: atomic::AtomicBool,
525 pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
528 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
529 // In implementing accept, the two main concerns are dealing with
530 // close_accept() and timeouts. The unix implementation is based on a
531 // nonblocking accept plus a call to select(). Windows ends up having
532 // an entirely separate implementation than unix, which is explained
535 // To implement timeouts, all blocking is done via select() instead of
536 // accept() by putting the socket in non-blocking mode. Because
537 // select() takes a timeout argument, we just pass through the timeout
540 // To implement close_accept(), we have a self-pipe to ourselves which
541 // is passed to select() along with the socket being accepted on. The
542 // self-pipe is never written to unless close_accept() is called.
543 let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
545 while !self.inner.closed.load(atomic::SeqCst) {
546 match retry(|| unsafe {
547 libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
549 -1 if util::wouldblock() => {}
550 -1 => return Err(os::last_error()),
551 fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))),
553 try!(util::await([self.fd(), self.inner.reader.fd()],
554 deadline, util::Readable));
561 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
562 // Unlink unix, windows cannot invoke `select` on arbitrary file
563 // descriptors like pipes, only sockets. Consequently, windows cannot
564 // use the same implementation as unix for accept() when close_accept()
567 // In order to implement close_accept() and timeouts, windows uses
568 // event handles. An acceptor-specific abort event is created which
569 // will only get set in close_accept(), and it will never be un-set.
570 // Additionally, another acceptor-specific event is associated with the
571 // FD_ACCEPT network event.
573 // These two events are then passed to WaitForMultipleEvents to see
574 // which one triggers first, and the timeout passed to this function is
575 // the local timeout for the acceptor.
577 // If the wait times out, then the accept timed out. If the wait
578 // succeeds with the abort event, then we were closed, and if the wait
579 // succeeds otherwise, then we do a nonblocking poll via `accept` to
580 // see if we can accept a connection. The connection is candidate to be
581 // stolen, so we do all of this in a loop as well.
582 let events = [self.inner.abort.handle(), self.inner.accept.handle()];
584 while !self.inner.closed.load(atomic::SeqCst) {
585 let ms = if self.deadline == 0 {
586 c::WSA_INFINITE as u64
588 let now = ::io::timer::now();
589 if self.deadline < now {0} else {self.deadline - now}
592 c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
593 ms as libc::DWORD, libc::FALSE)
596 c::WSA_WAIT_TIMEOUT => {
597 return Err(util::timeout("accept timed out"))
599 c::WSA_WAIT_FAILED => return Err(os::last_error()),
600 c::WSA_WAIT_EVENT_0 => break,
601 n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
604 let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
606 c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
608 if ret != 0 { return Err(os::last_error()) }
610 if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
612 libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
614 -1 if util::wouldblock() => {}
615 -1 => return Err(os::last_error()),
617 // Accepted sockets inherit the same properties as the caller,
618 // so we need to deregister our event and switch the socket back
621 let stream = TcpStream::new(Inner::new(fd));
623 c::WSAEventSelect(fd, events[1], 0)
625 if ret != 0 { return Err(os::last_error()) }
626 try!(util::set_nonblocking(fd, false));
636 impl rtio::RtioSocket for TcpAcceptor {
637 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
638 sockname(self.fd(), libc::getsockname)
642 impl rtio::RtioTcpAcceptor for TcpAcceptor {
643 fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
644 self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
647 fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
648 fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
649 fn set_timeout(&mut self, timeout: Option<u64>) {
650 self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
653 fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
655 inner: self.inner.clone(),
657 } as Box<rtio::RtioTcpAcceptor + Send>
661 fn close_accept(&mut self) -> IoResult<()> {
662 self.inner.closed.store(true, atomic::SeqCst);
663 let mut fd = FileDesc::new(self.inner.writer.fd(), false);
664 match fd.inner_write([0]) {
666 Err(..) if util::wouldblock() => Ok(()),
672 fn close_accept(&mut self) -> IoResult<()> {
673 self.inner.closed.store(true, atomic::SeqCst);
674 let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
675 if ret == libc::TRUE {
678 Err(os::last_error())
683 ////////////////////////////////////////////////////////////////////////////////
685 ////////////////////////////////////////////////////////////////////////////////
687 pub struct UdpSocket {
694 pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
695 let fd = try!(socket(addr, libc::SOCK_DGRAM));
696 let ret = UdpSocket {
697 inner: Arc::new(Inner::new(fd)),
702 let mut storage = unsafe { mem::zeroed() };
703 let len = addr_to_sockaddr(addr, &mut storage);
704 let addrp = &storage as *const _ as *const libc::sockaddr;
706 match unsafe { libc::bind(fd, addrp, len) } {
707 -1 => Err(os::last_error()),
712 pub fn fd(&self) -> sock_t { self.inner.fd }
714 pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
715 setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
719 pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
720 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
724 pub fn set_membership(&mut self, addr: rtio::IpAddr,
725 opt: libc::c_int) -> IoResult<()> {
726 match ip_to_inaddr(addr) {
728 let mreq = libc::ip_mreq {
730 // interface == INADDR_ANY
731 imr_interface: libc::in_addr { s_addr: 0x0 },
733 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
736 let mreq = libc::ip6_mreq {
737 ipv6mr_multiaddr: addr,
740 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
745 #[cfg(target_os = "linux")]
746 fn lock_nonblocking(&self) {}
748 #[cfg(not(target_os = "linux"))]
749 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
752 guard: unsafe { self.inner.lock.lock() },
754 assert!(util::set_nonblocking(self.fd(), true).is_ok());
759 impl rtio::RtioSocket for UdpSocket {
760 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
761 sockname(self.fd(), libc::getsockname)
765 #[cfg(windows)] type msglen_t = libc::c_int;
766 #[cfg(unix)] type msglen_t = libc::size_t;
768 impl rtio::RtioUdpSocket for UdpSocket {
769 fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
771 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
772 let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
773 let mut addrlen: libc::socklen_t =
774 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
776 let dolock = || self.lock_nonblocking();
777 let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
778 let flags = if nb {c::MSG_DONTWAIT} else {0};
780 buf.as_mut_ptr() as *mut libc::c_void,
781 buf.len() as msglen_t,
784 &mut addrlen) as libc::c_int
786 sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
787 Ok((n as uint, addr))
791 fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
792 let mut storage = unsafe { mem::zeroed() };
793 let dstlen = addr_to_sockaddr(dst, &mut storage);
794 let dstp = &storage as *const _ as *const libc::sockaddr;
797 let dolock = || self.lock_nonblocking();
798 let dowrite = |nb, buf: *const u8, len: uint| unsafe {
799 let flags = if nb {c::MSG_DONTWAIT} else {0};
801 buf as *const libc::c_void,
808 let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
810 Err(util::short_write(n, "couldn't send entire packet at once"))
816 fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
818 rtio::Ipv4Addr(..) => {
819 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
821 rtio::Ipv6Addr(..) => {
822 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
826 fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
828 rtio::Ipv4Addr(..) => {
829 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
831 rtio::Ipv6Addr(..) => {
832 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
837 fn loop_multicast_locally(&mut self) -> IoResult<()> {
838 self.set_multicast_loop(true)
840 fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
841 self.set_multicast_loop(false)
844 fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
845 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
848 fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
849 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
852 fn hear_broadcasts(&mut self) -> IoResult<()> {
853 self.set_broadcast(true)
855 fn ignore_broadcasts(&mut self) -> IoResult<()> {
856 self.set_broadcast(false)
859 fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
861 inner: self.inner.clone(),
864 } as Box<rtio::RtioUdpSocket + Send>
867 fn set_timeout(&mut self, timeout: Option<u64>) {
868 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
869 self.read_deadline = deadline;
870 self.write_deadline = deadline;
872 fn set_read_timeout(&mut self, timeout: Option<u64>) {
873 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
875 fn set_write_timeout(&mut self, timeout: Option<u64>) {
876 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
880 ////////////////////////////////////////////////////////////////////////////////
883 // The read/write functions below are the helpers for reading/writing a socket
884 // with a possible deadline specified. This is generally viewed as a timed out
887 // From the application's perspective, timeouts apply to the I/O object, not to
888 // the underlying file descriptor (it's one timeout per object). This means that
889 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
891 // The next idea to implement timeouts would be to use nonblocking I/O. An
892 // invocation of select() would wait (with a timeout) for a socket to be ready.
893 // Once its ready, we can perform the operation. Note that the operation *must*
894 // be nonblocking, even though select() says the socket is ready. This is
895 // because some other thread could have come and stolen our data (handles can be
898 // To implement nonblocking I/O, the first option we have is to use the
899 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
900 // I/O objects, so this was initially viewed as unwise.
902 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
903 // send/recv, but the niftiness wears off once you realize it only works well on
904 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
905 // operation on linux (no flag fiddling, no affecting other objects), but not on
908 // To work around this constraint on other platforms, we end up using the
909 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
910 // could cause other objects' blocking operations to suddenly become
911 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
912 // falls back to using the same code path as nonblocking operations, but with an
913 // infinite timeout (select + send/recv). This helps emulate blocking
914 // reads/writes despite the underlying descriptor being nonblocking, as well as
915 // optimizing the fast path of just hitting one syscall in the good case.
917 // As a final caveat, this implementation uses a mutex so only one thread is
918 // doing a nonblocking operation at at time. This is the operation that comes
919 // after the select() (at which point we think the socket is ready). This is
920 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
921 // expect (wouldn't want someone turning it on when it should be off!). All
922 // operations performed in the lock are *nonblocking* to avoid holding the mutex
925 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
926 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
927 // reads/writes are still blocking.
931 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
932 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
934 pub fn read<T>(fd: sock_t,
937 read: |bool| -> libc::c_int) -> IoResult<uint> {
940 ret = retry(|| read(false));
943 if deadline != 0 || (ret == -1 && util::wouldblock()) {
944 let deadline = match deadline {
949 // With a timeout, first we wait for the socket to become
950 // readable using select(), specifying the relevant timeout for
951 // our previously set deadline.
952 try!(util::await([fd], deadline, util::Readable));
954 // At this point, we're still within the timeout, and we've
955 // determined that the socket is readable (as returned by
956 // select). We must still read the socket in *nonblocking* mode
957 // because some other thread could come steal our data. If we
958 // fail to read some data, we retry (hence the outer loop) and
959 // wait for the socket to become readable again.
961 match retry(|| read(deadline.is_some())) {
962 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
963 -1 => return Err(os::last_error()),
964 n => { ret = n; break }
970 0 => Err(util::eof()),
971 n if n < 0 => Err(os::last_error()),
976 pub fn write<T>(fd: sock_t,
979 write_everything: bool,
981 write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
985 if write_everything {
986 ret = keep_going(buf, |inner, len| {
987 written = buf.len() - len;
988 write(false, inner, len)
992 write(false, buf.as_ptr(), buf.len()) as libc::c_int
994 if ret > 0 { written = ret as uint; }
998 if deadline != 0 || (ret == -1 && util::wouldblock()) {
999 let deadline = match deadline {
1003 while written < buf.len() && (write_everything || written == 0) {
1004 // As with read(), first wait for the socket to be ready for
1005 // the I/O operation.
1006 match util::await([fd], deadline, util::Writable) {
1007 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
1008 assert!(deadline.is_some());
1009 return Err(util::short_write(written, "short write"))
1011 Err(e) => return Err(e),
1015 // Also as with read(), we use MSG_DONTWAIT to guard ourselves
1016 // against unforeseen circumstances.
1017 let _guard = lock();
1018 let ptr = buf.slice_from(written).as_ptr();
1019 let len = buf.len() - written;
1020 match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
1021 -1 if util::wouldblock() => {}
1022 -1 => return Err(os::last_error()),
1023 n => { written += n as uint; }
1029 Err(os::last_error())
1039 use std::rt::rtio::{IoError, IoResult};
1043 pub type sock_t = libc::SOCKET;
1044 pub struct Event(c::WSAEVENT);
1047 pub fn new() -> IoResult<Event> {
1048 let event = unsafe { c::WSACreateEvent() };
1049 if event == c::WSA_INVALID_EVENT {
1056 pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
1059 impl Drop for Event {
1060 fn drop(&mut self) {
1061 unsafe { let _ = c::WSACloseEvent(self.handle()); }
1067 use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
1068 static mut INITIALIZED: bool = false;
1069 static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
1071 let _guard = LOCK.lock();
1073 let mut data: c::WSADATA = mem::zeroed();
1074 let ret = c::WSAStartup(0x202, // version 2.2
1082 pub fn last_error() -> IoError {
1084 let code = unsafe { c::WSAGetLastError() as uint };
1088 detail: Some(os::error_string(code)),
1092 pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
1098 use std::rt::rtio::IoError;
1101 pub type sock_t = io::file::fd_t;
1104 pub fn last_error() -> IoError { io::last_error() }
1105 pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }