1 /*! Runtime support for message passing with protocol enforcement.
4 Pipes consist of two endpoints. One endpoint can send messages and
5 the other can receive messages. The set of legal messages and which
6 directions they can flow at any given point are determined by a
7 protocol. Below is an example protocol.
20 The `proto!` syntax extension will convert this into a module called
21 `pingpong`, which includes a set of types and functions that can be
22 used to write programs that follow the pingpong protocol.
26 /* IMPLEMENTATION NOTES
28 The initial design for this feature is available at:
30 https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
32 Much of the design in that document is still accurate. There are
33 several components for the pipe implementation. First of all is the
34 syntax extension. To see how that works, it is best to see the comments in
35 libsyntax/ext/pipes.rs.
37 This module includes two related pieces of the runtime
38 implementation: support for unbounded and bounded
39 protocols. The main difference between the two is the type of the
40 buffer that is carried along in the endpoint data structures.
43 The heart of the implementation is the packet type. It contains a
44 header and a payload field. Much of the code in this module deals with
45 the header field. This is where the synchronization information is
46 stored. In the case of a bounded protocol, the header also includes a
47 pointer to the buffer the packet is contained in.
49 Packets represent a single message in a protocol. The payload field
50 gets instantiated at the type of the message, which is usually an enum
51 generated by the pipe compiler. Packets are conceptually single use,
52 although in bounded protocols they are reused each time around the
56 Packets are usually handled through a send_packet_buffered or
57 recv_packet_buffered object. Each packet is referenced by one
58 send_packet and one recv_packet, and these wrappers enforce that only
59 one end can send and only one end can receive. The structs also
60 include a destructor that marks packets as terminated if the sender
61 or receiver destroys the object before sending or receiving a value.
63 The *_packet_buffered structs take two type parameters. The first is
64 the message type for the current packet (or state). The second
65 represents the type of the whole buffer. For bounded protocols, the
66 protocol compiler generates a struct with a field for each protocol
67 state. This generated struct is used as the buffer type parameter. For
68 unbounded protocols, the buffer is simply one packet, so there is a
69 shorthand struct called send_packet and recv_packet, where the buffer
70 type is just `packet<T>`. Using the same underlying structure for both
71 bounded and unbounded protocols allows for less code duplication.
75 import unsafe::{forget, reinterpret_cast, transmute};
76 import either::{either, left, right};
77 import option::unwrap;
79 // Things used by code generated by the pipe compiler.
80 export entangle, get_buffer, drop_buffer;
81 export send_packet_buffered, recv_packet_buffered;
82 export packet, mk_packet, entangle_buffer, has_buffer, buffer_header;
84 // export these so we can find them in the buffer_resource
85 // destructor. This is probably a symptom of #3005.
86 export atomic_add_acq, atomic_sub_rel;
89 export send_packet, recv_packet, send, recv, try_recv, peek;
90 export select, select2, selecti, select2i, selectable;
91 export spawn_service, spawn_service_recv;
92 export stream, port, chan, shared_chan, port_set, channel;
93 export oneshot, chan_one, port_one;
94 export recv_one, try_recv_one, send_one, try_send_one;
97 const SPIN_COUNT: uint = 0;
99 macro_rules! move_it {
100 { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
111 struct buffer_header {
112 // Tracks whether this buffer needs to be freed. We can probably
113 // get away with restricting it to 0 or 1, if we're careful.
114 let mut ref_count: int;
116 new() { self.ref_count = 0; }
118 // We may want a drop, and to be careful about stringing this
122 // This is for protocols to associate extra data to thread around.
124 type buffer<T: send> = {
125 header: buffer_header,
129 struct packet_header {
130 let mut state: state;
131 let mut blocked_task: *rust_task;
133 // This is a reinterpret_cast of a ~buffer, that can also be cast
134 // to a buffer_header if need be.
135 let mut buffer: *libc::c_void;
139 self.blocked_task = ptr::null();
140 self.buffer = ptr::null();
143 // Returns the old state.
144 unsafe fn mark_blocked(this: *rust_task) -> state {
145 rustrt::rust_task_ref(this);
146 let old_task = swap_task(self.blocked_task, this);
147 assert old_task.is_null();
148 swap_state_acq(self.state, blocked)
151 unsafe fn unblock() {
152 let old_task = swap_task(self.blocked_task, ptr::null());
153 if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
154 match swap_state_acq(self.state, empty) {
155 empty | blocked => (),
156 terminated => self.state = terminated,
157 full => self.state = full
161 // unsafe because this can do weird things to the space/time
162 // continuum. It ends up making multiple unique pointers to the same
163 // thing. You'll probably want to forget them when you're done.
164 unsafe fn buf_header() -> ~buffer_header {
165 assert self.buffer.is_not_null();
166 reinterpret_cast(self.buffer)
169 fn set_buffer<T: send>(b: ~buffer<T>) unsafe {
170 self.buffer = reinterpret_cast(b);
175 type packet<T: send> = {
176 header: packet_header,
177 mut payload: option<T>,
182 fn set_buffer(b: *libc::c_void);
185 impl<T: send> packet<T>: has_buffer {
186 fn set_buffer(b: *libc::c_void) {
187 self.header.buffer = b;
192 fn mk_packet<T: send>() -> packet<T> {
194 header: packet_header(),
200 fn unibuffer<T: send>() -> ~buffer<packet<T>> {
202 header: buffer_header(),
204 header: packet_header(),
210 b.data.header.buffer = reinterpret_cast(b);
217 fn packet<T: send>() -> *packet<T> {
219 let p = ptr::addr_of(b.data);
220 // We'll take over memory management from here.
226 fn entangle_buffer<T: send, Tstart: send>(
228 init: fn(*libc::c_void, x: &T) -> *packet<Tstart>)
229 -> (send_packet_buffered<Tstart, T>, recv_packet_buffered<Tstart, T>)
231 let p = init(unsafe { reinterpret_cast(buffer) }, &buffer.data);
232 unsafe { forget(buffer) }
233 (send_packet_buffered(p), recv_packet_buffered(p))
236 #[abi = "rust-intrinsic"]
239 fn atomic_xchng(&dst: int, src: int) -> int;
240 fn atomic_xchng_acq(&dst: int, src: int) -> int;
241 fn atomic_xchng_rel(&dst: int, src: int) -> int;
243 fn atomic_add_acq(&dst: int, src: int) -> int;
244 fn atomic_sub_rel(&dst: int, src: int) -> int;
247 // If I call the rusti versions directly from a polymorphic function,
248 // I get link errors. This is a bug that needs to be investigated further.
250 fn atomic_xchng_rel(&dst: int, src: int) -> int {
251 rusti::atomic_xchng_rel(dst, src)
255 fn atomic_add_acq(&dst: int, src: int) -> int {
256 rusti::atomic_add_acq(dst, src)
260 fn atomic_sub_rel(&dst: int, src: int) -> int {
261 rusti::atomic_sub_rel(dst, src)
265 fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task {
266 // It might be worth making both acquire and release versions of
269 reinterpret_cast(rusti::atomic_xchng(
270 *(ptr::mut_addr_of(dst) as *mut int),
276 type rust_task = libc::c_void;
281 fn rust_get_task() -> *rust_task;
283 fn rust_task_ref(task: *rust_task);
284 fn rust_task_deref(task: *rust_task);
287 fn task_clear_event_reject(task: *rust_task);
289 fn task_wait_event(this: *rust_task, killed: &mut *libc::c_void) -> bool;
290 pure fn task_signal_event(target: *rust_task, event: *libc::c_void);
294 fn wait_event(this: *rust_task) -> *libc::c_void {
295 let mut event = ptr::null();
297 let killed = rustrt::task_wait_event(this, &mut event);
298 if killed && !task::failing() {
305 fn swap_state_acq(&dst: state, src: state) -> state {
307 reinterpret_cast(rusti::atomic_xchng_acq(
308 *(ptr::mut_addr_of(dst) as *mut int),
314 fn swap_state_rel(&dst: state, src: state) -> state {
316 reinterpret_cast(rusti::atomic_xchng_rel(
317 *(ptr::mut_addr_of(dst) as *mut int),
323 unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
324 transmute((*p).buf_header())
327 struct buffer_resource<T: send> {
328 let buffer: ~buffer<T>;
329 new(+b: ~buffer<T>) {
330 //let p = ptr::addr_of(*b);
331 //error!{"take %?", p};
332 atomic_add_acq(b.header.ref_count, 1);
337 let b = move_it!{self.buffer};
338 //let p = ptr::addr_of(*b);
339 //error!{"drop %?", p};
340 let old_count = atomic_sub_rel(b.header.ref_count, 1);
341 //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
343 // The new count is 0.
345 // go go gadget drop glue
354 fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
355 -payload: T) -> bool {
356 let header = p.header();
358 let p = unsafe { &*p_ };
359 assert ptr::addr_of(p.header) == header;
360 assert p.payload == none;
361 p.payload <- some(payload);
362 let old_state = swap_state_rel(p.header.state, full);
367 // The receiver will eventually clean this up.
368 //unsafe { forget(p); }
371 full => fail ~"duplicate send",
373 debug!{"waking up task for %?", p_};
374 let old_task = swap_task(p.header.blocked_task, ptr::null());
375 if !old_task.is_null() {
376 rustrt::task_signal_event(
377 old_task, ptr::addr_of(p.header) as *libc::c_void);
378 rustrt::rust_task_deref(old_task);
381 // The receiver will eventually clean this up.
382 //unsafe { forget(p); }
386 // The receiver will never receive this. Rely on drop_glue
387 // to clean everything up.
393 /** Receives a message from a pipe.
395 Fails if the sender closes the connection.
398 fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
399 option::unwrap_expect(try_recv(p), "connection closed")
402 /** Attempts to receive a message from a pipe.
404 Returns `none` if the sender has closed the connection without sending
405 a message, or `some(T)` if a message was received.
408 fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
412 let p = unsafe { &*p_ };
419 self.p.state = terminated;
420 let old_task = swap_task(self.p.blocked_task, ptr::null());
421 if !old_task.is_null() {
422 rustrt::rust_task_deref(old_task);
428 let _drop_state = drop_state { p: &p.header };
431 match p.header.state {
433 let mut payload = none;
434 payload <-> p.payload;
435 p.header.state = empty;
436 return some(option::unwrap(payload))
438 terminated => return none,
443 let this = rustrt::rust_get_task();
444 rustrt::task_clear_event_reject(this);
445 rustrt::rust_task_ref(this);
446 let old_task = swap_task(p.header.blocked_task, this);
447 assert old_task.is_null();
448 let mut first = true;
449 let mut count = SPIN_COUNT;
451 rustrt::task_clear_event_reject(this);
452 let old_state = swap_state_acq(p.header.state,
456 debug!{"no data available on %?, going to sleep.", p_};
462 // FIXME (#524): Putting the yield here destroys a lot
463 // of the benefit of spinning, since we still go into
464 // the scheduler at every iteration. However, without
465 // this everything spins too much because we end up
466 // sometimes blocking the thing we are waiting on.
469 debug!{"woke up, p.state = %?", copy p.header.state};
471 blocked => if first {
472 fail ~"blocking on already blocked packet"
475 let mut payload = none;
476 payload <-> p.payload;
477 let old_task = swap_task(p.header.blocked_task, ptr::null());
478 if !old_task.is_null() {
479 rustrt::rust_task_deref(old_task);
481 p.header.state = empty;
482 return some(option::unwrap(payload))
485 // This assert detects when we've accidentally and unsafely
486 // cast too large a number to a state.
487 assert old_state == terminated;
489 let old_task = swap_task(p.header.blocked_task, ptr::null());
490 if !old_task.is_null() {
491 rustrt::rust_task_deref(old_task);
500 /// Returns true if messages are available.
501 pure fn peek<T: send, Tb: send>(p: recv_packet_buffered<T, Tb>) -> bool {
502 match unsafe {(*p.header()).state} {
504 blocked => fail ~"peeking on blocked packet",
505 full | terminated => true
509 impl<T: send, Tb: send> recv_packet_buffered<T, Tb> {
510 pure fn peek() -> bool {
516 fn sender_terminate<T: send>(p: *packet<T>) {
517 let p = unsafe { &*p };
518 match swap_state_rel(p.header.state, terminated) {
520 // The receiver will eventually clean up.
523 // wake up the target
524 let old_task = swap_task(p.header.blocked_task, ptr::null());
525 if !old_task.is_null() {
526 rustrt::task_signal_event(
528 ptr::addr_of(p.header) as *libc::c_void);
529 rustrt::rust_task_deref(old_task);
531 // The receiver will eventually clean up.
534 // This is impossible
535 fail ~"you dun goofed"
538 assert p.header.blocked_task.is_null();
539 // I have to clean up, use drop_glue
545 fn receiver_terminate<T: send>(p: *packet<T>) {
546 let p = unsafe { &*p };
547 match swap_state_rel(p.header.state, terminated) {
549 assert p.header.blocked_task.is_null();
550 // the sender will clean up
553 let old_task = swap_task(p.header.blocked_task, ptr::null());
554 if !old_task.is_null() {
555 rustrt::rust_task_deref(old_task);
556 assert old_task == rustrt::rust_get_task();
559 terminated | full => {
560 assert p.header.blocked_task.is_null();
561 // I have to clean up, use drop_glue
566 /** Returns when one of the packet headers reports data is available.
568 This function is primarily intended for building higher level waiting
569 functions, such as `select`, `select2`, etc.
571 It takes a vector slice of packet_headers and returns an index into
572 that vector. The index points to an endpoint that has either been
573 closed by the sender or has a message waiting to be received.
576 fn wait_many(pkts: &[*packet_header]) -> uint {
577 let this = rustrt::rust_get_task();
579 rustrt::task_clear_event_reject(this);
580 let mut data_avail = false;
581 let mut ready_packet = pkts.len();
582 for pkts.eachi |i, p| unsafe {
583 let p = unsafe { &*p };
584 let old = p.mark_blocked(this);
586 full | terminated => {
592 blocked => fail ~"blocking on blocked packet",
598 debug!{"sleeping on %? packets", pkts.len()};
599 let event = wait_event(this) as *packet_header;
600 let pos = vec::position(pkts, |p| p == event);
607 none => debug!{"ignoring spurious event, %?", event}
611 debug!{"%?", pkts[ready_packet]};
613 for pkts.each |p| { unsafe{ (*p).unblock()} }
615 debug!("%?, %?", ready_packet, pkts[ready_packet]);
618 assert (*pkts[ready_packet]).state == full
619 || (*pkts[ready_packet]).state == terminated;
625 /** Receives a message from one of two endpoints.
627 The return value is `left` if the first endpoint received something,
628 or `right` if the second endpoint receives something. In each case,
629 the result includes the other endpoint as well so it can be used
630 again. Below is an example of using `select2`.
633 match select2(a, b) {
635 // endpoint a was closed.
638 // endpoint b was closed.
641 // endpoint a received a message
644 // endpoint b received a message.
649 Sometimes messages will be available on both endpoints at once. In
650 this case, `select2` may return either `left` or `right`.
653 fn select2<A: send, Ab: send, B: send, Bb: send>(
654 +a: recv_packet_buffered<A, Ab>,
655 +b: recv_packet_buffered<B, Bb>)
656 -> either<(option<A>, recv_packet_buffered<B, Bb>),
657 (recv_packet_buffered<A, Ab>, option<B>)>
659 let i = wait_many([a.header(), b.header()]/_);
663 0 => left((try_recv(a), b)),
664 1 => right((a, try_recv(b))),
665 _ => fail ~"select2 return an invalid packet"
672 pure fn header() -> *packet_header;
675 impl *packet_header: selectable {
676 pure fn header() -> *packet_header { self }
679 /// Returns the index of an endpoint that is ready to receive.
680 fn selecti<T: selectable>(endpoints: &[T]) -> uint {
681 wait_many(endpoints.map(|p| p.header()))
684 /// Returns `left(())` or `right(())` depending on which endpoint is ready to receive
685 fn select2i<A: selectable, B: selectable>(a: A, b: B) -> either<(), ()> {
686 match wait_many([a.header(), b.header()]/_) {
689 _ => fail ~"wait returned unexpected index"
693 /** Waits on a set of endpoints. Returns a message, its index, and a
694 list of the remaining endpoints.
697 fn select<T: send, Tb: send>(+endpoints: ~[recv_packet_buffered<T, Tb>])
698 -> (uint, option<T>, ~[recv_packet_buffered<T, Tb>])
700 let ready = wait_many(endpoints.map(|p| p.header()));
701 let mut remaining = ~[];
702 let mut result = none;
703 do vec::consume(endpoints) |i, p| {
705 result = try_recv(p);
708 vec::push(remaining, p);
712 (ready, result, remaining)
715 /** The sending end of a pipe. It can be used to send exactly one
719 type send_packet<T: send> = send_packet_buffered<T, packet<T>>;
722 fn send_packet<T: send>(p: *packet<T>) -> send_packet<T> {
723 send_packet_buffered(p)
726 struct send_packet_buffered<T: send, Tbuffer: send> {
727 let mut p: option<*packet<T>>;
728 let mut buffer: option<buffer_resource<Tbuffer>>;
730 //debug!{"take send %?", p};
735 get_buffer(ptr::addr_of((*p).header))));
739 //if self.p != none {
740 // debug!{"drop send %?", option::get(self.p)};
745 sender_terminate(option::unwrap(p))
747 //unsafe { error!{"send_drop: %?",
748 // if self.buffer == none {
750 // } else { "some" }}; }
752 fn unwrap() -> *packet<T> {
758 pure fn header() -> *packet_header {
760 some(packet) => unsafe {
761 let packet = &*packet;
762 let header = ptr::addr_of(packet.header);
766 none => fail ~"packet already consumed"
770 fn reuse_buffer() -> buffer_resource<Tbuffer> {
771 //error!{"send reuse_buffer"};
778 /// Represents the receive end of a pipe. It can receive exactly one
780 type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
783 fn recv_packet<T: send>(p: *packet<T>) -> recv_packet<T> {
784 recv_packet_buffered(p)
787 struct recv_packet_buffered<T: send, Tbuffer: send> : selectable {
788 let mut p: option<*packet<T>>;
789 let mut buffer: option<buffer_resource<Tbuffer>>;
791 //debug!{"take recv %?", p};
796 get_buffer(ptr::addr_of((*p).header))));
800 //if self.p != none {
801 // debug!{"drop recv %?", option::get(self.p)};
806 receiver_terminate(option::unwrap(p))
808 //unsafe { error!{"recv_drop: %?",
809 // if self.buffer == none {
811 // } else { "some" }}; }
813 fn unwrap() -> *packet<T> {
819 pure fn header() -> *packet_header {
821 some(packet) => unsafe {
822 let packet = &*packet;
823 let header = ptr::addr_of(packet.header);
827 none => fail ~"packet already consumed"
831 fn reuse_buffer() -> buffer_resource<Tbuffer> {
832 //error!{"recv reuse_buffer"};
840 fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
842 (send_packet(p), recv_packet(p))
845 /** Spawn a task to provide a service.
847 It takes an initialization function that produces a send and receive
848 endpoint. The send endpoint is returned to the caller and the receive
849 endpoint is passed to the new task.
852 fn spawn_service<T: send, Tb: send>(
853 init: extern fn() -> (send_packet_buffered<T, Tb>,
854 recv_packet_buffered<T, Tb>),
855 +service: fn~(+recv_packet_buffered<T, Tb>))
856 -> send_packet_buffered<T, Tb>
858 let (client, server) = init();
860 // This is some nasty gymnastics required to safely move the pipe
862 let server = ~mut some(server);
863 do task::spawn |move service| {
864 let mut server_ = none;
866 service(option::unwrap(server_))
872 /** Like `spawn_service`, but for protocols that start in the
876 fn spawn_service_recv<T: send, Tb: send>(
877 init: extern fn() -> (recv_packet_buffered<T, Tb>,
878 send_packet_buffered<T, Tb>),
879 +service: fn~(+send_packet_buffered<T, Tb>))
880 -> recv_packet_buffered<T, Tb>
882 let (client, server) = init();
884 // This is some nasty gymnastics required to safely move the pipe
886 let server = ~mut some(server);
887 do task::spawn |move service| {
888 let mut server_ = none;
890 service(option::unwrap(server_))
896 // Streams - Make pipes a little easier in general.
904 /// A trait for things that can send multiple messages.
905 trait channel<T: send> {
906 // It'd be nice to call this send, but it'd conflict with the
907 // built in send kind.
912 /// Sends a message, or reports if the receiver has closed the connection.
913 fn try_send(+x: T) -> bool;
916 /// A trait for things that can receive multiple messages.
917 trait recv<T: send> {
918 /// Receives a message, or fails if the connection closes.
921 /** Receives a message if one is available, or returns `none` if
922 the connection is closed.
925 fn try_recv() -> option<T>;
927 /** Returns true if a message is available or the connection is
931 pure fn peek() -> bool;
935 type chan_<T:send> = { mut endp: option<streamp::client::open<T>> };
937 /// An endpoint that can send many messages.
943 type port_<T:send> = { mut endp: option<streamp::server::open<T>> };
945 /// An endpoint that can receive many messages.
950 /** Creates a `(chan, port)` pair.
952 These allow sending or receiving an unlimited number of messages.
955 fn stream<T:send>() -> (chan<T>, port<T>) {
956 let (c, s) = streamp::init();
958 (chan_({ mut endp: some(c) }), port_({ mut endp: some(s) }))
961 impl<T: send> chan<T>: channel<T> {
966 streamp::client::data(unwrap(endp), x))
969 fn try_send(+x: T) -> bool {
972 match move streamp::client::try_data(unwrap(endp), x) {
974 self.endp = some(move_it!(next));
982 impl<T: send> port<T>: recv<T> {
986 let streamp::data(x, endp) = pipes::recv(unwrap(endp));
987 self.endp = some(endp);
991 fn try_recv() -> option<T> {
994 match move pipes::try_recv(unwrap(endp)) {
995 some(streamp::data(x, endp)) => {
996 self.endp = some(move_it!{endp});
1003 pure fn peek() -> bool unchecked {
1004 let mut endp = none;
1006 let peek = match endp {
1007 some(endp) => pipes::peek(endp),
1008 none => fail ~"peeking empty stream"
1015 // Treat a whole bunch of ports as one.
1016 struct port_set<T: send> : recv<T> {
1017 let mut ports: ~[pipes::port<T>];
1019 new() { self.ports = ~[]; }
1021 fn add(+port: pipes::port<T>) {
1022 vec::push(self.ports, port)
1025 fn chan() -> chan<T> {
1026 let (ch, po) = stream();
1031 fn try_recv() -> option<T> {
1032 let mut result = none;
1033 // we have to swap the ports array so we aren't borrowing
1034 // aliasable mutable memory.
1035 let mut ports = ~[];
1036 ports <-> self.ports;
1037 while result == none && ports.len() > 0 {
1038 let i = wait_many(ports.map(|p| p.header()));
1039 match move ports[i].try_recv() {
1041 result = some(move m);
1044 // Remove this port.
1045 let mut ports_ = ~[];
1047 vec::consume(ports_,
1054 ports <-> self.ports;
1059 match move self.try_recv() {
1060 some(copy x) => move x,
1061 none => fail ~"port_set: endpoints closed"
1065 pure fn peek() -> bool {
1066 // It'd be nice to use self.ports.each, but that version isn't
1068 for vec::each(self.ports) |p| {
1069 if p.peek() { return true }
1075 impl<T: send> port<T>: selectable {
1076 pure fn header() -> *packet_header unchecked {
1078 some(endp) => endp.header(),
1079 none => fail ~"peeking empty stream"
1084 /// A channel that can be shared between many senders.
1085 type shared_chan<T: send> = unsafe::Exclusive<chan<T>>;
1087 impl<T: send> shared_chan<T>: channel<T> {
1089 let mut xx = some(x);
1090 do self.with |chan| {
1093 chan.send(option::unwrap(x))
1097 fn try_send(+x: T) -> bool {
1098 let mut xx = some(x);
1099 do self.with |chan| {
1102 chan.try_send(option::unwrap(x))
1107 /// Converts a `chan` into a `shared_chan`.
1108 fn shared_chan<T:send>(+c: chan<T>) -> shared_chan<T> {
1109 unsafe::exclusive(c)
1112 /// Receive a message from one of two endpoints.
1113 trait select2<T: send, U: send> {
1114 /// Receive a message or return `none` if a connection closes.
1115 fn try_select() -> either<option<T>, option<U>>;
1116 /// Receive a message or fail if a connection closes.
1117 fn select() -> either<T, U>;
1120 impl<T: send, U: send, Left: selectable recv<T>, Right: selectable recv<U>>
1121 (Left, Right): select2<T, U> {
1123 fn select() -> either<T, U> {
1125 (lp, rp) => match select2i(lp, rp) {
1126 left(()) => left (lp.recv()),
1127 right(()) => right(rp.recv())
1132 fn try_select() -> either<option<T>, option<U>> {
1134 (lp, rp) => match select2i(lp, rp) {
1135 left(()) => left (lp.try_recv()),
1136 right(()) => right(rp.try_recv())
1143 oneshot:send<T:send> {
1148 /// The send end of a oneshot pipe.
1149 type chan_one<T: send> = oneshot::client::oneshot<T>;
1150 /// The receive end of a oneshot pipe.
1151 type port_one<T: send> = oneshot::server::oneshot<T>;
1153 /// Initialise a (send-endpoint, recv-endpoint) oneshot pipe pair.
1154 fn oneshot<T: send>() -> (chan_one<T>, port_one<T>) {
1159 * Receive a message from a oneshot pipe, failing if the connection was
1162 fn recv_one<T: send>(+port: port_one<T>) -> T {
1163 let oneshot::send(message) = recv(port);
1167 /// Receive a message from a oneshot pipe unless the connection was closed.
1168 fn try_recv_one<T: send> (+port: port_one<T>) -> option<T> {
1169 let message = try_recv(port);
1171 if message == none { none }
1173 let oneshot::send(message) = option::unwrap(message);
1178 /// Send a message on a oneshot pipe, failing if the connection was closed.
1179 fn send_one<T: send>(+chan: chan_one<T>, +data: T) {
1180 oneshot::client::send(chan, data);
1184 * Send a message on a oneshot pipe, or return false if the connection was
1187 fn try_send_one<T: send>(+chan: chan_one<T>, +data: T)
1189 oneshot::client::try_send(chan, data).is_some()
1196 let (c1, p1) = pipes::stream();
1197 let (c2, p2) = pipes::stream();
1201 match (p1, p2).select() {
1211 let (c, p) = oneshot::init();
1213 oneshot::client::send(c, ());