1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use std::c_str::CString;
17 use std::rt::rtio::{IoResult, IoError};
18 use std::sync::atomic;
25 use super::file::{fd_t, FileDesc};
27 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
28 match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
29 -1 => Err(super::last_error()),
34 fn addr_to_sockaddr_un(addr: &CString,
35 storage: &mut libc::sockaddr_storage)
36 -> IoResult<libc::socklen_t> {
37 // the sun_path length is limited to SUN_LEN (with null)
38 assert!(mem::size_of::<libc::sockaddr_storage>() >=
39 mem::size_of::<libc::sockaddr_un>());
40 let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
43 if len > s.sun_path.len() - 1 {
44 #[cfg(unix)] use libc::EINVAL as ERROR;
45 #[cfg(windows)] use libc::WSAEINVAL as ERROR;
49 detail: Some("path must be smaller than SUN_LEN".to_string()),
52 s.sun_family = libc::AF_UNIX as libc::sa_family_t;
53 for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
57 // count the null terminator
58 let len = mem::size_of::<libc::sa_family_t>() + len + 1;
59 return Ok(len as libc::socklen_t);
65 // Unused on Linux, where this lock is not necessary.
67 lock: mutex::NativeMutex
71 fn new(fd: fd_t) -> Inner {
72 Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
77 fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
80 fn connect(addr: &CString, ty: libc::c_int,
81 timeout: Option<u64>) -> IoResult<Inner> {
82 let mut storage = unsafe { mem::zeroed() };
83 let len = try!(addr_to_sockaddr_un(addr, &mut storage));
84 let inner = Inner::new(try!(unix_socket(ty)));
85 let addrp = &storage as *const _ as *const libc::sockaddr;
89 match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
90 -1 => Err(super::last_error()),
95 try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
101 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
102 let mut storage = unsafe { mem::zeroed() };
103 let len = try!(addr_to_sockaddr_un(addr, &mut storage));
104 let inner = Inner::new(try!(unix_socket(ty)));
105 let addrp = &storage as *const _ as *const libc::sockaddr;
107 libc::bind(inner.fd, addrp, len)
109 -1 => Err(super::last_error()),
114 ////////////////////////////////////////////////////////////////////////////////
116 ////////////////////////////////////////////////////////////////////////////////
118 pub struct UnixStream {
125 pub fn connect(addr: &CString,
126 timeout: Option<u64>) -> IoResult<UnixStream> {
127 connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
128 UnixStream::new(Arc::new(inner))
132 fn new(inner: Arc<Inner>) -> UnixStream {
140 fn fd(&self) -> fd_t { self.inner.fd }
142 #[cfg(target_os = "linux")]
143 fn lock_nonblocking(&self) {}
145 #[cfg(not(target_os = "linux"))]
146 fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
147 let ret = net::Guard {
149 guard: unsafe { self.inner.lock.lock() },
151 assert!(util::set_nonblocking(self.fd(), true).is_ok());
156 impl rtio::RtioPipe for UnixStream {
157 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
159 let dolock = || self.lock_nonblocking();
160 let doread = |nb| unsafe {
161 let flags = if nb {c::MSG_DONTWAIT} else {0};
163 buf.as_mut_ptr() as *mut libc::c_void,
164 buf.len() as libc::size_t,
165 flags) as libc::c_int
167 net::read(fd, self.read_deadline, dolock, doread)
170 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
172 let dolock = || self.lock_nonblocking();
173 let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
174 let flags = if nb {c::MSG_DONTWAIT} else {0};
176 buf as *mut libc::c_void,
180 match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
186 fn clone(&self) -> Box<rtio::RtioPipe + Send> {
187 box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe + Send>
190 fn close_write(&mut self) -> IoResult<()> {
191 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
193 fn close_read(&mut self) -> IoResult<()> {
194 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
196 fn set_timeout(&mut self, timeout: Option<u64>) {
197 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
198 self.read_deadline = deadline;
199 self.write_deadline = deadline;
201 fn set_read_timeout(&mut self, timeout: Option<u64>) {
202 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
204 fn set_write_timeout(&mut self, timeout: Option<u64>) {
205 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
209 ////////////////////////////////////////////////////////////////////////////////
211 ////////////////////////////////////////////////////////////////////////////////
213 pub struct UnixListener {
219 pub fn bind(addr: &CString) -> IoResult<UnixListener> {
220 bind(addr, libc::SOCK_STREAM).map(|fd| {
221 UnixListener { inner: fd, path: addr.clone() }
225 fn fd(&self) -> fd_t { self.inner.fd }
227 pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
228 match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
229 -1 => Err(super::last_error()),
233 let (reader, writer) = try!(process::pipe());
234 try!(util::set_nonblocking(reader.fd(), true));
235 try!(util::set_nonblocking(writer.fd(), true));
236 try!(util::set_nonblocking(self.fd(), true));
238 inner: Arc::new(AcceptorInner {
242 closed: atomic::AtomicBool::new(false),
251 impl rtio::RtioUnixListener for UnixListener {
252 fn listen(self: Box<UnixListener>)
253 -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
254 self.native_listen(128).map(|a| {
255 box a as Box<rtio::RtioUnixAcceptor + Send>
260 pub struct UnixAcceptor {
261 inner: Arc<AcceptorInner>,
266 struct AcceptorInner {
267 listener: UnixListener,
270 closed: atomic::AtomicBool,
274 fn fd(&self) -> fd_t { self.inner.listener.fd() }
276 pub fn native_accept(&mut self) -> IoResult<UnixStream> {
277 let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
279 while !self.inner.closed.load(atomic::SeqCst) {
281 let mut storage: libc::sockaddr_storage = mem::zeroed();
282 let storagep = &mut storage as *mut libc::sockaddr_storage;
283 let size = mem::size_of::<libc::sockaddr_storage>();
284 let mut size = size as libc::socklen_t;
286 libc::accept(self.fd(),
287 storagep as *mut libc::sockaddr,
288 &mut size as *mut libc::socklen_t) as libc::c_int
290 -1 if util::wouldblock() => {}
291 -1 => return Err(super::last_error()),
292 fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
295 try!(util::await([self.fd(), self.inner.reader.fd()],
296 deadline, util::Readable));
303 impl rtio::RtioUnixAcceptor for UnixAcceptor {
304 fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
305 self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
307 fn set_timeout(&mut self, timeout: Option<u64>) {
308 self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
311 fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
313 inner: self.inner.clone(),
315 } as Box<rtio::RtioUnixAcceptor + Send>
319 fn close_accept(&mut self) -> IoResult<()> {
320 self.inner.closed.store(true, atomic::SeqCst);
321 let mut fd = FileDesc::new(self.inner.writer.fd(), false);
322 match fd.inner_write([0]) {
324 Err(..) if util::wouldblock() => Ok(()),
330 impl Drop for UnixListener {
332 // Unlink the path to the socket to ensure that it doesn't linger. We're
333 // careful to unlink the path before we close the file descriptor to
334 // prevent races where we unlink someone else's path.
336 let _ = libc::unlink(self.path.as_ptr());