]> git.lizzy.rs Git - rust.git/blobdiff - src/libnative/io/pipe_unix.rs
rollup merge of #17355 : gamazeps/issue17210
[rust.git] / src / libnative / io / pipe_unix.rs
index 895b8b5929c96e69597b64ad130b82e0ae209969..c222907fa5b88afc62eb1d443a698d35c074cd9c 100644 (file)
 use std::rt::mutex;
 use std::rt::rtio;
 use std::rt::rtio::{IoResult, IoError};
+use std::sync::atomic;
 
 use super::retry;
 use super::net;
 use super::util;
 use super::c;
-use super::file::fd_t;
+use super::process;
+use super::file::{fd_t, FileDesc};
 
 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@@ -48,7 +50,7 @@ fn addr_to_sockaddr_un(addr: &CString,
         })
     }
     s.sun_family = libc::AF_UNIX as libc::sa_family_t;
-    for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
+    for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) {
         *slot = value;
     }
 
@@ -225,7 +227,23 @@ fn fd(&self) -> fd_t { self.inner.fd }
     pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(super::last_error()),
-            _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
+
+            #[cfg(unix)]
+            _ => {
+                let (reader, writer) = try!(process::pipe());
+                try!(util::set_nonblocking(reader.fd(), true));
+                try!(util::set_nonblocking(writer.fd(), true));
+                try!(util::set_nonblocking(self.fd(), true));
+                Ok(UnixAcceptor {
+                    inner: Arc::new(AcceptorInner {
+                        listener: self,
+                        reader: reader,
+                        writer: writer,
+                        closed: atomic::AtomicBool::new(false),
+                    }),
+                    deadline: 0,
+                })
+            }
         }
     }
 }
@@ -240,29 +258,45 @@ fn listen(self: Box<UnixListener>)
 }
 
 pub struct UnixAcceptor {
-    listener: UnixListener,
+    inner: Arc<AcceptorInner>,
     deadline: u64,
 }
 
+#[cfg(unix)]
+struct AcceptorInner {
+    listener: UnixListener,
+    reader: FileDesc,
+    writer: FileDesc,
+    closed: atomic::AtomicBool,
+}
+
 impl UnixAcceptor {
-    fn fd(&self) -> fd_t { self.listener.fd() }
+    fn fd(&self) -> fd_t { self.inner.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
-        if self.deadline != 0 {
-            try!(util::await(self.fd(), Some(self.deadline), util::Readable));
-        }
-        let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
-        let storagep = &mut storage as *mut libc::sockaddr_storage;
-        let size = mem::size_of::<libc::sockaddr_storage>();
-        let mut size = size as libc::socklen_t;
-        match retry(|| unsafe {
-            libc::accept(self.fd(),
-                         storagep as *mut libc::sockaddr,
-                         &mut size as *mut libc::socklen_t) as libc::c_int
-        }) {
-            -1 => Err(super::last_error()),
-            fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
+        let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
+
+        while !self.inner.closed.load(atomic::SeqCst) {
+            unsafe {
+                let mut storage: libc::sockaddr_storage = mem::zeroed();
+                let storagep = &mut storage as *mut libc::sockaddr_storage;
+                let size = mem::size_of::<libc::sockaddr_storage>();
+                let mut size = size as libc::socklen_t;
+                match retry(|| {
+                    libc::accept(self.fd(),
+                                 storagep as *mut libc::sockaddr,
+                                 &mut size as *mut libc::socklen_t) as libc::c_int
+                }) {
+                    -1 if util::wouldblock() => {}
+                    -1 => return Err(super::last_error()),
+                    fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
+                }
+            }
+            try!(util::await([self.fd(), self.inner.reader.fd()],
+                             deadline, util::Readable));
         }
+
+        Err(util::eof())
     }
 }
 
@@ -273,6 +307,24 @@ fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
     fn set_timeout(&mut self, timeout: Option<u64>) {
         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
     }
+
+    fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
+        box UnixAcceptor {
+            inner: self.inner.clone(),
+            deadline: 0,
+        } as Box<rtio::RtioUnixAcceptor + Send>
+    }
+
+    #[cfg(unix)]
+    fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomic::SeqCst);
+        let mut fd = FileDesc::new(self.inner.writer.fd(), false);
+        match fd.inner_write([0]) {
+            Ok(..) => Ok(()),
+            Err(..) if util::wouldblock() => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
 }
 
 impl Drop for UnixListener {