]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/comm_adapters.rs
Replace all ~"" with "".to_owned()
[rust.git] / src / libstd / io / comm_adapters.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use clone::Clone;
12 use cmp;
13 use container::Container;
14 use comm::{Sender, Receiver};
15 use io;
16 use option::{None, Option, Some};
17 use result::{Ok, Err};
18 use super::{Reader, Writer, IoResult};
19 use str::StrSlice;
20 use slice::{bytes, CloneableVector, MutableVector, ImmutableVector};
21
22 /// Allows reading from a rx.
23 ///
24 /// # Example
25 ///
26 /// ```
27 /// use std::io::ChanReader;
28 ///
29 /// let (tx, rx) = channel();
30 /// # drop(tx);
31 /// let mut reader = ChanReader::new(rx);
32 ///
33 /// let mut buf = ~[0u8, ..100];
34 /// match reader.read(buf) {
35 ///     Ok(nread) => println!("Read {} bytes", nread),
36 ///     Err(e) => println!("read error: {}", e),
37 /// }
38 /// ```
39 pub struct ChanReader {
40     buf: Option<~[u8]>,  // A buffer of bytes received but not consumed.
41     pos: uint,           // How many of the buffered bytes have already be consumed.
42     rx: Receiver<~[u8]>,   // The rx to pull data from.
43     closed: bool,        // Whether the pipe this rx connects to has been closed.
44 }
45
46 impl ChanReader {
47     /// Wraps a `Port` in a `ChanReader` structure
48     pub fn new(rx: Receiver<~[u8]>) -> ChanReader {
49         ChanReader {
50             buf: None,
51             pos: 0,
52             rx: rx,
53             closed: false,
54         }
55     }
56 }
57
58 impl Reader for ChanReader {
59     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
60         let mut num_read = 0;
61         loop {
62             match self.buf {
63                 Some(ref prev) => {
64                     let dst = buf.mut_slice_from(num_read);
65                     let src = prev.slice_from(self.pos);
66                     let count = cmp::min(dst.len(), src.len());
67                     bytes::copy_memory(dst, src.slice_to(count));
68                     num_read += count;
69                     self.pos += count;
70                 },
71                 None => (),
72             };
73             if num_read == buf.len() || self.closed {
74                 break;
75             }
76             self.pos = 0;
77             self.buf = self.rx.recv_opt().ok();
78             self.closed = self.buf.is_none();
79         }
80         if self.closed && num_read == 0 {
81             Err(io::standard_error(io::EndOfFile))
82         } else {
83             Ok(num_read)
84         }
85     }
86 }
87
88 /// Allows writing to a tx.
89 ///
90 /// # Example
91 ///
92 /// ```
93 /// # #![allow(unused_must_use)]
94 /// use std::io::ChanWriter;
95 ///
96 /// let (tx, rx) = channel();
97 /// # drop(rx);
98 /// let mut writer = ChanWriter::new(tx);
99 /// writer.write("hello, world".as_bytes());
100 /// ```
101 pub struct ChanWriter {
102     tx: Sender<~[u8]>,
103 }
104
105 impl ChanWriter {
106     /// Wraps a channel in a `ChanWriter` structure
107     pub fn new(tx: Sender<~[u8]>) -> ChanWriter {
108         ChanWriter { tx: tx }
109     }
110 }
111
112 impl Clone for ChanWriter {
113     fn clone(&self) -> ChanWriter {
114         ChanWriter { tx: self.tx.clone() }
115     }
116 }
117
118 impl Writer for ChanWriter {
119     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
120         self.tx.send_opt(buf.to_owned()).map_err(|_| {
121             io::IoError {
122                 kind: io::BrokenPipe,
123                 desc: "Pipe closed",
124                 detail: None
125             }
126         })
127     }
128 }
129
130
131 #[cfg(test)]
132 mod test {
133     use prelude::*;
134     use super::*;
135     use io;
136     use task;
137
138     #[test]
139     fn test_rx_reader() {
140         let (tx, rx) = channel();
141         task::spawn(proc() {
142           tx.send(~[1u8, 2u8]);
143           tx.send(~[]);
144           tx.send(~[3u8, 4u8]);
145           tx.send(~[5u8, 6u8]);
146           tx.send(~[7u8, 8u8]);
147         });
148
149         let mut reader = ChanReader::new(rx);
150         let mut buf = ~[0u8, ..3];
151
152
153         assert_eq!(Ok(0), reader.read([]));
154
155         assert_eq!(Ok(3), reader.read(buf));
156         assert_eq!(~[1,2,3], buf);
157
158         assert_eq!(Ok(3), reader.read(buf));
159         assert_eq!(~[4,5,6], buf);
160
161         assert_eq!(Ok(2), reader.read(buf));
162         assert_eq!(~[7,8,6], buf);
163
164         match reader.read(buf) {
165             Ok(..) => fail!(),
166             Err(e) => assert_eq!(e.kind, io::EndOfFile),
167         }
168         assert_eq!(~[7,8,6], buf);
169
170         // Ensure it continues to fail in the same way.
171         match reader.read(buf) {
172             Ok(..) => fail!(),
173             Err(e) => assert_eq!(e.kind, io::EndOfFile),
174         }
175         assert_eq!(~[7,8,6], buf);
176     }
177
178     #[test]
179     fn test_chan_writer() {
180         let (tx, rx) = channel();
181         let mut writer = ChanWriter::new(tx);
182         writer.write_be_u32(42).unwrap();
183
184         let wanted = ~[0u8, 0u8, 0u8, 42u8];
185         let got = task::try(proc() { rx.recv() }).unwrap();
186         assert_eq!(wanted, got);
187
188         match writer.write_u8(1) {
189             Ok(..) => fail!(),
190             Err(e) => assert_eq!(e.kind, io::BrokenPipe),
191         }
192     }
193 }