]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/unix.rs
Doc says to avoid mixing allocator instead of forbiding it
[rust.git] / src / libstd / io / net / unix.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12
13 Named pipes
14
15 This module contains the ability to communicate over named pipes with
16 synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
17 while on Unix it corresponds to UNIX domain sockets.
18
19 These pipes are similar to TCP in the sense that you can have both a stream to a
20 server and a server itself. The server provided accepts other `UnixStream`
21 instances as clients.
22
23 */
24
25 #![allow(missing_doc)]
26
27 use prelude::*;
28
29 use io::{Listener, Acceptor, IoResult, IoError, TimedOut, standard_error};
30 use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
31 use rt::rtio::{RtioUnixAcceptor, RtioPipe};
32 use time::Duration;
33
34 /// A stream which communicates over a named pipe.
35 pub struct UnixStream {
36     obj: Box<RtioPipe + Send>,
37 }
38
39 impl UnixStream {
40     /// Connect to a pipe named by `path`. This will attempt to open a
41     /// connection to the underlying socket.
42     ///
43     /// The returned stream will be closed when the object falls out of scope.
44     ///
45     /// # Example
46     ///
47     /// ```rust
48     /// # #![allow(unused_must_use)]
49     /// use std::io::net::unix::UnixStream;
50     ///
51     /// let server = Path::new("path/to/my/socket");
52     /// let mut stream = UnixStream::connect(&server);
53     /// stream.write([1, 2, 3]);
54     /// ```
55     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
56         LocalIo::maybe_raise(|io| {
57             io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
58         }).map_err(IoError::from_rtio_error)
59     }
60
61     /// Connect to a pipe named by `path`, timing out if the specified number of
62     /// milliseconds.
63     ///
64     /// This function is similar to `connect`, except that if `timeout`
65     /// elapses the function will return an error of kind `TimedOut`.
66     ///
67     /// If a `timeout` with zero or negative duration is specified then
68     /// the function returns `Err`, with the error kind set to `TimedOut`.
69     #[experimental = "the timeout argument is likely to change types"]
70     pub fn connect_timeout<P: ToCStr>(path: &P,
71                                       timeout: Duration) -> IoResult<UnixStream> {
72         if timeout <= Duration::milliseconds(0) {
73             return Err(standard_error(TimedOut));
74         }
75
76         LocalIo::maybe_raise(|io| {
77             let s = io.unix_connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64));
78             s.map(|p| UnixStream { obj: p })
79         }).map_err(IoError::from_rtio_error)
80     }
81
82
83     /// Closes the reading half of this connection.
84     ///
85     /// This method will close the reading portion of this connection, causing
86     /// all pending and future reads to immediately return with an error.
87     ///
88     /// Note that this method affects all cloned handles associated with this
89     /// stream, not just this one handle.
90     pub fn close_read(&mut self) -> IoResult<()> {
91         self.obj.close_read().map_err(IoError::from_rtio_error)
92     }
93
94     /// Closes the writing half of this connection.
95     ///
96     /// This method will close the writing portion of this connection, causing
97     /// all pending and future writes to immediately return with an error.
98     ///
99     /// Note that this method affects all cloned handles associated with this
100     /// stream, not just this one handle.
101     pub fn close_write(&mut self) -> IoResult<()> {
102         self.obj.close_write().map_err(IoError::from_rtio_error)
103     }
104
105     /// Sets the read/write timeout for this socket.
106     ///
107     /// For more information, see `TcpStream::set_timeout`
108     #[experimental = "the timeout argument may change in type and value"]
109     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
110         self.obj.set_timeout(timeout_ms)
111     }
112
113     /// Sets the read timeout for this socket.
114     ///
115     /// For more information, see `TcpStream::set_timeout`
116     #[experimental = "the timeout argument may change in type and value"]
117     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
118         self.obj.set_read_timeout(timeout_ms)
119     }
120
121     /// Sets the write timeout for this socket.
122     ///
123     /// For more information, see `TcpStream::set_timeout`
124     #[experimental = "the timeout argument may change in type and value"]
125     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
126         self.obj.set_write_timeout(timeout_ms)
127     }
128 }
129
130 impl Clone for UnixStream {
131     fn clone(&self) -> UnixStream {
132         UnixStream { obj: self.obj.clone() }
133     }
134 }
135
136 impl Reader for UnixStream {
137     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
138         self.obj.read(buf).map_err(IoError::from_rtio_error)
139     }
140 }
141
142 impl Writer for UnixStream {
143     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
144         self.obj.write(buf).map_err(IoError::from_rtio_error)
145     }
146 }
147
148 /// A value that can listen for incoming named pipe connection requests.
149 pub struct UnixListener {
150     /// The internal, opaque runtime Unix listener.
151     obj: Box<RtioUnixListener + Send>,
152 }
153
154 impl UnixListener {
155
156     /// Creates a new listener, ready to receive incoming connections on the
157     /// specified socket. The server will be named by `path`.
158     ///
159     /// This listener will be closed when it falls out of scope.
160     ///
161     /// # Example
162     ///
163     /// ```
164     /// # fn main() {}
165     /// # fn foo() {
166     /// # #![allow(unused_must_use)]
167     /// use std::io::net::unix::UnixListener;
168     /// use std::io::{Listener, Acceptor};
169     ///
170     /// let server = Path::new("/path/to/my/socket");
171     /// let stream = UnixListener::bind(&server);
172     /// for mut client in stream.listen().incoming() {
173     ///     client.write([1, 2, 3, 4]);
174     /// }
175     /// # }
176     /// ```
177     pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
178         LocalIo::maybe_raise(|io| {
179             io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
180         }).map_err(IoError::from_rtio_error)
181     }
182 }
183
184 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
185     fn listen(self) -> IoResult<UnixAcceptor> {
186         self.obj.listen().map(|obj| {
187             UnixAcceptor { obj: obj }
188         }).map_err(IoError::from_rtio_error)
189     }
190 }
191
192 /// A value that can accept named pipe connections, returned from `listen()`.
193 pub struct UnixAcceptor {
194     /// The internal, opaque runtime Unix acceptor.
195     obj: Box<RtioUnixAcceptor + Send>,
196 }
197
198 impl UnixAcceptor {
199     /// Sets a timeout for this acceptor, after which accept() will no longer
200     /// block indefinitely.
201     ///
202     /// The argument specified is the amount of time, in milliseconds, into the
203     /// future after which all invocations of accept() will not block (and any
204     /// pending invocation will return). A value of `None` will clear any
205     /// existing timeout.
206     ///
207     /// When using this method, it is likely necessary to reset the timeout as
208     /// appropriate, the timeout specified is specific to this object, not
209     /// specific to the next request.
210     #[experimental = "the name and arguments to this function are likely \
211                       to change"]
212     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
213         self.obj.set_timeout(timeout_ms)
214     }
215
216     /// Closes the accepting capabilities of this acceptor.
217     ///
218     /// This function has the same semantics as `TcpAcceptor::close_accept`, and
219     /// more information can be found in that documentation.
220     #[experimental]
221     pub fn close_accept(&mut self) -> IoResult<()> {
222         self.obj.close_accept().map_err(IoError::from_rtio_error)
223     }
224 }
225
226 impl Acceptor<UnixStream> for UnixAcceptor {
227     fn accept(&mut self) -> IoResult<UnixStream> {
228         self.obj.accept().map(|s| {
229             UnixStream { obj: s }
230         }).map_err(IoError::from_rtio_error)
231     }
232 }
233
234 impl Clone for UnixAcceptor {
235     /// Creates a new handle to this unix acceptor, allowing for simultaneous
236     /// accepts.
237     ///
238     /// The underlying unix acceptor will not be closed until all handles to the
239     /// acceptor have been deallocated. Incoming connections will be received on
240     /// at most once acceptor, the same connection will not be accepted twice.
241     ///
242     /// The `close_accept` method will shut down *all* acceptors cloned from the
243     /// same original acceptor, whereas the `set_timeout` method only affects
244     /// the selector that it is called on.
245     ///
246     /// This function is useful for creating a handle to invoke `close_accept`
247     /// on to wake up any other task blocked in `accept`.
248     fn clone(&self) -> UnixAcceptor {
249         UnixAcceptor { obj: self.obj.clone() }
250     }
251 }
252
253 #[cfg(test)]
254 #[allow(experimental)]
255 mod tests {
256     use prelude::*;
257     use super::*;
258     use io::*;
259     use io::test::*;
260
261     pub fn smalltest(server: proc(UnixStream):Send, client: proc(UnixStream):Send) {
262         let path1 = next_test_unix();
263         let path2 = path1.clone();
264
265         let mut acceptor = UnixListener::bind(&path1).listen();
266
267         spawn(proc() {
268             match UnixStream::connect(&path2) {
269                 Ok(c) => client(c),
270                 Err(e) => fail!("failed connect: {}", e),
271             }
272         });
273
274         match acceptor.accept() {
275             Ok(c) => server(c),
276             Err(e) => fail!("failed accept: {}", e),
277         }
278     }
279
280     iotest!(fn bind_error() {
281         let path = "path/to/nowhere";
282         match UnixListener::bind(&path) {
283             Ok(..) => fail!(),
284             Err(e) => {
285                 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
286                         e.kind == InvalidInput);
287             }
288         }
289     })
290
291     iotest!(fn connect_error() {
292         let path = if cfg!(windows) {
293             r"\\.\pipe\this_should_not_exist_ever"
294         } else {
295             "path/to/nowhere"
296         };
297         match UnixStream::connect(&path) {
298             Ok(..) => fail!(),
299             Err(e) => {
300                 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
301             }
302         }
303     })
304
305     iotest!(fn smoke() {
306         smalltest(proc(mut server) {
307             let mut buf = [0];
308             server.read(buf).unwrap();
309             assert!(buf[0] == 99);
310         }, proc(mut client) {
311             client.write([99]).unwrap();
312         })
313     })
314
315     iotest!(fn read_eof() {
316         smalltest(proc(mut server) {
317             let mut buf = [0];
318             assert!(server.read(buf).is_err());
319             assert!(server.read(buf).is_err());
320         }, proc(_client) {
321             // drop the client
322         })
323     } #[ignore(cfg(windows))]) // FIXME(#12516)
324
325     iotest!(fn write_begone() {
326         smalltest(proc(mut server) {
327             let buf = [0];
328             loop {
329                 match server.write(buf) {
330                     Ok(..) => {}
331                     Err(e) => {
332                         assert!(e.kind == BrokenPipe ||
333                                 e.kind == NotConnected ||
334                                 e.kind == ConnectionReset,
335                                 "unknown error {:?}", e);
336                         break;
337                     }
338                 }
339             }
340         }, proc(_client) {
341             // drop the client
342         })
343     })
344
345     iotest!(fn accept_lots() {
346         let times = 10;
347         let path1 = next_test_unix();
348         let path2 = path1.clone();
349
350         let mut acceptor = match UnixListener::bind(&path1).listen() {
351             Ok(a) => a,
352             Err(e) => fail!("failed listen: {}", e),
353         };
354
355         spawn(proc() {
356             for _ in range(0u, times) {
357                 let mut stream = UnixStream::connect(&path2);
358                 match stream.write([100]) {
359                     Ok(..) => {}
360                     Err(e) => fail!("failed write: {}", e)
361                 }
362             }
363         });
364
365         for _ in range(0, times) {
366             let mut client = acceptor.accept();
367             let mut buf = [0];
368             match client.read(buf) {
369                 Ok(..) => {}
370                 Err(e) => fail!("failed read/accept: {}", e),
371             }
372             assert_eq!(buf[0], 100);
373         }
374     })
375
376     #[cfg(unix)]
377     iotest!(fn path_exists() {
378         let path = next_test_unix();
379         let _acceptor = UnixListener::bind(&path).listen();
380         assert!(path.exists());
381     })
382
383     iotest!(fn unix_clone_smoke() {
384         let addr = next_test_unix();
385         let mut acceptor = UnixListener::bind(&addr).listen();
386
387         spawn(proc() {
388             let mut s = UnixStream::connect(&addr);
389             let mut buf = [0, 0];
390             debug!("client reading");
391             assert_eq!(s.read(buf), Ok(1));
392             assert_eq!(buf[0], 1);
393             debug!("client writing");
394             s.write([2]).unwrap();
395             debug!("client dropping");
396         });
397
398         let mut s1 = acceptor.accept().unwrap();
399         let s2 = s1.clone();
400
401         let (tx1, rx1) = channel();
402         let (tx2, rx2) = channel();
403         spawn(proc() {
404             let mut s2 = s2;
405             rx1.recv();
406             debug!("writer writing");
407             s2.write([1]).unwrap();
408             debug!("writer done");
409             tx2.send(());
410         });
411         tx1.send(());
412         let mut buf = [0, 0];
413         debug!("reader reading");
414         assert_eq!(s1.read(buf), Ok(1));
415         debug!("reader done");
416         rx2.recv();
417     })
418
419     iotest!(fn unix_clone_two_read() {
420         let addr = next_test_unix();
421         let mut acceptor = UnixListener::bind(&addr).listen();
422         let (tx1, rx) = channel();
423         let tx2 = tx1.clone();
424
425         spawn(proc() {
426             let mut s = UnixStream::connect(&addr);
427             s.write([1]).unwrap();
428             rx.recv();
429             s.write([2]).unwrap();
430             rx.recv();
431         });
432
433         let mut s1 = acceptor.accept().unwrap();
434         let s2 = s1.clone();
435
436         let (done, rx) = channel();
437         spawn(proc() {
438             let mut s2 = s2;
439             let mut buf = [0, 0];
440             s2.read(buf).unwrap();
441             tx2.send(());
442             done.send(());
443         });
444         let mut buf = [0, 0];
445         s1.read(buf).unwrap();
446         tx1.send(());
447
448         rx.recv();
449     })
450
451     iotest!(fn unix_clone_two_write() {
452         let addr = next_test_unix();
453         let mut acceptor = UnixListener::bind(&addr).listen();
454
455         spawn(proc() {
456             let mut s = UnixStream::connect(&addr);
457             let mut buf = [0, 1];
458             s.read(buf).unwrap();
459             s.read(buf).unwrap();
460         });
461
462         let mut s1 = acceptor.accept().unwrap();
463         let s2 = s1.clone();
464
465         let (tx, rx) = channel();
466         spawn(proc() {
467             let mut s2 = s2;
468             s2.write([1]).unwrap();
469             tx.send(());
470         });
471         s1.write([2]).unwrap();
472
473         rx.recv();
474     })
475
476     iotest!(fn drop_removes_listener_path() {
477         let path = next_test_unix();
478         let l = UnixListener::bind(&path).unwrap();
479         assert!(path.exists());
480         drop(l);
481         assert!(!path.exists());
482     } #[cfg(not(windows))])
483
484     iotest!(fn drop_removes_acceptor_path() {
485         let path = next_test_unix();
486         let l = UnixListener::bind(&path).unwrap();
487         assert!(path.exists());
488         drop(l.listen().unwrap());
489         assert!(!path.exists());
490     } #[cfg(not(windows))])
491
492     iotest!(fn accept_timeout() {
493         let addr = next_test_unix();
494         let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
495
496         a.set_timeout(Some(10));
497
498         // Make sure we time out once and future invocations also time out
499         let err = a.accept().err().unwrap();
500         assert_eq!(err.kind, TimedOut);
501         let err = a.accept().err().unwrap();
502         assert_eq!(err.kind, TimedOut);
503
504         // Also make sure that even though the timeout is expired that we will
505         // continue to receive any pending connections.
506         let (tx, rx) = channel();
507         let addr2 = addr.clone();
508         spawn(proc() {
509             tx.send(UnixStream::connect(&addr2).unwrap());
510         });
511         let l = rx.recv();
512         for i in range(0u, 1001) {
513             match a.accept() {
514                 Ok(..) => break,
515                 Err(ref e) if e.kind == TimedOut => {}
516                 Err(e) => fail!("error: {}", e),
517             }
518             ::task::deschedule();
519             if i == 1000 { fail!("should have a pending connection") }
520         }
521         drop(l);
522
523         // Unset the timeout and make sure that this always blocks.
524         a.set_timeout(None);
525         let addr2 = addr.clone();
526         spawn(proc() {
527             drop(UnixStream::connect(&addr2).unwrap());
528         });
529         a.accept().unwrap();
530     })
531
532     iotest!(fn connect_timeout_error() {
533         let addr = next_test_unix();
534         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
535     })
536
537     iotest!(fn connect_timeout_success() {
538         let addr = next_test_unix();
539         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
540         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
541     })
542
543     iotest!(fn connect_timeout_zero() {
544         let addr = next_test_unix();
545         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
546         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
547     })
548
549     iotest!(fn connect_timeout_negative() {
550         let addr = next_test_unix();
551         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
552         assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
553     })
554
555     iotest!(fn close_readwrite_smoke() {
556         let addr = next_test_unix();
557         let a = UnixListener::bind(&addr).listen().unwrap();
558         let (_tx, rx) = channel::<()>();
559         spawn(proc() {
560             let mut a = a;
561             let _s = a.accept().unwrap();
562             let _ = rx.recv_opt();
563         });
564
565         let mut b = [0];
566         let mut s = UnixStream::connect(&addr).unwrap();
567         let mut s2 = s.clone();
568
569         // closing should prevent reads/writes
570         s.close_write().unwrap();
571         assert!(s.write([0]).is_err());
572         s.close_read().unwrap();
573         assert!(s.read(b).is_err());
574
575         // closing should affect previous handles
576         assert!(s2.write([0]).is_err());
577         assert!(s2.read(b).is_err());
578
579         // closing should affect new handles
580         let mut s3 = s.clone();
581         assert!(s3.write([0]).is_err());
582         assert!(s3.read(b).is_err());
583
584         // make sure these don't die
585         let _ = s2.close_read();
586         let _ = s2.close_write();
587         let _ = s3.close_read();
588         let _ = s3.close_write();
589     })
590
591     iotest!(fn close_read_wakes_up() {
592         let addr = next_test_unix();
593         let a = UnixListener::bind(&addr).listen().unwrap();
594         let (_tx, rx) = channel::<()>();
595         spawn(proc() {
596             let mut a = a;
597             let _s = a.accept().unwrap();
598             let _ = rx.recv_opt();
599         });
600
601         let mut s = UnixStream::connect(&addr).unwrap();
602         let s2 = s.clone();
603         let (tx, rx) = channel();
604         spawn(proc() {
605             let mut s2 = s2;
606             assert!(s2.read([0]).is_err());
607             tx.send(());
608         });
609         // this should wake up the child task
610         s.close_read().unwrap();
611
612         // this test will never finish if the child doesn't wake up
613         rx.recv();
614     })
615
616     iotest!(fn readwrite_timeouts() {
617         let addr = next_test_unix();
618         let mut a = UnixListener::bind(&addr).listen().unwrap();
619         let (tx, rx) = channel::<()>();
620         spawn(proc() {
621             let mut s = UnixStream::connect(&addr).unwrap();
622             rx.recv();
623             assert!(s.write([0]).is_ok());
624             let _ = rx.recv_opt();
625         });
626
627         let mut s = a.accept().unwrap();
628         s.set_timeout(Some(20));
629         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
630         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
631
632         s.set_timeout(Some(20));
633         for i in range(0u, 1001) {
634             match s.write([0, .. 128 * 1024]) {
635                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
636                 Err(IoError { kind: TimedOut, .. }) => break,
637                 Err(e) => fail!("{}", e),
638            }
639            if i == 1000 { fail!("should have filled up?!"); }
640         }
641
642         // I'm not sure as to why, but apparently the write on windows always
643         // succeeds after the previous timeout. Who knows?
644         if !cfg!(windows) {
645             assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
646         }
647
648         tx.send(());
649         s.set_timeout(None);
650         assert_eq!(s.read([0, 0]), Ok(1));
651     })
652
653     iotest!(fn read_timeouts() {
654         let addr = next_test_unix();
655         let mut a = UnixListener::bind(&addr).listen().unwrap();
656         let (tx, rx) = channel::<()>();
657         spawn(proc() {
658             let mut s = UnixStream::connect(&addr).unwrap();
659             rx.recv();
660             let mut amt = 0;
661             while amt < 100 * 128 * 1024 {
662                 match s.read([0, ..128 * 1024]) {
663                     Ok(n) => { amt += n; }
664                     Err(e) => fail!("{}", e),
665                 }
666             }
667             let _ = rx.recv_opt();
668         });
669
670         let mut s = a.accept().unwrap();
671         s.set_read_timeout(Some(20));
672         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
673         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
674
675         tx.send(());
676         for _ in range(0u, 100) {
677             assert!(s.write([0, ..128 * 1024]).is_ok());
678         }
679     })
680
681     iotest!(fn write_timeouts() {
682         let addr = next_test_unix();
683         let mut a = UnixListener::bind(&addr).listen().unwrap();
684         let (tx, rx) = channel::<()>();
685         spawn(proc() {
686             let mut s = UnixStream::connect(&addr).unwrap();
687             rx.recv();
688             assert!(s.write([0]).is_ok());
689             let _ = rx.recv_opt();
690         });
691
692         let mut s = a.accept().unwrap();
693         s.set_write_timeout(Some(20));
694         for i in range(0u, 1001) {
695             match s.write([0, .. 128 * 1024]) {
696                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
697                 Err(IoError { kind: TimedOut, .. }) => break,
698                 Err(e) => fail!("{}", e),
699            }
700            if i == 1000 { fail!("should have filled up?!"); }
701         }
702
703         tx.send(());
704         assert!(s.read([0]).is_ok());
705     })
706
707     iotest!(fn timeout_concurrent_read() {
708         let addr = next_test_unix();
709         let mut a = UnixListener::bind(&addr).listen().unwrap();
710         let (tx, rx) = channel::<()>();
711         spawn(proc() {
712             let mut s = UnixStream::connect(&addr).unwrap();
713             rx.recv();
714             assert!(s.write([0]).is_ok());
715             let _ = rx.recv_opt();
716         });
717
718         let mut s = a.accept().unwrap();
719         let s2 = s.clone();
720         let (tx2, rx2) = channel();
721         spawn(proc() {
722             let mut s2 = s2;
723             assert!(s2.read([0]).is_ok());
724             tx2.send(());
725         });
726
727         s.set_read_timeout(Some(20));
728         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
729         tx.send(());
730
731         rx2.recv();
732     })
733
734     #[cfg(not(windows))]
735     iotest!(fn clone_accept_smoke() {
736         let addr = next_test_unix();
737         let l = UnixListener::bind(&addr);
738         let mut a = l.listen().unwrap();
739         let mut a2 = a.clone();
740
741         let addr2 = addr.clone();
742         spawn(proc() {
743             let _ = UnixStream::connect(&addr2);
744         });
745         spawn(proc() {
746             let _ = UnixStream::connect(&addr);
747         });
748
749         assert!(a.accept().is_ok());
750         drop(a);
751         assert!(a2.accept().is_ok());
752     })
753
754     iotest!(fn clone_accept_concurrent() {
755         let addr = next_test_unix();
756         let l = UnixListener::bind(&addr);
757         let a = l.listen().unwrap();
758         let a2 = a.clone();
759
760         let (tx, rx) = channel();
761         let tx2 = tx.clone();
762
763         spawn(proc() { let mut a = a; tx.send(a.accept()) });
764         spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
765
766         let addr2 = addr.clone();
767         spawn(proc() {
768             let _ = UnixStream::connect(&addr2);
769         });
770         spawn(proc() {
771             let _ = UnixStream::connect(&addr);
772         });
773
774         assert!(rx.recv().is_ok());
775         assert!(rx.recv().is_ok());
776     })
777
778     iotest!(fn close_accept_smoke() {
779         let addr = next_test_unix();
780         let l = UnixListener::bind(&addr);
781         let mut a = l.listen().unwrap();
782
783         a.close_accept().unwrap();
784         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
785     })
786
787     iotest!(fn close_accept_concurrent() {
788         let addr = next_test_unix();
789         let l = UnixListener::bind(&addr);
790         let a = l.listen().unwrap();
791         let mut a2 = a.clone();
792
793         let (tx, rx) = channel();
794         spawn(proc() {
795             let mut a = a;
796             tx.send(a.accept());
797         });
798         a2.close_accept().unwrap();
799
800         assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
801     })
802 }