1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use collections::Collection;
14 use comm::{Sender, Receiver};
16 use option::{None, Option, Some};
17 use result::{Ok, Err};
18 use slice::{bytes, MutableSlice, ImmutableSlice};
20 use super::{Reader, Writer, IoResult};
23 /// Allows reading bytes from a `Receiver<Vec<u8>>` through the `Reader` trait.
28 /// use std::io::ChanReader;
30 /// let (tx, rx) = channel();
32 /// let mut reader = ChanReader::new(rx);
34 /// let mut buf = [0u8, ..100];
35 /// match reader.read(buf) {
36 /// Ok(nread) => println!("Read {} bytes", nread),
37 /// Err(e) => println!("read error: {}", e),
40 pub struct ChanReader {
41 buf: Option<Vec<u8>>, // A buffer of bytes received but not yet consumed (`None` after a failed receive).
42 pos: uint, // How many of the buffered bytes have already been consumed.
43 rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
44 closed: bool, // Whether the channel this Receiver connects to has been closed.
48 /// Wraps a `Receiver<Vec<u8>>` in a `ChanReader` structure.
49 pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
59 impl Reader for ChanReader {
60 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
65 let dst = buf.mut_slice_from(num_read); // Part of the caller's buffer starting at offset `num_read`.
66 let src = prev.slice_from(self.pos); // `prev` with the first `self.pos` (already consumed) bytes skipped.
67 let count = cmp::min(dst.len(), src.len()); // Copy only as much as both slices can accommodate.
68 bytes::copy_memory(dst, src.slice_to(count));
74 if num_read == buf.len() || self.closed { // The caller's buffer is full, or the channel is closed.
78 self.buf = self.rx.recv_opt().ok(); // Take the next chunk from the channel; a failed receive leaves `None`.
79 self.closed = self.buf.is_none(); // A failed receive marks the reader as closed.
81 if self.closed && num_read == 0 { // Closed and `num_read` is zero: report end of file.
82 Err(io::standard_error(io::EndOfFile))
89 /// Allows writing bytes to a `Sender<Vec<u8>>` through the `Writer` trait.
94 /// # #![allow(unused_must_use)]
95 /// use std::io::ChanWriter;
97 /// let (tx, rx) = channel();
99 /// let mut writer = ChanWriter::new(tx);
100 /// writer.write("hello, world".as_bytes());
102 pub struct ChanWriter {
107 /// Wraps a `Sender<Vec<u8>>` in a `ChanWriter` structure.
108 pub fn new(tx: Sender<Vec<u8>>) -> ChanWriter {
109 ChanWriter { tx: tx }
113 impl Clone for ChanWriter {
114 fn clone(&self) -> ChanWriter {
115 ChanWriter { tx: self.tx.clone() } // The new writer wraps a clone of this writer's Sender.
119 impl Writer for ChanWriter {
120 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
121 self.tx.send_opt(Vec::from_slice(buf)).map_err(|_| { // Send a copy of `buf` as a single message; translate a send failure into an I/O error.
123 kind: io::BrokenPipe, // A failed send is reported as a broken pipe.
140 fn test_rx_reader() {
141 let (tx, rx) = channel();
143 tx.send(vec![1u8, 2u8]); // Data is sent in two-byte chunks but read back three bytes at a time.
145 tx.send(vec![3u8, 4u8]);
146 tx.send(vec![5u8, 6u8]);
147 tx.send(vec![7u8, 8u8]);
150 let mut reader = ChanReader::new(rx);
151 let mut buf = [0u8, ..3];
154 assert_eq!(Ok(0), reader.read([])); // An empty destination reads zero bytes without error.
156 assert_eq!(Ok(3), reader.read(buf)); // A single read spans more than one received chunk.
157 let a: &[u8] = &[1,2,3];
158 assert_eq!(a, buf.as_slice());
160 assert_eq!(Ok(3), reader.read(buf));
161 let a: &[u8] = &[4,5,6];
162 assert_eq!(a, buf.as_slice());
164 assert_eq!(Ok(2), reader.read(buf)); // Only two bytes are left to read.
165 let a: &[u8] = &[7,8,6]; // The third slot keeps the stale 6 from the previous read.
166 assert_eq!(a, buf.as_slice());
168 match reader.read(buf.as_mut_slice()) {
170 Err(e) => assert_eq!(e.kind, io::EndOfFile),
172 assert_eq!(a, buf.as_slice()); // The failed read must leave `buf` untouched.
174 // Ensure it continues to fail in the same way.
175 match reader.read(buf.as_mut_slice()) {
177 Err(e) => assert_eq!(e.kind, io::EndOfFile),
179 assert_eq!(a, buf.as_slice()); // Still unchanged after the second failed read.
183 fn test_chan_writer() {
184 let (tx, rx) = channel();
185 let mut writer = ChanWriter::new(tx);
186 writer.write_be_u32(42).unwrap();
188 let wanted = vec![0u8, 0u8, 0u8, 42u8]; // 42 as four big-endian bytes.
189 let got = match task::try(proc() { rx.recv() }) { // Receive in a separate task; `rx` is moved into it.
193 assert_eq!(wanted, got);
195 match writer.write_u8(1) { // `rx` was moved into the task above; the Err arm expects BrokenPipe.
197 Err(e) => assert_eq!(e.kind, io::BrokenPipe),