]> git.lizzy.rs Git - rust.git/commitdiff
De-mode pipes
authorEric Holk <eric.holk@gmail.com>
Tue, 14 Aug 2012 23:39:57 +0000 (16:39 -0700)
committerEric Holk <eric.holk@gmail.com>
Wed, 15 Aug 2012 20:58:08 +0000 (13:58 -0700)
src/libcore/pipes.rs
src/test/run-pass/pipe-peek.rs

index bc6d89e92b6cbf13d03b42841ea1f10d6c18d71d..ba8ac5cae8711e3615c60cea78a33c9ccce34eb9 100644 (file)
 
 */
 
+// NB: transitionary, de-mode-ing.
+#[forbid(deprecated_mode)];
+#[forbid(deprecated_pattern)];
+
 import unsafe::{forget, reinterpret_cast, transmute};
 import either::{either, left, right};
 import option::unwrap;
@@ -143,15 +147,15 @@ struct packet_header {
     // Returns the old state.
     unsafe fn mark_blocked(this: *rust_task) -> state {
         rustrt::rust_task_ref(this);
-        let old_task = swap_task(self.blocked_task, this);
+        let old_task = swap_task(&mut self.blocked_task, this);
         assert old_task.is_null();
-        swap_state_acq(self.state, blocked)
+        swap_state_acq(&mut self.state, blocked)
     }
 
     unsafe fn unblock() {
-        let old_task = swap_task(self.blocked_task, ptr::null());
+        let old_task = swap_task(&mut self.blocked_task, ptr::null());
         if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
-        match swap_state_acq(self.state, empty) {
+        match swap_state_acq(&mut self.state, empty) {
           empty | blocked => (),
           terminated => self.state = terminated,
           full => self.state = full
@@ -224,7 +228,7 @@ fn packet<T: send>() -> *packet<T> {
 
 #[doc(hidden)]
 fn entangle_buffer<T: send, Tstart: send>(
-    -buffer: ~buffer<T>,
+    +buffer: ~buffer<T>,
     init: fn(*libc::c_void, x: &T) -> *packet<Tstart>)
     -> (send_packet_buffered<Tstart, T>, recv_packet_buffered<Tstart, T>)
 {
@@ -247,27 +251,27 @@ fn entangle_buffer<T: send, Tstart: send>(
 // If I call the rusti versions directly from a polymorphic function,
 // I get link errors. This is a bug that needs investigated more.
 #[doc(hidden)]
-fn atomic_xchng_rel(&dst: int, src: int) -> int {
-    rusti::atomic_xchng_rel(dst, src)
+fn atomic_xchng_rel(dst: &mut int, src: int) -> int {
+    rusti::atomic_xchng_rel(*dst, src)
 }
 
 #[doc(hidden)]
-fn atomic_add_acq(&dst: int, src: int) -> int {
-    rusti::atomic_add_acq(dst, src)
+fn atomic_add_acq(dst: &mut int, src: int) -> int {
+    rusti::atomic_add_acq(*dst, src)
 }
 
 #[doc(hidden)]
-fn atomic_sub_rel(&dst: int, src: int) -> int {
-    rusti::atomic_sub_rel(dst, src)
+fn atomic_sub_rel(dst: &mut int, src: int) -> int {
+    rusti::atomic_sub_rel(*dst, src)
 }
 
 #[doc(hidden)]
-fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task {
+fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
     // It might be worth making both acquire and release versions of
     // this.
     unsafe {
         reinterpret_cast(rusti::atomic_xchng(
-            *(ptr::mut_addr_of(dst) as *mut int),
+            *(ptr::mut_addr_of(*dst) as *mut int),
             src as int))
     }
 }
@@ -302,19 +306,19 @@ fn wait_event(this: *rust_task) -> *libc::c_void {
 }
 
 #[doc(hidden)]
-fn swap_state_acq(&dst: state, src: state) -> state {
+fn swap_state_acq(dst: &mut state, src: state) -> state {
     unsafe {
         reinterpret_cast(rusti::atomic_xchng_acq(
-            *(ptr::mut_addr_of(dst) as *mut int),
+            *(ptr::mut_addr_of(*dst) as *mut int),
             src as int))
     }
 }
 
 #[doc(hidden)]
-fn swap_state_rel(&dst: state, src: state) -> state {
+fn swap_state_rel(dst: &mut state, src: state) -> state {
     unsafe {
         reinterpret_cast(rusti::atomic_xchng_rel(
-            *(ptr::mut_addr_of(dst) as *mut int),
+            *(ptr::mut_addr_of(*dst) as *mut int),
             src as int))
     }
 }
@@ -329,7 +333,7 @@ struct buffer_resource<T: send> {
     new(+b: ~buffer<T>) {
         //let p = ptr::addr_of(*b);
         //error!{"take %?", p};
-        atomic_add_acq(b.header.ref_count, 1);
+        atomic_add_acq(&mut b.header.ref_count, 1);
         self.buffer = b;
     }
 
@@ -337,7 +341,7 @@ struct buffer_resource<T: send> {
         let b = move_it!{self.buffer};
         //let p = ptr::addr_of(*b);
         //error!{"drop %?", p};
-        let old_count = atomic_sub_rel(b.header.ref_count, 1);
+        let old_count = atomic_sub_rel(&mut b.header.ref_count, 1);
         //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
         if old_count == 1 {
             // The new count is 0.
@@ -351,15 +355,15 @@ struct buffer_resource<T: send> {
 }
 
 #[doc(hidden)]
-fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
-                                -payload: T) -> bool {
+fn send<T: send, Tbuffer: send>(+p: send_packet_buffered<T, Tbuffer>,
+                                +payload: T) -> bool {
     let header = p.header();
     let p_ = p.unwrap();
     let p = unsafe { &*p_ };
     assert ptr::addr_of(p.header) == header;
     assert p.payload == none;
     p.payload <- some(payload);
-    let old_state = swap_state_rel(p.header.state, full);
+    let old_state = swap_state_rel(&mut p.header.state, full);
     match old_state {
         empty => {
             // Yay, fastpath.
@@ -371,7 +375,7 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
         full => fail ~"duplicate send",
         blocked => {
             debug!{"waking up task for %?", p_};
-            let old_task = swap_task(p.header.blocked_task, ptr::null());
+            let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
             if !old_task.is_null() {
                 rustrt::task_signal_event(
                     old_task, ptr::addr_of(p.header) as *libc::c_void);
@@ -395,7 +399,7 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
 Fails if the sender closes the connection.
 
 */
-fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
+fn recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>) -> T {
     option::unwrap_expect(try_recv(p), "connection closed")
 }
 
@@ -405,7 +409,7 @@ fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
 a message, or `some(T)` if a message was received.
 
 */
-fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
+fn try_recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>)
     -> option<T>
 {
     let p_ = p.unwrap();
@@ -417,7 +421,8 @@ struct drop_state {
         drop {
             if task::failing() {
                 self.p.state = terminated;
-                let old_task = swap_task(self.p.blocked_task, ptr::null());
+                let old_task = swap_task(&mut self.p.blocked_task,
+                                         ptr::null());
                 if !old_task.is_null() {
                     rustrt::rust_task_deref(old_task);
                 }
@@ -443,13 +448,13 @@ struct drop_state {
     let this = rustrt::rust_get_task();
     rustrt::task_clear_event_reject(this);
     rustrt::rust_task_ref(this);
-    let old_task = swap_task(p.header.blocked_task, this);
+    let old_task = swap_task(&mut p.header.blocked_task, this);
     assert old_task.is_null();
     let mut first = true;
     let mut count = SPIN_COUNT;
     loop {
         rustrt::task_clear_event_reject(this);
-        let old_state = swap_state_acq(p.header.state,
+        let old_state = swap_state_acq(&mut p.header.state,
                                        blocked);
         match old_state {
           empty => {
@@ -474,7 +479,7 @@ struct drop_state {
           full => {
             let mut payload = none;
             payload <-> p.payload;
-            let old_task = swap_task(p.header.blocked_task, ptr::null());
+            let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
             if !old_task.is_null() {
                 rustrt::rust_task_deref(old_task);
             }
@@ -486,7 +491,7 @@ struct drop_state {
             // casted too big of a number to a state.
             assert old_state == terminated;
 
-            let old_task = swap_task(p.header.blocked_task, ptr::null());
+            let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
             if !old_task.is_null() {
                 rustrt::rust_task_deref(old_task);
             }
@@ -498,7 +503,7 @@ struct drop_state {
 }
 
 /// Returns true if messages are available.
-pure fn peek<T: send, Tb: send>(p: recv_packet_buffered<T, Tb>) -> bool {
+pure fn peek<T: send, Tb: send>(p: &recv_packet_buffered<T, Tb>) -> bool {
     match unsafe {(*p.header()).state} {
       empty => false,
       blocked => fail ~"peeking on blocked packet",
@@ -508,20 +513,20 @@ struct drop_state {
 
 impl<T: send, Tb: send> recv_packet_buffered<T, Tb> {
     pure fn peek() -> bool {
-        peek(self)
+        peek(&self)
     }
 }
 
 #[doc(hidden)]
 fn sender_terminate<T: send>(p: *packet<T>) {
     let p = unsafe { &*p };
-    match swap_state_rel(p.header.state, terminated) {
+    match swap_state_rel(&mut p.header.state, terminated) {
       empty => {
         // The receiver will eventually clean up.
       }
       blocked => {
         // wake up the target
-        let old_task = swap_task(p.header.blocked_task, ptr::null());
+        let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
         if !old_task.is_null() {
             rustrt::task_signal_event(
                 old_task,
@@ -544,13 +549,13 @@ fn sender_terminate<T: send>(p: *packet<T>) {
 #[doc(hidden)]
 fn receiver_terminate<T: send>(p: *packet<T>) {
     let p = unsafe { &*p };
-    match swap_state_rel(p.header.state, terminated) {
+    match swap_state_rel(&mut p.header.state, terminated) {
       empty => {
         assert p.header.blocked_task.is_null();
         // the sender will clean up
       }
       blocked => {
-        let old_task = swap_task(p.header.blocked_task, ptr::null());
+        let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
         if !old_task.is_null() {
             rustrt::rust_task_deref(old_task);
             assert old_task == rustrt::rust_get_task();
@@ -682,7 +687,7 @@ fn selecti<T: selectable>(endpoints: &[T]) -> uint {
 }
 
 /// Returns 0 or 1 depending on which endpoint is ready to receive
-fn select2i<A: selectable, B: selectable>(a: A, b: B) -> either<(), ()> {
+fn select2i<A: selectable, B: selectable>(a: &A, b: &B) -> either<(), ()> {
     match wait_many([a.header(), b.header()]/_) {
       0 => left(()),
       1 => right(()),
@@ -1004,7 +1009,7 @@ fn try_recv() -> option<T> {
         let mut endp = none;
         endp <-> self.endp;
         let peek = match endp {
-          some(endp) => pipes::peek(endp),
+          some(endp) => pipes::peek(&endp),
           none => fail ~"peeking empty stream"
         };
         self.endp <-> endp;
@@ -1122,7 +1127,7 @@ impl<T: send, U: send, Left: selectable recv<T>, Right: selectable recv<U>>
 
     fn select() -> either<T, U> {
         match self {
-          (lp, rp) => match select2i(lp, rp) {
+          (lp, rp) => match select2i(&lp, &rp) {
             left(()) => left (lp.recv()),
             right(()) => right(rp.recv())
           }
@@ -1131,7 +1136,7 @@ fn select() -> either<T, U> {
 
     fn try_select() -> either<option<T>, option<U>> {
         match self {
-          (lp, rp) => match select2i(lp, rp) {
+          (lp, rp) => match select2i(&lp, &rp) {
             left(()) => left (lp.try_recv()),
             right(()) => right(rp.try_recv())
           }
index 4515262743419d4f10be0fd529e6e1e75d2c611b..f9f05056a6687e8e4ce8594c2a9fc315f4eda41d 100644 (file)
@@ -13,9 +13,9 @@
 fn main() {
     let (c, p) = oneshot::init();
 
-    assert !pipes::peek(p);
+    assert !pipes::peek(&p);
 
     oneshot::client::signal(c);
 
-    assert pipes::peek(p);
+    assert pipes::peek(&p);
 }