]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/pipe_unix.rs
rollup merge of #17355 : gamazeps/issue17210
[rust.git] / src / libnative / io / pipe_unix.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use std::c_str::CString;
14 use std::mem;
15 use std::rt::mutex;
16 use std::rt::rtio;
17 use std::rt::rtio::{IoResult, IoError};
18 use std::sync::atomic;
19
20 use super::retry;
21 use super::net;
22 use super::util;
23 use super::c;
24 use super::process;
25 use super::file::{fd_t, FileDesc};
26
27 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
28     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
29         -1 => Err(super::last_error()),
30         fd => Ok(fd)
31     }
32 }
33
34 fn addr_to_sockaddr_un(addr: &CString,
35                        storage: &mut libc::sockaddr_storage)
36                        -> IoResult<libc::socklen_t> {
37     // the sun_path length is limited to SUN_LEN (with null)
38     assert!(mem::size_of::<libc::sockaddr_storage>() >=
39             mem::size_of::<libc::sockaddr_un>());
40     let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
41
42     let len = addr.len();
43     if len > s.sun_path.len() - 1 {
44         #[cfg(unix)] use libc::EINVAL as ERROR;
45         #[cfg(windows)] use libc::WSAEINVAL as ERROR;
46         return Err(IoError {
47             code: ERROR as uint,
48             extra: 0,
49             detail: Some("path must be smaller than SUN_LEN".to_string()),
50         })
51     }
52     s.sun_family = libc::AF_UNIX as libc::sa_family_t;
53     for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) {
54         *slot = value;
55     }
56
57     // count the null terminator
58     let len = mem::size_of::<libc::sa_family_t>() + len + 1;
59     return Ok(len as libc::socklen_t);
60 }
61
62 struct Inner {
63     fd: fd_t,
64
65     // Unused on Linux, where this lock is not necessary.
66     #[allow(dead_code)]
67     lock: mutex::NativeMutex
68 }
69
70 impl Inner {
71     fn new(fd: fd_t) -> Inner {
72         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
73     }
74 }
75
76 impl Drop for Inner {
77     fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
78 }
79
80 fn connect(addr: &CString, ty: libc::c_int,
81            timeout: Option<u64>) -> IoResult<Inner> {
82     let mut storage = unsafe { mem::zeroed() };
83     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
84     let inner = Inner::new(try!(unix_socket(ty)));
85     let addrp = &storage as *const _ as *const libc::sockaddr;
86
87     match timeout {
88         None => {
89             match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
90                 -1 => Err(super::last_error()),
91                 _  => Ok(inner)
92             }
93         }
94         Some(timeout_ms) => {
95             try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
96             Ok(inner)
97         }
98     }
99 }
100
101 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
102     let mut storage = unsafe { mem::zeroed() };
103     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
104     let inner = Inner::new(try!(unix_socket(ty)));
105     let addrp = &storage as *const _ as *const libc::sockaddr;
106     match unsafe {
107         libc::bind(inner.fd, addrp, len)
108     } {
109         -1 => Err(super::last_error()),
110         _  => Ok(inner)
111     }
112 }
113
114 ////////////////////////////////////////////////////////////////////////////////
115 // Unix Streams
116 ////////////////////////////////////////////////////////////////////////////////
117
118 pub struct UnixStream {
119     inner: Arc<Inner>,
120     read_deadline: u64,
121     write_deadline: u64,
122 }
123
124 impl UnixStream {
125     pub fn connect(addr: &CString,
126                    timeout: Option<u64>) -> IoResult<UnixStream> {
127         connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
128             UnixStream::new(Arc::new(inner))
129         })
130     }
131
132     fn new(inner: Arc<Inner>) -> UnixStream {
133         UnixStream {
134             inner: inner,
135             read_deadline: 0,
136             write_deadline: 0,
137         }
138     }
139
140     fn fd(&self) -> fd_t { self.inner.fd }
141
142     #[cfg(target_os = "linux")]
143     fn lock_nonblocking(&self) {}
144
145     #[cfg(not(target_os = "linux"))]
146     fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
147         let ret = net::Guard {
148             fd: self.fd(),
149             guard: unsafe { self.inner.lock.lock() },
150         };
151         assert!(util::set_nonblocking(self.fd(), true).is_ok());
152         ret
153     }
154 }
155
156 impl rtio::RtioPipe for UnixStream {
157     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
158         let fd = self.fd();
159         let dolock = || self.lock_nonblocking();
160         let doread = |nb| unsafe {
161             let flags = if nb {c::MSG_DONTWAIT} else {0};
162             libc::recv(fd,
163                        buf.as_mut_ptr() as *mut libc::c_void,
164                        buf.len() as libc::size_t,
165                        flags) as libc::c_int
166         };
167         net::read(fd, self.read_deadline, dolock, doread)
168     }
169
170     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
171         let fd = self.fd();
172         let dolock = || self.lock_nonblocking();
173         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
174             let flags = if nb {c::MSG_DONTWAIT} else {0};
175             libc::send(fd,
176                        buf as *mut libc::c_void,
177                        len as libc::size_t,
178                        flags) as i64
179         };
180         match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
181             Ok(_) => Ok(()),
182             Err(e) => Err(e)
183         }
184     }
185
186     fn clone(&self) -> Box<rtio::RtioPipe + Send> {
187         box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe + Send>
188     }
189
190     fn close_write(&mut self) -> IoResult<()> {
191         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
192     }
193     fn close_read(&mut self) -> IoResult<()> {
194         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
195     }
196     fn set_timeout(&mut self, timeout: Option<u64>) {
197         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
198         self.read_deadline = deadline;
199         self.write_deadline = deadline;
200     }
201     fn set_read_timeout(&mut self, timeout: Option<u64>) {
202         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
203     }
204     fn set_write_timeout(&mut self, timeout: Option<u64>) {
205         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
206     }
207 }
208
209 ////////////////////////////////////////////////////////////////////////////////
210 // Unix Listener
211 ////////////////////////////////////////////////////////////////////////////////
212
213 pub struct UnixListener {
214     inner: Inner,
215     path: CString,
216 }
217
218 impl UnixListener {
219     pub fn bind(addr: &CString) -> IoResult<UnixListener> {
220         bind(addr, libc::SOCK_STREAM).map(|fd| {
221             UnixListener { inner: fd, path: addr.clone() }
222         })
223     }
224
225     fn fd(&self) -> fd_t { self.inner.fd }
226
227     pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
228         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
229             -1 => Err(super::last_error()),
230
231             #[cfg(unix)]
232             _ => {
233                 let (reader, writer) = try!(process::pipe());
234                 try!(util::set_nonblocking(reader.fd(), true));
235                 try!(util::set_nonblocking(writer.fd(), true));
236                 try!(util::set_nonblocking(self.fd(), true));
237                 Ok(UnixAcceptor {
238                     inner: Arc::new(AcceptorInner {
239                         listener: self,
240                         reader: reader,
241                         writer: writer,
242                         closed: atomic::AtomicBool::new(false),
243                     }),
244                     deadline: 0,
245                 })
246             }
247         }
248     }
249 }
250
251 impl rtio::RtioUnixListener for UnixListener {
252     fn listen(self: Box<UnixListener>)
253               -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
254         self.native_listen(128).map(|a| {
255             box a as Box<rtio::RtioUnixAcceptor + Send>
256         })
257     }
258 }
259
260 pub struct UnixAcceptor {
261     inner: Arc<AcceptorInner>,
262     deadline: u64,
263 }
264
265 #[cfg(unix)]
266 struct AcceptorInner {
267     listener: UnixListener,
268     reader: FileDesc,
269     writer: FileDesc,
270     closed: atomic::AtomicBool,
271 }
272
273 impl UnixAcceptor {
274     fn fd(&self) -> fd_t { self.inner.listener.fd() }
275
276     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
277         let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
278
279         while !self.inner.closed.load(atomic::SeqCst) {
280             unsafe {
281                 let mut storage: libc::sockaddr_storage = mem::zeroed();
282                 let storagep = &mut storage as *mut libc::sockaddr_storage;
283                 let size = mem::size_of::<libc::sockaddr_storage>();
284                 let mut size = size as libc::socklen_t;
285                 match retry(|| {
286                     libc::accept(self.fd(),
287                                  storagep as *mut libc::sockaddr,
288                                  &mut size as *mut libc::socklen_t) as libc::c_int
289                 }) {
290                     -1 if util::wouldblock() => {}
291                     -1 => return Err(super::last_error()),
292                     fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
293                 }
294             }
295             try!(util::await([self.fd(), self.inner.reader.fd()],
296                              deadline, util::Readable));
297         }
298
299         Err(util::eof())
300     }
301 }
302
303 impl rtio::RtioUnixAcceptor for UnixAcceptor {
304     fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
305         self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
306     }
307     fn set_timeout(&mut self, timeout: Option<u64>) {
308         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
309     }
310
311     fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
312         box UnixAcceptor {
313             inner: self.inner.clone(),
314             deadline: 0,
315         } as Box<rtio::RtioUnixAcceptor + Send>
316     }
317
318     #[cfg(unix)]
319     fn close_accept(&mut self) -> IoResult<()> {
320         self.inner.closed.store(true, atomic::SeqCst);
321         let mut fd = FileDesc::new(self.inner.writer.fd(), false);
322         match fd.inner_write([0]) {
323             Ok(..) => Ok(()),
324             Err(..) if util::wouldblock() => Ok(()),
325             Err(e) => Err(e),
326         }
327     }
328 }
329
330 impl Drop for UnixListener {
331     fn drop(&mut self) {
332         // Unlink the path to the socket to ensure that it doesn't linger. We're
333         // careful to unlink the path before we close the file descriptor to
334         // prevent races where we unlink someone else's path.
335         unsafe {
336             let _ = libc::unlink(self.path.as_ptr());
337         }
338     }
339 }