1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 #[allow(missing_doc)];
17 use either::{Either, Left, Right};
19 use option::{Option, Some};
20 use unstable::sync::Exclusive;
21 pub use rt::comm::SendDeferred;
22 use rtcomm = rt::comm;
25 /// A trait for things that can send multiple messages.
26 pub trait GenericChan<T> {
31 /// Things that can send multiple messages and can detect when the receiver
33 pub trait GenericSmartChan<T> {
34 /// Sends a message, or report if the receiver has closed the connection.
35 fn try_send(&self, x: T) -> bool;
38 /// A trait for things that can receive multiple messages.
39 pub trait GenericPort<T> {
40 /// Receives a message, or fails if the connection closes.
43 /** Receives a message, or returns `none` if
44 the connection is closed or closes.
46 fn try_recv(&self) -> Option<T>;
49 /// Ports that can `peek`
50 pub trait Peekable<T> {
51 /// Returns true if a message is available
52 fn peek(&self) -> bool;
55 /// An endpoint that can send many messages.
57 inner: Either<pipesy::Chan<T>, rtcomm::Chan<T>>
60 /// An endpoint that can receive many messages.
62 inner: Either<pipesy::Port<T>, rtcomm::Port<T>>
65 /** Creates a `(Port, Chan)` pair.
67 These allow sending or receiving an unlimited number of messages.
70 pub fn stream<T:Send>() -> (Port<T>, Chan<T>) {
71 let (port, chan) = match rt::context() {
72 rt::OldTaskContext => match pipesy::stream() {
73 (p, c) => (Left(p), Left(c))
75 _ => match rtcomm::stream() {
76 (p, c) => (Right(p), Right(c))
79 let port = Port { inner: port };
80 let chan = Chan { inner: chan };
84 impl<T: Send> GenericChan<T> for Chan<T> {
85 fn send(&self, x: T) {
87 Left(ref chan) => chan.send(x),
88 Right(ref chan) => chan.send(x)
93 impl<T: Send> GenericSmartChan<T> for Chan<T> {
94 fn try_send(&self, x: T) -> bool {
96 Left(ref chan) => chan.try_send(x),
97 Right(ref chan) => chan.try_send(x)
102 impl<T: Send> SendDeferred<T> for Chan<T> {
103 fn send_deferred(&self, x: T) {
105 Left(ref chan) => chan.send(x),
106 Right(ref chan) => chan.send_deferred(x)
109 fn try_send_deferred(&self, x: T) -> bool {
111 Left(ref chan) => chan.try_send(x),
112 Right(ref chan) => chan.try_send_deferred(x)
117 impl<T: Send> GenericPort<T> for Port<T> {
118 fn recv(&self) -> T {
120 Left(ref port) => port.recv(),
121 Right(ref port) => port.recv()
125 fn try_recv(&self) -> Option<T> {
127 Left(ref port) => port.try_recv(),
128 Right(ref port) => port.try_recv()
133 impl<T: Send> Peekable<T> for Port<T> {
134 fn peek(&self) -> bool {
136 Left(ref port) => port.peek(),
137 Right(ref port) => port.peek()
142 /// A channel that can be shared between many senders.
143 pub struct SharedChan<T> {
144 inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
147 impl<T: Send> SharedChan<T> {
148 /// Converts a `chan` into a `shared_chan`.
149 pub fn new(c: Chan<T>) -> SharedChan<T> {
150 let Chan { inner } = c;
151 let c = match inner {
152 Left(c) => Left(Exclusive::new(c)),
153 Right(c) => Right(rtcomm::SharedChan::new(c))
155 SharedChan { inner: c }
159 impl<T: Send> GenericChan<T> for SharedChan<T> {
160 fn send(&self, x: T) {
164 let mut xx = Some(x);
165 do chan.with_imm |chan| {
166 chan.send(xx.take_unwrap())
170 Right(ref chan) => chan.send(x)
175 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
176 fn try_send(&self, x: T) -> bool {
180 let mut xx = Some(x);
181 do chan.with_imm |chan| {
182 chan.try_send(xx.take_unwrap())
186 Right(ref chan) => chan.try_send(x)
191 impl<T: Send> ::clone::Clone for SharedChan<T> {
192 fn clone(&self) -> SharedChan<T> {
193 SharedChan { inner: self.inner.clone() }
197 pub struct PortOne<T> {
198 inner: Either<pipesy::PortOne<T>, rtcomm::PortOne<T>>
201 pub struct ChanOne<T> {
202 inner: Either<pipesy::ChanOne<T>, rtcomm::ChanOne<T>>
205 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
206 let (port, chan) = match rt::context() {
207 rt::OldTaskContext => match pipesy::oneshot() {
208 (p, c) => (Left(p), Left(c)),
210 _ => match rtcomm::oneshot() {
211 (p, c) => (Right(p), Right(c))
214 let port = PortOne { inner: port };
215 let chan = ChanOne { inner: chan };
219 impl<T: Send> PortOne<T> {
220 pub fn recv(self) -> T {
221 let PortOne { inner } = self;
228 pub fn try_recv(self) -> Option<T> {
229 let PortOne { inner } = self;
231 Left(p) => p.try_recv(),
232 Right(p) => p.try_recv()
237 impl<T: Send> ChanOne<T> {
238 pub fn send(self, data: T) {
239 let ChanOne { inner } = self;
241 Left(p) => p.send(data),
242 Right(p) => p.send(data)
246 pub fn try_send(self, data: T) -> bool {
247 let ChanOne { inner } = self;
249 Left(p) => p.try_send(data),
250 Right(p) => p.try_send(data)
253 pub fn send_deferred(self, data: T) {
254 let ChanOne { inner } = self;
256 Left(p) => p.send(data),
257 Right(p) => p.send_deferred(data)
260 pub fn try_send_deferred(self, data: T) -> bool {
261 let ChanOne { inner } = self;
263 Left(p) => p.try_send(data),
264 Right(p) => p.try_send_deferred(data)
269 pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
270 let PortOne { inner } = port;
272 Left(p) => pipesy::recv_one(p),
277 pub fn try_recv_one<T: Send>(port: PortOne<T>) -> Option<T> {
278 let PortOne { inner } = port;
280 Left(p) => pipesy::try_recv_one(p),
281 Right(p) => p.try_recv()
285 pub fn send_one<T: Send>(chan: ChanOne<T>, data: T) {
286 let ChanOne { inner } = chan;
288 Left(c) => pipesy::send_one(c, data),
289 Right(c) => c.send(data)
293 pub fn try_send_one<T: Send>(chan: ChanOne<T>, data: T) -> bool {
294 let ChanOne { inner } = chan;
296 Left(c) => pipesy::try_send_one(c, data),
297 Right(c) => c.try_send(data)
304 use option::{Option, Some, None};
305 use pipes::{recv, try_recv, peek};
306 use super::{GenericChan, GenericSmartChan, GenericPort, Peekable};
307 use cast::transmute_mut;
310 Oneshot:send<T:Send> {
315 #[allow(non_camel_case_types)]
317 priv use std::kinds::Send;
318 use ptr::to_mut_unsafe_ptr;
320 pub fn init<T: Send>() -> (server::Oneshot<T>, client::Oneshot<T>) {
321 pub use std::pipes::HasBuffer;
323 let buffer = ~::std::pipes::Buffer {
324 header: ::std::pipes::BufferHeader(),
326 Oneshot: ::std::pipes::mk_packet::<Oneshot<T>>()
329 do ::std::pipes::entangle_buffer(buffer) |buffer, data| {
330 data.Oneshot.set_buffer(buffer);
331 to_mut_unsafe_ptr(&mut data.Oneshot)
334 #[allow(non_camel_case_types)]
335 pub enum Oneshot<T> { pub send(T), }
336 #[allow(non_camel_case_types)]
337 pub struct __Buffer<T> {
338 Oneshot: ::std::pipes::Packet<Oneshot<T>>,
341 #[allow(non_camel_case_types)]
344 priv use std::kinds::Send;
346 #[allow(non_camel_case_types)]
347 pub fn try_send<T: Send>(pipe: Oneshot<T>, x_0: T) ->
348 ::std::option::Option<()> {
351 let message = send(x_0);
352 if ::std::pipes::send(pipe, message) {
353 ::std::pipes::rt::make_some(())
354 } else { ::std::pipes::rt::make_none() }
358 #[allow(non_camel_case_types)]
359 pub fn send<T: Send>(pipe: Oneshot<T>, x_0: T) {
362 let message = send(x_0);
363 ::std::pipes::send(pipe, message);
367 #[allow(non_camel_case_types)]
368 pub type Oneshot<T> =
369 ::std::pipes::SendPacketBuffered<super::Oneshot<T>,
373 #[allow(non_camel_case_types)]
375 #[allow(non_camel_case_types)]
376 pub type Oneshot<T> =
377 ::std::pipes::RecvPacketBuffered<super::Oneshot<T>,
382 /// The send end of a oneshot pipe.
383 pub struct ChanOne<T> {
384 contents: oneshot::client::Oneshot<T>
388 pub fn new(contents: oneshot::client::Oneshot<T>) -> ChanOne<T> {
395 /// The receive end of a oneshot pipe.
396 pub struct PortOne<T> {
397 contents: oneshot::server::Oneshot<T>
401 pub fn new(contents: oneshot::server::Oneshot<T>) -> PortOne<T> {
408 /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
409 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
410 let (port, chan) = oneshot::init();
411 (PortOne::new(port), ChanOne::new(chan))
414 impl<T: Send> PortOne<T> {
415 pub fn recv(self) -> T { recv_one(self) }
416 pub fn try_recv(self) -> Option<T> { try_recv_one(self) }
417 pub fn unwrap(self) -> oneshot::server::Oneshot<T> {
419 PortOne { contents: s } => s
424 impl<T: Send> ChanOne<T> {
425 pub fn send(self, data: T) { send_one(self, data) }
426 pub fn try_send(self, data: T) -> bool { try_send_one(self, data) }
427 pub fn unwrap(self) -> oneshot::client::Oneshot<T> {
429 ChanOne { contents: s } => s
435 * Receive a message from a oneshot pipe, failing if the connection was
438 pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
440 PortOne { contents: port } => {
441 let oneshot::send(message) = recv(port);
447 /// Receive a message from a oneshot pipe unless the connection was closed.
448 pub fn try_recv_one<T: Send> (port: PortOne<T>) -> Option<T> {
450 PortOne { contents: port } => {
451 let message = try_recv(port);
453 if message.is_none() {
456 let oneshot::send(message) = message.unwrap();
463 /// Send a message on a oneshot pipe, failing if the connection was closed.
464 pub fn send_one<T: Send>(chan: ChanOne<T>, data: T) {
466 ChanOne { contents: chan } => oneshot::client::send(chan, data),
471 * Send a message on a oneshot pipe, or return false if the connection was
474 pub fn try_send_one<T: Send>(chan: ChanOne<T>, data: T) -> bool {
476 ChanOne { contents: chan } => {
477 oneshot::client::try_send(chan, data).is_some()
482 // Streams - Make pipes a little easier in general.
490 #[allow(non_camel_case_types)]
492 priv use std::kinds::Send;
494 pub fn init<T: Send>() -> (server::Open<T>, client::Open<T>) {
495 pub use std::pipes::HasBuffer;
496 ::std::pipes::entangle()
499 #[allow(non_camel_case_types)]
500 pub enum Open<T> { pub data(T, server::Open<T>), }
502 #[allow(non_camel_case_types)]
504 priv use std::kinds::Send;
506 #[allow(non_camel_case_types)]
507 pub fn try_data<T: Send>(pipe: Open<T>, x_0: T) ->
508 ::std::option::Option<Open<T>> {
511 let (s, c) = ::std::pipes::entangle();
512 let message = data(x_0, s);
513 if ::std::pipes::send(pipe, message) {
514 ::std::pipes::rt::make_some(c)
515 } else { ::std::pipes::rt::make_none() }
519 #[allow(non_camel_case_types)]
520 pub fn data<T: Send>(pipe: Open<T>, x_0: T) -> Open<T> {
523 let (s, c) = ::std::pipes::entangle();
524 let message = data(x_0, s);
525 ::std::pipes::send(pipe, message);
530 #[allow(non_camel_case_types)]
531 pub type Open<T> = ::std::pipes::SendPacket<super::Open<T>>;
534 #[allow(non_camel_case_types)]
536 #[allow(non_camel_case_types)]
537 pub type Open<T> = ::std::pipes::RecvPacket<super::Open<T>>;
541 /// An endpoint that can send many messages.
542 #[unsafe_mut_field(endp)]
544 endp: Option<streamp::client::Open<T>>
547 /// An endpoint that can receive many messages.
548 #[unsafe_mut_field(endp)]
550 endp: Option<streamp::server::Open<T>>,
553 /** Creates a `(Port, Chan)` pair.
555 These allow sending or receiving an unlimited number of messages.
558 pub fn stream<T:Send>() -> (Port<T>, Chan<T>) {
559 let (s, c) = streamp::init();
568 impl<T: Send> GenericChan<T> for Chan<T> {
570 fn send(&self, x: T) {
572 let self_endp = transmute_mut(&self.endp);
573 *self_endp = Some(streamp::client::data(self_endp.take_unwrap(), x))
578 impl<T: Send> GenericSmartChan<T> for Chan<T> {
580 fn try_send(&self, x: T) -> bool {
582 let self_endp = transmute_mut(&self.endp);
583 match streamp::client::try_data(self_endp.take_unwrap(), x) {
585 *self_endp = Some(next);
594 impl<T: Send> GenericPort<T> for Port<T> {
596 fn recv(&self) -> T {
598 let self_endp = transmute_mut(&self.endp);
599 let endp = self_endp.take();
600 let streamp::data(x, endp) = recv(endp.unwrap());
601 *self_endp = Some(endp);
607 fn try_recv(&self) -> Option<T> {
609 let self_endp = transmute_mut(&self.endp);
610 let endp = self_endp.take();
611 match try_recv(endp.unwrap()) {
612 Some(streamp::data(x, endp)) => {
613 *self_endp = Some(endp);
622 impl<T: Send> Peekable<T> for Port<T> {
624 fn peek(&self) -> bool {
626 let self_endp = transmute_mut(&self.endp);
627 let mut endp = self_endp.take();
628 let peek = match endp {
629 Some(ref mut endp) => peek(endp),
630 None => fail!("peeking empty stream")
643 use super::{Chan, Port, oneshot, stream};
647 let (p, c) = oneshot();
655 fn test_peek_terminated() {
656 let (port, chan): (Port<int>, Chan<int>) = stream();
659 // Destroy the channel
663 assert!(!port.peek());