1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 /*! Runtime support for message passing with protocol enforcement.
14 Pipes consist of two endpoints. One endpoint can send messages and
15 the other can receive messages. The set of legal messages and which
16 directions they can flow at any given point are determined by a
17 protocol. Below is an example protocol.
30 The `proto!` syntax extension will convert this into a module called
31 `pingpong`, which includes a set of types and functions that can be
32 used to write programs that follow the pingpong protocol.
36 /* IMPLEMENTATION NOTES
38 The initial design for this feature is available at:
40 https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
42 Much of the design in that document is still accurate. There are
43 several components for the pipe implementation. First of all is the
44 syntax extension. To see how that works, it is best see comments in
45 libsyntax/ext/pipes.rs.
47 This module includes two related pieces of the runtime
48 implementation: support for unbounded and bounded
49 protocols. The main difference between the two is the type of the
50 buffer that is carried along in the endpoint data structures.
53 The heart of the implementation is the packet type. It contains a
54 header and a payload field. Much of the code in this module deals with
55 the header field. This is where the synchronization information is
56 stored. In the case of a bounded protocol, the header also includes a
57 pointer to the buffer the packet is contained in.
59 Packets represent a single message in a protocol. The payload field
60 gets instatiated at the type of the message, which is usually an enum
61 generated by the pipe compiler. Packets are conceptually single use,
62 although in bounded protocols they are reused each time around the
66 Packets are usually handled through a send_packet_buffered or
67 recv_packet_buffered object. Each packet is referenced by one
68 send_packet and one recv_packet, and these wrappers enforce that only
69 one end can send and only one end can receive. The structs also
70 include a destructor that marks packets are terminated if the sender
71 or receiver destroys the object before sending or receiving a value.
73 The *_packet_buffered structs take two type parameters. The first is
74 the message type for the current packet (or state). The second
75 represents the type of the whole buffer. For bounded protocols, the
76 protocol compiler generates a struct with a field for each protocol
77 state. This generated struct is used as the buffer type parameter. For
78 unbounded protocols, the buffer is simply one packet, so there is a
79 shorthand struct called send_packet and recv_packet, where the buffer
80 type is just `packet<T>`. Using the same underlying structure for both
81 bounded and unbounded protocols allows for less code duplication.
85 use cast::{forget, transmute, transmute_copy};
86 use either::{Either, Left, Right};
90 use option::{None, Option, Some};
91 use unstable::intrinsics;
96 static SPIN_COUNT: uint = 0;
98 macro_rules! move_it (
99 { $x:expr } => ( unsafe { let y = *ptr::to_unsafe_ptr(&($x)); y } )
110 pub struct BufferHeader {
111 // Tracks whether this buffer needs to be freed. We can probably
112 // get away with restricting it to 0 or 1, if we're careful.
115 // We may want a drop, and to be careful about stringing this
119 pub fn BufferHeader() -> BufferHeader {
125 // This is for protocols to associate extra data to thread around.
126 pub struct Buffer<T> {
127 header: BufferHeader,
131 pub struct PacketHeader {
133 mut blocked_task: *rust_task,
135 // This is a transmute_copy of a ~buffer, that can also be cast
136 // to a buffer_header if need be.
137 mut buffer: *libc::c_void,
140 pub fn PacketHeader() -> PacketHeader {
143 blocked_task: ptr::null(),
148 pub impl PacketHeader {
149 // Returns the old state.
150 unsafe fn mark_blocked(&self, this: *rust_task) -> State {
151 rustrt::rust_task_ref(this);
152 let old_task = swap_task(&mut self.blocked_task, this);
153 assert!(old_task.is_null());
154 swap_state_acq(&mut self.state, Blocked)
157 unsafe fn unblock(&self) {
158 let old_task = swap_task(&mut self.blocked_task, ptr::null());
159 if !old_task.is_null() {
160 rustrt::rust_task_deref(old_task)
162 match swap_state_acq(&mut self.state, Empty) {
163 Empty | Blocked => (),
164 Terminated => self.state = Terminated,
165 Full => self.state = Full
169 // unsafe because this can do weird things to the space/time
170 // continuum. It ends making multiple unique pointers to the same
171 // thing. You'll proobably want to forget them when you're done.
172 unsafe fn buf_header(&self) -> ~BufferHeader {
173 assert!(self.buffer.is_not_null());
174 transmute_copy(&self.buffer)
177 fn set_buffer<T:Owned>(&self, b: ~Buffer<T>) {
179 self.buffer = transmute_copy(&b);
184 pub struct Packet<T> {
185 header: PacketHeader,
186 mut payload: Option<T>,
189 pub trait HasBuffer {
190 fn set_buffer(&self, b: *libc::c_void);
193 impl<T:Owned> HasBuffer for Packet<T> {
194 fn set_buffer(&self, b: *libc::c_void) {
195 self.header.buffer = b;
199 pub fn mk_packet<T:Owned>() -> Packet<T> {
201 header: PacketHeader(),
205 fn unibuffer<T>() -> ~Buffer<Packet<T>> {
207 header: BufferHeader(),
209 header: PacketHeader(),
215 b.data.header.buffer = transmute_copy(&b);
220 pub fn packet<T>() -> *Packet<T> {
222 let p = ptr::to_unsafe_ptr(&(b.data));
223 // We'll take over memory management from here.
228 pub fn entangle_buffer<T:Owned,Tstart:Owned>(
230 init: &fn(*libc::c_void, x: &T) -> *Packet<Tstart>)
231 -> (SendPacketBuffered<Tstart, T>, RecvPacketBuffered<Tstart, T>)
233 let p = init(unsafe { transmute_copy(&buffer) }, &buffer.data);
234 unsafe { forget(buffer) }
235 (SendPacketBuffered(p), RecvPacketBuffered(p))
238 pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
239 // It might be worth making both acquire and release versions of
242 transmute(intrinsics::atomic_xchg(transmute(dst), src as int))
246 #[allow(non_camel_case_types)]
247 pub type rust_task = libc::c_void;
251 use super::rust_task;
255 unsafe fn rust_get_task() -> *rust_task;
257 unsafe fn rust_task_ref(task: *rust_task);
258 unsafe fn rust_task_deref(task: *rust_task);
261 unsafe fn task_clear_event_reject(task: *rust_task);
263 unsafe fn task_wait_event(this: *rust_task,
264 killed: &mut *libc::c_void)
266 unsafe fn task_signal_event(target: *rust_task, event: *libc::c_void);
270 fn wait_event(this: *rust_task) -> *libc::c_void {
272 let mut event = ptr::null();
274 let killed = rustrt::task_wait_event(this, &mut event);
275 if killed && !task::failing() {
282 fn swap_state_acq(dst: &mut State, src: State) -> State {
284 transmute(intrinsics::atomic_xchg_acq(transmute(dst), src as int))
288 fn swap_state_rel(dst: &mut State, src: State) -> State {
290 transmute(intrinsics::atomic_xchg_rel(transmute(dst), src as int))
294 pub unsafe fn get_buffer<T>(p: *PacketHeader) -> ~Buffer<T> {
295 transmute((*p).buf_header())
298 // This could probably be done with SharedMutableState to avoid move_it!().
299 struct BufferResource<T> {
305 impl<T> ::ops::Drop for BufferResource<T> {
308 let b = move_it!(self.buffer);
309 //let p = ptr::to_unsafe_ptr(*b);
310 //error!("drop %?", p);
311 let old_count = intrinsics::atomic_xsub_rel(&mut b.header.ref_count, 1);
312 //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
314 // The new count is 0.
316 // go go gadget drop glue
325 fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> {
326 //let p = ptr::to_unsafe_ptr(*b);
327 //error!("take %?", p);
328 unsafe { intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1) };
336 pub fn send<T,Tbuffer>(p: SendPacketBuffered<T,Tbuffer>, payload: T) -> bool {
337 let header = p.header();
339 let p = unsafe { &*p_ };
340 assert!(ptr::to_unsafe_ptr(&(p.header)) == header);
341 assert!(p.payload.is_none());
342 p.payload = Some(payload);
343 let old_state = swap_state_rel(&mut p.header.state, Full);
348 // The receiver will eventually clean this up.
349 //unsafe { forget(p); }
352 Full => fail!(~"duplicate send"),
354 debug!("waking up task for %?", p_);
355 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
356 if !old_task.is_null() {
358 rustrt::task_signal_event(
360 ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
361 rustrt::rust_task_deref(old_task);
365 // The receiver will eventually clean this up.
366 //unsafe { forget(p); }
370 // The receiver will never receive this. Rely on drop_glue
371 // to clean everything up.
377 /** Receives a message from a pipe.
379 Fails if the sender closes the connection.
382 pub fn recv<T:Owned,Tbuffer:Owned>(
383 p: RecvPacketBuffered<T, Tbuffer>) -> T {
384 try_recv(p).expect("connection closed")
387 /** Attempts to receive a message from a pipe.
389 Returns `None` if the sender has closed the connection without sending
390 a message, or `Some(T)` if a message was received.
393 pub fn try_recv<T:Owned,Tbuffer:Owned>(p: RecvPacketBuffered<T, Tbuffer>)
397 let p = unsafe { &*p_ };
399 struct DropState<'self> {
400 p: &'self PacketHeader,
404 impl<'self> Drop for DropState<'self> {
408 self.p.state = Terminated;
409 let old_task = swap_task(&mut self.p.blocked_task,
411 if !old_task.is_null() {
412 rustrt::rust_task_deref(old_task);
419 let _drop_state = DropState { p: &p.header };
422 match p.header.state {
424 let mut payload = None;
425 payload <-> p.payload;
426 p.header.state = Empty;
427 return Some(payload.unwrap())
429 Terminated => return None,
434 let this = unsafe { rustrt::rust_get_task() };
436 rustrt::task_clear_event_reject(this);
437 rustrt::rust_task_ref(this);
439 debug!("blocked = %x this = %x", p.header.blocked_task as uint,
441 let old_task = swap_task(&mut p.header.blocked_task, this);
442 debug!("blocked = %x this = %x old_task = %x",
443 p.header.blocked_task as uint,
444 this as uint, old_task as uint);
445 assert!(old_task.is_null());
446 let mut first = true;
447 let mut count = SPIN_COUNT;
450 rustrt::task_clear_event_reject(this);
453 let old_state = swap_state_acq(&mut p.header.state,
457 debug!("no data available on %?, going to sleep.", p_);
463 // FIXME (#524): Putting the yield here destroys a lot
464 // of the benefit of spinning, since we still go into
465 // the scheduler at every iteration. However, without
466 // this everything spins too much because we end up
467 // sometimes blocking the thing we are waiting on.
470 debug!("woke up, p.state = %?", copy p.header.state);
472 Blocked => if first {
473 fail!(~"blocking on already blocked packet")
476 let mut payload = None;
477 payload <-> p.payload;
478 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
479 if !old_task.is_null() {
481 rustrt::rust_task_deref(old_task);
484 p.header.state = Empty;
485 return Some(payload.unwrap())
488 // This assert detects when we've accidentally unsafely
489 // casted too big of a number to a state.
490 assert!(old_state == Terminated);
492 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
493 if !old_task.is_null() {
495 rustrt::rust_task_deref(old_task);
505 /// Returns true if messages are available.
506 pub fn peek<T:Owned,Tb:Owned>(p: &RecvPacketBuffered<T, Tb>) -> bool {
507 match unsafe {(*p.header()).state} {
508 Empty | Terminated => false,
509 Blocked => fail!(~"peeking on blocked packet"),
514 fn sender_terminate<T:Owned>(p: *Packet<T>) {
515 let p = unsafe { &*p };
516 match swap_state_rel(&mut p.header.state, Terminated) {
518 // The receiver will eventually clean up.
521 // wake up the target
522 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
523 if !old_task.is_null() {
525 rustrt::task_signal_event(
527 ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
528 rustrt::rust_task_deref(old_task);
531 // The receiver will eventually clean up.
534 // This is impossible
535 fail!(~"you dun goofed")
538 assert!(p.header.blocked_task.is_null());
539 // I have to clean up, use drop_glue
544 fn receiver_terminate<T:Owned>(p: *Packet<T>) {
545 let p = unsafe { &*p };
546 match swap_state_rel(&mut p.header.state, Terminated) {
548 assert!(p.header.blocked_task.is_null());
549 // the sender will clean up
552 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
553 if !old_task.is_null() {
555 rustrt::rust_task_deref(old_task);
556 assert!(old_task == rustrt::rust_get_task());
560 Terminated | Full => {
561 assert!(p.header.blocked_task.is_null());
562 // I have to clean up, use drop_glue
567 /** Returns when one of the packet headers reports data is available.
569 This function is primarily intended for building higher level waiting
570 functions, such as `select`, `select2`, etc.
572 It takes a vector slice of packet_headers and returns an index into
573 that vector. The index points to an endpoint that has either been
574 closed by the sender or has a message waiting to be received.
577 pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
578 let this = unsafe { rustrt::rust_get_task() };
581 rustrt::task_clear_event_reject(this);
584 let mut data_avail = false;
585 let mut ready_packet = pkts.len();
586 for pkts.eachi |i, p| {
588 let p = &*p.header();
589 let old = p.mark_blocked(this);
591 Full | Terminated => {
597 Blocked => fail!(~"blocking on blocked packet"),
604 debug!("sleeping on %? packets", pkts.len());
605 let event = wait_event(this) as *PacketHeader;
606 let pos = vec::position(pkts, |p| p.header() == event);
613 None => debug!("ignoring spurious event, %?", event)
617 debug!("%?", pkts[ready_packet]);
619 for pkts.each |p| { unsafe{ (*p.header()).unblock()} }
621 debug!("%?, %?", ready_packet, pkts[ready_packet]);
624 assert!((*pkts[ready_packet].header()).state == Full
625 || (*pkts[ready_packet].header()).state == Terminated);
631 /** The sending end of a pipe. It can be used to send exactly one
635 pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>;
637 pub fn SendPacket<T>(p: *Packet<T>) -> SendPacket<T> {
638 SendPacketBuffered(p)
641 pub struct SendPacketBuffered<T, Tbuffer> {
642 mut p: Option<*Packet<T>>,
643 mut buffer: Option<BufferResource<Tbuffer>>,
647 impl<T:Owned,Tbuffer:Owned> ::ops::Drop for SendPacketBuffered<T,Tbuffer> {
649 //if self.p != none {
650 // debug!("drop send %?", option::get(self.p));
655 sender_terminate(p.unwrap())
657 //unsafe { error!("send_drop: %?",
658 // if self.buffer == none {
660 // } else { "some" }); }
664 pub fn SendPacketBuffered<T,Tbuffer>(p: *Packet<T>)
665 -> SendPacketBuffered<T, Tbuffer> {
666 //debug!("take send %?", p);
671 get_buffer(ptr::to_unsafe_ptr(&((*p).header)))))
676 pub impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> {
677 fn unwrap(&self) -> *Packet<T> {
683 fn header(&self) -> *PacketHeader {
685 Some(packet) => unsafe {
686 let packet = &*packet;
687 let header = ptr::to_unsafe_ptr(&(packet.header));
691 None => fail!(~"packet already consumed")
695 fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
696 //error!("send reuse_buffer");
703 /// Represents the receive end of a pipe. It can receive exactly one
705 pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>;
707 pub fn RecvPacket<T>(p: *Packet<T>) -> RecvPacket<T> {
708 RecvPacketBuffered(p)
710 pub struct RecvPacketBuffered<T, Tbuffer> {
711 mut p: Option<*Packet<T>>,
712 mut buffer: Option<BufferResource<Tbuffer>>,
716 impl<T:Owned,Tbuffer:Owned> ::ops::Drop for RecvPacketBuffered<T,Tbuffer> {
718 //if self.p != none {
719 // debug!("drop recv %?", option::get(self.p));
724 receiver_terminate(p.unwrap())
726 //unsafe { error!("recv_drop: %?",
727 // if self.buffer == none {
729 // } else { "some" }); }
733 pub impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> {
734 fn unwrap(&self) -> *Packet<T> {
740 fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
741 //error!("recv reuse_buffer");
748 impl<T:Owned,Tbuffer:Owned> Selectable for RecvPacketBuffered<T, Tbuffer> {
749 fn header(&self) -> *PacketHeader {
751 Some(packet) => unsafe {
752 let packet = &*packet;
753 let header = ptr::to_unsafe_ptr(&(packet.header));
757 None => fail!(~"packet already consumed")
762 pub fn RecvPacketBuffered<T,Tbuffer>(p: *Packet<T>)
763 -> RecvPacketBuffered<T,Tbuffer> {
764 //debug!("take recv %?", p);
769 get_buffer(ptr::to_unsafe_ptr(&((*p).header)))))
774 pub fn entangle<T>() -> (SendPacket<T>, RecvPacket<T>) {
776 (SendPacket(p), RecvPacket(p))
779 /** Receives a message from one of two endpoints.
781 The return value is `left` if the first endpoint received something,
782 or `right` if the second endpoint receives something. In each case,
783 the result includes the other endpoint as well so it can be used
784 again. Below is an example of using `select2`.
787 match select2(a, b) {
789 // endpoint a was closed.
792 // endpoint b was closed.
795 // endpoint a received a message
798 // endpoint b received a message.
803 Sometimes messages will be available on both endpoints at once. In
804 this case, `select2` may return either `left` or `right`.
807 pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
808 a: RecvPacketBuffered<A, Ab>,
809 b: RecvPacketBuffered<B, Bb>)
810 -> Either<(Option<A>, RecvPacketBuffered<B, Bb>),
811 (RecvPacketBuffered<A, Ab>, Option<B>)>
813 let i = wait_many([a.header(), b.header()]);
816 0 => Left((try_recv(a), b)),
817 1 => Right((a, try_recv(b))),
818 _ => fail!(~"select2 return an invalid packet")
822 pub trait Selectable {
823 fn header(&self) -> *PacketHeader;
826 impl Selectable for *PacketHeader {
827 fn header(&self) -> *PacketHeader { *self }
830 /// Returns the index of an endpoint that is ready to receive.
831 pub fn selecti<T:Selectable>(endpoints: &[T]) -> uint {
835 /// Returns 0 or 1 depending on which endpoint is ready to receive
836 pub fn select2i<A:Selectable,B:Selectable>(a: &A, b: &B) ->
838 match wait_many([a.header(), b.header()]) {
841 _ => fail!(~"wait returned unexpected index")
845 /** Waits on a set of endpoints. Returns a message, its index, and a
846 list of the remaining endpoints.
849 pub fn select<T:Owned,Tb:Owned>(endpoints: ~[RecvPacketBuffered<T, Tb>])
850 -> (uint, Option<T>, ~[RecvPacketBuffered<T, Tb>])
852 let ready = wait_many(endpoints.map(|p| p.header()));
853 let mut remaining = endpoints;
854 let port = remaining.swap_remove(ready);
855 let result = try_recv(port);
856 (ready, result, remaining)
860 use option::{None, Option, Some};
862 // These are used to hide the option constructors from the
863 // compiler because their names are changing
864 pub fn make_some<T>(val: T) -> Option<T> { Some(val) }
865 pub fn make_none<T>() -> Option<T> { None }
871 use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
872 GenericChan, Peekable};
876 let (p1, c1) = stream();
877 let (p2, c2) = stream();
881 match (p1, p2).select() {
891 let (p, c) = oneshot();
899 fn test_peek_terminated() {
900 let (port, chan): (Port<int>, Chan<int>) = stream();
903 // Destroy the channel
907 assert!(!port.peek());