import arc::methods;
// Things used by code generated by the pipe compiler.
-export entangle;
+export entangle, get_buffer, drop_buffer;
// User-level things
export send_packet, recv_packet, send, recv, try_recv, peek;
// places. Once there is unary move, it can be removed.
fn move<T>(-x: T) -> T { x }
+/**
+
+Some thoughts about fixed buffers.
+
+The idea is if a protocol is bounded, we will synthesize a record that
+has a field for each state. Each of these states contains a packet for
+the messages that are legal to be sent in that state. Then, instead of
+allocating, the send code just finds a pointer to the right field and
+uses that instead.
+
+Unforunately, this makes things kind of tricky. We need to be able to
+find the buffer, which means we need to pass it around. This could
+either be associated with the (send|recv)_packet classes, or with the
+packet itself. We will also need some form of reference counting so we
+can track who has the responsibility of freeing the buffer.
+
+We want to preserve the ability to do things like optimistic buffer
+re-use, and skipping over to a new buffer when necessary. What I mean
+is, suppose we had the typical stream protocol. It'd make sense to
+amortize allocation costs by allocating a buffer with say 16
+messages. When the sender gets to the end of the buffer, it could
+check if the receiver is done with the packet in slot 0. If so, it can
+just reuse that one, checking if the receiver is done with the next
+one in each case. If it is ever not done, it just allocates a new
+buffer and skips over to that.
+
+Also, since protocols are in libcore, we have to do this in a way that
+maintains backwards compatibility.
+
+buffer header and buffer. Cast as c_void when necessary.
+
+===
+
+Okay, here are some new ideas.
+
+It'd be nice to keep the bounded/unbounded case as uniform as
+possible. It leads to less code duplication, and less things that can
+go sublty wrong. For the bounded case, we could either have a struct
+with a bunch of unique pointers to pre-allocated packets, or we could
+lay them out inline. Inline layout is better, if for no other reason
+than that we don't have to allocate each packet
+individually. Currently we pass unique packets around as unsafe
+pointers, but they are actually unique pointers. We should instead use
+real unsafe pointers. This makes freeing data and running destructors
+trickier though. Thus, we should allocate all packets in parter of a
+higher level buffer structure. Packets can maintain a pointer to their
+buffer, and this is the part that gets freed.
+
+It might be helpful to have some idea of a semi-unique pointer (like
+being partially pregnant, also like an ARC).
+
+*/
+
enum state {
empty,
full,
terminated
}
-type packet_header_ = {
- mut state: state,
- mut blocked_task: option<*rust_task>,
+class buffer_header {
+ // Tracks whether this buffer needs to be freed. We can probably
+ // get away with restricting it to 0 or 1, if we're careful.
+ let mut ref_count: int;
+
+ new() { self.ref_count = 1; }
+
+ // We may want a drop, and to be careful about stringing this
+ // thing along.
+}
+
+// This is for protocols to associate extra data to thread around.
+type buffer<T: send> = {
+ header: buffer_header,
+ data: T,
};
-enum packet_header {
- packet_header_(packet_header_)
+class packet_header {
+ let mut state: state;
+ let mut blocked_task: option<*rust_task>;
+
+ // This is a reinterpret_cast of a ~buffer, that can also be cast
+ // to a buffer_header if need be.
+ let mut buffer: *libc::c_void;
+
+ new() {
+ self.state = empty;
+ self.blocked_task = none;
+ self.buffer = ptr::null();
+ }
+
+ // Returns the old state.
+ unsafe fn mark_blocked(this: *rust_task) -> state {
+ self.blocked_task = some(this);
+ swap_state_acq(self.state, blocked)
+ }
+
+ unsafe fn unblock() {
+ alt swap_state_acq(self.state, empty) {
+ empty | blocked { }
+ terminated { self.state = terminated; }
+ full { self.state = full; }
+ }
+ }
+
+ // unsafe because this can do weird things to the space/time
+ // continuum. It ends making multiple unique pointers to the same
+ // thing. You'll proobably want to forget them when you're done.
+ unsafe fn buf_header() -> ~buffer_header {
+ assert self.buffer.is_not_null();
+ reinterpret_cast(self.buffer)
+ }
}
-type packet_<T:send> = {
+type packet<T: send> = {
header: packet_header,
- mut payload: option<T>
+ mut payload: option<T>,
};
-enum packet<T:send> {
- packet_(packet_<T>)
+fn unibuffer<T: send>() -> ~buffer<packet<T>> {
+ let b = ~{
+ header: buffer_header(),
+ data: {
+ header: packet_header(),
+ mut payload: none,
+ }
+ };
+
+ unsafe {
+ b.data.header.buffer = reinterpret_cast(b);
+ }
+
+ b
}
-fn packet<T: send>() -> *packet<T> unsafe {
- let p: *packet<T> = unsafe::transmute(~{
- header: {
- mut state: empty,
- mut blocked_task: none::<task::task>,
- },
- mut payload: none::<T>
- });
+fn packet<T: send>() -> *packet<T> {
+ let b = unibuffer();
+ let p = ptr::addr_of(b.data);
+ // We'll take over memory management from here.
+ unsafe { forget(b) }
p
}
fn atomic_xchng_rel(&dst: int, src: int) -> int;
}
+fn atomic_xchng_rel(&dst: int, src: int) -> int {
+ rusti::atomic_xchng_rel(dst, src)
+}
+
type rust_task = libc::c_void;
extern mod rustrt {
fn task_clear_event_reject(task: *rust_task);
fn task_wait_event(this: *rust_task, killed: &mut *libc::c_void) -> bool;
- fn task_signal_event(target: *rust_task, event: *libc::c_void);
-}
-
-// We should consider moving this to core::unsafe, although I
-// suspect graydon would want us to use void pointers instead.
-unsafe fn uniquify<T>(x: *T) -> ~T {
- unsafe { unsafe::reinterpret_cast(x) }
+ pure fn task_signal_event(target: *rust_task, event: *libc::c_void);
}
fn wait_event(this: *rust_task) -> *libc::c_void {
}
}
+unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
+ transmute((*p).buf_header())
+}
+
+class buffer_resource<T: send> {
+ let buffer: ~buffer<T>;
+ new(+b: ~buffer<T>) {
+ self.buffer = b;
+ }
+
+ drop unsafe {
+ let b = move!{self.buffer};
+ let old_count = atomic_xchng_rel(b.header.ref_count, 0);
+ if old_count == 0 {
+ // go go gadget drop glue
+ }
+ else {
+ forget(b)
+ }
+ }
+}
+
fn send<T: send>(-p: send_packet<T>, -payload: T) {
+ let header = p.header();
let p_ = p.unwrap();
- let p = unsafe { uniquify(p_) };
- assert (*p).payload == none;
- (*p).payload <- some(payload);
+ let p = unsafe { &*p_ };
+ assert ptr::addr_of(p.header) == header;
+ assert p.payload == none;
+ p.payload <- some(payload);
let old_state = swap_state_rel(p.header.state, full);
alt old_state {
empty {
// Yay, fastpath.
// The receiver will eventually clean this up.
- unsafe { forget(p); }
+ //unsafe { forget(p); }
}
full { fail ~"duplicate send" }
blocked {
}
// The receiver will eventually clean this up.
- unsafe { forget(p); }
+ //unsafe { forget(p); }
}
terminated {
// The receiver will never receive this. Rely on drop_glue
fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
let p_ = p.unwrap();
- let p = unsafe { uniquify(p_) };
+ let p = unsafe { &*p_ };
let this = rustrt::rust_get_task();
rustrt::task_clear_event_reject(this);
p.header.blocked_task = some(this);
empty {
#debug("no data available on %?, going to sleep.", p_);
wait_event(this);
- #debug("woke up, p.state = %?", p.header.state);
+ #debug("woke up, p.state = %?", copy p.header.state);
}
blocked {
if first {
}
full {
let mut payload = none;
- payload <-> (*p).payload;
+ payload <-> p.payload;
p.header.state = terminated;
ret some(option::unwrap(payload))
}
}
fn sender_terminate<T: send>(p: *packet<T>) {
- let p = unsafe { uniquify(p) };
+ let p = unsafe { &*p };
alt swap_state_rel(p.header.state, terminated) {
empty {
// The receiver will eventually clean up.
- unsafe { forget(p) }
+ //unsafe { forget(p) }
}
blocked {
// wake up the target
ptr::addr_of(p.header) as *libc::c_void);
// The receiver will eventually clean up.
- unsafe { forget(p) }
+ //unsafe { forget(p) }
}
full {
// This is impossible
}
fn receiver_terminate<T: send>(p: *packet<T>) {
- let p = unsafe { uniquify(p) };
+ let p = unsafe { &*p };
alt swap_state_rel(p.header.state, terminated) {
empty {
// the sender will clean up
- unsafe { forget(p) }
+ //unsafe { forget(p) }
}
blocked {
// this shouldn't happen.
}
}
-impl private_methods for *packet_header {
- // Returns the old state.
- unsafe fn mark_blocked(this: *rust_task) -> state {
- let self = &*self;
- self.blocked_task = some(this);
- swap_state_acq(self.state, blocked)
- }
-
- unsafe fn unblock() {
- let self = &*self;
- alt swap_state_acq(self.state, empty) {
- empty | blocked { }
- terminated { self.state = terminated; }
- full { self.state = full; }
- }
- }
-}
-
#[doc = "Returns when one of the packet headers reports data is
available."]
fn wait_many(pkts: &[*packet_header]) -> uint {
let mut data_avail = false;
let mut ready_packet = pkts.len();
for pkts.eachi |i, p| unsafe {
+ let p = unsafe { &*p };
let old = p.mark_blocked(this);
alt old {
full | terminated {
#debug("%?", pkts[ready_packet]);
- for pkts.each |p| { unsafe{p.unblock()} }
+ for pkts.each |p| { unsafe{ (*p).unblock()} }
#debug("%?, %?", ready_packet, pkts[ready_packet]);
(ready, result, remaining)
}
-class send_packet<T: send> {
+type send_packet<T: send> = send_packet_buffered<T, packet<T>>;
+
+fn send_packet<T: send>(p: *packet<T>) -> send_packet<T> {
+ send_packet_buffered(p)
+}
+
+class send_packet_buffered<T: send, Tbuffer: send> {
let mut p: option<*packet<T>>;
+ let mut buffer: option<buffer_resource<Tbuffer>>;
new(p: *packet<T>) {
//#debug("take send %?", p);
self.p = some(p);
+ unsafe {
+ self.buffer = some(
+ buffer_resource(
+ get_buffer(ptr::addr_of((*p).header))));
+ };
}
drop {
//if self.p != none {
p <-> self.p;
option::unwrap(p)
}
+
+ pure fn header() -> *packet_header {
+ alt self.p {
+ some(packet) {
+ unsafe {
+ let packet = &*packet;
+ let header = ptr::addr_of(packet.header);
+ //forget(packet);
+ header
+ }
+ }
+ none { fail ~"packet already consumed" }
+ }
+ }
}
-class recv_packet<T: send> {
+type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
+
+fn recv_packet<T: send>(p: *packet<T>) -> recv_packet<T> {
+ recv_packet_buffered(p)
+}
+
+class recv_packet_buffered<T: send, Tbuffer: send> : selectable {
let mut p: option<*packet<T>>;
+ let mut buffer: option<buffer_resource<Tbuffer>>;
new(p: *packet<T>) {
//#debug("take recv %?", p);
self.p = some(p);
+ unsafe {
+ self.buffer = some(
+ buffer_resource(
+ get_buffer(ptr::addr_of((*p).header))));
+ };
}
drop {
//if self.p != none {
alt self.p {
some(packet) {
unsafe {
- let packet = uniquify(packet);
+ let packet = &*packet;
let header = ptr::addr_of(packet.header);
- forget(packet);
+ //forget(packet);
header
}
}