1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use sync::mpsc::{Sender, Receiver};
15 use option::Option::{None, Some};
16 use result::Result::{Ok, Err};
18 use super::{Buffer, Reader, Writer, IoResult};
21 /// Allows reading bytes from a `Receiver<Vec<u8>>`.
26 /// # #![feature(old_io)]
27 /// use std::sync::mpsc::channel;
28 /// use std::old_io::*;
30 /// let (tx, rx) = channel();
32 /// let mut reader = ChanReader::new(rx);
34 /// let mut buf = [0; 100];
35 /// match reader.read(&mut buf) {
36 /// Ok(nread) => println!("Read {} bytes", nread),
37 /// Err(e) => println!("read error: {}", e),
40 pub struct ChanReader {
41 buf: Vec<u8>, // The most recently received chunk; bytes before `pos` have already been handed out.
42 pos: usize, // How many of the buffered bytes have already been consumed.
43 rx: Receiver<Vec<u8>>, // The `Receiver` that chunks of bytes are pulled from.
44 closed: bool, // Whether the channel this `Receiver` connects to has been closed.
48 /// Wraps a `Receiver<Vec<u8>>` in a `ChanReader` structure.
49 pub fn new(rx: Receiver<Vec<u8>>) -> ChanReader {
// `Buffer` implementation: exposes the bytes received over the channel that
// have not been consumed yet.
59 impl Buffer for ChanReader {
// Yields the unconsumed tail of the internal buffer, or an end-of-file error.
60 fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> {
// The channel is only consulted once every buffered byte has been consumed.
61 if self.pos >= self.buf.len() {
// Pull the next chunk from the channel.
63 match self.rx.recv() {
// Leave the buffer empty so no stale bytes remain.
// NOTE(review): the match arm guarding this line is outside this view --
// presumably the failed-`recv` case; confirm.
69 self.buf = Vec::new();
// NOTE(review): the condition guarding this error is outside this view --
// presumably `self.closed`; confirm.
74 Err(old_io::standard_error(old_io::EndOfFile))
// Hand back only the bytes that have not been consumed yet.
76 Ok(&self.buf[self.pos..])
// NOTE(review): the statement that advances `pos` by `amt` is outside this
// view; only the invariant check below is visible.
80 fn consume(&mut self, amt: usize) {
// Invariant: the consumed position never runs past the end of the buffer.
82 assert!(self.pos <= self.buf.len());
// `Reader` implementation layered on top of `fill_buf`.
86 impl Reader for ChanReader {
// Copies buffered channel data into `buf`.
87 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
// `.ok()` turns a `fill_buf` error into `None` instead of returning it
// from here; how the `None` case is handled is outside this view.
90 let count = match self.fill_buf().ok() {
// Write after the bytes that earlier iterations already placed in `buf`.
92 let dst = &mut buf[num_read..];
// Copy no more than what is available and no more than what still fits.
93 let count = cmp::min(src.len(), dst.len());
94 bytes::copy_memory(&src[..count], dst);
// This condition holds once `buf` is full or the channel has been closed.
// NOTE(review): the branch body is outside this view -- presumably it ends
// the read loop; confirm.
101 if num_read == buf.len() || self.closed {
// End-of-file is reported only when the channel is closed and this call
// read nothing at all.
105 if self.closed && num_read == 0 {
106 Err(old_io::standard_error(old_io::EndOfFile))
113 /// Allows writing bytes to a `Sender<Vec<u8>>`.
118 /// # #![feature(old_io, io)]
119 /// # #![allow(unused_must_use)]
120 /// use std::sync::mpsc::channel;
121 /// use std::old_io::*;
123 /// let (tx, rx) = channel();
125 /// let mut writer = ChanWriter::new(tx);
126 /// writer.write("hello, world".as_bytes());
128 pub struct ChanWriter {
133 /// Wraps the sending half of a channel in a `ChanWriter` structure.
134 pub fn new(tx: Sender<Vec<u8>>) -> ChanWriter {
135 ChanWriter { tx: tx }
// Cloning a `ChanWriter` clones its `Sender`; the copy is a new writer built
// around that cloned sender.
139 #[stable(feature = "rust1", since = "1.0.0")]
140 impl Clone for ChanWriter {
141 fn clone(&self) -> ChanWriter {
142 ChanWriter { tx: self.tx.clone() }
// `Writer` implementation: each `write_all` call is forwarded over the channel
// as a single `Vec<u8>` message.
146 impl Writer for ChanWriter {
147 fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
// `buf` is copied into an owned `Vec<u8>` for sending; the send error value
// itself is discarded and replaced by the error built below.
148 self.tx.send(buf.to_vec()).map_err(|_| {
// A failed send is reported with the `BrokenPipe` kind.
150 kind: old_io::BrokenPipe,
163 use sync::mpsc::channel;
165 use old_io::{self, Reader, Writer, Buffer};
// Exercises `ChanReader::read` with a 3-byte buffer against 2-byte chunks, so
// reads have to span chunk boundaries, then checks the end-of-file behaviour.
169 fn test_rx_reader() {
170 let (tx, rx) = channel();
171 thread::spawn(move|| {
172 tx.send(vec![1, 2]).unwrap();
// An empty chunk is sent in the middle of the stream.
173 tx.send(vec![]).unwrap();
174 tx.send(vec![3, 4]).unwrap();
175 tx.send(vec![5, 6]).unwrap();
176 tx.send(vec![7, 8]).unwrap();
179 let mut reader = ChanReader::new(rx);
180 let mut buf = [0; 3];
// Reading into an empty slice succeeds with zero bytes.
182 assert_eq!(Ok(0), reader.read(&mut []));
184 assert_eq!(Ok(3), reader.read(&mut buf));
185 let a: &[u8] = &[1,2,3];
188 assert_eq!(Ok(3), reader.read(&mut buf));
189 let a: &[u8] = &[4,5,6];
// Only two bytes remain, so the last slot of `buf` keeps the 6 left over
// from the previous read.
192 assert_eq!(Ok(2), reader.read(&mut buf));
193 let a: &[u8] = &[7,8,6];
196 match reader.read(&mut buf) {
198 Err(e) => assert_eq!(e.kind, old_io::EndOfFile),
202 // Ensure it continues to fail in the same way.
203 match reader.read(&mut buf) {
205 Err(e) => assert_eq!(e.kind, old_io::EndOfFile),
// Exercises `read_line` on a `ChanReader` whose lines are split across several
// channel messages, including empty ones.
211 fn test_rx_buffer() {
212 let (tx, rx) = channel();
213 thread::spawn(move|| {
214 tx.send(b"he".to_vec()).unwrap();
215 tx.send(b"llo wo".to_vec()).unwrap();
216 tx.send(b"".to_vec()).unwrap();
217 tx.send(b"rld\nhow ".to_vec()).unwrap();
218 tx.send(b"are you?".to_vec()).unwrap();
219 tx.send(b"".to_vec()).unwrap();
222 let mut reader = ChanReader::new(rx);
// The first line is assembled from four messages and ends at the newline.
224 assert_eq!(Ok("hello world\n".to_string()), reader.read_line());
// The final line has no trailing newline and is still returned as a line.
225 assert_eq!(Ok("how are you?".to_string()), reader.read_line());
// A further `read_line` reports end-of-file.
226 match reader.read_line() {
228 Err(e) => assert_eq!(e.kind, old_io::EndOfFile),
// Exercises `ChanWriter`: a write arrives on the channel as one message, and
// a later write fails with `BrokenPipe`.
233 fn test_chan_writer() {
234 let (tx, rx) = channel();
235 let mut writer = ChanWriter::new(tx);
236 writer.write_be_u32(42).unwrap();
// 42 written as a big-endian u32 is expected as these four bytes.
238 let wanted = vec![0, 0, 0, 42];
// `rx` is moved into the thread, so it no longer exists in this scope once
// the thread has been joined.
239 let got = thread::scoped(move|| { rx.recv().unwrap() }).join();
240 assert_eq!(wanted, got);
// With the receiver gone, this write is expected to fail with `BrokenPipe`.
242 match writer.write_u8(1) {
244 Err(e) => assert_eq!(e.kind, old_io::BrokenPipe),