#[doc(hidden)];
-use compare_and_swap = rustrt::rust_compare_and_swap_ptr;
use task::TaskBuilder;
use task::atomically;
type GlobalPtr = *libc::uintptr_t;
+// TODO: Remove once snapshots have atomic_cxchg
+#[cfg(stage0)]
+fn compare_and_swap(address: &mut libc::uintptr_t,
+ oldval: libc::uintptr_t,
+ newval: libc::uintptr_t) -> bool {
+ rustrt::rust_compare_and_swap_ptr(address, oldval, newval)
+}
+
+#[cfg(stage1)]
+#[cfg(stage2)]
+#[cfg(stage3)]
+fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
+ let old = rusti::atomic_cxchg(address, oldval, newval);
+ old == oldval
+}
+
/**
* Atomically gets a channel from a pointer to a pointer-sized memory location
* or, if no channel exists creates and installs a new channel and sets up a
* new task to receive from it.
*/
-#[cfg(stage0)]
pub unsafe fn chan_from_global_ptr<T: Send>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
// Install the channel
log(debug,~"BEFORE COMPARE AND SWAP");
let swapped = compare_and_swap(
- cast::reinterpret_cast(&global),
- 0u, cast::reinterpret_cast(&ch));
- log(debug,fmt!("AFTER .. swapped? %?", swapped));
-
- if swapped {
- // Success!
- comm::send(setup_ch, Proceed);
- ch
- } else {
- // Somebody else got in before we did
- comm::send(setup_ch, Abort);
- cast::reinterpret_cast(&*global)
- }
- } else {
- log(debug, ~"global != 0");
- cast::reinterpret_cast(&*global)
- }
-}
-
-#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
-pub unsafe fn chan_from_global_ptr<T: Send>(
- global: GlobalPtr,
- task_fn: fn() -> task::TaskBuilder,
- f: fn~(comm::Port<T>)
-) -> comm::Chan<T> {
-
- enum Msg {
- Proceed,
- Abort
- }
-
- log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
- let is_probably_zero = *global == 0u;
- log(debug,~"after is_prob_zero check");
- if is_probably_zero {
- log(debug,~"is probably zero...");
- // There's no global channel. We must make it
-
- let (setup_po, setup_ch) = do task_fn().spawn_conversation
- |move f, setup_po, setup_ch| {
- let po = comm::Port::<T>();
- let ch = comm::Chan(&po);
- comm::send(setup_ch, ch);
-
- // Wait to hear if we are the official instance of
- // this global task
- match comm::recv::<Msg>(setup_po) {
- Proceed => f(move po),
- Abort => ()
- }
- };
-
- log(debug,~"before setup recv..");
- // This is the proposed global channel
- let ch = comm::recv(setup_po);
- // 0 is our sentinal value. It is not a valid channel
- assert *ch != 0;
-
- // Install the channel
- log(debug,~"BEFORE COMPARE AND SWAP");
- rusti::atomic_cxchg(
cast::reinterpret_cast(&global),
0, cast::reinterpret_cast(&ch));
- let swapped = *global != 0;
log(debug,fmt!("AFTER .. swapped? %?", swapped));
if swapped {
}
}
-#[cfg(stage0)]
pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
-> T {
struct DeathThroes<T> {
}
}
-#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
-pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
- -> T {
- struct DeathThroes<T> {
- mut ptr: Option<~ArcData<T>>,
- mut response: Option<pipes::ChanOne<bool>>,
- drop unsafe {
- let response = option::swap_unwrap(&mut self.response);
- // In case we get killed early, we need to tell the person who
- // tried to wake us whether they should hand-off the data to us.
- if task::failing() {
- pipes::send_one(move response, false);
- // Either this swap_unwrap or the one below (at "Got here")
- // ought to run.
- cast::forget(option::swap_unwrap(&mut self.ptr));
- } else {
- assert self.ptr.is_none();
- pipes::send_one(move response, true);
- }
- }
- }
-
- do task::unkillable {
- let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
- let (c1,p1) = pipes::oneshot(); // ()
- let (c2,p2) = pipes::oneshot(); // bool
- let server: UnwrapProto = ~mut Some((move c1,move p2));
- let serverp: libc::uintptr_t = cast::transmute(move server);
- // Try to put our server end in the unwrapper slot.
- rusti::atomic_cxchg(cast::reinterpret_cast(&ptr.unwrapper),
- 0, serverp as int);
- if ptr.unwrapper != 0 {
- // Got in. Step 0: Tell destructor not to run. We are now it.
- rc.data = ptr::null();
- // Step 1 - drop our own reference.
- let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
- //assert new_count >= 0;
- if new_count == 0 {
- // We were the last owner. Can unwrap immediately.
- // Also we have to free the server endpoints.
- let _server: UnwrapProto = cast::transmute(move serverp);
- option::swap_unwrap(&mut ptr.data)
- // drop glue takes over.
- } else {
- // The *next* person who sees the refcount hit 0 will wake us.
- let end_result =
- DeathThroes { ptr: Some(move ptr),
- response: Some(move c2) };
- let mut p1 = Some(move p1); // argh
- do task::rekillable {
- pipes::recv_one(option::swap_unwrap(&mut p1));
- }
- // Got here. Back in the 'unkillable' without getting killed.
- // Recover ownership of ptr, then take the data out.
- let ptr = option::swap_unwrap(&mut end_result.ptr);
- option::swap_unwrap(&mut ptr.data)
- // drop glue takes over.
- }
- } else {
- // Somebody else was trying to unwrap. Avoid guaranteed deadlock.
- cast::forget(move ptr);
- // Also we have to free the (rejected) server endpoints.
- let _server: UnwrapProto = cast::transmute(move serverp);
- fail ~"Another task is already unwrapping this ARC!";
- }
- }
-}
-
/**
* COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
*