]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/comm_adapters.rs
bce097e17ef0f8d9573331194830bd5de399c530
[rust.git] / src / libstd / io / comm_adapters.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use clone::Clone;
12 use cmp;
13 use sync::mpsc::{Sender, Receiver};
14 use io;
15 use option::Option::{None, Some};
16 use ops::Index;
17 use result::Result::{Ok, Err};
18 use slice::{bytes, SliceExt};
19 use super::{Buffer, Reader, Writer, IoResult};
20 use vec::Vec;
21
22 /// Allows reading from a rx.
23 ///
24 /// # Example
25 ///
26 /// ```
27 /// use std::sync::mpsc::channel;
28 /// use std::io::ChanReader;
29 ///
30 /// let (tx, rx) = channel();
31 /// # drop(tx);
32 /// let mut reader = ChanReader::new(rx);
33 ///
34 /// let mut buf = [0u8; 100];
35 /// match reader.read(&mut buf) {
36 ///     Ok(nread) => println!("Read {} bytes", nread),
37 ///     Err(e) => println!("read error: {}", e),
38 /// }
39 /// ```
40 pub struct ChanReader {
41     buf: Vec<u8>,          // A buffer of bytes received but not consumed.
42     pos: uint,             // How many of the buffered bytes have already be consumed.
43     rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
44     closed: bool,          // Whether the channel this Receiver connects to has been closed.
45 }
46
47 impl ChanReader {
48     /// Wraps a `Port` in a `ChanReader` structure
49     pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
50         ChanReader {
51             buf: Vec::new(),
52             pos: 0,
53             rx: rx,
54             closed: false,
55         }
56     }
57 }
58
59 impl Buffer for ChanReader {
60     fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> {
61         if self.pos >= self.buf.len() {
62             self.pos = 0;
63             match self.rx.recv() {
64                 Ok(bytes) => {
65                     self.buf = bytes;
66                 },
67                 Err(..) => {
68                     self.closed = true;
69                     self.buf = Vec::new();
70                 }
71             }
72         }
73         if self.closed {
74             Err(io::standard_error(io::EndOfFile))
75         } else {
76             Ok(self.buf.slice_from(self.pos))
77         }
78     }
79
80     fn consume(&mut self, amt: uint) {
81         self.pos += amt;
82         assert!(self.pos <= self.buf.len());
83     }
84 }
85
86 impl Reader for ChanReader {
87     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
88         let mut num_read = 0;
89         loop {
90             let count = match self.fill_buf().ok() {
91                 Some(src) => {
92                     let dst = buf.slice_from_mut(num_read);
93                     let count = cmp::min(src.len(), dst.len());
94                     bytes::copy_memory(dst, src.index(&(0..count)));
95                     count
96                 },
97                 None => 0,
98             };
99             self.consume(count);
100             num_read += count;
101             if num_read == buf.len() || self.closed {
102                 break;
103             }
104         }
105         if self.closed && num_read == 0 {
106             Err(io::standard_error(io::EndOfFile))
107         } else {
108             Ok(num_read)
109         }
110     }
111 }
112
113 /// Allows writing to a tx.
114 ///
115 /// # Example
116 ///
117 /// ```
118 /// # #![allow(unused_must_use)]
119 /// use std::sync::mpsc::channel;
120 /// use std::io::ChanWriter;
121 ///
122 /// let (tx, rx) = channel();
123 /// # drop(rx);
124 /// let mut writer = ChanWriter::new(tx);
125 /// writer.write("hello, world".as_bytes());
126 /// ```
127 pub struct ChanWriter {
128     tx: Sender<Vec<u8>>,
129 }
130
131 impl ChanWriter {
132     /// Wraps a channel in a `ChanWriter` structure
133     pub fn new(tx: Sender<Vec<u8>>) -> ChanWriter {
134         ChanWriter { tx: tx }
135     }
136 }
137
138 #[stable]
139 impl Clone for ChanWriter {
140     fn clone(&self) -> ChanWriter {
141         ChanWriter { tx: self.tx.clone() }
142     }
143 }
144
145 impl Writer for ChanWriter {
146     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
147         self.tx.send(buf.to_vec()).map_err(|_| {
148             io::IoError {
149                 kind: io::BrokenPipe,
150                 desc: "Pipe closed",
151                 detail: None
152             }
153         })
154     }
155 }
156
157
158 #[cfg(test)]
159 mod test {
160     use prelude::v1::*;
161
162     use sync::mpsc::channel;
163     use super::*;
164     use io;
165     use thread::Thread;
166
167     #[test]
168     fn test_rx_reader() {
169         let (tx, rx) = channel();
170         Thread::spawn(move|| {
171           tx.send(vec![1u8, 2u8]).unwrap();
172           tx.send(vec![]).unwrap();
173           tx.send(vec![3u8, 4u8]).unwrap();
174           tx.send(vec![5u8, 6u8]).unwrap();
175           tx.send(vec![7u8, 8u8]).unwrap();
176         });
177
178         let mut reader = ChanReader::new(rx);
179         let mut buf = [0u8; 3];
180
181         assert_eq!(Ok(0), reader.read(&mut []));
182
183         assert_eq!(Ok(3), reader.read(&mut buf));
184         let a: &[u8] = &[1,2,3];
185         assert_eq!(a, buf);
186
187         assert_eq!(Ok(3), reader.read(&mut buf));
188         let a: &[u8] = &[4,5,6];
189         assert_eq!(a, buf);
190
191         assert_eq!(Ok(2), reader.read(&mut buf));
192         let a: &[u8] = &[7,8,6];
193         assert_eq!(a, buf);
194
195         match reader.read(buf.as_mut_slice()) {
196             Ok(..) => panic!(),
197             Err(e) => assert_eq!(e.kind, io::EndOfFile),
198         }
199         assert_eq!(a, buf);
200
201         // Ensure it continues to panic in the same way.
202         match reader.read(buf.as_mut_slice()) {
203             Ok(..) => panic!(),
204             Err(e) => assert_eq!(e.kind, io::EndOfFile),
205         }
206         assert_eq!(a, buf);
207     }
208
209     #[test]
210     fn test_rx_buffer() {
211         let (tx, rx) = channel();
212         Thread::spawn(move|| {
213           tx.send(b"he".to_vec()).unwrap();
214           tx.send(b"llo wo".to_vec()).unwrap();
215           tx.send(b"".to_vec()).unwrap();
216           tx.send(b"rld\nhow ".to_vec()).unwrap();
217           tx.send(b"are you?".to_vec()).unwrap();
218           tx.send(b"".to_vec()).unwrap();
219         });
220
221         let mut reader = ChanReader::new(rx);
222
223         assert_eq!(Ok("hello world\n".to_string()), reader.read_line());
224         assert_eq!(Ok("how are you?".to_string()), reader.read_line());
225         match reader.read_line() {
226             Ok(..) => panic!(),
227             Err(e) => assert_eq!(e.kind, io::EndOfFile),
228         }
229     }
230
231     #[test]
232     fn test_chan_writer() {
233         let (tx, rx) = channel();
234         let mut writer = ChanWriter::new(tx);
235         writer.write_be_u32(42).unwrap();
236
237         let wanted = vec![0u8, 0u8, 0u8, 42u8];
238         let got = match Thread::scoped(move|| { rx.recv().unwrap() }).join() {
239             Ok(got) => got,
240             Err(_) => panic!(),
241         };
242         assert_eq!(wanted, got);
243
244         match writer.write_u8(1) {
245             Ok(..) => panic!(),
246             Err(e) => assert_eq!(e.kind, io::BrokenPipe),
247         }
248     }
249 }