]> git.lizzy.rs Git - rust.git/commitdiff
impl Buffer for ChanReader
authorRaphael Speyer <rspeyer@gmail.com>
Mon, 13 Oct 2014 11:09:30 +0000 (22:09 +1100)
committerRaphael Speyer <rspeyer@gmail.com>
Wed, 15 Oct 2014 17:39:58 +0000 (04:39 +1100)
src/libstd/io/comm_adapters.rs

index 5bec131f22251d3e24666fb150947dfa47f15ed1..bd9577c8cfc8cf3dcf8a1d70e8a46c7800ce34c7 100644 (file)
 use collections::Collection;
 use comm::{Sender, Receiver};
 use io;
-use option::{None, Option, Some};
+use option::{None, Some};
 use result::{Ok, Err};
 use slice::{bytes, CloneableVector};
-use super::{Reader, Writer, IoResult};
+use super::{Buffer, Reader, Writer, IoResult};
 use vec::Vec;
 
 /// Allows reading from a rx.
@@ -37,7 +37,7 @@
 /// }
 /// ```
 pub struct ChanReader {
-    buf: Option<Vec<u8>>,  // A buffer of bytes received but not consumed.
+    buf: Vec<u8>,          // A buffer of bytes received but not consumed.
     pos: uint,             // How many of the buffered bytes have already be consumed.
     rx: Receiver<Vec<u8>>, // The Receiver to pull data from.
     closed: bool,          // Whether the channel this Receiver connects to has been closed.
@@ -47,7 +47,7 @@ impl ChanReader {
     /// Wraps a `Port` in a `ChanReader` structure
     pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
         ChanReader {
-            buf: None,
+            buf: Vec::new(),
             pos: 0,
             rx: rx,
             closed: false,
@@ -55,27 +55,51 @@ pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
     }
 }
 
+impl Buffer for ChanReader {
+    fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> {
+        if self.pos >= self.buf.len() {
+            self.pos = 0;
+            match self.rx.recv_opt() {
+                Ok(bytes) => {
+                    self.buf = bytes;
+                },
+                Err(()) => {
+                    self.closed = true;
+                    self.buf = Vec::new();
+                }
+            }
+        }
+        if self.closed {
+            Err(io::standard_error(io::EndOfFile))
+        } else {
+            Ok(self.buf.slice_from(self.pos))
+        }
+    }
+
+    fn consume(&mut self, amt: uint) {
+        self.pos += amt;
+        assert!(self.pos <= self.buf.len());
+    }
+}
+
 impl Reader for ChanReader {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
         let mut num_read = 0;
         loop {
-            match self.buf {
-                Some(ref prev) => {
+            let count = match self.fill_buf().ok() {
+                Some(src) => {
                     let dst = buf[mut num_read..];
-                    let src = prev[self.pos..];
-                    let count = cmp::min(dst.len(), src.len());
+                    let count = cmp::min(src.len(), dst.len());
                     bytes::copy_memory(dst, src[..count]);
-                    num_read += count;
-                    self.pos += count;
+                    count
                 },
-                None => (),
+                None => 0,
             };
+            self.consume(count);
+            num_read += count;
             if num_read == buf.len() || self.closed {
                 break;
             }
-            self.pos = 0;
-            self.buf = self.rx.recv_opt().ok();
-            self.closed = self.buf.is_none();
         }
         if self.closed && num_read == 0 {
             Err(io::standard_error(io::EndOfFile))
@@ -149,7 +173,6 @@ fn test_rx_reader() {
         let mut reader = ChanReader::new(rx);
         let mut buf = [0u8, ..3];
 
-
         assert_eq!(Ok(0), reader.read([]));
 
         assert_eq!(Ok(3), reader.read(buf));
@@ -178,6 +201,28 @@ fn test_rx_reader() {
         assert_eq!(a, buf.as_slice());
     }
 
+    #[test]
+    fn test_rx_buffer() {
+        let (tx, rx) = channel();
+        task::spawn(proc() {
+          tx.send(b"he".to_vec());
+          tx.send(b"llo wo".to_vec());
+          tx.send(b"".to_vec());
+          tx.send(b"rld\nhow ".to_vec());
+          tx.send(b"are you?".to_vec());
+          tx.send(b"".to_vec());
+        });
+
+        let mut reader = ChanReader::new(rx);
+
+        assert_eq!(Ok("hello world\n".to_string()), reader.read_line());
+        assert_eq!(Ok("how are you?".to_string()), reader.read_line());
+        match reader.read_line() {
+            Ok(..) => fail!(),
+            Err(e) => assert_eq!(e.kind, io::EndOfFile),
+        }
+    }
+
     #[test]
     fn test_chan_writer() {
         let (tx, rx) = channel();