]> git.lizzy.rs Git - rust.git/blob - src/libstd/sys/unix/pipe.rs
Add verbose option to rustdoc in order to fix problem with --version
[rust.git] / src / libstd / sys / unix / pipe.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use libc;
13 use c_str::CString;
14 use mem;
15 use sync::{atomic, Mutex};
16 use io::{mod, IoResult, IoError};
17 use prelude::*;
18
19 use sys::{mod, timer, retry, c, set_nonblocking, wouldblock};
20 use sys::fs::{fd_t, FileDesc};
21 use sys_common::net::*;
22 use sys_common::net::SocketStatus::*;
23 use sys_common::{eof, mkerr_libc};
24
25 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
26     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
27         -1 => Err(super::last_error()),
28         fd => Ok(fd)
29     }
30 }
31
32 fn addr_to_sockaddr_un(addr: &CString,
33                        storage: &mut libc::sockaddr_storage)
34                        -> IoResult<libc::socklen_t> {
35     // the sun_path length is limited to SUN_LEN (with null)
36     assert!(mem::size_of::<libc::sockaddr_storage>() >=
37             mem::size_of::<libc::sockaddr_un>());
38     let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
39
40     let len = addr.len();
41     if len > s.sun_path.len() - 1 {
42         return Err(IoError {
43             kind: io::InvalidInput,
44             desc: "invalid argument: path must be smaller than SUN_LEN",
45             detail: None,
46         })
47     }
48     s.sun_family = libc::AF_UNIX as libc::sa_family_t;
49     for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) {
50         *slot = value;
51     }
52
53     // count the null terminator
54     let len = mem::size_of::<libc::sa_family_t>() + len + 1;
55     return Ok(len as libc::socklen_t);
56 }
57
58 struct Inner {
59     fd: fd_t,
60
61     // Unused on Linux, where this lock is not necessary.
62     #[allow(dead_code)]
63     lock: Mutex<()>,
64 }
65
66 impl Inner {
67     fn new(fd: fd_t) -> Inner {
68         Inner { fd: fd, lock: Mutex::new(()) }
69     }
70 }
71
72 impl Drop for Inner {
73     fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
74 }
75
76 fn connect(addr: &CString, ty: libc::c_int,
77            timeout: Option<u64>) -> IoResult<Inner> {
78     let mut storage = unsafe { mem::zeroed() };
79     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
80     let inner = Inner::new(try!(unix_socket(ty)));
81     let addrp = &storage as *const _ as *const libc::sockaddr;
82
83     match timeout {
84         None => {
85             match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
86                 -1 => Err(super::last_error()),
87                 _  => Ok(inner)
88             }
89         }
90         Some(timeout_ms) => {
91             try!(connect_timeout(inner.fd, addrp, len, timeout_ms));
92             Ok(inner)
93         }
94     }
95 }
96
97 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
98     let mut storage = unsafe { mem::zeroed() };
99     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
100     let inner = Inner::new(try!(unix_socket(ty)));
101     let addrp = &storage as *const _ as *const libc::sockaddr;
102     match unsafe {
103         libc::bind(inner.fd, addrp, len)
104     } {
105         -1 => Err(super::last_error()),
106         _  => Ok(inner)
107     }
108 }
109
110 ////////////////////////////////////////////////////////////////////////////////
111 // Unix Streams
112 ////////////////////////////////////////////////////////////////////////////////
113
114 pub struct UnixStream {
115     inner: Arc<Inner>,
116     read_deadline: u64,
117     write_deadline: u64,
118 }
119
120 unsafe impl Send for UnixStream {}
121 unsafe impl Sync for UnixStream {}
122
123 impl UnixStream {
124     pub fn connect(addr: &CString,
125                    timeout: Option<u64>) -> IoResult<UnixStream> {
126         connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
127             UnixStream::new(Arc::new(inner))
128         })
129     }
130
131     fn new(inner: Arc<Inner>) -> UnixStream {
132         UnixStream {
133             inner: inner,
134             read_deadline: 0,
135             write_deadline: 0,
136         }
137     }
138
139     pub fn fd(&self) -> fd_t { self.inner.fd }
140
141     #[cfg(target_os = "linux")]
142     fn lock_nonblocking(&self) {}
143
144     #[cfg(not(target_os = "linux"))]
145     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
146         let ret = Guard {
147             fd: self.fd(),
148             guard: unsafe { self.inner.lock.lock().unwrap() },
149         };
150         assert!(set_nonblocking(self.fd(), true).is_ok());
151         ret
152     }
153
154     pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
155         let fd = self.fd();
156         let dolock = |&:| self.lock_nonblocking();
157         let doread = |&mut: nb| unsafe {
158             let flags = if nb {c::MSG_DONTWAIT} else {0};
159             libc::recv(fd,
160                        buf.as_mut_ptr() as *mut libc::c_void,
161                        buf.len() as libc::size_t,
162                        flags) as libc::c_int
163         };
164         read(fd, self.read_deadline, dolock, doread)
165     }
166
167     pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
168         let fd = self.fd();
169         let dolock = |&: | self.lock_nonblocking();
170         let dowrite = |&: nb: bool, buf: *const u8, len: uint| unsafe {
171             let flags = if nb {c::MSG_DONTWAIT} else {0};
172             libc::send(fd,
173                        buf as *const _,
174                        len as libc::size_t,
175                        flags) as i64
176         };
177         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
178             Ok(_) => Ok(()),
179             Err(e) => Err(e)
180         }
181     }
182
183     pub fn close_write(&mut self) -> IoResult<()> {
184         mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
185     }
186
187     pub fn close_read(&mut self) -> IoResult<()> {
188         mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
189     }
190
191     pub fn set_timeout(&mut self, timeout: Option<u64>) {
192         let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
193         self.read_deadline = deadline;
194         self.write_deadline = deadline;
195     }
196
197     pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
198         self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
199     }
200
201     pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
202         self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
203     }
204 }
205
206 impl Clone for UnixStream {
207     fn clone(&self) -> UnixStream {
208         UnixStream::new(self.inner.clone())
209     }
210 }
211
212 ////////////////////////////////////////////////////////////////////////////////
213 // Unix Listener
214 ////////////////////////////////////////////////////////////////////////////////
215
216 pub struct UnixListener {
217     inner: Inner,
218     path: CString,
219 }
220
221 unsafe impl Send for UnixListener {}
222 unsafe impl Sync for UnixListener {}
223
224 impl UnixListener {
225     pub fn bind(addr: &CString) -> IoResult<UnixListener> {
226         bind(addr, libc::SOCK_STREAM).map(|fd| {
227             UnixListener { inner: fd, path: addr.clone() }
228         })
229     }
230
231     pub fn fd(&self) -> fd_t { self.inner.fd }
232
233     pub fn listen(self) -> IoResult<UnixAcceptor> {
234         match unsafe { libc::listen(self.fd(), 128) } {
235             -1 => Err(super::last_error()),
236
237             _ => {
238                 let (reader, writer) = try!(unsafe { sys::os::pipe() });
239                 try!(set_nonblocking(reader.fd(), true));
240                 try!(set_nonblocking(writer.fd(), true));
241                 try!(set_nonblocking(self.fd(), true));
242                 Ok(UnixAcceptor {
243                     inner: Arc::new(AcceptorInner {
244                         listener: self,
245                         reader: reader,
246                         writer: writer,
247                         closed: atomic::AtomicBool::new(false),
248                     }),
249                     deadline: 0,
250                 })
251             }
252         }
253     }
254 }
255
256 pub struct UnixAcceptor {
257     inner: Arc<AcceptorInner>,
258     deadline: u64,
259 }
260
261 struct AcceptorInner {
262     listener: UnixListener,
263     reader: FileDesc,
264     writer: FileDesc,
265     closed: atomic::AtomicBool,
266 }
267
268 unsafe impl Send for AcceptorInner {}
269 unsafe impl Sync for AcceptorInner {}
270
271 impl UnixAcceptor {
272     pub fn fd(&self) -> fd_t { self.inner.listener.fd() }
273
274     pub fn accept(&mut self) -> IoResult<UnixStream> {
275         let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
276
277         while !self.inner.closed.load(atomic::SeqCst) {
278             unsafe {
279                 let mut storage: libc::sockaddr_storage = mem::zeroed();
280                 let storagep = &mut storage as *mut libc::sockaddr_storage;
281                 let size = mem::size_of::<libc::sockaddr_storage>();
282                 let mut size = size as libc::socklen_t;
283                 match retry(|| {
284                     libc::accept(self.fd(),
285                                  storagep as *mut libc::sockaddr,
286                                  &mut size as *mut libc::socklen_t) as libc::c_int
287                 }) {
288                     -1 if wouldblock() => {}
289                     -1 => return Err(super::last_error()),
290                     fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
291                 }
292             }
293             try!(await(&[self.fd(), self.inner.reader.fd()],
294                        deadline, Readable));
295         }
296
297         Err(eof())
298     }
299
300     pub fn set_timeout(&mut self, timeout: Option<u64>) {
301         self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
302     }
303
304     pub fn close_accept(&mut self) -> IoResult<()> {
305         self.inner.closed.store(true, atomic::SeqCst);
306         let fd = FileDesc::new(self.inner.writer.fd(), false);
307         match fd.write(&[0]) {
308             Ok(..) => Ok(()),
309             Err(..) if wouldblock() => Ok(()),
310             Err(e) => Err(e),
311         }
312     }
313 }
314
315 impl Clone for UnixAcceptor {
316     fn clone(&self) -> UnixAcceptor {
317         UnixAcceptor { inner: self.inner.clone(), deadline: 0 }
318     }
319 }
320
321 impl Drop for UnixListener {
322     fn drop(&mut self) {
323         // Unlink the path to the socket to ensure that it doesn't linger. We're
324         // careful to unlink the path before we close the file descriptor to
325         // prevent races where we unlink someone else's path.
326         unsafe {
327             let _ = libc::unlink(self.path.as_ptr());
328         }
329     }
330 }