]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/comm_adapters.rs
auto merge of #14709 : alexcrichton/rust/collections, r=brson
[rust.git] / src / libstd / io / comm_adapters.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use clone::Clone;
12 use cmp;
13 use collections::Collection;
14 use comm::{Sender, Receiver};
15 use io;
16 use option::{None, Option, Some};
17 use result::{Ok, Err};
18 use super::{Reader, Writer, IoResult};
19 use str::StrSlice;
20 use slice::{bytes, MutableVector, ImmutableVector};
21 use vec::Vec;
22
23 /// Allows reading from a rx.
24 ///
25 /// # Example
26 ///
27 /// ```
28 /// use std::io::ChanReader;
29 ///
30 /// let (tx, rx) = channel();
31 /// # drop(tx);
32 /// let mut reader = ChanReader::new(rx);
33 ///
34 /// let mut buf = [0u8, ..100];
35 /// match reader.read(buf) {
36 ///     Ok(nread) => println!("Read {} bytes", nread),
37 ///     Err(e) => println!("read error: {}", e),
38 /// }
39 /// ```
40 pub struct ChanReader {
41     buf: Option<Vec<u8>>,  // A buffer of bytes received but not consumed.
42     pos: uint,             // How many of the buffered bytes have already be consumed.
43     rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
44     closed: bool,          // Whether the channel this Receiver connects to has been closed.
45 }
46
47 impl ChanReader {
48     /// Wraps a `Port` in a `ChanReader` structure
49     pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
50         ChanReader {
51             buf: None,
52             pos: 0,
53             rx: rx,
54             closed: false,
55         }
56     }
57 }
58
59 impl Reader for ChanReader {
60     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
61         let mut num_read = 0;
62         loop {
63             match self.buf {
64                 Some(ref prev) => {
65                     let dst = buf.mut_slice_from(num_read);
66                     let src = prev.slice_from(self.pos);
67                     let count = cmp::min(dst.len(), src.len());
68                     bytes::copy_memory(dst, src.slice_to(count));
69                     num_read += count;
70                     self.pos += count;
71                 },
72                 None => (),
73             };
74             if num_read == buf.len() || self.closed {
75                 break;
76             }
77             self.pos = 0;
78             self.buf = self.rx.recv_opt().ok();
79             self.closed = self.buf.is_none();
80         }
81         if self.closed && num_read == 0 {
82             Err(io::standard_error(io::EndOfFile))
83         } else {
84             Ok(num_read)
85         }
86     }
87 }
88
89 /// Allows writing to a tx.
90 ///
91 /// # Example
92 ///
93 /// ```
94 /// # #![allow(unused_must_use)]
95 /// use std::io::ChanWriter;
96 ///
97 /// let (tx, rx) = channel();
98 /// # drop(rx);
99 /// let mut writer = ChanWriter::new(tx);
100 /// writer.write("hello, world".as_bytes());
101 /// ```
102 pub struct ChanWriter {
103     tx: Sender<Vec<u8>>,
104 }
105
106 impl ChanWriter {
107     /// Wraps a channel in a `ChanWriter` structure
108     pub fn new(tx: Sender<Vec<u8>>) -> ChanWriter {
109         ChanWriter { tx: tx }
110     }
111 }
112
113 impl Clone for ChanWriter {
114     fn clone(&self) -> ChanWriter {
115         ChanWriter { tx: self.tx.clone() }
116     }
117 }
118
119 impl Writer for ChanWriter {
120     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
121         self.tx.send_opt(Vec::from_slice(buf)).map_err(|_| {
122             io::IoError {
123                 kind: io::BrokenPipe,
124                 desc: "Pipe closed",
125                 detail: None
126             }
127         })
128     }
129 }
130
131
132 #[cfg(test)]
133 mod test {
134     use prelude::*;
135     use super::*;
136     use io;
137     use task;
138
139     #[test]
140     fn test_rx_reader() {
141         let (tx, rx) = channel();
142         task::spawn(proc() {
143           tx.send(vec![1u8, 2u8]);
144           tx.send(vec![]);
145           tx.send(vec![3u8, 4u8]);
146           tx.send(vec![5u8, 6u8]);
147           tx.send(vec![7u8, 8u8]);
148         });
149
150         let mut reader = ChanReader::new(rx);
151         let mut buf = [0u8, ..3];
152
153
154         assert_eq!(Ok(0), reader.read([]));
155
156         assert_eq!(Ok(3), reader.read(buf));
157         assert_eq!(&[1,2,3], buf.as_slice());
158
159         assert_eq!(Ok(3), reader.read(buf));
160         assert_eq!(&[4,5,6], buf.as_slice());
161
162         assert_eq!(Ok(2), reader.read(buf));
163         assert_eq!(&[7,8,6], buf.as_slice());
164
165         match reader.read(buf) {
166             Ok(..) => fail!(),
167             Err(e) => assert_eq!(e.kind, io::EndOfFile),
168         }
169         assert_eq!(&[7,8,6], buf.as_slice());
170
171         // Ensure it continues to fail in the same way.
172         match reader.read(buf) {
173             Ok(..) => fail!(),
174             Err(e) => assert_eq!(e.kind, io::EndOfFile),
175         }
176         assert_eq!(&[7,8,6], buf.as_slice());
177     }
178
179     #[test]
180     fn test_chan_writer() {
181         let (tx, rx) = channel();
182         let mut writer = ChanWriter::new(tx);
183         writer.write_be_u32(42).unwrap();
184
185         let wanted = vec![0u8, 0u8, 0u8, 42u8];
186         let got = task::try(proc() { rx.recv() }).unwrap();
187         assert_eq!(wanted, got);
188
189         match writer.write_u8(1) {
190             Ok(..) => fail!(),
191             Err(e) => assert_eq!(e.kind, io::BrokenPipe),
192         }
193     }
194 }