]> git.lizzy.rs Git - rust.git/commitdiff
Replace rust_atomic_increment/decrement and rust_compare_and_swap_ptr with intrinsics.
authorLuqman Aden <laden@csclub.uwaterloo.ca>
Mon, 22 Oct 2012 02:24:56 +0000 (22:24 -0400)
committerLuqman Aden <laden@csclub.uwaterloo.ca>
Mon, 22 Oct 2012 02:43:28 +0000 (22:43 -0400)
src/libcore/private.rs
src/rt/rust_builtin.cpp
src/rt/rustrt.def.in

index a54db3fa759b93e4fd269f992aade5fb07a9e642..fcc01ca42ffdee66953a2ad1c40b2c6bd6fb2130 100644 (file)
     fn rust_task_weaken(ch: rust_port_id);
     fn rust_task_unweaken(ch: rust_port_id);
 
-    #[rust_stack]
-    fn rust_atomic_increment(p: &mut libc::intptr_t)
-        -> libc::intptr_t;
-
-    #[rust_stack]
-    fn rust_atomic_decrement(p: &mut libc::intptr_t)
-        -> libc::intptr_t;
-
     #[rust_stack]
     fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
                                  oldval: libc::uintptr_t,
@@ -33,6 +25,15 @@ fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
     fn rust_unlock_little_lock(lock: rust_little_lock);
 }
 
+#[abi = "rust-intrinsic"]
+extern mod rusti {
+
+    #[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]    
+    fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
+    fn atomic_xadd(dst: &mut int, src: int) -> int;
+    fn atomic_xsub(dst: &mut int, src: int) -> int;
+}
+
 #[allow(non_camel_case_types)] // runtime type
 type rust_port_id = uint;
 
@@ -43,6 +44,7 @@ fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
  * or, if no channel exists creates and installs a new channel and sets up a
  * new task to receive from it.
  */
+#[cfg(stage0)]
 pub unsafe fn chan_from_global_ptr<T: Send>(
     global: GlobalPtr,
     task_fn: fn() -> task::TaskBuilder,
@@ -103,6 +105,68 @@ enum Msg {
     }
 }
 
+#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]    
+pub unsafe fn chan_from_global_ptr<T: Send>(
+    global: GlobalPtr,
+    task_fn: fn() -> task::TaskBuilder,
+    f: fn~(comm::Port<T>)
+) -> comm::Chan<T> {
+
+    enum Msg {
+        Proceed,
+        Abort
+    }
+
+    log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
+    let is_probably_zero = *global == 0u;
+    log(debug,~"after is_prob_zero check");
+    if is_probably_zero {
+        log(debug,~"is probably zero...");
+        // There's no global channel. We must make it
+
+        let (setup_po, setup_ch) = do task_fn().spawn_conversation
+            |move f, setup_po, setup_ch| {
+            let po = comm::Port::<T>();
+            let ch = comm::Chan(&po);
+            comm::send(setup_ch, ch);
+
+            // Wait to hear if we are the official instance of
+            // this global task
+            match comm::recv::<Msg>(setup_po) {
+              Proceed => f(move po),
+              Abort => ()
+            }
+        };
+
+        log(debug,~"before setup recv..");
+        // This is the proposed global channel
+        let ch = comm::recv(setup_po);
+        // 0 is our sentinal value. It is not a valid channel
+        assert *ch != 0;
+
+        // Install the channel
+        log(debug,~"BEFORE COMPARE AND SWAP");
+        rusti::atomic_cxchg(
+            cast::reinterpret_cast(&global),
+            0, cast::reinterpret_cast(&ch));
+        let swapped = *global != 0;
+        log(debug,fmt!("AFTER .. swapped? %?", swapped));
+
+        if swapped {
+            // Success!
+            comm::send(setup_ch, Proceed);
+            ch
+        } else {
+            // Somebody else got in before we did
+            comm::send(setup_ch, Abort);
+            cast::reinterpret_cast(&*global)
+        }
+    } else {
+        log(debug, ~"global != 0");
+        cast::reinterpret_cast(&*global)
+    }
+}
+
 #[test]
 pub fn test_from_global_chan1() {
 
@@ -305,7 +369,7 @@ struct ArcDestruct<T> {
         }
         do task::unkillable {
             let data: ~ArcData<T> = cast::reinterpret_cast(&self.data);
-            let new_count = rustrt::rust_atomic_decrement(&mut data.count);
+            let new_count = rusti::atomic_xsub(&mut data.count, 1) - 1;
             assert new_count >= 0;
             if new_count == 0 {
                 // Were we really last, or should we hand off to an unwrapper?
@@ -341,6 +405,7 @@ fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
     }
 }
 
+#[cfg(stage0)]
 pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
         -> T {
     struct DeathThroes<T> {
@@ -373,8 +438,76 @@ struct DeathThroes<T> {
             // Got in. Step 0: Tell destructor not to run. We are now it.
             rc.data = ptr::null();
             // Step 1 - drop our own reference.
-            let new_count = rustrt::rust_atomic_decrement(&mut ptr.count);
-        //    assert new_count >= 0;
+            let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
+            //assert new_count >= 0;
+            if new_count == 0 {
+                // We were the last owner. Can unwrap immediately.
+                // Also we have to free the server endpoints.
+                let _server: UnwrapProto = cast::transmute(move serverp);
+                option::swap_unwrap(&mut ptr.data)
+                // drop glue takes over.
+            } else {
+                // The *next* person who sees the refcount hit 0 will wake us.
+                let end_result =
+                    DeathThroes { ptr: Some(move ptr),
+                                  response: Some(move c2) };
+                let mut p1 = Some(move p1); // argh
+                do task::rekillable {
+                    pipes::recv_one(option::swap_unwrap(&mut p1));
+                }
+                // Got here. Back in the 'unkillable' without getting killed.
+                // Recover ownership of ptr, then take the data out.
+                let ptr = option::swap_unwrap(&mut end_result.ptr);
+                option::swap_unwrap(&mut ptr.data)
+                // drop glue takes over.
+            }
+        } else {
+            // Somebody else was trying to unwrap. Avoid guaranteed deadlock.
+            cast::forget(move ptr);
+            // Also we have to free the (rejected) server endpoints.
+            let _server: UnwrapProto = cast::transmute(move serverp);
+            fail ~"Another task is already unwrapping this ARC!";
+        }
+    }
+}
+
+#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]    
+pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
+        -> T {
+    struct DeathThroes<T> {
+        mut ptr:      Option<~ArcData<T>>,
+        mut response: Option<pipes::ChanOne<bool>>,
+        drop unsafe {
+            let response = option::swap_unwrap(&mut self.response);
+            // In case we get killed early, we need to tell the person who
+            // tried to wake us whether they should hand-off the data to us.
+            if task::failing() {
+                pipes::send_one(move response, false);
+                // Either this swap_unwrap or the one below (at "Got here")
+                // ought to run.
+                cast::forget(option::swap_unwrap(&mut self.ptr));
+            } else {
+                assert self.ptr.is_none();
+                pipes::send_one(move response, true);
+            }
+        }
+    }
+
+    do task::unkillable {
+        let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
+        let (c1,p1) = pipes::oneshot(); // ()
+        let (c2,p2) = pipes::oneshot(); // bool
+        let server: UnwrapProto = ~mut Some((move c1,move p2));
+        let serverp: libc::uintptr_t = cast::transmute(move server);
+        // Try to put our server end in the unwrapper slot.
+        rusti::atomic_cxchg(cast::reinterpret_cast(&ptr.unwrapper),
+                            0, serverp as int);
+        if ptr.unwrapper != 0 {
+            // Got in. Step 0: Tell destructor not to run. We are now it.
+            rc.data = ptr::null();
+            // Step 1 - drop our own reference.
+            let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
+            //assert new_count >= 0;
             if new_count == 0 {
                 // We were the last owner. Can unwrap immediately.
                 // Also we have to free the server endpoints.
@@ -452,7 +585,7 @@ pub unsafe fn clone_shared_mutable_state<T: Send>(rc: &SharedMutableState<T>)
         -> SharedMutableState<T> {
     unsafe {
         let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
-        let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
+        let new_count = rusti::atomic_xadd(&mut ptr.count, 1) + 1;
         assert new_count >= 2;
         cast::forget(move ptr);
     }
index 5baa95c7323f401cc01f2479acec389a0401cfec..67281cbee5a365d6bcc2ca60c877f846d6a8a541 100644 (file)
@@ -830,16 +830,6 @@ rust_compare_and_swap_ptr(intptr_t *address,
     return sync::compare_and_swap(address, oldval, newval);
 }
 
-extern "C" CDECL intptr_t
-rust_atomic_increment(intptr_t *address) {
-    return sync::increment(address);
-}
-
-extern "C" CDECL intptr_t
-rust_atomic_decrement(intptr_t *address) {
-    return sync::decrement(address);
-}
-
 extern "C" CDECL void
 rust_task_weaken(rust_port_id chan) {
     rust_task *task = rust_get_current_task();
index 6a2bdd622cba30b9058a2bd59434a7e0b337fdf5..3760c9ff09f4a2e607a387bf403b1056933c7e19 100644 (file)
@@ -178,8 +178,6 @@ rust_dbg_do_nothing
 rust_dbg_breakpoint
 rust_osmain_sched_id
 rust_compare_and_swap_ptr
-rust_atomic_increment
-rust_atomic_decrement
 rust_global_env_chan_ptr
 rust_port_take
 rust_port_drop
@@ -207,4 +205,4 @@ rust_gc_metadata
 rust_uv_ip4_port
 rust_uv_ip6_port
 rust_uv_tcp_getpeername
-rust_uv_tcp_getpeername6
\ No newline at end of file
+rust_uv_tcp_getpeername6