come in a variety of forms, each one appropriate for a different use case. In
what follows, we cover the most commonly used varieties.
-The simplest way to create a pipe is to use the `comm::stream`
+The simplest way to create a pipe is to use `Chan::new`
function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
is a sending endpoint of a pipe, and a *port* is the receiving
endpoint. Consider the following example of calculating two results
~~~~
# use std::task::spawn;
-# use std::comm::{stream, Port, Chan};
-let (port, chan): (Port<int>, Chan<int>) = stream();
+let (port, chan): (Port<int>, Chan<int>) = Chan::new();
do spawn || {
let result = some_expensive_computation();
a tuple into its component parts).
~~~~
-# use std::comm::{stream, Chan, Port};
-let (port, chan): (Port<int>, Chan<int>) = stream();
+let (port, chan): (Port<int>, Chan<int>) = Chan::new();
~~~~
The child task will use the channel to send data to the parent task,
~~~~
# use std::task::spawn;
-# use std::comm::stream;
# fn some_expensive_computation() -> int { 42 }
-# let (port, chan) = stream();
+# let (port, chan) = Chan::new();
do spawn || {
let result = some_expensive_computation();
chan.send(result);
port:
~~~~
-# use std::comm::{stream};
# fn some_other_expensive_computation() {}
-# let (port, chan) = stream::<int>();
+# let (port, chan) = Chan::<int>::new();
# chan.send(0);
some_other_expensive_computation();
let result = port.recv();
~~~~
-The `Port` and `Chan` pair created by `stream` enables efficient communication
-between a single sender and a single receiver, but multiple senders cannot use
-a single `Chan`, and multiple receivers cannot use a single `Port`. What if our
-example needed to compute multiple results across a number of tasks? The
-following program is ill-typed:
+The `Port` and `Chan` pair created by `Chan::new` enables efficient
+communication between a single sender and a single receiver, but multiple
+senders cannot use a single `Chan`, and multiple receivers cannot use a single
+`Port`. What if our example needed to compute multiple results across a number
+of tasks? The following program is ill-typed:
~~~ {.xfail-test}
# use std::task::{spawn};
-# use std::comm::{stream, Port, Chan};
# fn some_expensive_computation() -> int { 42 }
-let (port, chan) = stream();
+let (port, chan) = Chan::new();
do spawn {
chan.send(some_expensive_computation());
~~~
# use std::task::spawn;
-# use std::comm::{stream, SharedChan};
-let (port, chan) = stream();
-let chan = SharedChan::new(chan);
+let (port, chan) = SharedChan::new();
for init_val in range(0u, 3) {
// Create a new channel handle to distribute to the child task
as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
may duplicate a `SharedChan`, with the `clone()` method. A cloned
`SharedChan` produces a new handle to the same channel, allowing multiple
-tasks to send data to a single port. Between `spawn`, `stream` and
+tasks to send data to a single port. Between `spawn`, `Chan` and
`SharedChan`, we have enough tools to implement many useful concurrency
patterns.
Note that the above `SharedChan` example is somewhat contrived since
-you could also simply use three `stream` pairs, but it serves to
+you could also simply use three `Chan` pairs, but it serves to
illustrate the point. For reference, written with multiple streams, it
might look like the example below.
~~~
# use std::task::spawn;
-# use std::comm::stream;
# use std::vec;
// Create a vector of ports, one for each child task
let ports = vec::from_fn(3, |init_val| {
- let (port, chan) = stream();
+ let (port, chan) = Chan::new();
do spawn {
chan.send(some_expensive_computation(init_val));
}
let numbers_arc = Arc::new(numbers);
for num in range(1u, 10) {
- let (port, chan) = stream();
+ let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());
do spawn {
# use std::rand;
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc = Arc::new(numbers);
-# let (port, chan) = stream();
+# let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());
~~~
copying only the wrapper and not its contents.
# use std::rand;
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc=Arc::new(numbers);
-# let (port, chan) = stream();
+# let (port, chan) = Chan::new();
# chan.send(numbers_arc.clone());
# let local_arc : Arc<~[f64]> = port.recv();
let task_numbers = local_arc.get();
# }
# fn main() {
-let (from_child, to_child) = DuplexStream();
+let (from_child, to_child) = DuplexStream::new();
do spawn {
stringifier(&to_child);
})
}
- let mut c = Some(c);
arc.access_cond(|state, cond| {
- c.take_unwrawp().send(());
+ c.send(());
assert!(!*state);
while !*state {
cond.wait();
let mi = m2.clone();
// spawn sibling task
do task::spawn { // linked
- let mut c = Some(c);
mi.lock_cond(|cond| {
c.send(()); // tell sibling to go ahead
(|| {
})
}
#[test]
+ #[ignore(reason = "linked failure?")]
fn test_mutex_different_conds() {
let result = do task::try {
let m = Mutex::new_with_condvars(2);
#[cfg(test)]
mod test {
- use std::comm::oneshot;
use std::rt::test::*;
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
RtioUdpSocket};
#[test]
fn listen_ip4() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
#[test]
fn listen_ip6() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
let addr = next_test_ip6();
do spawn {
#[test]
fn udp_recv_ip4() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
let client = next_test_ip4();
let server = next_test_ip4();
#[test]
fn udp_recv_ip6() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
let client = next_test_ip6();
let server = next_test_ip6();
use std::rt::rtio::*;
let addr = next_test_ip4();
static MAX: uint = 5000;
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
do spawn {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
fn test_udp_twice() {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
do spawn {
let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
let client_in_addr = next_test_ip4();
static MAX: uint = 500_000;
- let (p1, c1) = oneshot();
- let (p2, c2) = oneshot();
+ let (p1, c1) = Chan::new();
+ let (p2, c2) = Chan::new();
do spawn {
let l = local_loop();
#[test]
fn test_read_and_block() {
let addr = next_test_ip4();
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
do spawn {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
- let (port2, chan2) = stream();
+ let (port2, chan2) = Chan::new();
chan.send(port2);
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
// thread, close itself, and then come back to the last thread.
#[test]
fn test_homing_closes_correctly() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
do task::spawn_sched(task::SingleThreaded) {
let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
use std::rt::sched::{Shutdown, TaskFromFriend};
use std::rt::sleeper_list::SleeperList;
use std::rt::task::Task;
- use std::rt::task::UnwindResult;
use std::rt::thread::Thread;
use std::rt::deque::BufferPool;
+ use std::task::TaskResult;
use std::unstable::run_in_bare_thread;
use uvio::UvEventLoop;
let handle2 = sched2.make_handle();
let tasksFriendHandle = sched2.make_handle();
- let on_exit: proc(UnwindResult) = proc(exit_status) {
+ let on_exit: proc(TaskResult) = proc(exit_status) {
let mut handle1 = handle1;
let mut handle2 = handle2;
handle1.send(Shutdown);
handle2.send(Shutdown);
- assert!(exit_status.is_success());
+ assert!(exit_status.is_ok());
};
unsafe fn local_io() -> &'static mut IoFactory {
#[should_fail] #[test]
fn tcp_stream_fail_cleanup() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
#[should_fail] #[test]
fn udp_fail_other_task() {
let addr = next_test_ip4();
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
// force the handle to be created on a different scheduler, failure in
// the original task will force a homing operation back to this
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure1() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure2() {
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure3() {
- let (port, chan) = stream();
+ let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
#[cfg(test)]
mod tests {
- use std::comm::oneshot;
use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
use std::rt::test::next_test_unix;
fn connect() {
let path = next_test_unix();
let path2 = path.clone();
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
do spawn {
let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
fn connect_fail() {
let path = next_test_unix();
let path2 = path.clone();
- let (port, chan) = oneshot();
+ let (port, chan) = Chan::new();
do spawn {
let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
use super::*;
use super::super::local_loop;
use std::io::signal;
- use std::comm::{SharedChan, stream};
#[test]
fn closing_channel_during_drop_doesnt_kill_everything() {
// see issue #10375, relates to timers as well.
- let (port, chan) = stream();
- let chan = SharedChan::new(chan);
+ let (port, chan) = SharedChan::new();
let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
chan);
handle: *uvll::uv_timer_t,
home: SchedHandle,
action: Option<NextAction>,
+ id: uint, // see comments in timer_cb
}
pub enum NextAction {
WakeTask(BlockedTask),
SendOnce(Chan<()>),
- SendMany(Chan<()>),
+ SendMany(Chan<()>, uint),
}
impl TimerWatcher {
handle: handle,
action: None,
home: get_handle_to_current_scheduler!(),
+ id: 0,
};
return me.install();
}
// we must temporarily un-home ourselves, then destroy the action, and
// then re-home again.
let missile = self.fire_homing_missile();
+ self.id += 1;
self.stop();
let _missile = match util::replace(&mut self.action, None) {
None => missile, // no need to do a homing dance
// of the homing missile
let _prev_action = {
let _m = self.fire_homing_missile();
+ self.id += 1;
self.stop();
self.start(msecs, 0);
util::replace(&mut self.action, Some(SendOnce(chan)))
// of the homing missile
let _prev_action = {
let _m = self.fire_homing_missile();
+ self.id += 1;
self.stop();
self.start(msecs, msecs);
- util::replace(&mut self.action, Some(SendMany(chan)))
+ util::replace(&mut self.action, Some(SendMany(chan, self.id)))
};
return port;
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(task);
}
- SendOnce(chan) => chan.send_deferred(()),
- SendMany(chan) => {
- chan.send_deferred(());
- timer.action = Some(SendMany(chan));
+ SendOnce(chan) => { chan.try_send_deferred(()); }
+ SendMany(chan, id) => {
+ chan.try_send_deferred(());
+
+ // Note that the above operation could have performed some form of
+ // scheduling. This means that the timer may have decided to insert
+ // some other action to happen. This 'id' keeps track of the updates
+ // to the timer, so we only reset the action back to sending on this
+ // channel if the id has remained the same. This is essentially a
+ // bug in that we have mutably aliasable memory, but that's libuv
+ // for you. We're guaranteed to all be running on the same thread,
+ // so there's no need for any synchronization here.
+ if timer.id == id {
+ timer.action = Some(SendMany(chan, id));
+ }
}
}
}
let oport = timer.oneshot(1);
let pport = timer.period(1);
timer.sleep(1);
- assert_eq!(oport.try_recv(), None);
- assert_eq!(pport.try_recv(), None);
+ assert_eq!(oport.recv_opt(), None);
+ assert_eq!(pport.recv_opt(), None);
timer.oneshot(1).recv();
}
let timer_port = timer.period(1000);
do spawn {
- timer_port.try_recv();
+ timer_port.recv_opt();
}
// when we drop the TimerWatcher we're going to destroy the channel,
let timer_port = timer.period(1000);
do spawn {
- timer_port.try_recv();
+ timer_port.recv_opt();
}
timer.oneshot(1);
let timer_port = timer.period(1000);
do spawn {
- timer_port.try_recv();
+ timer_port.recv_opt();
}
timer.sleep(1);
let mut timer = TimerWatcher::new(local_loop());
timer.oneshot(1000)
};
- assert_eq!(port.try_recv(), None);
+ assert_eq!(port.recv_opt(), None);
}
#[test]
let mut timer = TimerWatcher::new(local_loop());
timer.period(1000)
};
- assert_eq!(port.try_recv(), None);
+ assert_eq!(port.recv_opt(), None);
}
#[test]
fn f() $b
$($a)* #[test] fn uv() { f() }
- $($a)* #[test] fn native() {
+ $($a)* #[test]
+ #[ignore(cfg(windows))] // FIXME(#11003)
+ fn native() {
use unstable::run_in_bare_thread;
run_in_bare_thread(f);
}
}
#[test]
+ #[ignore(cfg(windows))] // FIXME(#11003)
fn send_from_outside_runtime() {
let (p, c) = Chan::<int>::new();
let (p1, c1) = Chan::new();
}
#[test]
+ #[ignore(cfg(windows))] // FIXME(#11003)
fn recv_from_outside_runtime() {
let (p, c) = Chan::<int>::new();
let t = do Thread::start {
}
#[test]
+ #[ignore(cfg(windows))] // FIXME(#11003)
fn no_runtime() {
let (p1, c1) = Chan::<int>::new();
let (p2, c2) = Chan::<int>::new();
/// A handle to a port which is currently a member of a `Select` set of ports.
/// This handle is used to keep the port in the set as well as interact with the
/// underlying port.
-pub struct Handle<'self, T> {
+pub struct Handle<'port, T> {
id: uint,
- priv selector: &'self Select,
- priv port: &'self mut Port<T>,
+ priv selector: &'port Select,
+ priv port: &'port mut Port<T>,
}
struct PacketIterator { priv cur: *mut Packet }
assert!(!(*packet).selecting.load(Relaxed));
}
+ assert!(ready_id != uint::max_value);
return ready_id;
}
}
fn iter(&self) -> PacketIterator { PacketIterator { cur: self.head } }
}
-impl<'self, T: Send> Handle<'self, T> {
+impl<'port, T: Send> Handle<'port, T> {
/// Receive a value on the underlying port. Has the same semantics as
/// `Port.recv`
pub fn recv(&mut self) -> T { self.port.recv() }
}
#[unsafe_destructor]
-impl<'self, T: Send> Drop for Handle<'self, T> {
+impl<'port, T: Send> Drop for Handle<'port, T> {
fn drop(&mut self) {
unsafe { self.selector.remove(self.port.queue.packet()) }
}
}
#[test]
+ #[ignore(cfg(windows))] // FIXME(#11003)
fn stress_native() {
use std::rt::thread::Thread;
use std::unstable::run_in_bare_thread;
}
#[test]
+ #[ignore(cfg(windows))] // FIXME(#11003)
fn native_both_ready() {
use std::rt::thread::Thread;
use std::unstable::run_in_bare_thread;
do spawntask {
let mut acceptor = UnixListener::bind(&path1).listen();
chan.send(());
- server.take()(acceptor.accept().unwrap());
+ server(acceptor.accept().unwrap());
}
port.recv();
- client.take()(UnixStream::connect(&path2).unwrap());
+ client(UnixStream::connect(&path2).unwrap());
}
}
#[cfg(windows)]
mod imp {
+ use super::DEFAULT_STACK_SIZE;
+
+ use cast;
+ use libc;
use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL,
LPVOID, DWORD, LPDWORD, HANDLE};
- use libc;
- use cast;
- use super::DEFAULT_STACK_SIZE;
+ use ptr;
pub type rust_thread = HANDLE;
pub type rust_thread_return = DWORD;
}
#[cfg(target_os = "macos")]
+ #[cfg(target_os = "android")]
pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); }
- #[cfg(not(target_os = "macos"))]
+ #[cfg(not(target_os = "macos"), not(target_os = "android"))]
pub unsafe fn yield_now() { assert_eq!(pthread_yield(), 0); }
extern {
fn pthread_detach(thread: libc::pthread_t) -> libc::c_int;
#[cfg(target_os = "macos")]
+ #[cfg(target_os = "android")]
fn sched_yield() -> libc::c_int;
- #[cfg(not(target_os = "macos"))]
+ #[cfg(not(target_os = "macos"), not(target_os = "android"))]
fn pthread_yield() -> libc::c_int;
}
}
if opts.notify_chan.is_some() {
let notify_chan = opts.notify_chan.take_unwrap();
let on_exit: proc(TaskResult) = proc(task_result) {
- notify_chan.send(task_result)
+ notify_chan.try_send(task_result);
};
task.death.on_exit = Some(on_exit);
}
extern mod extra;
-use std::comm::{Port, Chan, SharedChan};
use std::comm;
use std::os;
use std::task;
let mut count = 0u;
let mut done = false;
while !done {
- match requests.try_recv() {
+ match requests.recv_opt() {
Some(get_count) => { responses.send(count.clone()); }
Some(bytes(b)) => {
//error!("server: received {:?} bytes", b);
}
fn run(args: &[~str]) {
- let (from_child, to_parent) = comm::stream();
- let (from_parent, to_child) = comm::stream();
-
- let to_child = SharedChan::new(to_child);
+ let (from_child, to_parent) = Chan::new();
+ let (from_parent, to_child) = SharedChan::new();
let size = from_str::<uint>(args[1]).unwrap();
let workers = from_str::<uint>(args[2]).unwrap();
extern mod extra;
-use std::comm::{SharedChan, Chan, stream};
use std::os;
use std::task;
use std::uint;
let mut count: uint = 0;
let mut done = false;
while !done {
- match requests.try_recv() {
+ match requests.recv_opt() {
Some(get_count) => { responses.send(count.clone()); }
Some(bytes(b)) => {
//error!("server: received {:?} bytes", b);
}
fn run(args: &[~str]) {
- let (from_child, to_parent) = stream();
- let (from_parent, to_child) = stream();
- let to_child = SharedChan::new(to_child);
+ let (from_child, to_parent) = Chan::new();
let size = from_str::<uint>(args[1]).unwrap();
let workers = from_str::<uint>(args[2]).unwrap();
let num_bytes = 100;
let start = extra::time::precise_time_s();
let mut worker_results = ~[];
- for _ in range(0u, workers) {
- let to_child = to_child.clone();
+ let from_parent = if workers == 1 {
+ let (from_parent, to_child) = Chan::new();
let mut builder = task::task();
worker_results.push(builder.future_result());
do builder.spawn {
}
//error!("worker {:?} exiting", i);
};
- }
+ from_parent
+ } else {
+ let (from_parent, to_child) = SharedChan::new();
+ for _ in range(0u, workers) {
+ let to_child = to_child.clone();
+ let mut builder = task::task();
+ worker_results.push(builder.future_result());
+ do builder.spawn {
+ for _ in range(0u, size / workers) {
+ //error!("worker {:?}: sending {:?} bytes", i, num_bytes);
+ to_child.send(bytes(num_bytes));
+ }
+ //error!("worker {:?} exiting", i);
+ };
+ }
+ from_parent
+ };
do task::spawn || {
server(&from_parent, &to_parent);
}
}
//error!("sending stop message");
- to_child.send(stop);
- move_out(to_child);
+ //to_child.send(stop);
+ //move_out(to_child);
let result = from_child.recv();
let end = extra::time::precise_time_s();
let elapsed = end - start;
// Create pairs of tasks that pingpong back and forth.
fn run_pair(n: uint) {
// Create a stream A->B
- let (pa,ca) = stream::<()>();
+ let (pa,ca) = Chan::<()>::new();
// Create a stream B->A
- let (pb,cb) = stream::<()>();
+ let (pb,cb) = Chan::<()>::new();
do spawntask_later() || {
let chan = ca;
use std::os;
use std::uint;
use std::rt::test::spawntask_later;
-use std::comm::oneshot;
// A simple implementation of parfib. One subtree is found in a new
// task and communicated over a oneshot pipe, the other is found
return 1;
}
- let (port,chan) = oneshot::<uint>();
+ let (port,chan) = Chan::new();
do spawntask_later {
chan.send(parfib(n-1));
};
extern mod extra;
-use std::comm::{stream, SharedChan};
use std::option;
use std::os;
use std::task;
fn rendezvous(nn: uint, set: ~[color]) {
// these ports will allow us to hear from the creatures
- let (from_creatures, to_rendezvous) = stream::<CreatureInfo>();
- let to_rendezvous = SharedChan::new(to_rendezvous);
- let (from_creatures_log, to_rendezvous_log) = stream::<~str>();
- let to_rendezvous_log = SharedChan::new(to_rendezvous_log);
+ let (from_creatures, to_rendezvous) = SharedChan::<CreatureInfo>::new();
+ let (from_creatures_log, to_rendezvous_log) = SharedChan::<~str>::new();
// these channels will be passed to the creatures so they can talk to us
let col = *col;
let to_rendezvous = to_rendezvous.clone();
let to_rendezvous_log = to_rendezvous_log.clone();
- let (from_rendezvous, to_creature) = stream();
+ let (from_rendezvous, to_creature) = Chan::new();
do task::spawn {
creature(ii,
col,
use extra::sort;
use std::cmp::Ord;
-use std::comm::{stream, Port, Chan};
use std::comm;
use std::hashmap::HashMap;
use std::option;
// initialize each sequence sorter
let sizes = ~[1u,2,3,4,6,12,18];
- let mut streams = vec::from_fn(sizes.len(), |_| Some(stream::<~str>()));
+ let mut streams = vec::from_fn(sizes.len(), |_| Some(Chan::<~str>::new()));
let mut from_child = ~[];
let to_child = sizes.iter().zip(streams.mut_iter()).map(|(sz, stream_ref)| {
let sz = *sz;
from_child.push(from_child_);
- let (from_parent, to_child) = comm::stream();
+ let (from_parent, to_child) = Chan::new();
do spawn {
make_sequence_processor(sz, &from_parent, &to_parent_);
extern mod extra;
use extra::{time, getopts};
-use std::comm::{stream, SharedChan};
use std::os;
use std::result::{Ok, Err};
use std::task;
} else if n <= 2 {
c.send(1);
} else {
- let (pp, cc) = stream();
- let cc = SharedChan::new(cc);
+ let (pp, cc) = SharedChan::new();
let ch = cc.clone();
task::spawn(proc() pfib(&ch, n - 1));
let ch = cc.clone();
}
}
- let (p, ch) = stream();
- let ch = SharedChan::new(ch);
+ let (p, ch) = SharedChan::new();
let _t = task::spawn(proc() pfib(&ch, n) );
p.recv()
}
use std::os;
fn start(n_tasks: int, token: int) {
- let (p, ch1) = stream();
+ let (p, ch1) = Chan::new();
let mut p = p;
let ch1 = ch1;
ch1.send(token);
// XXX could not get this to work with a range closure
let mut i = 2;
while i <= n_tasks {
- let (next_p, ch) = stream();
+ let (next_p, ch) = Chan::new();
let imm_i = i;
let imm_p = p;
do spawn {
args.clone()
};
- let (p,c) = comm::stream();
+ let (p,c) = Chan::new();
child_generation(from_str::<uint>(args[1]).unwrap(), c);
- if p.try_recv().is_none() {
+ if p.recv_opt().is_none() {
fail!("it happened when we slumbered");
}
}
mapper_done => { num_mappers -= 1; }
find_reducer(k, cc) => {
let mut c;
- match reducers.find(&str::from_utf8(k)) {
+ match reducers.find(&str::from_utf8(k).to_owned()) {
Some(&_c) => { c = _c; }
None => { c = 0; }
}
while (i > 0) {
info!("{}", i);
let ch = ch.clone();
- task::spawn({let i = i; proc() { child(i, &ch) });
+ task::spawn({let i = i; proc() { child(i, &ch) }});
i = i - 1;
}