]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/pipe.rs
Merge pull request #20674 from jbcrail/fix-misspelled-comments
[rust.git] / src / libstd / io / net / pipe.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes
12 //!
13 //! This module contains the ability to communicate over named pipes with
14 //! synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
15 //! while on Unix it corresponds to UNIX domain sockets.
16 //!
17 //! These pipes are similar to TCP in the sense that you can have both a stream to a
18 //! server and a server itself. The server provided accepts other `UnixStream`
19 //! instances as clients.
20
21 #![allow(missing_docs)]
22
23 use prelude::v1::*;
24
25 use ffi::CString;
26 use path::BytesContainer;
27 use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
28 use sys::pipe::UnixAcceptor as UnixAcceptorImp;
29 use sys::pipe::UnixListener as UnixListenerImp;
30 use sys::pipe::UnixStream as UnixStreamImp;
31 use time::Duration;
32
33 use sys_common;
34
35 /// A stream which communicates over a named pipe.
36 pub struct UnixStream {
37     inner: UnixStreamImp,
38 }
39
40 impl UnixStream {
41
42     /// Connect to a pipe named by `path`. This will attempt to open a
43     /// connection to the underlying socket.
44     ///
45     /// The returned stream will be closed when the object falls out of scope.
46     ///
47     /// # Example
48     ///
49     /// ```rust
50     /// # #![allow(unused_must_use)]
51     /// use std::io::net::pipe::UnixStream;
52     ///
53     /// let server = Path::new("path/to/my/socket");
54     /// let mut stream = UnixStream::connect(&server);
55     /// stream.write(&[1, 2, 3]);
56     /// ```
57     pub fn connect<P: BytesContainer>(path: P) -> IoResult<UnixStream> {
58         let path = CString::from_slice(path.container_as_bytes());
59         UnixStreamImp::connect(&path, None)
60             .map(|inner| UnixStream { inner: inner })
61     }
62
63     /// Connect to a pipe named by `path`, timing out if the specified number of
64     /// milliseconds.
65     ///
66     /// This function is similar to `connect`, except that if `timeout`
67     /// elapses the function will return an error of kind `TimedOut`.
68     ///
69     /// If a `timeout` with zero or negative duration is specified then
70     /// the function returns `Err`, with the error kind set to `TimedOut`.
71     #[experimental = "the timeout argument is likely to change types"]
72     pub fn connect_timeout<P>(path: P, timeout: Duration)
73                               -> IoResult<UnixStream>
74                               where P: BytesContainer {
75         if timeout <= Duration::milliseconds(0) {
76             return Err(standard_error(TimedOut));
77         }
78
79         let path = CString::from_slice(path.container_as_bytes());
80         UnixStreamImp::connect(&path, Some(timeout.num_milliseconds() as u64))
81             .map(|inner| UnixStream { inner: inner })
82     }
83
84
85     /// Closes the reading half of this connection.
86     ///
87     /// This method will close the reading portion of this connection, causing
88     /// all pending and future reads to immediately return with an error.
89     ///
90     /// Note that this method affects all cloned handles associated with this
91     /// stream, not just this one handle.
92     pub fn close_read(&mut self) -> IoResult<()> {
93         self.inner.close_read()
94     }
95
96     /// Closes the writing half of this connection.
97     ///
98     /// This method will close the writing portion of this connection, causing
99     /// all pending and future writes to immediately return with an error.
100     ///
101     /// Note that this method affects all cloned handles associated with this
102     /// stream, not just this one handle.
103     pub fn close_write(&mut self) -> IoResult<()> {
104         self.inner.close_write()
105     }
106
107     /// Sets the read/write timeout for this socket.
108     ///
109     /// For more information, see `TcpStream::set_timeout`
110     #[experimental = "the timeout argument may change in type and value"]
111     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
112         self.inner.set_timeout(timeout_ms)
113     }
114
115     /// Sets the read timeout for this socket.
116     ///
117     /// For more information, see `TcpStream::set_timeout`
118     #[experimental = "the timeout argument may change in type and value"]
119     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
120         self.inner.set_read_timeout(timeout_ms)
121     }
122
123     /// Sets the write timeout for this socket.
124     ///
125     /// For more information, see `TcpStream::set_timeout`
126     #[experimental = "the timeout argument may change in type and value"]
127     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
128         self.inner.set_write_timeout(timeout_ms)
129     }
130 }
131
132 impl Clone for UnixStream {
133     fn clone(&self) -> UnixStream {
134         UnixStream { inner: self.inner.clone() }
135     }
136 }
137
138 impl Reader for UnixStream {
139     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
140         self.inner.read(buf)
141     }
142 }
143
144 impl Writer for UnixStream {
145     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
146         self.inner.write(buf)
147     }
148 }
149
150 impl sys_common::AsInner<UnixStreamImp> for UnixStream {
151     fn as_inner(&self) -> &UnixStreamImp {
152         &self.inner
153     }
154 }
155
156 /// A value that can listen for incoming named pipe connection requests.
157 pub struct UnixListener {
158     /// The internal, opaque runtime Unix listener.
159     inner: UnixListenerImp,
160 }
161
162 impl UnixListener {
163     /// Creates a new listener, ready to receive incoming connections on the
164     /// specified socket. The server will be named by `path`.
165     ///
166     /// This listener will be closed when it falls out of scope.
167     ///
168     /// # Example
169     ///
170     /// ```
171     /// # fn main() {}
172     /// # fn foo() {
173     /// # #![allow(unused_must_use)]
174     /// use std::io::net::pipe::UnixListener;
175     /// use std::io::{Listener, Acceptor};
176     ///
177     /// let server = Path::new("/path/to/my/socket");
178     /// let stream = UnixListener::bind(&server);
179     /// for mut client in stream.listen().incoming() {
180     ///     client.write(&[1, 2, 3, 4]);
181     /// }
182     /// # }
183     /// ```
184     pub fn bind<P: BytesContainer>(path: P) -> IoResult<UnixListener> {
185         let path = CString::from_slice(path.container_as_bytes());
186         UnixListenerImp::bind(&path)
187             .map(|inner| UnixListener { inner: inner })
188     }
189 }
190
191 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
192     fn listen(self) -> IoResult<UnixAcceptor> {
193         self.inner.listen()
194             .map(|inner| UnixAcceptor { inner: inner })
195     }
196 }
197
198 impl sys_common::AsInner<UnixListenerImp> for UnixListener {
199     fn as_inner(&self) -> &UnixListenerImp {
200         &self.inner
201     }
202 }
203
204 /// A value that can accept named pipe connections, returned from `listen()`.
205 pub struct UnixAcceptor {
206     /// The internal, opaque runtime Unix acceptor.
207     inner: UnixAcceptorImp
208 }
209
210 impl UnixAcceptor {
211     /// Sets a timeout for this acceptor, after which accept() will no longer
212     /// block indefinitely.
213     ///
214     /// The argument specified is the amount of time, in milliseconds, into the
215     /// future after which all invocations of accept() will not block (and any
216     /// pending invocation will return). A value of `None` will clear any
217     /// existing timeout.
218     ///
219     /// When using this method, it is likely necessary to reset the timeout as
220     /// appropriate, the timeout specified is specific to this object, not
221     /// specific to the next request.
222     #[experimental = "the name and arguments to this function are likely \
223                       to change"]
224     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
225         self.inner.set_timeout(timeout_ms)
226     }
227
228     /// Closes the accepting capabilities of this acceptor.
229     ///
230     /// This function has the same semantics as `TcpAcceptor::close_accept`, and
231     /// more information can be found in that documentation.
232     #[experimental]
233     pub fn close_accept(&mut self) -> IoResult<()> {
234         self.inner.close_accept()
235     }
236 }
237
238 impl Acceptor<UnixStream> for UnixAcceptor {
239     fn accept(&mut self) -> IoResult<UnixStream> {
240         self.inner.accept().map(|s| {
241             UnixStream { inner: s }
242         })
243     }
244 }
245
246 impl Clone for UnixAcceptor {
247     /// Creates a new handle to this unix acceptor, allowing for simultaneous
248     /// accepts.
249     ///
250     /// The underlying unix acceptor will not be closed until all handles to the
251     /// acceptor have been deallocated. Incoming connections will be received on
252     /// at most once acceptor, the same connection will not be accepted twice.
253     ///
254     /// The `close_accept` method will shut down *all* acceptors cloned from the
255     /// same original acceptor, whereas the `set_timeout` method only affects
256     /// the selector that it is called on.
257     ///
258     /// This function is useful for creating a handle to invoke `close_accept`
259     /// on to wake up any other task blocked in `accept`.
260     fn clone(&self) -> UnixAcceptor {
261         UnixAcceptor { inner: self.inner.clone() }
262     }
263 }
264
265 impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor {
266     fn as_inner(&self) -> &UnixAcceptorImp {
267         &self.inner
268     }
269 }
270
271 #[cfg(test)]
272 mod tests {
273     use prelude::v1::*;
274
275     use io::fs::PathExtensions;
276     use io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset};
277     use io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError};
278     use io::{PermissionDenied, Acceptor, Listener};
279     use io::test::*;
280     use super::*;
281     use sync::mpsc::channel;
282     use thread::Thread;
283     use time::Duration;
284
285     pub fn smalltest<F,G>(server: F, client: G)
286         where F : FnOnce(UnixStream), F : Send,
287               G : FnOnce(UnixStream), G : Send
288     {
289         let path1 = next_test_unix();
290         let path2 = path1.clone();
291
292         let mut acceptor = UnixListener::bind(&path1).listen();
293
294         let _t = Thread::spawn(move|| {
295             match UnixStream::connect(&path2) {
296                 Ok(c) => client(c),
297                 Err(e) => panic!("failed connect: {}", e),
298             }
299         });
300
301         match acceptor.accept() {
302             Ok(c) => server(c),
303             Err(e) => panic!("failed accept: {}", e),
304         }
305     }
306
307     #[test]
308     fn bind_error() {
309         let path = "path/to/nowhere";
310         match UnixListener::bind(&path) {
311             Ok(..) => panic!(),
312             Err(e) => {
313                 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
314                         e.kind == InvalidInput);
315             }
316         }
317     }
318
319     #[test]
320     fn connect_error() {
321         let path = if cfg!(windows) {
322             r"\\.\pipe\this_should_not_exist_ever"
323         } else {
324             "path/to/nowhere"
325         };
326         match UnixStream::connect(&path) {
327             Ok(..) => panic!(),
328             Err(e) => {
329                 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
330             }
331         }
332     }
333
334     #[test]
335     fn smoke() {
336         smalltest(move |mut server| {
337             let mut buf = [0];
338             server.read(&mut buf).unwrap();
339             assert!(buf[0] == 99);
340         }, move|mut client| {
341             client.write(&[99]).unwrap();
342         })
343     }
344
345     #[cfg_attr(windows, ignore)] // FIXME(#12516)
346     #[test]
347     fn read_eof() {
348         smalltest(move|mut server| {
349             let mut buf = [0];
350             assert!(server.read(&mut buf).is_err());
351             assert!(server.read(&mut buf).is_err());
352         }, move|_client| {
353             // drop the client
354         })
355     }
356
357     #[test]
358     fn write_begone() {
359         smalltest(move|mut server| {
360             let buf = [0];
361             loop {
362                 match server.write(&buf) {
363                     Ok(..) => {}
364                     Err(e) => {
365                         assert!(e.kind == BrokenPipe ||
366                                 e.kind == NotConnected ||
367                                 e.kind == ConnectionReset,
368                                 "unknown error {}", e);
369                         break;
370                     }
371                 }
372             }
373         }, move|_client| {
374             // drop the client
375         })
376     }
377
378     #[test]
379     fn accept_lots() {
380         let times = 10;
381         let path1 = next_test_unix();
382         let path2 = path1.clone();
383
384         let mut acceptor = match UnixListener::bind(&path1).listen() {
385             Ok(a) => a,
386             Err(e) => panic!("failed listen: {}", e),
387         };
388
389         let _t = Thread::spawn(move|| {
390             for _ in range(0u, times) {
391                 let mut stream = UnixStream::connect(&path2);
392                 match stream.write(&[100]) {
393                     Ok(..) => {}
394                     Err(e) => panic!("failed write: {}", e)
395                 }
396             }
397         });
398
399         for _ in range(0, times) {
400             let mut client = acceptor.accept();
401             let mut buf = [0];
402             match client.read(&mut buf) {
403                 Ok(..) => {}
404                 Err(e) => panic!("failed read/accept: {}", e),
405             }
406             assert_eq!(buf[0], 100);
407         }
408     }
409
410     #[cfg(unix)]
411     #[test]
412     fn path_exists() {
413         let path = next_test_unix();
414         let _acceptor = UnixListener::bind(&path).listen();
415         assert!(path.exists());
416     }
417
418     #[test]
419     fn unix_clone_smoke() {
420         let addr = next_test_unix();
421         let mut acceptor = UnixListener::bind(&addr).listen();
422
423         let _t = Thread::spawn(move|| {
424             let mut s = UnixStream::connect(&addr);
425             let mut buf = [0, 0];
426             debug!("client reading");
427             assert_eq!(s.read(&mut buf), Ok(1));
428             assert_eq!(buf[0], 1);
429             debug!("client writing");
430             s.write(&[2]).unwrap();
431             debug!("client dropping");
432         });
433
434         let mut s1 = acceptor.accept().unwrap();
435         let s2 = s1.clone();
436
437         let (tx1, rx1) = channel();
438         let (tx2, rx2) = channel();
439         let _t = Thread::spawn(move|| {
440             let mut s2 = s2;
441             rx1.recv().unwrap();
442             debug!("writer writing");
443             s2.write(&[1]).unwrap();
444             debug!("writer done");
445             tx2.send(()).unwrap();
446         });
447         tx1.send(()).unwrap();
448         let mut buf = [0, 0];
449         debug!("reader reading");
450         assert_eq!(s1.read(&mut buf), Ok(1));
451         debug!("reader done");
452         rx2.recv().unwrap();
453     }
454
455     #[test]
456     fn unix_clone_two_read() {
457         let addr = next_test_unix();
458         let mut acceptor = UnixListener::bind(&addr).listen();
459         let (tx1, rx) = channel();
460         let tx2 = tx1.clone();
461
462         let _t = Thread::spawn(move|| {
463             let mut s = UnixStream::connect(&addr);
464             s.write(&[1]).unwrap();
465             rx.recv().unwrap();
466             s.write(&[2]).unwrap();
467             rx.recv().unwrap();
468         });
469
470         let mut s1 = acceptor.accept().unwrap();
471         let s2 = s1.clone();
472
473         let (done, rx) = channel();
474         let _t = Thread::spawn(move|| {
475             let mut s2 = s2;
476             let mut buf = [0, 0];
477             s2.read(&mut buf).unwrap();
478             tx2.send(()).unwrap();
479             done.send(()).unwrap();
480         });
481         let mut buf = [0, 0];
482         s1.read(&mut buf).unwrap();
483         tx1.send(()).unwrap();
484
485         rx.recv().unwrap();
486     }
487
488     #[test]
489     fn unix_clone_two_write() {
490         let addr = next_test_unix();
491         let mut acceptor = UnixListener::bind(&addr).listen();
492
493         let _t = Thread::spawn(move|| {
494             let mut s = UnixStream::connect(&addr);
495             let buf = &mut [0, 1];
496             s.read(buf).unwrap();
497             s.read(buf).unwrap();
498         });
499
500         let mut s1 = acceptor.accept().unwrap();
501         let s2 = s1.clone();
502
503         let (tx, rx) = channel();
504         let _t = Thread::spawn(move|| {
505             let mut s2 = s2;
506             s2.write(&[1]).unwrap();
507             tx.send(()).unwrap();
508         });
509         s1.write(&[2]).unwrap();
510
511         rx.recv().unwrap();
512     }
513
514     #[cfg(not(windows))]
515     #[test]
516     fn drop_removes_listener_path() {
517         let path = next_test_unix();
518         let l = UnixListener::bind(&path).unwrap();
519         assert!(path.exists());
520         drop(l);
521         assert!(!path.exists());
522     }
523
524     #[cfg(not(windows))]
525     #[test]
526     fn drop_removes_acceptor_path() {
527         let path = next_test_unix();
528         let l = UnixListener::bind(&path).unwrap();
529         assert!(path.exists());
530         drop(l.listen().unwrap());
531         assert!(!path.exists());
532     }
533
534     #[test]
535     fn accept_timeout() {
536         let addr = next_test_unix();
537         let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
538
539         a.set_timeout(Some(10));
540
541         // Make sure we time out once and future invocations also time out
542         let err = a.accept().err().unwrap();
543         assert_eq!(err.kind, TimedOut);
544         let err = a.accept().err().unwrap();
545         assert_eq!(err.kind, TimedOut);
546
547         // Also make sure that even though the timeout is expired that we will
548         // continue to receive any pending connections.
549         let (tx, rx) = channel();
550         let addr2 = addr.clone();
551         let _t = Thread::spawn(move|| {
552             tx.send(UnixStream::connect(&addr2).unwrap()).unwrap();
553         });
554         let l = rx.recv().unwrap();
555         for i in range(0u, 1001) {
556             match a.accept() {
557                 Ok(..) => break,
558                 Err(ref e) if e.kind == TimedOut => {}
559                 Err(e) => panic!("error: {}", e),
560             }
561             ::thread::Thread::yield_now();
562             if i == 1000 { panic!("should have a pending connection") }
563         }
564         drop(l);
565
566         // Unset the timeout and make sure that this always blocks.
567         a.set_timeout(None);
568         let addr2 = addr.clone();
569         let _t = Thread::spawn(move|| {
570             drop(UnixStream::connect(&addr2).unwrap());
571         });
572         a.accept().unwrap();
573     }
574
575     #[test]
576     fn connect_timeout_error() {
577         let addr = next_test_unix();
578         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
579     }
580
581     #[test]
582     fn connect_timeout_success() {
583         let addr = next_test_unix();
584         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
585         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
586     }
587
588     #[test]
589     fn connect_timeout_zero() {
590         let addr = next_test_unix();
591         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
592         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
593     }
594
595     #[test]
596     fn connect_timeout_negative() {
597         let addr = next_test_unix();
598         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
599         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
600     }
601
602     #[test]
603     fn close_readwrite_smoke() {
604         let addr = next_test_unix();
605         let a = UnixListener::bind(&addr).listen().unwrap();
606         let (_tx, rx) = channel::<()>();
607         Thread::spawn(move|| {
608             let mut a = a;
609             let _s = a.accept().unwrap();
610             let _ = rx.recv();
611         });
612
613         let mut b = [0];
614         let mut s = UnixStream::connect(&addr).unwrap();
615         let mut s2 = s.clone();
616
617         // closing should prevent reads/writes
618         s.close_write().unwrap();
619         assert!(s.write(&[0]).is_err());
620         s.close_read().unwrap();
621         assert!(s.read(&mut b).is_err());
622
623         // closing should affect previous handles
624         assert!(s2.write(&[0]).is_err());
625         assert!(s2.read(&mut b).is_err());
626
627         // closing should affect new handles
628         let mut s3 = s.clone();
629         assert!(s3.write(&[0]).is_err());
630         assert!(s3.read(&mut b).is_err());
631
632         // make sure these don't die
633         let _ = s2.close_read();
634         let _ = s2.close_write();
635         let _ = s3.close_read();
636         let _ = s3.close_write();
637     }
638
639     #[test]
640     fn close_read_wakes_up() {
641         let addr = next_test_unix();
642         let a = UnixListener::bind(&addr).listen().unwrap();
643         let (_tx, rx) = channel::<()>();
644         Thread::spawn(move|| {
645             let mut a = a;
646             let _s = a.accept().unwrap();
647             let _ = rx.recv();
648         });
649
650         let mut s = UnixStream::connect(&addr).unwrap();
651         let s2 = s.clone();
652         let (tx, rx) = channel();
653         let _t = Thread::spawn(move|| {
654             let mut s2 = s2;
655             assert!(s2.read(&mut [0]).is_err());
656             tx.send(()).unwrap();
657         });
658         // this should wake up the child task
659         s.close_read().unwrap();
660
661         // this test will never finish if the child doesn't wake up
662         rx.recv().unwrap();
663     }
664
665     #[test]
666     fn readwrite_timeouts() {
667         let addr = next_test_unix();
668         let mut a = UnixListener::bind(&addr).listen().unwrap();
669         let (tx, rx) = channel::<()>();
670         Thread::spawn(move|| {
671             let mut s = UnixStream::connect(&addr).unwrap();
672             rx.recv().unwrap();
673             assert!(s.write(&[0]).is_ok());
674             let _ = rx.recv();
675         });
676
677         let mut s = a.accept().unwrap();
678         s.set_timeout(Some(20));
679         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
680         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
681
682         s.set_timeout(Some(20));
683         for i in range(0u, 1001) {
684             match s.write(&[0; 128 * 1024]) {
685                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
686                 Err(IoError { kind: TimedOut, .. }) => break,
687                 Err(e) => panic!("{}", e),
688            }
689            if i == 1000 { panic!("should have filled up?!"); }
690         }
691
692         // I'm not sure as to why, but apparently the write on windows always
693         // succeeds after the previous timeout. Who knows?
694         if !cfg!(windows) {
695             assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
696         }
697
698         tx.send(()).unwrap();
699         s.set_timeout(None);
700         assert_eq!(s.read(&mut [0, 0]), Ok(1));
701     }
702
703     #[test]
704     fn read_timeouts() {
705         let addr = next_test_unix();
706         let mut a = UnixListener::bind(&addr).listen().unwrap();
707         let (tx, rx) = channel::<()>();
708         Thread::spawn(move|| {
709             let mut s = UnixStream::connect(&addr).unwrap();
710             rx.recv().unwrap();
711             let mut amt = 0;
712             while amt < 100 * 128 * 1024 {
713                 match s.read(&mut [0;128 * 1024]) {
714                     Ok(n) => { amt += n; }
715                     Err(e) => panic!("{}", e),
716                 }
717             }
718             let _ = rx.recv();
719         });
720
721         let mut s = a.accept().unwrap();
722         s.set_read_timeout(Some(20));
723         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
724         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
725
726         tx.send(()).unwrap();
727         for _ in range(0u, 100) {
728             assert!(s.write(&[0;128 * 1024]).is_ok());
729         }
730     }
731
732     #[test]
733     fn write_timeouts() {
734         let addr = next_test_unix();
735         let mut a = UnixListener::bind(&addr).listen().unwrap();
736         let (tx, rx) = channel::<()>();
737         Thread::spawn(move|| {
738             let mut s = UnixStream::connect(&addr).unwrap();
739             rx.recv().unwrap();
740             assert!(s.write(&[0]).is_ok());
741             let _ = rx.recv();
742         });
743
744         let mut s = a.accept().unwrap();
745         s.set_write_timeout(Some(20));
746         for i in range(0u, 1001) {
747             match s.write(&[0; 128 * 1024]) {
748                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
749                 Err(IoError { kind: TimedOut, .. }) => break,
750                 Err(e) => panic!("{}", e),
751            }
752            if i == 1000 { panic!("should have filled up?!"); }
753         }
754
755         tx.send(()).unwrap();
756         assert!(s.read(&mut [0]).is_ok());
757     }
758
759     #[test]
760     fn timeout_concurrent_read() {
761         let addr = next_test_unix();
762         let mut a = UnixListener::bind(&addr).listen().unwrap();
763         let (tx, rx) = channel::<()>();
764         Thread::spawn(move|| {
765             let mut s = UnixStream::connect(&addr).unwrap();
766             rx.recv().unwrap();
767             assert!(s.write(&[0]).is_ok());
768             let _ = rx.recv();
769         });
770
771         let mut s = a.accept().unwrap();
772         let s2 = s.clone();
773         let (tx2, rx2) = channel();
774         let _t = Thread::spawn(move|| {
775             let mut s2 = s2;
776             assert!(s2.read(&mut [0]).is_ok());
777             tx2.send(()).unwrap();
778         });
779
780         s.set_read_timeout(Some(20));
781         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
782         tx.send(()).unwrap();
783
784         rx2.recv().unwrap();
785     }
786
787     #[cfg(not(windows))]
788     #[test]
789     fn clone_accept_smoke() {
790         let addr = next_test_unix();
791         let l = UnixListener::bind(&addr);
792         let mut a = l.listen().unwrap();
793         let mut a2 = a.clone();
794
795         let addr2 = addr.clone();
796         let _t = Thread::spawn(move|| {
797             let _ = UnixStream::connect(&addr2);
798         });
799         let _t = Thread::spawn(move|| {
800             let _ = UnixStream::connect(&addr);
801         });
802
803         assert!(a.accept().is_ok());
804         drop(a);
805         assert!(a2.accept().is_ok());
806     }
807
808     #[cfg(not(windows))] // FIXME #17553
809     #[test]
810     fn clone_accept_concurrent() {
811         let addr = next_test_unix();
812         let l = UnixListener::bind(&addr);
813         let a = l.listen().unwrap();
814         let a2 = a.clone();
815
816         let (tx, rx) = channel();
817         let tx2 = tx.clone();
818
819         let _t = Thread::spawn(move|| {
820             let mut a = a;
821             tx.send(a.accept()).unwrap()
822         });
823         let _t = Thread::spawn(move|| {
824             let mut a = a2;
825             tx2.send(a.accept()).unwrap()
826         });
827
828         let addr2 = addr.clone();
829         let _t = Thread::spawn(move|| {
830             let _ = UnixStream::connect(&addr2);
831         });
832         let _t = Thread::spawn(move|| {
833             let _ = UnixStream::connect(&addr);
834         });
835
836         assert!(rx.recv().unwrap().is_ok());
837         assert!(rx.recv().unwrap().is_ok());
838     }
839
840     #[test]
841     fn close_accept_smoke() {
842         let addr = next_test_unix();
843         let l = UnixListener::bind(&addr);
844         let mut a = l.listen().unwrap();
845
846         a.close_accept().unwrap();
847         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
848     }
849
850     #[test]
851     fn close_accept_concurrent() {
852         let addr = next_test_unix();
853         let l = UnixListener::bind(&addr);
854         let a = l.listen().unwrap();
855         let mut a2 = a.clone();
856
857         let (tx, rx) = channel();
858         let _t = Thread::spawn(move|| {
859             let mut a = a;
860             tx.send(a.accept()).unwrap();
861         });
862         a2.close_accept().unwrap();
863
864         assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
865     }
866 }