]> git.lizzy.rs Git - rust.git/blob - src/libstd/sys/windows/pipe.rs
doc: remove incomplete sentence
[rust.git] / src / libstd / sys / windows / pipe.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes implementation for windows
12 //!
13 //! If are unfortunate enough to be reading this code, I would like to first
14 //! apologize. This was my first encounter with windows named pipes, and it
15 //! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
16 //! read on as I'll try to explain some fun things that I ran into.
17 //!
18 //! # Unix pipes vs Named pipes
19 //!
20 //! As with everything else, named pipes on windows are pretty different from
21 //! unix pipes on unix. On unix, you use one "server pipe" to accept new client
22 //! pipes. So long as this server pipe is active, new children pipes can
23 //! connect. On windows, you instead have a number of "server pipes", and each
24 //! of these server pipes can throughout their lifetime be attached to a client
25 //! or not. Once attached to a client, a server pipe may then disconnect at a
26 //! later date.
27 //!
28 //! # Accepting clients
29 //!
30 //! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
31 //! are built around the unix flavors. This means that we have one "server
32 //! pipe" to which many clients can connect. In order to make this compatible
33 //! with the windows model, each connected client consumes ownership of a server
34 //! pipe, and then a new server pipe is created for the next client.
35 //!
36 //! Note that the server pipes attached to clients are never given back to the
37 //! listener for recycling. This could possibly be implemented with a channel so
38 //! the listener half can re-use server pipes, but for now I err'd on the simple
39 //! side of things. Each stream accepted by a listener will destroy the server
40 //! pipe after the stream is dropped.
41 //!
42 //! This model ends up having a small race or two, and you can find more details
43 //! on the `native_accept` method.
44 //!
45 //! # Simultaneous reads and writes
46 //!
47 //! In testing, I found that two simultaneous writes and two simultaneous reads
48 //! on a pipe ended up working out just fine, but problems were encountered when
49 //! a read was executed simultaneously with a write. After some googling around,
50 //! it sounded like named pipes just weren't built for this kind of interaction,
51 //! and the suggested solution was to use overlapped I/O.
52 //!
53 //! I don't really know what overlapped I/O is, but my basic understanding after
54 //! reading about it is that you have an external Event which is used to signal
55 //! I/O completion, passed around in some OVERLAPPED structures. As to what this
56 //! is, I'm not exactly sure.
57 //!
58 //! This problem implies that all named pipes are created with the
59 //! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
60 //! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
61 //! inside of this structure is a HANDLE from CreateEvent. After the I/O is
62 //! determined to be pending (may complete in the future), the
63 //! GetOverlappedResult function is used to block on the event, waiting for the
64 //! I/O to finish.
65 //!
66 //! This scheme ended up working well enough. There were two snags that I ran
67 //! into, however:
68 //!
69 //! * Each UnixStream instance needs its own read/write events to wait on. These
70 //!   can't be shared among clones of the same stream because the documentation
71 //!   states that it unsets the event when the I/O is started (would possibly
72 //!   corrupt other events simultaneously waiting). For convenience's sake,
73 //!   these events are lazily initialized.
74 //!
75 //! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
76 //!   to all pipes created through `connect`. Notably this means that the
77 //!   ConnectNamedPipe function is nonblocking, implying that the Listener needs
78 //!   to have yet another event to do the actual blocking.
79 //!
80 //! # Conclusion
81 //!
82 //! The conclusion here is that I probably don't know the best way to work with
83 //! windows named pipes, but the solution here seems to work well enough to get
84 //! the test suite passing (the suite is in libstd), and that's good enough for
85 //! me!
86
87 use prelude::v1::*;
88
89 use libc;
90 use c_str::CString;
91 use mem;
92 use ptr;
93 use sync::{atomic, Arc, Mutex};
94 use io::{mod, IoError, IoResult};
95
96 use sys_common::{mod, eof};
97
98 use super::{c, os, timer, to_utf16, decode_error_detailed};
99
100 struct Event(libc::HANDLE);
101
102 impl Event {
103     fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
104         let event = unsafe {
105             libc::CreateEventW(ptr::null_mut(),
106                                manual_reset as libc::BOOL,
107                                initial_state as libc::BOOL,
108                                ptr::null())
109         };
110         if event as uint == 0 {
111             Err(super::last_error())
112         } else {
113             Ok(Event(event))
114         }
115     }
116
117     fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
118 }
119
120 impl Drop for Event {
121     fn drop(&mut self) {
122         unsafe { let _ = libc::CloseHandle(self.handle()); }
123     }
124 }
125
126 struct Inner {
127     handle: libc::HANDLE,
128     lock: Mutex<()>,
129     read_closed: atomic::AtomicBool,
130     write_closed: atomic::AtomicBool,
131 }
132
133 impl Inner {
134     fn new(handle: libc::HANDLE) -> Inner {
135         Inner {
136             handle: handle,
137             lock: Mutex::new(()),
138             read_closed: atomic::AtomicBool::new(false),
139             write_closed: atomic::AtomicBool::new(false),
140         }
141     }
142 }
143
144 impl Drop for Inner {
145     fn drop(&mut self) {
146         unsafe {
147             let _ = libc::FlushFileBuffers(self.handle);
148             let _ = libc::CloseHandle(self.handle);
149         }
150     }
151 }
152
153 unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
154     libc::CreateNamedPipeW(
155         name,
156         libc::PIPE_ACCESS_DUPLEX |
157             if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
158             libc::FILE_FLAG_OVERLAPPED,
159         libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
160             libc::PIPE_WAIT,
161         libc::PIPE_UNLIMITED_INSTANCES,
162         65536,
163         65536,
164         0,
165         ptr::null_mut()
166     )
167 }
168
169 pub fn await(handle: libc::HANDLE, deadline: u64,
170              events: &[libc::HANDLE]) -> IoResult<uint> {
171     use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
172
173     // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
174     // to figure out if we should indeed get the result.
175     let ms = if deadline == 0 {
176         libc::INFINITE as u64
177     } else {
178         let now = timer::now();
179         if deadline < now {0} else {deadline - now}
180     };
181     let ret = unsafe {
182         c::WaitForMultipleObjects(events.len() as libc::DWORD,
183                                   events.as_ptr(),
184                                   libc::FALSE,
185                                   ms as libc::DWORD)
186     };
187     match ret {
188         WAIT_FAILED => Err(super::last_error()),
189         WAIT_TIMEOUT => unsafe {
190             let _ = c::CancelIo(handle);
191             Err(sys_common::timeout("operation timed out"))
192         },
193         n => Ok((n - WAIT_OBJECT_0) as uint)
194     }
195 }
196
197 fn epipe() -> IoError {
198     IoError {
199         kind: io::EndOfFile,
200         desc: "the pipe has ended",
201         detail: None,
202     }
203 }
204
205 ////////////////////////////////////////////////////////////////////////////////
206 // Unix Streams
207 ////////////////////////////////////////////////////////////////////////////////
208
209 pub struct UnixStream {
210     inner: Arc<Inner>,
211     write: Option<Event>,
212     read: Option<Event>,
213     read_deadline: u64,
214     write_deadline: u64,
215 }
216
217 unsafe impl Send for UnixStream {}
218 unsafe impl Sync for UnixStream {}
219
220 impl UnixStream {
221     fn try_connect(p: *const u16) -> Option<libc::HANDLE> {
222         // Note that most of this is lifted from the libuv implementation.
223         // The idea is that if we fail to open a pipe in read/write mode
224         // that we try afterwards in just read or just write
225         let mut result = unsafe {
226             libc::CreateFileW(p,
227                 libc::GENERIC_READ | libc::GENERIC_WRITE,
228                 0,
229                 ptr::null_mut(),
230                 libc::OPEN_EXISTING,
231                 libc::FILE_FLAG_OVERLAPPED,
232                 ptr::null_mut())
233         };
234         if result != libc::INVALID_HANDLE_VALUE {
235             return Some(result)
236         }
237
238         let err = unsafe { libc::GetLastError() };
239         if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
240             result = unsafe {
241                 libc::CreateFileW(p,
242                     libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
243                     0,
244                     ptr::null_mut(),
245                     libc::OPEN_EXISTING,
246                     libc::FILE_FLAG_OVERLAPPED,
247                     ptr::null_mut())
248             };
249             if result != libc::INVALID_HANDLE_VALUE {
250                 return Some(result)
251             }
252         }
253         let err = unsafe { libc::GetLastError() };
254         if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
255             result = unsafe {
256                 libc::CreateFileW(p,
257                     libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
258                     0,
259                     ptr::null_mut(),
260                     libc::OPEN_EXISTING,
261                     libc::FILE_FLAG_OVERLAPPED,
262                     ptr::null_mut())
263             };
264             if result != libc::INVALID_HANDLE_VALUE {
265                 return Some(result)
266             }
267         }
268         None
269     }
270
271     pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
272         let addr = try!(to_utf16(addr.as_str()));
273         let start = timer::now();
274         loop {
275             match UnixStream::try_connect(addr.as_ptr()) {
276                 Some(handle) => {
277                     let inner = Inner::new(handle);
278                     let mut mode = libc::PIPE_TYPE_BYTE |
279                                    libc::PIPE_READMODE_BYTE |
280                                    libc::PIPE_WAIT;
281                     let ret = unsafe {
282                         libc::SetNamedPipeHandleState(inner.handle,
283                                                       &mut mode,
284                                                       ptr::null_mut(),
285                                                       ptr::null_mut())
286                     };
287                     return if ret == 0 {
288                         Err(super::last_error())
289                     } else {
290                         Ok(UnixStream {
291                             inner: Arc::new(inner),
292                             read: None,
293                             write: None,
294                             read_deadline: 0,
295                             write_deadline: 0,
296                         })
297                     }
298                 }
299                 None => {}
300             }
301
302             // On windows, if you fail to connect, you may need to call the
303             // `WaitNamedPipe` function, and this is indicated with an error
304             // code of ERROR_PIPE_BUSY.
305             let code = unsafe { libc::GetLastError() };
306             if code as int != libc::ERROR_PIPE_BUSY as int {
307                 return Err(super::last_error())
308             }
309
310             match timeout {
311                 Some(timeout) => {
312                     let now = timer::now();
313                     let timed_out = (now - start) >= timeout || unsafe {
314                         let ms = (timeout - (now - start)) as libc::DWORD;
315                         libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0
316                     };
317                     if timed_out {
318                         return Err(sys_common::timeout("connect timed out"))
319                     }
320                 }
321
322                 // An example I found on Microsoft's website used 20
323                 // seconds, libuv uses 30 seconds, hence we make the
324                 // obvious choice of waiting for 25 seconds.
325                 None => {
326                     if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) } == 0 {
327                         return Err(super::last_error())
328                     }
329                 }
330             }
331         }
332     }
333
334     pub fn handle(&self) -> libc::HANDLE { self.inner.handle }
335
336     fn read_closed(&self) -> bool {
337         self.inner.read_closed.load(atomic::SeqCst)
338     }
339
340     fn write_closed(&self) -> bool {
341         self.inner.write_closed.load(atomic::SeqCst)
342     }
343
344     fn cancel_io(&self) -> IoResult<()> {
345         match unsafe { c::CancelIoEx(self.handle(), ptr::null_mut()) } {
346             0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
347                 Ok(())
348             }
349             0 => Err(super::last_error()),
350             _ => Ok(())
351         }
352     }
353
354     pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
355         if self.read.is_none() {
356             self.read = Some(try!(Event::new(true, false)));
357         }
358
359         let mut bytes_read = 0;
360         let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
361         overlapped.hEvent = self.read.as_ref().unwrap().handle();
362
363         // Pre-flight check to see if the reading half has been closed. This
364         // must be done before issuing the ReadFile request, but after we
365         // acquire the lock.
366         //
367         // See comments in close_read() about why this lock is necessary.
368         let guard = unsafe { self.inner.lock.lock() };
369         if self.read_closed() {
370             return Err(eof())
371         }
372
373         // Issue a nonblocking requests, succeeding quickly if it happened to
374         // succeed.
375         let ret = unsafe {
376             libc::ReadFile(self.handle(),
377                            buf.as_ptr() as libc::LPVOID,
378                            buf.len() as libc::DWORD,
379                            &mut bytes_read,
380                            &mut overlapped)
381         };
382         if ret != 0 { return Ok(bytes_read as uint) }
383
384         // If our errno doesn't say that the I/O is pending, then we hit some
385         // legitimate error and return immediately.
386         if os::errno() != libc::ERROR_IO_PENDING as uint {
387             return Err(super::last_error())
388         }
389
390         // Now that we've issued a successful nonblocking request, we need to
391         // wait for it to finish. This can all be done outside the lock because
392         // we'll see any invocation of CancelIoEx. We also call this in a loop
393         // because we're woken up if the writing half is closed, we just need to
394         // realize that the reading half wasn't closed and we go right back to
395         // sleep.
396         drop(guard);
397         loop {
398             // Process a timeout if one is pending
399             let wait_succeeded = await(self.handle(), self.read_deadline,
400                                        &[overlapped.hEvent]);
401
402             let ret = unsafe {
403                 libc::GetOverlappedResult(self.handle(),
404                                           &mut overlapped,
405                                           &mut bytes_read,
406                                           libc::TRUE)
407             };
408             // If we succeeded, or we failed for some reason other than
409             // CancelIoEx, return immediately
410             if ret != 0 { return Ok(bytes_read as uint) }
411             if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
412                 return Err(super::last_error())
413             }
414
415             // If the reading half is now closed, then we're done. If we woke up
416             // because the writing half was closed, keep trying.
417             if wait_succeeded.is_err() {
418                 return Err(sys_common::timeout("read timed out"))
419             }
420             if self.read_closed() {
421                 return Err(eof())
422             }
423         }
424     }
425
426     pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
427         if self.write.is_none() {
428             self.write = Some(try!(Event::new(true, false)));
429         }
430
431         let mut offset = 0;
432         let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
433         overlapped.hEvent = self.write.as_ref().unwrap().handle();
434
435         while offset < buf.len() {
436             let mut bytes_written = 0;
437
438             // This sequence below is quite similar to the one found in read().
439             // Some careful looping is done to ensure that if close_write() is
440             // invoked we bail out early, and if close_read() is invoked we keep
441             // going after we woke up.
442             //
443             // See comments in close_read() about why this lock is necessary.
444             let guard = unsafe { self.inner.lock.lock() };
445             if self.write_closed() {
446                 return Err(epipe())
447             }
448             let ret = unsafe {
449                 libc::WriteFile(self.handle(),
450                                 buf[offset..].as_ptr() as libc::LPVOID,
451                                 (buf.len() - offset) as libc::DWORD,
452                                 &mut bytes_written,
453                                 &mut overlapped)
454             };
455             let err = os::errno();
456             drop(guard);
457
458             if ret == 0 {
459                 if err != libc::ERROR_IO_PENDING as uint {
460                     return Err(decode_error_detailed(err as i32))
461                 }
462                 // Process a timeout if one is pending
463                 let wait_succeeded = await(self.handle(), self.write_deadline,
464                                            &[overlapped.hEvent]);
465                 let ret = unsafe {
466                     libc::GetOverlappedResult(self.handle(),
467                                               &mut overlapped,
468                                               &mut bytes_written,
469                                               libc::TRUE)
470                 };
471                 // If we weren't aborted, this was a legit error, if we were
472                 // aborted, then check to see if the write half was actually
473                 // closed or whether we woke up from the read half closing.
474                 if ret == 0 {
475                     if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
476                         return Err(super::last_error())
477                     }
478                     if !wait_succeeded.is_ok() {
479                         let amt = offset + bytes_written as uint;
480                         return if amt > 0 {
481                             Err(IoError {
482                                 kind: io::ShortWrite(amt),
483                                 desc: "short write during write",
484                                 detail: None,
485                             })
486                         } else {
487                             Err(sys_common::timeout("write timed out"))
488                         }
489                     }
490                     if self.write_closed() {
491                         return Err(epipe())
492                     }
493                     continue // retry
494                 }
495             }
496             offset += bytes_written as uint;
497         }
498         Ok(())
499     }
500
501     pub fn close_read(&mut self) -> IoResult<()> {
502         // On windows, there's no actual shutdown() method for pipes, so we're
503         // forced to emulate the behavior manually at the application level. To
504         // do this, we need to both cancel any pending requests, as well as
505         // prevent all future requests from succeeding. These two operations are
506         // not atomic with respect to one another, so we must use a lock to do
507         // so.
508         //
509         // The read() code looks like:
510         //
511         //      1. Make sure the pipe is still open
512         //      2. Submit a read request
513         //      3. Wait for the read request to finish
514         //
515         // The race this lock is preventing is if another thread invokes
516         // close_read() between steps 1 and 2. By atomically executing steps 1
517         // and 2 with a lock with respect to close_read(), we're guaranteed that
518         // no thread will erroneously sit in a read forever.
519         let _guard = unsafe { self.inner.lock.lock() };
520         self.inner.read_closed.store(true, atomic::SeqCst);
521         self.cancel_io()
522     }
523
524     pub fn close_write(&mut self) -> IoResult<()> {
525         // see comments in close_read() for why this lock is necessary
526         let _guard = unsafe { self.inner.lock.lock() };
527         self.inner.write_closed.store(true, atomic::SeqCst);
528         self.cancel_io()
529     }
530
531     pub fn set_timeout(&mut self, timeout: Option<u64>) {
532         let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
533         self.read_deadline = deadline;
534         self.write_deadline = deadline;
535     }
536     pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
537         self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
538     }
539     pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
540         self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
541     }
542 }
543
544 impl Clone for UnixStream {
545     fn clone(&self) -> UnixStream {
546         UnixStream {
547             inner: self.inner.clone(),
548             read: None,
549             write: None,
550             read_deadline: 0,
551             write_deadline: 0,
552         }
553     }
554 }
555
556 ////////////////////////////////////////////////////////////////////////////////
557 // Unix Listener
558 ////////////////////////////////////////////////////////////////////////////////
559
560 pub struct UnixListener {
561     handle: libc::HANDLE,
562     name: CString,
563 }
564
565 unsafe impl Send for UnixListener {}
566 unsafe impl Sync for UnixListener {}
567
568 impl UnixListener {
569     pub fn bind(addr: &CString) -> IoResult<UnixListener> {
570         // Although we technically don't need the pipe until much later, we
571         // create the initial handle up front to test the validity of the name
572         // and such.
573         let addr_v = try!(to_utf16(addr.as_str()));
574         let ret = unsafe { pipe(addr_v.as_ptr(), true) };
575         if ret == libc::INVALID_HANDLE_VALUE {
576             Err(super::last_error())
577         } else {
578             Ok(UnixListener { handle: ret, name: addr.clone() })
579         }
580     }
581
582     pub fn listen(self) -> IoResult<UnixAcceptor> {
583         Ok(UnixAcceptor {
584             listener: self,
585             event: try!(Event::new(true, false)),
586             deadline: 0,
587             inner: Arc::new(AcceptorState {
588                 abort: try!(Event::new(true, false)),
589                 closed: atomic::AtomicBool::new(false),
590             }),
591         })
592     }
593
594     pub fn handle(&self) -> libc::HANDLE {
595         self.handle
596     }
597 }
598
599 impl Drop for UnixListener {
600     fn drop(&mut self) {
601         unsafe { let _ = libc::CloseHandle(self.handle); }
602     }
603 }
604
605 pub struct UnixAcceptor {
606     inner: Arc<AcceptorState>,
607     listener: UnixListener,
608     event: Event,
609     deadline: u64,
610 }
611
612 unsafe impl Send for UnixAcceptor {}
613 unsafe impl Sync for UnixAcceptor {}
614
615 struct AcceptorState {
616     abort: Event,
617     closed: atomic::AtomicBool,
618 }
619
620 unsafe impl Send for AcceptorState {}
621 unsafe impl Sync for AcceptorState {}
622
623 impl UnixAcceptor {
624     pub fn accept(&mut self) -> IoResult<UnixStream> {
625         // This function has some funky implementation details when working with
626         // unix pipes. On windows, each server named pipe handle can be
627         // connected to a one or zero clients. To the best of my knowledge, a
628         // named server is considered active and present if there exists at
629         // least one server named pipe for it.
630         //
631         // The model of this function is to take the current known server
632         // handle, connect a client to it, and then transfer ownership to the
633         // UnixStream instance. The next time accept() is invoked, it'll need a
634         // different server handle to connect a client to.
635         //
636         // Note that there is a possible race here. Once our server pipe is
637         // handed off to a `UnixStream` object, the stream could be closed,
638         // meaning that there would be no active server pipes, hence even though
639         // we have a valid `UnixAcceptor`, no one can connect to it. For this
640         // reason, we generate the next accept call's server pipe at the end of
641         // this function call.
642         //
643         // This provides us an invariant that we always have at least one server
644         // connection open at a time, meaning that all connects to this acceptor
645         // should succeed while this is active.
646         //
647         // The actual implementation of doing this is a little tricky. Once a
648         // server pipe is created, a client can connect to it at any time. I
649         // assume that which server a client connects to is nondeterministic, so
650         // we also need to guarantee that the only server able to be connected
651         // to is the one that we're calling ConnectNamedPipe on. This means that
652         // we have to create the second server pipe *after* we've already
653         // accepted a connection. In order to at least somewhat gracefully
654         // handle errors, this means that if the second server pipe creation
655         // fails that we disconnect the connected client and then just keep
656         // using the original server pipe.
657         let handle = self.listener.handle;
658
659         // If we've had an artificial call to close_accept, be sure to never
660         // proceed in accepting new clients in the future
661         if self.inner.closed.load(atomic::SeqCst) { return Err(eof()) }
662
663         let name = try!(to_utf16(self.listener.name.as_str()));
664
665         // Once we've got a "server handle", we need to wait for a client to
666         // connect. The ConnectNamedPipe function will block this thread until
667         // someone on the other end connects. This function can "fail" if a
668         // client connects after we created the pipe but before we got down
669         // here. Thanks windows.
670         let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
671         overlapped.hEvent = self.event.handle();
672         if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
673             let mut err = unsafe { libc::GetLastError() };
674
675             if err == libc::ERROR_IO_PENDING as libc::DWORD {
676                 // Process a timeout if one is pending
677                 let wait_succeeded = await(handle, self.deadline,
678                                            &[self.inner.abort.handle(),
679                                              overlapped.hEvent]);
680
681                 // This will block until the overlapped I/O is completed. The
682                 // timeout was previously handled, so this will either block in
683                 // the normal case or succeed very quickly in the timeout case.
684                 let ret = unsafe {
685                     let mut transfer = 0;
686                     libc::GetOverlappedResult(handle,
687                                               &mut overlapped,
688                                               &mut transfer,
689                                               libc::TRUE)
690                 };
691                 if ret == 0 {
692                     if wait_succeeded.is_ok() {
693                         err = unsafe { libc::GetLastError() };
694                     } else {
695                         return Err(sys_common::timeout("accept timed out"))
696                     }
697                 } else {
698                     // we succeeded, bypass the check below
699                     err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
700                 }
701             }
702             if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
703                 return Err(super::last_error())
704             }
705         }
706
707         // Now that we've got a connected client to our handle, we need to
708         // create a second server pipe. If this fails, we disconnect the
709         // connected client and return an error (see comments above).
710         let new_handle = unsafe { pipe(name.as_ptr(), false) };
711         if new_handle == libc::INVALID_HANDLE_VALUE {
712             let ret = Err(super::last_error());
713             // If our disconnection fails, then there's not really a whole lot
714             // that we can do, so panic
715             let err = unsafe { libc::DisconnectNamedPipe(handle) };
716             assert!(err != 0);
717             return ret;
718         } else {
719             self.listener.handle = new_handle;
720         }
721
722         // Transfer ownership of our handle into this stream
723         Ok(UnixStream {
724             inner: Arc::new(Inner::new(handle)),
725             read: None,
726             write: None,
727             read_deadline: 0,
728             write_deadline: 0,
729         })
730     }
731
732     pub fn set_timeout(&mut self, timeout: Option<u64>) {
733         self.deadline = timeout.map(|i| i + timer::now()).unwrap_or(0);
734     }
735
736     pub fn close_accept(&mut self) -> IoResult<()> {
737         self.inner.closed.store(true, atomic::SeqCst);
738         let ret = unsafe {
739             c::SetEvent(self.inner.abort.handle())
740         };
741         if ret == 0 {
742             Err(super::last_error())
743         } else {
744             Ok(())
745         }
746     }
747
748     pub fn handle(&self) -> libc::HANDLE {
749         self.listener.handle()
750     }
751 }
752
753 impl Clone for UnixAcceptor {
754     fn clone(&self) -> UnixAcceptor {
755         let name = to_utf16(self.listener.name.as_str()).ok().unwrap();
756         UnixAcceptor {
757             inner: self.inner.clone(),
758             event: Event::new(true, false).ok().unwrap(),
759             deadline: 0,
760             listener: UnixListener {
761                 name: self.listener.name.clone(),
762                 handle: unsafe {
763                     let p = pipe(name.as_ptr(), false) ;
764                     assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
765                     p
766                 },
767             },
768         }
769     }
770 }