1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 #[allow(missing_doc)];
21 use rtcomm = rt::comm;
23 /// A trait for things that can send multiple messages.
24 pub trait GenericChan<T> {
29 /// Things that can send multiple messages and can detect when the receiver
31 pub trait GenericSmartChan<T> {
32 /// Sends a message, or report if the receiver has closed the connection.
33 fn try_send(&self, x: T) -> bool;
36 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
37 pub trait SendDeferred<T> {
38 fn send_deferred(&self, val: T);
39 fn try_send_deferred(&self, val: T) -> bool;
42 /// A trait for things that can receive multiple messages.
43 pub trait GenericPort<T> {
44 /// Receives a message, or fails if the connection closes.
47 /// Receives a message, or returns `none` if
48 /// the connection is closed or closes.
49 fn try_recv(&self) -> Option<T>;
51 /// Returns an iterator that breaks once the connection closes.
57 /// for x in port.recv_iter() {
58 /// if pred(x) { break; }
59 /// println!("{}", x);
63 fn recv_iter<'a>(&'a self) -> RecvIterator<'a, Self> {
64 RecvIterator { port: self }
68 pub struct RecvIterator<'a, P> {
72 impl<'a, T, P: GenericPort<T>> Iterator<T> for RecvIterator<'a, P> {
73 fn next(&mut self) -> Option<T> {
78 /// Ports that can `peek`
79 pub trait Peekable<T> {
80 /// Returns true if a message is available
81 fn peek(&self) -> bool;
84 pub struct PortOne<T> { priv x: rtcomm::PortOne<T> }
85 pub struct ChanOne<T> { priv x: rtcomm::ChanOne<T> }
87 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
88 let (p, c) = rtcomm::oneshot();
89 (PortOne { x: p }, ChanOne { x: c })
92 pub struct Port<T> { priv x: rtcomm::Port<T> }
93 pub struct Chan<T> { priv x: rtcomm::Chan<T> }
95 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
96 let (p, c) = rtcomm::stream();
97 (Port { x: p }, Chan { x: c })
100 impl<T: Send> ChanOne<T> {
101 pub fn send(self, val: T) {
102 let ChanOne { x: c } = self;
106 pub fn try_send(self, val: T) -> bool {
107 let ChanOne { x: c } = self;
111 pub fn send_deferred(self, val: T) {
112 let ChanOne { x: c } = self;
116 pub fn try_send_deferred(self, val: T) -> bool {
117 let ChanOne{ x: c } = self;
118 c.try_send_deferred(val)
122 impl<T: Send> PortOne<T> {
123 pub fn recv(self) -> T {
124 let PortOne { x: p } = self;
128 pub fn try_recv(self) -> Option<T> {
129 let PortOne { x: p } = self;
134 impl<T: Send> Peekable<T> for PortOne<T> {
135 fn peek(&self) -> bool {
136 let &PortOne { x: ref p } = self;
141 impl<T: Send> GenericChan<T> for Chan<T> {
142 fn send(&self, val: T) {
143 let &Chan { x: ref c } = self;
148 impl<T: Send> GenericSmartChan<T> for Chan<T> {
149 fn try_send(&self, val: T) -> bool {
150 let &Chan { x: ref c } = self;
155 impl<T: Send> SendDeferred<T> for Chan<T> {
156 fn send_deferred(&self, val: T) {
157 let &Chan { x: ref c } = self;
161 fn try_send_deferred(&self, val: T) -> bool {
162 let &Chan { x: ref c } = self;
163 c.try_send_deferred(val)
167 impl<T: Send> GenericPort<T> for Port<T> {
168 fn recv(&self) -> T {
169 let &Port { x: ref p } = self;
173 fn try_recv(&self) -> Option<T> {
174 let &Port { x: ref p } = self;
179 impl<T: Send> Peekable<T> for Port<T> {
180 fn peek(&self) -> bool {
181 let &Port { x: ref p } = self;
187 pub struct SharedChan<T> { priv x: rtcomm::SharedChan<T> }
189 impl<T: Send> SharedChan<T> {
190 pub fn new(c: Chan<T>) -> SharedChan<T> {
191 let Chan { x: c } = c;
192 SharedChan { x: rtcomm::SharedChan::new(c) }
196 impl<T: Send> GenericChan<T> for SharedChan<T> {
197 fn send(&self, val: T) {
198 let &SharedChan { x: ref c } = self;
203 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
204 fn try_send(&self, val: T) -> bool {
205 let &SharedChan { x: ref c } = self;
210 impl<T: Send> SendDeferred<T> for SharedChan<T> {
211 fn send_deferred(&self, val: T) {
212 let &SharedChan { x: ref c } = self;
216 fn try_send_deferred(&self, val: T) -> bool {
217 let &SharedChan { x: ref c } = self;
218 c.try_send_deferred(val)
222 impl<T: Send> Clone for SharedChan<T> {
223 fn clone(&self) -> SharedChan<T> {
224 let &SharedChan { x: ref c } = self;
225 SharedChan { x: c.clone() }
229 pub struct SharedPort<T> { priv x: rtcomm::SharedPort<T> }
231 impl<T: Send> SharedPort<T> {
232 pub fn new(p: Port<T>) -> SharedPort<T> {
233 let Port { x: p } = p;
234 SharedPort { x: rtcomm::SharedPort::new(p) }
238 impl<T: Send> GenericPort<T> for SharedPort<T> {
239 fn recv(&self) -> T {
240 let &SharedPort { x: ref p } = self;
244 fn try_recv(&self) -> Option<T> {
245 let &SharedPort { x: ref p } = self;
250 impl<T: Send> Clone for SharedPort<T> {
251 fn clone(&self) -> SharedPort<T> {
252 let &SharedPort { x: ref p } = self;
253 SharedPort { x: p.clone() }
263 fn test_nested_recv_iter() {
264 let (port, chan) = stream::<int>();
265 let (total_port, total_chan) = oneshot::<int>();
269 for x in port.recv_iter() {
271 for x in port.recv_iter() {
273 for x in port.try_recv().move_iter() {
275 total_chan.send(acc);
284 assert_eq!(total_port.recv(), 6);
288 fn test_recv_iter_break() {
289 let (port, chan) = stream::<int>();
290 let (count_port, count_chan) = oneshot::<int>();
294 for x in port.recv_iter() {
296 count_chan.send(count);
308 assert_eq!(count_port.recv(), 4);