}
unsafe fn unblock() {
+ assert self.state != blocked || self.blocked_task != none;
+ self.blocked_task = none;
alt swap_state_acq(self.state, empty) {
empty | blocked { }
terminated { self.state = terminated; }
rustrt::task_signal_event(
task, ptr::addr_of(p.header) as *libc::c_void);
}
- none { fail ~"blocked packet has no task" }
+ none { debug!{"just kidding!"} }
}
// The receiver will eventually clean this up.
fn try_recv() -> option<T> {
let mut result = none;
- while result == none && self.ports.len() > 0 {
- let i = wait_many(self.ports.map(|p| p.header()));
- // dereferencing an unsafe pointer nonsense to appease the
- // borrowchecker.
- alt move unsafe {(*ptr::addr_of(self.ports[i])).try_recv()} {
- some(m) {
- result = some(move_it!{m});
- }
- none {
- // Remove this port.
- let mut ports = ~[];
- self.ports <-> ports;
- vec::consume(ports,
- |j, x| if i != j { vec::push(self.ports, x) });
- }
+ // we have to swap the ports array so we aren't borrowing
+ // aliasable mutable memory.
+ let mut ports = ~[];
+ ports <-> self.ports;
+ while result == none && ports.len() > 0 {
+ let i = wait_many(ports.map(|p| p.header()));
+ alt move ports[i].try_recv() {
+ some(m) {
+ result = some(move m);
+ }
+ none {
+ // Remove this port.
+ let mut ports_ = ~[];
+ ports <-> ports_;
+ vec::consume(ports_,
+ |j, x| if i != j {
+ vec::push(ports, x)
+ });
+ }
}
}
+ ports <-> self.ports;
result
}
export append;
export append_one;
-export consume;
+export consume, consume_mut;
export init_op;
export is_empty;
export is_not_empty;
unsafe::set_len(v, 0);
}
+fn consume_mut<T>(+v: ~[mut T], f: fn(uint, +T)) unsafe {
+ do as_buf(v) |p, ln| {
+ for uint::range(0, ln) |i| {
+ let x <- *ptr::offset(p, i);
+ f(i, x);
+ }
+ }
+
+ unsafe::set_len(v, 0);
+}
+
/// Remove the last element from a vector and return it
fn pop<T>(&v: ~[const T]) -> T {
let ln = len(v);