let buffer: ~buffer<T>;
new(+b: ~buffer<T>) {
let p = ptr::addr_of(*b);
- #error("take %?", p);
+ //#error("take %?", p);
atomic_add_acq(b.header.ref_count, 1);
self.buffer = b;
}
drop unsafe {
let b = move!{self.buffer};
let p = ptr::addr_of(*b);
- #error("drop %?", p);
+ //#error("drop %?", p);
let old_count = atomic_sub_rel(b.header.ref_count, 1);
//let old_count = atomic_xchng_rel(b.header.ref_count, 0);
if old_count == 1 {
full {
let mut payload = none;
payload <-> p.payload;
- p.header.state = terminated;
+ p.header.state = empty;
ret some(option::unwrap(payload))
}
terminated {
p <-> self.p;
sender_terminate(option::unwrap(p))
}
- unsafe { #error("send_drop: %?",
- if self.buffer == none {
- "none"
- } else { "some" }); }
+ //unsafe { #error("send_drop: %?",
+ // if self.buffer == none {
+ // "none"
+ // } else { "some" }); }
}
fn unwrap() -> *packet<T> {
let mut p = none;
}
fn reuse_buffer() -> buffer_resource<Tbuffer> {
- #error("send reuse_buffer");
+ //#error("send reuse_buffer");
let mut tmp = none;
tmp <-> self.buffer;
option::unwrap(tmp)
p <-> self.p;
receiver_terminate(option::unwrap(p))
}
- unsafe { #error("recv_drop: %?",
- if self.buffer == none {
- "none"
- } else { "some" }); }
+ //unsafe { #error("recv_drop: %?",
+ // if self.buffer == none {
+ // "none"
+ // } else { "some" }); }
}
fn unwrap() -> *packet<T> {
let mut p = none;
}
fn reuse_buffer() -> buffer_resource<Tbuffer> {
- #error("recv reuse_buffer");
+ //#error("recv reuse_buffer");
let mut tmp = none;
tmp <-> self.buffer;
option::unwrap(tmp)
--- /dev/null
+// Compare bounded and unbounded protocol performance.
+
+// xfail-test
+// xfail-pretty
+
+use std;
+
+import pipes::{spawn_service, recv};
+import std::time::precise_time_s;
+
+proto! pingpong {
+ ping: send {
+ ping -> pong
+ }
+
+ pong: recv {
+ pong -> ping
+ }
+}
+
+proto! pingpong_unbounded {
+ ping: send {
+ ping -> pong
+ }
+
+ pong: recv {
+ pong -> ping
+ }
+
+ you_will_never_catch_me: send {
+ never_ever_ever -> you_will_never_catch_me
+ }
+}
+
+// This stuff should go in libcore::pipes
+macro_rules! move {
+ { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
+}
+
+macro_rules! follow {
+ {
+ $($message:path($($x: ident),+) -> $next:ident $e:expr)+
+ } => (
+ |m| alt move(m) {
+ $(some($message($($x,)* next)) {
+ let $next = move!{next};
+ $e })+
+ _ { fail }
+ }
+ );
+
+ {
+ $($message:path -> $next:ident $e:expr)+
+ } => (
+ |m| alt move(m) {
+ $(some($message(next)) {
+ let $next = move!{next};
+ $e })+
+ _ { fail }
+ }
+ )
+}
+
+fn switch<T: send, Tb: send, U>(+endp: pipes::recv_packet_buffered<T, Tb>,
+ f: fn(+option<T>) -> U) -> U {
+ f(pipes::try_recv(endp))
+}
+
+fn move<T>(-x: T) -> T { x }
+
+// Here's the benchmark
+
+fn bounded(count: uint) {
+ import pingpong::*;
+
+ let mut ch = do spawn_service(init) |ch| {
+ let mut count = count;
+ let mut ch = ch;
+ while count > 0 {
+ ch = switch(ch, follow! {
+ ping -> next { server::pong(next) }
+ });
+
+ count -= 1;
+ }
+ };
+
+ let mut count = count;
+ while count > 0 {
+ let ch_ = client::ping(ch);
+
+ ch = switch(ch_, follow! {
+ pong -> next { next }
+ });
+
+ count -= 1;
+ }
+}
+
+fn unbounded(count: uint) {
+ import pingpong_unbounded::*;
+
+ let mut ch = do spawn_service(init) |ch| {
+ let mut count = count;
+ let mut ch = ch;
+ while count > 0 {
+ ch = switch(ch, follow! {
+ ping -> next { server::pong(next) }
+ });
+
+ count -= 1;
+ }
+ };
+
+ let mut count = count;
+ while count > 0 {
+ let ch_ = client::ping(ch);
+
+ ch = switch(ch_, follow! {
+ pong -> next { next }
+ });
+
+ count -= 1;
+ }
+}
+
+fn timeit(f: fn()) -> float {
+ let start = precise_time_s();
+ f();
+ let stop = precise_time_s();
+ stop - start
+}
+
+fn main() {
+ let count = 1000000;
+ let bounded = do timeit { bounded(count) };
+ let unbounded = do timeit { unbounded(count) };
+
+ io::println(#fmt("count: %?\n", count));
+ io::println(#fmt("bounded: %? s\t(%? μs/message)",
+ bounded, bounded * 1000000. / (count as float)));
+ io::println(#fmt("unbounded: %? s\t(%? μs/message)",
+ unbounded, unbounded * 1000000. / (count as float)));
+
+ io::println(#fmt("\n\
+ bounded is %?%% faster",
+ (unbounded - bounded) / bounded * 100.));
+}