]> git.lizzy.rs Git - rust.git/blob - src/libstd/sys/unix/pipe.rs
Auto merge of #22517 - brson:relnotes, r=Gankro
[rust.git] / src / libstd / sys / unix / pipe.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12
13 use ffi::CString;
14 use libc;
15 use mem;
16 use sync::{Arc, Mutex};
17 use sync::atomic::{AtomicBool, Ordering};
18 use old_io::{self, IoResult, IoError};
19
20 use sys::{self, timer, retry, c, set_nonblocking, wouldblock};
21 use sys::fs::{fd_t, FileDesc};
22 use sys_common::net::*;
23 use sys_common::net::SocketStatus::*;
24 use sys_common::{eof, mkerr_libc};
25
26 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
27     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
28         -1 => Err(super::last_error()),
29         fd => Ok(fd)
30     }
31 }
32
33 fn addr_to_sockaddr_un(addr: &CString,
34                        storage: &mut libc::sockaddr_storage)
35                        -> IoResult<libc::socklen_t> {
36     // the sun_path length is limited to SUN_LEN (with null)
37     assert!(mem::size_of::<libc::sockaddr_storage>() >=
38             mem::size_of::<libc::sockaddr_un>());
39     let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
40
41     let len = addr.len();
42     if len > s.sun_path.len() - 1 {
43         return Err(IoError {
44             kind: old_io::InvalidInput,
45             desc: "invalid argument: path must be smaller than SUN_LEN",
46             detail: None,
47         })
48     }
49     s.sun_family = libc::AF_UNIX as libc::sa_family_t;
50     for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) {
51         *slot = *value;
52     }
53
54     // count the null terminator
55     let len = mem::size_of::<libc::sa_family_t>() + len + 1;
56     return Ok(len as libc::socklen_t);
57 }
58
59 struct Inner {
60     fd: fd_t,
61
62     // Unused on Linux, where this lock is not necessary.
63     #[allow(dead_code)]
64     lock: Mutex<()>,
65 }
66
67 impl Inner {
68     fn new(fd: fd_t) -> Inner {
69         Inner { fd: fd, lock: Mutex::new(()) }
70     }
71 }
72
73 impl Drop for Inner {
74     fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
75 }
76
77 fn connect(addr: &CString, ty: libc::c_int,
78            timeout: Option<u64>) -> IoResult<Inner> {
79     let mut storage = unsafe { mem::zeroed() };
80     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
81     let inner = Inner::new(try!(unix_socket(ty)));
82     let addrp = &storage as *const _ as *const libc::sockaddr;
83
84     match timeout {
85         None => {
86             match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
87                 -1 => Err(super::last_error()),
88                 _  => Ok(inner)
89             }
90         }
91         Some(timeout_ms) => {
92             try!(connect_timeout(inner.fd, addrp, len, timeout_ms));
93             Ok(inner)
94         }
95     }
96 }
97
98 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
99     let mut storage = unsafe { mem::zeroed() };
100     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
101     let inner = Inner::new(try!(unix_socket(ty)));
102     let addrp = &storage as *const _ as *const libc::sockaddr;
103     match unsafe {
104         libc::bind(inner.fd, addrp, len)
105     } {
106         -1 => Err(super::last_error()),
107         _  => Ok(inner)
108     }
109 }
110
111 ////////////////////////////////////////////////////////////////////////////////
112 // Unix Streams
113 ////////////////////////////////////////////////////////////////////////////////
114
115 pub struct UnixStream {
116     inner: Arc<Inner>,
117     read_deadline: u64,
118     write_deadline: u64,
119 }
120
121 impl UnixStream {
122     pub fn connect(addr: &CString,
123                    timeout: Option<u64>) -> IoResult<UnixStream> {
124         connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
125             UnixStream::new(Arc::new(inner))
126         })
127     }
128
129     fn new(inner: Arc<Inner>) -> UnixStream {
130         UnixStream {
131             inner: inner,
132             read_deadline: 0,
133             write_deadline: 0,
134         }
135     }
136
137     pub fn fd(&self) -> fd_t { self.inner.fd }
138
139     #[cfg(target_os = "linux")]
140     fn lock_nonblocking(&self) {}
141
142     #[cfg(not(target_os = "linux"))]
143     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
144         let ret = Guard {
145             fd: self.fd(),
146             guard: unsafe { self.inner.lock.lock().unwrap() },
147         };
148         assert!(set_nonblocking(self.fd(), true).is_ok());
149         ret
150     }
151
152     pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
153         let fd = self.fd();
154         let dolock = || self.lock_nonblocking();
155         let doread = |nb| unsafe {
156             let flags = if nb {c::MSG_DONTWAIT} else {0};
157             libc::recv(fd,
158                        buf.as_mut_ptr() as *mut libc::c_void,
159                        buf.len() as libc::size_t,
160                        flags) as libc::c_int
161         };
162         read(fd, self.read_deadline, dolock, doread)
163     }
164
165     pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
166         let fd = self.fd();
167         let dolock = || self.lock_nonblocking();
168         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
169             let flags = if nb {c::MSG_DONTWAIT} else {0};
170             libc::send(fd,
171                        buf as *const _,
172                        len as libc::size_t,
173                        flags) as i64
174         };
175         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
176             Ok(_) => Ok(()),
177             Err(e) => Err(e)
178         }
179     }
180
181     pub fn close_write(&mut self) -> IoResult<()> {
182         mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
183     }
184
185     pub fn close_read(&mut self) -> IoResult<()> {
186         mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
187     }
188
189     pub fn set_timeout(&mut self, timeout: Option<u64>) {
190         let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
191         self.read_deadline = deadline;
192         self.write_deadline = deadline;
193     }
194
195     pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
196         self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
197     }
198
199     pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
200         self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
201     }
202 }
203
204 impl Clone for UnixStream {
205     fn clone(&self) -> UnixStream {
206         UnixStream::new(self.inner.clone())
207     }
208 }
209
210 ////////////////////////////////////////////////////////////////////////////////
211 // Unix Listener
212 ////////////////////////////////////////////////////////////////////////////////
213
214 pub struct UnixListener {
215     inner: Inner,
216     path: CString,
217 }
218
219 // we currently own the CString, so these impls should be safe
220 unsafe impl Send for UnixListener {}
221 unsafe impl Sync for UnixListener {}
222
223 impl UnixListener {
224     pub fn bind(addr: &CString) -> IoResult<UnixListener> {
225         bind(addr, libc::SOCK_STREAM).map(|fd| {
226             UnixListener { inner: fd, path: addr.clone() }
227         })
228     }
229
230     pub fn fd(&self) -> fd_t { self.inner.fd }
231
232     pub fn listen(self) -> IoResult<UnixAcceptor> {
233         match unsafe { libc::listen(self.fd(), 128) } {
234             -1 => Err(super::last_error()),
235
236             _ => {
237                 let (reader, writer) = try!(unsafe { sys::os::pipe() });
238                 try!(set_nonblocking(reader.fd(), true));
239                 try!(set_nonblocking(writer.fd(), true));
240                 try!(set_nonblocking(self.fd(), true));
241                 Ok(UnixAcceptor {
242                     inner: Arc::new(AcceptorInner {
243                         listener: self,
244                         reader: reader,
245                         writer: writer,
246                         closed: AtomicBool::new(false),
247                     }),
248                     deadline: 0,
249                 })
250             }
251         }
252     }
253 }
254
255 pub struct UnixAcceptor {
256     inner: Arc<AcceptorInner>,
257     deadline: u64,
258 }
259
260 struct AcceptorInner {
261     listener: UnixListener,
262     reader: FileDesc,
263     writer: FileDesc,
264     closed: AtomicBool,
265 }
266
267 impl UnixAcceptor {
268     pub fn fd(&self) -> fd_t { self.inner.listener.fd() }
269
270     pub fn accept(&mut self) -> IoResult<UnixStream> {
271         let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
272
273         while !self.inner.closed.load(Ordering::SeqCst) {
274             unsafe {
275                 let mut storage: libc::sockaddr_storage = mem::zeroed();
276                 let storagep = &mut storage as *mut libc::sockaddr_storage;
277                 let size = mem::size_of::<libc::sockaddr_storage>();
278                 let mut size = size as libc::socklen_t;
279                 match retry(|| {
280                     libc::accept(self.fd(),
281                                  storagep as *mut libc::sockaddr,
282                                  &mut size as *mut libc::socklen_t) as libc::c_int
283                 }) {
284                     -1 if wouldblock() => {}
285                     -1 => return Err(super::last_error()),
286                     fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
287                 }
288             }
289             try!(await(&[self.fd(), self.inner.reader.fd()],
290                        deadline, Readable));
291         }
292
293         Err(eof())
294     }
295
296     pub fn set_timeout(&mut self, timeout: Option<u64>) {
297         self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
298     }
299
300     pub fn close_accept(&mut self) -> IoResult<()> {
301         self.inner.closed.store(true, Ordering::SeqCst);
302         let fd = FileDesc::new(self.inner.writer.fd(), false);
303         match fd.write(&[0]) {
304             Ok(..) => Ok(()),
305             Err(..) if wouldblock() => Ok(()),
306             Err(e) => Err(e),
307         }
308     }
309 }
310
311 impl Clone for UnixAcceptor {
312     fn clone(&self) -> UnixAcceptor {
313         UnixAcceptor { inner: self.inner.clone(), deadline: 0 }
314     }
315 }
316
317 impl Drop for UnixListener {
318     fn drop(&mut self) {
319         // Unlink the path to the socket to ensure that it doesn't linger. We're
320         // careful to unlink the path before we close the file descriptor to
321         // prevent races where we unlink someone else's path.
322         unsafe {
323             let _ = libc::unlink(self.path.as_ptr());
324         }
325     }
326 }