]> git.lizzy.rs Git - rust.git/blob - src/librustrt/task.rs
auto merge of #14854 : jakub-/rust/issue-10991, r=pcwalton
[rust.git] / src / librustrt / task.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Language-level runtime services that should reasonably expected
12 //! to be available 'everywhere'. Local heaps, GC, unwinding,
13 //! local storage, and logging. Even a 'freestanding' Rust would likely want
14 //! to implement this.
15
16 use core::prelude::*;
17
18 use alloc::arc::Arc;
19 use alloc::owned::{AnyOwnExt, Box};
20 use core::any::Any;
21 use core::atomics::{AtomicUint, SeqCst};
22 use core::finally::Finally;
23 use core::iter::Take;
24 use core::mem;
25 use core::raw;
26
27 use local_data;
28 use Runtime;
29 use local::Local;
30 use local_heap::LocalHeap;
31 use rtio::LocalIo;
32 use unwind::Unwinder;
33 use collections::str::SendStr;
34
35 /// The Task struct represents all state associated with a rust
36 /// task. There are at this point two primary "subtypes" of task,
37 /// however instead of using a subtype we just have a "task_type" field
38 /// in the struct. This contains a pointer to another struct that holds
39 /// the type-specific state.
40 pub struct Task {
41     pub heap: LocalHeap,
42     pub gc: GarbageCollector,
43     pub storage: LocalStorage,
44     pub unwinder: Unwinder,
45     pub death: Death,
46     pub destroyed: bool,
47     pub name: Option<SendStr>,
48
49     imp: Option<Box<Runtime + Send>>,
50 }
51
52 pub struct TaskOpts {
53     /// Invoke this procedure with the result of the task when it finishes.
54     pub on_exit: Option<proc(Result): Send>,
55     /// A name for the task-to-be, for identification in failure messages
56     pub name: Option<SendStr>,
57     /// The size of the stack for the spawned task
58     pub stack_size: Option<uint>,
59 }
60
61 /// Indicates the manner in which a task exited.
62 ///
63 /// A task that completes without failing is considered to exit successfully.
64 ///
65 /// If you wish for this result's delivery to block until all
66 /// children tasks complete, recommend using a result future.
67 pub type Result = ::core::result::Result<(), Box<Any + Send>>;
68
69 pub struct GarbageCollector;
70 pub struct LocalStorage(pub Option<local_data::Map>);
71
72 /// A handle to a blocked task. Usually this means having the Box<Task>
73 /// pointer by ownership, but if the task is killable, a killer can steal it
74 /// at any time.
75 pub enum BlockedTask {
76     Owned(Box<Task>),
77     Shared(Arc<AtomicUint>),
78 }
79
80 /// Per-task state related to task death, killing, failure, etc.
81 pub struct Death {
82     pub on_exit: Option<proc(Result): Send>,
83 }
84
85 pub struct BlockedTasks {
86     inner: Arc<AtomicUint>,
87 }
88
89 impl Task {
90     pub fn new() -> Task {
91         Task {
92             heap: LocalHeap::new(),
93             gc: GarbageCollector,
94             storage: LocalStorage(None),
95             unwinder: Unwinder::new(),
96             death: Death::new(),
97             destroyed: false,
98             name: None,
99             imp: None,
100         }
101     }
102
103     /// Executes the given closure as if it's running inside this task. The task
104     /// is consumed upon entry, and the destroyed task is returned from this
105     /// function in order for the caller to free. This function is guaranteed to
106     /// not unwind because the closure specified is run inside of a `rust_try`
107     /// block. (this is the only try/catch block in the world).
108     ///
109     /// This function is *not* meant to be abused as a "try/catch" block. This
110     /// is meant to be used at the absolute boundaries of a task's lifetime, and
111     /// only for that purpose.
112     pub fn run(~self, mut f: ||) -> Box<Task> {
113         // Need to put ourselves into TLS, but also need access to the unwinder.
114         // Unsafely get a handle to the task so we can continue to use it after
115         // putting it in tls (so we can invoke the unwinder).
116         let handle: *mut Task = unsafe {
117             *mem::transmute::<&Box<Task>, &*mut Task>(&self)
118         };
119         Local::put(self);
120
121         // The only try/catch block in the world. Attempt to run the task's
122         // client-specified code and catch any failures.
123         let try_block = || {
124
125             // Run the task main function, then do some cleanup.
126             f.finally(|| {
127                 // First, destroy task-local storage. This may run user dtors.
128                 //
129                 // FIXME #8302: Dear diary. I'm so tired and confused.
130                 // There's some interaction in rustc between the box
131                 // annihilator and the TLS dtor by which TLS is
132                 // accessed from annihilated box dtors *after* TLS is
133                 // destroyed. Somehow setting TLS back to null, as the
134                 // old runtime did, makes this work, but I don't currently
135                 // understand how. I would expect that, if the annihilator
136                 // reinvokes TLS while TLS is uninitialized, that
137                 // TLS would be reinitialized but never destroyed,
138                 // but somehow this works. I have no idea what's going
139                 // on but this seems to make things magically work. FML.
140                 //
141                 // (added after initial comment) A possible interaction here is
142                 // that the destructors for the objects in TLS themselves invoke
143                 // TLS, or possibly some destructors for those objects being
144                 // annihilated invoke TLS. Sadly these two operations seemed to
145                 // be intertwined, and miraculously work for now...
146                 drop({
147                     let mut task = Local::borrow(None::<Task>);
148                     let &LocalStorage(ref mut optmap) = &mut task.storage;
149                     optmap.take()
150                 });
151
152                 // Destroy remaining boxes. Also may run user dtors.
153                 let mut heap = {
154                     let mut task = Local::borrow(None::<Task>);
155                     mem::replace(&mut task.heap, LocalHeap::new())
156                 };
157                 unsafe { heap.annihilate() }
158                 drop(heap);
159             })
160         };
161
162         unsafe { (*handle).unwinder.try(try_block); }
163
164         // Here we must unsafely borrow the task in order to not remove it from
165         // TLS. When collecting failure, we may attempt to send on a channel (or
166         // just run arbitrary code), so we must be sure to still have a local
167         // task in TLS.
168         unsafe {
169             let me: *mut Task = Local::unsafe_borrow();
170             (*me).death.collect_failure((*me).unwinder.result());
171         }
172         let mut me: Box<Task> = Local::take();
173         me.destroyed = true;
174         return me;
175     }
176
177     /// Inserts a runtime object into this task, transferring ownership to the
178     /// task. It is illegal to replace a previous runtime object in this task
179     /// with this argument.
180     pub fn put_runtime(&mut self, ops: Box<Runtime + Send>) {
181         assert!(self.imp.is_none());
182         self.imp = Some(ops);
183     }
184
185     /// Attempts to extract the runtime as a specific type. If the runtime does
186     /// not have the provided type, then the runtime is not removed. If the
187     /// runtime does have the specified type, then it is removed and returned
188     /// (transfer of ownership).
189     ///
190     /// It is recommended to only use this method when *absolutely necessary*.
191     /// This function may not be available in the future.
192     pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<Box<T>> {
193         // This is a terrible, terrible function. The general idea here is to
194         // take the runtime, cast it to Box<Any>, check if it has the right
195         // type, and then re-cast it back if necessary. The method of doing
196         // this is pretty sketchy and involves shuffling vtables of trait
197         // objects around, but it gets the job done.
198         //
199         // FIXME: This function is a serious code smell and should be avoided at
200         //      all costs. I have yet to think of a method to avoid this
201         //      function, and I would be saddened if more usage of the function
202         //      crops up.
203         unsafe {
204             let imp = self.imp.take_unwrap();
205             let vtable = mem::transmute::<_, &raw::TraitObject>(&imp).vtable;
206             match imp.wrap().move::<T>() {
207                 Ok(t) => Some(t),
208                 Err(t) => {
209                     let data = mem::transmute::<_, raw::TraitObject>(t).data;
210                     let obj: Box<Runtime + Send> =
211                         mem::transmute(raw::TraitObject {
212                             vtable: vtable,
213                             data: data,
214                         });
215                     self.put_runtime(obj);
216                     None
217                 }
218             }
219         }
220     }
221
222     /// Spawns a sibling to this task. The newly spawned task is configured with
223     /// the `opts` structure and will run `f` as the body of its code.
224     pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc(): Send) {
225         let ops = self.imp.take_unwrap();
226         ops.spawn_sibling(self, opts, f)
227     }
228
229     /// Deschedules the current task, invoking `f` `amt` times. It is not
230     /// recommended to use this function directly, but rather communication
231     /// primitives in `std::comm` should be used.
232     pub fn deschedule(mut ~self, amt: uint,
233                       f: |BlockedTask| -> ::core::result::Result<(), BlockedTask>) {
234         let ops = self.imp.take_unwrap();
235         ops.deschedule(amt, self, f)
236     }
237
238     /// Wakes up a previously blocked task, optionally specifying whether the
239     /// current task can accept a change in scheduling. This function can only
240     /// be called on tasks that were previously blocked in `deschedule`.
241     pub fn reawaken(mut ~self) {
242         let ops = self.imp.take_unwrap();
243         ops.reawaken(self);
244     }
245
246     /// Yields control of this task to another task. This function will
247     /// eventually return, but possibly not immediately. This is used as an
248     /// opportunity to allow other tasks a chance to run.
249     pub fn yield_now(mut ~self) {
250         let ops = self.imp.take_unwrap();
251         ops.yield_now(self);
252     }
253
254     /// Similar to `yield_now`, except that this function may immediately return
255     /// without yielding (depending on what the runtime decides to do).
256     pub fn maybe_yield(mut ~self) {
257         let ops = self.imp.take_unwrap();
258         ops.maybe_yield(self);
259     }
260
261     /// Acquires a handle to the I/O factory that this task contains, normally
262     /// stored in the task's runtime. This factory may not always be available,
263     /// which is why the return type is `Option`
264     pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
265         self.imp.get_mut_ref().local_io()
266     }
267
268     /// Returns the stack bounds for this task in (lo, hi) format. The stack
269     /// bounds may not be known for all tasks, so the return value may be
270     /// `None`.
271     pub fn stack_bounds(&self) -> (uint, uint) {
272         self.imp.get_ref().stack_bounds()
273     }
274
275     /// Returns whether it is legal for this task to block the OS thread that it
276     /// is running on.
277     pub fn can_block(&self) -> bool {
278         self.imp.get_ref().can_block()
279     }
280 }
281
282 impl Drop for Task {
283     fn drop(&mut self) {
284         rtdebug!("called drop for a task: {}", self as *mut Task as uint);
285         rtassert!(self.destroyed);
286     }
287 }
288
289 impl TaskOpts {
290     pub fn new() -> TaskOpts {
291         TaskOpts { on_exit: None, name: None, stack_size: None }
292     }
293 }
294
295 impl Iterator<BlockedTask> for BlockedTasks {
296     fn next(&mut self) -> Option<BlockedTask> {
297         Some(Shared(self.inner.clone()))
298     }
299 }
300
301 impl BlockedTask {
302     /// Returns Some if the task was successfully woken; None if already killed.
303     pub fn wake(self) -> Option<Box<Task>> {
304         match self {
305             Owned(task) => Some(task),
306             Shared(arc) => {
307                 match arc.swap(0, SeqCst) {
308                     0 => None,
309                     n => Some(unsafe { mem::transmute(n) }),
310                 }
311             }
312         }
313     }
314
315     /// Reawakens this task if ownership is acquired. If finer-grained control
316     /// is desired, use `wake` instead.
317     pub fn reawaken(self) {
318         self.wake().map(|t| t.reawaken());
319     }
320
321     // This assertion has two flavours because the wake involves an atomic op.
322     // In the faster version, destructors will fail dramatically instead.
323     #[cfg(not(test))] pub fn trash(self) { }
324     #[cfg(test)]      pub fn trash(self) { assert!(self.wake().is_none()); }
325
326     /// Create a blocked task, unless the task was already killed.
327     pub fn block(task: Box<Task>) -> BlockedTask {
328         Owned(task)
329     }
330
331     /// Converts one blocked task handle to a list of many handles to the same.
332     pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
333         let arc = match self {
334             Owned(task) => {
335                 let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
336                 Arc::new(flag)
337             }
338             Shared(arc) => arc.clone(),
339         };
340         BlockedTasks{ inner: arc }.take(num_handles)
341     }
342
343     /// Convert to an unsafe uint value. Useful for storing in a pipe's state
344     /// flag.
345     #[inline]
346     pub unsafe fn cast_to_uint(self) -> uint {
347         match self {
348             Owned(task) => {
349                 let blocked_task_ptr: uint = mem::transmute(task);
350                 rtassert!(blocked_task_ptr & 0x1 == 0);
351                 blocked_task_ptr
352             }
353             Shared(arc) => {
354                 let blocked_task_ptr: uint = mem::transmute(box arc);
355                 rtassert!(blocked_task_ptr & 0x1 == 0);
356                 blocked_task_ptr | 0x1
357             }
358         }
359     }
360
361     /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
362     /// flag.
363     #[inline]
364     pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
365         if blocked_task_ptr & 0x1 == 0 {
366             Owned(mem::transmute(blocked_task_ptr))
367         } else {
368             let ptr: Box<Arc<AtomicUint>> =
369                 mem::transmute(blocked_task_ptr & !1);
370             Shared(*ptr)
371         }
372     }
373 }
374
375 impl Death {
376     pub fn new() -> Death {
377         Death { on_exit: None, }
378     }
379
380     /// Collect failure exit codes from children and propagate them to a parent.
381     pub fn collect_failure(&mut self, result: Result) {
382         match self.on_exit.take() {
383             Some(f) => f(result),
384             None => {}
385         }
386     }
387 }
388
389 impl Drop for Death {
390     fn drop(&mut self) {
391         // make this type noncopyable
392     }
393 }
394
395 #[cfg(test)]
396 mod test {
397     use super::*;
398     use std::prelude::*;
399     use std::task;
400     use std::gc::{Gc, GC};
401
402     #[test]
403     fn local_heap() {
404         let a = box(GC) 5;
405         let b = a;
406         assert!(*a == 5);
407         assert!(*b == 5);
408     }
409
410     #[test]
411     fn tls() {
412         local_data_key!(key: Gc<String>)
413         key.replace(Some(box(GC) "data".to_string()));
414         assert_eq!(key.get().unwrap().as_slice(), "data");
415         local_data_key!(key2: Gc<String>)
416         key2.replace(Some(box(GC) "data".to_string()));
417         assert_eq!(key2.get().unwrap().as_slice(), "data");
418     }
419
420     #[test]
421     fn unwind() {
422         let result = task::try(proc()());
423         rtdebug!("trying first assert");
424         assert!(result.is_ok());
425         let result = task::try::<()>(proc() fail!());
426         rtdebug!("trying second assert");
427         assert!(result.is_err());
428     }
429
430     #[test]
431     fn rng() {
432         use std::rand::{StdRng, Rng};
433         let mut r = StdRng::new().ok().unwrap();
434         let _ = r.next_u32();
435     }
436
437     #[test]
438     fn comm_stream() {
439         let (tx, rx) = channel();
440         tx.send(10);
441         assert!(rx.recv() == 10);
442     }
443
444     #[test]
445     fn comm_shared_chan() {
446         let (tx, rx) = channel();
447         tx.send(10);
448         assert!(rx.recv() == 10);
449     }
450
451     #[test]
452     fn heap_cycles() {
453         use std::cell::RefCell;
454
455         struct List {
456             next: Option<Gc<RefCell<List>>>,
457         }
458
459         let a = box(GC) RefCell::new(List { next: None });
460         let b = box(GC) RefCell::new(List { next: Some(a) });
461
462         {
463             let mut a = a.borrow_mut();
464             a.next = Some(b);
465         }
466     }
467
468     #[test]
469     #[should_fail]
470     fn test_begin_unwind() {
471         use std::rt::unwind::begin_unwind;
472         begin_unwind("cause", file!(), line!())
473     }
474
475     // Task blocking tests
476
477     #[test]
478     fn block_and_wake() {
479         let task = box Task::new();
480         let mut task = BlockedTask::block(task).wake().unwrap();
481         task.destroyed = true;
482     }
483 }