1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Named pipes implementation for windows
13 //! If you are unfortunate enough to be reading this code, I would like to first
14 //! apologize. This was my first encounter with windows named pipes, and it
15 //! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
16 //! read on as I'll try to explain some fun things that I ran into.
18 //! # Unix pipes vs Named pipes
20 //! As with everything else, named pipes on windows are pretty different from
21 //! unix pipes on unix. On unix, you use one "server pipe" to accept new client
22 //! pipes. So long as this server pipe is active, new children pipes can
23 //! connect. On windows, you instead have a number of "server pipes", and each
24 //! of these server pipes can throughout their lifetime be attached to a client
25 //! or not. Once attached to a client, a server pipe may then disconnect at a
28 //! # Accepting clients
30 //! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
31 //! are built around the unix flavors. This means that we have one "server
32 //! pipe" to which many clients can connect. In order to make this compatible
33 //! with the windows model, each connected client consumes ownership of a server
34 //! pipe, and then a new server pipe is created for the next client.
36 //! Note that the server pipes attached to clients are never given back to the
37 //! listener for recycling. This could possibly be implemented with a channel so
38 //! the listener half can re-use server pipes, but for now I err'd on the simple
39 //! side of things. Each stream accepted by a listener will destroy the server
40 //! pipe after the stream is dropped.
42 //! This model ends up having a small race or two, and you can find more details
43 //! on the `native_accept` method.
45 //! # Simultaneous reads and writes
47 //! In testing, I found that two simultaneous writes and two simultaneous reads
48 //! on a pipe ended up working out just fine, but problems were encountered when
49 //! a read was executed simultaneously with a write. After some googling around,
50 //! it sounded like named pipes just weren't built for this kind of interaction,
51 //! and the suggested solution was to use overlapped I/O.
53 //! I don't really know what overlapped I/O is, but my basic understanding after
54 //! reading about it is that you have an external Event which is used to signal
55 //! I/O completion, passed around in some OVERLAPPED structures. As to what this
56 //! is, I'm not exactly sure.
58 //! To work around this problem, all named pipes are created with the
59 //! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
60 //! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
61 //! inside of this structure is a HANDLE from CreateEvent. After the I/O is
62 //! determined to be pending (may complete in the future), the
63 //! GetOverlappedResult function is used to block on the event, waiting for the
66 //! This scheme ended up working well enough. There were two snags that I ran
69 //! * Each UnixStream instance needs its own read/write events to wait on. These
70 //! can't be shared among clones of the same stream because the documentation
71 //! states that it unsets the event when the I/O is started (would possibly
72 //! corrupt other events simultaneously waiting). For convenience's sake,
73 //! these events are lazily initialized.
75 //! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
76 //! to all pipes created through `connect`. Notably this means that the
77 //! ConnectNamedPipe function is nonblocking, implying that the Listener needs
78 //! to have yet another event to do the actual blocking.
82 //! The conclusion here is that I probably don't know the best way to work with
83 //! windows named pipes, but the solution here seems to work well enough to get
84 //! the test suite passing (the suite is in libstd), and that's good enough for
89 use std::c_str::CString;
94 use std::rt::rtio::{IoResult, IoError};
95 use std::sync::atomic;
100 use super::file::to_utf16;
// Newtype wrapper around a Win32 event HANDLE. These events are stuffed into
// OVERLAPPED structures to signal completion of asynchronous I/O (see the
// module docs above).
102 struct Event(libc::HANDLE);
// Creates a fresh event via CreateEventW. `manual_reset` and `initial_state`
// map directly onto the corresponding BOOL arguments of CreateEventW.
// NOTE(review): this listing is elided; some body lines are missing here.
105 fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
107 libc::CreateEventW(ptr::null_mut(),
108 manual_reset as libc::BOOL,
109 initial_state as libc::BOOL,
// CreateEventW returns a null handle on failure, which we surface as the
// last OS error.
112 if event as uint == 0 {
113 Err(super::last_error())
// Accessor for the raw handle inside the newtype.
119 fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
// The Event owns its HANDLE, so close it on drop. The CloseHandle return
// value is deliberately ignored — there's nothing useful to do about a
// failure inside a destructor.
122 impl Drop for Event {
124 unsafe { let _ = libc::CloseHandle(self.handle()); }
// Fields of the shared `Inner` state (the struct header itself is elided from
// this listing). `Inner` is held in an `Arc` so that clones of a UnixStream
// share one pipe handle and one set of closed-flags.
// Raw named-pipe handle; closed when the last clone drops (see Drop below).
129 handle: libc::HANDLE,
// Serializes the "check closed flag, then issue I/O" sequence in read()/write()
// against close_read()/close_write() — see the comments in close_read().
130 lock: mutex::NativeMutex,
// Set by close_read()/close_write() to emulate shutdown(), which named pipes
// don't natively support.
131 read_closed: atomic::AtomicBool,
132 write_closed: atomic::AtomicBool,
// Wraps an existing pipe handle; both closed-flags start false.
136 fn new(handle: libc::HANDLE) -> Inner {
139 lock: unsafe { mutex::NativeMutex::new() },
140 read_closed: atomic::AtomicBool::new(false),
141 write_closed: atomic::AtomicBool::new(false),
// Flush any buffered writes before closing so data isn't lost, then close
// the handle. Errors are ignored: this is best-effort cleanup in a dtor.
146 impl Drop for Inner {
149 let _ = libc::FlushFileBuffers(self.handle);
150 let _ = libc::CloseHandle(self.handle);
// Creates one server instance of the named pipe `name` (a NUL-terminated
// UTF-16 string): duplex, byte-mode, overlapped (async) I/O, unlimited
// instances. When `init` is true, FILE_FLAG_FIRST_PIPE_INSTANCE is added so
// creation fails if the name is already taken (used by bind()). Callers check
// the returned handle against INVALID_HANDLE_VALUE.
155 unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
156 libc::CreateNamedPipeW(
158 libc::PIPE_ACCESS_DUPLEX |
159 if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
// Overlapped I/O is required to allow simultaneous reads and writes on one
// pipe — see the module docs above.
160 libc::FILE_FLAG_OVERLAPPED,
161 libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
163 libc::PIPE_UNLIMITED_INSTANCES,
// Blocks until one of `events` is signaled, or until `deadline` (an absolute
// timestamp; 0 means "no deadline" and waits forever). On success, returns the
// index of the event that fired. On timeout, any pending overlapped I/O on
// `handle` is cancelled via CancelIo before returning a timeout error.
171 pub fn await(handle: libc::HANDLE, deadline: u64,
172 events: &[libc::HANDLE]) -> IoResult<uint> {
173 use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
175 // If we've got a timeout, use WaitForMultipleObjects in tandem with CancelIo
176 // to figure out if we should indeed get the result.
// Convert the absolute deadline into a relative wait, clamped at 0 if the
// deadline has already passed.
177 let ms = if deadline == 0 {
178 libc::INFINITE as u64
180 let now = ::io::timer::now();
181 if deadline < now {0} else {deadline - now}
184 c::WaitForMultipleObjects(events.len() as libc::DWORD,
190 WAIT_FAILED => Err(super::last_error()),
191 WAIT_TIMEOUT => unsafe {
// Cancel the in-flight I/O so the caller's OVERLAPPED buffer isn't
// written to after we return.
192 let _ = c::CancelIo(handle);
193 Err(util::timeout("operation timed out"))
// WaitForMultipleObjects returns WAIT_OBJECT_0 + index of the signaled
// event; translate back to a plain index.
195 n => Ok((n - WAIT_OBJECT_0) as uint)
// Constructs the IoError used to report a broken pipe (ERROR_BROKEN_PIPE).
199 fn epipe() -> IoError {
201 code: libc::ERROR_BROKEN_PIPE as uint,
207 ////////////////////////////////////////////////////////////////////////////////
209 ////////////////////////////////////////////////////////////////////////////////
// A connected named-pipe stream. The lazily-created read/write `Event`s back
// the OVERLAPPED structures for async I/O; they can't be shared among clones
// (see module docs), hence per-instance storage.
// NOTE(review): listing is elided; additional fields (e.g. the read event and
// deadlines used below) are not visible here.
211 pub struct UnixStream {
213 write: Option<Event>,
// Attempts to open the client end of the named pipe `p`, degrading gracefully:
// first read/write access, then (on ERROR_ACCESS_DENIED) read-only, then
// write-only. Returns the handle on success, or None if all attempts fail.
220 fn try_connect(p: *const u16) -> Option<libc::HANDLE> {
221 // Note that most of this is lifted from the libuv implementation.
222 // The idea is that if we fail to open a pipe in read/write mode
223 // that we try afterwards in just read or just write
224 let mut result = unsafe {
226 libc::GENERIC_READ | libc::GENERIC_WRITE,
230 libc::FILE_FLAG_OVERLAPPED,
233 if result != libc::INVALID_HANDLE_VALUE {
// Full duplex failed — retry read-only if it was an access problem.
237 let err = unsafe { libc::GetLastError() };
238 if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
241 libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
245 libc::FILE_FLAG_OVERLAPPED,
248 if result != libc::INVALID_HANDLE_VALUE {
// Read-only failed too — last resort is write-only.
252 let err = unsafe { libc::GetLastError() };
253 if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
256 libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
260 libc::FILE_FLAG_OVERLAPPED,
263 if result != libc::INVALID_HANDLE_VALUE {
// Connects a client to the named pipe at `addr`, retrying while the pipe is
// busy. `timeout` is a relative duration in milliseconds; None waits with a
// fixed 25-second WaitNamedPipeW per retry.
270 pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
271 let addr = try!(to_utf16(addr));
272 let start = ::io::timer::now();
274 match UnixStream::try_connect(addr.as_ptr()) {
276 let inner = Inner::new(handle);
// Switch the freshly opened handle into byte mode to match the
// server side (see `pipe()` above).
277 let mut mode = libc::PIPE_TYPE_BYTE |
278 libc::PIPE_READMODE_BYTE |
281 libc::SetNamedPipeHandleState(inner.handle,
287 Err(super::last_error())
290 inner: Arc::new(inner),
301 // On windows, if you fail to connect, you may need to call the
302 // `WaitNamedPipe` function, and this is indicated with an error
303 // code of ERROR_PIPE_BUSY.
304 let code = unsafe { libc::GetLastError() };
305 if code as int != libc::ERROR_PIPE_BUSY as int {
306 return Err(super::last_error())
// With an explicit timeout: compute the remaining budget and wait
// for a pipe instance to become available, erroring out if the
// budget is already exhausted or the wait itself fails.
311 let now = ::io::timer::now();
312 let timed_out = (now - start) >= timeout || unsafe {
313 let ms = (timeout - (now - start)) as libc::DWORD;
314 libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0
317 return Err(util::timeout("connect timed out"))
321 // An example I found on Microsoft's website used 20
322 // seconds, libuv uses 30 seconds, hence we make the
323 // obvious choice of waiting for 25 seconds.
325 if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) } == 0 {
326 return Err(super::last_error())
// Raw pipe handle shared with all clones via `inner`.
333 fn handle(&self) -> libc::HANDLE { self.inner.handle }
// True once close_read() has been called on any clone of this stream.
335 fn read_closed(&self) -> bool {
336 self.inner.read_closed.load(atomic::SeqCst)
// True once close_write() has been called on any clone of this stream.
339 fn write_closed(&self) -> bool {
340 self.inner.write_closed.load(atomic::SeqCst)
// Cancels all pending overlapped I/O on this handle (CancelIoEx with a null
// OVERLAPPED cancels everything). ERROR_NOT_FOUND means there was nothing
// in flight, which is not treated as a failure.
343 fn cancel_io(&self) -> IoResult<()> {
344 match unsafe { c::CancelIoEx(self.handle(), ptr::null_mut()) } {
345 0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
348 0 => Err(super::last_error()),
354 impl rtio::RtioPipe for UnixStream {
// Reads into `buf` via overlapped ReadFile, returning the byte count.
// Returns EOF if the read half has been closed, a timeout error if the
// read deadline elapses. Loops because a close_write() on another clone
// also aborts our I/O and wakes us up spuriously.
355 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
// Lazily create this instance's read event (can't be shared among
// clones — see module docs).
356 if self.read.is_none() {
357 self.read = Some(try!(Event::new(true, false)));
360 let mut bytes_read = 0;
361 let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
362 overlapped.hEvent = self.read.get_ref().handle();
364 // Pre-flight check to see if the reading half has been closed. This
365 // must be done before issuing the ReadFile request, but after we
368 // See comments in close_read() about why this lock is necessary.
369 let guard = unsafe { self.inner.lock.lock() };
370 if self.read_closed() {
371 return Err(util::eof())
374 // Issue a nonblocking request, succeeding quickly if it happened to
377 libc::ReadFile(self.handle(),
378 buf.as_ptr() as libc::LPVOID,
379 buf.len() as libc::DWORD,
// Nonzero return means the read completed synchronously.
383 if ret != 0 { return Ok(bytes_read as uint) }
385 // If our errno doesn't say that the I/O is pending, then we hit some
386 // legitimate error and return immediately.
387 if os::errno() != libc::ERROR_IO_PENDING as uint {
388 return Err(super::last_error())
391 // Now that we've issued a successful nonblocking request, we need to
392 // wait for it to finish. This can all be done outside the lock because
393 // we'll see any invocation of CancelIoEx. We also call this in a loop
394 // because we're woken up if the writing half is closed, we just need to
395 // realize that the reading half wasn't closed and we go right back to
399 // Process a timeout if one is pending
400 let wait_succeeded = await(self.handle(), self.read_deadline,
401 [overlapped.hEvent]);
404 libc::GetOverlappedResult(self.handle(),
409 // If we succeeded, or we failed for some reason other than
410 // CancelIoEx, return immediately
411 if ret != 0 { return Ok(bytes_read as uint) }
412 if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
413 return Err(super::last_error())
416 // If the reading half is now closed, then we're done. If we woke up
417 // because the writing half was closed, keep trying.
418 if wait_succeeded.is_err() {
419 return Err(util::timeout("read timed out"))
421 if self.read_closed() {
422 return Err(util::eof())
// Writes all of `buf` via overlapped WriteFile, looping over partial writes
// tracked by `offset`. Mirrors read(): aborts caused by close_write() bail
// out; aborts caused by close_read() on another clone retry. On timeout,
// any bytes already written are reported via a "short write" error so the
// caller knows how much made it out.
427 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
// Lazily create this instance's write event (see module docs).
428 if self.write.is_none() {
429 self.write = Some(try!(Event::new(true, false)));
433 let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
434 overlapped.hEvent = self.write.get_ref().handle();
436 while offset < buf.len() {
437 let mut bytes_written = 0;
439 // This sequence below is quite similar to the one found in read().
440 // Some careful looping is done to ensure that if close_write() is
441 // invoked we bail out early, and if close_read() is invoked we keep
442 // going after we woke up.
444 // See comments in close_read() about why this lock is necessary.
445 let guard = unsafe { self.inner.lock.lock() };
446 if self.write_closed() {
// Issue the write for the not-yet-written tail of the buffer.
450 libc::WriteFile(self.handle(),
451 buf.slice_from(offset).as_ptr() as libc::LPVOID,
452 (buf.len() - offset) as libc::DWORD,
456 let err = os::errno();
// Anything other than "I/O pending" is a real failure.
460 if err != libc::ERROR_IO_PENDING as uint {
464 detail: Some(os::error_string(err as uint)),
467 // Process a timeout if one is pending
468 let wait_succeeded = await(self.handle(), self.write_deadline,
469 [overlapped.hEvent]);
471 libc::GetOverlappedResult(self.handle(),
476 // If we weren't aborted, this was a legit error, if we were
477 // aborted, then check to see if the write half was actually
478 // closed or whether we woke up from the read half closing.
480 if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
481 return Err(super::last_error())
// Timed out mid-write: report a short write if anything was
// written, otherwise a plain timeout.
483 if !wait_succeeded.is_ok() {
484 let amt = offset + bytes_written as uint;
487 code: libc::ERROR_OPERATION_ABORTED as uint,
489 detail: Some("short write during write".to_string()),
492 Err(util::timeout("write timed out"))
495 if self.write_closed() {
501 offset += bytes_written as uint;
// Clones share `inner` (handle + closed-flags + lock); each clone gets its
// own lazily-created read/write events per the module docs.
506 fn clone(&self) -> Box<rtio::RtioPipe + Send> {
508 inner: self.inner.clone(),
513 } as Box<rtio::RtioPipe + Send>
516 fn close_read(&mut self) -> IoResult<()> {
517 // On windows, there's no actual shutdown() method for pipes, so we're
518 // forced to emulate the behavior manually at the application level. To
519 // do this, we need to both cancel any pending requests, as well as
520 // prevent all future requests from succeeding. These two operations are
521 // not atomic with respect to one another, so we must use a lock to do
524 // The read() code looks like:
526 // 1. Make sure the pipe is still open
527 // 2. Submit a read request
528 // 3. Wait for the read request to finish
530 // The race this lock is preventing is if another thread invokes
531 // close_read() between steps 1 and 2. By atomically executing steps 1
532 // and 2 with a lock with respect to close_read(), we're guaranteed that
533 // no thread will erroneously sit in a read forever.
534 let _guard = unsafe { self.inner.lock.lock() };
535 self.inner.read_closed.store(true, atomic::SeqCst);
// NOTE(review): listing elided here — presumably pending I/O is then
// cancelled (cf. cancel_io() above); confirm against the full source.
539 fn close_write(&mut self) -> IoResult<()> {
540 // see comments in close_read() for why this lock is necessary
541 let _guard = unsafe { self.inner.lock.lock() };
542 self.inner.write_closed.store(true, atomic::SeqCst);
// Deadlines are stored as absolute timestamps; 0 means "no deadline"
// (see await()). A None timeout clears both deadlines.
546 fn set_timeout(&mut self, timeout: Option<u64>) {
547 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
548 self.read_deadline = deadline;
549 self.write_deadline = deadline;
551 fn set_read_timeout(&mut self, timeout: Option<u64>) {
552 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
554 fn set_write_timeout(&mut self, timeout: Option<u64>) {
555 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
559 ////////////////////////////////////////////////////////////////////////////////
561 ////////////////////////////////////////////////////////////////////////////////
// The listening half: owns the current server pipe instance and remembers the
// pipe name so further instances can be created by the acceptor.
563 pub struct UnixListener {
564 handle: libc::HANDLE,
// Creates the first pipe instance (init=true → FILE_FLAG_FIRST_PIPE_INSTANCE)
// so that binding an already-taken name fails immediately.
569 pub fn bind(addr: &CString) -> IoResult<UnixListener> {
570 // Although we technically don't need the pipe until much later, we
571 // create the initial handle up front to test the validity of the name
573 let addr_v = try!(to_utf16(addr));
574 let ret = unsafe { pipe(addr_v.as_ptr(), true) };
575 if ret == libc::INVALID_HANDLE_VALUE {
576 Err(super::last_error())
578 Ok(UnixListener { handle: ret, name: addr.clone() })
// Consumes the listener, producing an acceptor with two manual-reset events:
// one for ConnectNamedPipe completion, one (shared, in AcceptorState) to
// abort accepts from close_accept().
582 pub fn native_listen(self) -> IoResult<UnixAcceptor> {
585 event: try!(Event::new(true, false)),
587 inner: Arc::new(AcceptorState {
588 abort: try!(Event::new(true, false)),
589 closed: atomic::AtomicBool::new(false),
// The listener owns the current server pipe handle; close it on drop.
595 impl Drop for UnixListener {
597 unsafe { let _ = libc::CloseHandle(self.handle); }
601 impl rtio::RtioUnixListener for UnixListener {
602 fn listen(self: Box<UnixListener>)
603 -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
604 self.native_listen().map(|a| {
605 box a as Box<rtio::RtioUnixAcceptor + Send>
// Acceptor half; clones share `inner` (abort event + closed flag) so that
// close_accept() on one clone wakes accepts blocked on another.
610 pub struct UnixAcceptor {
611 inner: Arc<AcceptorState>,
612 listener: UnixListener,
// State shared between all clones of one acceptor.
617 struct AcceptorState {
619 closed: atomic::AtomicBool,
// Accepts one client: blocks in overlapped ConnectNamedPipe on the current
// server pipe, hands that pipe off as a UnixStream, and pre-creates the next
// server instance to keep the pipe name alive (see the invariant discussion
// below).
623 pub fn native_accept(&mut self) -> IoResult<UnixStream> {
624 // This function has some funky implementation details when working with
625 // unix pipes. On windows, each server named pipe handle can be
626 // connected to one or zero clients. To the best of my knowledge, a
627 // named server is considered active and present if there exists at
628 // least one server named pipe for it.
630 // The model of this function is to take the current known server
631 // handle, connect a client to it, and then transfer ownership to the
632 // UnixStream instance. The next time accept() is invoked, it'll need a
633 // different server handle to connect a client to.
635 // Note that there is a possible race here. Once our server pipe is
636 // handed off to a `UnixStream` object, the stream could be closed,
637 // meaning that there would be no active server pipes, hence even though
638 // we have a valid `UnixAcceptor`, no one can connect to it. For this
639 // reason, we generate the next accept call's server pipe at the end of
640 // this function call.
642 // This provides us an invariant that we always have at least one server
643 // connection open at a time, meaning that all connects to this acceptor
644 // should succeed while this is active.
646 // The actual implementation of doing this is a little tricky. Once a
647 // server pipe is created, a client can connect to it at any time. I
648 // assume that which server a client connects to is nondeterministic, so
649 // we also need to guarantee that the only server able to be connected
650 // to is the one that we're calling ConnectNamedPipe on. This means that
651 // we have to create the second server pipe *after* we've already
652 // accepted a connection. In order to at least somewhat gracefully
653 // handle errors, this means that if the second server pipe creation
654 // fails that we disconnect the connected client and then just keep
655 // using the original server pipe.
656 let handle = self.listener.handle;
658 // If we've had an artificial call to close_accept, be sure to never
659 // proceed in accepting new clients in the future
660 if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }
662 let name = try!(to_utf16(&self.listener.name));
664 // Once we've got a "server handle", we need to wait for a client to
665 // connect. The ConnectNamedPipe function will block this thread until
666 // someone on the other end connects. This function can "fail" if a
667 // client connects after we created the pipe but before we got down
668 // here. Thanks windows.
669 let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
670 overlapped.hEvent = self.event.handle();
671 if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
672 let mut err = unsafe { libc::GetLastError() };
674 if err == libc::ERROR_IO_PENDING as libc::DWORD {
675 // Process a timeout if one is pending
// Wait on both the connect event and the shared abort event, so
// close_accept() on another clone can wake us up.
676 let wait_succeeded = await(handle, self.deadline,
677 [self.inner.abort.handle(),
680 // This will block until the overlapped I/O is completed. The
681 // timeout was previously handled, so this will either block in
682 // the normal case or succeed very quickly in the timeout case.
684 let mut transfer = 0;
685 libc::GetOverlappedResult(handle,
691 if wait_succeeded.is_ok() {
692 err = unsafe { libc::GetLastError() };
694 return Err(util::timeout("accept timed out"))
697 // we succeeded, bypass the check below
698 err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
// ERROR_PIPE_CONNECTED means the client raced us and connected before
// ConnectNamedPipe ran — that's success, anything else is an error.
701 if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
702 return Err(super::last_error())
706 // Now that we've got a connected client to our handle, we need to
707 // create a second server pipe. If this fails, we disconnect the
708 // connected client and return an error (see comments above).
709 let new_handle = unsafe { pipe(name.as_ptr(), false) };
710 if new_handle == libc::INVALID_HANDLE_VALUE {
711 let ret = Err(super::last_error());
712 // If our disconnection fails, then there's not really a whole lot
713 // that we can do, so fail the task.
714 let err = unsafe { libc::DisconnectNamedPipe(handle) };
// Swap in the fresh server pipe for the next accept() call.
718 self.listener.handle = new_handle;
721 // Transfer ownership of our handle into this stream
723 inner: Arc::new(Inner::new(handle)),
732 impl rtio::RtioUnixAcceptor for UnixAcceptor {
// Trait adapter: box the accepted stream as a trait object.
733 fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
734 self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
// Absolute deadline semantics, same as UnixStream: 0 = no deadline.
736 fn set_timeout(&mut self, timeout: Option<u64>) {
737 self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
// Clones share `inner` but each gets its own connect event and its own
// server pipe instance (created below), preserving the "at least one open
// server pipe" invariant from native_accept().
// NOTE(review): the `.ok().unwrap()` calls here fail the task on OS errors
// rather than returning them — the trait's clone() cannot report failure.
740 fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
741 let name = to_utf16(&self.listener.name).ok().unwrap();
743 inner: self.inner.clone(),
744 event: Event::new(true, false).ok().unwrap(),
746 listener: UnixListener {
747 name: self.listener.name.clone(),
749 let p = pipe(name.as_ptr(), false) ;
750 assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
754 } as Box<rtio::RtioUnixAcceptor + Send>
// Flag the acceptor closed, then signal the shared abort event so any
// accept() blocked in await() (on any clone) wakes up and returns EOF.
757 fn close_accept(&mut self) -> IoResult<()> {
758 self.inner.closed.store(true, atomic::SeqCst);
760 c::SetEvent(self.inner.abort.handle())
763 Err(super::last_error())