]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/pipe.rs
std: Second pass stabilization for `comm`
[rust.git] / src / libstd / io / net / pipe.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes
12 //!
13 //! This module contains the ability to communicate over named pipes with
14 //! synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
15 //! while on Unix it corresponds to UNIX domain sockets.
16 //!
17 //! These pipes are similar to TCP in the sense that you can have both a stream to a
18 //! server and a server itself. The server provided accepts other `UnixStream`
19 //! instances as clients.
20
21 #![allow(missing_docs)]
22
23 use prelude::v1::*;
24
25 use c_str::ToCStr;
26 use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
27 use sys::pipe::UnixAcceptor as UnixAcceptorImp;
28 use sys::pipe::UnixListener as UnixListenerImp;
29 use sys::pipe::UnixStream as UnixStreamImp;
30 use time::Duration;
31
32 use sys_common;
33
34 /// A stream which communicates over a named pipe.
35 pub struct UnixStream {
36     inner: UnixStreamImp,
37 }
38
39 impl UnixStream {
40
41     /// Connect to a pipe named by `path`. This will attempt to open a
42     /// connection to the underlying socket.
43     ///
44     /// The returned stream will be closed when the object falls out of scope.
45     ///
46     /// # Example
47     ///
48     /// ```rust
49     /// # #![allow(unused_must_use)]
50     /// use std::io::net::pipe::UnixStream;
51     ///
52     /// let server = Path::new("path/to/my/socket");
53     /// let mut stream = UnixStream::connect(&server);
54     /// stream.write(&[1, 2, 3]);
55     /// ```
56     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
57         UnixStreamImp::connect(&path.to_c_str(), None)
58             .map(|inner| UnixStream { inner: inner })
59     }
60
61     /// Connect to a pipe named by `path`, timing out if the specified number of
62     /// milliseconds.
63     ///
64     /// This function is similar to `connect`, except that if `timeout`
65     /// elapses the function will return an error of kind `TimedOut`.
66     ///
67     /// If a `timeout` with zero or negative duration is specified then
68     /// the function returns `Err`, with the error kind set to `TimedOut`.
69     #[experimental = "the timeout argument is likely to change types"]
70     pub fn connect_timeout<P: ToCStr>(path: &P,
71                                       timeout: Duration) -> IoResult<UnixStream> {
72         if timeout <= Duration::milliseconds(0) {
73             return Err(standard_error(TimedOut));
74         }
75
76         UnixStreamImp::connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64))
77             .map(|inner| UnixStream { inner: inner })
78     }
79
80
81     /// Closes the reading half of this connection.
82     ///
83     /// This method will close the reading portion of this connection, causing
84     /// all pending and future reads to immediately return with an error.
85     ///
86     /// Note that this method affects all cloned handles associated with this
87     /// stream, not just this one handle.
88     pub fn close_read(&mut self) -> IoResult<()> {
89         self.inner.close_read()
90     }
91
92     /// Closes the writing half of this connection.
93     ///
94     /// This method will close the writing portion of this connection, causing
95     /// all pending and future writes to immediately return with an error.
96     ///
97     /// Note that this method affects all cloned handles associated with this
98     /// stream, not just this one handle.
99     pub fn close_write(&mut self) -> IoResult<()> {
100         self.inner.close_write()
101     }
102
103     /// Sets the read/write timeout for this socket.
104     ///
105     /// For more information, see `TcpStream::set_timeout`
106     #[experimental = "the timeout argument may change in type and value"]
107     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
108         self.inner.set_timeout(timeout_ms)
109     }
110
111     /// Sets the read timeout for this socket.
112     ///
113     /// For more information, see `TcpStream::set_timeout`
114     #[experimental = "the timeout argument may change in type and value"]
115     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
116         self.inner.set_read_timeout(timeout_ms)
117     }
118
119     /// Sets the write timeout for this socket.
120     ///
121     /// For more information, see `TcpStream::set_timeout`
122     #[experimental = "the timeout argument may change in type and value"]
123     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
124         self.inner.set_write_timeout(timeout_ms)
125     }
126 }
127
128 impl Clone for UnixStream {
129     fn clone(&self) -> UnixStream {
130         UnixStream { inner: self.inner.clone() }
131     }
132 }
133
134 impl Reader for UnixStream {
135     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
136         self.inner.read(buf)
137     }
138 }
139
140 impl Writer for UnixStream {
141     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
142         self.inner.write(buf)
143     }
144 }
145
146 impl sys_common::AsInner<UnixStreamImp> for UnixStream {
147     fn as_inner(&self) -> &UnixStreamImp {
148         &self.inner
149     }
150 }
151
152 /// A value that can listen for incoming named pipe connection requests.
153 pub struct UnixListener {
154     /// The internal, opaque runtime Unix listener.
155     inner: UnixListenerImp,
156 }
157
158 impl UnixListener {
159     /// Creates a new listener, ready to receive incoming connections on the
160     /// specified socket. The server will be named by `path`.
161     ///
162     /// This listener will be closed when it falls out of scope.
163     ///
164     /// # Example
165     ///
166     /// ```
167     /// # fn main() {}
168     /// # fn foo() {
169     /// # #![allow(unused_must_use)]
170     /// use std::io::net::pipe::UnixListener;
171     /// use std::io::{Listener, Acceptor};
172     ///
173     /// let server = Path::new("/path/to/my/socket");
174     /// let stream = UnixListener::bind(&server);
175     /// for mut client in stream.listen().incoming() {
176     ///     client.write(&[1, 2, 3, 4]);
177     /// }
178     /// # }
179     /// ```
180     pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
181         UnixListenerImp::bind(&path.to_c_str())
182             .map(|inner| UnixListener { inner: inner })
183     }
184 }
185
186 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
187     fn listen(self) -> IoResult<UnixAcceptor> {
188         self.inner.listen()
189             .map(|inner| UnixAcceptor { inner: inner })
190     }
191 }
192
193 impl sys_common::AsInner<UnixListenerImp> for UnixListener {
194     fn as_inner(&self) -> &UnixListenerImp {
195         &self.inner
196     }
197 }
198
199 /// A value that can accept named pipe connections, returned from `listen()`.
200 pub struct UnixAcceptor {
201     /// The internal, opaque runtime Unix acceptor.
202     inner: UnixAcceptorImp
203 }
204
205 impl UnixAcceptor {
206     /// Sets a timeout for this acceptor, after which accept() will no longer
207     /// block indefinitely.
208     ///
209     /// The argument specified is the amount of time, in milliseconds, into the
210     /// future after which all invocations of accept() will not block (and any
211     /// pending invocation will return). A value of `None` will clear any
212     /// existing timeout.
213     ///
214     /// When using this method, it is likely necessary to reset the timeout as
215     /// appropriate, the timeout specified is specific to this object, not
216     /// specific to the next request.
217     #[experimental = "the name and arguments to this function are likely \
218                       to change"]
219     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
220         self.inner.set_timeout(timeout_ms)
221     }
222
223     /// Closes the accepting capabilities of this acceptor.
224     ///
225     /// This function has the same semantics as `TcpAcceptor::close_accept`, and
226     /// more information can be found in that documentation.
227     #[experimental]
228     pub fn close_accept(&mut self) -> IoResult<()> {
229         self.inner.close_accept()
230     }
231 }
232
233 impl Acceptor<UnixStream> for UnixAcceptor {
234     fn accept(&mut self) -> IoResult<UnixStream> {
235         self.inner.accept().map(|s| {
236             UnixStream { inner: s }
237         })
238     }
239 }
240
241 impl Clone for UnixAcceptor {
242     /// Creates a new handle to this unix acceptor, allowing for simultaneous
243     /// accepts.
244     ///
245     /// The underlying unix acceptor will not be closed until all handles to the
246     /// acceptor have been deallocated. Incoming connections will be received on
247     /// at most once acceptor, the same connection will not be accepted twice.
248     ///
249     /// The `close_accept` method will shut down *all* acceptors cloned from the
250     /// same original acceptor, whereas the `set_timeout` method only affects
251     /// the selector that it is called on.
252     ///
253     /// This function is useful for creating a handle to invoke `close_accept`
254     /// on to wake up any other task blocked in `accept`.
255     fn clone(&self) -> UnixAcceptor {
256         UnixAcceptor { inner: self.inner.clone() }
257     }
258 }
259
260 impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor {
261     fn as_inner(&self) -> &UnixAcceptorImp {
262         &self.inner
263     }
264 }
265
266 #[cfg(test)]
267 mod tests {
268     use prelude::v1::*;
269
270     use io::fs::PathExtensions;
271     use io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset};
272     use io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError};
273     use io::{PermissionDenied, Acceptor, Listener};
274     use io::test::*;
275     use super::*;
276     use sync::mpsc::channel;
277     use thread::Thread;
278     use time::Duration;
279
280     pub fn smalltest<F,G>(server: F, client: G)
281         where F : FnOnce(UnixStream), F : Send,
282               G : FnOnce(UnixStream), G : Send
283     {
284         let path1 = next_test_unix();
285         let path2 = path1.clone();
286
287         let mut acceptor = UnixListener::bind(&path1).listen();
288
289         let _t = Thread::spawn(move|| {
290             match UnixStream::connect(&path2) {
291                 Ok(c) => client(c),
292                 Err(e) => panic!("failed connect: {}", e),
293             }
294         });
295
296         match acceptor.accept() {
297             Ok(c) => server(c),
298             Err(e) => panic!("failed accept: {}", e),
299         }
300     }
301
302     #[test]
303     fn bind_error() {
304         let path = "path/to/nowhere";
305         match UnixListener::bind(&path) {
306             Ok(..) => panic!(),
307             Err(e) => {
308                 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
309                         e.kind == InvalidInput);
310             }
311         }
312     }
313
314     #[test]
315     fn connect_error() {
316         let path = if cfg!(windows) {
317             r"\\.\pipe\this_should_not_exist_ever"
318         } else {
319             "path/to/nowhere"
320         };
321         match UnixStream::connect(&path) {
322             Ok(..) => panic!(),
323             Err(e) => {
324                 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
325             }
326         }
327     }
328
329     #[test]
330     fn smoke() {
331         smalltest(move |mut server| {
332             let mut buf = [0];
333             server.read(&mut buf).unwrap();
334             assert!(buf[0] == 99);
335         }, move|mut client| {
336             client.write(&[99]).unwrap();
337         })
338     }
339
340     #[cfg_attr(windows, ignore)] // FIXME(#12516)
341     #[test]
342     fn read_eof() {
343         smalltest(move|mut server| {
344             let mut buf = [0];
345             assert!(server.read(&mut buf).is_err());
346             assert!(server.read(&mut buf).is_err());
347         }, move|_client| {
348             // drop the client
349         })
350     }
351
352     #[test]
353     fn write_begone() {
354         smalltest(move|mut server| {
355             let buf = [0];
356             loop {
357                 match server.write(&buf) {
358                     Ok(..) => {}
359                     Err(e) => {
360                         assert!(e.kind == BrokenPipe ||
361                                 e.kind == NotConnected ||
362                                 e.kind == ConnectionReset,
363                                 "unknown error {}", e);
364                         break;
365                     }
366                 }
367             }
368         }, move|_client| {
369             // drop the client
370         })
371     }
372
373     #[test]
374     fn accept_lots() {
375         let times = 10;
376         let path1 = next_test_unix();
377         let path2 = path1.clone();
378
379         let mut acceptor = match UnixListener::bind(&path1).listen() {
380             Ok(a) => a,
381             Err(e) => panic!("failed listen: {}", e),
382         };
383
384         let _t = Thread::spawn(move|| {
385             for _ in range(0u, times) {
386                 let mut stream = UnixStream::connect(&path2);
387                 match stream.write(&[100]) {
388                     Ok(..) => {}
389                     Err(e) => panic!("failed write: {}", e)
390                 }
391             }
392         });
393
394         for _ in range(0, times) {
395             let mut client = acceptor.accept();
396             let mut buf = [0];
397             match client.read(&mut buf) {
398                 Ok(..) => {}
399                 Err(e) => panic!("failed read/accept: {}", e),
400             }
401             assert_eq!(buf[0], 100);
402         }
403     }
404
405     #[cfg(unix)]
406     #[test]
407     fn path_exists() {
408         let path = next_test_unix();
409         let _acceptor = UnixListener::bind(&path).listen();
410         assert!(path.exists());
411     }
412
413     #[test]
414     fn unix_clone_smoke() {
415         let addr = next_test_unix();
416         let mut acceptor = UnixListener::bind(&addr).listen();
417
418         let _t = Thread::spawn(move|| {
419             let mut s = UnixStream::connect(&addr);
420             let mut buf = [0, 0];
421             debug!("client reading");
422             assert_eq!(s.read(&mut buf), Ok(1));
423             assert_eq!(buf[0], 1);
424             debug!("client writing");
425             s.write(&[2]).unwrap();
426             debug!("client dropping");
427         });
428
429         let mut s1 = acceptor.accept().unwrap();
430         let s2 = s1.clone();
431
432         let (tx1, rx1) = channel();
433         let (tx2, rx2) = channel();
434         let _t = Thread::spawn(move|| {
435             let mut s2 = s2;
436             rx1.recv().unwrap();
437             debug!("writer writing");
438             s2.write(&[1]).unwrap();
439             debug!("writer done");
440             tx2.send(()).unwrap();
441         });
442         tx1.send(()).unwrap();
443         let mut buf = [0, 0];
444         debug!("reader reading");
445         assert_eq!(s1.read(&mut buf), Ok(1));
446         debug!("reader done");
447         rx2.recv().unwrap();
448     }
449
450     #[test]
451     fn unix_clone_two_read() {
452         let addr = next_test_unix();
453         let mut acceptor = UnixListener::bind(&addr).listen();
454         let (tx1, rx) = channel();
455         let tx2 = tx1.clone();
456
457         let _t = Thread::spawn(move|| {
458             let mut s = UnixStream::connect(&addr);
459             s.write(&[1]).unwrap();
460             rx.recv().unwrap();
461             s.write(&[2]).unwrap();
462             rx.recv().unwrap();
463         });
464
465         let mut s1 = acceptor.accept().unwrap();
466         let s2 = s1.clone();
467
468         let (done, rx) = channel();
469         let _t = Thread::spawn(move|| {
470             let mut s2 = s2;
471             let mut buf = [0, 0];
472             s2.read(&mut buf).unwrap();
473             tx2.send(()).unwrap();
474             done.send(()).unwrap();
475         });
476         let mut buf = [0, 0];
477         s1.read(&mut buf).unwrap();
478         tx1.send(()).unwrap();
479
480         rx.recv().unwrap();
481     }
482
483     #[test]
484     fn unix_clone_two_write() {
485         let addr = next_test_unix();
486         let mut acceptor = UnixListener::bind(&addr).listen();
487
488         let _t = Thread::spawn(move|| {
489             let mut s = UnixStream::connect(&addr);
490             let buf = &mut [0, 1];
491             s.read(buf).unwrap();
492             s.read(buf).unwrap();
493         });
494
495         let mut s1 = acceptor.accept().unwrap();
496         let s2 = s1.clone();
497
498         let (tx, rx) = channel();
499         let _t = Thread::spawn(move|| {
500             let mut s2 = s2;
501             s2.write(&[1]).unwrap();
502             tx.send(()).unwrap();
503         });
504         s1.write(&[2]).unwrap();
505
506         rx.recv().unwrap();
507     }
508
509     #[cfg(not(windows))]
510     #[test]
511     fn drop_removes_listener_path() {
512         let path = next_test_unix();
513         let l = UnixListener::bind(&path).unwrap();
514         assert!(path.exists());
515         drop(l);
516         assert!(!path.exists());
517     }
518
519     #[cfg(not(windows))]
520     #[test]
521     fn drop_removes_acceptor_path() {
522         let path = next_test_unix();
523         let l = UnixListener::bind(&path).unwrap();
524         assert!(path.exists());
525         drop(l.listen().unwrap());
526         assert!(!path.exists());
527     }
528
529     #[test]
530     fn accept_timeout() {
531         let addr = next_test_unix();
532         let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
533
534         a.set_timeout(Some(10));
535
536         // Make sure we time out once and future invocations also time out
537         let err = a.accept().err().unwrap();
538         assert_eq!(err.kind, TimedOut);
539         let err = a.accept().err().unwrap();
540         assert_eq!(err.kind, TimedOut);
541
542         // Also make sure that even though the timeout is expired that we will
543         // continue to receive any pending connections.
544         let (tx, rx) = channel();
545         let addr2 = addr.clone();
546         let _t = Thread::spawn(move|| {
547             tx.send(UnixStream::connect(&addr2).unwrap()).unwrap();
548         });
549         let l = rx.recv().unwrap();
550         for i in range(0u, 1001) {
551             match a.accept() {
552                 Ok(..) => break,
553                 Err(ref e) if e.kind == TimedOut => {}
554                 Err(e) => panic!("error: {}", e),
555             }
556             ::thread::Thread::yield_now();
557             if i == 1000 { panic!("should have a pending connection") }
558         }
559         drop(l);
560
561         // Unset the timeout and make sure that this always blocks.
562         a.set_timeout(None);
563         let addr2 = addr.clone();
564         let _t = Thread::spawn(move|| {
565             drop(UnixStream::connect(&addr2).unwrap());
566         });
567         a.accept().unwrap();
568     }
569
570     #[test]
571     fn connect_timeout_error() {
572         let addr = next_test_unix();
573         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
574     }
575
576     #[test]
577     fn connect_timeout_success() {
578         let addr = next_test_unix();
579         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
580         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
581     }
582
583     #[test]
584     fn connect_timeout_zero() {
585         let addr = next_test_unix();
586         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
587         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
588     }
589
590     #[test]
591     fn connect_timeout_negative() {
592         let addr = next_test_unix();
593         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
594         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
595     }
596
597     #[test]
598     fn close_readwrite_smoke() {
599         let addr = next_test_unix();
600         let a = UnixListener::bind(&addr).listen().unwrap();
601         let (_tx, rx) = channel::<()>();
602         Thread::spawn(move|| {
603             let mut a = a;
604             let _s = a.accept().unwrap();
605             let _ = rx.recv();
606         }).detach();
607
608         let mut b = [0];
609         let mut s = UnixStream::connect(&addr).unwrap();
610         let mut s2 = s.clone();
611
612         // closing should prevent reads/writes
613         s.close_write().unwrap();
614         assert!(s.write(&[0]).is_err());
615         s.close_read().unwrap();
616         assert!(s.read(&mut b).is_err());
617
618         // closing should affect previous handles
619         assert!(s2.write(&[0]).is_err());
620         assert!(s2.read(&mut b).is_err());
621
622         // closing should affect new handles
623         let mut s3 = s.clone();
624         assert!(s3.write(&[0]).is_err());
625         assert!(s3.read(&mut b).is_err());
626
627         // make sure these don't die
628         let _ = s2.close_read();
629         let _ = s2.close_write();
630         let _ = s3.close_read();
631         let _ = s3.close_write();
632     }
633
634     #[test]
635     fn close_read_wakes_up() {
636         let addr = next_test_unix();
637         let a = UnixListener::bind(&addr).listen().unwrap();
638         let (_tx, rx) = channel::<()>();
639         Thread::spawn(move|| {
640             let mut a = a;
641             let _s = a.accept().unwrap();
642             let _ = rx.recv();
643         }).detach();
644
645         let mut s = UnixStream::connect(&addr).unwrap();
646         let s2 = s.clone();
647         let (tx, rx) = channel();
648         let _t = Thread::spawn(move|| {
649             let mut s2 = s2;
650             assert!(s2.read(&mut [0]).is_err());
651             tx.send(()).unwrap();
652         });
653         // this should wake up the child task
654         s.close_read().unwrap();
655
656         // this test will never finish if the child doesn't wake up
657         rx.recv().unwrap();
658     }
659
660     #[test]
661     fn readwrite_timeouts() {
662         let addr = next_test_unix();
663         let mut a = UnixListener::bind(&addr).listen().unwrap();
664         let (tx, rx) = channel::<()>();
665         Thread::spawn(move|| {
666             let mut s = UnixStream::connect(&addr).unwrap();
667             rx.recv().unwrap();
668             assert!(s.write(&[0]).is_ok());
669             let _ = rx.recv();
670         }).detach();
671
672         let mut s = a.accept().unwrap();
673         s.set_timeout(Some(20));
674         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
675         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
676
677         s.set_timeout(Some(20));
678         for i in range(0u, 1001) {
679             match s.write(&[0, .. 128 * 1024]) {
680                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
681                 Err(IoError { kind: TimedOut, .. }) => break,
682                 Err(e) => panic!("{}", e),
683            }
684            if i == 1000 { panic!("should have filled up?!"); }
685         }
686
687         // I'm not sure as to why, but apparently the write on windows always
688         // succeeds after the previous timeout. Who knows?
689         if !cfg!(windows) {
690             assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
691         }
692
693         tx.send(()).unwrap();
694         s.set_timeout(None);
695         assert_eq!(s.read(&mut [0, 0]), Ok(1));
696     }
697
698     #[test]
699     fn read_timeouts() {
700         let addr = next_test_unix();
701         let mut a = UnixListener::bind(&addr).listen().unwrap();
702         let (tx, rx) = channel::<()>();
703         Thread::spawn(move|| {
704             let mut s = UnixStream::connect(&addr).unwrap();
705             rx.recv().unwrap();
706             let mut amt = 0;
707             while amt < 100 * 128 * 1024 {
708                 match s.read(&mut [0, ..128 * 1024]) {
709                     Ok(n) => { amt += n; }
710                     Err(e) => panic!("{}", e),
711                 }
712             }
713             let _ = rx.recv();
714         }).detach();
715
716         let mut s = a.accept().unwrap();
717         s.set_read_timeout(Some(20));
718         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
719         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
720
721         tx.send(()).unwrap();
722         for _ in range(0u, 100) {
723             assert!(s.write(&[0, ..128 * 1024]).is_ok());
724         }
725     }
726
727     #[test]
728     fn write_timeouts() {
729         let addr = next_test_unix();
730         let mut a = UnixListener::bind(&addr).listen().unwrap();
731         let (tx, rx) = channel::<()>();
732         Thread::spawn(move|| {
733             let mut s = UnixStream::connect(&addr).unwrap();
734             rx.recv().unwrap();
735             assert!(s.write(&[0]).is_ok());
736             let _ = rx.recv();
737         }).detach();
738
739         let mut s = a.accept().unwrap();
740         s.set_write_timeout(Some(20));
741         for i in range(0u, 1001) {
742             match s.write(&[0, .. 128 * 1024]) {
743                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
744                 Err(IoError { kind: TimedOut, .. }) => break,
745                 Err(e) => panic!("{}", e),
746            }
747            if i == 1000 { panic!("should have filled up?!"); }
748         }
749
750         tx.send(()).unwrap();
751         assert!(s.read(&mut [0]).is_ok());
752     }
753
754     #[test]
755     fn timeout_concurrent_read() {
756         let addr = next_test_unix();
757         let mut a = UnixListener::bind(&addr).listen().unwrap();
758         let (tx, rx) = channel::<()>();
759         Thread::spawn(move|| {
760             let mut s = UnixStream::connect(&addr).unwrap();
761             rx.recv().unwrap();
762             assert!(s.write(&[0]).is_ok());
763             let _ = rx.recv();
764         }).detach();
765
766         let mut s = a.accept().unwrap();
767         let s2 = s.clone();
768         let (tx2, rx2) = channel();
769         let _t = Thread::spawn(move|| {
770             let mut s2 = s2;
771             assert!(s2.read(&mut [0]).is_ok());
772             tx2.send(()).unwrap();
773         });
774
775         s.set_read_timeout(Some(20));
776         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
777         tx.send(()).unwrap();
778
779         rx2.recv().unwrap();
780     }
781
782     #[cfg(not(windows))]
783     #[test]
784     fn clone_accept_smoke() {
785         let addr = next_test_unix();
786         let l = UnixListener::bind(&addr);
787         let mut a = l.listen().unwrap();
788         let mut a2 = a.clone();
789
790         let addr2 = addr.clone();
791         let _t = Thread::spawn(move|| {
792             let _ = UnixStream::connect(&addr2);
793         });
794         let _t = Thread::spawn(move|| {
795             let _ = UnixStream::connect(&addr);
796         });
797
798         assert!(a.accept().is_ok());
799         drop(a);
800         assert!(a2.accept().is_ok());
801     }
802
803     #[cfg(not(windows))] // FIXME #17553
804     #[test]
805     fn clone_accept_concurrent() {
806         let addr = next_test_unix();
807         let l = UnixListener::bind(&addr);
808         let a = l.listen().unwrap();
809         let a2 = a.clone();
810
811         let (tx, rx) = channel();
812         let tx2 = tx.clone();
813
814         let _t = Thread::spawn(move|| {
815             let mut a = a;
816             tx.send(a.accept()).unwrap()
817         });
818         let _t = Thread::spawn(move|| {
819             let mut a = a2;
820             tx2.send(a.accept()).unwrap()
821         });
822
823         let addr2 = addr.clone();
824         let _t = Thread::spawn(move|| {
825             let _ = UnixStream::connect(&addr2);
826         });
827         let _t = Thread::spawn(move|| {
828             let _ = UnixStream::connect(&addr);
829         });
830
831         assert!(rx.recv().unwrap().is_ok());
832         assert!(rx.recv().unwrap().is_ok());
833     }
834
835     #[test]
836     fn close_accept_smoke() {
837         let addr = next_test_unix();
838         let l = UnixListener::bind(&addr);
839         let mut a = l.listen().unwrap();
840
841         a.close_accept().unwrap();
842         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
843     }
844
845     #[test]
846     fn close_accept_concurrent() {
847         let addr = next_test_unix();
848         let l = UnixListener::bind(&addr);
849         let a = l.listen().unwrap();
850         let mut a2 = a.clone();
851
852         let (tx, rx) = channel();
853         let _t = Thread::spawn(move|| {
854             let mut a = a;
855             tx.send(a.accept()).unwrap();
856         });
857         a2.close_accept().unwrap();
858
859         assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
860     }
861 }