]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/comm_adapters.rs
use slicing sugar
[rust.git] / src / libstd / io / comm_adapters.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use clone::Clone;
12 use cmp;
13 use sync::mpsc::{Sender, Receiver};
14 use io;
15 use option::Option::{None, Some};
16 use result::Result::{Ok, Err};
17 use slice::{bytes, SliceExt};
18 use super::{Buffer, Reader, Writer, IoResult};
19 use vec::Vec;
20
21 /// Allows reading from a rx.
22 ///
23 /// # Example
24 ///
25 /// ```
26 /// use std::sync::mpsc::channel;
27 /// use std::io::ChanReader;
28 ///
29 /// let (tx, rx) = channel();
30 /// # drop(tx);
31 /// let mut reader = ChanReader::new(rx);
32 ///
33 /// let mut buf = [0u8; 100];
34 /// match reader.read(&mut buf) {
35 ///     Ok(nread) => println!("Read {} bytes", nread),
36 ///     Err(e) => println!("read error: {}", e),
37 /// }
38 /// ```
39 pub struct ChanReader {
40     buf: Vec<u8>,          // A buffer of bytes received but not consumed.
41     pos: uint,             // How many of the buffered bytes have already be consumed.
42     rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
43     closed: bool,          // Whether the channel this Receiver connects to has been closed.
44 }
45
46 impl ChanReader {
47     /// Wraps a `Port` in a `ChanReader` structure
48     pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
49         ChanReader {
50             buf: Vec::new(),
51             pos: 0,
52             rx: rx,
53             closed: false,
54         }
55     }
56 }
57
58 impl Buffer for ChanReader {
59     fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> {
60         if self.pos >= self.buf.len() {
61             self.pos = 0;
62             match self.rx.recv() {
63                 Ok(bytes) => {
64                     self.buf = bytes;
65                 },
66                 Err(..) => {
67                     self.closed = true;
68                     self.buf = Vec::new();
69                 }
70             }
71         }
72         if self.closed {
73             Err(io::standard_error(io::EndOfFile))
74         } else {
75             Ok(self.buf.slice_from(self.pos))
76         }
77     }
78
79     fn consume(&mut self, amt: uint) {
80         self.pos += amt;
81         assert!(self.pos <= self.buf.len());
82     }
83 }
84
85 impl Reader for ChanReader {
86     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
87         let mut num_read = 0;
88         loop {
89             let count = match self.fill_buf().ok() {
90                 Some(src) => {
91                     let dst = buf.slice_from_mut(num_read);
92                     let count = cmp::min(src.len(), dst.len());
93                     bytes::copy_memory(dst, &src[0..count]);
94                     count
95                 },
96                 None => 0,
97             };
98             self.consume(count);
99             num_read += count;
100             if num_read == buf.len() || self.closed {
101                 break;
102             }
103         }
104         if self.closed && num_read == 0 {
105             Err(io::standard_error(io::EndOfFile))
106         } else {
107             Ok(num_read)
108         }
109     }
110 }
111
112 /// Allows writing to a tx.
113 ///
114 /// # Example
115 ///
116 /// ```
117 /// # #![allow(unused_must_use)]
118 /// use std::sync::mpsc::channel;
119 /// use std::io::ChanWriter;
120 ///
121 /// let (tx, rx) = channel();
122 /// # drop(rx);
123 /// let mut writer = ChanWriter::new(tx);
124 /// writer.write("hello, world".as_bytes());
125 /// ```
126 pub struct ChanWriter {
127     tx: Sender<Vec<u8>>,
128 }
129
130 impl ChanWriter {
131     /// Wraps a channel in a `ChanWriter` structure
132     pub fn new(tx: Sender<Vec<u8>>) -> ChanWriter {
133         ChanWriter { tx: tx }
134     }
135 }
136
137 #[stable]
138 impl Clone for ChanWriter {
139     fn clone(&self) -> ChanWriter {
140         ChanWriter { tx: self.tx.clone() }
141     }
142 }
143
144 impl Writer for ChanWriter {
145     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
146         self.tx.send(buf.to_vec()).map_err(|_| {
147             io::IoError {
148                 kind: io::BrokenPipe,
149                 desc: "Pipe closed",
150                 detail: None
151             }
152         })
153     }
154 }
155
156
157 #[cfg(test)]
158 mod test {
159     use prelude::v1::*;
160
161     use sync::mpsc::channel;
162     use super::*;
163     use io;
164     use thread::Thread;
165
166     #[test]
167     fn test_rx_reader() {
168         let (tx, rx) = channel();
169         Thread::spawn(move|| {
170           tx.send(vec![1u8, 2u8]).unwrap();
171           tx.send(vec![]).unwrap();
172           tx.send(vec![3u8, 4u8]).unwrap();
173           tx.send(vec![5u8, 6u8]).unwrap();
174           tx.send(vec![7u8, 8u8]).unwrap();
175         });
176
177         let mut reader = ChanReader::new(rx);
178         let mut buf = [0u8; 3];
179
180         assert_eq!(Ok(0), reader.read(&mut []));
181
182         assert_eq!(Ok(3), reader.read(&mut buf));
183         let a: &[u8] = &[1,2,3];
184         assert_eq!(a, buf);
185
186         assert_eq!(Ok(3), reader.read(&mut buf));
187         let a: &[u8] = &[4,5,6];
188         assert_eq!(a, buf);
189
190         assert_eq!(Ok(2), reader.read(&mut buf));
191         let a: &[u8] = &[7,8,6];
192         assert_eq!(a, buf);
193
194         match reader.read(buf.as_mut_slice()) {
195             Ok(..) => panic!(),
196             Err(e) => assert_eq!(e.kind, io::EndOfFile),
197         }
198         assert_eq!(a, buf);
199
200         // Ensure it continues to panic in the same way.
201         match reader.read(buf.as_mut_slice()) {
202             Ok(..) => panic!(),
203             Err(e) => assert_eq!(e.kind, io::EndOfFile),
204         }
205         assert_eq!(a, buf);
206     }
207
208     #[test]
209     fn test_rx_buffer() {
210         let (tx, rx) = channel();
211         Thread::spawn(move|| {
212           tx.send(b"he".to_vec()).unwrap();
213           tx.send(b"llo wo".to_vec()).unwrap();
214           tx.send(b"".to_vec()).unwrap();
215           tx.send(b"rld\nhow ".to_vec()).unwrap();
216           tx.send(b"are you?".to_vec()).unwrap();
217           tx.send(b"".to_vec()).unwrap();
218         });
219
220         let mut reader = ChanReader::new(rx);
221
222         assert_eq!(Ok("hello world\n".to_string()), reader.read_line());
223         assert_eq!(Ok("how are you?".to_string()), reader.read_line());
224         match reader.read_line() {
225             Ok(..) => panic!(),
226             Err(e) => assert_eq!(e.kind, io::EndOfFile),
227         }
228     }
229
230     #[test]
231     fn test_chan_writer() {
232         let (tx, rx) = channel();
233         let mut writer = ChanWriter::new(tx);
234         writer.write_be_u32(42).unwrap();
235
236         let wanted = vec![0u8, 0u8, 0u8, 42u8];
237         let got = match Thread::scoped(move|| { rx.recv().unwrap() }).join() {
238             Ok(got) => got,
239             Err(_) => panic!(),
240         };
241         assert_eq!(wanted, got);
242
243         match writer.write_u8(1) {
244             Ok(..) => panic!(),
245             Err(e) => assert_eq!(e.kind, io::BrokenPipe),
246         }
247     }
248 }