]> git.lizzy.rs Git - rust.git/blob - src/libstd/sys/windows/pipe.rs
Auto merge of #35856 - phimuemue:master, r=brson
[rust.git] / src / libstd / sys / windows / pipe.rs
1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use os::windows::prelude::*;
12
13 use ffi::OsStr;
14 use io;
15 use mem;
16 use path::Path;
17 use ptr;
18 use rand::{self, Rng};
19 use slice;
20 use sys::c;
21 use sys::fs::{File, OpenOptions};
22 use sys::handle::Handle;
23
24 ////////////////////////////////////////////////////////////////////////////////
25 // Anonymous pipes
26 ////////////////////////////////////////////////////////////////////////////////
27
28 pub struct AnonPipe {
29     inner: Handle,
30 }
31
32 pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> {
33     // Note that we specifically do *not* use `CreatePipe` here because
34     // unfortunately the anonymous pipes returned do not support overlapped
35     // operations.
36     //
37     // Instead, we create a "hopefully unique" name and create a named pipe
38     // which has overlapped operations enabled.
39     //
40     // Once we do this, we connect do it as usual via `CreateFileW`, and then we
41     // return those reader/writer halves.
42     unsafe {
43         let reader;
44         let mut name;
45         let mut tries = 0;
46         loop {
47             tries += 1;
48             let key: u64 = rand::thread_rng().gen();
49             name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
50                            c::GetCurrentProcessId(),
51                            key);
52             let wide_name = OsStr::new(&name)
53                                   .encode_wide()
54                                   .chain(Some(0))
55                                   .collect::<Vec<_>>();
56
57             let handle = c::CreateNamedPipeW(wide_name.as_ptr(),
58                                              c::PIPE_ACCESS_INBOUND |
59                                               c::FILE_FLAG_FIRST_PIPE_INSTANCE |
60                                               c::FILE_FLAG_OVERLAPPED,
61                                              c::PIPE_TYPE_BYTE |
62                                               c::PIPE_READMODE_BYTE |
63                                               c::PIPE_WAIT |
64                                               c::PIPE_REJECT_REMOTE_CLIENTS,
65                                              1,
66                                              4096,
67                                              4096,
68                                              0,
69                                              ptr::null_mut());
70
71             // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're
72             // also just doing a best effort at selecting a unique name. If
73             // ERROR_ACCESS_DENIED is returned then it could mean that we
74             // accidentally conflicted with an already existing pipe, so we try
75             // again.
76             //
77             // Don't try again too much though as this could also perhaps be a
78             // legit error.
79             if handle == c::INVALID_HANDLE_VALUE {
80                 let err = io::Error::last_os_error();
81                 if tries < 10 &&
82                    err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) {
83                     continue
84                 }
85                 return Err(err)
86             }
87             reader = Handle::new(handle);
88             break
89         }
90
91         // Connect to the named pipe we just created in write-only mode (also
92         // overlapped for async I/O below).
93         let mut opts = OpenOptions::new();
94         opts.write(true);
95         opts.read(false);
96         opts.share_mode(0);
97         opts.attributes(c::FILE_FLAG_OVERLAPPED);
98         let writer = File::open(Path::new(&name), &opts)?;
99         let writer = AnonPipe { inner: writer.into_handle() };
100
101         Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() }))
102     }
103 }
104
105 impl AnonPipe {
106     pub fn handle(&self) -> &Handle { &self.inner }
107     pub fn into_handle(self) -> Handle { self.inner }
108
109     pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
110         self.inner.read(buf)
111     }
112
113     pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
114         self.inner.read_to_end(buf)
115     }
116
117     pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
118         self.inner.write(buf)
119     }
120 }
121
122 pub fn read2(p1: AnonPipe,
123              v1: &mut Vec<u8>,
124              p2: AnonPipe,
125              v2: &mut Vec<u8>) -> io::Result<()> {
126     let p1 = p1.into_handle();
127     let p2 = p2.into_handle();
128
129     let mut p1 = AsyncPipe::new(p1, v1)?;
130     let mut p2 = AsyncPipe::new(p2, v2)?;
131     let objs = [p1.event.raw(), p2.event.raw()];
132
133     // In a loop we wait for either pipe's scheduled read operation to complete.
134     // If the operation completes with 0 bytes, that means EOF was reached, in
135     // which case we just finish out the other pipe entirely.
136     //
137     // Note that overlapped I/O is in general super unsafe because we have to
138     // be careful to ensure that all pointers in play are valid for the entire
139     // duration of the I/O operation (where tons of operations can also fail).
140     // The destructor for `AsyncPipe` ends up taking care of most of this.
141     loop {
142         let res = unsafe {
143             c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE)
144         };
145         if res == c::WAIT_OBJECT_0 {
146             if !p1.result()? || !p1.schedule_read()? {
147                 return p2.finish()
148             }
149         } else if res == c::WAIT_OBJECT_0 + 1 {
150             if !p2.result()? || !p2.schedule_read()? {
151                 return p1.finish()
152             }
153         } else {
154             return Err(io::Error::last_os_error())
155         }
156     }
157 }
158
159 struct AsyncPipe<'a> {
160     pipe: Handle,
161     event: Handle,
162     overlapped: Box<c::OVERLAPPED>, // needs a stable address
163     dst: &'a mut Vec<u8>,
164     state: State,
165 }
166
167 #[derive(PartialEq, Debug)]
168 enum State {
169     NotReading,
170     Reading,
171     Read(usize),
172 }
173
174 impl<'a> AsyncPipe<'a> {
175     fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
176         // Create an event which we'll use to coordinate our overlapped
177         // opreations, this event will be used in WaitForMultipleObjects
178         // and passed as part of the OVERLAPPED handle.
179         //
180         // Note that we do a somewhat clever thing here by flagging the
181         // event as being manually reset and setting it initially to the
182         // signaled state. This means that we'll naturally fall through the
183         // WaitForMultipleObjects call above for pipes created initially,
184         // and the only time an even will go back to "unset" will be once an
185         // I/O operation is successfully scheduled (what we want).
186         let event = Handle::new_event(true, true)?;
187         let mut overlapped: Box<c::OVERLAPPED> = unsafe {
188             Box::new(mem::zeroed())
189         };
190         overlapped.hEvent = event.raw();
191         Ok(AsyncPipe {
192             pipe: pipe,
193             overlapped: overlapped,
194             event: event,
195             dst: dst,
196             state: State::NotReading,
197         })
198     }
199
200     /// Executes an overlapped read operation.
201     ///
202     /// Must not currently be reading, and returns whether the pipe is currently
203     /// at EOF or not. If the pipe is not at EOF then `result()` must be called
204     /// to complete the read later on (may block), but if the pipe is at EOF
205     /// then `result()` should not be called as it will just block forever.
206     fn schedule_read(&mut self) -> io::Result<bool> {
207         assert_eq!(self.state, State::NotReading);
208         let amt = unsafe {
209             let slice = slice_to_end(self.dst);
210             self.pipe.read_overlapped(slice, &mut *self.overlapped)?
211         };
212
213         // If this read finished immediately then our overlapped event will
214         // remain signaled (it was signaled coming in here) and we'll progress
215         // down to the method below.
216         //
217         // Otherwise the I/O operation is scheduled and the system set our event
218         // to not signaled, so we flag ourselves into the reading state and move
219         // on.
220         self.state = match amt {
221             Some(0) => return Ok(false),
222             Some(amt) => State::Read(amt),
223             None => State::Reading,
224         };
225         Ok(true)
226     }
227
228     /// Wait for the result of the overlapped operation previously executed.
229     ///
230     /// Takes a parameter `wait` which indicates if this pipe is currently being
231     /// read whether the function should block waiting for the read to complete.
232     ///
233     /// Return values:
234     ///
235     /// * `true` - finished any pending read and the pipe is not at EOF (keep
236     ///            going)
237     /// * `false` - finished any pending read and pipe is at EOF (stop issuing
238     ///             reads)
239     fn result(&mut self) -> io::Result<bool> {
240         let amt = match self.state {
241             State::NotReading => return Ok(true),
242             State::Reading => {
243                 self.pipe.overlapped_result(&mut *self.overlapped, true)?
244             }
245             State::Read(amt) => amt,
246         };
247         self.state = State::NotReading;
248         unsafe {
249             let len = self.dst.len();
250             self.dst.set_len(len + amt);
251         }
252         Ok(amt != 0)
253     }
254
255     /// Finishes out reading this pipe entirely.
256     ///
257     /// Waits for any pending and schedule read, and then calls `read_to_end`
258     /// if necessary to read all the remaining information.
259     fn finish(&mut self) -> io::Result<()> {
260         while self.result()? && self.schedule_read()? {
261             // ...
262         }
263         Ok(())
264     }
265 }
266
267 impl<'a> Drop for AsyncPipe<'a> {
268     fn drop(&mut self) {
269         match self.state {
270             State::Reading => {}
271             _ => return,
272         }
273
274         // If we have a pending read operation, then we have to make sure that
275         // it's *done* before we actually drop this type. The kernel requires
276         // that the `OVERLAPPED` and buffer pointers are valid for the entire
277         // I/O operation.
278         //
279         // To do that, we call `CancelIo` to cancel any pending operation, and
280         // if that succeeds we wait for the overlapped result.
281         //
282         // If anything here fails, there's not really much we can do, so we leak
283         // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
284         if self.pipe.cancel_io().is_err() || self.result().is_err() {
285             let buf = mem::replace(self.dst, Vec::new());
286             let overlapped = Box::new(unsafe { mem::zeroed() });
287             let overlapped = mem::replace(&mut self.overlapped, overlapped);
288             mem::forget((buf, overlapped));
289         }
290     }
291 }
292
293 unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
294     if v.capacity() == 0 {
295         v.reserve(16);
296     }
297     if v.capacity() == v.len() {
298         v.reserve(1);
299     }
300     slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize),
301                               v.capacity() - v.len())
302 }