]> git.lizzy.rs Git - rust.git/commitdiff
Refactoring pipes to allow implementing bounded protocols.
authorEric Holk <eric.holk@gmail.com>
Sat, 21 Jul 2012 02:06:32 +0000 (19:06 -0700)
committerEric Holk <eric.holk@gmail.com>
Wed, 25 Jul 2012 19:12:25 +0000 (12:12 -0700)
src/libcore/pipes.rs
src/libsyntax/ext/pipes/pipec.rs

index 2e463df9e90017aede723a1fa47ccca285aed93f..e020d5c3e9f4d7861d05dea1dd06c09c225238ce 100644 (file)
@@ -6,7 +6,7 @@
 import arc::methods;
 
 // Things used by code generated by the pipe compiler.
-export entangle;
+export entangle, get_buffer, drop_buffer;
 
 // User-level things
 export send_packet, recv_packet, send, recv, try_recv, peek;
@@ -22,6 +22,59 @@ macro_rules! move {
 // places. Once there is unary move, it can be removed.
 fn move<T>(-x: T) -> T { x }
 
+/**
+
+Some thoughts about fixed buffers.
+
+The idea is if a protocol is bounded, we will synthesize a record that
+has a field for each state. Each of these states contains a packet for
+the messages that are legal to be sent in that state. Then, instead of
+allocating, the send code just finds a pointer to the right field and
+uses that instead.
+
+Unforunately, this makes things kind of tricky. We need to be able to
+find the buffer, which means we need to pass it around. This could
+either be associated with the (send|recv)_packet classes, or with the
+packet itself. We will also need some form of reference counting so we
+can track who has the responsibility of freeing the buffer.
+
+We want to preserve the ability to do things like optimistic buffer
+re-use, and skipping over to a new buffer when necessary. What I mean
+is, suppose we had the typical stream protocol. It'd make sense to
+amortize allocation costs by allocating a buffer with say 16
+messages. When the sender gets to the end of the buffer, it could
+check if the receiver is done with the packet in slot 0. If so, it can
+just reuse that one, checking if the receiver is done with the next
+one in each case. If it is ever not done, it just allocates a new
+buffer and skips over to that.
+
+Also, since protocols are in libcore, we have to do this in a way that
+maintains backwards compatibility.
+
+buffer header and buffer. Cast as c_void when necessary.
+
+===
+
+Okay, here are some new ideas.
+
+It'd be nice to keep the bounded/unbounded case as uniform as
+possible. It leads to less code duplication, and less things that can
+go sublty wrong. For the bounded case, we could either have a struct
+with a bunch of unique pointers to pre-allocated packets, or we could
+lay them out inline. Inline layout is better, if for no other reason
+than that we don't have to allocate each packet
+individually. Currently we pass unique packets around as unsafe
+pointers, but they are actually unique pointers. We should instead use
+real unsafe pointers. This makes freeing data and running destructors
+trickier though. Thus, we should allocate all packets in parter of a
+higher level buffer structure. Packets can maintain a pointer to their
+buffer, and this is the part that gets freed.
+
+It might be helpful to have some idea of a semi-unique pointer (like
+being partially pregnant, also like an ARC). 
+
+*/
+
 enum state {
     empty,
     full,
@@ -29,32 +82,86 @@ enum state {
     terminated
 }
 
-type packet_header_ = {
-    mut state: state,
-    mut blocked_task: option<*rust_task>,
+class buffer_header {
+    // Tracks whether this buffer needs to be freed. We can probably
+    // get away with restricting it to 0 or 1, if we're careful.
+    let mut ref_count: int;
+
+    new() { self.ref_count = 1; }
+
+    // We may want a drop, and to be careful about stringing this
+    // thing along.
+}
+
+// This is for protocols to associate extra data to thread around.
+type buffer<T: send> = {
+    header: buffer_header,
+    data: T,
 };
 
-enum packet_header {
-    packet_header_(packet_header_)
+class packet_header {
+    let mut state: state;
+    let mut blocked_task: option<*rust_task>;
+
+    // This is a reinterpret_cast of a ~buffer, that can also be cast
+    // to a buffer_header if need be.
+    let mut buffer: *libc::c_void;
+
+    new() {
+        self.state = empty;
+        self.blocked_task = none;
+        self.buffer = ptr::null();
+    }
+
+    // Returns the old state.
+    unsafe fn mark_blocked(this: *rust_task) -> state {
+        self.blocked_task = some(this);
+        swap_state_acq(self.state, blocked)
+    }
+
+    unsafe fn unblock() {
+        alt swap_state_acq(self.state, empty) {
+          empty | blocked { }
+          terminated { self.state = terminated; }
+          full { self.state = full; }
+        }
+    }
+
+    // unsafe because this can do weird things to the space/time
+    // continuum. It ends making multiple unique pointers to the same
+    // thing. You'll proobably want to forget them when you're done.
+    unsafe fn buf_header() -> ~buffer_header {
+        assert self.buffer.is_not_null();
+        reinterpret_cast(self.buffer)
+    }
 }
 
-type packet_<T:send> = {
+type packet<T: send> = {
     header: packet_header,
-    mut payload: option<T>
+    mut payload: option<T>,
 };
 
-enum packet<T:send> {
-    packet_(packet_<T>)
+fn unibuffer<T: send>() -> ~buffer<packet<T>> {
+    let b = ~{
+        header: buffer_header(),
+        data: {
+            header: packet_header(),
+            mut payload: none,
+        }
+    };
+
+    unsafe {
+        b.data.header.buffer = reinterpret_cast(b);
+    }
+
+    b
 }
 
-fn packet<T: send>() -> *packet<T> unsafe {
-    let p: *packet<T> = unsafe::transmute(~{
-        header: {
-            mut state: empty,
-            mut blocked_task: none::<task::task>,
-        },
-        mut payload: none::<T>
-    });
+fn packet<T: send>() -> *packet<T> {
+    let b = unibuffer();
+    let p = ptr::addr_of(b.data);
+    // We'll take over memory management from here.
+    unsafe { forget(b) }
     p
 }
 
@@ -65,6 +172,10 @@ fn packet<T: send>() -> *packet<T> unsafe {
     fn atomic_xchng_rel(&dst: int, src: int) -> int;
 }
 
+fn atomic_xchng_rel(&dst: int, src: int) -> int {
+    rusti::atomic_xchng_rel(dst, src)
+}
+
 type rust_task = libc::c_void;
 
 extern mod rustrt {
@@ -75,13 +186,7 @@ fn packet<T: send>() -> *packet<T> unsafe {
     fn task_clear_event_reject(task: *rust_task);
 
     fn task_wait_event(this: *rust_task, killed: &mut *libc::c_void) -> bool;
-    fn task_signal_event(target: *rust_task, event: *libc::c_void);
-}
-
-// We should consider moving this to core::unsafe, although I
-// suspect graydon would want us to use void pointers instead.
-unsafe fn uniquify<T>(x: *T) -> ~T {
-    unsafe { unsafe::reinterpret_cast(x) }
+    pure fn task_signal_event(target: *rust_task, event: *libc::c_void);
 }
 
 fn wait_event(this: *rust_task) -> *libc::c_void {
@@ -110,18 +215,42 @@ fn swap_state_rel(&dst: state, src: state) -> state {
     }
 }
 
+unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
+    transmute((*p).buf_header())
+}
+
+class buffer_resource<T: send> {
+    let buffer: ~buffer<T>;
+    new(+b: ~buffer<T>) {
+        self.buffer = b;
+    }
+
+    drop unsafe {
+        let b = move!{self.buffer};
+        let old_count = atomic_xchng_rel(b.header.ref_count, 0);
+        if old_count == 0 {
+            // go go gadget drop glue
+        }
+        else {
+            forget(b)
+        }
+    }
+}
+
 fn send<T: send>(-p: send_packet<T>, -payload: T) {
+    let header = p.header();
     let p_ = p.unwrap();
-    let p = unsafe { uniquify(p_) };
-    assert (*p).payload == none;
-    (*p).payload <- some(payload);
+    let p = unsafe { &*p_ };
+    assert ptr::addr_of(p.header) == header;
+    assert p.payload == none;
+    p.payload <- some(payload);
     let old_state = swap_state_rel(p.header.state, full);
     alt old_state {
       empty {
         // Yay, fastpath.
 
         // The receiver will eventually clean this up.
-        unsafe { forget(p); }
+        //unsafe { forget(p); }
       }
       full { fail ~"duplicate send" }
       blocked {
@@ -135,7 +264,7 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
         }
 
         // The receiver will eventually clean this up.
-        unsafe { forget(p); }
+        //unsafe { forget(p); }
       }
       terminated {
         // The receiver will never receive this. Rely on drop_glue
@@ -150,7 +279,7 @@ fn recv<T: send>(-p: recv_packet<T>) -> T {
 
 fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
     let p_ = p.unwrap();
-    let p = unsafe { uniquify(p_) };
+    let p = unsafe { &*p_ };
     let this = rustrt::rust_get_task();
     rustrt::task_clear_event_reject(this);
     p.header.blocked_task = some(this);
@@ -163,7 +292,7 @@ fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
           empty {
             #debug("no data available on %?, going to sleep.", p_);
             wait_event(this);
-            #debug("woke up, p.state = %?", p.header.state);
+            #debug("woke up, p.state = %?", copy p.header.state);
           }
           blocked {
             if first {
@@ -172,7 +301,7 @@ fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
           }
           full {
             let mut payload = none;
-            payload <-> (*p).payload;
+            payload <-> p.payload;
             p.header.state = terminated;
             ret some(option::unwrap(payload))
           }
@@ -195,11 +324,11 @@ fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
 }
 
 fn sender_terminate<T: send>(p: *packet<T>) {
-    let p = unsafe { uniquify(p) };
+    let p = unsafe { &*p };
     alt swap_state_rel(p.header.state, terminated) {
       empty {
         // The receiver will eventually clean up.
-        unsafe { forget(p) }
+        //unsafe { forget(p) }
       }
       blocked {
         // wake up the target
@@ -208,7 +337,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
                                   ptr::addr_of(p.header) as *libc::c_void);
 
         // The receiver will eventually clean up.
-        unsafe { forget(p) }
+        //unsafe { forget(p) }
       }
       full {
         // This is impossible
@@ -221,11 +350,11 @@ fn sender_terminate<T: send>(p: *packet<T>) {
 }
 
 fn receiver_terminate<T: send>(p: *packet<T>) {
-    let p = unsafe { uniquify(p) };
+    let p = unsafe { &*p };
     alt swap_state_rel(p.header.state, terminated) {
       empty {
         // the sender will clean up
-        unsafe { forget(p) }
+        //unsafe { forget(p) }
       }
       blocked {
         // this shouldn't happen.
@@ -237,24 +366,6 @@ fn receiver_terminate<T: send>(p: *packet<T>) {
     }
 }
 
-impl private_methods for *packet_header {
-    // Returns the old state.
-    unsafe fn mark_blocked(this: *rust_task) -> state {
-        let self = &*self;
-        self.blocked_task = some(this);
-        swap_state_acq(self.state, blocked)
-    }
-
-    unsafe fn unblock() {
-        let self = &*self;
-        alt swap_state_acq(self.state, empty) {
-          empty | blocked { }
-          terminated { self.state = terminated; }
-          full { self.state = full; }
-        }
-    }
-}
-
 #[doc = "Returns when one of the packet headers reports data is
 available."]
 fn wait_many(pkts: &[*packet_header]) -> uint {
@@ -264,6 +375,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
     let mut data_avail = false;
     let mut ready_packet = pkts.len();
     for pkts.eachi |i, p| unsafe {
+        let p = unsafe { &*p };
         let old = p.mark_blocked(this);
         alt old {
           full | terminated {
@@ -295,7 +407,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
 
     #debug("%?", pkts[ready_packet]);
 
-    for pkts.each |p| { unsafe{p.unblock()} }
+    for pkts.each |p| { unsafe{ (*p).unblock()} }
 
     #debug("%?, %?", ready_packet, pkts[ready_packet]);
 
@@ -359,11 +471,23 @@ fn select<T: send>(+endpoints: ~[recv_packet<T>])
     (ready, result, remaining)
 }
 
-class send_packet<T: send> {
+type send_packet<T: send> = send_packet_buffered<T, packet<T>>;
+
+fn send_packet<T: send>(p: *packet<T>) -> send_packet<T> {
+    send_packet_buffered(p)
+}
+
+class send_packet_buffered<T: send, Tbuffer: send> {
     let mut p: option<*packet<T>>;
+    let mut buffer: option<buffer_resource<Tbuffer>>;
     new(p: *packet<T>) {
         //#debug("take send %?", p);
         self.p = some(p);
+        unsafe {
+            self.buffer = some(
+                buffer_resource(
+                    get_buffer(ptr::addr_of((*p).header))));
+        };
     }
     drop {
         //if self.p != none {
@@ -380,13 +504,39 @@ fn unwrap() -> *packet<T> {
         p <-> self.p;
         option::unwrap(p)
     }
+
+    pure fn header() -> *packet_header {
+        alt self.p {
+          some(packet) {
+            unsafe {
+                let packet = &*packet;
+                let header = ptr::addr_of(packet.header);
+                //forget(packet);
+                header
+            }
+          }
+          none { fail ~"packet already consumed" }
+        }
+    }
 }
 
-class recv_packet<T: send> {
+type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
+
+fn recv_packet<T: send>(p: *packet<T>) -> recv_packet<T> {
+    recv_packet_buffered(p)
+}
+
+class recv_packet_buffered<T: send, Tbuffer: send> : selectable {
     let mut p: option<*packet<T>>;
+    let mut buffer: option<buffer_resource<Tbuffer>>;
     new(p: *packet<T>) {
         //#debug("take recv %?", p);
         self.p = some(p);
+        unsafe {
+            self.buffer = some(
+                buffer_resource(
+                    get_buffer(ptr::addr_of((*p).header))));
+        };
     }
     drop {
         //if self.p != none {
@@ -408,9 +558,9 @@ fn unwrap() -> *packet<T> {
         alt self.p {
           some(packet) {
             unsafe {
-                let packet = uniquify(packet);
+                let packet = &*packet;
                 let header = ptr::addr_of(packet.header);
-                forget(packet);
+                //forget(packet);
                 header
             }
           }
index df5df748a74b2891e69e2c3ebbe4f2eeb111f6aa..70423f5437cd1f0a35ff6db464d60c2286e7b2f2 100644 (file)
@@ -51,10 +51,12 @@ fn gen_send(cx: ext_ctxt) -> @ast::item {
                 |n, t| cx.arg_mode(n, t, ast::by_copy)
             );
 
+            let pipe_ty = cx.ty_path_ast_builder(
+                path(this.data_name())
+                .add_tys(cx.ty_vars(this.ty_params)));
             let args_ast = vec::append(
                 ~[cx.arg_mode(@~"pipe",
-                              cx.ty_path_ast_builder(path(this.data_name())
-                                        .add_tys(cx.ty_vars(this.ty_params))),
+                              pipe_ty,
                               ast::by_copy)],
                 args_ast);
 
@@ -73,6 +75,7 @@ fn gen_send(cx: ext_ctxt) -> @ast::item {
                                       .map(|x| *x),
                                       ~", "));
             body += #fmt("pipes::send(pipe, message);\n");
+            // return the new channel
             body += ~"c }";
 
             let body = cx.parse_expr(body);