1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
17 use std::unstable::mutex;
19 use super::{IoResult, retry, keep_going};
23 ////////////////////////////////////////////////////////////////////////////////
24 // sockaddr and misc bindings
25 ////////////////////////////////////////////////////////////////////////////////
// Platform-specific raw socket handle type: `libc::SOCKET` on Windows and
// `super::file::fd_t` on Unix.
27 #[cfg(windows)] pub type sock_t = libc::SOCKET;
28 #[cfg(unix)] pub type sock_t = super::file::fd_t;
30 pub fn htons(u: u16) -> u16 {
33 pub fn ntohs(u: u16) -> u16 {
38 InAddr(libc::in_addr),
39 In6Addr(libc::in6_addr),
42 fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr {
44 ip::Ipv4Addr(a, b, c, d) => {
45 let ip = (a as u32 << 24) |
49 InAddr(libc::in_addr {
50 s_addr: mem::from_be32(ip)
53 ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
54 In6Addr(libc::in6_addr {
70 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
72 let storage: libc::sockaddr_storage = mem::zeroed();
73 let len = match ip_to_inaddr(addr.ip) {
75 let storage: *mut libc::sockaddr_in = mem::transmute(&storage);
76 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
77 (*storage).sin_port = htons(addr.port);
78 (*storage).sin_addr = inaddr;
79 mem::size_of::<libc::sockaddr_in>()
82 let storage: *mut libc::sockaddr_in6 = mem::transmute(&storage);
83 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
84 (*storage).sin6_port = htons(addr.port);
85 (*storage).sin6_addr = inaddr;
86 mem::size_of::<libc::sockaddr_in6>()
89 return (storage, len);
93 fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
95 let fam = match addr.ip {
96 ip::Ipv4Addr(..) => libc::AF_INET,
97 ip::Ipv6Addr(..) => libc::AF_INET6,
99 match libc::socket(fam, ty, 0) {
100 -1 => Err(super::last_error()),
106 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
107 payload: T) -> IoResult<()> {
109 let payload = &payload as *T as *libc::c_void;
110 let ret = libc::setsockopt(fd, opt, val,
112 mem::size_of::<T>() as libc::socklen_t);
121 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
122 val: libc::c_int) -> IoResult<T> {
124 let mut slot: T = mem::zeroed();
125 let mut len = mem::size_of::<T>() as libc::socklen_t;
126 let ret = c::getsockopt(fd, opt, val,
127 &mut slot as *mut _ as *mut _,
132 assert!(len as uint == mem::size_of::<T>());
139 fn last_error() -> io::IoError {
140 io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
144 fn last_error() -> io::IoError {
// Closes a socket handle using the platform's native call (`closesocket` on
// Windows, `close` on Unix). The C return value is deliberately discarded
// with `let _`, so a failed close is not reported to the caller.
148 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
149 #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
151 fn sockname(fd: sock_t,
152 f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
153 *mut libc::socklen_t) -> libc::c_int)
154 -> IoResult<ip::SocketAddr>
156 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
157 let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
159 let storage = &mut storage as *mut libc::sockaddr_storage;
161 storage as *mut libc::sockaddr,
162 &mut len as *mut libc::socklen_t);
164 return Err(last_error())
167 return sockaddr_to_addr(&storage, len as uint);
170 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
171 len: uint) -> IoResult<ip::SocketAddr> {
172 match storage.ss_family as libc::c_int {
174 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
175 let storage: &libc::sockaddr_in = unsafe {
176 mem::transmute(storage)
178 let ip = mem::to_be32(storage.sin_addr.s_addr as u32);
179 let a = (ip >> 24) as u8;
180 let b = (ip >> 16) as u8;
181 let c = (ip >> 8) as u8;
182 let d = (ip >> 0) as u8;
184 ip: ip::Ipv4Addr(a, b, c, d),
185 port: ntohs(storage.sin_port),
189 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
190 let storage: &libc::sockaddr_in6 = unsafe {
191 mem::transmute(storage)
193 let a = ntohs(storage.sin6_addr.s6_addr[0]);
194 let b = ntohs(storage.sin6_addr.s6_addr[1]);
195 let c = ntohs(storage.sin6_addr.s6_addr[2]);
196 let d = ntohs(storage.sin6_addr.s6_addr[3]);
197 let e = ntohs(storage.sin6_addr.s6_addr[4]);
198 let f = ntohs(storage.sin6_addr.s6_addr[5]);
199 let g = ntohs(storage.sin6_addr.s6_addr[6]);
200 let h = ntohs(storage.sin6_addr.s6_addr[7]);
202 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
203 port: ntohs(storage.sin6_port),
207 Err(io::standard_error(io::OtherIoError))
219 use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
220 static mut INITIALIZED: bool = false;
221 static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
223 let _guard = LOCK.lock();
225 let mut data: c::WSADATA = mem::zeroed();
226 let ret = c::WSAStartup(0x202, // version 2.2
234 ////////////////////////////////////////////////////////////////////////////////
236 ////////////////////////////////////////////////////////////////////////////////
238 pub struct TcpStream {
246 lock: mutex::NativeMutex,
249 pub struct Guard<'a> {
251 pub guard: mutex::LockGuard<'a>,
255 fn new(fd: sock_t) -> Inner {
256 Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
261 pub fn connect(addr: ip::SocketAddr,
262 timeout: Option<u64>) -> IoResult<TcpStream> {
263 let fd = try!(socket(addr, libc::SOCK_STREAM));
264 let ret = TcpStream::new(Inner::new(fd));
266 let (addr, len) = addr_to_sockaddr(addr);
267 let addrp = &addr as *_ as *libc::sockaddr;
268 let len = len as libc::socklen_t;
272 try!(util::connect_timeout(fd, addrp, len, timeout));
276 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
277 -1 => Err(last_error()),
284 fn new(inner: Inner) -> TcpStream {
286 inner: Arc::new(inner),
// Returns the raw socket handle stored in `self.inner`.
292 pub fn fd(&self) -> sock_t { self.inner.fd }
// Sets the `TCP_NODELAY` option (level `IPPROTO_TCP`) on this stream's socket
// through the `setsockopt` helper. The bool is passed as a `c_int`
// (true = 1, false = 0).
294 fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
295 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
296 nodelay as libc::c_int)
299 fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
300 let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
301 seconds.is_some() as libc::c_int);
303 Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
// macOS variant: passes `seconds` as the value of the `TCP_KEEPALIVE` option
// at level `IPPROTO_TCP`. The `uint` -> `c_int` cast is unchecked.
308 #[cfg(target_os = "macos")]
309 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
310 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
311 seconds as libc::c_int)
// FreeBSD variant: passes `seconds` as the value of the `TCP_KEEPIDLE` option
// at level `IPPROTO_TCP`. The `uint` -> `c_int` cast is unchecked.
313 #[cfg(target_os = "freebsd")]
314 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
315 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
316 seconds as libc::c_int)
318 #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
319 fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
// Linux variant: an empty function that returns `()` and takes no lock. The
// design notes later in this file state that Linux relies on MSG_DONTWAIT and
// therefore does not need the mutex.
323 #[cfg(target_os = "linux")]
324 fn lock_nonblocking(&self) {}
326 #[cfg(not(target_os = "linux"))]
327 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
330 guard: unsafe { self.inner.lock.lock() },
332 assert!(util::set_nonblocking(self.fd(), true).is_ok());
// Integer type alias selected per platform: `c_int` on Windows, `size_t`
// everywhere else.
// NOTE(review): presumably the type of the length argument handed to the
// send/recv calls of the stream impl below — confirm at the use sites.
337 #[cfg(windows)] type wrlen = libc::c_int;
338 #[cfg(not(windows))] type wrlen = libc::size_t;
340 impl rtio::RtioTcpStream for TcpStream {
341 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
343 let dolock = || self.lock_nonblocking();
344 let doread = |nb| unsafe {
345 let flags = if nb {c::MSG_DONTWAIT} else {0};
347 buf.as_mut_ptr() as *mut libc::c_void,
349 flags) as libc::c_int
351 read(fd, self.read_deadline, dolock, doread)
354 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
356 let dolock = || self.lock_nonblocking();
357 let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
358 let flags = if nb {c::MSG_DONTWAIT} else {0};
360 buf as *mut libc::c_void,
364 match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
// Looks up the address of the connected peer by running `libc::getpeername`
// through the `sockname` helper.
369 fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
370 sockname(self.fd(), libc::getpeername)
// Clears the no-delay setting by calling `set_nodelay(false)`.
372 fn control_congestion(&mut self) -> IoResult<()> {
373 self.set_nodelay(false)
// Enables the no-delay setting by calling `set_nodelay(true)`.
375 fn nodelay(&mut self) -> IoResult<()> {
376 self.set_nodelay(true)
// Turns keepalive on with the given delay by calling
// `set_keepalive(Some(delay_in_seconds))`.
378 fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
379 self.set_keepalive(Some(delay_in_seconds))
// Turns keepalive off by calling `set_keepalive(None)`.
381 fn letdie(&mut self) -> IoResult<()> {
382 self.set_keepalive(None)
385 fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
387 inner: self.inner.clone(),
390 } as Box<rtio::RtioTcpStream:Send>
// Shuts down the write half of the socket with `shutdown(fd, SHUT_WR)`; the
// C return code is turned into an `IoResult` by `super::mkerr_libc`.
393 fn close_write(&mut self) -> IoResult<()> {
394 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
// Shuts down the read half of the socket with `shutdown(fd, SHUT_RD)`; the
// C return code is turned into an `IoResult` by `super::mkerr_libc`.
396 fn close_read(&mut self) -> IoResult<()> {
397 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
// Sets both the read and the write deadline from a single timeout.
// `Some(t)` stores `::io::timer::now() + t`; `None` stores 0.
// NOTE(review): 0 appears to be the "no deadline" sentinel (the `read`/`write`
// helpers in this file test `deadline != 0`). The unit of `t` is whatever
// `timer::now()` returns — presumably milliseconds, confirm.
400 fn set_timeout(&mut self, timeout: Option<u64>) {
401 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
402 self.read_deadline = deadline;
403 self.write_deadline = deadline;
// Sets only the read deadline: `Some(t)` stores `::io::timer::now() + t`,
// `None` stores 0.
405 fn set_read_timeout(&mut self, timeout: Option<u64>) {
406 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
// Sets only the write deadline: `Some(t)` stores `::io::timer::now() + t`,
// `None` stores 0.
408 fn set_write_timeout(&mut self, timeout: Option<u64>) {
409 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
413 impl rtio::RtioSocket for TcpStream {
// Looks up the stream's own local address by running `libc::getsockname`
// through the `sockname` helper.
414 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
415 sockname(self.fd(), libc::getsockname)
419 impl Drop for Inner {
// Closes the socket handle when the `Inner` value is dropped.
420 fn drop(&mut self) { unsafe { close(self.fd); } }
424 impl<'a> Drop for Guard<'a> {
426 assert!(util::set_nonblocking(self.fd, false).is_ok());
430 ////////////////////////////////////////////////////////////////////////////////
432 ////////////////////////////////////////////////////////////////////////////////
434 pub struct TcpListener {
439 pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
440 let fd = try!(socket(addr, libc::SOCK_STREAM));
441 let ret = TcpListener { inner: Inner::new(fd) };
443 let (addr, len) = addr_to_sockaddr(addr);
444 let addrp = &addr as *_ as *libc::sockaddr;
445 let len = len as libc::socklen_t;
447 // On platforms with Berkeley-derived sockets, this allows a socket
448 // to be rebound quickly, without needing to wait for
449 // the OS to clean up the previous one.
451 try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
455 match unsafe { libc::bind(fd, addrp, len) } {
456 -1 => Err(last_error()),
// Returns the raw socket handle stored in `self.inner`.
461 pub fn fd(&self) -> sock_t { self.inner.fd }
// Puts the bound socket into listening mode with `libc::listen`, passing
// `backlog` cast to `c_int`. A return of -1 becomes `Err(last_error())`;
// any other value consumes `self` into a `TcpAcceptor` whose deadline
// starts at 0.
463 pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
464 match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
465 -1 => Err(last_error()),
466 _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
471 impl rtio::RtioTcpListener for TcpListener {
// Starts listening via `native_listen` with a fixed backlog of 128 and, on
// success, boxes the acceptor as an `RtioTcpAcceptor` trait object.
472 fn listen(~self) -> IoResult<Box<rtio::RtioTcpAcceptor:Send>> {
473 self.native_listen(128).map(|a| {
474 box a as Box<rtio::RtioTcpAcceptor:Send>
479 impl rtio::RtioSocket for TcpListener {
// Looks up the listener's local address by running `libc::getsockname`
// through the `sockname` helper.
480 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
481 sockname(self.fd(), libc::getsockname)
485 pub struct TcpAcceptor {
486 listener: TcpListener,
// Returns the raw socket handle of the wrapped listener.
491 pub fn fd(&self) -> sock_t { self.listener.fd() }
493 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
494 if self.deadline != 0 {
495 try!(util::await(self.fd(), Some(self.deadline), util::Readable));
498 let mut storage: libc::sockaddr_storage = mem::zeroed();
499 let storagep = &mut storage as *mut libc::sockaddr_storage;
500 let size = mem::size_of::<libc::sockaddr_storage>();
501 let mut size = size as libc::socklen_t;
503 libc::accept(self.fd(),
504 storagep as *mut libc::sockaddr,
505 &mut size as *mut libc::socklen_t) as libc::c_int
507 -1 => Err(last_error()),
508 fd => Ok(TcpStream::new(Inner::new(fd))),
514 impl rtio::RtioSocket for TcpAcceptor {
// Looks up the acceptor's local address by running `libc::getsockname`
// through the `sockname` helper.
515 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
516 sockname(self.fd(), libc::getsockname)
520 impl rtio::RtioTcpAcceptor for TcpAcceptor {
// Accepts one connection via `native_accept` and, on success, boxes the
// resulting stream as an `RtioTcpStream` trait object.
521 fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
522 self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream:Send>)
// No-op in this implementation: always returns `Ok(())`.
525 fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
// No-op in this implementation: always returns `Ok(())`.
526 fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
// Sets the accept deadline: `Some(t)` stores `::io::timer::now() + t`,
// `None` stores 0. `native_accept` only waits on the deadline when it is
// non-zero.
527 fn set_timeout(&mut self, timeout: Option<u64>) {
528 self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
532 ////////////////////////////////////////////////////////////////////////////////
534 ////////////////////////////////////////////////////////////////////////////////
536 pub struct UdpSocket {
543 pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
544 let fd = try!(socket(addr, libc::SOCK_DGRAM));
545 let ret = UdpSocket {
546 inner: Arc::new(Inner::new(fd)),
551 let (addr, len) = addr_to_sockaddr(addr);
552 let addrp = &addr as *_ as *libc::sockaddr;
553 let len = len as libc::socklen_t;
555 match unsafe { libc::bind(fd, addrp, len) } {
556 -1 => Err(last_error()),
// Returns the raw socket handle stored in `self.inner`.
561 pub fn fd(&self) -> sock_t { self.inner.fd }
563 pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
564 setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
568 pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
569 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
573 pub fn set_membership(&mut self, addr: ip::IpAddr,
574 opt: libc::c_int) -> IoResult<()> {
575 match ip_to_inaddr(addr) {
577 let mreq = libc::ip_mreq {
579 // interface == INADDR_ANY
580 imr_interface: libc::in_addr { s_addr: 0x0 },
582 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
585 let mreq = libc::ip6_mreq {
586 ipv6mr_multiaddr: addr,
589 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
// Linux variant: an empty function that returns `()` and takes no lock. The
// design notes later in this file state that Linux relies on MSG_DONTWAIT and
// therefore does not need the mutex.
594 #[cfg(target_os = "linux")]
595 fn lock_nonblocking(&self) {}
597 #[cfg(not(target_os = "linux"))]
598 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
601 guard: unsafe { self.inner.lock.lock() },
603 assert!(util::set_nonblocking(self.fd(), true).is_ok());
608 impl rtio::RtioSocket for UdpSocket {
// Looks up the UDP socket's local address by running `libc::getsockname`
// through the `sockname` helper.
609 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
610 sockname(self.fd(), libc::getsockname)
// Integer type alias selected per platform: `c_int` on Windows, `size_t` on
// Unix. `recvfrom` below casts the buffer length to this type.
614 #[cfg(windows)] type msglen_t = libc::c_int;
615 #[cfg(unix)] type msglen_t = libc::size_t;
617 impl rtio::RtioUdpSocket for UdpSocket {
618 fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
620 let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
621 let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
622 let mut addrlen: libc::socklen_t =
623 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
625 let dolock = || self.lock_nonblocking();
626 let doread = |nb| unsafe {
627 let flags = if nb {c::MSG_DONTWAIT} else {0};
629 buf.as_mut_ptr() as *mut libc::c_void,
630 buf.len() as msglen_t,
633 &mut addrlen) as libc::c_int
635 let n = try!(read(fd, self.read_deadline, dolock, doread));
636 sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
637 Ok((n as uint, addr))
641 fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
642 let (dst, dstlen) = addr_to_sockaddr(dst);
643 let dstp = &dst as *_ as *libc::sockaddr;
644 let dstlen = dstlen as libc::socklen_t;
647 let dolock = || self.lock_nonblocking();
648 let dowrite = |nb, buf: *u8, len: uint| unsafe {
649 let flags = if nb {c::MSG_DONTWAIT} else {0};
651 buf as *libc::c_void,
658 let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
661 kind: io::ShortWrite(n),
662 desc: "couldn't send entire packet at once",
670 fn join_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
672 ip::Ipv4Addr(..) => {
673 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
675 ip::Ipv6Addr(..) => {
676 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
680 fn leave_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
682 ip::Ipv4Addr(..) => {
683 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
685 ip::Ipv6Addr(..) => {
686 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
// Enables multicast loopback by calling `set_multicast_loop(true)`.
691 fn loop_multicast_locally(&mut self) -> IoResult<()> {
692 self.set_multicast_loop(true)
// Disables multicast loopback by calling `set_multicast_loop(false)`.
694 fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
695 self.set_multicast_loop(false)
698 fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
699 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
// Sets the `IP_TTL` option (level `IPPROTO_IP`) to `ttl` through the
// `setsockopt` helper. The `int` -> `c_int` cast is unchecked.
702 fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
703 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
// Enables the broadcast setting by calling `set_broadcast(true)`.
706 fn hear_broadcasts(&mut self) -> IoResult<()> {
707 self.set_broadcast(true)
// Disables the broadcast setting by calling `set_broadcast(false)`.
709 fn ignore_broadcasts(&mut self) -> IoResult<()> {
710 self.set_broadcast(false)
713 fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
715 inner: self.inner.clone(),
718 } as Box<rtio::RtioUdpSocket:Send>
// Sets both the read and the write deadline from a single timeout.
// `Some(t)` stores `::io::timer::now() + t`; `None` stores 0.
// NOTE(review): 0 appears to be the "no deadline" sentinel (the `read`/`write`
// helpers in this file test `deadline != 0`). The unit of `t` is whatever
// `timer::now()` returns — presumably milliseconds, confirm.
721 fn set_timeout(&mut self, timeout: Option<u64>) {
722 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
723 self.read_deadline = deadline;
724 self.write_deadline = deadline;
// Sets only the read deadline: `Some(t)` stores `::io::timer::now() + t`,
// `None` stores 0.
726 fn set_read_timeout(&mut self, timeout: Option<u64>) {
727 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
// Sets only the write deadline: `Some(t)` stores `::io::timer::now() + t`,
// `None` stores 0.
729 fn set_write_timeout(&mut self, timeout: Option<u64>) {
730 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
734 ////////////////////////////////////////////////////////////////////////////////
737 // The read/write functions below are the helpers for reading/writing a socket
738 // with a possible deadline specified. This is generally viewed as a timed out
741 // From the application's perspective, timeouts apply to the I/O object, not to
742 // the underlying file descriptor (it's one timeout per object). This means that
743 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
745 // The next idea to implement timeouts would be to use nonblocking I/O. An
746 // invocation of select() would wait (with a timeout) for a socket to be ready.
747 // Once it's ready, we can perform the operation. Note that the operation *must*
748 // be nonblocking, even though select() says the socket is ready. This is
749 // because some other thread could have come and stolen our data (handles can be
752 // To implement nonblocking I/O, the first option we have is to use the
753 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
754 // I/O objects, so this was initially viewed as unwise.
756 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
757 // send/recv, but the niftiness wears off once you realize it only works well on
758 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
759 // operation on linux (no flag fiddling, no affecting other objects), but not on
762 // To work around this constraint on other platforms, we end up using the
763 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
764 // could cause other objects' blocking operations to suddenly become
765 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
766 // falls back to using the same code path as nonblocking operations, but with an
767 // infinite timeout (select + send/recv). This helps emulate blocking
768 // reads/writes despite the underlying descriptor being nonblocking, as well as
769 // optimizing the fast path of just hitting one syscall in the good case.
771 // As a final caveat, this implementation uses a mutex so only one thread is
772 // doing a nonblocking operation at a time. This is the operation that comes
773 // after the select() (at which point we think the socket is ready). This is
774 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
775 // expect (wouldn't want someone turning it on when it should be off!). All
776 // operations performed in the lock are *nonblocking* to avoid holding the mutex
779 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
780 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
781 // reads/writes are still blocking.
785 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
786 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
788 pub fn read<T>(fd: sock_t,
791 read: |bool| -> libc::c_int) -> IoResult<uint> {
794 ret = retry(|| read(false));
797 if deadline != 0 || (ret == -1 && util::wouldblock()) {
798 let deadline = match deadline {
803 // With a timeout, first we wait for the socket to become
804 // readable using select(), specifying the relevant timeout for
805 // our previously set deadline.
806 try!(util::await(fd, deadline, util::Readable));
808 // At this point, we're still within the timeout, and we've
809 // determined that the socket is readable (as returned by
810 // select). We must still read the socket in *nonblocking* mode
811 // because some other thread could come steal our data. If we
812 // fail to read some data, we retry (hence the outer loop) and
813 // wait for the socket to become readable again.
815 match retry(|| read(deadline.is_some())) {
816 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
817 -1 => return Err(last_error()),
818 n => { ret = n; break }
824 0 => Err(io::standard_error(io::EndOfFile)),
825 n if n < 0 => Err(last_error()),
830 pub fn write<T>(fd: sock_t,
833 write_everything: bool,
835 write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
839 if write_everything {
840 ret = keep_going(buf, |inner, len| {
841 written = buf.len() - len;
842 write(false, inner, len)
846 write(false, buf.as_ptr(), buf.len()) as libc::c_int
848 if ret > 0 { written = ret as uint; }
852 if deadline != 0 || (ret == -1 && util::wouldblock()) {
853 let deadline = match deadline {
857 while written < buf.len() && (write_everything || written == 0) {
858 // As with read(), first wait for the socket to be ready for
859 // the I/O operation.
860 match util::await(fd, deadline, util::Writable) {
861 Err(ref e) if e.kind == io::TimedOut && written > 0 => {
862 assert!(deadline.is_some());
863 return Err(io::IoError {
864 kind: io::ShortWrite(written),
869 Err(e) => return Err(e),
873 // Also as with read(), we use MSG_DONTWAIT to guard ourselves
874 // against unforeseen circumstances.
876 let ptr = buf.slice_from(written).as_ptr();
877 let len = buf.len() - written;
878 match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
879 -1 if util::wouldblock() => {}
880 -1 => return Err(last_error()),
881 n => { written += n as uint; }