unwrap(util::replace(opt, none))
}
-pure fn unwrap_expect<T>(-opt: option<T>, reason: ~str) -> T {
+pure fn unwrap_expect<T>(-opt: option<T>, reason: &str) -> T {
//! As unwrap, but with a specified failure message.
- if opt.is_none() { fail reason; }
+ if opt.is_none() { fail reason.to_unique(); }
unwrap(opt)
}
*/
fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
- option::unwrap(try_recv(p))
+ option::unwrap_expect(try_recv(p), "connection closed")
}
/** Attempts to receive a message from a pipe.
full {
let mut payload = none;
payload <-> p.payload;
+ p.header.blocked_task = none;
p.header.state = empty;
return some(option::unwrap(payload))
}
terminated {
+ // This assert detects when we've accidentally unsafely
+ // casted too big of a number to a state.
assert old_state == terminated;
return none;
}
}
blocked {
// wake up the target
- let target = p.header.blocked_task.get();
- rustrt::task_signal_event(target,
- ptr::addr_of(p.header) as *libc::c_void);
-
+ alt p.header.blocked_task {
+ some(target) =>
+ rustrt::task_signal_event(
+ target,
+ ptr::addr_of(p.header) as *libc::c_void),
+ none => { debug!{"receiver is already shutting down"} }
+ }
// The receiver will eventually clean up.
//unsafe { forget(p) }
}
#[doc(hidden)]
fn receiver_terminate<T: send>(p: *packet<T>) {
let p = unsafe { &*p };
+ assert p.header.blocked_task == none;
alt swap_state_rel(p.header.state, terminated) {
empty {
// the sender will clean up
for pkts.each |p| { unsafe{ (*p).unblock()} }
- debug!{"%?, %?", ready_packet, pkts[ready_packet]};
+ debug!("%?, %?", ready_packet, pkts[ready_packet]);
unsafe {
assert (*pkts[ready_packet]).state == full