1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use collections::Collection;
14 use comm::{Sender, Receiver};
16 use option::{None, Option, Some};
17 use result::{Ok, Err};
18 use super::{Reader, Writer, IoResult};
20 use slice::{bytes, MutableVector, ImmutableVector};
23 /// Allows reading from a rx.
28 /// use std::io::ChanReader;
30 /// let (tx, rx) = channel();
32 /// let mut reader = ChanReader::new(rx);
34 /// let mut buf = [0u8, ..100];
35 /// match reader.read(buf) {
36 /// Ok(nread) => println!("Read {} bytes", nread),
37 /// Err(e) => println!("read error: {}", e),
40 pub struct ChanReader {
41 buf: Option<Vec<u8>>, // A buffer of bytes received but not consumed.
42 pos: uint, // How many of the buffered bytes have already be consumed.
43 rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
44 closed: bool, // Whether the channel this Receiver connects to has been closed.
48 /// Wraps a `Port` in a `ChanReader` structure
49 pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
59 impl Reader for ChanReader {
60 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
65 let dst = buf.mut_slice_from(num_read);
66 let src = prev.slice_from(self.pos);
67 let count = cmp::min(dst.len(), src.len());
68 bytes::copy_memory(dst, src.slice_to(count));
74 if num_read == buf.len() || self.closed {
78 self.buf = self.rx.recv_opt().ok();
79 self.closed = self.buf.is_none();
81 if self.closed && num_read == 0 {
82 Err(io::standard_error(io::EndOfFile))
89 /// Allows writing to a tx.
94 /// # #![allow(unused_must_use)]
95 /// use std::io::ChanWriter;
97 /// let (tx, rx) = channel();
99 /// let mut writer = ChanWriter::new(tx);
100 /// writer.write("hello, world".as_bytes());
102 pub struct ChanWriter {
107 /// Wraps a channel in a `ChanWriter` structure
108 pub fn new(tx: Sender<Vec<u8>>) -> ChanWriter {
109 ChanWriter { tx: tx }
113 impl Clone for ChanWriter {
114 fn clone(&self) -> ChanWriter {
115 ChanWriter { tx: self.tx.clone() }
119 impl Writer for ChanWriter {
120 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
121 self.tx.send_opt(Vec::from_slice(buf)).map_err(|_| {
123 kind: io::BrokenPipe,
140 fn test_rx_reader() {
141 let (tx, rx) = channel();
143 tx.send(vec![1u8, 2u8]);
145 tx.send(vec![3u8, 4u8]);
146 tx.send(vec![5u8, 6u8]);
147 tx.send(vec![7u8, 8u8]);
150 let mut reader = ChanReader::new(rx);
151 let mut buf = [0u8, ..3];
154 assert_eq!(Ok(0), reader.read([]));
156 assert_eq!(Ok(3), reader.read(buf));
157 assert_eq!(&[1,2,3], buf.as_slice());
159 assert_eq!(Ok(3), reader.read(buf));
160 assert_eq!(&[4,5,6], buf.as_slice());
162 assert_eq!(Ok(2), reader.read(buf));
163 assert_eq!(&[7,8,6], buf.as_slice());
165 match reader.read(buf) {
167 Err(e) => assert_eq!(e.kind, io::EndOfFile),
169 assert_eq!(&[7,8,6], buf.as_slice());
171 // Ensure it continues to fail in the same way.
172 match reader.read(buf) {
174 Err(e) => assert_eq!(e.kind, io::EndOfFile),
176 assert_eq!(&[7,8,6], buf.as_slice());
180 fn test_chan_writer() {
181 let (tx, rx) = channel();
182 let mut writer = ChanWriter::new(tx);
183 writer.write_be_u32(42).unwrap();
185 let wanted = vec![0u8, 0u8, 0u8, 42u8];
186 let got = task::try(proc() { rx.recv() }).unwrap();
187 assert_eq!(wanted, got);
189 match writer.write_u8(1) {
191 Err(e) => assert_eq!(e.kind, io::BrokenPipe),