1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 /*! Runtime support for message passing with protocol enforcement.
14 Pipes consist of two endpoints. One endpoint can send messages and
15 the other can receive messages. The set of legal messages and which
16 directions they can flow at any given point are determined by a
17 protocol. Below is an example protocol.
30 The `proto!` syntax extension will convert this into a module called
31 `pingpong`, which includes a set of types and functions that can be
32 used to write programs that follow the pingpong protocol.
36 /* IMPLEMENTATION NOTES
38 The initial design for this feature is available at:
40 https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
42 Much of the design in that document is still accurate. There are
43 several components for the pipe implementation. First of all is the
44 syntax extension. To see how that works, it is best to see the comments in
45 libsyntax/ext/pipes.rs.
47 This module includes two related pieces of the runtime
48 implementation: support for unbounded and bounded
49 protocols. The main difference between the two is the type of the
50 buffer that is carried along in the endpoint data structures.
53 The heart of the implementation is the packet type. It contains a
54 header and a payload field. Much of the code in this module deals with
55 the header field. This is where the synchronization information is
56 stored. In the case of a bounded protocol, the header also includes a
57 pointer to the buffer the packet is contained in.
59 Packets represent a single message in a protocol. The payload field
60 gets instantiated at the type of the message, which is usually an enum
61 generated by the pipe compiler. Packets are conceptually single use,
62 although in bounded protocols they are reused each time around the
66 Packets are usually handled through a send_packet_buffered or
67 recv_packet_buffered object. Each packet is referenced by one
68 send_packet and one recv_packet, and these wrappers enforce that only
69 one end can send and only one end can receive. The structs also
70 include a destructor that marks packets as terminated if the sender
71 or receiver destroys the object before sending or receiving a value.
73 The *_packet_buffered structs take two type parameters. The first is
74 the message type for the current packet (or state). The second
75 represents the type of the whole buffer. For bounded protocols, the
76 protocol compiler generates a struct with a field for each protocol
77 state. This generated struct is used as the buffer type parameter. For
78 unbounded protocols, the buffer is simply one packet, so there are
79 shorthand types called send_packet and recv_packet, where the buffer
80 type is just `packet<T>`. Using the same underlying structure for both
81 bounded and unbounded protocols allows for less code duplication.
85 #[allow(missing_doc)];
87 use container::Container;
88 use cast::{forget, transmute, transmute_copy, transmute_mut};
89 use either::{Either, Left, Right};
90 use iterator::IteratorUtil;
94 use option::{None, Option, Some};
95 use unstable::finally::Finally;
96 use unstable::intrinsics;
100 use vec::{OwnedVector, MutableVector};
103 static SPIN_COUNT: uint = 0;
113 pub struct BufferHeader {
114 // Tracks whether this buffer needs to be freed. We can probably
115 // get away with restricting it to 0 or 1, if we're careful.
118 // We may want a drop, and to be careful about stringing this
122 pub fn BufferHeader() -> BufferHeader {
128 // This is for protocols to associate extra data to thread around.
129 pub struct Buffer<T> {
130 header: BufferHeader,
134 pub struct PacketHeader {
136 blocked_task: *rust_task,
138 // This is a transmute_copy of a ~buffer, that can also be cast
139 // to a buffer_header if need be.
140 buffer: *libc::c_void,
143 pub fn PacketHeader() -> PacketHeader {
146 blocked_task: ptr::null(),
152 // Returns the old state.
153 pub unsafe fn mark_blocked(&mut self, this: *rust_task) -> State {
154 rustrt::rust_task_ref(this);
155 let old_task = swap_task(&mut self.blocked_task, this);
156 assert!(old_task.is_null());
157 swap_state_acq(&mut self.state, Blocked)
160 pub unsafe fn unblock(&mut self) {
161 let old_task = swap_task(&mut self.blocked_task, ptr::null());
162 if !old_task.is_null() {
163 rustrt::rust_task_deref(old_task)
165 match swap_state_acq(&mut self.state, Empty) {
166 Empty | Blocked => (),
167 Terminated => self.state = Terminated,
168 Full => self.state = Full
172 // unsafe because this can do weird things to the space/time
173 // continuum. It ends up making multiple unique pointers to the same
174 // thing. You'll probably want to forget them when you're done.
175 pub unsafe fn buf_header(&mut self) -> ~BufferHeader {
176 assert!(self.buffer.is_not_null());
177 transmute_copy(&self.buffer)
180 pub fn set_buffer<T:Owned>(&mut self, b: ~Buffer<T>) {
182 self.buffer = transmute_copy(&b);
187 pub struct Packet<T> {
188 header: PacketHeader,
192 pub trait HasBuffer {
193 fn set_buffer(&mut self, b: *libc::c_void);
196 impl<T:Owned> HasBuffer for Packet<T> {
197 fn set_buffer(&mut self, b: *libc::c_void) {
198 self.header.buffer = b;
202 pub fn mk_packet<T:Owned>() -> Packet<T> {
204 header: PacketHeader(),
208 fn unibuffer<T>() -> ~Buffer<Packet<T>> {
209 let mut b = ~Buffer {
210 header: BufferHeader(),
212 header: PacketHeader(),
218 b.data.header.buffer = transmute_copy(&b);
223 pub fn packet<T>() -> *mut Packet<T> {
224 let mut b = unibuffer();
225 let p = ptr::to_mut_unsafe_ptr(&mut b.data);
226 // We'll take over memory management from here.
233 pub fn entangle_buffer<T:Owned,Tstart:Owned>(
234 mut buffer: ~Buffer<T>,
235 init: &fn(*libc::c_void, x: &mut T) -> *mut Packet<Tstart>)
236 -> (RecvPacketBuffered<Tstart, T>, SendPacketBuffered<Tstart, T>) {
238 let p = init(transmute_copy(&buffer), &mut buffer.data);
240 (RecvPacketBuffered(p), SendPacketBuffered(p))
244 pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
245 // It might be worth making both acquire and release versions of
248 transmute(intrinsics::atomic_xchg(transmute(dst), src as int))
252 #[allow(non_camel_case_types)]
253 pub type rust_task = libc::c_void;
257 use super::rust_task;
261 unsafe fn rust_get_task() -> *rust_task;
263 unsafe fn rust_task_ref(task: *rust_task);
264 unsafe fn rust_task_deref(task: *rust_task);
267 unsafe fn task_clear_event_reject(task: *rust_task);
269 unsafe fn task_wait_event(this: *rust_task,
270 killed: &mut *libc::c_void)
272 unsafe fn task_signal_event(target: *rust_task, event: *libc::c_void);
276 fn wait_event(this: *rust_task) -> *libc::c_void {
278 let mut event = ptr::null();
280 let killed = rustrt::task_wait_event(this, &mut event);
281 if killed && !task::failing() {
288 fn swap_state_acq(dst: &mut State, src: State) -> State {
290 transmute(intrinsics::atomic_xchg_acq(transmute(dst), src as int))
294 fn swap_state_rel(dst: &mut State, src: State) -> State {
296 transmute(intrinsics::atomic_xchg_rel(transmute(dst), src as int))
300 pub unsafe fn get_buffer<T>(p: *mut PacketHeader) -> ~Buffer<T> {
301 transmute((*p).buf_header())
304 // This could probably be done with SharedMutableState to avoid move_it!().
305 struct BufferResource<T> {
311 impl<T> Drop for BufferResource<T> {
314 // FIXME(#4330) Need self by value to get mutability.
315 let this: &mut BufferResource<T> = transmute_mut(self);
317 let null_buffer: ~Buffer<T> = transmute(ptr::null::<Buffer<T>>());
318 let mut b = replace(&mut this.buffer, null_buffer);
320 //let p = ptr::to_unsafe_ptr(*b);
321 //error!("drop %?", p);
322 let old_count = intrinsics::atomic_xsub_rel(
323 &mut b.header.ref_count,
325 //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
327 // The new count is 0.
329 // go go gadget drop glue
338 fn BufferResource<T>(mut b: ~Buffer<T>) -> BufferResource<T> {
339 //let p = ptr::to_unsafe_ptr(*b);
340 //error!("take %?", p);
342 intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1);
351 pub fn send<T,Tbuffer>(mut p: SendPacketBuffered<T,Tbuffer>,
354 let header = p.header();
356 let p = unsafe { &mut *p_ };
357 assert_eq!(ptr::to_unsafe_ptr(&(p.header)), header);
358 assert!(p.payload.is_none());
359 p.payload = Some(payload);
360 let old_state = swap_state_rel(&mut p.header.state, Full);
365 // The receiver will eventually clean this up.
366 //unsafe { forget(p); }
369 Full => fail!("duplicate send"),
371 debug!("waking up task for %?", p_);
372 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
373 if !old_task.is_null() {
375 rustrt::task_signal_event(
377 ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
378 rustrt::rust_task_deref(old_task);
382 // The receiver will eventually clean this up.
383 //unsafe { forget(p); }
387 // The receiver will never receive this. Rely on drop_glue
388 // to clean everything up.
394 /** Receives a message from a pipe.
396 Fails if the sender closes the connection.
399 pub fn recv<T:Owned,Tbuffer:Owned>(
400 p: RecvPacketBuffered<T, Tbuffer>) -> T {
401 try_recv(p).expect("connection closed")
404 /** Attempts to receive a message from a pipe.
406 Returns `None` if the sender has closed the connection without sending
407 a message, or `Some(T)` if a message was received.
410 pub fn try_recv<T:Owned,Tbuffer:Owned>(mut p: RecvPacketBuffered<T, Tbuffer>)
413 let p = unsafe { &mut *p_ };
420 p.header.state = Terminated;
421 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
422 if !old_task.is_null() {
423 rustrt::rust_task_deref(old_task);
430 fn try_recv_<T:Owned>(p: &mut Packet<T>) -> Option<T> {
432 match p.header.state {
434 let payload = replace(&mut p.payload, None);
435 p.header.state = Empty;
436 return Some(payload.unwrap())
438 Terminated => return None,
443 let this = unsafe { rustrt::rust_get_task() };
445 rustrt::task_clear_event_reject(this);
446 rustrt::rust_task_ref(this);
448 debug!("blocked = %x this = %x", p.header.blocked_task as uint,
450 let old_task = swap_task(&mut p.header.blocked_task, this);
451 debug!("blocked = %x this = %x old_task = %x",
452 p.header.blocked_task as uint,
453 this as uint, old_task as uint);
454 assert!(old_task.is_null());
455 let mut first = true;
456 let mut count = SPIN_COUNT;
459 rustrt::task_clear_event_reject(this);
462 let old_state = swap_state_acq(&mut p.header.state,
466 debug!("no data available on %?, going to sleep.", p);
472 // FIXME (#524): Putting the yield here destroys a lot
473 // of the benefit of spinning, since we still go into
474 // the scheduler at every iteration. However, without
475 // this everything spins too much because we end up
476 // sometimes blocking the thing we are waiting on.
479 debug!("woke up, p.state = %?", copy p.header.state);
481 Blocked => if first {
482 fail!("blocking on already blocked packet")
485 let payload = replace(&mut p.payload, None);
486 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
487 if !old_task.is_null() {
489 rustrt::rust_task_deref(old_task);
492 p.header.state = Empty;
493 return Some(payload.unwrap())
496 // This assert detects when we've accidentally and unsafely
497 // cast too large a number to a state.
498 assert_eq!(old_state, Terminated);
500 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
501 if !old_task.is_null() {
503 rustrt::rust_task_deref(old_task);
513 /// Returns true if messages are available.
514 pub fn peek<T:Owned,Tb:Owned>(p: &mut RecvPacketBuffered<T, Tb>) -> bool {
516 match (*p.header()).state {
517 Empty | Terminated => false,
518 Blocked => fail!("peeking on blocked packet"),
524 fn sender_terminate<T:Owned>(p: *mut Packet<T>) {
528 match swap_state_rel(&mut p.header.state, Terminated) {
530 // The receiver will eventually clean up.
533 // wake up the target
534 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
535 if !old_task.is_null() {
537 rustrt::task_signal_event(
539 ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
540 rustrt::rust_task_deref(old_task);
543 // The receiver will eventually clean up.
546 // This is impossible
547 fail!("you dun goofed")
550 assert!(p.header.blocked_task.is_null());
551 // I have to clean up, use drop_glue
556 fn receiver_terminate<T:Owned>(p: *mut Packet<T>) {
560 match swap_state_rel(&mut p.header.state, Terminated) {
562 assert!(p.header.blocked_task.is_null());
563 // the sender will clean up
566 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
567 if !old_task.is_null() {
569 rustrt::rust_task_deref(old_task);
570 assert_eq!(old_task, rustrt::rust_get_task());
574 Terminated | Full => {
575 assert!(p.header.blocked_task.is_null());
576 // I have to clean up, use drop_glue
581 /** Returns when one of the packet headers reports data is available.
583 This function is primarily intended for building higher level waiting
584 functions, such as `select`, `select2`, etc.
586 It takes a vector slice of packet_headers and returns an index into
587 that vector. The index points to an endpoint that has either been
588 closed by the sender or has a message waiting to be received.
591 pub fn wait_many<T: Selectable>(pkts: &mut [T]) -> uint {
593 rustrt::rust_get_task()
597 rustrt::task_clear_event_reject(this);
600 let mut data_avail = false;
601 let mut ready_packet = pkts.len();
602 for pkts.mut_iter().enumerate().advance |(i, p)| {
604 let p = &mut *p.header();
605 let old = p.mark_blocked(this);
607 Full | Terminated => {
613 Blocked => fail!("blocking on blocked packet"),
620 debug!("sleeping on %? packets", pkts.len());
621 let event = wait_event(this) as *PacketHeader;
624 for pkts.mut_iter().enumerate().advance |(i, p)| {
625 if p.header() == event {
636 None => debug!("ignoring spurious event, %?", event)
640 debug!("%?", &mut pkts[ready_packet]);
642 for pkts.mut_iter().advance |p| {
644 (*p.header()).unblock()
648 debug!("%?, %?", ready_packet, &mut pkts[ready_packet]);
651 assert!((*pkts[ready_packet].header()).state == Full
652 || (*pkts[ready_packet].header()).state == Terminated);
658 /** The sending end of a pipe. It can be used to send exactly one
662 pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>;
664 pub fn SendPacket<T>(p: *mut Packet<T>) -> SendPacket<T> {
665 SendPacketBuffered(p)
668 pub struct SendPacketBuffered<T, Tbuffer> {
669 p: Option<*mut Packet<T>>,
670 buffer: Option<BufferResource<Tbuffer>>,
674 impl<T:Owned,Tbuffer:Owned> Drop for SendPacketBuffered<T,Tbuffer> {
677 let this: &mut SendPacketBuffered<T,Tbuffer> = transmute(self);
679 let p = replace(&mut this.p, None);
680 sender_terminate(p.unwrap())
686 pub fn SendPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
687 -> SendPacketBuffered<T,Tbuffer> {
691 Some(BufferResource(get_buffer(&mut (*p).header)))
696 impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> {
697 pub fn unwrap(&mut self) -> *mut Packet<T> {
698 replace(&mut self.p, None).unwrap()
701 pub fn header(&mut self) -> *mut PacketHeader {
703 Some(packet) => unsafe {
704 let packet = &mut *packet;
705 let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
708 None => fail!("packet already consumed")
712 pub fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
713 //error!("send reuse_buffer");
714 replace(&mut self.buffer, None).unwrap()
718 /// Represents the receive end of a pipe. It can receive exactly one
720 pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>;
722 pub fn RecvPacket<T>(p: *mut Packet<T>) -> RecvPacket<T> {
723 RecvPacketBuffered(p)
726 pub struct RecvPacketBuffered<T, Tbuffer> {
727 p: Option<*mut Packet<T>>,
728 buffer: Option<BufferResource<Tbuffer>>,
732 impl<T:Owned,Tbuffer:Owned> Drop for RecvPacketBuffered<T,Tbuffer> {
735 let this: &mut RecvPacketBuffered<T,Tbuffer> = transmute(self);
737 let p = replace(&mut this.p, None);
738 receiver_terminate(p.unwrap())
744 impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> {
745 pub fn unwrap(&mut self) -> *mut Packet<T> {
746 replace(&mut self.p, None).unwrap()
749 pub fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
750 replace(&mut self.buffer, None).unwrap()
754 impl<T:Owned,Tbuffer:Owned> Selectable for RecvPacketBuffered<T, Tbuffer> {
755 fn header(&mut self) -> *mut PacketHeader {
757 Some(packet) => unsafe {
758 let packet = &mut *packet;
759 let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
762 None => fail!("packet already consumed")
767 pub fn RecvPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
768 -> RecvPacketBuffered<T,Tbuffer> {
772 Some(BufferResource(get_buffer(&mut (*p).header)))
777 pub fn entangle<T>() -> (RecvPacket<T>, SendPacket<T>) {
779 (RecvPacket(p), SendPacket(p))
782 /** Receives a message from one of two endpoints.
784 The return value is `Left` if the first endpoint received something,
785 or `Right` if the second endpoint received something. In each case,
786 the result includes the other endpoint as well so it can be used
787 again. Below is an example of using `select2`.
790 match select2(a, b) {
792 // endpoint a was closed.
795 // endpoint b was closed.
798 // endpoint a received a message
801 // endpoint b received a message.
806 Sometimes messages will be available on both endpoints at once. In
807 this case, `select2` may return either `Left` or `Right`.
810 pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
811 mut a: RecvPacketBuffered<A, Ab>,
812 mut b: RecvPacketBuffered<B, Bb>)
813 -> Either<(Option<A>, RecvPacketBuffered<B, Bb>),
814 (RecvPacketBuffered<A, Ab>, Option<B>)> {
815 let mut endpoints = [ a.header(), b.header() ];
816 let i = wait_many(endpoints);
818 0 => Left((try_recv(a), b)),
819 1 => Right((a, try_recv(b))),
820 _ => fail!("select2 return an invalid packet")
824 pub trait Selectable {
825 fn header(&mut self) -> *mut PacketHeader;
828 impl Selectable for *mut PacketHeader {
// A raw `*mut PacketHeader` acts as its own header: return a copy of the pointer.
829 fn header(&mut self) -> *mut PacketHeader { *self }
832 /// Returns the index of an endpoint that is ready to receive.
833 pub fn selecti<T:Selectable>(endpoints: &mut [T]) -> uint {
837 /// Returns 0 or 1 depending on which endpoint is ready to receive
838 pub fn select2i<A:Selectable,B:Selectable>(a: &mut A, b: &mut B)
840 let mut endpoints = [ a.header(), b.header() ];
841 match wait_many(endpoints) {
844 _ => fail!("wait returned unexpected index")
848 /// Waits on a set of endpoints. Returns a message, its index, and a
849 /// list of the remaining endpoints.
850 pub fn select<T:Owned,Tb:Owned>(mut endpoints: ~[RecvPacketBuffered<T, Tb>])
853 ~[RecvPacketBuffered<T, Tb>]) {
854 let mut endpoint_headers = ~[];
855 for endpoints.mut_iter().advance |endpoint| {
856 endpoint_headers.push(endpoint.header());
859 let ready = wait_many(endpoint_headers);
860 let mut remaining = endpoints;
861 let port = remaining.swap_remove(ready);
862 let result = try_recv(port);
863 (ready, result, remaining)
867 use option::{None, Option, Some};
869 // These are used to hide the option constructors from the
870 // compiler because their names are changing
/// Wraps `val` in `Some`; a plain-function stand-in for the `Some` constructor.
871 pub fn make_some<T>(val: T) -> Option<T> { Some(val) }
/// Returns `None`; a plain-function stand-in for the `None` constructor.
872 pub fn make_none<T>() -> Option<T> { None }
878 use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
879 GenericChan, Peekable};
883 let (p1, c1) = stream();
884 let (p2, c2) = stream();
888 let mut tuple = (p1, p2);
889 match tuple.select() {
899 let (p, c) = oneshot();
907 fn test_peek_terminated() {
908 let (port, chan): (Port<int>, Chan<int>) = stream();
911 // Destroy the channel
915 assert!(!port.peek());