]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm.rs
auto merge of #10519 : nikomatsakis/rust/issue-8624-borrowck-overly-permissive, r...
[rust.git] / src / libstd / comm.rs
1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12 Message passing
13 */
14
15 #[allow(missing_doc)];
16
17 use clone::Clone;
18 use iter::Iterator;
19 use kinds::Send;
20 use option::Option;
21 use rtcomm = rt::comm;
22
23 /// A trait for things that can send multiple messages.
24 pub trait GenericChan<T> {
25     /// Sends a message.
26     fn send(&self, x: T);
27 }
28
29 /// Things that can send multiple messages and can detect when the receiver
30 /// is closed
31 pub trait GenericSmartChan<T> {
32     /// Sends a message, or report if the receiver has closed the connection.
33     fn try_send(&self, x: T) -> bool;
34 }
35
36 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
37 pub trait SendDeferred<T> {
38     fn send_deferred(&self, val: T);
39     fn try_send_deferred(&self, val: T) -> bool;
40 }
41
42 /// A trait for things that can receive multiple messages.
43 pub trait GenericPort<T> {
44     /// Receives a message, or fails if the connection closes.
45     fn recv(&self) -> T;
46
47     /// Receives a message, or returns `none` if
48     /// the connection is closed or closes.
49     fn try_recv(&self) -> Option<T>;
50
51     /// Returns an iterator that breaks once the connection closes.
52     ///
53     /// # Example
54     ///
55     /// ~~~rust
56     /// do spawn {
57     ///     for x in port.recv_iter() {
58     ///         if pred(x) { break; }
59     ///         println!("{}", x);
60     ///     }
61     /// }
62     /// ~~~
63     fn recv_iter<'a>(&'a self) -> RecvIterator<'a, Self> {
64         RecvIterator { port: self }
65     }
66 }
67
68 pub struct RecvIterator<'a, P> {
69     priv port: &'a P,
70 }
71
72 impl<'a, T, P: GenericPort<T>> Iterator<T> for RecvIterator<'a, P> {
73     fn next(&mut self) -> Option<T> {
74         self.port.try_recv()
75     }
76 }
77
78 /// Ports that can `peek`
79 pub trait Peekable<T> {
80     /// Returns true if a message is available
81     fn peek(&self) -> bool;
82 }
83
84 pub struct PortOne<T> { priv x: rtcomm::PortOne<T> }
85 pub struct ChanOne<T> { priv x: rtcomm::ChanOne<T> }
86
87 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
88     let (p, c) = rtcomm::oneshot();
89     (PortOne { x: p }, ChanOne { x: c })
90 }
91
92 pub struct Port<T> { priv x: rtcomm::Port<T> }
93 pub struct Chan<T> { priv x: rtcomm::Chan<T> }
94
95 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
96     let (p, c) = rtcomm::stream();
97     (Port { x: p }, Chan { x: c })
98 }
99
100 impl<T: Send> ChanOne<T> {
101     pub fn send(self, val: T) {
102         let ChanOne { x: c } = self;
103         c.send(val)
104     }
105
106     pub fn try_send(self, val: T) -> bool {
107         let ChanOne { x: c } = self;
108         c.try_send(val)
109     }
110
111     pub fn send_deferred(self, val: T) {
112         let ChanOne { x: c } = self;
113         c.send_deferred(val)
114     }
115
116     pub fn try_send_deferred(self, val: T) -> bool {
117         let ChanOne{ x: c } = self;
118         c.try_send_deferred(val)
119     }
120 }
121
122 impl<T: Send> PortOne<T> {
123     pub fn recv(self) -> T {
124         let PortOne { x: p } = self;
125         p.recv()
126     }
127
128     pub fn try_recv(self) -> Option<T> {
129         let PortOne { x: p } = self;
130         p.try_recv()
131     }
132 }
133
134 impl<T: Send> Peekable<T>  for PortOne<T> {
135     fn peek(&self) -> bool {
136         let &PortOne { x: ref p } = self;
137         p.peek()
138     }
139 }
140
141 impl<T: Send> GenericChan<T> for Chan<T> {
142     fn send(&self, val: T) {
143         let &Chan { x: ref c } = self;
144         c.send(val)
145     }
146 }
147
148 impl<T: Send> GenericSmartChan<T> for Chan<T> {
149     fn try_send(&self, val: T) -> bool {
150         let &Chan { x: ref c } = self;
151         c.try_send(val)
152     }
153 }
154
155 impl<T: Send> SendDeferred<T> for Chan<T> {
156     fn send_deferred(&self, val: T) {
157         let &Chan { x: ref c } = self;
158         c.send_deferred(val)
159     }
160
161     fn try_send_deferred(&self, val: T) -> bool {
162         let &Chan { x: ref c } = self;
163         c.try_send_deferred(val)
164     }
165 }
166
167 impl<T: Send> GenericPort<T> for Port<T> {
168     fn recv(&self) -> T {
169         let &Port { x: ref p } = self;
170         p.recv()
171     }
172
173     fn try_recv(&self) -> Option<T> {
174         let &Port { x: ref p } = self;
175         p.try_recv()
176     }
177 }
178
179 impl<T: Send> Peekable<T> for Port<T> {
180     fn peek(&self) -> bool {
181         let &Port { x: ref p } = self;
182         p.peek()
183     }
184 }
185
186
187 pub struct SharedChan<T> { priv x: rtcomm::SharedChan<T> }
188
189 impl<T: Send> SharedChan<T> {
190     pub fn new(c: Chan<T>) -> SharedChan<T> {
191         let Chan { x: c } = c;
192         SharedChan { x: rtcomm::SharedChan::new(c) }
193     }
194 }
195
196 impl<T: Send> GenericChan<T> for SharedChan<T> {
197     fn send(&self, val: T) {
198         let &SharedChan { x: ref c } = self;
199         c.send(val)
200     }
201 }
202
203 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
204     fn try_send(&self, val: T) -> bool {
205         let &SharedChan { x: ref c } = self;
206         c.try_send(val)
207     }
208 }
209
210 impl<T: Send> SendDeferred<T> for SharedChan<T> {
211     fn send_deferred(&self, val: T) {
212         let &SharedChan { x: ref c } = self;
213         c.send_deferred(val)
214     }
215
216     fn try_send_deferred(&self, val: T) -> bool {
217         let &SharedChan { x: ref c } = self;
218         c.try_send_deferred(val)
219     }
220 }
221
222 impl<T: Send> Clone for SharedChan<T> {
223     fn clone(&self) -> SharedChan<T> {
224         let &SharedChan { x: ref c } = self;
225         SharedChan { x: c.clone() }
226     }
227 }
228
229 pub struct SharedPort<T> { priv x: rtcomm::SharedPort<T> }
230
231 impl<T: Send> SharedPort<T> {
232     pub fn new(p: Port<T>) -> SharedPort<T> {
233         let Port { x: p } = p;
234         SharedPort { x: rtcomm::SharedPort::new(p) }
235     }
236 }
237
238 impl<T: Send> GenericPort<T> for SharedPort<T> {
239     fn recv(&self) -> T {
240         let &SharedPort { x: ref p } = self;
241         p.recv()
242     }
243
244     fn try_recv(&self) -> Option<T> {
245         let &SharedPort { x: ref p } = self;
246         p.try_recv()
247     }
248 }
249
250 impl<T: Send> Clone for SharedPort<T> {
251     fn clone(&self) -> SharedPort<T> {
252         let &SharedPort { x: ref p } = self;
253         SharedPort { x: p.clone() }
254     }
255 }
256
257 #[cfg(test)]
258 mod tests {
259     use comm::*;
260     use prelude::*;
261
262     #[test]
263     fn test_nested_recv_iter() {
264         let (port, chan) = stream::<int>();
265         let (total_port, total_chan) = oneshot::<int>();
266
267         do spawn {
268             let mut acc = 0;
269             for x in port.recv_iter() {
270                 acc += x;
271                 for x in port.recv_iter() {
272                     acc += x;
273                     for x in port.try_recv().move_iter() {
274                         acc += x;
275                         total_chan.send(acc);
276                     }
277                 }
278             }
279         }
280
281         chan.send(3);
282         chan.send(1);
283         chan.send(2);
284         assert_eq!(total_port.recv(), 6);
285     }
286
287     #[test]
288     fn test_recv_iter_break() {
289         let (port, chan) = stream::<int>();
290         let (count_port, count_chan) = oneshot::<int>();
291
292         do spawn {
293             let mut count = 0;
294             for x in port.recv_iter() {
295                 if count >= 3 {
296                     count_chan.send(count);
297                     break;
298                 } else {
299                     count += x;
300                 }
301             }
302         }
303
304         chan.send(2);
305         chan.send(2);
306         chan.send(2);
307         chan.send(2);
308         assert_eq!(count_port.recv(), 4);
309     }
310 }