1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use container::Container;
14 use comm::{Sender, Receiver};
16 use option::{None, Option, Some};
17 use result::{Ok, Err};
18 use super::{Reader, Writer, IoResult};
20 use slice::{bytes, CloneableVector, MutableVector, ImmutableVector};
22 /// Allows reading from a rx.
27 /// use std::io::ChanReader;
29 /// let (tx, rx) = channel();
31 /// let mut reader = ChanReader::new(rx);
33 /// let mut buf = ~[0u8, ..100];
34 /// match reader.read(buf) {
35 /// Ok(nread) => println!("Read {} bytes", nread),
36 /// Err(e) => println!("read error: {}", e),
39 pub struct ChanReader {
40 buf: Option<~[u8]>, // A buffer of bytes received but not consumed.
41 pos: uint, // How many of the buffered bytes have already be consumed.
42 rx: Receiver<~[u8]>, // The rx to pull data from.
43 closed: bool, // Whether the pipe this rx connects to has been closed.
47 /// Wraps a `Port` in a `ChanReader` structure
48 pub fn new(rx: Receiver<~[u8]>) -> ChanReader {
58 impl Reader for ChanReader {
59 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
64 let dst = buf.mut_slice_from(num_read);
65 let src = prev.slice_from(self.pos);
66 let count = cmp::min(dst.len(), src.len());
67 bytes::copy_memory(dst, src.slice_to(count));
73 if num_read == buf.len() || self.closed {
77 self.buf = self.rx.recv_opt().ok();
78 self.closed = self.buf.is_none();
80 if self.closed && num_read == 0 {
81 Err(io::standard_error(io::EndOfFile))
88 /// Allows writing to a tx.
93 /// # #![allow(unused_must_use)]
94 /// use std::io::ChanWriter;
96 /// let (tx, rx) = channel();
98 /// let mut writer = ChanWriter::new(tx);
99 /// writer.write("hello, world".as_bytes());
101 pub struct ChanWriter {
106 /// Wraps a channel in a `ChanWriter` structure
107 pub fn new(tx: Sender<~[u8]>) -> ChanWriter {
108 ChanWriter { tx: tx }
112 impl Clone for ChanWriter {
113 fn clone(&self) -> ChanWriter {
114 ChanWriter { tx: self.tx.clone() }
118 impl Writer for ChanWriter {
119 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
120 self.tx.send_opt(buf.to_owned()).map_err(|_| {
122 kind: io::BrokenPipe,
139 fn test_rx_reader() {
140 let (tx, rx) = channel();
142 tx.send(~[1u8, 2u8]);
144 tx.send(~[3u8, 4u8]);
145 tx.send(~[5u8, 6u8]);
146 tx.send(~[7u8, 8u8]);
149 let mut reader = ChanReader::new(rx);
150 let mut buf = ~[0u8, ..3];
153 assert_eq!(Ok(0), reader.read([]));
155 assert_eq!(Ok(3), reader.read(buf));
156 assert_eq!(~[1,2,3], buf);
158 assert_eq!(Ok(3), reader.read(buf));
159 assert_eq!(~[4,5,6], buf);
161 assert_eq!(Ok(2), reader.read(buf));
162 assert_eq!(~[7,8,6], buf);
164 match reader.read(buf) {
166 Err(e) => assert_eq!(e.kind, io::EndOfFile),
168 assert_eq!(~[7,8,6], buf);
170 // Ensure it continues to fail in the same way.
171 match reader.read(buf) {
173 Err(e) => assert_eq!(e.kind, io::EndOfFile),
175 assert_eq!(~[7,8,6], buf);
179 fn test_chan_writer() {
180 let (tx, rx) = channel();
181 let mut writer = ChanWriter::new(tx);
182 writer.write_be_u32(42).unwrap();
184 let wanted = ~[0u8, 0u8, 0u8, 42u8];
185 let got = task::try(proc() { rx.recv() }).unwrap();
186 assert_eq!(wanted, got);
188 match writer.write_u8(1) {
190 Err(e) => assert_eq!(e.kind, io::BrokenPipe),