]> git.lizzy.rs Git - rust.git/blob - src/libstd/sys/unix/pipe.rs
Fallout of std::old_io deprecation
[rust.git] / src / libstd / sys / unix / pipe.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 #![allow(deprecated)]
12
13 use prelude::v1::*;
14
15 use ffi::CString;
16 use libc;
17 use mem;
18 use sync::{Arc, Mutex};
19 use sync::atomic::{AtomicBool, Ordering};
20 use old_io::{self, IoResult, IoError};
21
22 use sys::{self, timer, retry, c, set_nonblocking, wouldblock};
23 use sys::fs::{fd_t, FileDesc};
24 use sys_common::net::*;
25 use sys_common::net::SocketStatus::*;
26 use sys_common::{eof, mkerr_libc};
27
28 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
29     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
30         -1 => Err(super::last_error()),
31         fd => Ok(fd)
32     }
33 }
34
35 fn addr_to_sockaddr_un(addr: &CString,
36                        storage: &mut libc::sockaddr_storage)
37                        -> IoResult<libc::socklen_t> {
38     // the sun_path length is limited to SUN_LEN (with null)
39     assert!(mem::size_of::<libc::sockaddr_storage>() >=
40             mem::size_of::<libc::sockaddr_un>());
41     let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
42
43     let len = addr.as_bytes().len();
44     if len > s.sun_path.len() - 1 {
45         return Err(IoError {
46             kind: old_io::InvalidInput,
47             desc: "invalid argument: path must be smaller than SUN_LEN",
48             detail: None,
49         })
50     }
51     s.sun_family = libc::AF_UNIX as libc::sa_family_t;
52     for (slot, value) in s.sun_path.iter_mut().zip(addr.as_bytes().iter()) {
53         *slot = *value as libc::c_char;
54     }
55
56     // count the null terminator
57     let len = mem::size_of::<libc::sa_family_t>() + len + 1;
58     return Ok(len as libc::socklen_t);
59 }
60
61 struct Inner {
62     fd: fd_t,
63
64     // Unused on Linux, where this lock is not necessary.
65     #[allow(dead_code)]
66     lock: Mutex<()>,
67 }
68
69 impl Inner {
70     fn new(fd: fd_t) -> Inner {
71         Inner { fd: fd, lock: Mutex::new(()) }
72     }
73 }
74
75 impl Drop for Inner {
76     fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
77 }
78
79 fn connect(addr: &CString, ty: libc::c_int,
80            timeout: Option<u64>) -> IoResult<Inner> {
81     let mut storage = unsafe { mem::zeroed() };
82     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
83     let inner = Inner::new(try!(unix_socket(ty)));
84     let addrp = &storage as *const _ as *const libc::sockaddr;
85
86     match timeout {
87         None => {
88             match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
89                 -1 => Err(super::last_error()),
90                 _  => Ok(inner)
91             }
92         }
93         Some(timeout_ms) => {
94             try!(connect_timeout(inner.fd, addrp, len, timeout_ms));
95             Ok(inner)
96         }
97     }
98 }
99
100 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
101     let mut storage = unsafe { mem::zeroed() };
102     let len = try!(addr_to_sockaddr_un(addr, &mut storage));
103     let inner = Inner::new(try!(unix_socket(ty)));
104     let addrp = &storage as *const _ as *const libc::sockaddr;
105     match unsafe {
106         libc::bind(inner.fd, addrp, len)
107     } {
108         -1 => Err(super::last_error()),
109         _  => Ok(inner)
110     }
111 }
112
113 ////////////////////////////////////////////////////////////////////////////////
114 // Unix Streams
115 ////////////////////////////////////////////////////////////////////////////////
116
117 pub struct UnixStream {
118     inner: Arc<Inner>,
119     read_deadline: u64,
120     write_deadline: u64,
121 }
122
123 impl UnixStream {
124     pub fn connect(addr: &CString,
125                    timeout: Option<u64>) -> IoResult<UnixStream> {
126         connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
127             UnixStream::new(Arc::new(inner))
128         })
129     }
130
131     fn new(inner: Arc<Inner>) -> UnixStream {
132         UnixStream {
133             inner: inner,
134             read_deadline: 0,
135             write_deadline: 0,
136         }
137     }
138
139     pub fn fd(&self) -> fd_t { self.inner.fd }
140
141     #[cfg(target_os = "linux")]
142     fn lock_nonblocking(&self) {}
143
144     #[cfg(not(target_os = "linux"))]
145     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
146         let ret = Guard {
147             fd: self.fd(),
148             guard: self.inner.lock.lock().unwrap(),
149         };
150         set_nonblocking(self.fd(), true);
151         ret
152     }
153
154     pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
155         let fd = self.fd();
156         let dolock = || self.lock_nonblocking();
157         let doread = |nb| unsafe {
158             let flags = if nb {c::MSG_DONTWAIT} else {0};
159             libc::recv(fd,
160                        buf.as_mut_ptr() as *mut libc::c_void,
161                        buf.len() as libc::size_t,
162                        flags) as libc::c_int
163         };
164         read(fd, self.read_deadline, dolock, doread)
165     }
166
167     pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
168         let fd = self.fd();
169         let dolock = || self.lock_nonblocking();
170         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
171             let flags = if nb {c::MSG_DONTWAIT} else {0};
172             libc::send(fd,
173                        buf as *const _,
174                        len as libc::size_t,
175                        flags) as i64
176         };
177         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
178             Ok(_) => Ok(()),
179             Err(e) => Err(e)
180         }
181     }
182
183     pub fn close_write(&mut self) -> IoResult<()> {
184         mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
185     }
186
187     pub fn close_read(&mut self) -> IoResult<()> {
188         mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
189     }
190
191     pub fn set_timeout(&mut self, timeout: Option<u64>) {
192         let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
193         self.read_deadline = deadline;
194         self.write_deadline = deadline;
195     }
196
197     pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
198         self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
199     }
200
201     pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
202         self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
203     }
204 }
205
206 impl Clone for UnixStream {
207     fn clone(&self) -> UnixStream {
208         UnixStream::new(self.inner.clone())
209     }
210 }
211
212 ////////////////////////////////////////////////////////////////////////////////
213 // Unix Listener
214 ////////////////////////////////////////////////////////////////////////////////
215
216 pub struct UnixListener {
217     inner: Inner,
218     path: CString,
219 }
220
221 // we currently own the CString, so these impls should be safe
222 unsafe impl Send for UnixListener {}
223 unsafe impl Sync for UnixListener {}
224
225 impl UnixListener {
226     pub fn bind(addr: &CString) -> IoResult<UnixListener> {
227         bind(addr, libc::SOCK_STREAM).map(|fd| {
228             UnixListener { inner: fd, path: addr.clone() }
229         })
230     }
231
232     pub fn fd(&self) -> fd_t { self.inner.fd }
233
234     pub fn listen(self) -> IoResult<UnixAcceptor> {
235         match unsafe { libc::listen(self.fd(), 128) } {
236             -1 => Err(super::last_error()),
237
238             _ => {
239                 let (reader, writer) = try!(unsafe { sys::os::pipe() });
240                 set_nonblocking(reader.fd(), true);
241                 set_nonblocking(writer.fd(), true);
242                 set_nonblocking(self.fd(), true);
243                 Ok(UnixAcceptor {
244                     inner: Arc::new(AcceptorInner {
245                         listener: self,
246                         reader: reader,
247                         writer: writer,
248                         closed: AtomicBool::new(false),
249                     }),
250                     deadline: 0,
251                 })
252             }
253         }
254     }
255 }
256
257 pub struct UnixAcceptor {
258     inner: Arc<AcceptorInner>,
259     deadline: u64,
260 }
261
262 struct AcceptorInner {
263     listener: UnixListener,
264     reader: FileDesc,
265     writer: FileDesc,
266     closed: AtomicBool,
267 }
268
269 impl UnixAcceptor {
270     pub fn fd(&self) -> fd_t { self.inner.listener.fd() }
271
272     pub fn accept(&mut self) -> IoResult<UnixStream> {
273         let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
274
275         while !self.inner.closed.load(Ordering::SeqCst) {
276             unsafe {
277                 let mut storage: libc::sockaddr_storage = mem::zeroed();
278                 let storagep = &mut storage as *mut libc::sockaddr_storage;
279                 let size = mem::size_of::<libc::sockaddr_storage>();
280                 let mut size = size as libc::socklen_t;
281                 match retry(|| {
282                     libc::accept(self.fd(),
283                                  storagep as *mut libc::sockaddr,
284                                  &mut size as *mut libc::socklen_t) as libc::c_int
285                 }) {
286                     -1 if wouldblock() => {}
287                     -1 => return Err(super::last_error()),
288                     fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
289                 }
290             }
291             try!(await(&[self.fd(), self.inner.reader.fd()],
292                        deadline, Readable));
293         }
294
295         Err(eof())
296     }
297
298     pub fn set_timeout(&mut self, timeout: Option<u64>) {
299         self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
300     }
301
302     pub fn close_accept(&mut self) -> IoResult<()> {
303         self.inner.closed.store(true, Ordering::SeqCst);
304         let fd = FileDesc::new(self.inner.writer.fd(), false);
305         match fd.write(&[0]) {
306             Ok(..) => Ok(()),
307             Err(..) if wouldblock() => Ok(()),
308             Err(e) => Err(e),
309         }
310     }
311 }
312
313 impl Clone for UnixAcceptor {
314     fn clone(&self) -> UnixAcceptor {
315         UnixAcceptor { inner: self.inner.clone(), deadline: 0 }
316     }
317 }
318
319 impl Drop for UnixListener {
320     fn drop(&mut self) {
321         // Unlink the path to the socket to ensure that it doesn't linger. We're
322         // careful to unlink the path before we close the file descriptor to
323         // prevent races where we unlink someone else's path.
324         unsafe {
325             let _ = libc::unlink(self.path.as_ptr());
326         }
327     }
328 }