]> git.lizzy.rs Git - rust.git/blob - src/libstd/old_io/net/pipe.rs
Auto merge of #22541 - Manishearth:rollup, r=Gankro
[rust.git] / src / libstd / old_io / net / pipe.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes
12 //!
13 //! This module contains the ability to communicate over named pipes with
14 //! synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
15 //! while on Unix it corresponds to UNIX domain sockets.
16 //!
17 //! These pipes are similar to TCP in the sense that you can have both a stream to a
18 //! server and a server itself. The server provided accepts other `UnixStream`
19 //! instances as clients.
20
21 #![allow(missing_docs)]
22
23 use prelude::v1::*;
24
25 use ffi::CString;
26 use old_path::BytesContainer;
27 use old_io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
28 use sys::pipe::UnixAcceptor as UnixAcceptorImp;
29 use sys::pipe::UnixListener as UnixListenerImp;
30 use sys::pipe::UnixStream as UnixStreamImp;
31 use time::Duration;
32
33 use sys_common;
34
35 /// A stream which communicates over a named pipe.
36 pub struct UnixStream {
37     inner: UnixStreamImp,
38 }
39
40 impl UnixStream {
41
42     /// Connect to a pipe named by `path`. This will attempt to open a
43     /// connection to the underlying socket.
44     ///
45     /// The returned stream will be closed when the object falls out of scope.
46     ///
47     /// # Example
48     ///
49     /// ```rust
50     /// # #![allow(unused_must_use)]
51     /// use std::old_io::net::pipe::UnixStream;
52     ///
53     /// let server = Path::new("path/to/my/socket");
54     /// let mut stream = UnixStream::connect(&server);
55     /// stream.write(&[1, 2, 3]);
56     /// ```
57     pub fn connect<P: BytesContainer>(path: P) -> IoResult<UnixStream> {
58         let path = try!(CString::new(path.container_as_bytes()));
59         UnixStreamImp::connect(&path, None)
60             .map(|inner| UnixStream { inner: inner })
61     }
62
63     /// Connect to a pipe named by `path`, timing out if the specified number of
64     /// milliseconds.
65     ///
66     /// This function is similar to `connect`, except that if `timeout`
67     /// elapses the function will return an error of kind `TimedOut`.
68     ///
69     /// If a `timeout` with zero or negative duration is specified then
70     /// the function returns `Err`, with the error kind set to `TimedOut`.
71     #[unstable(feature = "io",
72                reason = "the timeout argument is likely to change types")]
73     pub fn connect_timeout<P>(path: P, timeout: Duration)
74                               -> IoResult<UnixStream>
75                               where P: BytesContainer {
76         if timeout <= Duration::milliseconds(0) {
77             return Err(standard_error(TimedOut));
78         }
79
80         let path = try!(CString::new(path.container_as_bytes()));
81         UnixStreamImp::connect(&path, Some(timeout.num_milliseconds() as u64))
82             .map(|inner| UnixStream { inner: inner })
83     }
84
85
86     /// Closes the reading half of this connection.
87     ///
88     /// This method will close the reading portion of this connection, causing
89     /// all pending and future reads to immediately return with an error.
90     ///
91     /// Note that this method affects all cloned handles associated with this
92     /// stream, not just this one handle.
93     pub fn close_read(&mut self) -> IoResult<()> {
94         self.inner.close_read()
95     }
96
97     /// Closes the writing half of this connection.
98     ///
99     /// This method will close the writing portion of this connection, causing
100     /// all pending and future writes to immediately return with an error.
101     ///
102     /// Note that this method affects all cloned handles associated with this
103     /// stream, not just this one handle.
104     pub fn close_write(&mut self) -> IoResult<()> {
105         self.inner.close_write()
106     }
107
108     /// Sets the read/write timeout for this socket.
109     ///
110     /// For more information, see `TcpStream::set_timeout`
111     #[unstable(feature = "io",
112                reason = "the timeout argument may change in type and value")]
113     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
114         self.inner.set_timeout(timeout_ms)
115     }
116
117     /// Sets the read timeout for this socket.
118     ///
119     /// For more information, see `TcpStream::set_timeout`
120     #[unstable(feature = "io",
121                reason = "the timeout argument may change in type and value")]
122     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
123         self.inner.set_read_timeout(timeout_ms)
124     }
125
126     /// Sets the write timeout for this socket.
127     ///
128     /// For more information, see `TcpStream::set_timeout`
129     #[unstable(feature = "io",
130                reason = "the timeout argument may change in type and value")]
131     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
132         self.inner.set_write_timeout(timeout_ms)
133     }
134 }
135
136 impl Clone for UnixStream {
137     fn clone(&self) -> UnixStream {
138         UnixStream { inner: self.inner.clone() }
139     }
140 }
141
142 impl Reader for UnixStream {
143     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
144         self.inner.read(buf)
145     }
146 }
147
148 impl Writer for UnixStream {
149     fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
150         self.inner.write(buf)
151     }
152 }
153
154 impl sys_common::AsInner<UnixStreamImp> for UnixStream {
155     fn as_inner(&self) -> &UnixStreamImp {
156         &self.inner
157     }
158 }
159
160 /// A value that can listen for incoming named pipe connection requests.
161 pub struct UnixListener {
162     /// The internal, opaque runtime Unix listener.
163     inner: UnixListenerImp,
164 }
165
166 impl UnixListener {
167     /// Creates a new listener, ready to receive incoming connections on the
168     /// specified socket. The server will be named by `path`.
169     ///
170     /// This listener will be closed when it falls out of scope.
171     ///
172     /// # Example
173     ///
174     /// ```
175     /// # fn foo() {
176     /// use std::old_io::net::pipe::UnixListener;
177     /// use std::old_io::{Listener, Acceptor};
178     ///
179     /// let server = Path::new("/path/to/my/socket");
180     /// let stream = UnixListener::bind(&server);
181     /// for mut client in stream.listen().incoming() {
182     ///     client.write(&[1, 2, 3, 4]);
183     /// }
184     /// # }
185     /// ```
186     pub fn bind<P: BytesContainer>(path: P) -> IoResult<UnixListener> {
187         let path = try!(CString::new(path.container_as_bytes()));
188         UnixListenerImp::bind(&path)
189             .map(|inner| UnixListener { inner: inner })
190     }
191 }
192
193 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
194     fn listen(self) -> IoResult<UnixAcceptor> {
195         self.inner.listen()
196             .map(|inner| UnixAcceptor { inner: inner })
197     }
198 }
199
200 impl sys_common::AsInner<UnixListenerImp> for UnixListener {
201     fn as_inner(&self) -> &UnixListenerImp {
202         &self.inner
203     }
204 }
205
206 /// A value that can accept named pipe connections, returned from `listen()`.
207 pub struct UnixAcceptor {
208     /// The internal, opaque runtime Unix acceptor.
209     inner: UnixAcceptorImp
210 }
211
212 impl UnixAcceptor {
213     /// Sets a timeout for this acceptor, after which accept() will no longer
214     /// block indefinitely.
215     ///
216     /// The argument specified is the amount of time, in milliseconds, into the
217     /// future after which all invocations of accept() will not block (and any
218     /// pending invocation will return). A value of `None` will clear any
219     /// existing timeout.
220     ///
221     /// When using this method, it is likely necessary to reset the timeout as
222     /// appropriate, the timeout specified is specific to this object, not
223     /// specific to the next request.
224     #[unstable(feature = "io",
225                reason = "the name and arguments to this function are likely \
226                          to change")]
227     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
228         self.inner.set_timeout(timeout_ms)
229     }
230
231     /// Closes the accepting capabilities of this acceptor.
232     ///
233     /// This function has the same semantics as `TcpAcceptor::close_accept`, and
234     /// more information can be found in that documentation.
235     #[unstable(feature = "io")]
236     pub fn close_accept(&mut self) -> IoResult<()> {
237         self.inner.close_accept()
238     }
239 }
240
241 impl Acceptor<UnixStream> for UnixAcceptor {
242     fn accept(&mut self) -> IoResult<UnixStream> {
243         self.inner.accept().map(|s| {
244             UnixStream { inner: s }
245         })
246     }
247 }
248
249 impl Clone for UnixAcceptor {
250     /// Creates a new handle to this unix acceptor, allowing for simultaneous
251     /// accepts.
252     ///
253     /// The underlying unix acceptor will not be closed until all handles to the
254     /// acceptor have been deallocated. Incoming connections will be received on
255     /// at most once acceptor, the same connection will not be accepted twice.
256     ///
257     /// The `close_accept` method will shut down *all* acceptors cloned from the
258     /// same original acceptor, whereas the `set_timeout` method only affects
259     /// the selector that it is called on.
260     ///
261     /// This function is useful for creating a handle to invoke `close_accept`
262     /// on to wake up any other task blocked in `accept`.
263     fn clone(&self) -> UnixAcceptor {
264         UnixAcceptor { inner: self.inner.clone() }
265     }
266 }
267
268 impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor {
269     fn as_inner(&self) -> &UnixAcceptorImp {
270         &self.inner
271     }
272 }
273
274 #[cfg(test)]
275 mod tests {
276     use prelude::v1::*;
277
278     use old_io::fs::PathExtensions;
279     use old_io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset};
280     use old_io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError};
281     use old_io::{PermissionDenied, Acceptor, Listener};
282     use old_io::test::*;
283     use super::*;
284     use sync::mpsc::channel;
285     use thread;
286     use time::Duration;
287
288     pub fn smalltest<F,G>(server: F, client: G)
289         where F : FnOnce(UnixStream), F : Send,
290               G : FnOnce(UnixStream), G : Send + 'static
291     {
292         let path1 = next_test_unix();
293         let path2 = path1.clone();
294
295         let mut acceptor = UnixListener::bind(&path1).listen();
296
297         let _t = thread::spawn(move|| {
298             match UnixStream::connect(&path2) {
299                 Ok(c) => client(c),
300                 Err(e) => panic!("failed connect: {}", e),
301             }
302         });
303
304         match acceptor.accept() {
305             Ok(c) => server(c),
306             Err(e) => panic!("failed accept: {}", e),
307         }
308     }
309
310     #[test]
311     fn bind_error() {
312         let path = "path/to/nowhere";
313         match UnixListener::bind(&path) {
314             Ok(..) => panic!(),
315             Err(e) => {
316                 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
317                         e.kind == InvalidInput);
318             }
319         }
320     }
321
322     #[test]
323     fn connect_error() {
324         let path = if cfg!(windows) {
325             r"\\.\pipe\this_should_not_exist_ever"
326         } else {
327             "path/to/nowhere"
328         };
329         match UnixStream::connect(&path) {
330             Ok(..) => panic!(),
331             Err(e) => {
332                 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
333             }
334         }
335     }
336
337     #[test]
338     fn smoke() {
339         smalltest(move |mut server| {
340             let mut buf = [0];
341             server.read(&mut buf).unwrap();
342             assert!(buf[0] == 99);
343         }, move|mut client| {
344             client.write(&[99]).unwrap();
345         })
346     }
347
348     #[cfg_attr(windows, ignore)] // FIXME(#12516)
349     #[test]
350     fn read_eof() {
351         smalltest(move|mut server| {
352             let mut buf = [0];
353             assert!(server.read(&mut buf).is_err());
354             assert!(server.read(&mut buf).is_err());
355         }, move|_client| {
356             // drop the client
357         })
358     }
359
360     #[test]
361     fn write_begone() {
362         smalltest(move|mut server| {
363             let buf = [0];
364             loop {
365                 match server.write(&buf) {
366                     Ok(..) => {}
367                     Err(e) => {
368                         assert!(e.kind == BrokenPipe ||
369                                 e.kind == NotConnected ||
370                                 e.kind == ConnectionReset,
371                                 "unknown error {}", e);
372                         break;
373                     }
374                 }
375             }
376         }, move|_client| {
377             // drop the client
378         })
379     }
380
381     #[test]
382     fn accept_lots() {
383         let times = 10;
384         let path1 = next_test_unix();
385         let path2 = path1.clone();
386
387         let mut acceptor = match UnixListener::bind(&path1).listen() {
388             Ok(a) => a,
389             Err(e) => panic!("failed listen: {}", e),
390         };
391
392         let _t = thread::spawn(move|| {
393             for _ in 0..times {
394                 let mut stream = UnixStream::connect(&path2);
395                 match stream.write(&[100]) {
396                     Ok(..) => {}
397                     Err(e) => panic!("failed write: {}", e)
398                 }
399             }
400         });
401
402         for _ in 0..times {
403             let mut client = acceptor.accept();
404             let mut buf = [0];
405             match client.read(&mut buf) {
406                 Ok(..) => {}
407                 Err(e) => panic!("failed read/accept: {}", e),
408             }
409             assert_eq!(buf[0], 100);
410         }
411     }
412
413     #[cfg(unix)]
414     #[test]
415     fn path_exists() {
416         let path = next_test_unix();
417         let _acceptor = UnixListener::bind(&path).listen();
418         assert!(path.exists());
419     }
420
421     #[test]
422     fn unix_clone_smoke() {
423         let addr = next_test_unix();
424         let mut acceptor = UnixListener::bind(&addr).listen();
425
426         let _t = thread::spawn(move|| {
427             let mut s = UnixStream::connect(&addr);
428             let mut buf = [0, 0];
429             debug!("client reading");
430             assert_eq!(s.read(&mut buf), Ok(1));
431             assert_eq!(buf[0], 1);
432             debug!("client writing");
433             s.write(&[2]).unwrap();
434             debug!("client dropping");
435         });
436
437         let mut s1 = acceptor.accept().unwrap();
438         let s2 = s1.clone();
439
440         let (tx1, rx1) = channel();
441         let (tx2, rx2) = channel();
442         let _t = thread::spawn(move|| {
443             let mut s2 = s2;
444             rx1.recv().unwrap();
445             debug!("writer writing");
446             s2.write(&[1]).unwrap();
447             debug!("writer done");
448             tx2.send(()).unwrap();
449         });
450         tx1.send(()).unwrap();
451         let mut buf = [0, 0];
452         debug!("reader reading");
453         assert_eq!(s1.read(&mut buf), Ok(1));
454         debug!("reader done");
455         rx2.recv().unwrap();
456     }
457
458     #[test]
459     fn unix_clone_two_read() {
460         let addr = next_test_unix();
461         let mut acceptor = UnixListener::bind(&addr).listen();
462         let (tx1, rx) = channel();
463         let tx2 = tx1.clone();
464
465         let _t = thread::spawn(move|| {
466             let mut s = UnixStream::connect(&addr);
467             s.write(&[1]).unwrap();
468             rx.recv().unwrap();
469             s.write(&[2]).unwrap();
470             rx.recv().unwrap();
471         });
472
473         let mut s1 = acceptor.accept().unwrap();
474         let s2 = s1.clone();
475
476         let (done, rx) = channel();
477         let _t = thread::spawn(move|| {
478             let mut s2 = s2;
479             let mut buf = [0, 0];
480             s2.read(&mut buf).unwrap();
481             tx2.send(()).unwrap();
482             done.send(()).unwrap();
483         });
484         let mut buf = [0, 0];
485         s1.read(&mut buf).unwrap();
486         tx1.send(()).unwrap();
487
488         rx.recv().unwrap();
489     }
490
491     #[test]
492     fn unix_clone_two_write() {
493         let addr = next_test_unix();
494         let mut acceptor = UnixListener::bind(&addr).listen();
495
496         let _t = thread::spawn(move|| {
497             let mut s = UnixStream::connect(&addr);
498             let buf = &mut [0, 1];
499             s.read(buf).unwrap();
500             s.read(buf).unwrap();
501         });
502
503         let mut s1 = acceptor.accept().unwrap();
504         let s2 = s1.clone();
505
506         let (tx, rx) = channel();
507         let _t = thread::spawn(move|| {
508             let mut s2 = s2;
509             s2.write(&[1]).unwrap();
510             tx.send(()).unwrap();
511         });
512         s1.write(&[2]).unwrap();
513
514         rx.recv().unwrap();
515     }
516
517     #[cfg(not(windows))]
518     #[test]
519     fn drop_removes_listener_path() {
520         let path = next_test_unix();
521         let l = UnixListener::bind(&path).unwrap();
522         assert!(path.exists());
523         drop(l);
524         assert!(!path.exists());
525     }
526
527     #[cfg(not(windows))]
528     #[test]
529     fn drop_removes_acceptor_path() {
530         let path = next_test_unix();
531         let l = UnixListener::bind(&path).unwrap();
532         assert!(path.exists());
533         drop(l.listen().unwrap());
534         assert!(!path.exists());
535     }
536
537     #[test]
538     fn accept_timeout() {
539         let addr = next_test_unix();
540         let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
541
542         a.set_timeout(Some(10));
543
544         // Make sure we time out once and future invocations also time out
545         let err = a.accept().err().unwrap();
546         assert_eq!(err.kind, TimedOut);
547         let err = a.accept().err().unwrap();
548         assert_eq!(err.kind, TimedOut);
549
550         // Also make sure that even though the timeout is expired that we will
551         // continue to receive any pending connections.
552         let (tx, rx) = channel();
553         let addr2 = addr.clone();
554         let _t = thread::spawn(move|| {
555             tx.send(UnixStream::connect(&addr2).unwrap()).unwrap();
556         });
557         let l = rx.recv().unwrap();
558         for i in 0..1001 {
559             match a.accept() {
560                 Ok(..) => break,
561                 Err(ref e) if e.kind == TimedOut => {}
562                 Err(e) => panic!("error: {}", e),
563             }
564             ::thread::yield_now();
565             if i == 1000 { panic!("should have a pending connection") }
566         }
567         drop(l);
568
569         // Unset the timeout and make sure that this always blocks.
570         a.set_timeout(None);
571         let addr2 = addr.clone();
572         let _t = thread::spawn(move|| {
573             drop(UnixStream::connect(&addr2).unwrap());
574         });
575         a.accept().unwrap();
576     }
577
578     #[test]
579     fn connect_timeout_error() {
580         let addr = next_test_unix();
581         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
582     }
583
584     #[test]
585     fn connect_timeout_success() {
586         let addr = next_test_unix();
587         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
588         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
589     }
590
591     #[test]
592     fn connect_timeout_zero() {
593         let addr = next_test_unix();
594         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
595         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
596     }
597
598     #[test]
599     fn connect_timeout_negative() {
600         let addr = next_test_unix();
601         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
602         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
603     }
604
605     #[test]
606     fn close_readwrite_smoke() {
607         let addr = next_test_unix();
608         let a = UnixListener::bind(&addr).listen().unwrap();
609         let (_tx, rx) = channel::<()>();
610         thread::spawn(move|| {
611             let mut a = a;
612             let _s = a.accept().unwrap();
613             let _ = rx.recv();
614         });
615
616         let mut b = [0];
617         let mut s = UnixStream::connect(&addr).unwrap();
618         let mut s2 = s.clone();
619
620         // closing should prevent reads/writes
621         s.close_write().unwrap();
622         assert!(s.write(&[0]).is_err());
623         s.close_read().unwrap();
624         assert!(s.read(&mut b).is_err());
625
626         // closing should affect previous handles
627         assert!(s2.write(&[0]).is_err());
628         assert!(s2.read(&mut b).is_err());
629
630         // closing should affect new handles
631         let mut s3 = s.clone();
632         assert!(s3.write(&[0]).is_err());
633         assert!(s3.read(&mut b).is_err());
634
635         // make sure these don't die
636         let _ = s2.close_read();
637         let _ = s2.close_write();
638         let _ = s3.close_read();
639         let _ = s3.close_write();
640     }
641
642     #[test]
643     fn close_read_wakes_up() {
644         let addr = next_test_unix();
645         let a = UnixListener::bind(&addr).listen().unwrap();
646         let (_tx, rx) = channel::<()>();
647         thread::spawn(move|| {
648             let mut a = a;
649             let _s = a.accept().unwrap();
650             let _ = rx.recv();
651         });
652
653         let mut s = UnixStream::connect(&addr).unwrap();
654         let s2 = s.clone();
655         let (tx, rx) = channel();
656         let _t = thread::spawn(move|| {
657             let mut s2 = s2;
658             assert!(s2.read(&mut [0]).is_err());
659             tx.send(()).unwrap();
660         });
661         // this should wake up the child task
662         s.close_read().unwrap();
663
664         // this test will never finish if the child doesn't wake up
665         rx.recv().unwrap();
666     }
667
668     #[test]
669     fn readwrite_timeouts() {
670         let addr = next_test_unix();
671         let mut a = UnixListener::bind(&addr).listen().unwrap();
672         let (tx, rx) = channel::<()>();
673         thread::spawn(move|| {
674             let mut s = UnixStream::connect(&addr).unwrap();
675             rx.recv().unwrap();
676             assert!(s.write(&[0]).is_ok());
677             let _ = rx.recv();
678         });
679
680         let mut s = a.accept().unwrap();
681         s.set_timeout(Some(20));
682         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
683         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
684
685         s.set_timeout(Some(20));
686         for i in 0..1001 {
687             match s.write(&[0; 128 * 1024]) {
688                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
689                 Err(IoError { kind: TimedOut, .. }) => break,
690                 Err(e) => panic!("{}", e),
691            }
692            if i == 1000 { panic!("should have filled up?!"); }
693         }
694
695         // I'm not sure as to why, but apparently the write on windows always
696         // succeeds after the previous timeout. Who knows?
697         if !cfg!(windows) {
698             assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
699         }
700
701         tx.send(()).unwrap();
702         s.set_timeout(None);
703         assert_eq!(s.read(&mut [0, 0]), Ok(1));
704     }
705
706     #[test]
707     fn read_timeouts() {
708         let addr = next_test_unix();
709         let mut a = UnixListener::bind(&addr).listen().unwrap();
710         let (tx, rx) = channel::<()>();
711         thread::spawn(move|| {
712             let mut s = UnixStream::connect(&addr).unwrap();
713             rx.recv().unwrap();
714             let mut amt = 0;
715             while amt < 100 * 128 * 1024 {
716                 match s.read(&mut [0;128 * 1024]) {
717                     Ok(n) => { amt += n; }
718                     Err(e) => panic!("{}", e),
719                 }
720             }
721             let _ = rx.recv();
722         });
723
724         let mut s = a.accept().unwrap();
725         s.set_read_timeout(Some(20));
726         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
727         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
728
729         tx.send(()).unwrap();
730         for _ in 0..100 {
731             assert!(s.write(&[0;128 * 1024]).is_ok());
732         }
733     }
734
735     #[test]
736     fn write_timeouts() {
737         let addr = next_test_unix();
738         let mut a = UnixListener::bind(&addr).listen().unwrap();
739         let (tx, rx) = channel::<()>();
740         thread::spawn(move|| {
741             let mut s = UnixStream::connect(&addr).unwrap();
742             rx.recv().unwrap();
743             assert!(s.write(&[0]).is_ok());
744             let _ = rx.recv();
745         });
746
747         let mut s = a.accept().unwrap();
748         s.set_write_timeout(Some(20));
749         for i in 0..1001 {
750             match s.write(&[0; 128 * 1024]) {
751                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
752                 Err(IoError { kind: TimedOut, .. }) => break,
753                 Err(e) => panic!("{}", e),
754            }
755            if i == 1000 { panic!("should have filled up?!"); }
756         }
757
758         tx.send(()).unwrap();
759         assert!(s.read(&mut [0]).is_ok());
760     }
761
762     #[test]
763     fn timeout_concurrent_read() {
764         let addr = next_test_unix();
765         let mut a = UnixListener::bind(&addr).listen().unwrap();
766         let (tx, rx) = channel::<()>();
767         thread::spawn(move|| {
768             let mut s = UnixStream::connect(&addr).unwrap();
769             rx.recv().unwrap();
770             assert!(s.write(&[0]).is_ok());
771             let _ = rx.recv();
772         });
773
774         let mut s = a.accept().unwrap();
775         let s2 = s.clone();
776         let (tx2, rx2) = channel();
777         let _t = thread::spawn(move|| {
778             let mut s2 = s2;
779             assert!(s2.read(&mut [0]).is_ok());
780             tx2.send(()).unwrap();
781         });
782
783         s.set_read_timeout(Some(20));
784         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
785         tx.send(()).unwrap();
786
787         rx2.recv().unwrap();
788     }
789
790     #[cfg(not(windows))]
791     #[test]
792     fn clone_accept_smoke() {
793         let addr = next_test_unix();
794         let l = UnixListener::bind(&addr);
795         let mut a = l.listen().unwrap();
796         let mut a2 = a.clone();
797
798         let addr2 = addr.clone();
799         let _t = thread::spawn(move|| {
800             let _ = UnixStream::connect(&addr2);
801         });
802         let _t = thread::spawn(move|| {
803             let _ = UnixStream::connect(&addr);
804         });
805
806         assert!(a.accept().is_ok());
807         drop(a);
808         assert!(a2.accept().is_ok());
809     }
810
811     #[cfg(not(windows))] // FIXME #17553
812     #[test]
813     fn clone_accept_concurrent() {
814         let addr = next_test_unix();
815         let l = UnixListener::bind(&addr);
816         let a = l.listen().unwrap();
817         let a2 = a.clone();
818
819         let (tx, rx) = channel();
820         let tx2 = tx.clone();
821
822         let _t = thread::spawn(move|| {
823             let mut a = a;
824             tx.send(a.accept()).unwrap()
825         });
826         let _t = thread::spawn(move|| {
827             let mut a = a2;
828             tx2.send(a.accept()).unwrap()
829         });
830
831         let addr2 = addr.clone();
832         let _t = thread::spawn(move|| {
833             let _ = UnixStream::connect(&addr2);
834         });
835         let _t = thread::spawn(move|| {
836             let _ = UnixStream::connect(&addr);
837         });
838
839         assert!(rx.recv().unwrap().is_ok());
840         assert!(rx.recv().unwrap().is_ok());
841     }
842
843     #[test]
844     fn close_accept_smoke() {
845         let addr = next_test_unix();
846         let l = UnixListener::bind(&addr);
847         let mut a = l.listen().unwrap();
848
849         a.close_accept().unwrap();
850         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
851     }
852
853     #[test]
854     fn close_accept_concurrent() {
855         let addr = next_test_unix();
856         let l = UnixListener::bind(&addr);
857         let a = l.listen().unwrap();
858         let mut a2 = a.clone();
859
860         let (tx, rx) = channel();
861         let _t = thread::spawn(move|| {
862             let mut a = a;
863             tx.send(a.accept()).unwrap();
864         });
865         a2.close_accept().unwrap();
866
867         assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
868     }
869 }