]> git.lizzy.rs Git - rust.git/blob - doc/tutorial-tasks.md
b2ef624d1ac3e6b1c90cb17a3c9dbfecfafe7637
[rust.git] / doc / tutorial-tasks.md
1 % Rust Tasks and Communication Tutorial
2
3 # Introduction
4
5 Rust provides safe concurrency through a combination
6 of lightweight, memory-isolated tasks and message passing.
7 This tutorial will describe the concurrency model in Rust, how it
8 relates to the Rust type system, and introduce
9 the fundamental library abstractions for constructing concurrent programs.
10
11 Rust tasks are not the same as traditional threads: rather,
12 they are considered _green threads_, lightweight units of execution that the Rust
13 runtime schedules cooperatively onto a small number of operating system threads.
14 On a multi-core system Rust tasks will be scheduled in parallel by default.
15 Because tasks are significantly
16 cheaper to create than traditional threads, Rust can create hundreds of
17 thousands of concurrent tasks on a typical 32-bit system.
18 In general, all Rust code executes inside a task, including the `main` function.
19
20 In order to make efficient use of memory Rust tasks have dynamically sized stacks.
21 A task begins its life with a small
22 amount of stack space (currently in the low thousands of bytes, depending on
23 platform), and acquires more stack as needed.
24 Unlike in languages such as C, a Rust task cannot accidentally write to
25 memory beyond the end of the stack, causing crashes or worse.
26
27 Tasks provide failure isolation and recovery. When a fatal error occurs in Rust
28 code as a result of an explicit call to `fail!()`, an assertion failure, or
29 another invalid operation, the runtime system destroys the entire
30 task. Unlike in languages such as Java and C++, there is no way to `catch` an
31 exception. Instead, tasks may monitor each other for failure.
32
33 Tasks use Rust's type system to provide strong memory safety guarantees. In
34 particular, the type system guarantees that tasks cannot share mutable state
35 with each other. Tasks communicate with each other by transferring _owned_
36 data through the global _exchange heap_.
37
38 ## A note about the libraries
39
40 While Rust's type system provides the building blocks needed for safe
41 and efficient tasks, all of the task functionality itself is implemented
42 in the standard and extra libraries, which are still under development
43 and do not always present a consistent or complete interface.
44
45 For your reference, these are the standard modules involved in Rust
46 concurrency at this writing:
47
48 * [`std::task`] - All code relating to tasks and task scheduling,
49 * [`std::comm`] - The message passing interface,
50 * [`std::pipes`] - The underlying messaging infrastructure,
51 * [`extra::comm`] - Additional messaging types based on `std::pipes`,
52 * [`extra::sync`] - More exotic synchronization tools, including locks,
53 * [`extra::arc`] - The Arc (atomically reference counted) type,
54   for safely sharing immutable data,
55 * [`extra::future`] - A type representing values that may be computed concurrently and retrieved at a later time.
56
57 [`std::task`]: std/task.html
58 [`std::comm`]: std/comm.html
59 [`std::pipes`]: std/pipes.html
60 [`extra::comm`]: extra/comm.html
61 [`extra::sync`]: extra/sync.html
62 [`extra::arc`]: extra/arc.html
63 [`extra::future`]: extra/future.html
64
65 # Basics
66
67 The programming interface for creating and managing tasks lives
68 in the `task` module of the `std` library, and is thus available to all
69 Rust code by default. At its simplest, creating a task is a matter of
70 calling the `spawn` function with a closure argument. `spawn` executes the
71 closure in the new task.
72
73 ~~~~
74 # use std::io::println;
75 # use std::task::spawn;
76
77 // Print something profound in a different task using a named function
78 fn print_message() { println("I am running in a different task!"); }
79 spawn(print_message);
80
81 // Print something more profound in a different task using a lambda expression
82 spawn( || println("I am also running in a different task!") );
83
84 // The canonical way to spawn is using `do` notation
85 do spawn {
86     println("I too am running in a different task!");
87 }
88 ~~~~
89
90 In Rust, there is nothing special about creating tasks: a task is not a
91 concept that appears in the language semantics. Instead, Rust's type system
92 provides all the tools necessary to implement safe concurrency: particularly,
93 _owned types_. The language leaves the implementation details to the standard
94 library.
95
96 The `spawn` function has a very simple type signature: `fn spawn(f:
97 ~fn())`. Because it accepts only owned closures, and owned closures
98 contain only owned data, `spawn` can safely move the entire closure
99 and all its associated state into an entirely different task for
100 execution. Like any closure, the function passed to `spawn` may capture
101 an environment that it carries across tasks.
102
103 ~~~
104 # use std::io::println;
105 # use std::task::spawn;
106 # fn generate_task_number() -> int { 0 }
107 // Generate some state locally
108 let child_task_number = generate_task_number();
109
110 do spawn {
111    // Capture it in the remote task
112    println(fmt!("I am child number %d", child_task_number));
113 }
114 ~~~
115
116 By default, the scheduler multiplexes tasks across the available cores, running
117 in parallel. Thus, on a multicore machine, running the following code
118 should interleave the output in vaguely random order.
119
120 ~~~
121 # use std::io::print;
122 # use std::task::spawn;
123
124 for child_task_number in range(0, 20) {
125     do spawn {
126        print(fmt!("I am child number %d\n", child_task_number));
127     }
128 }
129 ~~~
130
131 ## Communication
132
133 Now that we have spawned a new task, it would be nice if we could
134 communicate with it. Recall that Rust does not have shared mutable
135 state, so one task may not manipulate variables owned by another task.
136 Instead we use *pipes*.
137
138 A pipe is simply a pair of endpoints: one for sending messages and another for
139 receiving messages. Pipes are low-level communication building-blocks and so
140 come in a variety of forms, each one appropriate for a different use case. In
141 what follows, we cover the most commonly used varieties.
142
143 The simplest way to create a pipe is to use the `pipes::stream`
144 function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
145 is a sending endpoint of a pipe, and a *port* is the receiving
146 endpoint. Consider the following example of calculating two results
147 concurrently:
148
149 ~~~~
150 # use std::task::spawn;
151 # use std::comm::{stream, Port, Chan};
152
153 let (port, chan): (Port<int>, Chan<int>) = stream();
154
155 do spawn || {
156     let result = some_expensive_computation();
157     chan.send(result);
158 }
159
160 some_other_expensive_computation();
161 let result = port.recv();
162 # fn some_expensive_computation() -> int { 42 }
163 # fn some_other_expensive_computation() {}
164 ~~~~
165
166 Let's examine this example in detail. First, the `let` statement creates a
167 stream for sending and receiving integers (the left-hand side of the `let`,
168 `(chan, port)`, is an example of a *destructuring let*: the pattern separates
169 a tuple into its component parts).
170
171 ~~~~
172 # use std::comm::{stream, Chan, Port};
173 let (port, chan): (Port<int>, Chan<int>) = stream();
174 ~~~~
175
176 The child task will use the channel to send data to the parent task,
177 which will wait to receive the data on the port. The next statement
178 spawns the child task.
179
180 ~~~~
181 # use std::task::spawn;
182 # use std::comm::stream;
183 # fn some_expensive_computation() -> int { 42 }
184 # let (port, chan) = stream();
185 do spawn || {
186     let result = some_expensive_computation();
187     chan.send(result);
188 }
189 ~~~~
190
191 Notice that the creation of the task closure transfers `chan` to the child
192 task implicitly: the closure captures `chan` in its environment. Both `Chan`
193 and `Port` are sendable types and may be captured into tasks or otherwise
194 transferred between them. In the example, the child task runs an expensive
195 computation, then sends the result over the captured channel.
196
197 Finally, the parent continues with some other expensive
198 computation, then waits for the child's result to arrive on the
199 port:
200
201 ~~~~
202 # use std::comm::{stream};
203 # fn some_other_expensive_computation() {}
204 # let (port, chan) = stream::<int>();
205 # chan.send(0);
206 some_other_expensive_computation();
207 let result = port.recv();
208 ~~~~
209
210 The `Port` and `Chan` pair created by `stream` enables efficient communication
211 between a single sender and a single receiver, but multiple senders cannot use
212 a single `Chan`, and multiple receivers cannot use a single `Port`.  What if our
213 example needed to compute multiple results across a number of tasks? The
214 following program is ill-typed:
215
216 ~~~ {.xfail-test}
217 # use std::task::{spawn};
218 # use std::comm::{stream, Port, Chan};
219 # fn some_expensive_computation() -> int { 42 }
220 let (port, chan) = stream();
221
222 do spawn {
223     chan.send(some_expensive_computation());
224 }
225
226 // ERROR! The previous spawn statement already owns the channel,
227 // so the compiler will not allow it to be captured again
228 do spawn {
229     chan.send(some_expensive_computation());
230 }
231 ~~~
232
233 Instead we can use a `SharedChan`, a type that allows a single
234 `Chan` to be shared by multiple senders.
235
236 ~~~
237 # use std::task::spawn;
238 # use std::comm::{stream, SharedChan};
239
240 let (port, chan) = stream();
241 let chan = SharedChan::new(chan);
242
243 for init_val in range(0u, 3) {
244     // Create a new channel handle to distribute to the child task
245     let child_chan = chan.clone();
246     do spawn {
247         child_chan.send(some_expensive_computation(init_val));
248     }
249 }
250
251 let result = port.recv() + port.recv() + port.recv();
252 # fn some_expensive_computation(_i: uint) -> int { 42 }
253 ~~~
254
255 Here we transfer ownership of the channel into a new `SharedChan` value.  Like
256 `Chan`, `SharedChan` is a non-copyable, owned type (sometimes also referred to
257 as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
258 may duplicate a `SharedChan`, with the `clone()` method.  A cloned
259 `SharedChan` produces a new handle to the same channel, allowing multiple
260 tasks to send data to a single port.  Between `spawn`, `stream` and
261 `SharedChan`, we have enough tools to implement many useful concurrency
262 patterns.
263
264 Note that the above `SharedChan` example is somewhat contrived since
265 you could also simply use three `stream` pairs, but it serves to
266 illustrate the point. For reference, written with multiple streams, it
267 might look like the example below.
268
269 ~~~
270 # use std::task::spawn;
271 # use std::comm::stream;
272 # use std::vec;
273
274 // Create a vector of ports, one for each child task
275 let ports = do vec::from_fn(3) |init_val| {
276     let (port, chan) = stream();
277     do spawn {
278         chan.send(some_expensive_computation(init_val));
279     }
280     port
281 };
282
283 // Wait on each port, accumulating the results
284 let result = ports.iter().fold(0, |accum, port| accum + port.recv() );
285 # fn some_expensive_computation(_i: uint) -> int { 42 }
286 ~~~
287
288 ## Backgrounding computations: Futures
289 With `extra::future`, rust has a mechanism for requesting a computation and getting the result
290 later.
291
292 The basic example below illustrates this.
293 ~~~
294 # fn make_a_sandwich() {};
295 fn fib(n: uint) -> uint {
296     // lengthy computation returning an uint
297     12586269025
298 }
299
300 let mut delayed_fib = extra::future::spawn (|| fib(50) );
301 make_a_sandwich();
302 println(fmt!("fib(50) = %?", delayed_fib.get()))
303 ~~~
304
305 The call to `future::spawn` returns immediately a `future` object regardless of how long it
306 takes to run `fib(50)`. You can then make yourself a sandwich while the computation of `fib` is
307 running. The result of the execution of the method is obtained by calling `get` on the future.
308 This call will block until the value is available (*i.e.* the computation is complete). Note that
309 the future needs to be mutable so that it can save the result for next time `get` is called.
310
311 Here is another example showing how futures allow you to background computations. The workload will
312 be distributed on the available cores.
313 ~~~
314 # use std::vec;
315 fn partial_sum(start: uint) -> f64 {
316     let mut local_sum = 0f64;
317     for num in range(start*100000, (start+1)*100000) {
318         local_sum += (num as f64 + 1.0).pow(&-2.0);
319     }
320     local_sum
321 }
322
323 fn main() {
324     let mut futures = vec::from_fn(1000, |ind| do extra::future::spawn { partial_sum(ind) });
325
326     let mut final_res = 0f64;
327     for ft in futures.mut_iter()  {
328         final_res += ft.get();
329     }
330     println(fmt!("Ï€^2/6 is not far from : %?", final_res));
331 }
332 ~~~
333
334 ## Sharing immutable data without copy: Arc
335
336 To share immutable data between tasks, a first approach would be to only use pipes as we have seen
337 previously. A copy of the data to share would then be made for each task. In some cases, this would
338 add up to a significant amount of wasted memory and would require copying the same data more than
339 necessary.
340
341 To tackle this issue, one can use an Atomically Reference Counted wrapper (`Arc`) as implemented in
342 the `extra` library of Rust. With an Arc, the data will no longer be copied for each task. The Arc
343 acts as a reference to the shared data and only this reference is shared and cloned.
344
345 Here is a small example showing how to use Arcs. We wish to run concurrently several computations on
346 a single large vector of floats. Each task needs the full vector to perform its duty.
347 ~~~
348 # use std::vec;
349 # use std::rand;
350 use extra::arc::Arc;
351
352 fn pnorm(nums: &~[float], p: uint) -> float {
353     nums.iter().fold(0.0, |a,b| a+(*b).pow(&(p as float)) ).pow(&(1f / (p as float)))
354 }
355
356 fn main() {
357     let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
358     println(fmt!("Inf-norm = %?",  *numbers.iter().max().unwrap()));
359
360     let numbers_arc = Arc::new(numbers);
361
362     for num in range(1u, 10) {
363         let (port, chan)  = stream();
364         chan.send(numbers_arc.clone());
365
366         do spawn {
367             let local_arc : Arc<~[float]> = port.recv();
368             let task_numbers = local_arc.get();
369             println(fmt!("%u-norm = %?", num, pnorm(task_numbers, num)));
370         }
371     }
372 }
373 ~~~
374
375 The function `pnorm` performs a simple computation on the vector (it computes the sum of its items
376 at the power given as argument and takes the inverse power of this value). The Arc on the vector is
377 created by the line
378 ~~~
379 # use extra::arc::Arc;
380 # use std::vec;
381 # use std::rand;
382 # let numbers = vec::from_fn(1000000, |_| rand::random::<float>());
383 let numbers_arc=Arc::new(numbers);
384 ~~~
385 and a clone of it is sent to each task
386 ~~~
387 # use extra::arc::Arc;
388 # use std::vec;
389 # use std::rand;
390 # let numbers=vec::from_fn(1000000, |_| rand::random::<float>());
391 # let numbers_arc = Arc::new(numbers);
392 # let (port, chan)  = stream();
393 chan.send(numbers_arc.clone());
394 ~~~
395 copying only the wrapper and not its contents.
396
397 Each task recovers the underlying data by
398 ~~~
399 # use extra::arc::Arc;
400 # use std::vec;
401 # use std::rand;
402 # let numbers=vec::from_fn(1000000, |_| rand::random::<float>());
403 # let numbers_arc=Arc::new(numbers);
404 # let (port, chan)  = stream();
405 # chan.send(numbers_arc.clone());
406 # let local_arc : Arc<~[float]> = port.recv();
407 let task_numbers = local_arc.get();
408 ~~~
409 and can use it as if it were local.
410
411 The `arc` module also implements Arcs around mutable data that are not covered here.
412
413 # Handling task failure
414
415 Rust has a built-in mechanism for raising exceptions. The `fail!()` macro
416 (which can also be written with an error string as an argument: `fail!(
417 ~reason)`) and the `assert!` construct (which effectively calls `fail!()`
418 if a boolean expression is false) are both ways to raise exceptions. When a
419 task raises an exception the task unwinds its stack---running destructors and
420 freeing memory along the way---and then exits. Unlike exceptions in C++,
421 exceptions in Rust are unrecoverable within a single task: once a task fails,
422 there is no way to "catch" the exception.
423
424 All tasks are, by default, _linked_ to each other. That means that the fates
425 of all tasks are intertwined: if one fails, so do all the others.
426
427 ~~~
428 # use std::task::spawn;
429 # use std::task;
430 # fn do_some_work() { loop { task::yield() } }
431 # do task::try {
432 // Create a child task that fails
433 do spawn { fail!() }
434
435 // This will also fail because the task we spawned failed
436 do_some_work();
437 # };
438 ~~~
439
440 While it isn't possible for a task to recover from failure, tasks may notify
441 each other of failure. The simplest way of handling task failure is with the
442 `try` function, which is similar to `spawn`, but immediately blocks waiting
443 for the child task to finish. `try` returns a value of type `Result<int,
444 ()>`. `Result` is an `enum` type with two variants: `Ok` and `Err`. In this
445 case, because the type arguments to `Result` are `int` and `()`, callers can
446 pattern-match on a result to check whether it's an `Ok` result with an `int`
447 field (representing a successful result) or an `Err` result (representing
448 termination with an error).
449
450 ~~~
451 # use std::task;
452 # fn some_condition() -> bool { false }
453 # fn calculate_result() -> int { 0 }
454 let result: Result<int, ()> = do task::try {
455     if some_condition() {
456         calculate_result()
457     } else {
458         fail!("oops!");
459     }
460 };
461 assert!(result.is_err());
462 ~~~
463
464 Unlike `spawn`, the function spawned using `try` may return a value,
465 which `try` will dutifully propagate back to the caller in a [`Result`]
466 enum. If the child task terminates successfully, `try` will
467 return an `Ok` result; if the child task fails, `try` will return
468 an `Error` result.
469
470 [`Result`]: std/result.html
471
472 > ***Note:*** A failed task does not currently produce a useful error
473 > value (`try` always returns `Err(())`). In the
474 > future, it may be possible for tasks to intercept the value passed to
475 > `fail!()`.
476
477 TODO: Need discussion of `future_result` in order to make failure
478 modes useful.
479
480 But not all failures are created equal. In some cases you might need to
481 abort the entire program (perhaps you're writing an assert which, if
482 it trips, indicates an unrecoverable logic error); in other cases you
483 might want to contain the failure at a certain boundary (perhaps a
484 small piece of input from the outside world, which you happen to be
485 processing in parallel, is malformed and its processing task can't
486 proceed). Hence, you will need different _linked failure modes_.
487
488 ## Failure modes
489
490 By default, task failure is _bidirectionally linked_, which means that if
491 either task fails, it kills the other one.
492
493 ~~~
494 # use std::task;
495 # fn sleep_forever() { loop { task::yield() } }
496 # do task::try {
497 do spawn {
498     do spawn {
499         fail!();  // All three tasks will fail.
500     }
501     sleep_forever();  // Will get woken up by force, then fail
502 }
503 sleep_forever();  // Will get woken up by force, then fail
504 # };
505 ~~~
506
507 If you want parent tasks to be able to kill their children, but do not want a
508 parent to fail automatically if one of its child task fails, you can call
509 `task::spawn_supervised` for _unidirectionally linked_ failure. The
510 function `task::try`, which we saw previously, uses `spawn_supervised`
511 internally, with additional logic to wait for the child task to finish
512 before returning. Hence:
513
514 ~~~
515 # use std::comm::{stream, Chan, Port};
516 # use std::task::{spawn, try};
517 # use std::task;
518 # fn sleep_forever() { loop { task::yield() } }
519 # do task::try {
520 let (receiver, sender): (Port<int>, Chan<int>) = stream();
521 do spawn {  // Bidirectionally linked
522     // Wait for the supervised child task to exist.
523     let message = receiver.recv();
524     // Kill both it and the parent task.
525     assert!(message != 42);
526 }
527 do try {  // Unidirectionally linked
528     sender.send(42);
529     sleep_forever();  // Will get woken up by force
530 }
531 // Flow never reaches here -- parent task was killed too.
532 # };
533 ~~~
534
535 Supervised failure is useful in any situation where one task manages
536 multiple fallible child tasks, and the parent task can recover
537 if any child fails. On the other hand, if the _parent_ (supervisor) fails,
538 then there is nothing the children can do to recover, so they should
539 also fail.
540
541 Supervised task failure propagates across multiple generations even if
542 an intermediate generation has already exited:
543
544 ~~~
545 # use std::task;
546 # fn sleep_forever() { loop { task::yield() } }
547 # fn wait_for_a_while() { do 1000.times { task::yield() } }
548 # do task::try::<int> {
549 do task::spawn_supervised {
550     do task::spawn_supervised {
551         sleep_forever();  // Will get woken up by force, then fail
552     }
553     // Intermediate task immediately exits
554 }
555 wait_for_a_while();
556 fail!();  // Will kill grandchild even if child has already exited
557 # };
558 ~~~
559
560 Finally, tasks can be configured to not propagate failure to each
561 other at all, using `task::spawn_unlinked` for _isolated failure_.
562
563 ~~~
564 # use std::task;
565 # fn random() -> uint { 100 }
566 # fn sleep_for(i: uint) { do i.times { task::yield() } }
567 # do task::try::<()> {
568 let (time1, time2) = (random(), random());
569 do task::spawn_unlinked {
570     sleep_for(time2);  // Won't get forced awake
571     fail!();
572 }
573 sleep_for(time1);  // Won't get forced awake
574 fail!();
575 // It will take MAX(time1,time2) for the program to finish.
576 # };
577 ~~~
578
579 ## Creating a task with a bi-directional communication path
580
581 A very common thing to do is to spawn a child task where the parent
582 and child both need to exchange messages with each other. The
583 function `extra::comm::DuplexStream()` supports this pattern.  We'll
584 look briefly at how to use it.
585
586 To see how `DuplexStream()` works, we will create a child task
587 that repeatedly receives a `uint` message, converts it to a string, and sends
588 the string in response.  The child terminates when it receives `0`.
589 Here is the function that implements the child task:
590
591 ~~~~
592 # use extra::comm::DuplexStream;
593 # use std::uint;
594 fn stringifier(channel: &DuplexStream<~str, uint>) {
595     let mut value: uint;
596     loop {
597         value = channel.recv();
598         channel.send(uint::to_str(value));
599         if value == 0 { break; }
600     }
601 }
602 ~~~~
603
604 The implementation of `DuplexStream` supports both sending and
605 receiving. The `stringifier` function takes a `DuplexStream` that can
606 send strings (the first type parameter) and receive `uint` messages
607 (the second type parameter). The body itself simply loops, reading
608 from the channel and then sending its response back.  The actual
609 response itself is simply the stringified version of the received value,
610 `uint::to_str(value)`.
611
612 Here is the code for the parent task:
613
614 ~~~~
615 # use std::task::spawn;
616 # use std::uint;
617 # use extra::comm::DuplexStream;
618 # fn stringifier(channel: &DuplexStream<~str, uint>) {
619 #     let mut value: uint;
620 #     loop {
621 #         value = channel.recv();
622 #         channel.send(uint::to_str(value));
623 #         if value == 0u { break; }
624 #     }
625 # }
626 # fn main() {
627
628 let (from_child, to_child) = DuplexStream();
629
630 do spawn {
631     stringifier(&to_child);
632 };
633
634 from_child.send(22);
635 assert!(from_child.recv() == ~"22");
636
637 from_child.send(23);
638 from_child.send(0);
639
640 assert!(from_child.recv() == ~"23");
641 assert!(from_child.recv() == ~"0");
642
643 # }
644 ~~~~
645
646 The parent task first calls `DuplexStream` to create a pair of bidirectional
647 endpoints. It then uses `task::spawn` to create the child task, which captures
648 one end of the communication channel.  As a result, both parent and child can
649 send and receive data to and from the other.