1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use os::windows::prelude::*;
19 use rand::{self, Rng};
22 use sys::fs::{File, OpenOptions};
23 use sys::handle::Handle;
25 ////////////////////////////////////////////////////////////////////////////////
27 ////////////////////////////////////////////////////////////////////////////////
33 pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> {
34 // Note that we specifically do *not* use `CreatePipe` here because
35 // unfortunately the anonymous pipes returned do not support overlapped
38 // Instead, we create a "hopefully unique" name and create a named pipe
39 // which has overlapped operations enabled.
41 // Once we do this, we connect to it as usual via `CreateFileW`, and then we
42 // return those reader/writer halves.
49 let key: u64 = rand::thread_rng().gen();
50 name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
51 c::GetCurrentProcessId(),
53 let wide_name = OsStr::new(&name)
58 let handle = c::CreateNamedPipeW(wide_name.as_ptr(),
59 c::PIPE_ACCESS_INBOUND |
60 c::FILE_FLAG_FIRST_PIPE_INSTANCE |
61 c::FILE_FLAG_OVERLAPPED,
63 c::PIPE_READMODE_BYTE |
65 c::PIPE_REJECT_REMOTE_CLIENTS,
72 // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're
73 // also just doing a best effort at selecting a unique name. If
74 // ERROR_ACCESS_DENIED is returned then it could mean that we
75 // accidentally conflicted with an already existing pipe, so we try
78 // Don't try again too much though as this could also perhaps be a
80 if handle == c::INVALID_HANDLE_VALUE {
81 let err = io::Error::last_os_error();
83 err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) {
88 reader = Handle::new(handle);
92 // Connect to the named pipe we just created in write-only mode (also
93 // overlapped for async I/O below).
94 let mut opts = OpenOptions::new();
98 opts.attributes(c::FILE_FLAG_OVERLAPPED);
99 let writer = File::open(Path::new(&name), &opts)?;
100 let writer = AnonPipe { inner: writer.into_handle() };
102 Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() }))
107 pub fn handle(&self) -> &Handle { &self.inner } // Borrows the wrapped `Handle` (the `inner` field); `self` is not consumed.
108 pub fn into_handle(self) -> Handle { self.inner } // Consumes `self` and returns the wrapped `Handle` (the `inner` field) by value.
110 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
114 pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
115 self.inner.read_to_end(buf)
118 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
119 self.inner.write(buf)
123 pub fn read2(p1: AnonPipe,
126 v2: &mut Vec<u8>) -> io::Result<()> {
127 let p1 = p1.into_handle();
128 let p2 = p2.into_handle();
130 let mut p1 = AsyncPipe::new(p1, v1)?;
131 let mut p2 = AsyncPipe::new(p2, v2)?;
132 let objs = [p1.event.raw(), p2.event.raw()];
134 // In a loop we wait for either pipe's scheduled read operation to complete.
135 // If the operation completes with 0 bytes, that means EOF was reached, in
136 // which case we just finish out the other pipe entirely.
138 // Note that overlapped I/O is in general super unsafe because we have to
139 // be careful to ensure that all pointers in play are valid for the entire
140 // duration of the I/O operation (where tons of operations can also fail).
141 // The destructor for `AsyncPipe` ends up taking care of most of this.
144 c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE)
146 if res == c::WAIT_OBJECT_0 {
147 if !p1.result()? || !p1.schedule_read()? {
150 } else if res == c::WAIT_OBJECT_0 + 1 {
151 if !p2.result()? || !p2.schedule_read()? {
155 return Err(io::Error::last_os_error())
160 struct AsyncPipe<'a> {
163 overlapped: Box<c::OVERLAPPED>, // needs a stable address
164 dst: &'a mut Vec<u8>,
168 #[derive(PartialEq, Debug)]
175 impl<'a> AsyncPipe<'a> {
176 fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
177 // Create an event which we'll use to coordinate our overlapped
178 // operations; this event will be used in WaitForMultipleObjects
179 // and passed as part of the OVERLAPPED handle.
181 // Note that we do a somewhat clever thing here by flagging the
182 // event as being manually reset and setting it initially to the
183 // signaled state. This means that we'll naturally fall through the
184 // WaitForMultipleObjects call above for pipes created initially,
185 // and the only time an event will go back to "unset" will be once an
186 // I/O operation is successfully scheduled (what we want).
187 let event = Handle::new_event(true, true)?;
188 let mut overlapped: Box<c::OVERLAPPED> = unsafe {
189 Box::new(mem::zeroed())
191 overlapped.hEvent = event.raw();
194 overlapped: overlapped,
197 state: State::NotReading,
201 /// Executes an overlapped read operation.
203 /// Must not currently be reading, and returns whether the pipe is currently
204 /// at EOF or not. If the pipe is not at EOF then `result()` must be called
205 /// to complete the read later on (may block), but if the pipe is at EOF
206 /// then `result()` should not be called as it will just block forever.
207 fn schedule_read(&mut self) -> io::Result<bool> {
208 assert_eq!(self.state, State::NotReading);
210 let slice = slice_to_end(self.dst);
211 self.pipe.read_overlapped(slice, &mut *self.overlapped)?
214 // If this read finished immediately then our overlapped event will
215 // remain signaled (it was signaled coming in here) and we'll progress
216 // down to the method below.
218 // Otherwise the I/O operation is scheduled and the system set our event
219 // to not signaled, so we flag ourselves into the reading state and move
221 self.state = match amt {
222 Some(0) => return Ok(false),
223 Some(amt) => State::Read(amt),
224 None => State::Reading,
229 /// Wait for the result of the overlapped operation previously executed.
231 /// This method takes no `wait` parameter: if a read is currently in flight
232 /// it always blocks until that read has completed.
236 /// * `true` - finished any pending read and the pipe is not at EOF (keep
238 /// * `false` - finished any pending read and pipe is at EOF (stop issuing
240 fn result(&mut self) -> io::Result<bool> {
241 let amt = match self.state {
242 State::NotReading => return Ok(true),
244 self.pipe.overlapped_result(&mut *self.overlapped, true)?
246 State::Read(amt) => amt,
248 self.state = State::NotReading;
250 let len = self.dst.len();
251 self.dst.set_len(len + amt);
256 /// Finishes out reading this pipe entirely.
258 /// Waits for any pending or scheduled read, and then calls `read_to_end`
259 /// if necessary to read all the remaining information.
260 fn finish(&mut self) -> io::Result<()> {
261 while self.result()? && self.schedule_read()? {
268 impl<'a> Drop for AsyncPipe<'a> {
275 // If we have a pending read operation, then we have to make sure that
276 // it's *done* before we actually drop this type. The kernel requires
277 // that the `OVERLAPPED` and buffer pointers are valid for the entire
280 // To do that, we call `CancelIo` to cancel any pending operation, and
281 // if that succeeds we wait for the overlapped result.
283 // If anything here fails, there's not really much we can do, so we leak
284 // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
285 if self.pipe.cancel_io().is_err() || self.result().is_err() {
286 let buf = mem::replace(self.dst, Vec::new());
287 let overlapped = Box::new(unsafe { mem::zeroed() });
288 let overlapped = mem::replace(&mut self.overlapped, overlapped);
289 mem::forget((buf, overlapped));
294 unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
295 if v.capacity() == 0 {
298 if v.capacity() == v.len() {
301 slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize),
302 v.capacity() - v.len())