use collections::Collection;
use comm::{Sender, Receiver};
use io;
-use option::{None, Option, Some};
+use option::{None, Some};
use result::{Ok, Err};
use slice::{bytes, CloneableVector};
-use super::{Reader, Writer, IoResult};
+use super::{Buffer, Reader, Writer, IoResult};
use vec::Vec;
/// Allows reading from a rx.
/// }
/// ```
pub struct ChanReader {
- buf: Option<Vec<u8>>, // A buffer of bytes received but not consumed.
+ buf: Vec<u8>, // A buffer of bytes received but not consumed.
pos: uint, // How many of the buffered bytes have already be consumed.
rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
closed: bool, // Whether the channel this Receiver connects to has been closed.
/// Wraps a `Port` in a `ChanReader` structure
pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
ChanReader {
- buf: None,
+ buf: Vec::new(),
pos: 0,
rx: rx,
closed: false,
}
}
+impl Buffer for ChanReader {
+ fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> {
+ if self.pos >= self.buf.len() {
+ self.pos = 0;
+ match self.rx.recv_opt() {
+ Ok(bytes) => {
+ self.buf = bytes;
+ },
+ Err(()) => {
+ self.closed = true;
+ self.buf = Vec::new();
+ }
+ }
+ }
+ if self.closed {
+ Err(io::standard_error(io::EndOfFile))
+ } else {
+ Ok(self.buf.slice_from(self.pos))
+ }
+ }
+
+ fn consume(&mut self, amt: uint) {
+ self.pos += amt;
+ assert!(self.pos <= self.buf.len());
+ }
+}
+
impl Reader for ChanReader {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let mut num_read = 0;
loop {
- match self.buf {
- Some(ref prev) => {
+ let count = match self.fill_buf().ok() {
+ Some(src) => {
let dst = buf[mut num_read..];
- let src = prev[self.pos..];
- let count = cmp::min(dst.len(), src.len());
+ let count = cmp::min(src.len(), dst.len());
bytes::copy_memory(dst, src[..count]);
- num_read += count;
- self.pos += count;
+ count
},
- None => (),
+ None => 0,
};
+ self.consume(count);
+ num_read += count;
if num_read == buf.len() || self.closed {
break;
}
- self.pos = 0;
- self.buf = self.rx.recv_opt().ok();
- self.closed = self.buf.is_none();
}
if self.closed && num_read == 0 {
Err(io::standard_error(io::EndOfFile))
let mut reader = ChanReader::new(rx);
let mut buf = [0u8, ..3];
-
assert_eq!(Ok(0), reader.read([]));
assert_eq!(Ok(3), reader.read(buf));
assert_eq!(a, buf.as_slice());
}
+ #[test]
+ fn test_rx_buffer() {
+ let (tx, rx) = channel();
+ task::spawn(proc() {
+ tx.send(b"he".to_vec());
+ tx.send(b"llo wo".to_vec());
+ tx.send(b"".to_vec());
+ tx.send(b"rld\nhow ".to_vec());
+ tx.send(b"are you?".to_vec());
+ tx.send(b"".to_vec());
+ });
+
+ let mut reader = ChanReader::new(rx);
+
+ assert_eq!(Ok("hello world\n".to_string()), reader.read_line());
+ assert_eq!(Ok("how are you?".to_string()), reader.read_line());
+ match reader.read_line() {
+ Ok(..) => fail!(),
+ Err(e) => assert_eq!(e.kind, io::EndOfFile),
+ }
+ }
+
#[test]
fn test_chan_writer() {
let (tx, rx) = channel();