]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm.rs
4356f1143da4668f760d598aa1f1d34d50cb8120
[rust.git] / src / libstd / comm.rs
1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12 Message passing
13 */
14
15 #[allow(missing_doc)];
16
17 use either::{Either, Left, Right};
18 use kinds::Send;
19 use option::{Option, Some};
20 use unstable::sync::Exclusive;
21 pub use rt::comm::SendDeferred;
22 use rtcomm = rt::comm;
23 use rt;
24
25 /// A trait for things that can send multiple messages.
26 pub trait GenericChan<T> {
27     /// Sends a message.
28     fn send(&self, x: T);
29 }
30
31 /// Things that can send multiple messages and can detect when the receiver
32 /// is closed
33 pub trait GenericSmartChan<T> {
34     /// Sends a message, or report if the receiver has closed the connection.
35     fn try_send(&self, x: T) -> bool;
36 }
37
38 /// A trait for things that can receive multiple messages.
39 pub trait GenericPort<T> {
40     /// Receives a message, or fails if the connection closes.
41     fn recv(&self) -> T;
42
43     /** Receives a message, or returns `none` if
44     the connection is closed or closes.
45     */
46     fn try_recv(&self) -> Option<T>;
47 }
48
49 /// Ports that can `peek`
50 pub trait Peekable<T> {
51     /// Returns true if a message is available
52     fn peek(&self) -> bool;
53 }
54
55 /// An endpoint that can send many messages.
56 pub struct Chan<T> {
57     inner: Either<pipesy::Chan<T>, rtcomm::Chan<T>>
58 }
59
60 /// An endpoint that can receive many messages.
61 pub struct Port<T> {
62     inner: Either<pipesy::Port<T>, rtcomm::Port<T>>
63 }
64
65 /** Creates a `(Port, Chan)` pair.
66
67 These allow sending or receiving an unlimited number of messages.
68
69 */
70 pub fn stream<T:Send>() -> (Port<T>, Chan<T>) {
71     let (port, chan) = match rt::context() {
72         rt::OldTaskContext => match pipesy::stream() {
73             (p, c) => (Left(p), Left(c))
74         },
75         _ => match rtcomm::stream() {
76             (p, c) => (Right(p), Right(c))
77         }
78     };
79     let port = Port { inner: port };
80     let chan = Chan { inner: chan };
81     return (port, chan);
82 }
83
84 impl<T: Send> GenericChan<T> for Chan<T> {
85     fn send(&self, x: T) {
86         match self.inner {
87             Left(ref chan) => chan.send(x),
88             Right(ref chan) => chan.send(x)
89         }
90     }
91 }
92
93 impl<T: Send> GenericSmartChan<T> for Chan<T> {
94     fn try_send(&self, x: T) -> bool {
95         match self.inner {
96             Left(ref chan) => chan.try_send(x),
97             Right(ref chan) => chan.try_send(x)
98         }
99     }
100 }
101
102 impl<T: Send> SendDeferred<T> for Chan<T> {
103     fn send_deferred(&self, x: T) {
104         match self.inner {
105             Left(ref chan) => chan.send(x),
106             Right(ref chan) => chan.send_deferred(x)
107         }
108     }
109     fn try_send_deferred(&self, x: T) -> bool {
110         match self.inner {
111             Left(ref chan) => chan.try_send(x),
112             Right(ref chan) => chan.try_send_deferred(x)
113         }
114     }
115 }
116
117 impl<T: Send> GenericPort<T> for Port<T> {
118     fn recv(&self) -> T {
119         match self.inner {
120             Left(ref port) => port.recv(),
121             Right(ref port) => port.recv()
122         }
123     }
124
125     fn try_recv(&self) -> Option<T> {
126         match self.inner {
127             Left(ref port) => port.try_recv(),
128             Right(ref port) => port.try_recv()
129         }
130     }
131 }
132
133 impl<T: Send> Peekable<T> for Port<T> {
134     fn peek(&self) -> bool {
135         match self.inner {
136             Left(ref port) => port.peek(),
137             Right(ref port) => port.peek()
138         }
139     }
140 }
141
142 /// A channel that can be shared between many senders.
143 pub struct SharedChan<T> {
144     inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
145 }
146
147 impl<T: Send> SharedChan<T> {
148     /// Converts a `chan` into a `shared_chan`.
149     pub fn new(c: Chan<T>) -> SharedChan<T> {
150         let Chan { inner } = c;
151         let c = match inner {
152             Left(c) => Left(Exclusive::new(c)),
153             Right(c) => Right(rtcomm::SharedChan::new(c))
154         };
155         SharedChan { inner: c }
156     }
157 }
158
159 impl<T: Send> GenericChan<T> for SharedChan<T> {
160     fn send(&self, x: T) {
161         match self.inner {
162             Left(ref chan) => {
163                 unsafe {
164                     let mut xx = Some(x);
165                     do chan.with_imm |chan| {
166                         chan.send(xx.take_unwrap())
167                     }
168                 }
169             }
170             Right(ref chan) => chan.send(x)
171         }
172     }
173 }
174
175 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
176     fn try_send(&self, x: T) -> bool {
177         match self.inner {
178             Left(ref chan) => {
179                 unsafe {
180                     let mut xx = Some(x);
181                     do chan.with_imm |chan| {
182                         chan.try_send(xx.take_unwrap())
183                     }
184                 }
185             }
186             Right(ref chan) => chan.try_send(x)
187         }
188     }
189 }
190
191 impl<T: Send> ::clone::Clone for SharedChan<T> {
192     fn clone(&self) -> SharedChan<T> {
193         SharedChan { inner: self.inner.clone() }
194     }
195 }
196
197 pub struct PortOne<T> {
198     inner: Either<pipesy::PortOne<T>, rtcomm::PortOne<T>>
199 }
200
201 pub struct ChanOne<T> {
202     inner: Either<pipesy::ChanOne<T>, rtcomm::ChanOne<T>>
203 }
204
205 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
206     let (port, chan) = match rt::context() {
207         rt::OldTaskContext => match pipesy::oneshot() {
208             (p, c) => (Left(p), Left(c)),
209         },
210         _ => match rtcomm::oneshot() {
211             (p, c) => (Right(p), Right(c))
212         }
213     };
214     let port = PortOne { inner: port };
215     let chan = ChanOne { inner: chan };
216     return (port, chan);
217 }
218
219 impl<T: Send> PortOne<T> {
220     pub fn recv(self) -> T {
221         let PortOne { inner } = self;
222         match inner {
223             Left(p) => p.recv(),
224             Right(p) => p.recv()
225         }
226     }
227
228     pub fn try_recv(self) -> Option<T> {
229         let PortOne { inner } = self;
230         match inner {
231             Left(p) => p.try_recv(),
232             Right(p) => p.try_recv()
233         }
234     }
235 }
236
237 impl<T: Send> ChanOne<T> {
238     pub fn send(self, data: T) {
239         let ChanOne { inner } = self;
240         match inner {
241             Left(p) => p.send(data),
242             Right(p) => p.send(data)
243         }
244     }
245
246     pub fn try_send(self, data: T) -> bool {
247         let ChanOne { inner } = self;
248         match inner {
249             Left(p) => p.try_send(data),
250             Right(p) => p.try_send(data)
251         }
252     }
253     pub fn send_deferred(self, data: T) {
254         let ChanOne { inner } = self;
255         match inner {
256             Left(p) => p.send(data),
257             Right(p) => p.send_deferred(data)
258         }
259     }
260     pub fn try_send_deferred(self, data: T) -> bool {
261         let ChanOne { inner } = self;
262         match inner {
263             Left(p) => p.try_send(data),
264             Right(p) => p.try_send_deferred(data)
265         }
266     }
267 }
268
269 pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
270     let PortOne { inner } = port;
271     match inner {
272         Left(p) => pipesy::recv_one(p),
273         Right(p) => p.recv()
274     }
275 }
276
277 pub fn try_recv_one<T: Send>(port: PortOne<T>) -> Option<T> {
278     let PortOne { inner } = port;
279     match inner {
280         Left(p) => pipesy::try_recv_one(p),
281         Right(p) => p.try_recv()
282     }
283 }
284
285 pub fn send_one<T: Send>(chan: ChanOne<T>, data: T) {
286     let ChanOne { inner } = chan;
287     match inner {
288         Left(c) => pipesy::send_one(c, data),
289         Right(c) => c.send(data)
290     }
291 }
292
293 pub fn try_send_one<T: Send>(chan: ChanOne<T>, data: T) -> bool {
294     let ChanOne { inner } = chan;
295     match inner {
296         Left(c) => pipesy::try_send_one(c, data),
297         Right(c) => c.try_send(data)
298     }
299 }
300
301 mod pipesy {
302
303     use kinds::Send;
304     use option::{Option, Some, None};
305     use pipes::{recv, try_recv, peek};
306     use super::{GenericChan, GenericSmartChan, GenericPort, Peekable};
307     use cast::transmute_mut;
308
309     /*proto! oneshot (
310         Oneshot:send<T:Send> {
311             send(T) -> !
312         }
313     )*/
314
315     #[allow(non_camel_case_types)]
316     pub mod oneshot {
317         priv use std::kinds::Send;
318         use ptr::to_mut_unsafe_ptr;
319
320         pub fn init<T: Send>() -> (server::Oneshot<T>, client::Oneshot<T>) {
321             pub use std::pipes::HasBuffer;
322
323             let buffer = ~::std::pipes::Buffer {
324                 header: ::std::pipes::BufferHeader(),
325                 data: __Buffer {
326                     Oneshot: ::std::pipes::mk_packet::<Oneshot<T>>()
327                 },
328             };
329             do ::std::pipes::entangle_buffer(buffer) |buffer, data| {
330                 data.Oneshot.set_buffer(buffer);
331                 to_mut_unsafe_ptr(&mut data.Oneshot)
332             }
333         }
334         #[allow(non_camel_case_types)]
335         pub enum Oneshot<T> { pub send(T), }
336         #[allow(non_camel_case_types)]
337         pub struct __Buffer<T> {
338             Oneshot: ::std::pipes::Packet<Oneshot<T>>,
339         }
340
341         #[allow(non_camel_case_types)]
342         pub mod client {
343
344             priv use std::kinds::Send;
345
346             #[allow(non_camel_case_types)]
347             pub fn try_send<T: Send>(pipe: Oneshot<T>, x_0: T) ->
348                 ::std::option::Option<()> {
349                 {
350                     use super::send;
351                     let message = send(x_0);
352                     if ::std::pipes::send(pipe, message) {
353                         ::std::pipes::rt::make_some(())
354                     } else { ::std::pipes::rt::make_none() }
355                 }
356             }
357
358             #[allow(non_camel_case_types)]
359             pub fn send<T: Send>(pipe: Oneshot<T>, x_0: T) {
360                 {
361                     use super::send;
362                     let message = send(x_0);
363                     ::std::pipes::send(pipe, message);
364                 }
365             }
366
367             #[allow(non_camel_case_types)]
368             pub type Oneshot<T> =
369                 ::std::pipes::SendPacketBuffered<super::Oneshot<T>,
370             super::__Buffer<T>>;
371         }
372
373         #[allow(non_camel_case_types)]
374         pub mod server {
375             #[allow(non_camel_case_types)]
376             pub type Oneshot<T> =
377                 ::std::pipes::RecvPacketBuffered<super::Oneshot<T>,
378             super::__Buffer<T>>;
379         }
380     }
381
382     /// The send end of a oneshot pipe.
383     pub struct ChanOne<T> {
384         contents: oneshot::client::Oneshot<T>
385     }
386
387     impl<T> ChanOne<T> {
388         pub fn new(contents: oneshot::client::Oneshot<T>) -> ChanOne<T> {
389             ChanOne {
390                 contents: contents
391             }
392         }
393     }
394
395     /// The receive end of a oneshot pipe.
396     pub struct PortOne<T> {
397         contents: oneshot::server::Oneshot<T>
398     }
399
400     impl<T> PortOne<T> {
401         pub fn new(contents: oneshot::server::Oneshot<T>) -> PortOne<T> {
402             PortOne {
403                 contents: contents
404             }
405         }
406     }
407
408     /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
409     pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
410         let (port, chan) = oneshot::init();
411         (PortOne::new(port), ChanOne::new(chan))
412     }
413
414     impl<T: Send> PortOne<T> {
415         pub fn recv(self) -> T { recv_one(self) }
416         pub fn try_recv(self) -> Option<T> { try_recv_one(self) }
417         pub fn unwrap(self) -> oneshot::server::Oneshot<T> {
418             match self {
419                 PortOne { contents: s } => s
420             }
421         }
422     }
423
424     impl<T: Send> ChanOne<T> {
425         pub fn send(self, data: T) { send_one(self, data) }
426         pub fn try_send(self, data: T) -> bool { try_send_one(self, data) }
427         pub fn unwrap(self) -> oneshot::client::Oneshot<T> {
428             match self {
429                 ChanOne { contents: s } => s
430             }
431         }
432     }
433
434     /**
435     * Receive a message from a oneshot pipe, failing if the connection was
436     * closed.
437     */
438     pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
439         match port {
440             PortOne { contents: port } => {
441                 let oneshot::send(message) = recv(port);
442                 message
443             }
444         }
445     }
446
447     /// Receive a message from a oneshot pipe unless the connection was closed.
448     pub fn try_recv_one<T: Send> (port: PortOne<T>) -> Option<T> {
449         match port {
450             PortOne { contents: port } => {
451                 let message = try_recv(port);
452
453                 if message.is_none() {
454                     None
455                 } else {
456                     let oneshot::send(message) = message.unwrap();
457                     Some(message)
458                 }
459             }
460         }
461     }
462
463     /// Send a message on a oneshot pipe, failing if the connection was closed.
464     pub fn send_one<T: Send>(chan: ChanOne<T>, data: T) {
465         match chan {
466             ChanOne { contents: chan } => oneshot::client::send(chan, data),
467         }
468     }
469
470     /**
471     * Send a message on a oneshot pipe, or return false if the connection was
472     * closed.
473     */
474     pub fn try_send_one<T: Send>(chan: ChanOne<T>, data: T) -> bool {
475         match chan {
476             ChanOne { contents: chan } => {
477                 oneshot::client::try_send(chan, data).is_some()
478             }
479         }
480     }
481
482     // Streams - Make pipes a little easier in general.
483
484     /*proto! streamp (
485         Open:send<T: Send> {
486             data(T) -> Open<T>
487         }
488     )*/
489
490     #[allow(non_camel_case_types)]
491     pub mod streamp {
492         priv use std::kinds::Send;
493
494         pub fn init<T: Send>() -> (server::Open<T>, client::Open<T>) {
495             pub use std::pipes::HasBuffer;
496             ::std::pipes::entangle()
497         }
498
499         #[allow(non_camel_case_types)]
500         pub enum Open<T> { pub data(T, server::Open<T>), }
501
502         #[allow(non_camel_case_types)]
503         pub mod client {
504             priv use std::kinds::Send;
505
506             #[allow(non_camel_case_types)]
507             pub fn try_data<T: Send>(pipe: Open<T>, x_0: T) ->
508                 ::std::option::Option<Open<T>> {
509                 {
510                     use super::data;
511                     let (s, c) = ::std::pipes::entangle();
512                     let message = data(x_0, s);
513                     if ::std::pipes::send(pipe, message) {
514                         ::std::pipes::rt::make_some(c)
515                     } else { ::std::pipes::rt::make_none() }
516                 }
517             }
518
519             #[allow(non_camel_case_types)]
520             pub fn data<T: Send>(pipe: Open<T>, x_0: T) -> Open<T> {
521                 {
522                     use super::data;
523                     let (s, c) = ::std::pipes::entangle();
524                     let message = data(x_0, s);
525                     ::std::pipes::send(pipe, message);
526                     c
527                 }
528             }
529
530             #[allow(non_camel_case_types)]
531             pub type Open<T> = ::std::pipes::SendPacket<super::Open<T>>;
532         }
533
534         #[allow(non_camel_case_types)]
535         pub mod server {
536             #[allow(non_camel_case_types)]
537             pub type Open<T> = ::std::pipes::RecvPacket<super::Open<T>>;
538         }
539     }
540
541     /// An endpoint that can send many messages.
542     #[unsafe_mut_field(endp)]
543     pub struct Chan<T> {
544         endp: Option<streamp::client::Open<T>>
545     }
546
547     /// An endpoint that can receive many messages.
548     #[unsafe_mut_field(endp)]
549     pub struct Port<T> {
550         endp: Option<streamp::server::Open<T>>,
551     }
552
553     /** Creates a `(Port, Chan)` pair.
554
555     These allow sending or receiving an unlimited number of messages.
556
557     */
558     pub fn stream<T:Send>() -> (Port<T>, Chan<T>) {
559         let (s, c) = streamp::init();
560
561         (Port {
562             endp: Some(s)
563         }, Chan {
564             endp: Some(c)
565         })
566     }
567
568     impl<T: Send> GenericChan<T> for Chan<T> {
569         #[inline]
570         fn send(&self, x: T) {
571             unsafe {
572                 let self_endp = transmute_mut(&self.endp);
573                 *self_endp = Some(streamp::client::data(self_endp.take_unwrap(), x))
574             }
575         }
576     }
577
578     impl<T: Send> GenericSmartChan<T> for Chan<T> {
579         #[inline]
580         fn try_send(&self, x: T) -> bool {
581             unsafe {
582                 let self_endp = transmute_mut(&self.endp);
583                 match streamp::client::try_data(self_endp.take_unwrap(), x) {
584                     Some(next) => {
585                         *self_endp = Some(next);
586                         true
587                     }
588                     None => false
589                 }
590             }
591         }
592     }
593
594     impl<T: Send> GenericPort<T> for Port<T> {
595         #[inline]
596         fn recv(&self) -> T {
597             unsafe {
598                 let self_endp = transmute_mut(&self.endp);
599                 let endp = self_endp.take();
600                 let streamp::data(x, endp) = recv(endp.unwrap());
601                 *self_endp = Some(endp);
602                 x
603             }
604         }
605
606         #[inline]
607         fn try_recv(&self) -> Option<T> {
608             unsafe {
609                 let self_endp = transmute_mut(&self.endp);
610                 let endp = self_endp.take();
611                 match try_recv(endp.unwrap()) {
612                     Some(streamp::data(x, endp)) => {
613                         *self_endp = Some(endp);
614                         Some(x)
615                     }
616                     None => None
617                 }
618             }
619         }
620     }
621
622     impl<T: Send> Peekable<T> for Port<T> {
623         #[inline]
624         fn peek(&self) -> bool {
625             unsafe {
626                 let self_endp = transmute_mut(&self.endp);
627                 let mut endp = self_endp.take();
628                 let peek = match endp {
629                     Some(ref mut endp) => peek(endp),
630                     None => fail!("peeking empty stream")
631                 };
632                 *self_endp = endp;
633                 peek
634             }
635         }
636     }
637
638 }
639
640 #[cfg(test)]
641 mod test {
642     use either::Right;
643     use super::{Chan, Port, oneshot, stream};
644
645     #[test]
646     fn test_oneshot() {
647         let (p, c) = oneshot();
648
649         c.send(());
650
651         p.recv()
652     }
653
654     #[test]
655     fn test_peek_terminated() {
656         let (port, chan): (Port<int>, Chan<int>) = stream();
657
658         {
659             // Destroy the channel
660             let _chan = chan;
661         }
662
663         assert!(!port.peek());
664     }
665 }