% Rust Tasks and Communication Tutorial

# Introduction
Rust provides safe concurrency through a combination
of lightweight, memory-isolated tasks and message passing.
This tutorial will describe the concurrency model in Rust, how it
relates to the Rust type system, and introduce
the fundamental library abstractions for constructing concurrent programs.
Rust tasks are not the same as traditional threads: rather,
they are considered _green threads_, lightweight units of execution that the Rust
runtime schedules cooperatively onto a small number of operating system threads.
On a multi-core system, Rust tasks will be scheduled in parallel by default.
Because tasks are significantly
cheaper to create than traditional threads, Rust can create hundreds of
thousands of concurrent tasks on a typical 32-bit system.
In general, all Rust code executes inside a task, including the `main` function.
In order to make efficient use of memory, Rust tasks have dynamically sized
stacks. A task begins its life with a small
amount of stack space (currently in the low thousands of bytes, depending on
platform), and acquires more stack as needed.
Unlike in languages such as C, a Rust task cannot accidentally write to
memory beyond the end of the stack, causing crashes or worse.
Tasks provide failure isolation and recovery. When a fatal error occurs in Rust
code as a result of an explicit call to `fail!()`, an assertion failure, or
another invalid operation, the runtime system destroys the entire
task. Unlike in languages such as Java and C++, there is no way to `catch` an
exception. Instead, tasks may monitor each other for failure.
Tasks use Rust's type system to provide strong memory safety guarantees. In
particular, the type system guarantees that tasks cannot share mutable state
with each other. Tasks communicate with each other by transferring _owned_
data through the global _exchange heap_.
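The same ownership-transfer principle survives in modern Rust, where `std::thread` and `std::sync::mpsc` play the roles that tasks and pipes play in this tutorial. As a hedged, modern-Rust sketch (not this tutorial's era API), a value built in one thread can be *moved*, not copied, through a channel:

```rust
use std::sync::mpsc;
use std::thread;

// The vector is created inside the child thread and ownership moves
// through the channel to the parent; no shared mutable state exists.
fn send_owned_vec() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let data = vec![1, 2, 3]; // owned by the child thread
        tx.send(data).unwrap();   // ownership transfers through the channel
    });
    rx.recv().unwrap()            // the parent now owns the vector
}

fn main() {
    assert_eq!(send_owned_vec(), vec![1, 2, 3]);
}
```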
## A note about the libraries
While Rust's type system provides the building blocks needed for safe
and efficient tasks, all of the task functionality itself is implemented
in the standard and extra libraries, which are still under development
and do not always present a consistent or complete interface.

For your reference, these are the standard modules involved in Rust
concurrency at this writing:
* [`std::task`] - All code relating to tasks and task scheduling,
* [`std::comm`] - The message passing interface,
* [`std::pipes`] - The underlying messaging infrastructure,
* [`extra::comm`] - Additional messaging types based on `std::pipes`,
* [`extra::sync`] - More exotic synchronization tools, including locks,
* [`extra::arc`] - The Arc (atomically reference counted) type,
  for safely sharing immutable data,
* [`extra::future`] - A type representing values that may be computed concurrently and retrieved at a later time.
[`std::task`]: std/task.html
[`std::comm`]: std/comm.html
[`std::pipes`]: std/pipes.html
[`extra::comm`]: extra/comm.html
[`extra::sync`]: extra/sync.html
[`extra::arc`]: extra/arc.html
[`extra::future`]: extra/future.html
# Basics

The programming interface for creating and managing tasks lives
in the `task` module of the `std` library, and is thus available to all
Rust code by default. At its simplest, creating a task is a matter of
calling the `spawn` function with a closure argument. `spawn` executes the
closure in the new task.
~~~~
# use std::io::println;
# use std::task::spawn;

// Print something profound in a different task using a named function
fn print_message() { println("I am running in a different task!"); }
spawn(print_message);

// Print something more profound in a different task using a lambda expression
spawn( || println("I am also running in a different task!") );

// The canonical way to spawn is using `do` notation
do spawn {
    println("I too am running in a different task!");
}
~~~~
In Rust, there is nothing special about creating tasks: a task is not a
concept that appears in the language semantics. Instead, Rust's type system
provides all the tools necessary to implement safe concurrency: particularly,
_owned types_. The language leaves the implementation details to the standard
library.
The `spawn` function has a very simple type signature: `fn spawn(f:
~fn())`. Because it accepts only owned closures, and owned closures
contain only owned data, `spawn` can safely move the entire closure
and all its associated state into an entirely different task for
execution. Like any closure, the function passed to `spawn` may capture
an environment that it carries across tasks.
~~~~
# use std::io::println;
# use std::task::spawn;
# fn generate_task_number() -> int { 0 }
// Generate some state locally
let child_task_number = generate_task_number();

do spawn {
    // Capture it in the remote task
    println(fmt!("I am child number %d", child_task_number));
}
~~~~
By default, the scheduler multiplexes tasks across the available cores, running
in parallel. Thus, on a multicore machine, running the following code
should interleave the output in vaguely random order.
~~~~
# use std::io::print;
# use std::task::spawn;

for child_task_number in range(0, 20) {
    do spawn {
        print(fmt!("I am child number %d\n", child_task_number));
    }
}
~~~~
## Communication

Now that we have spawned a new task, it would be nice if we could
communicate with it. Recall that Rust does not have shared mutable
state, so one task may not manipulate variables owned by another task.
Instead we use *pipes*.
A pipe is simply a pair of endpoints: one for sending messages and another for
receiving messages. Pipes are low-level communication building-blocks and so
come in a variety of forms, each one appropriate for a different use case. In
what follows, we cover the most commonly used varieties.
The simplest way to create a pipe is to use the `pipes::stream`
function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
is a sending endpoint of a pipe, and a *port* is the receiving
endpoint. Consider the following example of calculating two results
concurrently:
~~~~
# use std::task::spawn;
# use std::comm::{stream, Port, Chan};

let (port, chan): (Port<int>, Chan<int>) = stream();

do spawn || {
    let result = some_expensive_computation();
    chan.send(result);
}

some_other_expensive_computation();
let result = port.recv();
# fn some_expensive_computation() -> int { 42 }
# fn some_other_expensive_computation() {}
~~~~
Let's examine this example in detail. First, the `let` statement creates a
stream for sending and receiving integers (the left-hand side of the `let`,
`(port, chan)`, is an example of a *destructuring let*: the pattern separates
a tuple into its component parts).
~~~~
# use std::comm::{stream, Chan, Port};

let (port, chan): (Port<int>, Chan<int>) = stream();
~~~~
The child task will use the channel to send data to the parent task,
which will wait to receive the data on the port. The next statement
spawns the child task.
~~~~
# use std::task::spawn;
# use std::comm::stream;
# fn some_expensive_computation() -> int { 42 }
# let (port, chan) = stream();
do spawn || {
    let result = some_expensive_computation();
    chan.send(result);
}
~~~~
Notice that the creation of the task closure transfers `chan` to the child
task implicitly: the closure captures `chan` in its environment. Both `Chan`
and `Port` are sendable types and may be captured into tasks or otherwise
transferred between them. In the example, the child task runs an expensive
computation, then sends the result over the captured channel.
Finally, the parent continues with some other expensive
computation, then waits for the child's result to arrive on the
port:
~~~~
# use std::comm::{stream};
# fn some_other_expensive_computation() {}
# let (port, chan) = stream::<int>();
# chan.send(0);
some_other_expensive_computation();
let result = port.recv();
~~~~
The `Port` and `Chan` pair created by `stream` enables efficient communication
between a single sender and a single receiver, but multiple senders cannot use
a single `Chan`, and multiple receivers cannot use a single `Port`. What if our
example needed to compute multiple results across a number of tasks? The
following program is ill-typed:
~~~~
# use std::task::{spawn};
# use std::comm::{stream, Port, Chan};
# fn some_expensive_computation() -> int { 42 }
let (port, chan) = stream();

do spawn {
    chan.send(some_expensive_computation());
}

// ERROR! The previous spawn statement already owns the channel,
// so the compiler will not allow it to be captured again
do spawn {
    chan.send(some_expensive_computation());
}
~~~~
Instead we can use a `SharedChan`, a type that allows a single
`Chan` to be shared by multiple senders.
~~~~
# use std::task::spawn;
# use std::comm::{stream, SharedChan};

let (port, chan) = stream();
let chan = SharedChan::new(chan);

for init_val in range(0u, 3) {
    // Create a new channel handle to distribute to the child task
    let child_chan = chan.clone();
    do spawn {
        child_chan.send(some_expensive_computation(init_val));
    }
}

let result = port.recv() + port.recv() + port.recv();
# fn some_expensive_computation(_i: uint) -> int { 42 }
~~~~
Here we transfer ownership of the channel into a new `SharedChan` value. Like
`Chan`, `SharedChan` is a non-copyable, owned type (sometimes also referred to
as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
may duplicate a `SharedChan` with the `clone()` method. A cloned
`SharedChan` produces a new handle to the same channel, allowing multiple
tasks to send data to a single port. Between `spawn`, `stream` and
`SharedChan`, we have enough tools to implement many useful concurrency
patterns.
Note that the above `SharedChan` example is somewhat contrived since
you could also simply use three `stream` pairs, but it serves to
illustrate the point. For reference, written with multiple streams, it
might look like the example below.
~~~~
# use std::task::spawn;
# use std::comm::stream;
# use std::vec;

// Create a vector of ports, one for each child task
let ports = do vec::from_fn(3) |init_val| {
    let (port, chan) = stream();
    do spawn {
        chan.send(some_expensive_computation(init_val));
    }
    port
};

// Wait on each port, accumulating the results
let result = ports.iter().fold(0, |accum, port| accum + port.recv() );
# fn some_expensive_computation(_i: uint) -> int { 42 }
~~~~
## Backgrounding computations: Futures

With `extra::future`, Rust has a mechanism for requesting a computation and getting the result
later.

The basic example below illustrates this.
~~~~
# fn make_a_sandwich() {};
fn fib(n: uint) -> uint {
    // lengthy computation returning an uint
    12586269025
}

let mut delayed_fib = extra::future::spawn (|| fib(50) );
make_a_sandwich();
println(fmt!("fib(50) = %?", delayed_fib.get()))
~~~~
The call to `future::spawn` immediately returns a `future` object, regardless of how long it
takes to run `fib(50)`. You can then make yourself a sandwich while the computation of `fib` is
running. The result of the execution of the method is obtained by calling `get` on the future.
This call will block until the value is available (*i.e.* the computation is complete). Note that
the future needs to be mutable so that it can save the result for next time `get` is called.
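For readers on current Rust, `extra::future` is long gone; a rough modern analogue of "start now, block on `get` later" is `std::thread::spawn` returning a `JoinHandle`. A minimal sketch under that assumption (a smaller `fib` argument is used so it finishes quickly):

```rust
use std::thread;

// A deliberately naive recursive Fibonacci as the "lengthy computation".
fn fib(n: u64) -> u64 {
    if n < 2 { n } else { fib(n - 1) + fib(n - 2) }
}

fn main() {
    // Start the computation in the background; this returns immediately.
    let delayed_fib = thread::spawn(|| fib(20));
    // ...make a sandwich here while fib runs...
    // Block until the value is ready, like calling `get` on a future.
    let result = delayed_fib.join().unwrap();
    assert_eq!(result, 6765);
}
```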
Here is another example showing how futures allow you to background computations. The workload will
be distributed on the available cores.
~~~~
fn partial_sum(start: uint) -> f64 {
    let mut local_sum = 0f64;
    for num in range(start*100000, (start+1)*100000) {
        local_sum += (num as f64 + 1.0).pow(&-2.0);
    }
    local_sum
}

fn main() {
    let mut futures = vec::from_fn(1000, |ind| do extra::future::spawn { partial_sum(ind) });

    let mut final_res = 0f64;
    for ft in futures.mut_iter() {
        final_res += ft.get();
    }
    println(fmt!("π^2/6 is not far from : %?", final_res));
}
~~~~
## Sharing immutable data without copy: Arc
To share immutable data between tasks, a first approach would be to only use pipes as we have seen
previously. A copy of the data to share would then be made for each task. In some cases, this would
add up to a significant amount of wasted memory and would require copying the same data more than
necessary.
To tackle this issue, one can use an Atomically Reference Counted wrapper (`Arc`) as implemented in
the `extra` library of Rust. With an Arc, the data will no longer be copied for each task. The Arc
acts as a reference to the shared data and only this reference is shared and cloned.

Here is a small example showing how to use Arcs. We wish to run concurrently several computations on
a single large vector of floats. Each task needs the full vector to perform its duty.
~~~~
# use extra::arc::Arc;
# use std::vec;
# use std::rand;
fn pnorm(nums: &~[float], p: uint) -> float {
    nums.iter().fold(0.0, |a,b| a+(*b).pow(&(p as float)) ).pow(&(1f / (p as float)))
}

fn main() {
    let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
    println(fmt!("Inf-norm = %?", *numbers.iter().max().unwrap()));

    let numbers_arc = Arc::new(numbers);

    for num in range(1u, 10) {
        let (port, chan) = stream();
        chan.send(numbers_arc.clone());

        do spawn {
            let local_arc : Arc<~[float]> = port.recv();
            let task_numbers = local_arc.get();
            println(fmt!("%u-norm = %?", num, pnorm(task_numbers, num)));
        }
    }
}
~~~~
The function `pnorm` performs a simple computation on the vector (it computes the sum of its items
at the power given as argument and takes the inverse power of this value). The Arc on the vector is
created by:
~~~~
# use extra::arc::Arc;
# use std::vec;
# use std::rand;
# let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
let numbers_arc = Arc::new(numbers);
~~~~
and a clone of it is sent to each task
~~~~
# use extra::arc::Arc;
# use std::vec;
# use std::rand;
# let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
# let numbers_arc = Arc::new(numbers);
# let (port, chan) = stream();
chan.send(numbers_arc.clone());
~~~~
copying only the wrapper and not its contents.

Each task recovers the underlying data by
~~~~
# use extra::arc::Arc;
# use std::vec;
# use std::rand;
# let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
# let numbers_arc = Arc::new(numbers);
# let (port, chan) = stream();
# chan.send(numbers_arc.clone());
# let local_arc : Arc<~[float]> = port.recv();
let task_numbers = local_arc.get();
~~~~
and can use it as if it were local.
The `arc` module also implements Arcs around mutable data, which are not covered here.
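The same two ideas survive in modern Rust as `std::sync::Arc` (clone the handle, not the data) and `Arc<Mutex<T>>` for the mutable case. A hedged modern-Rust sketch, not the `extra::arc` API described above:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Immutable sharing: each thread clones the Arc handle, not the vector.
fn shared_sum(data: Vec<f64>) -> f64 {
    let data = Arc::new(data);
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let local = Arc::clone(&data); // cheap handle copy
            thread::spawn(move || local.iter().sum::<f64>())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

// Mutable sharing needs a lock around the data.
fn shared_count(threads: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || *c.lock().unwrap() += 1)
        })
        .collect();
    for h in handles { h.join().unwrap(); }
    let n = *counter.lock().unwrap();
    n
}

fn main() {
    // Three threads each sum the same vector: 3 * 6.0 = 18.0.
    assert_eq!(shared_sum(vec![1.0, 2.0, 3.0]), 18.0);
    assert_eq!(shared_count(4), 4);
}
```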
# Handling task failure
Rust has a built-in mechanism for raising exceptions. The `fail!()` macro
(which can also be written with an error string as an argument: `fail!(
~reason)`) and the `assert!` construct (which effectively calls `fail!()`
if a boolean expression is false) are both ways to raise exceptions. When a
task raises an exception the task unwinds its stack---running destructors and
freeing memory along the way---and then exits. Unlike exceptions in C++,
exceptions in Rust are unrecoverable within a single task: once a task fails,
there is no way to "catch" the exception.
All tasks are, by default, _linked_ to each other. That means that the fates
of all tasks are intertwined: if one fails, so do all the others.
~~~~
# use std::task::spawn;
# use std::task;
# fn do_some_work() { loop { task::yield() } }
# do task::try {
// Create a child task that fails
do spawn { fail!() }

// This will also fail because the task we spawned failed
do_some_work();
# };
~~~~
While it isn't possible for a task to recover from failure, tasks may notify
each other of failure. The simplest way of handling task failure is with the
`try` function, which is similar to `spawn`, but immediately blocks waiting
for the child task to finish. `try` returns a value of type `Result<int,
()>`. `Result` is an `enum` type with two variants: `Ok` and `Err`. In this
case, because the type arguments to `Result` are `int` and `()`, callers can
pattern-match on a result to check whether it's an `Ok` result with an `int`
field (representing a successful result) or an `Err` result (representing
termination with an error).
~~~~
# use std::task;
# fn some_condition() -> bool { false }
# fn calculate_result() -> int { 0 }
let result: Result<int, ()> = do task::try {
    if some_condition() {
        calculate_result()
    } else {
        fail!("oops!");
    }
};
assert!(result.is_err());
~~~~
Unlike `spawn`, the function spawned using `try` may return a value,
which `try` will dutifully propagate back to the caller in a [`Result`]
enum. If the child task terminates successfully, `try` will
return an `Ok` result; if the child task fails, `try` will return
an `Err` result.

[`Result`]: std/result.html
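Modern Rust kept the shape of this API: `thread::spawn(..).join()` yields a `Result` that is `Ok` with the closure's return value on success and `Err` if the thread panicked. A hedged sketch of the same pattern on current `std::thread` (the `run_child` helper is illustrative, not a library function):

```rust
use std::thread;

// Join a child thread, observing success or panic, much like `task::try`.
fn run_child(should_fail: bool) -> Result<i32, ()> {
    thread::spawn(move || {
        if should_fail {
            panic!("oops!"); // the panic is caught at the join boundary
        }
        42
    })
    .join()          // Result<i32, Box<dyn Any + Send>>
    .map_err(|_| ()) // discard the panic payload, as `try` effectively did
}

fn main() {
    assert_eq!(run_child(false), Ok(42));
    assert!(run_child(true).is_err());
}
```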
> ***Note:*** A failed task does not currently produce a useful error
> value (`try` always returns `Err(())`). In the
> future, it may be possible for tasks to intercept the value passed to
> `fail!()`.
TODO: Need discussion of `future_result` in order to make failure
modes useful.
But not all failures are created equal. In some cases you might need to
abort the entire program (perhaps you're writing an assert which, if
it trips, indicates an unrecoverable logic error); in other cases you
might want to contain the failure at a certain boundary (perhaps a
small piece of input from the outside world, which you happen to be
processing in parallel, is malformed and its processing task can't
proceed). Hence, you will need different _linked failure modes_.
By default, task failure is _bidirectionally linked_, which means that if
either task fails, it kills the other one.
~~~~
# use std::task;
# fn sleep_forever() { loop { task::yield() } }
# do task::try {
do spawn {
    do spawn {
        fail!();  // All three tasks will fail.
    }
    sleep_forever(); // Will get woken up by force, then fail
}
sleep_forever(); // Will get woken up by force, then fail
# };
~~~~
If you want parent tasks to be able to kill their children, but do not want a
parent to fail automatically if one of its child tasks fails, you can call
`task::spawn_supervised` for _unidirectionally linked_ failure. The
function `task::try`, which we saw previously, uses `spawn_supervised`
internally, with additional logic to wait for the child task to finish
before returning. Hence:
~~~~
# use std::comm::{stream, Chan, Port};
# use std::task::{spawn, try};
# use std::task;
# fn sleep_forever() { loop { task::yield() } }
# do task::try {
let (receiver, sender): (Port<int>, Chan<int>) = stream();
do spawn { // Bidirectionally linked
    // Wait for the supervised child task to exist.
    let message = receiver.recv();
    // Kill both it and the parent task.
    assert!(message != 42);
}
do try { // Unidirectionally linked
    sender.send(42);
    sleep_forever(); // Will get woken up by force
}
// Flow never reaches here -- parent task was killed too.
# };
~~~~
Supervised failure is useful in any situation where one task manages
multiple fallible child tasks, and the parent task can recover
if any child fails. On the other hand, if the _parent_ (supervisor) fails,
then there is nothing the children can do to recover, so they should
also fail.
Supervised task failure propagates across multiple generations even if
an intermediate generation has already exited:
~~~~
# use std::task;
# fn sleep_forever() { loop { task::yield() } }
# fn wait_for_a_while() { do 1000.times { task::yield() } }
# do task::try::<int> {
do task::spawn_supervised {
    do task::spawn_supervised {
        sleep_forever(); // Will get woken up by force, then fail
    }
    // Intermediate task immediately exits
}
wait_for_a_while();
fail!(); // Will kill grandchild even if child has already exited
# };
~~~~
Finally, tasks can be configured to not propagate failure to each
other at all, using `task::spawn_unlinked` for _isolated failure_.
~~~~
# use std::task;
# fn random() -> uint { 100 }
# fn sleep_for(i: uint) { do i.times { task::yield() } }
# do task::try::<()> {
let (time1, time2) = (random(), random());
do task::spawn_unlinked {
    sleep_for(time2); // Won't get forced awake
    fail!();
}
sleep_for(time1); // Won't get forced awake
fail!();
// It will take MAX(time1,time2) for the program to finish.
# };
~~~~
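Isolated failure became the default in modern Rust: `std::thread` has no linked modes at all, so a panic in one thread never tears down its siblings. A hedged modern sketch of the same behavior (the `isolated_failure` helper is illustrative only):

```rust
use std::thread;

// Spawn one thread that panics and one that succeeds, and show that
// the panic is contained entirely within its own thread.
fn isolated_failure() -> (bool, &'static str) {
    let doomed = thread::spawn(|| panic!("this thread fails alone"));
    let survivor = thread::spawn(|| "still here");
    // join() reports the panic as Err without propagating it.
    (doomed.join().is_err(), survivor.join().unwrap())
}

fn main() {
    let (failed, msg) = isolated_failure();
    assert!(failed);              // the panic was observed...
    assert_eq!(msg, "still here"); // ...and the sibling finished normally
}
```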
## Creating a task with a bi-directional communication path
A very common thing to do is to spawn a child task where the parent
and child both need to exchange messages with each other. The
function `extra::comm::DuplexStream()` supports this pattern. We'll
look briefly at how to use it.
To see how `DuplexStream()` works, we will create a child task
that repeatedly receives a `uint` message, converts it to a string, and sends
the string in response. The child terminates when it receives `0`.
Here is the function that implements the child task:
~~~~
# use extra::comm::DuplexStream;
# use std::uint;
fn stringifier(channel: &DuplexStream<~str, uint>) {
    let mut value: uint;
    loop {
        value = channel.recv();
        channel.send(uint::to_str(value));
        if value == 0 { break; }
    }
}
~~~~
The implementation of `DuplexStream` supports both sending and
receiving. The `stringifier` function takes a `DuplexStream` that can
send strings (the first type parameter) and receive `uint` messages
(the second type parameter). The body itself simply loops, reading
from the channel and then sending its response back. The actual
response itself is simply the stringified version of the received value,
`uint::to_str(value)`.
Here is the code for the parent task:
~~~~
# use std::task::spawn;
# use std::uint;
# use extra::comm::DuplexStream;
# fn stringifier(channel: &DuplexStream<~str, uint>) {
#     let mut value: uint;
#     loop {
#         value = channel.recv();
#         channel.send(uint::to_str(value));
#         if value == 0u { break; }
#     }
# }
# fn main() {

let (from_child, to_child) = DuplexStream();

do spawn {
    stringifier(&to_child);
};

from_child.send(22);
assert!(from_child.recv() == ~"22");
from_child.send(23);
from_child.send(0);
assert!(from_child.recv() == ~"23");
assert!(from_child.recv() == ~"0");
# }
~~~~
The parent task first calls `DuplexStream` to create a pair of bidirectional
endpoints. It then uses `task::spawn` to create the child task, which captures
one end of the communication channel. As a result, both parent and child can
send and receive data to and from the other.
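`DuplexStream` never made it into modern Rust, but the same pattern is easy to assemble from two `mpsc` channels, one in each direction. A hedged modern-Rust sketch of the stringifier protocol above (the `roundtrip` helper is illustrative only):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// The child receives numbers, answers with their string form, and
// stops after echoing a 0, like the tutorial's `stringifier`.
fn stringifier(from_parent: Receiver<u32>, to_parent: Sender<String>) {
    loop {
        let value = from_parent.recv().unwrap();
        to_parent.send(value.to_string()).unwrap();
        if value == 0 { break; }
    }
}

// Pair two one-way channels into a bidirectional path, then run the protocol.
fn roundtrip(values: &[u32]) -> Vec<String> {
    let (to_child, from_parent) = channel();
    let (to_parent, from_child) = channel();
    thread::spawn(move || stringifier(from_parent, to_parent));
    values
        .iter()
        .map(|&v| {
            to_child.send(v).unwrap();
            from_child.recv().unwrap()
        })
        .collect()
}

fn main() {
    assert_eq!(roundtrip(&[22, 23, 0]), vec!["22", "23", "0"]);
}
```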