]> git.lizzy.rs Git - rust.git/commitdiff
Test fallout from std::comm rewrite
authorAlex Crichton <alex@alexcrichton.com>
Mon, 16 Dec 2013 02:17:43 +0000 (18:17 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Tue, 17 Dec 2013 06:55:49 +0000 (22:55 -0800)
23 files changed:
doc/tutorial-tasks.md
src/libextra/arc.rs
src/libextra/sync.rs
src/librustuv/net.rs
src/librustuv/pipe.rs
src/librustuv/signal.rs
src/librustuv/timer.rs
src/libstd/comm/mod.rs
src/libstd/comm/select.rs
src/libstd/io/net/unix.rs
src/libstd/rt/thread.rs
src/libstd/task/spawn.rs
src/test/bench/msgsend-pipes-shared.rs
src/test/bench/msgsend-pipes.rs
src/test/bench/rt-messaging-ping-pong.rs
src/test/bench/rt-parfib.rs
src/test/bench/shootout-chameneos-redux.rs
src/test/bench/shootout-k-nucleotide-pipes.rs
src/test/bench/shootout-pfib.rs
src/test/bench/shootout-threadring.rs
src/test/bench/task-perf-jargon-metal-smoke.rs
src/test/run-pass/hashmap-memory.rs
src/test/run-pass/task-comm-14.rs

index 41cd796325c32b71d375a3f3027524d424553537..6213a0cfe1c25f67406ac112ef092dd3afb8d7a8 100644 (file)
@@ -121,7 +121,7 @@ receiving messages. Pipes are low-level communication building-blocks and so
 come in a variety of forms, each one appropriate for a different use case. In
 what follows, we cover the most commonly used varieties.
 
-The simplest way to create a pipe is to use the `comm::stream`
+The simplest way to create a pipe is to use `Chan::new`
 function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
 is a sending endpoint of a pipe, and a *port* is the receiving
 endpoint. Consider the following example of calculating two results
@@ -129,9 +129,8 @@ concurrently:
 
 ~~~~
 # use std::task::spawn;
-# use std::comm::{stream, Port, Chan};
 
-let (port, chan): (Port<int>, Chan<int>) = stream();
+let (port, chan): (Port<int>, Chan<int>) = Chan::new();
 
 do spawn || {
     let result = some_expensive_computation();
@@ -150,8 +149,7 @@ stream for sending and receiving integers (the left-hand side of the `let`,
 a tuple into its component parts).
 
 ~~~~
-# use std::comm::{stream, Chan, Port};
-let (port, chan): (Port<int>, Chan<int>) = stream();
+let (port, chan): (Port<int>, Chan<int>) = Chan::new();
 ~~~~
 
 The child task will use the channel to send data to the parent task,
@@ -160,9 +158,8 @@ spawns the child task.
 
 ~~~~
 # use std::task::spawn;
-# use std::comm::stream;
 # fn some_expensive_computation() -> int { 42 }
-# let (port, chan) = stream();
+# let (port, chan) = Chan::new();
 do spawn || {
     let result = some_expensive_computation();
     chan.send(result);
@@ -180,25 +177,23 @@ computation, then waits for the child's result to arrive on the
 port:
 
 ~~~~
-# use std::comm::{stream};
 # fn some_other_expensive_computation() {}
-# let (port, chan) = stream::<int>();
+# let (port, chan) = Chan::<int>::new();
 # chan.send(0);
 some_other_expensive_computation();
 let result = port.recv();
 ~~~~
 
-The `Port` and `Chan` pair created by `stream` enables efficient communication
-between a single sender and a single receiver, but multiple senders cannot use
-a single `Chan`, and multiple receivers cannot use a single `Port`.  What if our
-example needed to compute multiple results across a number of tasks? The
-following program is ill-typed:
+The `Port` and `Chan` pair created by `Chan::new` enables efficient
+communication between a single sender and a single receiver, but multiple
+senders cannot use a single `Chan`, and multiple receivers cannot use a single
+`Port`.  What if our example needed to compute multiple results across a number
+of tasks? The following program is ill-typed:
 
 ~~~ {.xfail-test}
 # use std::task::{spawn};
-# use std::comm::{stream, Port, Chan};
 # fn some_expensive_computation() -> int { 42 }
-let (port, chan) = stream();
+let (port, chan) = Chan::new();
 
 do spawn {
     chan.send(some_expensive_computation());
@@ -216,10 +211,8 @@ Instead we can use a `SharedChan`, a type that allows a single
 
 ~~~
 # use std::task::spawn;
-# use std::comm::{stream, SharedChan};
 
-let (port, chan) = stream();
-let chan = SharedChan::new(chan);
+let (port, chan) = SharedChan::new();
 
 for init_val in range(0u, 3) {
     // Create a new channel handle to distribute to the child task
@@ -238,23 +231,22 @@ Here we transfer ownership of the channel into a new `SharedChan` value.  Like
 as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
 may duplicate a `SharedChan`, with the `clone()` method.  A cloned
 `SharedChan` produces a new handle to the same channel, allowing multiple
-tasks to send data to a single port.  Between `spawn`, `stream` and
+tasks to send data to a single port.  Between `spawn`, `Chan` and
 `SharedChan`, we have enough tools to implement many useful concurrency
 patterns.
 
 Note that the above `SharedChan` example is somewhat contrived since
-you could also simply use three `stream` pairs, but it serves to
+you could also simply use three `Chan` pairs, but it serves to
 illustrate the point. For reference, written with multiple streams, it
 might look like the example below.
 
 ~~~
 # use std::task::spawn;
-# use std::comm::stream;
 # use std::vec;
 
 // Create a vector of ports, one for each child task
 let ports = vec::from_fn(3, |init_val| {
-    let (port, chan) = stream();
+    let (port, chan) = Chan::new();
     do spawn {
         chan.send(some_expensive_computation(init_val));
     }
@@ -341,7 +333,7 @@ fn main() {
     let numbers_arc = Arc::new(numbers);
 
     for num in range(1u, 10) {
-        let (port, chan)  = stream();
+        let (port, chan)  = Chan::new();
         chan.send(numbers_arc.clone());
 
         do spawn {
@@ -370,7 +362,7 @@ and a clone of it is sent to each task
 # use std::rand;
 # let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
 # let numbers_arc = Arc::new(numbers);
-# let (port, chan)  = stream();
+# let (port, chan)  = Chan::new();
 chan.send(numbers_arc.clone());
 ~~~
 copying only the wrapper and not its contents.
@@ -382,7 +374,7 @@ Each task recovers the underlying data by
 # use std::rand;
 # let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
 # let numbers_arc=Arc::new(numbers);
-# let (port, chan)  = stream();
+# let (port, chan)  = Chan::new();
 # chan.send(numbers_arc.clone());
 # let local_arc : Arc<~[f64]> = port.recv();
 let task_numbers = local_arc.get();
@@ -499,7 +491,7 @@ Here is the code for the parent task:
 # }
 # fn main() {
 
-let (from_child, to_child) = DuplexStream();
+let (from_child, to_child) = DuplexStream::new();
 
 do spawn {
     stringifier(&to_child);
index ea8066b786f4ea728f057d2d77ef772fc0ade1ca..6add053fa81816475de95b9f3d9a3f488271bdb3 100644 (file)
@@ -635,9 +635,8 @@ fn test_mutex_arc_condvar() {
             })
         }
 
-        let mut c = Some(c);
         arc.access_cond(|state, cond| {
-            c.take_unwrawp().send(());
+            c.send(());
             assert!(!*state);
             while !*state {
                 cond.wait();
index 1cc403c32f4c4559b50b3b754f5c947f678a1b65..6e582982962773f4e515ac05b396a2057a27ce39 100644 (file)
@@ -950,7 +950,6 @@ fn test_mutex_killed_broadcast() {
                 let mi = m2.clone();
                 // spawn sibling task
                 do task::spawn { // linked
-                    let mut c = Some(c);
                     mi.lock_cond(|cond| {
                         c.send(()); // tell sibling to go ahead
                         (|| {
@@ -994,6 +993,7 @@ fn test_mutex_cond_signal_on_0() {
         })
     }
     #[test]
+    #[ignore(reason = "linked failure?")]
     fn test_mutex_different_conds() {
         let result = do task::try {
             let m = Mutex::new_with_condvars(2);
index 6f1930bc7fe3143650a009d621da078f4d11c718..ce543eafd2f644d611a1c76ed00863debd803c69 100644 (file)
@@ -646,7 +646,6 @@ fn drop(&mut self) {
 
 #[cfg(test)]
 mod test {
-    use std::comm::oneshot;
     use std::rt::test::*;
     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
                         RtioUdpSocket};
@@ -689,7 +688,7 @@ fn udp_bind_close_ip6() {
 
     #[test]
     fn listen_ip4() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
         let addr = next_test_ip4();
 
         do spawn {
@@ -725,7 +724,7 @@ fn listen_ip4() {
 
     #[test]
     fn listen_ip6() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
         let addr = next_test_ip6();
 
         do spawn {
@@ -761,7 +760,7 @@ fn listen_ip6() {
 
     #[test]
     fn udp_recv_ip4() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
         let client = next_test_ip4();
         let server = next_test_ip4();
 
@@ -793,7 +792,7 @@ fn udp_recv_ip4() {
 
     #[test]
     fn udp_recv_ip6() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
         let client = next_test_ip6();
         let server = next_test_ip6();
 
@@ -828,7 +827,7 @@ fn test_read_read_read() {
         use std::rt::rtio::*;
         let addr = next_test_ip4();
         static MAX: uint = 5000;
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do spawn {
             let listener = TcpListener::bind(local_loop(), addr).unwrap();
@@ -865,7 +864,7 @@ fn test_read_read_read() {
     fn test_udp_twice() {
         let server_addr = next_test_ip4();
         let client_addr = next_test_ip4();
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do spawn {
             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
@@ -896,8 +895,8 @@ fn test_udp_many_read() {
         let client_in_addr = next_test_ip4();
         static MAX: uint = 500_000;
 
-        let (p1, c1) = oneshot();
-        let (p2, c2) = oneshot();
+        let (p1, c1) = Chan::new();
+        let (p2, c2) = Chan::new();
 
         do spawn {
             let l = local_loop();
@@ -953,12 +952,12 @@ fn test_udp_many_read() {
     #[test]
     fn test_read_and_block() {
         let addr = next_test_ip4();
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do spawn {
             let listener = TcpListener::bind(local_loop(), addr).unwrap();
             let mut acceptor = listener.listen().unwrap();
-            let (port2, chan2) = stream();
+            let (port2, chan2) = Chan::new();
             chan.send(port2);
             let mut stream = acceptor.accept().unwrap();
             let mut buf = [0, .. 2048];
@@ -1026,7 +1025,7 @@ fn test_simple_tcp_server_and_client_on_diff_threads() {
     // thread, close itself, and then come back to the last thread.
     #[test]
     fn test_homing_closes_correctly() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do task::spawn_sched(task::SingleThreaded) {
             let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
@@ -1048,9 +1047,9 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
         use std::rt::sched::{Shutdown, TaskFromFriend};
         use std::rt::sleeper_list::SleeperList;
         use std::rt::task::Task;
-        use std::rt::task::UnwindResult;
         use std::rt::thread::Thread;
         use std::rt::deque::BufferPool;
+        use std::task::TaskResult;
         use std::unstable::run_in_bare_thread;
         use uvio::UvEventLoop;
 
@@ -1072,12 +1071,12 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
             let handle2 = sched2.make_handle();
             let tasksFriendHandle = sched2.make_handle();
 
-            let on_exit: proc(UnwindResult) = proc(exit_status) {
+            let on_exit: proc(TaskResult) = proc(exit_status) {
                 let mut handle1 = handle1;
                 let mut handle2 = handle2;
                 handle1.send(Shutdown);
                 handle2.send(Shutdown);
-                assert!(exit_status.is_success());
+                assert!(exit_status.is_ok());
             };
 
             unsafe fn local_io() -> &'static mut IoFactory {
@@ -1148,7 +1147,7 @@ fn tcp_listener_fail_cleanup() {
 
     #[should_fail] #[test]
     fn tcp_stream_fail_cleanup() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
         let addr = next_test_ip4();
 
         do spawn {
@@ -1172,7 +1171,7 @@ fn udp_listener_fail_cleanup() {
     #[should_fail] #[test]
     fn udp_fail_other_task() {
         let addr = next_test_ip4();
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         // force the handle to be created on a different scheduler, failure in
         // the original task will force a homing operation back to this
@@ -1190,7 +1189,7 @@ fn udp_fail_other_task() {
     #[test]
     #[ignore(reason = "linked failure")]
     fn linked_failure1() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
         let addr = next_test_ip4();
 
         do spawn {
@@ -1208,7 +1207,7 @@ fn linked_failure1() {
     #[test]
     #[ignore(reason = "linked failure")]
     fn linked_failure2() {
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
         let addr = next_test_ip4();
 
         do spawn {
@@ -1229,7 +1228,7 @@ fn linked_failure2() {
     #[test]
     #[ignore(reason = "linked failure")]
     fn linked_failure3() {
-        let (port, chan) = stream();
+        let (port, chan) = Chan::new();
         let addr = next_test_ip4();
 
         do spawn {
index 86ebae45f19530960ee4c88bf3c5fbcc62728535..814205cbbf1ccf8f3233a24971b52ba2cca0efd5 100644 (file)
@@ -231,7 +231,6 @@ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
 
 #[cfg(test)]
 mod tests {
-    use std::comm::oneshot;
     use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
     use std::rt::test::next_test_unix;
 
@@ -274,7 +273,7 @@ fn bind_fail() {
     fn connect() {
         let path = next_test_unix();
         let path2 = path.clone();
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do spawn {
             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
@@ -298,7 +297,7 @@ fn connect() {
     fn connect_fail() {
         let path = next_test_unix();
         let path2 = path.clone();
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do spawn {
             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
index 67777050cf30a5aaa590e23533aadb143d183b0f..f082aef003c60bcf00f1526999b551ced7984991 100644 (file)
@@ -78,13 +78,11 @@ mod test {
     use super::*;
     use super::super::local_loop;
     use std::io::signal;
-    use std::comm::{SharedChan, stream};
 
     #[test]
     fn closing_channel_during_drop_doesnt_kill_everything() {
         // see issue #10375, relates to timers as well.
-        let (port, chan) = stream();
-        let chan = SharedChan::new(chan);
+        let (port, chan) = SharedChan::new();
         let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
                                          chan);
 
index 7efdafd236921afaaeeb7a78a1cbc1add3d7b5f2..ab143d6e8b077ada20042d4497e0cc8381f167e3 100644 (file)
@@ -23,12 +23,13 @@ pub struct TimerWatcher {
     handle: *uvll::uv_timer_t,
     home: SchedHandle,
     action: Option<NextAction>,
+    id: uint, // see comments in timer_cb
 }
 
 pub enum NextAction {
     WakeTask(BlockedTask),
     SendOnce(Chan<()>),
-    SendMany(Chan<()>),
+    SendMany(Chan<()>, uint),
 }
 
 impl TimerWatcher {
@@ -41,6 +42,7 @@ pub fn new(loop_: &mut Loop) -> ~TimerWatcher {
             handle: handle,
             action: None,
             home: get_handle_to_current_scheduler!(),
+            id: 0,
         };
         return me.install();
     }
@@ -72,6 +74,7 @@ fn sleep(&mut self, msecs: u64) {
         // we must temporarily un-home ourselves, then destroy the action, and
         // then re-home again.
         let missile = self.fire_homing_missile();
+        self.id += 1;
         self.stop();
         let _missile = match util::replace(&mut self.action, None) {
             None => missile, // no need to do a homing dance
@@ -101,6 +104,7 @@ fn oneshot(&mut self, msecs: u64) -> Port<()> {
         // of the homing missile
         let _prev_action = {
             let _m = self.fire_homing_missile();
+            self.id += 1;
             self.stop();
             self.start(msecs, 0);
             util::replace(&mut self.action, Some(SendOnce(chan)))
@@ -116,9 +120,10 @@ fn period(&mut self, msecs: u64) -> Port<()> {
         // of the homing missile
         let _prev_action = {
             let _m = self.fire_homing_missile();
+            self.id += 1;
             self.stop();
             self.start(msecs, msecs);
-            util::replace(&mut self.action, Some(SendMany(chan)))
+            util::replace(&mut self.action, Some(SendMany(chan, self.id)))
         };
 
         return port;
@@ -135,10 +140,21 @@ fn period(&mut self, msecs: u64) -> Port<()> {
             let sched: ~Scheduler = Local::take();
             sched.resume_blocked_task_immediately(task);
         }
-        SendOnce(chan) => chan.send_deferred(()),
-        SendMany(chan) => {
-            chan.send_deferred(());
-            timer.action = Some(SendMany(chan));
+        SendOnce(chan) => { chan.try_send_deferred(()); }
+        SendMany(chan, id) => {
+            chan.try_send_deferred(());
+
+            // Note that the above operation could have performed some form of
+            // scheduling. This means that the timer may have decided to insert
+            // some other action to happen. This 'id' keeps track of the updates
+            // to the timer, so we only reset the action back to sending on this
+            // channel if the id has remained the same. This is essentially a
+            // bug in that we have mutably aliasable memory, but that's libuv
+            // for you. We're guaranteed to all be running on the same thread,
+            // so there's no need for any synchronization here.
+            if timer.id == id {
+                timer.action = Some(SendMany(chan, id));
+            }
         }
     }
 }
@@ -180,8 +196,8 @@ fn override() {
         let oport = timer.oneshot(1);
         let pport = timer.period(1);
         timer.sleep(1);
-        assert_eq!(oport.try_recv(), None);
-        assert_eq!(pport.try_recv(), None);
+        assert_eq!(oport.recv_opt(), None);
+        assert_eq!(pport.recv_opt(), None);
         timer.oneshot(1).recv();
     }
 
@@ -230,7 +246,7 @@ fn closing_channel_during_drop_doesnt_kill_everything() {
         let timer_port = timer.period(1000);
 
         do spawn {
-            timer_port.try_recv();
+            timer_port.recv_opt();
         }
 
         // when we drop the TimerWatcher we're going to destroy the channel,
@@ -244,7 +260,7 @@ fn reset_doesnt_switch_tasks() {
         let timer_port = timer.period(1000);
 
         do spawn {
-            timer_port.try_recv();
+            timer_port.recv_opt();
         }
 
         timer.oneshot(1);
@@ -256,7 +272,7 @@ fn reset_doesnt_switch_tasks2() {
         let timer_port = timer.period(1000);
 
         do spawn {
-            timer_port.try_recv();
+            timer_port.recv_opt();
         }
 
         timer.sleep(1);
@@ -268,7 +284,7 @@ fn sender_goes_away_oneshot() {
             let mut timer = TimerWatcher::new(local_loop());
             timer.oneshot(1000)
         };
-        assert_eq!(port.try_recv(), None);
+        assert_eq!(port.recv_opt(), None);
     }
 
     #[test]
@@ -277,7 +293,7 @@ fn sender_goes_away_period() {
             let mut timer = TimerWatcher::new(local_loop());
             timer.period(1000)
         };
-        assert_eq!(port.try_recv(), None);
+        assert_eq!(port.recv_opt(), None);
     }
 
     #[test]
index 9a65e9973cb483f2df686533fd972ac7281a39b1..4cbc6c7cbb7ba187da741bec18e8248493dd7200 100644 (file)
@@ -255,7 +255,9 @@ mod $name {
             fn f() $b
 
             $($a)* #[test] fn uv() { f() }
-            $($a)* #[test] fn native() {
+            $($a)* #[test]
+            #[ignore(cfg(windows))] // FIXME(#11003)
+            fn native() {
                 use unstable::run_in_bare_thread;
                 run_in_bare_thread(f);
             }
@@ -1021,6 +1023,7 @@ fn stress_shared() {
     }
 
     #[test]
+    #[ignore(cfg(windows))] // FIXME(#11003)
     fn send_from_outside_runtime() {
         let (p, c) = Chan::<int>::new();
         let (p1, c1) = Chan::new();
@@ -1040,6 +1043,7 @@ fn send_from_outside_runtime() {
     }
 
     #[test]
+    #[ignore(cfg(windows))] // FIXME(#11003)
     fn recv_from_outside_runtime() {
         let (p, c) = Chan::<int>::new();
         let t = do Thread::start {
@@ -1054,6 +1058,7 @@ fn recv_from_outside_runtime() {
     }
 
     #[test]
+    #[ignore(cfg(windows))] // FIXME(#11003)
     fn no_runtime() {
         let (p1, c1) = Chan::<int>::new();
         let (p2, c2) = Chan::<int>::new();
index 2d9bc6e9c12bc11582408c52cb70a1d083dda026..4d6b540f2a5cb615f890ed9b1c4fe2c4b44eeefd 100644 (file)
@@ -83,10 +83,10 @@ pub struct Select {
 /// A handle to a port which is currently a member of a `Select` set of ports.
 /// This handle is used to keep the port in the set as well as interact with the
 /// underlying port.
-pub struct Handle<'self, T> {
+pub struct Handle<'port, T> {
     id: uint,
-    priv selector: &'self Select,
-    priv port: &'self mut Port<T>,
+    priv selector: &'port Select,
+    priv port: &'port mut Port<T>,
 }
 
 struct PacketIterator { priv cur: *mut Packet }
@@ -234,6 +234,7 @@ pub fn wait(&self) -> uint {
                 assert!(!(*packet).selecting.load(Relaxed));
             }
 
+            assert!(ready_id != uint::max_value);
             return ready_id;
         }
     }
@@ -261,7 +262,7 @@ unsafe fn remove(&self, packet: *mut Packet) {
     fn iter(&self) -> PacketIterator { PacketIterator { cur: self.head } }
 }
 
-impl<'self, T: Send> Handle<'self, T> {
+impl<'port, T: Send> Handle<'port, T> {
     /// Receive a value on the underlying port. Has the same semantics as
     /// `Port.recv`
     pub fn recv(&mut self) -> T { self.port.recv() }
@@ -283,7 +284,7 @@ fn drop(&mut self) {
 }
 
 #[unsafe_destructor]
-impl<'self, T: Send> Drop for Handle<'self, T> {
+impl<'port, T: Send> Drop for Handle<'port, T> {
     fn drop(&mut self) {
         unsafe { self.selector.remove(self.port.queue.packet()) }
     }
@@ -437,6 +438,7 @@ fn stress() {
     }
 
     #[test]
+    #[ignore(cfg(windows))] // FIXME(#11003)
     fn stress_native() {
         use std::rt::thread::Thread;
         use std::unstable::run_in_bare_thread;
@@ -470,6 +472,7 @@ fn stress_native() {
     }
 
     #[test]
+    #[ignore(cfg(windows))] // FIXME(#11003)
     fn native_both_ready() {
         use std::rt::thread::Thread;
         use std::unstable::run_in_bare_thread;
index d3fc265cf2aa8135e6e6440758a95fb18a7e0ee0..49770b80060b870de546bc574104d1bb8dccaf39 100644 (file)
@@ -163,11 +163,11 @@ fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) {
             do spawntask {
                 let mut acceptor = UnixListener::bind(&path1).listen();
                 chan.send(());
-                server.take()(acceptor.accept().unwrap());
+                server(acceptor.accept().unwrap());
             }
 
             port.recv();
-            client.take()(UnixStream::connect(&path2).unwrap());
+            client(UnixStream::connect(&path2).unwrap());
         }
     }
 
index da02988c75c94072e9ec012cce0b758f43f7f92f..6128f310a2ebf27b51ddbf9bed20575fe26914a4 100644 (file)
@@ -132,11 +132,13 @@ fn drop(&mut self) {
 
 #[cfg(windows)]
 mod imp {
+    use super::DEFAULT_STACK_SIZE;
+
+    use cast;
+    use libc;
     use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL,
                                        LPVOID, DWORD, LPDWORD, HANDLE};
-    use libc;
-    use cast;
-    use super::DEFAULT_STACK_SIZE;
+    use ptr;
 
     pub type rust_thread = HANDLE;
     pub type rust_thread_return = DWORD;
@@ -210,9 +212,10 @@ pub unsafe fn detach(native: rust_thread) {
     }
 
     #[cfg(target_os = "macos")]
+    #[cfg(target_os = "android")]
     pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); }
 
-    #[cfg(not(target_os = "macos"))]
+    #[cfg(not(target_os = "macos"), not(target_os = "android"))]
     pub unsafe fn yield_now() { assert_eq!(pthread_yield(), 0); }
 
     extern {
@@ -230,8 +233,9 @@ fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t,
         fn pthread_detach(thread: libc::pthread_t) -> libc::c_int;
 
         #[cfg(target_os = "macos")]
+        #[cfg(target_os = "android")]
         fn sched_yield() -> libc::c_int;
-        #[cfg(not(target_os = "macos"))]
+        #[cfg(not(target_os = "macos"), not(target_os = "android"))]
         fn pthread_yield() -> libc::c_int;
     }
 }
index eb3e19f4a5a7f46bd8552ad47c6a82413639f5a9..1148774020a14b083f73d6b5d5e3064ff7061312 100644 (file)
@@ -171,7 +171,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
     if opts.notify_chan.is_some() {
         let notify_chan = opts.notify_chan.take_unwrap();
         let on_exit: proc(TaskResult) = proc(task_result) {
-            notify_chan.send(task_result)
+            notify_chan.try_send(task_result);
         };
         task.death.on_exit = Some(on_exit);
     }
index 2a5971be216b876c1229205f9d597af31fad9f54..50cb00b25d4b116ead9b024dc9b2978127c984b5 100644 (file)
@@ -20,7 +20,6 @@
 
 extern mod extra;
 
-use std::comm::{Port, Chan, SharedChan};
 use std::comm;
 use std::os;
 use std::task;
@@ -38,7 +37,7 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) {
     let mut count = 0u;
     let mut done = false;
     while !done {
-        match requests.try_recv() {
+        match requests.recv_opt() {
           Some(get_count) => { responses.send(count.clone()); }
           Some(bytes(b)) => {
             //error!("server: received {:?} bytes", b);
@@ -53,10 +52,8 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) {
 }
 
 fn run(args: &[~str]) {
-    let (from_child, to_parent) = comm::stream();
-    let (from_parent, to_child) = comm::stream();
-
-    let to_child = SharedChan::new(to_child);
+    let (from_child, to_parent) = Chan::new();
+    let (from_parent, to_child) = SharedChan::new();
 
     let size = from_str::<uint>(args[1]).unwrap();
     let workers = from_str::<uint>(args[2]).unwrap();
index 1ff531324b3585cb9c4bf1c0d35ebd9bcda5ede3..3cf1a97a36e041821f6a598c89f3f9d3d0d22d87 100644 (file)
@@ -16,7 +16,6 @@
 
 extern mod extra;
 
-use std::comm::{SharedChan, Chan, stream};
 use std::os;
 use std::task;
 use std::uint;
@@ -33,7 +32,7 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) {
     let mut count: uint = 0;
     let mut done = false;
     while !done {
-        match requests.try_recv() {
+        match requests.recv_opt() {
           Some(get_count) => { responses.send(count.clone()); }
           Some(bytes(b)) => {
             //error!("server: received {:?} bytes", b);
@@ -48,17 +47,15 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) {
 }
 
 fn run(args: &[~str]) {
-    let (from_child, to_parent) = stream();
-    let (from_parent, to_child) = stream();
-    let to_child = SharedChan::new(to_child);
+    let (from_child, to_parent) = Chan::new();
 
     let size = from_str::<uint>(args[1]).unwrap();
     let workers = from_str::<uint>(args[2]).unwrap();
     let num_bytes = 100;
     let start = extra::time::precise_time_s();
     let mut worker_results = ~[];
-    for _ in range(0u, workers) {
-        let to_child = to_child.clone();
+    let from_parent = if workers == 1 {
+        let (from_parent, to_child) = Chan::new();
         let mut builder = task::task();
         worker_results.push(builder.future_result());
         do builder.spawn {
@@ -68,7 +65,23 @@ fn run(args: &[~str]) {
             }
             //error!("worker {:?} exiting", i);
         };
-    }
+        from_parent
+    } else {
+        let (from_parent, to_child) = SharedChan::new();
+        for _ in range(0u, workers) {
+            let to_child = to_child.clone();
+            let mut builder = task::task();
+            worker_results.push(builder.future_result());
+            do builder.spawn {
+                for _ in range(0u, size / workers) {
+                    //error!("worker {:?}: sending {:?} bytes", i, num_bytes);
+                    to_child.send(bytes(num_bytes));
+                }
+                //error!("worker {:?} exiting", i);
+            };
+        }
+        from_parent
+    };
     do task::spawn || {
         server(&from_parent, &to_parent);
     }
@@ -78,8 +91,8 @@ fn run(args: &[~str]) {
     }
 
     //error!("sending stop message");
-    to_child.send(stop);
-    move_out(to_child);
+    //to_child.send(stop);
+    //move_out(to_child);
     let result = from_child.recv();
     let end = extra::time::precise_time_s();
     let elapsed = end - start;
index 8fa26b42e852714defedc6321aa687bdce37907e..90d81aa7c3ee688379ce664b24165934c58702cf 100644 (file)
@@ -24,9 +24,9 @@ fn ping_pong_bench(n: uint, m: uint) {
     // Create pairs of tasks that pingpong back and forth.
     fn run_pair(n: uint) {
         // Create a stream A->B
-        let (pa,ca) = stream::<()>();
+        let (pa,ca) = Chan::<()>::new();
         // Create a stream B->A
-        let (pb,cb) = stream::<()>();
+        let (pb,cb) = Chan::<()>::new();
 
         do spawntask_later() || {
             let chan = ca;
index e6519a7885629d7166ec94f8600e2bef4371fa0c..ab607d9aebc758da867cc357cbc8982ebb5f89e5 100644 (file)
@@ -13,7 +13,6 @@
 use std::os;
 use std::uint;
 use std::rt::test::spawntask_later;
-use std::comm::oneshot;
 
 // A simple implementation of parfib. One subtree is found in a new
 // task and communicated over a oneshot pipe, the other is found
@@ -24,7 +23,7 @@ fn parfib(n: uint) -> uint {
         return 1;
     }
 
-    let (port,chan) = oneshot::<uint>();
+    let (port,chan) = Chan::new();
     do spawntask_later {
         chan.send(parfib(n-1));
     };
index 464bc664fb5b2b09a99fd218dc567c6e34181217..7801a64fcedba40df1948d838eecda2d7e7322ad 100644 (file)
@@ -12,7 +12,6 @@
 
 extern mod extra;
 
-use std::comm::{stream, SharedChan};
 use std::option;
 use std::os;
 use std::task;
@@ -138,10 +137,8 @@ fn creature(
 fn rendezvous(nn: uint, set: ~[color]) {
 
     // these ports will allow us to hear from the creatures
-    let (from_creatures, to_rendezvous) = stream::<CreatureInfo>();
-    let to_rendezvous = SharedChan::new(to_rendezvous);
-    let (from_creatures_log, to_rendezvous_log) = stream::<~str>();
-    let to_rendezvous_log = SharedChan::new(to_rendezvous_log);
+    let (from_creatures, to_rendezvous) = SharedChan::<CreatureInfo>::new();
+    let (from_creatures_log, to_rendezvous_log) = SharedChan::<~str>::new();
 
     // these channels will be passed to the creatures so they can talk to us
 
@@ -154,7 +151,7 @@ fn rendezvous(nn: uint, set: ~[color]) {
             let col = *col;
             let to_rendezvous = to_rendezvous.clone();
             let to_rendezvous_log = to_rendezvous_log.clone();
-            let (from_rendezvous, to_creature) = stream();
+            let (from_rendezvous, to_creature) = Chan::new();
             do task::spawn {
                 creature(ii,
                          col,
index a12eac50852846cbae8a4b35fbcc43b2b14903f3..96de609787345b687e3105c26de5f125a8851dff 100644 (file)
@@ -17,7 +17,6 @@
 
 use extra::sort;
 use std::cmp::Ord;
-use std::comm::{stream, Port, Chan};
 use std::comm;
 use std::hashmap::HashMap;
 use std::option;
@@ -165,7 +164,7 @@ fn main() {
 
     // initialize each sequence sorter
     let sizes = ~[1u,2,3,4,6,12,18];
-    let mut streams = vec::from_fn(sizes.len(), |_| Some(stream::<~str>()));
+    let mut streams = vec::from_fn(sizes.len(), |_| Some(Chan::<~str>::new()));
     let mut from_child = ~[];
     let to_child   = sizes.iter().zip(streams.mut_iter()).map(|(sz, stream_ref)| {
         let sz = *sz;
@@ -174,7 +173,7 @@ fn main() {
 
         from_child.push(from_child_);
 
-        let (from_parent, to_child) = comm::stream();
+        let (from_parent, to_child) = Chan::new();
 
         do spawn {
             make_sequence_processor(sz, &from_parent, &to_parent_);
index da25f1e82eebb77c310afd6221cec7b7f8a2ee3e..aa060ceb0973f1ab7c97ce64795b3516a324358c 100644 (file)
@@ -21,7 +21,6 @@
 extern mod extra;
 
 use extra::{time, getopts};
-use std::comm::{stream, SharedChan};
 use std::os;
 use std::result::{Ok, Err};
 use std::task;
@@ -34,8 +33,7 @@ fn pfib(c: &SharedChan<int>, n: int) {
         } else if n <= 2 {
             c.send(1);
         } else {
-            let (pp, cc) = stream();
-            let cc = SharedChan::new(cc);
+            let (pp, cc) = SharedChan::new();
             let ch = cc.clone();
             task::spawn(proc() pfib(&ch, n - 1));
             let ch = cc.clone();
@@ -44,8 +42,7 @@ fn pfib(c: &SharedChan<int>, n: int) {
         }
     }
 
-    let (p, ch) = stream();
-    let ch = SharedChan::new(ch);
+    let (p, ch) = SharedChan::new();
     let _t = task::spawn(proc() pfib(&ch, n) );
     p.recv()
 }
index 5e0968163064134ae6e2f525d93e261356bcee31..6293b6ce8669ba365cf19df091908fa00933064c 100644 (file)
 use std::os;
 
 fn start(n_tasks: int, token: int) {
-    let (p, ch1) = stream();
+    let (p, ch1) = Chan::new();
     let mut p = p;
     let ch1 = ch1;
     ch1.send(token);
     //  XXX could not get this to work with a range closure
     let mut i = 2;
     while i <= n_tasks {
-        let (next_p, ch) = stream();
+        let (next_p, ch) = Chan::new();
         let imm_i = i;
         let imm_p = p;
         do spawn {
index 8e7b48040cdf12909d55e25a1a7930bfc2cca8e2..dc31ef06fa6f20143f2b1dd8fbd0e19922d34ceb 100644 (file)
@@ -48,9 +48,9 @@ fn main() {
         args.clone()
     };
 
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     child_generation(from_str::<uint>(args[1]).unwrap(), c);
-    if p.try_recv().is_none() {
+    if p.recv_opt().is_none() {
         fail!("it happened when we slumbered");
     }
 }
index bacf8353a2e3686e45c8e0726b8add262cf3cc62..49aa8d18e90ecad23d5890527ed16ad73bb49ca9 100644 (file)
@@ -81,7 +81,7 @@ pub fn map_reduce(inputs: ~[~str]) {
               mapper_done => { num_mappers -= 1; }
               find_reducer(k, cc) => {
                 let mut c;
-                match reducers.find(&str::from_utf8(k)) {
+                match reducers.find(&str::from_utf8(k).to_owned()) {
                   Some(&_c) => { c = _c; }
                   None => { c = 0; }
                 }
index 435d68ada4964bca64f0754dcaf9111df61ea225..2a7a0c25a21c20dd33d2b5d6ec49abcace36b435 100644 (file)
@@ -20,7 +20,7 @@ pub fn main() {
     while (i > 0) {
         info!("{}", i);
         let ch = ch.clone();
-        task::spawn({let i = i; proc() { child(i, &ch) });
+        task::spawn({let i = i; proc() { child(i, &ch) }});
         i = i - 1;
     }