1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 use sync::{atomic, Mutex};
16 use io::{mod, IoResult, IoError};
19 use sys::{mod, timer, retry, c, set_nonblocking, wouldblock};
20 use sys::fs::{fd_t, FileDesc};
21 use sys_common::net::*;
22 use sys_common::net::SocketStatus::*;
23 use sys_common::{eof, mkerr_libc};
25 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
26 match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
27 -1 => Err(super::last_error()),
32 fn addr_to_sockaddr_un(addr: &CString,
33 storage: &mut libc::sockaddr_storage)
34 -> IoResult<libc::socklen_t> {
35 // the sun_path length is limited to SUN_LEN (with null)
36 assert!(mem::size_of::<libc::sockaddr_storage>() >=
37 mem::size_of::<libc::sockaddr_un>());
38 let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
41 if len > s.sun_path.len() - 1 {
43 kind: io::InvalidInput,
44 desc: "invalid argument: path must be smaller than SUN_LEN",
48 s.sun_family = libc::AF_UNIX as libc::sa_family_t;
49 for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) {
53 // count the null terminator
54 let len = mem::size_of::<libc::sa_family_t>() + len + 1;
55 return Ok(len as libc::socklen_t);
61 // Unused on Linux, where this lock is not necessary.
67 fn new(fd: fd_t) -> Inner {
68 Inner { fd: fd, lock: Mutex::new(()) }
73 fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
76 fn connect(addr: &CString, ty: libc::c_int,
77 timeout: Option<u64>) -> IoResult<Inner> {
78 let mut storage = unsafe { mem::zeroed() };
79 let len = try!(addr_to_sockaddr_un(addr, &mut storage));
80 let inner = Inner::new(try!(unix_socket(ty)));
81 let addrp = &storage as *const _ as *const libc::sockaddr;
85 match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
86 -1 => Err(super::last_error()),
91 try!(connect_timeout(inner.fd, addrp, len, timeout_ms));
97 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
98 let mut storage = unsafe { mem::zeroed() };
99 let len = try!(addr_to_sockaddr_un(addr, &mut storage));
100 let inner = Inner::new(try!(unix_socket(ty)));
101 let addrp = &storage as *const _ as *const libc::sockaddr;
103 libc::bind(inner.fd, addrp, len)
105 -1 => Err(super::last_error()),
110 ////////////////////////////////////////////////////////////////////////////////
112 ////////////////////////////////////////////////////////////////////////////////
114 pub struct UnixStream {
120 unsafe impl Send for UnixStream {}
121 unsafe impl Sync for UnixStream {}
124 pub fn connect(addr: &CString,
125 timeout: Option<u64>) -> IoResult<UnixStream> {
126 connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
127 UnixStream::new(Arc::new(inner))
131 fn new(inner: Arc<Inner>) -> UnixStream {
139 pub fn fd(&self) -> fd_t { self.inner.fd }
141 #[cfg(target_os = "linux")]
142 fn lock_nonblocking(&self) {}
144 #[cfg(not(target_os = "linux"))]
145 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
148 guard: self.inner.lock.lock().unwrap(),
150 assert!(set_nonblocking(self.fd(), true).is_ok());
154 pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
156 let dolock = |&:| self.lock_nonblocking();
157 let doread = |&mut: nb| unsafe {
158 let flags = if nb {c::MSG_DONTWAIT} else {0};
160 buf.as_mut_ptr() as *mut libc::c_void,
161 buf.len() as libc::size_t,
162 flags) as libc::c_int
164 read(fd, self.read_deadline, dolock, doread)
167 pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
169 let dolock = |&: | self.lock_nonblocking();
170 let dowrite = |&: nb: bool, buf: *const u8, len: uint| unsafe {
171 let flags = if nb {c::MSG_DONTWAIT} else {0};
177 match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
183 pub fn close_write(&mut self) -> IoResult<()> {
184 mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
187 pub fn close_read(&mut self) -> IoResult<()> {
188 mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
191 pub fn set_timeout(&mut self, timeout: Option<u64>) {
192 let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
193 self.read_deadline = deadline;
194 self.write_deadline = deadline;
197 pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
198 self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
201 pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
202 self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
206 impl Clone for UnixStream {
207 fn clone(&self) -> UnixStream {
208 UnixStream::new(self.inner.clone())
212 ////////////////////////////////////////////////////////////////////////////////
214 ////////////////////////////////////////////////////////////////////////////////
216 pub struct UnixListener {
221 unsafe impl Send for UnixListener {}
222 unsafe impl Sync for UnixListener {}
225 pub fn bind(addr: &CString) -> IoResult<UnixListener> {
226 bind(addr, libc::SOCK_STREAM).map(|fd| {
227 UnixListener { inner: fd, path: addr.clone() }
231 pub fn fd(&self) -> fd_t { self.inner.fd }
233 pub fn listen(self) -> IoResult<UnixAcceptor> {
234 match unsafe { libc::listen(self.fd(), 128) } {
235 -1 => Err(super::last_error()),
238 let (reader, writer) = try!(unsafe { sys::os::pipe() });
239 try!(set_nonblocking(reader.fd(), true));
240 try!(set_nonblocking(writer.fd(), true));
241 try!(set_nonblocking(self.fd(), true));
243 inner: Arc::new(AcceptorInner {
247 closed: atomic::AtomicBool::new(false),
256 pub struct UnixAcceptor {
257 inner: Arc<AcceptorInner>,
261 struct AcceptorInner {
262 listener: UnixListener,
265 closed: atomic::AtomicBool,
268 unsafe impl Send for AcceptorInner {}
269 unsafe impl Sync for AcceptorInner {}
272 pub fn fd(&self) -> fd_t { self.inner.listener.fd() }
274 pub fn accept(&mut self) -> IoResult<UnixStream> {
275 let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
277 while !self.inner.closed.load(atomic::SeqCst) {
279 let mut storage: libc::sockaddr_storage = mem::zeroed();
280 let storagep = &mut storage as *mut libc::sockaddr_storage;
281 let size = mem::size_of::<libc::sockaddr_storage>();
282 let mut size = size as libc::socklen_t;
284 libc::accept(self.fd(),
285 storagep as *mut libc::sockaddr,
286 &mut size as *mut libc::socklen_t) as libc::c_int
288 -1 if wouldblock() => {}
289 -1 => return Err(super::last_error()),
290 fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
293 try!(await(&[self.fd(), self.inner.reader.fd()],
294 deadline, Readable));
300 pub fn set_timeout(&mut self, timeout: Option<u64>) {
301 self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
304 pub fn close_accept(&mut self) -> IoResult<()> {
305 self.inner.closed.store(true, atomic::SeqCst);
306 let fd = FileDesc::new(self.inner.writer.fd(), false);
307 match fd.write(&[0]) {
309 Err(..) if wouldblock() => Ok(()),
315 impl Clone for UnixAcceptor {
316 fn clone(&self) -> UnixAcceptor {
317 UnixAcceptor { inner: self.inner.clone(), deadline: 0 }
321 impl Drop for UnixListener {
323 // Unlink the path to the socket to ensure that it doesn't linger. We're
324 // careful to unlink the path before we close the file descriptor to
325 // prevent races where we unlink someone else's path.
327 let _ = libc::unlink(self.path.as_ptr());