fn rust_task_weaken(ch: rust_port_id);
fn rust_task_unweaken(ch: rust_port_id);
- #[rust_stack]
- fn rust_atomic_increment(p: &mut libc::intptr_t)
- -> libc::intptr_t;
-
- #[rust_stack]
- fn rust_atomic_decrement(p: &mut libc::intptr_t)
- -> libc::intptr_t;
-
#[rust_stack]
fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
oldval: libc::uintptr_t,
fn rust_unlock_little_lock(lock: rust_little_lock);
}
+#[abi = "rust-intrinsic"]
+extern mod rusti {
+
+ #[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
+ fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
+ fn atomic_xadd(dst: &mut int, src: int) -> int;
+ fn atomic_xsub(dst: &mut int, src: int) -> int;
+}
+
#[allow(non_camel_case_types)] // runtime type
type rust_port_id = uint;
* or, if no channel exists creates and installs a new channel and sets up a
* new task to receive from it.
*/
+#[cfg(stage0)]
pub unsafe fn chan_from_global_ptr<T: Send>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
}
}
+#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
+pub unsafe fn chan_from_global_ptr<T: Send>(
+ global: GlobalPtr,
+ task_fn: fn() -> task::TaskBuilder,
+ f: fn~(comm::Port<T>)
+) -> comm::Chan<T> {
+
+ enum Msg {
+ Proceed,
+ Abort
+ }
+
+ log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
+ let is_probably_zero = *global == 0u;
+ log(debug,~"after is_prob_zero check");
+ if is_probably_zero {
+ log(debug,~"is probably zero...");
+ // There's no global channel. We must make it
+
+ let (setup_po, setup_ch) = do task_fn().spawn_conversation
+ |move f, setup_po, setup_ch| {
+ let po = comm::Port::<T>();
+ let ch = comm::Chan(&po);
+ comm::send(setup_ch, ch);
+
+ // Wait to hear if we are the official instance of
+ // this global task
+ match comm::recv::<Msg>(setup_po) {
+ Proceed => f(move po),
+ Abort => ()
+ }
+ };
+
+ log(debug,~"before setup recv..");
+ // This is the proposed global channel
+ let ch = comm::recv(setup_po);
+ // 0 is our sentinal value. It is not a valid channel
+ assert *ch != 0;
+
+ // Install the channel
+ log(debug,~"BEFORE COMPARE AND SWAP");
+ rusti::atomic_cxchg(
+ cast::reinterpret_cast(&global),
+ 0, cast::reinterpret_cast(&ch));
+ let swapped = *global != 0;
+ log(debug,fmt!("AFTER .. swapped? %?", swapped));
+
+ if swapped {
+ // Success!
+ comm::send(setup_ch, Proceed);
+ ch
+ } else {
+ // Somebody else got in before we did
+ comm::send(setup_ch, Abort);
+ cast::reinterpret_cast(&*global)
+ }
+ } else {
+ log(debug, ~"global != 0");
+ cast::reinterpret_cast(&*global)
+ }
+}
+
#[test]
pub fn test_from_global_chan1() {
}
do task::unkillable {
let data: ~ArcData<T> = cast::reinterpret_cast(&self.data);
- let new_count = rustrt::rust_atomic_decrement(&mut data.count);
+ let new_count = rusti::atomic_xsub(&mut data.count, 1) - 1;
assert new_count >= 0;
if new_count == 0 {
// Were we really last, or should we hand off to an unwrapper?
}
}
+#[cfg(stage0)]
pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
-> T {
struct DeathThroes<T> {
// Got in. Step 0: Tell destructor not to run. We are now it.
rc.data = ptr::null();
// Step 1 - drop our own reference.
- let new_count = rustrt::rust_atomic_decrement(&mut ptr.count);
- // assert new_count >= 0;
+ let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
+ //assert new_count >= 0;
+ if new_count == 0 {
+ // We were the last owner. Can unwrap immediately.
+ // Also we have to free the server endpoints.
+ let _server: UnwrapProto = cast::transmute(move serverp);
+ option::swap_unwrap(&mut ptr.data)
+ // drop glue takes over.
+ } else {
+ // The *next* person who sees the refcount hit 0 will wake us.
+ let end_result =
+ DeathThroes { ptr: Some(move ptr),
+ response: Some(move c2) };
+ let mut p1 = Some(move p1); // argh
+ do task::rekillable {
+ pipes::recv_one(option::swap_unwrap(&mut p1));
+ }
+ // Got here. Back in the 'unkillable' without getting killed.
+ // Recover ownership of ptr, then take the data out.
+ let ptr = option::swap_unwrap(&mut end_result.ptr);
+ option::swap_unwrap(&mut ptr.data)
+ // drop glue takes over.
+ }
+ } else {
+ // Somebody else was trying to unwrap. Avoid guaranteed deadlock.
+ cast::forget(move ptr);
+ // Also we have to free the (rejected) server endpoints.
+ let _server: UnwrapProto = cast::transmute(move serverp);
+ fail ~"Another task is already unwrapping this ARC!";
+ }
+ }
+}
+
+#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
+pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
+ -> T {
+ struct DeathThroes<T> {
+ mut ptr: Option<~ArcData<T>>,
+ mut response: Option<pipes::ChanOne<bool>>,
+ drop unsafe {
+ let response = option::swap_unwrap(&mut self.response);
+ // In case we get killed early, we need to tell the person who
+ // tried to wake us whether they should hand-off the data to us.
+ if task::failing() {
+ pipes::send_one(move response, false);
+ // Either this swap_unwrap or the one below (at "Got here")
+ // ought to run.
+ cast::forget(option::swap_unwrap(&mut self.ptr));
+ } else {
+ assert self.ptr.is_none();
+ pipes::send_one(move response, true);
+ }
+ }
+ }
+
+ do task::unkillable {
+ let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
+ let (c1,p1) = pipes::oneshot(); // ()
+ let (c2,p2) = pipes::oneshot(); // bool
+ let server: UnwrapProto = ~mut Some((move c1,move p2));
+ let serverp: libc::uintptr_t = cast::transmute(move server);
+ // Try to put our server end in the unwrapper slot.
+ rusti::atomic_cxchg(cast::reinterpret_cast(&ptr.unwrapper),
+ 0, serverp as int);
+ if ptr.unwrapper != 0 {
+ // Got in. Step 0: Tell destructor not to run. We are now it.
+ rc.data = ptr::null();
+ // Step 1 - drop our own reference.
+ let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
+ //assert new_count >= 0;
if new_count == 0 {
// We were the last owner. Can unwrap immediately.
// Also we have to free the server endpoints.
-> SharedMutableState<T> {
unsafe {
let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
- let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
+ let new_count = rusti::atomic_xadd(&mut ptr.count, 1) + 1;
assert new_count >= 2;
cast::forget(move ptr);
}