1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use sync::mpsc::{Sender, Receiver};
15 use option::Option::{None, Some};
16 use result::Result::{Ok, Err};
17 use slice::{bytes, SliceExt};
18 use super::{Buffer, Reader, Writer, IoResult};
21 /// Allows reading from a `Receiver<Vec<u8>>` as though it were a byte stream.
26 /// use std::sync::mpsc::channel;
27 /// use std::io::ChanReader;
29 /// let (tx, rx) = channel();
31 /// let mut reader = ChanReader::new(rx);
33 /// let mut buf = [0u8; 100];
34 /// match reader.read(&mut buf) {
35 /// Ok(nread) => println!("Read {} bytes", nread),
36 /// Err(e) => println!("read error: {}", e),
39 pub struct ChanReader {
40 buf: Vec<u8>, // The most recently received chunk; bytes before `pos` are already consumed.
41 pos: uint, // How many of the buffered bytes have already been consumed.
42 rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
43 closed: bool, // Whether the channel this Receiver connects to has been closed.
47 /// Wraps a `Receiver<Vec<u8>>` in a `ChanReader` structure
48 pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
// Buffered access: bytes are served out of the chunk held in `self.buf`,
// and a new chunk is requested from the receiver only when that one is used up.
58 impl Buffer for ChanReader {
// Returns the not-yet-consumed tail of the current chunk, fetching a new
// chunk from the channel first if the current one is exhausted.
59 fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> {
// Every buffered byte has been consumed (or nothing was buffered yet).
60 if self.pos >= self.buf.len() {
// Ask the receiver for the next chunk.
62 match self.rx.recv() {
// NOTE(review): the arm this line belongs to is not visible here; it
// presumably handles a failed `recv` by dropping the stale buffer — confirm.
68 self.buf = Vec::new();
// Reported as end-of-file; NOTE(review): the guarding condition is not
// visible here, presumably `self.closed` — confirm.
73 Err(io::standard_error(io::EndOfFile))
// Hand back everything from the read position to the end of the chunk.
75 Ok(self.buf.slice_from(self.pos))
// Marks `amt` bytes as consumed. NOTE(review): the statement that advances
// `pos` is not visible here — confirm it precedes the assertion below.
79 fn consume(&mut self, amt: uint) {
// Invariant: the read position never runs past the end of the chunk.
81 assert!(self.pos <= self.buf.len());
// Unbuffered-style reads, implemented on top of `fill_buf`.
85 impl Reader for ChanReader {
// Copies received bytes into `buf` and returns how many were written.
// An end-of-file error is produced only when the channel is closed and
// not a single byte could be delivered (see the final check below).
86 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
// `.ok()` discards the error detail from `fill_buf`; only success vs.
// failure matters at this point.
89 let count = match self.fill_buf().ok() {
// Destination is the part of the caller's buffer not yet written.
91 let dst = buf.slice_from_mut(num_read);
// Copy no more than what is available and no more than what still fits.
92 let count = cmp::min(src.len(), dst.len());
93 bytes::copy_memory(dst, &src[0..count]);
// Done once the caller's buffer is full or the channel has been closed.
// NOTE(review): the body of this branch is not visible; presumably it
// leaves the read loop — confirm.
100 if num_read == buf.len() || self.closed {
// A closed channel is only an error if this call produced no data;
// a short read that did deliver bytes is not treated as end-of-file here.
104 if self.closed && num_read == 0 {
105 Err(io::standard_error(io::EndOfFile))
112 /// Allows writing to a `Sender<Vec<u8>>` through the `Writer` interface.
117 /// # #![allow(unused_must_use)]
118 /// use std::sync::mpsc::channel;
119 /// use std::io::ChanWriter;
121 /// let (tx, rx) = channel();
123 /// let mut writer = ChanWriter::new(tx);
124 /// writer.write("hello, world".as_bytes());
126 pub struct ChanWriter {
131 /// Wraps the sending half of a channel (`Sender<Vec<u8>>`) in a `ChanWriter` structure
132 pub fn new(tx: Sender<Vec<u8>>) -> ChanWriter {
133 ChanWriter { tx: tx }
// Cloning a `ChanWriter` clones the `Sender` it wraps.
138 impl Clone for ChanWriter {
139 fn clone(&self) -> ChanWriter {
// The new writer holds its own clone of the wrapped sender.
140 ChanWriter { tx: self.tx.clone() }
144 impl Writer for ChanWriter {
// Sends the contents of `buf` over the channel as one `Vec<u8>` message.
// A failed send is reported to the caller as an I/O error of kind `BrokenPipe`.
145 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
// `to_vec` copies the borrowed bytes so the message owns its data; the
// original send error is discarded and replaced by an I/O error.
146 self.tx.send(buf.to_vec()).map_err(|_| {
148 kind: io::BrokenPipe,
161 use sync::mpsc::channel;
// Exercises `ChanReader::read` against a sender that delivers several small
// chunks, including an empty one, with a destination buffer of three bytes.
167 fn test_rx_reader() {
168 let (tx, rx) = channel();
169 Thread::spawn(move|| {
170 tx.send(vec![1u8, 2u8]).unwrap();
// An empty message in the middle of the stream; the 3-byte read below still
// expects bytes 1, 2, 3, i.e. data from both sides of it.
171 tx.send(vec![]).unwrap();
172 tx.send(vec![3u8, 4u8]).unwrap();
173 tx.send(vec![5u8, 6u8]).unwrap();
174 tx.send(vec![7u8, 8u8]).unwrap();
177 let mut reader = ChanReader::new(rx);
// Three bytes: deliberately not a multiple of the 2-byte chunks sent above.
178 let mut buf = [0u8; 3];
// Reading into a zero-length buffer succeeds and reports zero bytes.
180 assert_eq!(Ok(0), reader.read(&mut []));
182 assert_eq!(Ok(3), reader.read(&mut buf));
// `a` is the expected content of `buf` (the comparison itself is presumably
// on a line not shown here).
183 let a: &[u8] = &[1,2,3];
186 assert_eq!(Ok(3), reader.read(&mut buf));
187 let a: &[u8] = &[4,5,6];
// Only two bytes remain, so this is a short read.
190 assert_eq!(Ok(2), reader.read(&mut buf));
// The third slot was not overwritten by the 2-byte read, so it is expected to
// still hold the 6 left over from the previous read.
191 let a: &[u8] = &[7,8,6];
// With all data delivered, a further read must report end-of-file.
194 match reader.read(buf.as_mut_slice()) {
196 Err(e) => assert_eq!(e.kind, io::EndOfFile),
200 // Ensure it continues to fail in the same way.
201 match reader.read(buf.as_mut_slice()) {
203 Err(e) => assert_eq!(e.kind, io::EndOfFile),
// Exercises line-oriented reading on `ChanReader` when lines are split
// across several channel messages.
209 fn test_rx_buffer() {
210 let (tx, rx) = channel();
211 Thread::spawn(move|| {
// The first line ("hello world\n") is spread over three non-empty messages
// with an empty message in between.
212 tx.send(b"he".to_vec()).unwrap();
213 tx.send(b"llo wo".to_vec()).unwrap();
214 tx.send(b"".to_vec()).unwrap();
// This message ends the first line and begins the second.
215 tx.send(b"rld\nhow ".to_vec()).unwrap();
216 tx.send(b"are you?".to_vec()).unwrap();
217 tx.send(b"".to_vec()).unwrap();
220 let mut reader = ChanReader::new(rx);
// The line is reassembled across message boundaries and keeps its newline.
222 assert_eq!(Ok("hello world\n".to_string()), reader.read_line());
// The last line has no trailing newline and is still expected to be returned.
223 assert_eq!(Ok("how are you?".to_string()), reader.read_line());
// Nothing is left after that: the next attempt must report end-of-file.
224 match reader.read_line() {
226 Err(e) => assert_eq!(e.kind, io::EndOfFile),
// Exercises `ChanWriter`: written bytes arrive on the receiver, and writing
// after the receiving side is gone fails with `BrokenPipe`.
231 fn test_chan_writer() {
232 let (tx, rx) = channel();
233 let mut writer = ChanWriter::new(tx);
234 writer.write_be_u32(42).unwrap();
// Big-endian encoding of 42 as a u32.
236 let wanted = vec![0u8, 0u8, 0u8, 42u8];
// `rx` is moved into the spawned thread, so the receiver does not outlive it.
237 let got = match Thread::scoped(move|| { rx.recv().unwrap() }).join() {
241 assert_eq!(wanted, got);
// The receiver was moved into the thread joined above, so this write is
// expected to fail.
243 match writer.write_u8(1) {
245 Err(e) => assert_eq!(e.kind, io::BrokenPipe),