]> git.lizzy.rs Git - rust.git/blobdiff - src/libnative/io/pipe_unix.rs
rollup merge of #17355 : gamazeps/issue17210
[rust.git] / src / libnative / io / pipe_unix.rs
index 075ca769d073e919a46f31415195cce75205e13d..c222907fa5b88afc62eb1d443a698d35c074cd9c 100644 (file)
 use std::rt::mutex;
 use std::rt::rtio;
 use std::rt::rtio::{IoResult, IoError};
+use std::sync::atomic;
 
 use super::retry;
 use super::net;
 use super::util;
 use super::c;
-use super::file::fd_t;
+use super::process;
+use super::file::{fd_t, FileDesc};
 
 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@@ -29,17 +31,18 @@ fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
     }
 }
 
-fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
+fn addr_to_sockaddr_un(addr: &CString,
+                       storage: &mut libc::sockaddr_storage)
+                       -> IoResult<libc::socklen_t> {
     // the sun_path length is limited to SUN_LEN (with null)
     assert!(mem::size_of::<libc::sockaddr_storage>() >=
             mem::size_of::<libc::sockaddr_un>());
-    let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
-    let s: &mut libc::sockaddr_un = unsafe { mem::transmute(&mut storage) };
+    let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
 
     let len = addr.len();
     if len > s.sun_path.len() - 1 {
-        #[cfg(unix)] use ERROR = libc::EINVAL;
-        #[cfg(windows)] use ERROR = libc::WSAEINVAL;
+        #[cfg(unix)] use libc::EINVAL as ERROR;
+        #[cfg(windows)] use libc::WSAEINVAL as ERROR;
         return Err(IoError {
             code: ERROR as uint,
             extra: 0,
@@ -47,13 +50,13 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint
         })
     }
     s.sun_family = libc::AF_UNIX as libc::sa_family_t;
-    for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
+    for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) {
         *slot = value;
     }
 
     // count the null terminator
     let len = mem::size_of::<libc::sa_family_t>() + len + 1;
-    return Ok((storage, len));
+    return Ok(len as libc::socklen_t);
 }
 
 struct Inner {
@@ -76,10 +79,10 @@ impl Drop for Inner {
 
 fn connect(addr: &CString, ty: libc::c_int,
            timeout: Option<u64>) -> IoResult<Inner> {
-    let (addr, len) = try!(addr_to_sockaddr_un(addr));
+    let mut storage = unsafe { mem::zeroed() };
+    let len = try!(addr_to_sockaddr_un(addr, &mut storage));
     let inner = Inner::new(try!(unix_socket(ty)));
-    let addrp = &addr as *const _ as *const libc::sockaddr;
-    let len = len as libc::socklen_t;
+    let addrp = &storage as *const _ as *const libc::sockaddr;
 
     match timeout {
         None => {
@@ -96,11 +99,12 @@ fn connect(addr: &CString, ty: libc::c_int,
 }
 
 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
-    let (addr, len) = try!(addr_to_sockaddr_un(addr));
+    let mut storage = unsafe { mem::zeroed() };
+    let len = try!(addr_to_sockaddr_un(addr, &mut storage));
     let inner = Inner::new(try!(unix_socket(ty)));
-    let addrp = &addr as *const _;
+    let addrp = &storage as *const _ as *const libc::sockaddr;
     match unsafe {
-        libc::bind(inner.fd, addrp as *const _, len as libc::socklen_t)
+        libc::bind(inner.fd, addrp, len)
     } {
         -1 => Err(super::last_error()),
         _  => Ok(inner)
@@ -223,7 +227,23 @@ fn fd(&self) -> fd_t { self.inner.fd }
     pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(super::last_error()),
-            _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
+
+            #[cfg(unix)]
+            _ => {
+                let (reader, writer) = try!(process::pipe());
+                try!(util::set_nonblocking(reader.fd(), true));
+                try!(util::set_nonblocking(writer.fd(), true));
+                try!(util::set_nonblocking(self.fd(), true));
+                Ok(UnixAcceptor {
+                    inner: Arc::new(AcceptorInner {
+                        listener: self,
+                        reader: reader,
+                        writer: writer,
+                        closed: atomic::AtomicBool::new(false),
+                    }),
+                    deadline: 0,
+                })
+            }
         }
     }
 }
@@ -238,29 +258,45 @@ fn listen(self: Box<UnixListener>)
 }
 
 pub struct UnixAcceptor {
-    listener: UnixListener,
+    inner: Arc<AcceptorInner>,
     deadline: u64,
 }
 
+#[cfg(unix)]
+struct AcceptorInner {
+    listener: UnixListener,
+    reader: FileDesc,
+    writer: FileDesc,
+    closed: atomic::AtomicBool,
+}
+
 impl UnixAcceptor {
-    fn fd(&self) -> fd_t { self.listener.fd() }
+    fn fd(&self) -> fd_t { self.inner.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
-        if self.deadline != 0 {
-            try!(util::await(self.fd(), Some(self.deadline), util::Readable));
-        }
-        let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
-        let storagep = &mut storage as *mut libc::sockaddr_storage;
-        let size = mem::size_of::<libc::sockaddr_storage>();
-        let mut size = size as libc::socklen_t;
-        match retry(|| unsafe {
-            libc::accept(self.fd(),
-                         storagep as *mut libc::sockaddr,
-                         &mut size as *mut libc::socklen_t) as libc::c_int
-        }) {
-            -1 => Err(super::last_error()),
-            fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
+        let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
+
+        while !self.inner.closed.load(atomic::SeqCst) {
+            unsafe {
+                let mut storage: libc::sockaddr_storage = mem::zeroed();
+                let storagep = &mut storage as *mut libc::sockaddr_storage;
+                let size = mem::size_of::<libc::sockaddr_storage>();
+                let mut size = size as libc::socklen_t;
+                match retry(|| {
+                    libc::accept(self.fd(),
+                                 storagep as *mut libc::sockaddr,
+                                 &mut size as *mut libc::socklen_t) as libc::c_int
+                }) {
+                    -1 if util::wouldblock() => {}
+                    -1 => return Err(super::last_error()),
+                    fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
+                }
+            }
+            try!(util::await([self.fd(), self.inner.reader.fd()],
+                             deadline, util::Readable));
         }
+
+        Err(util::eof())
     }
 }
 
@@ -271,6 +307,24 @@ fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
     fn set_timeout(&mut self, timeout: Option<u64>) {
         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
     }
+
+    fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
+        box UnixAcceptor {
+            inner: self.inner.clone(),
+            deadline: 0,
+        } as Box<rtio::RtioUnixAcceptor + Send>
+    }
+
+    #[cfg(unix)]
+    fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomic::SeqCst);
+        let mut fd = FileDesc::new(self.inner.writer.fd(), false);
+        match fd.inner_write([0]) {
+            Ok(..) => Ok(()),
+            Err(..) if util::wouldblock() => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
 }
 
 impl Drop for UnixListener {