1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
19 use std::sync::arc::UnsafeArc;
21 use super::{IoResult, retry, keep_going};
24 ////////////////////////////////////////////////////////////////////////////////
25 // sockaddr and misc bindings
26 ////////////////////////////////////////////////////////////////////////////////
// Platform-specific alias for the raw socket handle used throughout this
// file: libc::SOCKET on windows, the file module's fd_t on unix.
28 #[cfg(windows)] pub type sock_t = libc::SOCKET;
29 #[cfg(unix)] pub type sock_t = super::file::fd_t;
// 16-bit conversion helper (body not visible in this excerpt). Applied to
// port numbers before they are stored into sin_port / sin6_port below.
31 pub fn htons(u: u16) -> u16 {
// 16-bit conversion helper (body not visible in this excerpt). Applied below
// when reading sin_port / sin6_port and the IPv6 address segments back out.
34 pub fn ntohs(u: u16) -> u16 {
// Two variants wrapping the libc IPv4 and IPv6 address structs. The enclosing
// enum header is not visible in this excerpt; ip_to_inaddr constructs these.
39 InAddr(libc::in_addr),
40 In6Addr(libc::in6_addr),
// Converts an ip::IpAddr into its libc representation: an Ipv4Addr becomes
// the InAddr variant and an Ipv6Addr becomes the In6Addr variant.
43 fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr {
45 ip::Ipv4Addr(a, b, c, d) => {
46 InAddr(libc::in_addr {
// The last octet `d` is placed in the top 8 bits of s_addr; the terms for
// the remaining octets are not visible in this excerpt.
47 s_addr: (d as u32 << 24) |
53 ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
54 In6Addr(libc::in6_addr {
// Builds a libc::sockaddr_storage describing `addr` and returns it together
// with the number of meaningful bytes: size_of sockaddr_in for an IPv4
// address, size_of sockaddr_in6 for an IPv6 address.
70 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
72 let storage: libc::sockaddr_storage = mem::init();
73 let len = match ip_to_inaddr(addr.ip) {
// The generic storage is reinterpreted as the family-specific struct and
// filled in through that pointer.
// NOTE(review): `storage` is an immutable binding, yet it is written through
// a *mut pointer obtained by transmuting `&storage` -- confirm this is intended.
75 let storage: *mut libc::sockaddr_in = cast::transmute(&storage);
76 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
// The port goes through htons before being stored.
77 (*storage).sin_port = htons(addr.port);
78 (*storage).sin_addr = inaddr;
79 mem::size_of::<libc::sockaddr_in>()
// Same procedure for the IPv6 layout.
82 let storage: *mut libc::sockaddr_in6 = cast::transmute(&storage);
83 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
84 (*storage).sin6_port = htons(addr.port);
85 (*storage).sin6_addr = inaddr;
86 mem::size_of::<libc::sockaddr_in6>()
89 return (storage, len);
// Creates a socket of type `ty` (callers pass SOCK_STREAM or SOCK_DGRAM) in
// the address family that matches `addr.ip`, with protocol 0.
// Only the variant of `addr.ip` is inspected; the address itself is not used.
93 fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
95 let fam = match addr.ip {
96 ip::Ipv4Addr(..) => libc::AF_INET,
97 ip::Ipv6Addr(..) => libc::AF_INET6,
99 match libc::socket(fam, ty, 0) {
// NOTE(review): this reports failure via super::last_error(), while most of
// this file uses the local last_error() (one variant of which reads
// WSAGetLastError) -- confirm the difference is intentional.
100 -1 => Err(super::last_error()),
// Thin generic wrapper over libc::setsockopt.
// Mind the parameter naming: `opt` is forwarded in the level position and
// `val` in the option-name position (callers in this file pass pairs such as
// IPPROTO_TCP / TCP_NODELAY). `payload` is passed by address, with
// size_of::<T>() as the option length.
106 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
107 payload: T) -> IoResult<()> {
109 let payload = &payload as *T as *libc::c_void;
110 let ret = libc::setsockopt(fd, opt, val,
112 mem::size_of::<T>() as libc::socklen_t);
// Generic wrapper over getsockopt that reads an option value of type T.
// As with setsockopt in this file, `opt` is forwarded in the level position
// and `val` in the option-name position.
121 fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
122 val: libc::c_int) -> IoResult<T> {
// Output slot for the option value, plus its size as the in/out length.
124 let mut slot: T = mem::init();
125 let mut len = mem::size_of::<T>() as libc::socklen_t;
126 let ret = c::getsockopt(fd, opt, val,
127 &mut slot as *mut _ as *mut _,
// The length written back must match T exactly; anything else is treated as
// a programming error and aborts via assert!.
132 assert!(len as uint == mem::size_of::<T>());
// Variant that builds the IoError from the Winsock error code returned by
// WSAGetLastError (the cfg attribute selecting it is not visible here).
139 fn last_error() -> io::IoError {
140 io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
// Second variant with the same signature; its cfg attribute and body are not
// visible in this excerpt.
144 fn last_error() -> io::IoError {
// Converts a duration in milliseconds into a libc::timeval: whole seconds go
// into tv_sec and the sub-second remainder, scaled to microseconds, into
// tv_usec.
148 fn ms_to_timeval(ms: u64) -> libc::timeval {
150 tv_sec: (ms / 1000) as libc::time_t,
151 tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
// Returns an io::IoError built from `desc` (body not visible in this excerpt).
// Used below as the error value when select() reports 0 ready descriptors.
155 fn timeout(desc: &'static str) -> io::IoError {
// Closes a raw socket: closesocket on windows, close on unix. The return
// value is deliberately discarded in both variants.
163 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
164 #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
// Shared implementation for address queries on a socket. `f` is the libc
// function to call (this file passes libc::getsockname and
// libc::getpeername); its output is decoded with sockaddr_to_addr.
166 fn sockname(fd: sock_t,
167 f: extern "system" unsafe fn(sock_t, *mut libc::sockaddr,
168 *mut libc::socklen_t) -> libc::c_int)
169 -> IoResult<ip::SocketAddr>
// Output buffer large enough for any address family, and its in/out length.
171 let mut storage: libc::sockaddr_storage = unsafe { mem::init() };
172 let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
174 let storage = &mut storage as *mut libc::sockaddr_storage;
176 storage as *mut libc::sockaddr,
177 &mut len as *mut libc::socklen_t);
179 return Err(last_error())
// `len` now holds the size written back by `f`.
182 return sockaddr_to_addr(&storage, len as uint);
// Decodes a libc::sockaddr_storage (of which `len` bytes are valid) into an
// ip::SocketAddr, dispatching on ss_family. A family that matches neither
// visible arm yields an OtherIoError.
// Asserts (i.e. aborts rather than returning Err) if `len` is smaller than
// the struct for the reported family.
185 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
186 len: uint) -> IoResult<ip::SocketAddr> {
187 match storage.ss_family as libc::c_int {
189 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
// Reinterpret the generic storage as the IPv4 layout.
190 let storage: &libc::sockaddr_in = unsafe {
191 cast::transmute(storage)
// Unpack the four octets from s_addr, lowest byte first; this mirrors the
// packing in ip_to_inaddr, where `d` occupies the top 8 bits.
193 let addr = storage.sin_addr.s_addr as u32;
194 let a = (addr >> 0) as u8;
195 let b = (addr >> 8) as u8;
196 let c = (addr >> 16) as u8;
197 let d = (addr >> 24) as u8;
199 ip: ip::Ipv4Addr(a, b, c, d),
200 port: ntohs(storage.sin_port),
204 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
// Reinterpret the generic storage as the IPv6 layout.
205 let storage: &libc::sockaddr_in6 = unsafe {
206 cast::transmute(storage)
// Each of the eight address segments goes through ntohs.
208 let a = ntohs(storage.sin6_addr.s6_addr[0]);
209 let b = ntohs(storage.sin6_addr.s6_addr[1]);
210 let c = ntohs(storage.sin6_addr.s6_addr[2]);
211 let d = ntohs(storage.sin6_addr.s6_addr[3]);
212 let e = ntohs(storage.sin6_addr.s6_addr[4]);
213 let f = ntohs(storage.sin6_addr.s6_addr[5]);
214 let g = ntohs(storage.sin6_addr.s6_addr[6]);
215 let h = ntohs(storage.sin6_addr.s6_addr[7]);
217 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
218 port: ntohs(storage.sin6_port),
222 Err(io::standard_error(io::OtherIoError))
// Winsock startup fragment; the enclosing function header is not visible in
// this excerpt. A static flag and a static native mutex are declared, the
// mutex is taken, and WSAStartup is called requesting version 2.2.
// NOTE(review): presumably INITIALIZED makes this a one-time operation --
// the check itself is not visible here; confirm against the full file.
234 use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
235 static mut INITIALIZED: bool = false;
236 static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
238 let _guard = LOCK.lock();
240 let mut data: c::WSADATA = mem::init();
241 let ret = c::WSAStartup(0x202, // version 2.2
249 ////////////////////////////////////////////////////////////////////////////////
251 ////////////////////////////////////////////////////////////////////////////////
// A TCP stream. The socket lives in an `Inner` value shared through an
// UnsafeArc; clone() on the rtio impl below hands out another reference to
// the same Inner.
253 pub struct TcpStream {
254 inner: UnsafeArc<Inner>,
// Opens a TCP connection to `addr`. Two connect paths are visible: one that
// goes through connect_timeout and one that calls libc::connect directly
// under retry(); the code choosing between them based on `timeout` is not
// visible in this excerpt.
262 pub fn connect(addr: ip::SocketAddr,
263 timeout: Option<u64>) -> IoResult<TcpStream> {
264 let fd = try!(socket(addr, libc::SOCK_STREAM));
265 let (addr, len) = addr_to_sockaddr(addr);
// The fd is wrapped in Inner before connecting; Inner's Drop impl closes the
// fd, so an early return below does not leave the socket open.
266 let inner = Inner { fd: fd };
267 let ret = TcpStream { inner: UnsafeArc::new(inner) };
269 let len = len as libc::socklen_t;
270 let addrp = &addr as *_ as *libc::sockaddr;
273 try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
277 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
278 -1 => Err(last_error()),
// Connects `fd` to the address at `addrp` (`len` bytes), giving up after
// `timeout_ms` milliseconds. The socket is switched to non-blocking mode for
// the duration of the attempt and switched back afterwards.
285 // See http://developerweb.net/viewtopic.php?id=3196 for where this is
287 fn connect_timeout(fd: sock_t,
288 addrp: *libc::sockaddr,
289 len: libc::socklen_t,
290 timeout_ms: u64) -> IoResult<()> {
// Per-platform names for the "connection still in progress" error codes.
291 #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
292 #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
293 #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
294 #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
296 // Make sure the call to connect() doesn't block
297 try!(set_nonblocking(fd, true));
299 let ret = match unsafe { libc::connect(fd, addrp, len) } {
300 // If the connection is in progress, then we need to wait for it to
301 // finish (with a timeout). The current strategy for doing this is
302 // to use select() with a timeout.
303 -1 if os::errno() as int == INPROGRESS as int ||
304 os::errno() as int == WOULDBLOCK as int => {
305 let mut set: c::fd_set = unsafe { mem::init() };
306 c::fd_set(&mut set, fd);
// await() returns select()'s result: 0 means nothing became ready before the
// timeout, -1 means select itself failed.
307 match await(fd, &mut set, timeout_ms) {
308 0 => Err(timeout("connection timed out")),
309 -1 => Err(last_error()),
// Otherwise the outcome of the connect is read from SO_ERROR; the visible
// error arm converts that value into an IoError.
311 let err: libc::c_int = try!(
312 getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
316 Err(io::IoError::from_errno(err as uint, true))
// Any other immediate connect failure is reported as-is.
322 -1 => Err(last_error()),
326 // be sure to turn blocking I/O back on
327 try!(set_nonblocking(fd, false));
// Variant that toggles non-blocking mode with ioctl(FIONBIO), retried through
// retry() and mapped to an IoResult by super::mkerr_libc. The cfg attribute
// selecting this variant is not visible in this excerpt.
331 fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
332 let set = nb as libc::c_int;
333 super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
// Variant that toggles non-blocking mode with ioctlsocket(FIONBIO); a
// non-zero return takes the branch below, whose body is not visible here.
336 fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
337 let mut set = nb as libc::c_ulong;
338 if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
// Waits, via select() on the write set, for `fd` to become ready or for
// `timeout` milliseconds to elapse, and returns select()'s result. This
// variant recomputes the remaining time from a start timestamp on each
// attempt; the looping construct itself is not visible in this excerpt.
346 fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
347 let start = ::io::timer::now();
349 // Recalculate the timeout each iteration (it is generally
350 // undefined what the value of the 'tv' is after select
// NOTE(review): `timeout - (now - start)` is unsigned arithmetic on a u64
// timeout; if more than `timeout` ms have already elapsed on a later
// iteration this would underflow -- confirm whether that can happen.
352 let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
353 c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
// Variant that makes a single select() call with the full timeout and does
// not use the fd argument; 1 is passed as the first select() argument.
357 fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
358 let tv = ms_to_timeval(timeout);
359 unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
// Returns the raw socket handle stored in the shared Inner.
363 pub fn fd(&self) -> sock_t {
364 // This unsafety is fine because it's just a read-only arc
365 unsafe { (*self.inner.get()).fd }
// Sets the TCP_NODELAY option (at IPPROTO_TCP) to 1 or 0 according to
// `nodelay`.
368 fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
369 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
370 nodelay as libc::c_int)
// Turns SO_KEEPALIVE on when `seconds` is Some and off when it is None.
// For Some(n), and only if the first call succeeded, the interval is then
// handed to the platform-specific set_tcp_keepalive.
373 fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
374 let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
375 seconds.is_some() as libc::c_int);
377 Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
// Platform-specific half of set_keepalive: passes `seconds` to the TCP-level
// keepalive option. On macos the option is TCP_KEEPALIVE.
382 #[cfg(target_os = "macos")]
383 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
384 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
385 seconds as libc::c_int)
// On freebsd the option is TCP_KEEPIDLE.
387 #[cfg(target_os = "freebsd")]
388 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
389 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
390 seconds as libc::c_int)
// Every other target: the argument is unused (body not visible in this
// excerpt).
392 #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
393 fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
// Platform-specific integer type alias: c_int on windows, size_t elsewhere.
// NOTE(review): presumably the length-argument type for recv/send -- its use
// sites are not visible in this excerpt.
398 #[cfg(windows)] type wrlen = libc::c_int;
399 #[cfg(not(windows))] type wrlen = libc::size_t;
// rtio-facing TCP stream operations, implemented directly on the raw socket.
401 impl rtio::RtioTcpStream for TcpStream {
// Receives into `buf` with libc::recv. One of the result arms (its condition
// is not visible in this excerpt) reports EndOfFile.
402 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
405 libc::recv(self.fd(),
406 buf.as_mut_ptr() as *mut libc::c_void,
412 Err(io::standard_error(io::EndOfFile))
// Sends `buf` with libc::send, driven by the keep_going helper.
419 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
420 let ret = keep_going(buf, |buf, len| unsafe {
421 libc::send(self.fd(),
422 buf as *mut libc::c_void,
// NOTE(review): uses super::last_error() rather than this module's
// last_error() -- confirm the difference is intentional.
427 Err(super::last_error())
// Address of the remote end, via getpeername.
432 fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
433 sockname(self.fd(), libc::getpeername)
// Clears TCP_NODELAY.
435 fn control_congestion(&mut self) -> IoResult<()> {
436 self.set_nodelay(false)
// Sets TCP_NODELAY.
438 fn nodelay(&mut self) -> IoResult<()> {
439 self.set_nodelay(true)
// Enables keepalive with the given delay.
441 fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
442 self.set_keepalive(Some(delay_in_seconds))
// Disables keepalive.
444 fn letdie(&mut self) -> IoResult<()> {
445 self.set_keepalive(None)
// Returns a new boxed stream sharing the same Inner (and thus the same fd).
448 fn clone(&self) -> ~rtio::RtioTcpStream:Send {
449 ~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream:Send
// Shuts down the write half of the connection (SHUT_WR).
451 fn close_write(&mut self) -> IoResult<()> {
452 super::mkerr_libc(unsafe {
453 libc::shutdown(self.fd(), libc::SHUT_WR)
// Local address of the stream's socket, via getsockname.
458 impl rtio::RtioSocket for TcpStream {
459 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
460 sockname(self.fd(), libc::getsockname)
// Closes the underlying socket when the Inner value is dropped.
464 impl Drop for Inner {
465 fn drop(&mut self) { unsafe { close(self.fd); } }
468 ////////////////////////////////////////////////////////////////////////////////
470 ////////////////////////////////////////////////////////////////////////////////
// A bound TCP socket that is not yet listening. Its fields are not visible in
// this excerpt; bind() below constructs it with an `inner: Inner` field.
472 pub struct TcpListener {
// Creates a stream socket, applies a socket option at SOL_SOCKET level (the
// option name is not visible in this excerpt), and binds it to `addr`.
477 pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
479 socket(addr, libc::SOCK_STREAM).and_then(|fd| {
480 let (addr, len) = addr_to_sockaddr(addr);
481 let addrp = &addr as *libc::sockaddr_storage;
// The fd is wrapped in Inner right away; Inner's Drop impl closes the fd, so
// the error returns below do not leave the socket open.
482 let inner = Inner { fd: fd };
483 let ret = TcpListener { inner: inner };
484 // On platforms with Berkeley-derived sockets, this allows
485 // a socket to be rebound quickly, without needing to wait for
486 // the OS to clean up the previous one.
488 match setsockopt(fd, libc::SOL_SOCKET,
491 Err(n) => { return Err(n); },
495 match libc::bind(fd, addrp as *libc::sockaddr,
496 len as libc::socklen_t) {
497 -1 => Err(last_error()),
// Returns the listener's raw socket handle.
504 pub fn fd(&self) -> sock_t { self.inner.fd }
// Puts the bound socket into the listening state with the given backlog and,
// on success, converts the listener into a TcpAcceptor. The acceptor starts
// with deadline 0; native_accept only waits on a deadline when it is non-zero.
506 pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
507 match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
508 -1 => Err(last_error()),
509 _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
// rtio entry point for listening: uses a fixed backlog of 128 and boxes the
// resulting acceptor as a trait object.
514 impl rtio::RtioTcpListener for TcpListener {
515 fn listen(~self) -> IoResult<~rtio::RtioTcpAcceptor:Send> {
516 self.native_listen(128).map(|a| ~a as ~rtio::RtioTcpAcceptor:Send)
// Local address the listener is bound to, via getsockname.
520 impl rtio::RtioSocket for TcpListener {
521 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
522 sockname(self.fd(), libc::getsockname)
// A listening TCP socket. Owns the TcpListener it was created from; further
// fields are not visible in this excerpt (native_listen also initializes a
// `deadline` field).
526 pub struct TcpAcceptor {
527 listener: TcpListener,
// Returns the raw socket handle of the wrapped listener.
532 pub fn fd(&self) -> sock_t { self.listener.fd() }
// Accepts one incoming connection and wraps the new fd in a TcpStream.
// When a deadline is set (non-zero), accept_deadline is called first, and its
// error is returned if it fails.
534 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
535 if self.deadline != 0 {
536 try!(self.accept_deadline());
// Buffer and in/out length for the peer address filled in by accept(); the
// address is not used in the visible result arms.
539 let mut storage: libc::sockaddr_storage = mem::init();
540 let storagep = &mut storage as *mut libc::sockaddr_storage;
541 let size = mem::size_of::<libc::sockaddr_storage>();
542 let mut size = size as libc::socklen_t;
544 libc::accept(self.fd(),
545 storagep as *mut libc::sockaddr,
546 &mut size as *mut libc::socklen_t) as libc::c_int
548 -1 => Err(last_error()),
549 fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })})
// Waits, via select() on the read set, until the listening socket is ready or
// the acceptor's deadline has passed. select() failing (-1) is reported with
// last_error(); select() returning 0 is reported as an "accept timed out"
// error. The remaining result arm is not visible in this excerpt.
554 fn accept_deadline(&mut self) -> IoResult<()> {
555 let mut set: c::fd_set = unsafe { mem::init() };
556 c::fd_set(&mut set, self.fd());
559 // If we're past the deadline, then pass a 0 timeout to select() so
560 // we can poll the status of the socket.
561 let now = ::io::timer::now();
// Time left is `deadline - now` while the deadline is still in the future,
// and 0 once it has passed. (The two arms were previously swapped, which
// polled with 0 before the deadline and subtracted a larger `now` from the
// deadline after it.)
562 let ms = if self.deadline > now {self.deadline - now} else {0};
563 let tv = ms_to_timeval(ms);
// The first select() argument is fd + 1, except on windows where 1 is passed.
564 let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
565 unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
567 -1 => Err(last_error()),
568 0 => Err(timeout("accept timed out")),
// Local address the acceptor's socket is bound to, via getsockname.
574 impl rtio::RtioSocket for TcpAcceptor {
575 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
576 sockname(self.fd(), libc::getsockname)
// rtio-facing acceptor operations.
580 impl rtio::RtioTcpAcceptor for TcpAcceptor {
// Accepts a connection and boxes the stream as a trait object.
581 fn accept(&mut self) -> IoResult<~rtio::RtioTcpStream:Send> {
582 self.native_accept().map(|s| ~s as ~rtio::RtioTcpStream:Send)
// Both of these do nothing and report success.
585 fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
586 fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
// Stores the accept deadline as a timestamp: for Some(t) it is the current
// timer value plus t. The None arm is not visible in this excerpt
// (native_accept treats a deadline of 0 as "no deadline").
587 fn set_timeout(&mut self, timeout: Option<u64>) {
588 self.deadline = match timeout {
590 Some(t) => ::io::timer::now() + t,
595 ////////////////////////////////////////////////////////////////////////////////
597 ////////////////////////////////////////////////////////////////////////////////
// A UDP socket. The socket lives in an `Inner` value shared through an
// UnsafeArc; clone() on the rtio impl below hands out another reference to
// the same Inner.
599 pub struct UdpSocket {
600 inner: UnsafeArc<Inner>,
// Creates a datagram socket and binds it to `addr`.
604 pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
606 socket(addr, libc::SOCK_DGRAM).and_then(|fd| {
607 let (addr, len) = addr_to_sockaddr(addr);
608 let addrp = &addr as *libc::sockaddr_storage;
// The fd is wrapped in Inner before binding; Inner's Drop impl closes the fd,
// so a failed bind does not leave the socket open.
609 let inner = Inner { fd: fd };
610 let ret = UdpSocket { inner: UnsafeArc::new(inner) };
611 match libc::bind(fd, addrp as *libc::sockaddr,
612 len as libc::socklen_t) {
613 -1 => Err(last_error()),
// Returns the raw socket handle stored in the shared Inner.
620 pub fn fd(&self) -> sock_t {
621 // unsafety is fine because it's just a read-only arc
622 unsafe { (*self.inner.get()).fd }
// Sets the SO_BROADCAST option (at SOL_SOCKET) according to `on`; the payload
// argument is not visible in this excerpt.
625 pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
626 setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
// Sets the IP_MULTICAST_LOOP option (at IPPROTO_IP) according to `on`; the
// payload argument is not visible in this excerpt.
630 pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
631 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
// Applies a multicast membership option `opt` for the group `addr`. The
// request struct and option level depend on the address family: ip_mreq at
// IPPROTO_IP for IPv4, ip6_mreq at IPPROTO_IPV6 for IPv6. Callers in this
// file pass the ADD/DROP membership constants as `opt`.
635 pub fn set_membership(&mut self, addr: ip::IpAddr,
636 opt: libc::c_int) -> IoResult<()> {
637 match ip_to_inaddr(addr) {
639 let mreq = libc::ip_mreq {
641 // interface == INADDR_ANY
642 imr_interface: libc::in_addr { s_addr: 0x0 },
644 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
647 let mreq = libc::ip6_mreq {
648 ipv6mr_multiaddr: addr,
651 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
// Local address the UDP socket is bound to, via getsockname.
657 impl rtio::RtioSocket for UdpSocket {
658 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
659 sockname(self.fd(), libc::getsockname)
// Platform-specific type for the buffer-length argument of recvfrom/sendto
// below: c_int on windows, size_t on unix.
663 #[cfg(windows)] type msglen_t = libc::c_int;
664 #[cfg(unix)] type msglen_t = libc::size_t;
// rtio-facing UDP socket operations, implemented directly on the raw socket.
666 impl rtio::RtioUdpSocket for UdpSocket {
// Receives one datagram into `buf` and returns the number of bytes received
// together with the sender's address (decoded by sockaddr_to_addr).
667 fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
669 let mut storage: libc::sockaddr_storage = mem::init();
670 let storagep = &mut storage as *mut libc::sockaddr_storage;
671 let mut addrlen: libc::socklen_t =
672 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
674 libc::recvfrom(self.fd(),
// The buffer is written to by recvfrom, so take the pointer through the
// mutable accessor (as TcpStream::read does) instead of casting the pointer
// from as_ptr() to *mut.
675 buf.as_mut_ptr() as *mut libc::c_void,
676 buf.len() as msglen_t,
678 storagep as *mut libc::sockaddr,
679 &mut addrlen) as libc::c_int
681 if ret < 0 { return Err(last_error()) }
682 sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
683 Ok((ret as uint, addr))
// Sends `buf` as a single datagram to `dst`. A send that transmits fewer
// bytes than buf.len() is reported as an OtherIoError.
687 fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
688 let (dst, len) = addr_to_sockaddr(dst);
689 let dstp = &dst as *libc::sockaddr_storage;
692 libc::sendto(self.fd(),
693 buf.as_ptr() as *libc::c_void,
694 buf.len() as msglen_t,
696 dstp as *libc::sockaddr,
697 len as libc::socklen_t) as libc::c_int
700 -1 => Err(last_error()),
701 n if n as uint != buf.len() => {
703 kind: io::OtherIoError,
704 desc: "couldn't send entire packet at once",
// Joins the multicast group `multi`, using the option constant that matches
// its address family.
713 fn join_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
715 ip::Ipv4Addr(..) => {
716 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
718 ip::Ipv6Addr(..) => {
719 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
// Leaves the multicast group `multi`, using the option constant that matches
// its address family.
723 fn leave_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
725 ip::Ipv4Addr(..) => {
726 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
728 ip::Ipv6Addr(..) => {
729 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
// Enables IP_MULTICAST_LOOP.
734 fn loop_multicast_locally(&mut self) -> IoResult<()> {
735 self.set_multicast_loop(true)
// Disables IP_MULTICAST_LOOP.
737 fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
738 self.set_multicast_loop(false)
// Sets IP_MULTICAST_TTL (the payload argument is not visible in this excerpt).
741 fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
742 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
// Sets IP_TTL to `ttl`, converted to c_int.
745 fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
746 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
// Enables SO_BROADCAST.
749 fn hear_broadcasts(&mut self) -> IoResult<()> {
750 self.set_broadcast(true)
// Disables SO_BROADCAST.
752 fn ignore_broadcasts(&mut self) -> IoResult<()> {
753 self.set_broadcast(false)
// Returns a new boxed socket sharing the same Inner (and thus the same fd).
756 fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
757 ~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket:Send