1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use std::rt::rtio::{mod, IoResult, IoError};
17 use std::sync::atomic;
19 use super::{retry, keep_going};
23 #[cfg(unix)] use super::process;
24 #[cfg(unix)] use super::file::FileDesc;
26 pub use self::os::{init, sock_t, last_error};
28 ////////////////////////////////////////////////////////////////////////////////
29 // sockaddr and misc bindings
30 ////////////////////////////////////////////////////////////////////////////////
// NOTE(review): this region of the listing is elided — the function bodies,
// the `InAddr` enum header, and several braces are missing; the visible
// lines are preserved verbatim.

// u16 host-to-network byte order (used for port fields below).
32 pub fn htons(u: u16) -> u16 {
// u16 network-to-host byte order.
35 pub fn ntohs(u: u16) -> u16 {
// Variants of the (elided) `InAddr` enum wrapping the two libc address types.
40 In4Addr(libc::in_addr),
41 In6Addr(libc::in6_addr),
// Converts an rtio-level IP address into the matching libc in-addr struct.
44 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
46 rtio::Ipv4Addr(a, b, c, d) => {
// Pack the four octets MSB-first into a u32; s_addr is then produced via
// `Int::from_be` on the packed value.
47 let ip = (a as u32 << 24) |
51 In4Addr(libc::in_addr {
52 s_addr: Int::from_be(ip)
55 rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
56 In6Addr(libc::in6_addr {
// Serializes `addr` into the caller-provided sockaddr_storage and returns the
// number of valid bytes as a socklen_t. (Listing elided: the return type line,
// the match-arm patterns, and closing braces are missing.)
72 fn addr_to_sockaddr(addr: rtio::SocketAddr,
73 storage: &mut libc::sockaddr_storage)
76 let len = match ip_to_inaddr(addr.ip) {
// IPv4: reinterpret the storage as sockaddr_in and fill family/port/addr.
78 let storage = storage as *mut _ as *mut libc::sockaddr_in;
79 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
// Port converted to network byte order.
80 (*storage).sin_port = htons(addr.port);
81 (*storage).sin_addr = inaddr;
82 mem::size_of::<libc::sockaddr_in>()
// IPv6: same scheme, via sockaddr_in6.
85 let storage = storage as *mut _ as *mut libc::sockaddr_in6;
86 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
87 (*storage).sin6_port = htons(addr.port);
88 (*storage).sin6_addr = inaddr;
89 mem::size_of::<libc::sockaddr_in6>()
92 return len as libc::socklen_t;
// Creates a raw socket of the given type (SOCK_STREAM / SOCK_DGRAM), choosing
// the address family from the IP variant; -1 from libc maps to the OS error.
// (Elided: the Ok arm and closing braces.)
96 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
98 let fam = match addr.ip {
99 rtio::Ipv4Addr(..) => libc::AF_INET,
100 rtio::Ipv6Addr(..) => libc::AF_INET6,
102 match libc::socket(fam, ty, 0) {
103 -1 => Err(os::last_error()),
// Thin wrapper over libc::setsockopt for a POD payload of type T: the payload
// is passed by pointer along with size_of::<T>(). (Elided: Ok path/braces.)
109 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
110 payload: T) -> IoResult<()> {
112 let payload = &payload as *const T as *const libc::c_void;
113 let ret = libc::setsockopt(fd, opt, val,
115 mem::size_of::<T>() as libc::socklen_t);
// Non-zero return maps to the last OS error.
117 Err(os::last_error())
// Reads a socket option into a zeroed T. (Elided: the Ok path and braces.)
124 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
125 val: libc::c_int) -> IoResult<T> {
127 let mut slot: T = mem::zeroed();
128 let mut len = mem::size_of::<T>() as libc::socklen_t;
129 let ret = c::getsockopt(fd, opt, val,
130 &mut slot as *mut _ as *mut _,
133 Err(os::last_error())
// Sanity check: the kernel must have written exactly size_of::<T>() bytes.
135 assert!(len as uint == mem::size_of::<T>());
// Shared driver for getsockname/getpeername: `f` is one of those two libc
// functions, invoked on a zeroed sockaddr_storage that is then decoded by
// sockaddr_to_addr. (Elided: the call to `f` itself and error braces.)
141 fn sockname(fd: sock_t,
142 f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
143 *mut libc::socklen_t) -> libc::c_int)
144 -> IoResult<rtio::SocketAddr>
146 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
147 let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
149 let storage = &mut storage as *mut libc::sockaddr_storage;
151 storage as *mut libc::sockaddr,
152 &mut len as *mut libc::socklen_t);
154 return Err(os::last_error())
// `len` now holds the number of bytes the kernel actually filled in.
157 return sockaddr_to_addr(&storage, len as uint);
// Decodes a filled sockaddr_storage (`len` valid bytes) back into an
// rtio::SocketAddr, dispatching on ss_family. (Elided: the AF_INET/AF_INET6
// arm headers, the unknown-family fallback, and closing braces.)
160 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
161 len: uint) -> IoResult<rtio::SocketAddr> {
162 match storage.ss_family as libc::c_int {
// IPv4 arm: the storage must be at least sockaddr_in sized.
164 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
165 let storage: &libc::sockaddr_in = unsafe {
166 mem::transmute(storage)
// s_addr is kept in network (big-endian) order; after this conversion the
// octets are peeled off MSB-first by shifting.
168 let ip = (storage.sin_addr.s_addr as u32).to_be();
169 let a = (ip >> 24) as u8;
170 let b = (ip >> 16) as u8;
171 let c = (ip >> 8) as u8;
172 let d = (ip >> 0) as u8;
173 Ok(rtio::SocketAddr {
174 ip: rtio::Ipv4Addr(a, b, c, d),
175 port: ntohs(storage.sin_port),
// IPv6 arm: each of the eight 16-bit groups is converted from network order.
179 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
180 let storage: &libc::sockaddr_in6 = unsafe {
181 mem::transmute(storage)
183 let a = ntohs(storage.sin6_addr.s6_addr[0]);
184 let b = ntohs(storage.sin6_addr.s6_addr[1]);
185 let c = ntohs(storage.sin6_addr.s6_addr[2]);
186 let d = ntohs(storage.sin6_addr.s6_addr[3]);
187 let e = ntohs(storage.sin6_addr.s6_addr[4]);
188 let f = ntohs(storage.sin6_addr.s6_addr[5]);
189 let g = ntohs(storage.sin6_addr.s6_addr[6]);
190 let h = ntohs(storage.sin6_addr.s6_addr[7]);
191 Ok(rtio::SocketAddr {
192 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
193 port: ntohs(storage.sin6_port),
197 #[cfg(unix)] use libc::EINVAL as ERROR;
198 #[cfg(windows)] use libc::WSAEINVAL as ERROR;
208 ////////////////////////////////////////////////////////////////////////////////
210 ////////////////////////////////////////////////////////////////////////////////
// TCP stream handle. (Elided: most TcpStream fields, the Guard fields, the
// Inner struct definition, and the impl header around `new`.)
212 pub struct TcpStream {
221 // Unused on Linux, where this lock is not necessary.
223 lock: mutex::NativeMutex
// Guard tying a raw fd to a held native-mutex lock; used while a socket is
// temporarily in nonblocking mode (see lock_nonblocking below in the file).
226 pub struct Guard<'a> {
228 pub guard: mutex::LockGuard<'a>,
// Inner constructor: pairs a raw socket fd with a fresh native mutex.
232 fn new(fd: sock_t) -> Inner {
233 Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
// Opens a TCP connection to `addr`, with an optional absolute-ms timeout.
// (Elided: the timeout branch header, Ok arms, and closing braces.)
238 pub fn connect(addr: rtio::SocketAddr,
239 timeout: Option<u64>) -> IoResult<TcpStream> {
240 let fd = try!(socket(addr, libc::SOCK_STREAM));
241 let ret = TcpStream::new(Inner::new(fd));
243 let mut storage = unsafe { mem::zeroed() };
244 let len = addr_to_sockaddr(addr, &mut storage);
245 let addrp = &storage as *const _ as *const libc::sockaddr;
// Timed path delegates to the nonblocking connect-with-deadline helper...
249 try!(util::connect_timeout(fd, addrp, len, timeout));
// ...untimed path issues a plain connect, retried on EINTR via `retry`.
253 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
254 -1 => Err(os::last_error()),
261 fn new(inner: Inner) -> TcpStream {
263 inner: Arc::new(inner),
// Raw socket descriptor backing this stream.
269 pub fn fd(&self) -> sock_t { self.inner.fd }
271 fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
272 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
273 nodelay as libc::c_int)
// SO_KEEPALIVE is toggled by whether an interval was supplied; the interval
// value itself is applied by the platform-specific helper below.
276 fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
277 let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
278 seconds.is_some() as libc::c_int);
280 Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
// macOS/iOS spell the keepalive-idle option TCP_KEEPALIVE...
285 #[cfg(any(target_os = "macos", target_os = "ios"))]
286 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
287 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
288 seconds as libc::c_int)
// ...FreeBSD/Dragonfly spell it TCP_KEEPIDLE...
290 #[cfg(any(target_os = "freebsd", target_os = "dragonfly"))]
291 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
292 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
293 seconds as libc::c_int)
// ...and on other platforms the interval is silently ignored.
295 #[cfg(not(any(target_os = "macos",
297 target_os = "freebsd",
298 target_os = "dragonfly")))]
299 fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
// Linux: MSG_DONTWAIT gives per-call nonblocking I/O, so no lock and no
// fd-flag flipping are needed — this is a no-op.
303 #[cfg(target_os = "linux")]
304 fn lock_nonblocking(&self) {}
// Elsewhere: take the mutex and flip the fd into nonblocking mode; the
// returned Guard's Drop (later in the file) restores blocking mode.
306 #[cfg(not(target_os = "linux"))]
307 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
310 guard: unsafe { self.inner.lock.lock() },
312 assert!(util::set_nonblocking(self.fd(), true).is_ok());
// send/recv length-argument type differs between winsock and POSIX.
317 #[cfg(windows)] type wrlen = libc::c_int;
318 #[cfg(not(windows))] type wrlen = libc::size_t;
// rtio trait implementation for TcpStream. (Elided: several recv/send
// argument lines, Ok arms, and closing braces.)
320 impl rtio::RtioTcpStream for TcpStream {
321 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
// `dolock` enters nonblocking mode (platform-dependent); the read closure
// issues a single recv, adding MSG_DONTWAIT only when asked to not block.
323 let dolock = || self.lock_nonblocking();
324 let doread = |nb| unsafe {
325 let flags = if nb {c::MSG_DONTWAIT} else {0};
327 buf.as_mut_ptr() as *mut libc::c_void,
329 flags) as libc::c_int
// Deadline handling and EINTR retries live in the shared `read` helper.
331 read(fd, self.read_deadline, dolock, doread)
334 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
336 let dolock = || self.lock_nonblocking();
337 let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
338 let flags = if nb {c::MSG_DONTWAIT} else {0};
// `true` requests write-everything semantics from the shared helper.
344 match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
349 fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
350 sockname(self.fd(), libc::getpeername)
// control_congestion/nodelay are thin toggles over TCP_NODELAY.
352 fn control_congestion(&mut self) -> IoResult<()> {
353 self.set_nodelay(false)
355 fn nodelay(&mut self) -> IoResult<()> {
356 self.set_nodelay(true)
358 fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
359 self.set_keepalive(Some(delay_in_seconds))
361 fn letdie(&mut self) -> IoResult<()> {
362 self.set_keepalive(None)
// Cloning shares the Arc'd Inner, so both handles refer to the same fd.
365 fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
367 inner: self.inner.clone(),
370 } as Box<rtio::RtioTcpStream + Send>
373 fn close_write(&mut self) -> IoResult<()> {
374 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
376 fn close_read(&mut self) -> IoResult<()> {
377 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
// Deadlines are absolute timestamps; 0 means "no timeout" in this module.
380 fn set_timeout(&mut self, timeout: Option<u64>) {
381 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
382 self.read_deadline = deadline;
383 self.write_deadline = deadline;
385 fn set_read_timeout(&mut self, timeout: Option<u64>) {
386 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
388 fn set_write_timeout(&mut self, timeout: Option<u64>) {
389 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
393 impl rtio::RtioSocket for TcpStream {
394 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
395 sockname(self.fd(), libc::getsockname)
// The fd is closed when the last Arc<Inner> reference is dropped.
399 impl Drop for Inner {
400 fn drop(&mut self) { unsafe { os::close(self.fd); } }
// Dropping a Guard restores the fd to blocking mode; the held mutex guard is
// released along with it.
404 impl<'a> Drop for Guard<'a> {
406 assert!(util::set_nonblocking(self.fd, false).is_ok());
410 ////////////////////////////////////////////////////////////////////////////////
412 ////////////////////////////////////////////////////////////////////////////////
// TCP listener (pre-listen socket). (Elided: fields, Ok arms, cfg headers
// separating the unix/windows halves of native_listen, and closing braces.)
414 pub struct TcpListener {
// Binds a fresh SOCK_STREAM socket to `addr`.
419 pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
420 let fd = try!(socket(addr, libc::SOCK_STREAM));
421 let ret = TcpListener { inner: Inner::new(fd) };
423 let mut storage = unsafe { mem::zeroed() };
424 let len = addr_to_sockaddr(addr, &mut storage);
425 let addrp = &storage as *const _ as *const libc::sockaddr;
427 // On platforms with Berkeley-derived sockets, this allows
428 // a socket to be quickly rebound, without needing to wait for
429 // the OS to clean up the previous one.
431 try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
435 match unsafe { libc::bind(fd, addrp, len) } {
436 -1 => Err(os::last_error()),
441 pub fn fd(&self) -> sock_t { self.inner.fd }
// Puts the socket into listening mode and builds the platform acceptor.
443 pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
444 match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
445 -1 => Err(os::last_error()),
// Unix flavor: a nonblocking self-pipe lets close_accept() wake a blocked
// accept loop; the listening fd is made nonblocking too.
449 let (reader, writer) = try!(process::pipe());
450 try!(util::set_nonblocking(reader.fd(), true));
451 try!(util::set_nonblocking(writer.fd(), true));
452 try!(util::set_nonblocking(self.fd(), true));
454 inner: Arc::new(AcceptorInner {
458 closed: atomic::AtomicBool::new(false),
// Windows flavor: a WSA event is tied to FD_ACCEPT notifications instead.
466 let accept = try!(os::Event::new());
468 c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
471 return Err(os::last_error())
474 inner: Arc::new(AcceptorInner {
476 abort: try!(os::Event::new()),
478 closed: atomic::AtomicBool::new(false),
// Trait plumbing: `listen` uses a fixed backlog of 128.
487 impl rtio::RtioTcpListener for TcpListener {
488 fn listen(self: Box<TcpListener>)
489 -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
490 self.native_listen(128).map(|a| {
491 box a as Box<rtio::RtioTcpAcceptor + Send>
496 impl rtio::RtioSocket for TcpListener {
497 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
498 sockname(self.fd(), libc::getsockname)
// Acceptor handle sharing its state via Arc so it can be cloned.
502 pub struct TcpAcceptor {
503 inner: Arc<AcceptorInner>,
// Two cfg-gated AcceptorInner definitions follow; the cfg attributes are
// elided from this listing. Based on the accept code later in the file, this
// first one is presumably the unix flavor (self-pipe reader/writer fields
// elided) — TODO confirm against the full source.
508 struct AcceptorInner {
509 listener: TcpListener,
512 closed: atomic::AtomicBool,
// ...and this one presumably the windows flavor (abort/accept event fields
// elided).
516 struct AcceptorInner {
517 listener: TcpListener,
520 closed: atomic::AtomicBool,
524 pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
// Unix accept with timeout + close_accept support. (Elided: cfg attribute,
// some comment lines, the loop's closing braces, and the final error path.)
527 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
528 // In implementing accept, the two main concerns are dealing with
529 // close_accept() and timeouts. The unix implementation is based on a
530 // nonblocking accept plus a call to select(). Windows ends up having
531 // an entirely separate implementation than unix, which is explained
534 // To implement timeouts, all blocking is done via select() instead of
535 // accept() by putting the socket in non-blocking mode. Because
536 // select() takes a timeout argument, we just pass through the timeout
539 // To implement close_accept(), we have a self-pipe to ourselves which
540 // is passed to select() along with the socket being accepted on. The
541 // self-pipe is never written to unless close_accept() is called.
542 let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
// Loop until a connection arrives, an error occurs, or close_accept() flips
// the `closed` flag.
544 while !self.inner.closed.load(atomic::SeqCst) {
545 match retry(|| unsafe {
546 libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
// EWOULDBLOCK: nothing pending — fall through to the select() wait below.
548 -1 if util::wouldblock() => {}
549 -1 => return Err(os::last_error()),
550 fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))),
// Wait (respecting the deadline) for either the listening socket or the
// self-pipe to become readable; a readable self-pipe means close_accept().
552 try!(util::await([self.fd(), self.inner.reader.fd()],
553 deadline, util::Readable));
// Windows accept via WSA event objects. (Elided: cfg attribute, a few
// comment and argument lines, Ok paths, and closing braces.)
560 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
561 // Unlike unix, windows cannot invoke `select` on arbitrary file
562 // descriptors like pipes, only sockets. Consequently, windows cannot
563 // use the same implementation as unix for accept() when close_accept()
566 // In order to implement close_accept() and timeouts, windows uses
567 // event handles. An acceptor-specific abort event is created which
568 // will only get set in close_accept(), and it will never be un-set.
569 // Additionally, another acceptor-specific event is associated with the
570 // FD_ACCEPT network event.
572 // These two events are then passed to WaitForMultipleEvents to see
573 // which one triggers first, and the timeout passed to this function is
574 // the local timeout for the acceptor.
576 // If the wait times out, then the accept timed out. If the wait
577 // succeeds with the abort event, then we were closed, and if the wait
578 // succeeds otherwise, then we do a nonblocking poll via `accept` to
579 // see if we can accept a connection. The connection is candidate to be
580 // stolen, so we do all of this in a loop as well.
581 let events = [self.inner.abort.handle(), self.inner.accept.handle()];
583 while !self.inner.closed.load(atomic::SeqCst) {
// Translate the absolute deadline into a relative wait in milliseconds.
584 let ms = if self.deadline == 0 {
585 c::WSA_INFINITE as u64
587 let now = ::io::timer::now();
588 if self.deadline < now {0} else {self.deadline - now}
591 c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
592 ms as libc::DWORD, libc::FALSE)
595 c::WSA_WAIT_TIMEOUT => {
596 return Err(util::timeout("accept timed out"))
598 c::WSA_WAIT_FAILED => return Err(os::last_error()),
// Index 0 is the abort event: we were closed — leave the loop.
599 c::WSA_WAIT_EVENT_0 => break,
600 n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
// Drain the pending network-event record; only poll accept() if FD_ACCEPT
// is actually set (spurious wakeups just continue the loop).
603 let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
605 c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
607 if ret != 0 { return Err(os::last_error()) }
609 if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
611 libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
613 -1 if util::wouldblock() => {}
614 -1 => return Err(os::last_error()),
616 // Accepted sockets inherit the same properties as the caller,
617 // so we need to deregister our event and switch the socket back
620 let stream = TcpStream::new(Inner::new(fd));
// Unsubscribe the accepted socket from the event and restore blocking mode.
622 c::WSAEventSelect(fd, events[1], 0)
624 if ret != 0 { return Err(os::last_error()) }
625 try!(util::set_nonblocking(fd, false));
635 impl rtio::RtioSocket for TcpAcceptor {
636 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
637 sockname(self.fd(), libc::getsockname)
641 impl rtio::RtioTcpAcceptor for TcpAcceptor {
642 fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
643 self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
// Simultaneous-accept hints are no-ops in this implementation.
646 fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
647 fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
648 fn set_timeout(&mut self, timeout: Option<u64>) {
649 self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
// Clones share the Arc'd acceptor state (and therefore the `closed` flag).
652 fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
654 inner: self.inner.clone(),
656 } as Box<rtio::RtioTcpAcceptor + Send>
// Unix close_accept: set the flag, then poke the self-pipe so any blocked
// native_accept wakes from select(). EWOULDBLOCK on the pipe write is
// treated as success (a wakeup is presumably already pending).
660 fn close_accept(&mut self) -> IoResult<()> {
661 self.inner.closed.store(true, atomic::SeqCst);
662 let mut fd = FileDesc::new(self.inner.writer.fd(), false);
663 match fd.inner_write([0]) {
665 Err(..) if util::wouldblock() => Ok(()),
// Windows close_accept: set the flag and signal the never-reset abort event.
671 fn close_accept(&mut self) -> IoResult<()> {
672 self.inner.closed.store(true, atomic::SeqCst);
673 let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
674 if ret == libc::TRUE {
677 Err(os::last_error())
682 ////////////////////////////////////////////////////////////////////////////////
684 ////////////////////////////////////////////////////////////////////////////////
// UDP socket handle. (Elided: fields, Ok arms, match-arm headers, and
// closing braces throughout this impl.)
686 pub struct UdpSocket {
// Binds a fresh SOCK_DGRAM socket to `addr`.
693 pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
694 let fd = try!(socket(addr, libc::SOCK_DGRAM));
695 let ret = UdpSocket {
696 inner: Arc::new(Inner::new(fd)),
701 let mut storage = unsafe { mem::zeroed() };
702 let len = addr_to_sockaddr(addr, &mut storage);
703 let addrp = &storage as *const _ as *const libc::sockaddr;
705 match unsafe { libc::bind(fd, addrp, len) } {
706 -1 => Err(os::last_error()),
711 pub fn fd(&self) -> sock_t { self.inner.fd }
713 pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
714 setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
718 pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
719 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
// Join/leave a multicast group: `opt` is the ADD/DROP membership constant;
// the mreq struct variant is chosen by address family.
723 pub fn set_membership(&mut self, addr: rtio::IpAddr,
724 opt: libc::c_int) -> IoResult<()> {
725 match ip_to_inaddr(addr) {
727 let mreq = libc::ip_mreq {
729 // interface == INADDR_ANY
730 imr_interface: libc::in_addr { s_addr: 0x0 },
732 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
735 let mreq = libc::ip6_mreq {
736 ipv6mr_multiaddr: addr,
739 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
// Same nonblocking-locking scheme as TcpStream: Linux relies on
// MSG_DONTWAIT, other platforms take the mutex and flip the fd flag.
744 #[cfg(target_os = "linux")]
745 fn lock_nonblocking(&self) {}
747 #[cfg(not(target_os = "linux"))]
748 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
751 guard: unsafe { self.inner.lock.lock() },
753 assert!(util::set_nonblocking(self.fd(), true).is_ok());
758 impl rtio::RtioSocket for UdpSocket {
759 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
760 sockname(self.fd(), libc::getsockname)
// recvfrom/sendto length-parameter type differs between winsock and POSIX.
764 #[cfg(windows)] type msglen_t = libc::c_int;
765 #[cfg(unix)] type msglen_t = libc::size_t;
// rtio trait implementation for UdpSocket. (Elided: recvfrom/sendto argument
// lines, match-arm headers, Ok paths, and closing braces.)
767 impl rtio::RtioUdpSocket for UdpSocket {
768 fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
770 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
771 let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
772 let mut addrlen: libc::socklen_t =
773 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
// Deadline/retry handling is shared with TCP via the `read` helper; the
// closure performs a single recvfrom, nonblocking when requested.
775 let dolock = || self.lock_nonblocking();
776 let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
777 let flags = if nb {c::MSG_DONTWAIT} else {0};
779 buf.as_mut_ptr() as *mut libc::c_void,
780 buf.len() as msglen_t,
783 &mut addrlen) as libc::c_int
// Decode the peer address the kernel filled in alongside the byte count.
785 sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
786 Ok((n as uint, addr))
790 fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
791 let mut storage = unsafe { mem::zeroed() };
792 let dstlen = addr_to_sockaddr(dst, &mut storage);
793 let dstp = &storage as *const _ as *const libc::sockaddr;
796 let dolock = || self.lock_nonblocking();
797 let dowrite = |nb, buf: *const u8, len: uint| unsafe {
798 let flags = if nb {c::MSG_DONTWAIT} else {0};
800 buf as *const libc::c_void,
// `false`: datagrams are single-shot; a partial send surfaces as the
// short-write error below rather than being retried.
807 let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
809 Err(util::short_write(n, "couldn't send entire packet at once"))
815 fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
817 rtio::Ipv4Addr(..) => {
818 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
820 rtio::Ipv6Addr(..) => {
821 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
825 fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
827 rtio::Ipv4Addr(..) => {
828 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
830 rtio::Ipv6Addr(..) => {
831 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
836 fn loop_multicast_locally(&mut self) -> IoResult<()> {
837 self.set_multicast_loop(true)
839 fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
840 self.set_multicast_loop(false)
843 fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
844 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
847 fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
848 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
851 fn hear_broadcasts(&mut self) -> IoResult<()> {
852 self.set_broadcast(true)
854 fn ignore_broadcasts(&mut self) -> IoResult<()> {
855 self.set_broadcast(false)
// Clones share the Arc'd Inner (same fd).
858 fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
860 inner: self.inner.clone(),
863 } as Box<rtio::RtioUdpSocket + Send>
// As for TCP: deadlines are absolute, 0 meaning "no timeout".
866 fn set_timeout(&mut self, timeout: Option<u64>) {
867 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
868 self.read_deadline = deadline;
869 self.write_deadline = deadline;
871 fn set_read_timeout(&mut self, timeout: Option<u64>) {
872 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
874 fn set_write_timeout(&mut self, timeout: Option<u64>) {
875 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
879 ////////////////////////////////////////////////////////////////////////////////
882 // The read/write functions below are the helpers for reading/writing a socket
883 // with a possible deadline specified. This is generally viewed as a timed out
886 // From the application's perspective, timeouts apply to the I/O object, not to
887 // the underlying file descriptor (it's one timeout per object). This means that
888 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
890 // The next idea to implement timeouts would be to use nonblocking I/O. An
891 // invocation of select() would wait (with a timeout) for a socket to be ready.
892 // Once its ready, we can perform the operation. Note that the operation *must*
893 // be nonblocking, even though select() says the socket is ready. This is
894 // because some other thread could have come and stolen our data (handles can be
897 // To implement nonblocking I/O, the first option we have is to use the
898 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
899 // I/O objects, so this was initially viewed as unwise.
901 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
902 // send/recv, but the niftiness wears off once you realize it only works well on
903 // Linux [1] [2]. This means that it's pretty easy to get a nonblocking
904 // operation on Linux (no flag fiddling, no affecting other objects), but not on
907 // To work around this constraint on other platforms, we end up using the
908 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
909 // could cause other objects' blocking operations to suddenly become
910 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
911 // falls back to using the same code path as nonblocking operations, but with an
912 // infinite timeout (select + send/recv). This helps emulate blocking
913 // reads/writes despite the underlying descriptor being nonblocking, as well as
914 // optimizing the fast path of just hitting one syscall in the good case.
916 // As a final caveat, this implementation uses a mutex so only one thread is
917 // doing a nonblocking operation at a time. This is the operation that comes
918 // after the select() (at which point we think the socket is ready). This is
919 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
920 // expect (wouldn't want someone turning it on when it should be off!). All
921 // operations performed in the lock are *nonblocking* to avoid holding the mutex
924 // So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
925 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
926 // reads/writes are still blocking.
930 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
931 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
// Shared timed-read driver for TCP/UDP. `read` performs one recv-like call,
// nonblocking when passed `true`. (Elided: the lock parameter, loop headers,
// and the success arm of the final match.)
933 pub fn read<T>(fd: sock_t,
936 read: |bool| -> libc::c_int) -> IoResult<uint> {
// Fast path: one blocking-mode attempt, retried on EINTR.
939 ret = retry(|| read(false));
// Slow path: a deadline is set, or the socket would have blocked.
942 if deadline != 0 || (ret == -1 && util::wouldblock()) {
943 let deadline = match deadline {
948 // With a timeout, first we wait for the socket to become
949 // readable using select(), specifying the relevant timeout for
950 // our previously set deadline.
951 try!(util::await([fd], deadline, util::Readable));
953 // At this point, we're still within the timeout, and we've
954 // determined that the socket is readable (as returned by
955 // select). We must still read the socket in *nonblocking* mode
956 // because some other thread could come steal our data. If we
957 // fail to read some data, we retry (hence the outer loop) and
958 // wait for the socket to become readable again.
960 match retry(|| read(deadline.is_some())) {
961 -1 if util::wouldblock() => {}
962 -1 => return Err(os::last_error()),
963 n => { ret = n; break }
// 0 bytes is EOF; a negative count is an OS error; the positive/success arm
// is elided from this listing.
969 0 => Err(util::eof()),
970 n if n < 0 => Err(os::last_error()),
// Shared timed-write driver. `write_everything` selects loop-until-done
// semantics (TCP streams) versus single-shot (UDP datagrams). (Elided:
// deadline/lock parameters, some match arms, and closing braces.)
975 pub fn write<T>(fd: sock_t,
978 write_everything: bool,
980 write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
// Fast path: blocking-mode attempt, tracking progress in `written`.
984 if write_everything {
985 ret = keep_going(buf, |inner, len| {
986 written = buf.len() - len;
987 write(false, inner, len)
990 ret = retry(|| { write(false, buf.as_ptr(), buf.len()) });
991 if ret > 0 { written = ret as uint; }
// Slow path: a deadline is set, or the socket would have blocked.
995 if deadline != 0 || (ret == -1 && util::wouldblock()) {
996 let deadline = match deadline {
1000 while written < buf.len() && (write_everything || written == 0) {
1001 // As with read(), first wait for the socket to be ready for
1002 // the I/O operation.
1003 match util::await([fd], deadline, util::Writable) {
// NOTE(review): a deadline expiry after partial progress apparently arrives
// with this error code and is reported as a short write — confirm against
// util::await's contract.
1004 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
1005 assert!(deadline.is_some());
1006 return Err(util::short_write(written, "short write"))
1008 Err(e) => return Err(e),
1012 // Also as with read(), we use MSG_DONTWAIT to guard ourselves
1013 // against unforeseen circumstances.
1014 let _guard = lock();
// Resume from the current offset; partial writes advance `written`.
1015 let ptr = buf.slice_from(written).as_ptr();
1016 let len = buf.len() - written;
1017 match retry(|| write(deadline.is_some(), ptr, len)) {
1018 -1 if util::wouldblock() => {}
1019 -1 => return Err(os::last_error()),
1020 n => { written += n as uint; }
1026 Err(os::last_error())
1036 use std::rt::rtio::{IoError, IoResult};
// Windows flavor of the `os` support module. (Elided: the module/cfg header,
// impl headers, WSAStartup error handling, and several closing braces.)
1040 pub type sock_t = libc::SOCKET;
// RAII wrapper around a WSAEVENT handle; the handle is closed on drop.
1041 pub struct Event(c::WSAEVENT);
1044 pub fn new() -> IoResult<Event> {
1045 let event = unsafe { c::WSACreateEvent() };
1046 if event == c::WSA_INVALID_EVENT {
// Extracts the raw WSAEVENT from the newtype.
1053 pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
1056 impl Drop for Event {
1057 fn drop(&mut self) {
1058 unsafe { let _ = c::WSACloseEvent(self.handle()); }
// One-time, lock-guarded WSAStartup (requesting version 2.2) for the process.
1064 use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
1065 static mut INITIALIZED: bool = false;
1066 static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
1068 let _guard = LOCK.lock();
1070 let mut data: c::WSADATA = mem::zeroed();
1071 let ret = c::WSAStartup(0x202, // version 2.2
// On windows, errors come from WSAGetLastError rather than errno.
1079 pub fn last_error() -> IoError {
1081 let code = unsafe { c::WSAGetLastError() as uint };
1085 detail: Some(os::error_string(code)),
1089 pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
1095 use std::rt::rtio::IoError;
// Unix flavor of the `os` support module (header elided): sockets are plain
// file descriptors, and errors come straight from errno via io::last_error.
1098 pub type sock_t = io::file::fd_t;
1101 pub fn last_error() -> IoError { io::last_error() }
1102 pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }