use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
+use std::sync::atomic;
use super::retry;
use super::net;
use super::util;
use super::c;
-use super::file::fd_t;
+use super::process;
+use super::file::{fd_t, FileDesc};
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
}
}
-fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
+fn addr_to_sockaddr_un(addr: &CString,
+ storage: &mut libc::sockaddr_storage)
+ -> IoResult<libc::socklen_t> {
// the sun_path length is limited to SUN_LEN (with null)
assert!(mem::size_of::<libc::sockaddr_storage>() >=
mem::size_of::<libc::sockaddr_un>());
- let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
- let s: &mut libc::sockaddr_un = unsafe { mem::transmute(&mut storage) };
+ let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
let len = addr.len();
if len > s.sun_path.len() - 1 {
- #[cfg(unix)] use ERROR = libc::EINVAL;
- #[cfg(windows)] use ERROR = libc::WSAEINVAL;
+ #[cfg(unix)] use libc::EINVAL as ERROR;
+ #[cfg(windows)] use libc::WSAEINVAL as ERROR;
return Err(IoError {
code: ERROR as uint,
extra: 0,
})
}
s.sun_family = libc::AF_UNIX as libc::sa_family_t;
- for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
+ for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) {
*slot = value;
}
// count the null terminator
let len = mem::size_of::<libc::sa_family_t>() + len + 1;
- return Ok((storage, len));
+ return Ok(len as libc::socklen_t);
}
struct Inner {
fn connect(addr: &CString, ty: libc::c_int,
timeout: Option<u64>) -> IoResult<Inner> {
- let (addr, len) = try!(addr_to_sockaddr_un(addr));
+ let mut storage = unsafe { mem::zeroed() };
+ let len = try!(addr_to_sockaddr_un(addr, &mut storage));
let inner = Inner::new(try!(unix_socket(ty)));
- let addrp = &addr as *const _ as *const libc::sockaddr;
- let len = len as libc::socklen_t;
+ let addrp = &storage as *const _ as *const libc::sockaddr;
match timeout {
None => {
}
fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
- let (addr, len) = try!(addr_to_sockaddr_un(addr));
+ let mut storage = unsafe { mem::zeroed() };
+ let len = try!(addr_to_sockaddr_un(addr, &mut storage));
let inner = Inner::new(try!(unix_socket(ty)));
- let addrp = &addr as *const _;
+ let addrp = &storage as *const _ as *const libc::sockaddr;
match unsafe {
- libc::bind(inner.fd, addrp as *const _, len as libc::socklen_t)
+ libc::bind(inner.fd, addrp, len)
} {
-1 => Err(super::last_error()),
_ => Ok(inner)
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(super::last_error()),
- _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
+
+ #[cfg(unix)]
+ _ => {
+ let (reader, writer) = try!(process::pipe());
+ try!(util::set_nonblocking(reader.fd(), true));
+ try!(util::set_nonblocking(writer.fd(), true));
+ try!(util::set_nonblocking(self.fd(), true));
+ Ok(UnixAcceptor {
+ inner: Arc::new(AcceptorInner {
+ listener: self,
+ reader: reader,
+ writer: writer,
+ closed: atomic::AtomicBool::new(false),
+ }),
+ deadline: 0,
+ })
+ }
}
}
}
}
pub struct UnixAcceptor {
- listener: UnixListener,
+ inner: Arc<AcceptorInner>,
deadline: u64,
}
+#[cfg(unix)]
+struct AcceptorInner {
+ listener: UnixListener,
+ reader: FileDesc,
+ writer: FileDesc,
+ closed: atomic::AtomicBool,
+}
+
impl UnixAcceptor {
- fn fd(&self) -> fd_t { self.listener.fd() }
+ fn fd(&self) -> fd_t { self.inner.listener.fd() }
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
- if self.deadline != 0 {
- try!(util::await(self.fd(), Some(self.deadline), util::Readable));
- }
- let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
- let storagep = &mut storage as *mut libc::sockaddr_storage;
- let size = mem::size_of::<libc::sockaddr_storage>();
- let mut size = size as libc::socklen_t;
- match retry(|| unsafe {
- libc::accept(self.fd(),
- storagep as *mut libc::sockaddr,
- &mut size as *mut libc::socklen_t) as libc::c_int
- }) {
- -1 => Err(super::last_error()),
- fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
+ let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
+
+ while !self.inner.closed.load(atomic::SeqCst) {
+ unsafe {
+ let mut storage: libc::sockaddr_storage = mem::zeroed();
+ let storagep = &mut storage as *mut libc::sockaddr_storage;
+ let size = mem::size_of::<libc::sockaddr_storage>();
+ let mut size = size as libc::socklen_t;
+ match retry(|| {
+ libc::accept(self.fd(),
+ storagep as *mut libc::sockaddr,
+ &mut size as *mut libc::socklen_t) as libc::c_int
+ }) {
+ -1 if util::wouldblock() => {}
+ -1 => return Err(super::last_error()),
+ fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
+ }
+ }
+ try!(util::await([self.fd(), self.inner.reader.fd()],
+ deadline, util::Readable));
}
+
+ Err(util::eof())
}
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
+
+ fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
+ box UnixAcceptor {
+ inner: self.inner.clone(),
+ deadline: 0,
+ } as Box<rtio::RtioUnixAcceptor + Send>
+ }
+
+ #[cfg(unix)]
+ fn close_accept(&mut self) -> IoResult<()> {
+ self.inner.closed.store(true, atomic::SeqCst);
+ let mut fd = FileDesc::new(self.inner.writer.fd(), false);
+ match fd.inner_write([0]) {
+ Ok(..) => Ok(()),
+ Err(..) if util::wouldblock() => Ok(()),
+ Err(e) => Err(e),
+ }
+ }
}
impl Drop for UnixListener {