]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/pipe_windows.rs
4d01230cbd9771295c5042d37a554a1cece1c395
[rust.git] / src / libnative / io / pipe_windows.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes implementation for windows
12 //!
13 //! If are unfortunate enough to be reading this code, I would like to first
14 //! apologize. This was my first encounter with windows named pipes, and it
15 //! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
16 //! read on as I'll try to explain some fun things that I ran into.
17 //!
18 //! # Unix pipes vs Named pipes
19 //!
20 //! As with everything else, named pipes on windows are pretty different from
21 //! unix pipes on unix. On unix, you use one "server pipe" to accept new client
22 //! pipes. So long as this server pipe is active, new children pipes can
23 //! connect. On windows, you instead have a number of "server pipes", and each
24 //! of these server pipes can throughout their lifetime be attached to a client
25 //! or not. Once attached to a client, a server pipe may then disconnect at a
26 //! later date.
27 //!
28 //! # Accepting clients
29 //!
30 //! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
31 //! are built around the unix flavors. This means that we have one "server
32 //! pipe" to which many clients can connect. In order to make this compatible
33 //! with the windows model, each connected client consumes ownership of a server
34 //! pipe, and then a new server pipe is created for the next client.
35 //!
36 //! Note that the server pipes attached to clients are never given back to the
37 //! listener for recycling. This could possibly be implemented with a channel so
38 //! the listener half can re-use server pipes, but for now I err'd on the simple
39 //! side of things. Each stream accepted by a listener will destroy the server
40 //! pipe after the stream is dropped.
41 //!
42 //! This model ends up having a small race or two, and you can find more details
43 //! on the `native_accept` method.
44 //!
45 //! # Simultaneous reads and writes
46 //!
47 //! In testing, I found that two simultaneous writes and two simultaneous reads
48 //! on a pipe ended up working out just fine, but problems were encountered when
49 //! a read was executed simultaneously with a write. After some googling around,
50 //! it sounded like named pipes just weren't built for this kind of interaction,
51 //! and the suggested solution was to use overlapped I/O.
52 //!
53 //! I don't really know what overlapped I/O is, but my basic understanding after
54 //! reading about it is that you have an external Event which is used to signal
55 //! I/O completion, passed around in some OVERLAPPED structures. As to what this
56 //! is, I'm not exactly sure.
57 //!
58 //! This problem implies that all named pipes are created with the
59 //! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
60 //! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
61 //! inside of this structure is a HANDLE from CreateEvent. After the I/O is
62 //! determined to be pending (may complete in the future), the
63 //! GetOverlappedResult function is used to block on the event, waiting for the
64 //! I/O to finish.
65 //!
66 //! This scheme ended up working well enough. There were two snags that I ran
67 //! into, however:
68 //!
69 //! * Each UnixStream instance needs its own read/write events to wait on. These
70 //!   can't be shared among clones of the same stream because the documentation
71 //!   states that it unsets the event when the I/O is started (would possibly
72 //!   corrupt other events simultaneously waiting). For convenience's sake,
73 //!   these events are lazily initialized.
74 //!
75 //! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
76 //!   to all pipes created through `connect`. Notably this means that the
77 //!   ConnectNamedPipe function is nonblocking, implying that the Listener needs
78 //!   to have yet another event to do the actual blocking.
79 //!
80 //! # Conclusion
81 //!
82 //! The conclusion here is that I probably don't know the best way to work with
83 //! windows named pipes, but the solution here seems to work well enough to get
84 //! the test suite passing (the suite is in libstd), and that's good enough for
85 //! me!
86
87 use alloc::arc::Arc;
88 use libc;
89 use std::c_str::CString;
90 use std::mem;
91 use std::os;
92 use std::ptr;
93 use std::rt::rtio;
94 use std::rt::rtio::{IoResult, IoError};
95 use std::sync::atomic;
96 use std::rt::mutex;
97
98 use super::c;
99 use super::util;
100 use super::file::to_utf16;
101
102 struct Event(libc::HANDLE);
103
104 impl Event {
105     fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
106         let event = unsafe {
107             libc::CreateEventW(ptr::mut_null(),
108                                manual_reset as libc::BOOL,
109                                initial_state as libc::BOOL,
110                                ptr::null())
111         };
112         if event as uint == 0 {
113             Err(super::last_error())
114         } else {
115             Ok(Event(event))
116         }
117     }
118
119     fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
120 }
121
122 impl Drop for Event {
123     fn drop(&mut self) {
124         unsafe { let _ = libc::CloseHandle(self.handle()); }
125     }
126 }
127
128 struct Inner {
129     handle: libc::HANDLE,
130     lock: mutex::NativeMutex,
131     read_closed: atomic::AtomicBool,
132     write_closed: atomic::AtomicBool,
133 }
134
135 impl Inner {
136     fn new(handle: libc::HANDLE) -> Inner {
137         Inner {
138             handle: handle,
139             lock: unsafe { mutex::NativeMutex::new() },
140             read_closed: atomic::AtomicBool::new(false),
141             write_closed: atomic::AtomicBool::new(false),
142         }
143     }
144 }
145
146 impl Drop for Inner {
147     fn drop(&mut self) {
148         unsafe {
149             let _ = libc::FlushFileBuffers(self.handle);
150             let _ = libc::CloseHandle(self.handle);
151         }
152     }
153 }
154
155 unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
156     libc::CreateNamedPipeW(
157         name,
158         libc::PIPE_ACCESS_DUPLEX |
159             if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
160             libc::FILE_FLAG_OVERLAPPED,
161         libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
162             libc::PIPE_WAIT,
163         libc::PIPE_UNLIMITED_INSTANCES,
164         65536,
165         65536,
166         0,
167         ptr::mut_null()
168     )
169 }
170
171 pub fn await(handle: libc::HANDLE, deadline: u64,
172              overlapped: &mut libc::OVERLAPPED) -> bool {
173     if deadline == 0 { return true }
174
175     // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
176     // to figure out if we should indeed get the result.
177     let now = ::io::timer::now();
178     let timeout = deadline < now || unsafe {
179         let ms = (deadline - now) as libc::DWORD;
180         let r = libc::WaitForSingleObject(overlapped.hEvent,
181                                           ms);
182         r != libc::WAIT_OBJECT_0
183     };
184     if timeout {
185         unsafe { let _ = c::CancelIo(handle); }
186         false
187     } else {
188         true
189     }
190 }
191
192 fn epipe() -> IoError {
193     IoError {
194         code: libc::ERROR_BROKEN_PIPE as uint,
195         extra: 0,
196         detail: None,
197     }
198 }
199
200 ////////////////////////////////////////////////////////////////////////////////
201 // Unix Streams
202 ////////////////////////////////////////////////////////////////////////////////
203
204 pub struct UnixStream {
205     inner: Arc<Inner>,
206     write: Option<Event>,
207     read: Option<Event>,
208     read_deadline: u64,
209     write_deadline: u64,
210 }
211
212 impl UnixStream {
213     fn try_connect(p: *const u16) -> Option<libc::HANDLE> {
214         // Note that most of this is lifted from the libuv implementation.
215         // The idea is that if we fail to open a pipe in read/write mode
216         // that we try afterwards in just read or just write
217         let mut result = unsafe {
218             libc::CreateFileW(p,
219                 libc::GENERIC_READ | libc::GENERIC_WRITE,
220                 0,
221                 ptr::mut_null(),
222                 libc::OPEN_EXISTING,
223                 libc::FILE_FLAG_OVERLAPPED,
224                 ptr::mut_null())
225         };
226         if result != libc::INVALID_HANDLE_VALUE {
227             return Some(result)
228         }
229
230         let err = unsafe { libc::GetLastError() };
231         if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
232             result = unsafe {
233                 libc::CreateFileW(p,
234                     libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
235                     0,
236                     ptr::mut_null(),
237                     libc::OPEN_EXISTING,
238                     libc::FILE_FLAG_OVERLAPPED,
239                     ptr::mut_null())
240             };
241             if result != libc::INVALID_HANDLE_VALUE {
242                 return Some(result)
243             }
244         }
245         let err = unsafe { libc::GetLastError() };
246         if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
247             result = unsafe {
248                 libc::CreateFileW(p,
249                     libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
250                     0,
251                     ptr::mut_null(),
252                     libc::OPEN_EXISTING,
253                     libc::FILE_FLAG_OVERLAPPED,
254                     ptr::mut_null())
255             };
256             if result != libc::INVALID_HANDLE_VALUE {
257                 return Some(result)
258             }
259         }
260         None
261     }
262
263     pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
264         let addr = try!(to_utf16(addr));
265         let start = ::io::timer::now();
266         loop {
267             match UnixStream::try_connect(addr.as_ptr()) {
268                 Some(handle) => {
269                     let inner = Inner::new(handle);
270                     let mut mode = libc::PIPE_TYPE_BYTE |
271                                    libc::PIPE_READMODE_BYTE |
272                                    libc::PIPE_WAIT;
273                     let ret = unsafe {
274                         libc::SetNamedPipeHandleState(inner.handle,
275                                                       &mut mode,
276                                                       ptr::mut_null(),
277                                                       ptr::mut_null())
278                     };
279                     return if ret == 0 {
280                         Err(super::last_error())
281                     } else {
282                         Ok(UnixStream {
283                             inner: Arc::new(inner),
284                             read: None,
285                             write: None,
286                             read_deadline: 0,
287                             write_deadline: 0,
288                         })
289                     }
290                 }
291                 None => {}
292             }
293
294             // On windows, if you fail to connect, you may need to call the
295             // `WaitNamedPipe` function, and this is indicated with an error
296             // code of ERROR_PIPE_BUSY.
297             let code = unsafe { libc::GetLastError() };
298             if code as int != libc::ERROR_PIPE_BUSY as int {
299                 return Err(super::last_error())
300             }
301
302             match timeout {
303                 Some(timeout) => {
304                     let now = ::io::timer::now();
305                     let timed_out = (now - start) >= timeout || unsafe {
306                         let ms = (timeout - (now - start)) as libc::DWORD;
307                         libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0
308                     };
309                     if timed_out {
310                         return Err(util::timeout("connect timed out"))
311                     }
312                 }
313
314                 // An example I found on Microsoft's website used 20
315                 // seconds, libuv uses 30 seconds, hence we make the
316                 // obvious choice of waiting for 25 seconds.
317                 None => {
318                     if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) } == 0 {
319                         return Err(super::last_error())
320                     }
321                 }
322             }
323         }
324     }
325
326     fn handle(&self) -> libc::HANDLE { self.inner.handle }
327
328     fn read_closed(&self) -> bool {
329         self.inner.read_closed.load(atomic::SeqCst)
330     }
331
332     fn write_closed(&self) -> bool {
333         self.inner.write_closed.load(atomic::SeqCst)
334     }
335
336     fn cancel_io(&self) -> IoResult<()> {
337         match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } {
338             0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
339                 Ok(())
340             }
341             0 => Err(super::last_error()),
342             _ => Ok(())
343         }
344     }
345 }
346
347 impl rtio::RtioPipe for UnixStream {
348     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
349         if self.read.is_none() {
350             self.read = Some(try!(Event::new(true, false)));
351         }
352
353         let mut bytes_read = 0;
354         let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
355         overlapped.hEvent = self.read.get_ref().handle();
356
357         // Pre-flight check to see if the reading half has been closed. This
358         // must be done before issuing the ReadFile request, but after we
359         // acquire the lock.
360         //
361         // See comments in close_read() about why this lock is necessary.
362         let guard = unsafe { self.inner.lock.lock() };
363         if self.read_closed() {
364             return Err(util::eof())
365         }
366
367         // Issue a nonblocking requests, succeeding quickly if it happened to
368         // succeed.
369         let ret = unsafe {
370             libc::ReadFile(self.handle(),
371                            buf.as_ptr() as libc::LPVOID,
372                            buf.len() as libc::DWORD,
373                            &mut bytes_read,
374                            &mut overlapped)
375         };
376         if ret != 0 { return Ok(bytes_read as uint) }
377
378         // If our errno doesn't say that the I/O is pending, then we hit some
379         // legitimate error and return immediately.
380         if os::errno() != libc::ERROR_IO_PENDING as uint {
381             return Err(super::last_error())
382         }
383
384         // Now that we've issued a successful nonblocking request, we need to
385         // wait for it to finish. This can all be done outside the lock because
386         // we'll see any invocation of CancelIoEx. We also call this in a loop
387         // because we're woken up if the writing half is closed, we just need to
388         // realize that the reading half wasn't closed and we go right back to
389         // sleep.
390         drop(guard);
391         loop {
392             // Process a timeout if one is pending
393             let succeeded = await(self.handle(), self.read_deadline,
394                                   &mut overlapped);
395
396             let ret = unsafe {
397                 libc::GetOverlappedResult(self.handle(),
398                                           &mut overlapped,
399                                           &mut bytes_read,
400                                           libc::TRUE)
401             };
402             // If we succeeded, or we failed for some reason other than
403             // CancelIoEx, return immediately
404             if ret != 0 { return Ok(bytes_read as uint) }
405             if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
406                 return Err(super::last_error())
407             }
408
409             // If the reading half is now closed, then we're done. If we woke up
410             // because the writing half was closed, keep trying.
411             if !succeeded {
412                 return Err(util::timeout("read timed out"))
413             }
414             if self.read_closed() {
415                 return Err(util::eof())
416             }
417         }
418     }
419
420     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
421         if self.write.is_none() {
422             self.write = Some(try!(Event::new(true, false)));
423         }
424
425         let mut offset = 0;
426         let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
427         overlapped.hEvent = self.write.get_ref().handle();
428
429         while offset < buf.len() {
430             let mut bytes_written = 0;
431
432             // This sequence below is quite similar to the one found in read().
433             // Some careful looping is done to ensure that if close_write() is
434             // invoked we bail out early, and if close_read() is invoked we keep
435             // going after we woke up.
436             //
437             // See comments in close_read() about why this lock is necessary.
438             let guard = unsafe { self.inner.lock.lock() };
439             if self.write_closed() {
440                 return Err(epipe())
441             }
442             let ret = unsafe {
443                 libc::WriteFile(self.handle(),
444                                 buf.slice_from(offset).as_ptr() as libc::LPVOID,
445                                 (buf.len() - offset) as libc::DWORD,
446                                 &mut bytes_written,
447                                 &mut overlapped)
448             };
449             let err = os::errno();
450             drop(guard);
451
452             if ret == 0 {
453                 if err != libc::ERROR_IO_PENDING as uint {
454                     return Err(IoError {
455                         code: err as uint,
456                         extra: 0,
457                         detail: Some(os::error_string(err as uint)),
458                     })
459                 }
460                 // Process a timeout if one is pending
461                 let succeeded = await(self.handle(), self.write_deadline,
462                                       &mut overlapped);
463                 let ret = unsafe {
464                     libc::GetOverlappedResult(self.handle(),
465                                               &mut overlapped,
466                                               &mut bytes_written,
467                                               libc::TRUE)
468                 };
469                 // If we weren't aborted, this was a legit error, if we were
470                 // aborted, then check to see if the write half was actually
471                 // closed or whether we woke up from the read half closing.
472                 if ret == 0 {
473                     if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
474                         return Err(super::last_error())
475                     }
476                     if !succeeded {
477                         let amt = offset + bytes_written as uint;
478                         return if amt > 0 {
479                             Err(IoError {
480                                 code: libc::ERROR_OPERATION_ABORTED as uint,
481                                 extra: amt,
482                                 detail: Some("short write during write".to_string()),
483                             })
484                         } else {
485                             Err(util::timeout("write timed out"))
486                         }
487                     }
488                     if self.write_closed() {
489                         return Err(epipe())
490                     }
491                     continue // retry
492                 }
493             }
494             offset += bytes_written as uint;
495         }
496         Ok(())
497     }
498
499     fn clone(&self) -> Box<rtio::RtioPipe + Send> {
500         box UnixStream {
501             inner: self.inner.clone(),
502             read: None,
503             write: None,
504             read_deadline: 0,
505             write_deadline: 0,
506         } as Box<rtio::RtioPipe + Send>
507     }
508
509     fn close_read(&mut self) -> IoResult<()> {
510         // On windows, there's no actual shutdown() method for pipes, so we're
511         // forced to emulate the behavior manually at the application level. To
512         // do this, we need to both cancel any pending requests, as well as
513         // prevent all future requests from succeeding. These two operations are
514         // not atomic with respect to one another, so we must use a lock to do
515         // so.
516         //
517         // The read() code looks like:
518         //
519         //      1. Make sure the pipe is still open
520         //      2. Submit a read request
521         //      3. Wait for the read request to finish
522         //
523         // The race this lock is preventing is if another thread invokes
524         // close_read() between steps 1 and 2. By atomically executing steps 1
525         // and 2 with a lock with respect to close_read(), we're guaranteed that
526         // no thread will erroneously sit in a read forever.
527         let _guard = unsafe { self.inner.lock.lock() };
528         self.inner.read_closed.store(true, atomic::SeqCst);
529         self.cancel_io()
530     }
531
532     fn close_write(&mut self) -> IoResult<()> {
533         // see comments in close_read() for why this lock is necessary
534         let _guard = unsafe { self.inner.lock.lock() };
535         self.inner.write_closed.store(true, atomic::SeqCst);
536         self.cancel_io()
537     }
538
539     fn set_timeout(&mut self, timeout: Option<u64>) {
540         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
541         self.read_deadline = deadline;
542         self.write_deadline = deadline;
543     }
544     fn set_read_timeout(&mut self, timeout: Option<u64>) {
545         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
546     }
547     fn set_write_timeout(&mut self, timeout: Option<u64>) {
548         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
549     }
550 }
551
552 ////////////////////////////////////////////////////////////////////////////////
553 // Unix Listener
554 ////////////////////////////////////////////////////////////////////////////////
555
556 pub struct UnixListener {
557     handle: libc::HANDLE,
558     name: CString,
559 }
560
561 impl UnixListener {
562     pub fn bind(addr: &CString) -> IoResult<UnixListener> {
563         // Although we technically don't need the pipe until much later, we
564         // create the initial handle up front to test the validity of the name
565         // and such.
566         let addr_v = try!(to_utf16(addr));
567         let ret = unsafe { pipe(addr_v.as_ptr(), true) };
568         if ret == libc::INVALID_HANDLE_VALUE {
569             Err(super::last_error())
570         } else {
571             Ok(UnixListener { handle: ret, name: addr.clone() })
572         }
573     }
574
575     pub fn native_listen(self) -> IoResult<UnixAcceptor> {
576         Ok(UnixAcceptor {
577             listener: self,
578             event: try!(Event::new(true, false)),
579             deadline: 0,
580         })
581     }
582 }
583
584 impl Drop for UnixListener {
585     fn drop(&mut self) {
586         unsafe { let _ = libc::CloseHandle(self.handle); }
587     }
588 }
589
590 impl rtio::RtioUnixListener for UnixListener {
591     fn listen(self: Box<UnixListener>)
592               -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
593         self.native_listen().map(|a| {
594             box a as Box<rtio::RtioUnixAcceptor + Send>
595         })
596     }
597 }
598
599 pub struct UnixAcceptor {
600     listener: UnixListener,
601     event: Event,
602     deadline: u64,
603 }
604
605 impl UnixAcceptor {
606     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
607         // This function has some funky implementation details when working with
608         // unix pipes. On windows, each server named pipe handle can be
609         // connected to a one or zero clients. To the best of my knowledge, a
610         // named server is considered active and present if there exists at
611         // least one server named pipe for it.
612         //
613         // The model of this function is to take the current known server
614         // handle, connect a client to it, and then transfer ownership to the
615         // UnixStream instance. The next time accept() is invoked, it'll need a
616         // different server handle to connect a client to.
617         //
618         // Note that there is a possible race here. Once our server pipe is
619         // handed off to a `UnixStream` object, the stream could be closed,
620         // meaning that there would be no active server pipes, hence even though
621         // we have a valid `UnixAcceptor`, no one can connect to it. For this
622         // reason, we generate the next accept call's server pipe at the end of
623         // this function call.
624         //
625         // This provides us an invariant that we always have at least one server
626         // connection open at a time, meaning that all connects to this acceptor
627         // should succeed while this is active.
628         //
629         // The actual implementation of doing this is a little tricky. Once a
630         // server pipe is created, a client can connect to it at any time. I
631         // assume that which server a client connects to is nondeterministic, so
632         // we also need to guarantee that the only server able to be connected
633         // to is the one that we're calling ConnectNamedPipe on. This means that
634         // we have to create the second server pipe *after* we've already
635         // accepted a connection. In order to at least somewhat gracefully
636         // handle errors, this means that if the second server pipe creation
637         // fails that we disconnect the connected client and then just keep
638         // using the original server pipe.
639         let handle = self.listener.handle;
640
641         let name = try!(to_utf16(&self.listener.name));
642
643         // Once we've got a "server handle", we need to wait for a client to
644         // connect. The ConnectNamedPipe function will block this thread until
645         // someone on the other end connects. This function can "fail" if a
646         // client connects after we created the pipe but before we got down
647         // here. Thanks windows.
648         let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
649         overlapped.hEvent = self.event.handle();
650         if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
651             let mut err = unsafe { libc::GetLastError() };
652
653             if err == libc::ERROR_IO_PENDING as libc::DWORD {
654                 // Process a timeout if one is pending
655                 let _ = await(handle, self.deadline, &mut overlapped);
656
657                 // This will block until the overlapped I/O is completed. The
658                 // timeout was previously handled, so this will either block in
659                 // the normal case or succeed very quickly in the timeout case.
660                 let ret = unsafe {
661                     let mut transfer = 0;
662                     libc::GetOverlappedResult(handle,
663                                               &mut overlapped,
664                                               &mut transfer,
665                                               libc::TRUE)
666                 };
667                 if ret == 0 {
668                     err = unsafe { libc::GetLastError() };
669                 } else {
670                     // we succeeded, bypass the check below
671                     err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
672                 }
673             }
674             if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
675                 return Err(super::last_error())
676             }
677         }
678
679         // Now that we've got a connected client to our handle, we need to
680         // create a second server pipe. If this fails, we disconnect the
681         // connected client and return an error (see comments above).
682         let new_handle = unsafe { pipe(name.as_ptr(), false) };
683         if new_handle == libc::INVALID_HANDLE_VALUE {
684             let ret = Err(super::last_error());
685             // If our disconnection fails, then there's not really a whole lot
686             // that we can do, so fail the task.
687             let err = unsafe { libc::DisconnectNamedPipe(handle) };
688             assert!(err != 0);
689             return ret;
690         } else {
691             self.listener.handle = new_handle;
692         }
693
694         // Transfer ownership of our handle into this stream
695         Ok(UnixStream {
696             inner: Arc::new(Inner::new(handle)),
697             read: None,
698             write: None,
699             read_deadline: 0,
700             write_deadline: 0,
701         })
702     }
703 }
704
705 impl rtio::RtioUnixAcceptor for UnixAcceptor {
706     fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
707         self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
708     }
709     fn set_timeout(&mut self, timeout: Option<u64>) {
710         self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
711     }
712
713     fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
714         fail!()
715     }
716
717     fn close_accept(&mut self) -> IoResult<()> {
718         fail!()
719     }
720 }
721