1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use std::rt::rtio::{IoResult, IoError};
18 use super::{retry, keep_going};
22 ////////////////////////////////////////////////////////////////////////////////
23 // sockaddr and misc bindings
24 ////////////////////////////////////////////////////////////////////////////////
// Platform-specific raw socket handle type used by the functions in this
// module: `libc::SOCKET` on Windows, `super::file::fd_t` on Unix.
26 #[cfg(windows)] pub type sock_t = libc::SOCKET;
27 #[cfg(unix)] pub type sock_t = super::file::fd_t;
29 pub fn htons(u: u16) -> u16 {
32 pub fn ntohs(u: u16) -> u16 {
37 InAddr(libc::in_addr),
38 In6Addr(libc::in6_addr),
41 fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
43 rtio::Ipv4Addr(a, b, c, d) => {
44 let ip = (a as u32 << 24) |
48 InAddr(libc::in_addr {
49 s_addr: Int::from_be(ip)
52 rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
53 In6Addr(libc::in6_addr {
69 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
71 let storage: libc::sockaddr_storage = mem::zeroed();
72 let len = match ip_to_inaddr(addr.ip) {
74 let storage: *mut libc::sockaddr_in = mem::transmute(&storage);
75 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
76 (*storage).sin_port = htons(addr.port);
77 (*storage).sin_addr = inaddr;
78 mem::size_of::<libc::sockaddr_in>()
81 let storage: *mut libc::sockaddr_in6 = mem::transmute(&storage);
82 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
83 (*storage).sin6_port = htons(addr.port);
84 (*storage).sin6_addr = inaddr;
85 mem::size_of::<libc::sockaddr_in6>()
88 return (storage, len);
92 fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
94 let fam = match addr.ip {
95 rtio::Ipv4Addr(..) => libc::AF_INET,
96 rtio::Ipv6Addr(..) => libc::AF_INET6,
98 match libc::socket(fam, ty, 0) {
99 -1 => Err(super::last_error()),
105 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
106 payload: T) -> IoResult<()> {
108 let payload = &payload as *const T as *const libc::c_void;
109 let ret = libc::setsockopt(fd, opt, val,
111 mem::size_of::<T>() as libc::socklen_t);
120 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
121 val: libc::c_int) -> IoResult<T> {
123 let mut slot: T = mem::zeroed();
124 let mut len = mem::size_of::<T>() as libc::socklen_t;
125 let ret = c::getsockopt(fd, opt, val,
126 &mut slot as *mut _ as *mut _,
131 assert!(len as uint == mem::size_of::<T>());
138 pub fn last_error() -> IoError {
140 let code = unsafe { c::WSAGetLastError() as uint };
144 detail: Some(os::error_string(code)),
149 fn last_error() -> IoError {
// Closes the given socket handle with the platform's call: `libc::closesocket`
// on Windows, `libc::close` on Unix. The call's return value is explicitly
// discarded (`let _ = ...`), so a failed close is not reported to the caller.
153 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
154 #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
156 fn sockname(fd: sock_t,
157 f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
158 *mut libc::socklen_t) -> libc::c_int)
159 -> IoResult<rtio::SocketAddr>
161 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
162 let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
164 let storage = &mut storage as *mut libc::sockaddr_storage;
166 storage as *mut libc::sockaddr,
167 &mut len as *mut libc::socklen_t);
169 return Err(last_error())
172 return sockaddr_to_addr(&storage, len as uint);
175 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
176 len: uint) -> IoResult<rtio::SocketAddr> {
177 match storage.ss_family as libc::c_int {
179 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
180 let storage: &libc::sockaddr_in = unsafe {
181 mem::transmute(storage)
183 let ip = (storage.sin_addr.s_addr as u32).to_be();
184 let a = (ip >> 24) as u8;
185 let b = (ip >> 16) as u8;
186 let c = (ip >> 8) as u8;
187 let d = (ip >> 0) as u8;
188 Ok(rtio::SocketAddr {
189 ip: rtio::Ipv4Addr(a, b, c, d),
190 port: ntohs(storage.sin_port),
194 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
195 let storage: &libc::sockaddr_in6 = unsafe {
196 mem::transmute(storage)
198 let a = ntohs(storage.sin6_addr.s6_addr[0]);
199 let b = ntohs(storage.sin6_addr.s6_addr[1]);
200 let c = ntohs(storage.sin6_addr.s6_addr[2]);
201 let d = ntohs(storage.sin6_addr.s6_addr[3]);
202 let e = ntohs(storage.sin6_addr.s6_addr[4]);
203 let f = ntohs(storage.sin6_addr.s6_addr[5]);
204 let g = ntohs(storage.sin6_addr.s6_addr[6]);
205 let h = ntohs(storage.sin6_addr.s6_addr[7]);
206 Ok(rtio::SocketAddr {
207 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
208 port: ntohs(storage.sin6_port),
212 #[cfg(unix)] use ERROR = libc::EINVAL;
213 #[cfg(windows)] use ERROR = libc::WSAEINVAL;
230 use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
231 static mut INITIALIZED: bool = false;
232 static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
234 let _guard = LOCK.lock();
236 let mut data: c::WSADATA = mem::zeroed();
237 let ret = c::WSAStartup(0x202, // version 2.2
245 ////////////////////////////////////////////////////////////////////////////////
247 ////////////////////////////////////////////////////////////////////////////////
249 pub struct TcpStream {
258 // Unused on Linux, where this lock is not necessary.
260 lock: mutex::NativeMutex
263 pub struct Guard<'a> {
265 pub guard: mutex::LockGuard<'a>,
269 fn new(fd: sock_t) -> Inner {
270 Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
275 pub fn connect(addr: rtio::SocketAddr,
276 timeout: Option<u64>) -> IoResult<TcpStream> {
277 let fd = try!(socket(addr, libc::SOCK_STREAM));
278 let ret = TcpStream::new(Inner::new(fd));
280 let (addr, len) = addr_to_sockaddr(addr);
281 let addrp = &addr as *const _ as *const libc::sockaddr;
282 let len = len as libc::socklen_t;
286 try!(util::connect_timeout(fd, addrp, len, timeout));
290 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
291 -1 => Err(last_error()),
298 fn new(inner: Inner) -> TcpStream {
300 inner: Arc::new(inner),
/// Returns the raw socket handle stored in `self.inner`.
306 pub fn fd(&self) -> sock_t { self.inner.fd }
308 fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
309 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
310 nodelay as libc::c_int)
313 fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
314 let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
315 seconds.is_some() as libc::c_int);
317 Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
322 #[cfg(target_os = "macos")]
323 #[cfg(target_os = "ios")]
324 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
325 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
326 seconds as libc::c_int)
328 #[cfg(target_os = "freebsd")]
329 #[cfg(target_os = "dragonfly")]
330 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
331 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
332 seconds as libc::c_int)
334 #[cfg(not(target_os = "macos"), not(target_os = "ios"), not(target_os = "freebsd"),
335 not(target_os = "dragonfly"))]
336 fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
// Linux variant: intentionally a no-op that returns `()`. As the design notes
// further down in this file explain, Linux uses MSG_DONTWAIT for nonblocking
// operations and therefore does not need the mutex.
340 #[cfg(target_os = "linux")]
341 fn lock_nonblocking(&self) {}
343 #[cfg(not(target_os = "linux"))]
344 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
347 guard: unsafe { self.inner.lock.lock() },
349 assert!(util::set_nonblocking(self.fd(), true).is_ok());
// Platform-dependent integer type: `libc::c_int` on Windows, `libc::size_t`
// everywhere else.
// NOTE(review): presumably the type of the length argument passed to the
// send/recv calls below -- confirm against the call sites.
354 #[cfg(windows)] type wrlen = libc::c_int;
355 #[cfg(not(windows))] type wrlen = libc::size_t;
357 impl rtio::RtioTcpStream for TcpStream {
358 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
360 let dolock = || self.lock_nonblocking();
361 let doread = |nb| unsafe {
362 let flags = if nb {c::MSG_DONTWAIT} else {0};
364 buf.as_mut_ptr() as *mut libc::c_void,
366 flags) as libc::c_int
368 read(fd, self.read_deadline, dolock, doread)
371 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
373 let dolock = || self.lock_nonblocking();
374 let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
375 let flags = if nb {c::MSG_DONTWAIT} else {0};
377 buf as *mut libc::c_void,
381 match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
386 fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
387 sockname(self.fd(), libc::getpeername)
389 fn control_congestion(&mut self) -> IoResult<()> {
390 self.set_nodelay(false)
392 fn nodelay(&mut self) -> IoResult<()> {
393 self.set_nodelay(true)
395 fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
396 self.set_keepalive(Some(delay_in_seconds))
398 fn letdie(&mut self) -> IoResult<()> {
399 self.set_keepalive(None)
402 fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
404 inner: self.inner.clone(),
407 } as Box<rtio::RtioTcpStream + Send>
410 fn close_write(&mut self) -> IoResult<()> {
411 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
413 fn close_read(&mut self) -> IoResult<()> {
414 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
417 fn set_timeout(&mut self, timeout: Option<u64>) {
418 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
419 self.read_deadline = deadline;
420 self.write_deadline = deadline;
422 fn set_read_timeout(&mut self, timeout: Option<u64>) {
423 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
425 fn set_write_timeout(&mut self, timeout: Option<u64>) {
426 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
430 impl rtio::RtioSocket for TcpStream {
431 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
432 sockname(self.fd(), libc::getsockname)
436 impl Drop for Inner {
/// Closes the socket handle `self.fd` when this value is dropped.
437 fn drop(&mut self) { unsafe { close(self.fd); } }
441 impl<'a> Drop for Guard<'a> {
443 assert!(util::set_nonblocking(self.fd, false).is_ok());
447 ////////////////////////////////////////////////////////////////////////////////
449 ////////////////////////////////////////////////////////////////////////////////
451 pub struct TcpListener {
456 pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
457 let fd = try!(socket(addr, libc::SOCK_STREAM));
458 let ret = TcpListener { inner: Inner::new(fd) };
460 let (addr, len) = addr_to_sockaddr(addr);
461 let addrp = &addr as *const _ as *const libc::sockaddr;
462 let len = len as libc::socklen_t;
464 // On platforms with Berkeley-derived sockets, this allows a socket
465 // to be rebound quickly, without needing to wait for
466 // the OS to clean up the previous one.
468 try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
472 match unsafe { libc::bind(fd, addrp, len) } {
473 -1 => Err(last_error()),
/// Returns the raw socket handle stored in `self.inner`.
478 pub fn fd(&self) -> sock_t { self.inner.fd }
480 pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
481 match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
482 -1 => Err(last_error()),
483 _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
488 impl rtio::RtioTcpListener for TcpListener {
489 fn listen(self: Box<TcpListener>)
490 -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
491 self.native_listen(128).map(|a| {
492 box a as Box<rtio::RtioTcpAcceptor + Send>
497 impl rtio::RtioSocket for TcpListener {
498 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
499 sockname(self.fd(), libc::getsockname)
503 pub struct TcpAcceptor {
504 listener: TcpListener,
/// Returns the raw socket handle of the wrapped `listener`.
509 pub fn fd(&self) -> sock_t { self.listener.fd() }
511 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
512 if self.deadline != 0 {
513 try!(util::await(self.fd(), Some(self.deadline), util::Readable));
516 let mut storage: libc::sockaddr_storage = mem::zeroed();
517 let storagep = &mut storage as *mut libc::sockaddr_storage;
518 let size = mem::size_of::<libc::sockaddr_storage>();
519 let mut size = size as libc::socklen_t;
521 libc::accept(self.fd(),
522 storagep as *mut libc::sockaddr,
523 &mut size as *mut libc::socklen_t) as libc::c_int
525 -1 => Err(last_error()),
526 fd => Ok(TcpStream::new(Inner::new(fd))),
532 impl rtio::RtioSocket for TcpAcceptor {
533 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
534 sockname(self.fd(), libc::getsockname)
538 impl rtio::RtioTcpAcceptor for TcpAcceptor {
539 fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
540 self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
// Both of these are no-ops in this implementation: they change no state and
// always return `Ok(())`.
543 fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
544 fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
545 fn set_timeout(&mut self, timeout: Option<u64>) {
546 self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
550 ////////////////////////////////////////////////////////////////////////////////
552 ////////////////////////////////////////////////////////////////////////////////
554 pub struct UdpSocket {
561 pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
562 let fd = try!(socket(addr, libc::SOCK_DGRAM));
563 let ret = UdpSocket {
564 inner: Arc::new(Inner::new(fd)),
569 let (addr, len) = addr_to_sockaddr(addr);
570 let addrp = &addr as *const _ as *const libc::sockaddr;
571 let len = len as libc::socklen_t;
573 match unsafe { libc::bind(fd, addrp, len) } {
574 -1 => Err(last_error()),
579 pub fn fd(&self) -> sock_t { self.inner.fd }
581 pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
582 setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
586 pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
587 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
591 pub fn set_membership(&mut self, addr: rtio::IpAddr,
592 opt: libc::c_int) -> IoResult<()> {
593 match ip_to_inaddr(addr) {
595 let mreq = libc::ip_mreq {
597 // interface == INADDR_ANY
598 imr_interface: libc::in_addr { s_addr: 0x0 },
600 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
603 let mreq = libc::ip6_mreq {
604 ipv6mr_multiaddr: addr,
607 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
// Linux variant: intentionally a no-op that returns `()`. As the design notes
// further down in this file explain, Linux uses MSG_DONTWAIT for nonblocking
// operations and therefore does not need the mutex.
612 #[cfg(target_os = "linux")]
613 fn lock_nonblocking(&self) {}
615 #[cfg(not(target_os = "linux"))]
616 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
619 guard: unsafe { self.inner.lock.lock() },
621 assert!(util::set_nonblocking(self.fd(), true).is_ok());
626 impl rtio::RtioSocket for UdpSocket {
627 fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
628 sockname(self.fd(), libc::getsockname)
// Platform-dependent integer type that buffer lengths are cast to before being
// handed to the socket calls (see `buf.len() as msglen_t` in `recv_from`):
// `libc::c_int` on Windows, `libc::size_t` on Unix.
632 #[cfg(windows)] type msglen_t = libc::c_int;
633 #[cfg(unix)] type msglen_t = libc::size_t;
635 impl rtio::RtioUdpSocket for UdpSocket {
636 fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
638 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
639 let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
640 let mut addrlen: libc::socklen_t =
641 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
643 let dolock = || self.lock_nonblocking();
644 let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
645 let flags = if nb {c::MSG_DONTWAIT} else {0};
647 buf.as_mut_ptr() as *mut libc::c_void,
648 buf.len() as msglen_t,
651 &mut addrlen) as libc::c_int
653 sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
654 Ok((n as uint, addr))
658 fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
659 let (dst, dstlen) = addr_to_sockaddr(dst);
660 let dstp = &dst as *const _ as *const libc::sockaddr;
661 let dstlen = dstlen as libc::socklen_t;
664 let dolock = || self.lock_nonblocking();
665 let dowrite = |nb, buf: *const u8, len: uint| unsafe {
666 let flags = if nb {c::MSG_DONTWAIT} else {0};
668 buf as *const libc::c_void,
675 let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
677 Err(util::short_write(n, "couldn't send entire packet at once"))
683 fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
685 rtio::Ipv4Addr(..) => {
686 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
688 rtio::Ipv6Addr(..) => {
689 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
693 fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
695 rtio::Ipv4Addr(..) => {
696 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
698 rtio::Ipv6Addr(..) => {
699 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
704 fn loop_multicast_locally(&mut self) -> IoResult<()> {
705 self.set_multicast_loop(true)
707 fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
708 self.set_multicast_loop(false)
711 fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
712 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
715 fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
716 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
719 fn hear_broadcasts(&mut self) -> IoResult<()> {
720 self.set_broadcast(true)
722 fn ignore_broadcasts(&mut self) -> IoResult<()> {
723 self.set_broadcast(false)
726 fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
728 inner: self.inner.clone(),
731 } as Box<rtio::RtioUdpSocket + Send>
734 fn set_timeout(&mut self, timeout: Option<u64>) {
735 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
736 self.read_deadline = deadline;
737 self.write_deadline = deadline;
739 fn set_read_timeout(&mut self, timeout: Option<u64>) {
740 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
742 fn set_write_timeout(&mut self, timeout: Option<u64>) {
743 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
747 ////////////////////////////////////////////////////////////////////////////////
750 // The read/write functions below are the helpers for reading/writing a socket
751 // with a possible deadline specified. This is generally viewed as a timed out
754 // From the application's perspective, timeouts apply to the I/O object, not to
755 // the underlying file descriptor (it's one timeout per object). This means that
756 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
758 // The next idea to implement timeouts would be to use nonblocking I/O. An
759 // invocation of select() would wait (with a timeout) for a socket to be ready.
760 // Once it's ready, we can perform the operation. Note that the operation *must*
761 // be nonblocking, even though select() says the socket is ready. This is
762 // because some other thread could have come and stolen our data (handles can be
765 // To implement nonblocking I/O, the first option we have is to use the
766 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
767 // I/O objects, so this was initially viewed as unwise.
769 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
770 // send/recv, but the niftiness wears off once you realize it only works well on
771 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
772 // operation on linux (no flag fiddling, no affecting other objects), but not on
775 // To work around this constraint on other platforms, we end up using the
776 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
777 // could cause other objects' blocking operations to suddenly become
778 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
779 // falls back to using the same code path as nonblocking operations, but with an
780 // infinite timeout (select + send/recv). This helps emulate blocking
781 // reads/writes despite the underlying descriptor being nonblocking, as well as
782 // optimizing the fast path of just hitting one syscall in the good case.
784 // As a final caveat, this implementation uses a mutex so only one thread is
785 // doing a nonblocking operation at a time. This is the operation that comes
786 // after the select() (at which point we think the socket is ready). This is
787 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
788 // expect (wouldn't want someone turning it on when it should be off!). All
789 // operations performed in the lock are *nonblocking* to avoid holding the mutex
792 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
793 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
794 // reads/writes are still blocking.
798 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
799 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
801 pub fn read<T>(fd: sock_t,
804 read: |bool| -> libc::c_int) -> IoResult<uint> {
807 ret = retry(|| read(false));
810 if deadline != 0 || (ret == -1 && util::wouldblock()) {
811 let deadline = match deadline {
816 // With a timeout, first we wait for the socket to become
817 // readable using select(), specifying the relevant timeout for
818 // our previously set deadline.
819 try!(util::await(fd, deadline, util::Readable));
821 // At this point, we're still within the timeout, and we've
822 // determined that the socket is readable (as returned by
823 // select). We must still read the socket in *nonblocking* mode
824 // because some other thread could come steal our data. If we
825 // fail to read some data, we retry (hence the outer loop) and
826 // wait for the socket to become readable again.
828 match retry(|| read(deadline.is_some())) {
829 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
830 -1 => return Err(last_error()),
831 n => { ret = n; break }
837 0 => Err(util::eof()),
838 n if n < 0 => Err(last_error()),
843 pub fn write<T>(fd: sock_t,
846 write_everything: bool,
848 write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
852 if write_everything {
853 ret = keep_going(buf, |inner, len| {
854 written = buf.len() - len;
855 write(false, inner, len)
859 write(false, buf.as_ptr(), buf.len()) as libc::c_int
861 if ret > 0 { written = ret as uint; }
865 if deadline != 0 || (ret == -1 && util::wouldblock()) {
866 let deadline = match deadline {
870 while written < buf.len() && (write_everything || written == 0) {
871 // As with read(), first wait for the socket to be ready for
872 // the I/O operation.
873 match util::await(fd, deadline, util::Writable) {
874 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
875 assert!(deadline.is_some());
876 return Err(util::short_write(written, "short write"))
878 Err(e) => return Err(e),
882 // Also as with read(), we use MSG_DONTWAIT to guard ourselves
883 // against unforeseen circumstances.
885 let ptr = buf.slice_from(written).as_ptr();
886 let len = buf.len() - written;
887 match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
888 -1 if util::wouldblock() => {}
889 -1 => return Err(last_error()),
890 n => { written += n as uint; }