]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/task.rs
auto merge of #11274 : michaelwoerister/rust/issue11083, r=pcwalton
[rust.git] / src / libstd / rt / task.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Language-level runtime services that should reasonably expected
12 //! to be available 'everywhere'. Local heaps, GC, unwinding,
13 //! local storage, and logging. Even a 'freestanding' Rust would likely want
14 //! to implement this.
15
16 use any::AnyOwnExt;
17 use borrow;
18 use cast;
19 use cleanup;
20 use io::Writer;
21 use iter::{Iterator, Take};
22 use local_data;
23 use ops::Drop;
24 use option::{Option, Some, None};
25 use prelude::drop;
26 use result::{Result, Ok, Err};
27 use rt::Runtime;
28 use rt::borrowck::BorrowRecord;
29 use rt::borrowck;
30 use rt::local::Local;
31 use rt::local_heap::LocalHeap;
32 use rt::logging::StdErrLogger;
33 use rt::rtio::LocalIo;
34 use rt::unwind::Unwinder;
35 use send_str::SendStr;
36 use sync::arc::UnsafeArc;
37 use sync::atomics::{AtomicUint, SeqCst};
38 use task::{TaskResult, TaskOpts};
39 use unstable::finally::Finally;
40
41 #[cfg(stage0)]
42 pub use rt::unwind::begin_unwind;
43
44 // The Task struct represents all state associated with a rust
45 // task. There are at this point two primary "subtypes" of task,
46 // however instead of using a subtype we just have a "task_type" field
47 // in the struct. This contains a pointer to another struct that holds
48 // the type-specific state.
49
50 pub struct Task {
51     heap: LocalHeap,
52     gc: GarbageCollector,
53     storage: LocalStorage,
54     unwinder: Unwinder,
55     death: Death,
56     destroyed: bool,
57     name: Option<SendStr>,
58     // Dynamic borrowck debugging info
59     borrow_list: Option<~[BorrowRecord]>,
60
61     logger: Option<StdErrLogger>,
62     stdout_handle: Option<~Writer>,
63
64     priv imp: Option<~Runtime>,
65 }
66
67 pub struct GarbageCollector;
68 pub struct LocalStorage(Option<local_data::Map>);
69
70 /// A handle to a blocked task. Usually this means having the ~Task pointer by
71 /// ownership, but if the task is killable, a killer can steal it at any time.
72 pub enum BlockedTask {
73     Owned(~Task),
74     Shared(UnsafeArc<AtomicUint>),
75 }
76
77 /// Per-task state related to task death, killing, failure, etc.
78 pub struct Death {
79     // Action to be done with the exit code. If set, also makes the task wait
80     // until all its watched children exit before collecting the status.
81     on_exit: Option<proc(TaskResult)>,
82 }
83
84 pub struct BlockedTaskIterator {
85     priv inner: UnsafeArc<AtomicUint>,
86 }
87
88 impl Task {
89     pub fn new() -> Task {
90         Task {
91             heap: LocalHeap::new(),
92             gc: GarbageCollector,
93             storage: LocalStorage(None),
94             unwinder: Unwinder::new(),
95             death: Death::new(),
96             destroyed: false,
97             name: None,
98             borrow_list: None,
99             logger: None,
100             stdout_handle: None,
101             imp: None,
102         }
103     }
104
105     /// Executes the given closure as if it's running inside this task. The task
106     /// is consumed upon entry, and the destroyed task is returned from this
107     /// function in order for the caller to free. This function is guaranteed to
108     /// not unwind because the closure specified is run inside of a `rust_try`
109     /// block. (this is the only try/catch block in the world).
110     ///
111     /// This function is *not* meant to be abused as a "try/catch" block. This
112     /// is meant to be used at the absolute boundaries of a task's lifetime, and
113     /// only for that purpose.
114     pub fn run(~self, f: ||) -> ~Task {
115         // Need to put ourselves into TLS, but also need access to the unwinder.
116         // Unsafely get a handle to the task so we can continue to use it after
117         // putting it in tls (so we can invoke the unwinder).
118         let handle: *mut Task = unsafe {
119             *cast::transmute::<&~Task, &*mut Task>(&self)
120         };
121         Local::put(self);
122
123         // The only try/catch block in the world. Attempt to run the task's
124         // client-specified code and catch any failures.
125         let try_block = || {
126
127             // Run the task main function, then do some cleanup.
128             f.finally(|| {
129                 fn flush(w: Option<~Writer>) {
130                     match w {
131                         Some(mut w) => { w.flush(); }
132                         None => {}
133                     }
134                 }
135
136                 // First, destroy task-local storage. This may run user dtors.
137                 //
138                 // FIXME #8302: Dear diary. I'm so tired and confused.
139                 // There's some interaction in rustc between the box
140                 // annihilator and the TLS dtor by which TLS is
141                 // accessed from annihilated box dtors *after* TLS is
142                 // destroyed. Somehow setting TLS back to null, as the
143                 // old runtime did, makes this work, but I don't currently
144                 // understand how. I would expect that, if the annihilator
145                 // reinvokes TLS while TLS is uninitialized, that
146                 // TLS would be reinitialized but never destroyed,
147                 // but somehow this works. I have no idea what's going
148                 // on but this seems to make things magically work. FML.
149                 //
150                 // (added after initial comment) A possible interaction here is
151                 // that the destructors for the objects in TLS themselves invoke
152                 // TLS, or possibly some destructors for those objects being
153                 // annihilated invoke TLS. Sadly these two operations seemed to
154                 // be intertwined, and miraculously work for now...
155                 let mut task = Local::borrow(None::<Task>);
156                 let storage = task.get().storage.take();
157                 drop(task);
158                 drop(storage);
159
160                 // Destroy remaining boxes. Also may run user dtors.
161                 unsafe { cleanup::annihilate(); }
162
163                 // Finally flush and destroy any output handles which the task
164                 // owns. There are no boxes here, and no user destructors should
165                 // run after this any more.
166                 let mut task = Local::borrow(None::<Task>);
167                 let stdout = task.get().stdout_handle.take();
168                 let logger = task.get().logger.take();
169                 drop(task);
170
171                 flush(stdout);
172                 drop(logger);
173             })
174         };
175
176         unsafe { (*handle).unwinder.try(try_block); }
177
178         // Cleanup the dynamic borrowck debugging info
179         borrowck::clear_task_borrow_list();
180
181         // Here we must unsafely borrow the task in order to not remove it from
182         // TLS. When collecting failure, we may attempt to send on a channel (or
183         // just run aribitrary code), so we must be sure to still have a local
184         // task in TLS.
185         unsafe {
186             let me: *mut Task = Local::unsafe_borrow();
187             (*me).death.collect_failure((*me).unwinder.result());
188         }
189         let mut me: ~Task = Local::take();
190         me.destroyed = true;
191         return me;
192     }
193
194     /// Inserts a runtime object into this task, transferring ownership to the
195     /// task. It is illegal to replace a previous runtime object in this task
196     /// with this argument.
197     pub fn put_runtime(&mut self, ops: ~Runtime) {
198         assert!(self.imp.is_none());
199         self.imp = Some(ops);
200     }
201
202     /// Attempts to extract the runtime as a specific type. If the runtime does
203     /// not have the provided type, then the runtime is not removed. If the
204     /// runtime does have the specified type, then it is removed and returned
205     /// (transfer of ownership).
206     ///
207     /// It is recommended to only use this method when *absolutely necessary*.
208     /// This function may not be available in the future.
209     pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<~T> {
210         // This is a terrible, terrible function. The general idea here is to
211         // take the runtime, cast it to ~Any, check if it has the right type,
212         // and then re-cast it back if necessary. The method of doing this is
213         // pretty sketchy and involves shuffling vtables of trait objects
214         // around, but it gets the job done.
215         //
216         // XXX: This function is a serious code smell and should be avoided at
217         //      all costs. I have yet to think of a method to avoid this
218         //      function, and I would be saddened if more usage of the function
219         //      crops up.
220         unsafe {
221             let imp = self.imp.take_unwrap();
222             let &(vtable, _): &(uint, uint) = cast::transmute(&imp);
223             match imp.wrap().move::<T>() {
224                 Ok(t) => Some(t),
225                 Err(t) => {
226                     let (_, obj): (uint, uint) = cast::transmute(t);
227                     let obj: ~Runtime = cast::transmute((vtable, obj));
228                     self.put_runtime(obj);
229                     None
230                 }
231             }
232         }
233     }
234
235     /// Spawns a sibling to this task. The newly spawned task is configured with
236     /// the `opts` structure and will run `f` as the body of its code.
237     pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc()) {
238         let ops = self.imp.take_unwrap();
239         ops.spawn_sibling(self, opts, f)
240     }
241
242     /// Deschedules the current task, invoking `f` `amt` times. It is not
243     /// recommended to use this function directly, but rather communication
244     /// primitives in `std::comm` should be used.
245     pub fn deschedule(mut ~self, amt: uint,
246                       f: |BlockedTask| -> Result<(), BlockedTask>) {
247         let ops = self.imp.take_unwrap();
248         ops.deschedule(amt, self, f)
249     }
250
251     /// Wakes up a previously blocked task, optionally specifiying whether the
252     /// current task can accept a change in scheduling. This function can only
253     /// be called on tasks that were previously blocked in `deschedule`.
254     pub fn reawaken(mut ~self, can_resched: bool) {
255         let ops = self.imp.take_unwrap();
256         ops.reawaken(self, can_resched);
257     }
258
259     /// Yields control of this task to another task. This function will
260     /// eventually return, but possibly not immediately. This is used as an
261     /// opportunity to allow other tasks a chance to run.
262     pub fn yield_now(mut ~self) {
263         let ops = self.imp.take_unwrap();
264         ops.yield_now(self);
265     }
266
267     /// Similar to `yield_now`, except that this function may immediately return
268     /// without yielding (depending on what the runtime decides to do).
269     pub fn maybe_yield(mut ~self) {
270         let ops = self.imp.take_unwrap();
271         ops.maybe_yield(self);
272     }
273
274     /// Acquires a handle to the I/O factory that this task contains, normally
275     /// stored in the task's runtime. This factory may not always be available,
276     /// which is why the return type is `Option`
277     pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
278         self.imp.get_mut_ref().local_io()
279     }
280 }
281
282 impl Drop for Task {
283     fn drop(&mut self) {
284         rtdebug!("called drop for a task: {}", borrow::to_uint(self));
285         rtassert!(self.destroyed);
286     }
287 }
288
289 impl Iterator<BlockedTask> for BlockedTaskIterator {
290     fn next(&mut self) -> Option<BlockedTask> {
291         Some(Shared(self.inner.clone()))
292     }
293 }
294
295 impl BlockedTask {
296     /// Returns Some if the task was successfully woken; None if already killed.
297     pub fn wake(self) -> Option<~Task> {
298         match self {
299             Owned(task) => Some(task),
300             Shared(arc) => unsafe {
301                 match (*arc.get()).swap(0, SeqCst) {
302                     0 => None,
303                     n => Some(cast::transmute(n)),
304                 }
305             }
306         }
307     }
308
309     // This assertion has two flavours because the wake involves an atomic op.
310     // In the faster version, destructors will fail dramatically instead.
311     #[cfg(not(test))] pub fn trash(self) { }
312     #[cfg(test)]      pub fn trash(self) { assert!(self.wake().is_none()); }
313
314     /// Create a blocked task, unless the task was already killed.
315     pub fn block(task: ~Task) -> BlockedTask {
316         Owned(task)
317     }
318
319     /// Converts one blocked task handle to a list of many handles to the same.
320     pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTaskIterator>
321     {
322         let arc = match self {
323             Owned(task) => {
324                 let flag = unsafe { AtomicUint::new(cast::transmute(task)) };
325                 UnsafeArc::new(flag)
326             }
327             Shared(arc) => arc.clone(),
328         };
329         BlockedTaskIterator{ inner: arc }.take(num_handles)
330     }
331
332     /// Convert to an unsafe uint value. Useful for storing in a pipe's state
333     /// flag.
334     #[inline]
335     pub unsafe fn cast_to_uint(self) -> uint {
336         match self {
337             Owned(task) => {
338                 let blocked_task_ptr: uint = cast::transmute(task);
339                 rtassert!(blocked_task_ptr & 0x1 == 0);
340                 blocked_task_ptr
341             }
342             Shared(arc) => {
343                 let blocked_task_ptr: uint = cast::transmute(~arc);
344                 rtassert!(blocked_task_ptr & 0x1 == 0);
345                 blocked_task_ptr | 0x1
346             }
347         }
348     }
349
350     /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
351     /// flag.
352     #[inline]
353     pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
354         if blocked_task_ptr & 0x1 == 0 {
355             Owned(cast::transmute(blocked_task_ptr))
356         } else {
357             let ptr: ~UnsafeArc<AtomicUint> =
358                 cast::transmute(blocked_task_ptr & !1);
359             Shared(*ptr)
360         }
361     }
362 }
363
364 impl Death {
365     pub fn new() -> Death {
366         Death { on_exit: None, }
367     }
368
369     /// Collect failure exit codes from children and propagate them to a parent.
370     pub fn collect_failure(&mut self, result: TaskResult) {
371         match self.on_exit.take() {
372             Some(f) => f(result),
373             None => {}
374         }
375     }
376 }
377
378 impl Drop for Death {
379     fn drop(&mut self) {
380         // make this type noncopyable
381     }
382 }
383
384 #[cfg(test)]
385 mod test {
386     use super::*;
387     use prelude::*;
388     use task;
389
390     #[test]
391     fn local_heap() {
392         let a = @5;
393         let b = a;
394         assert!(*a == 5);
395         assert!(*b == 5);
396     }
397
398     #[test]
399     fn tls() {
400         use local_data;
401         local_data_key!(key: @~str)
402         local_data::set(key, @~"data");
403         assert!(*local_data::get(key, |k| k.map(|k| *k)).unwrap() == ~"data");
404         local_data_key!(key2: @~str)
405         local_data::set(key2, @~"data");
406         assert!(*local_data::get(key2, |k| k.map(|k| *k)).unwrap() == ~"data");
407     }
408
409     #[test]
410     fn unwind() {
411         let result = task::try(proc()());
412         rtdebug!("trying first assert");
413         assert!(result.is_ok());
414         let result = task::try::<()>(proc() fail!());
415         rtdebug!("trying second assert");
416         assert!(result.is_err());
417     }
418
419     #[test]
420     fn rng() {
421         use rand::{rng, Rng};
422         let mut r = rng();
423         let _ = r.next_u32();
424     }
425
426     #[test]
427     fn logging() {
428         info!("here i am. logging in a newsched task");
429     }
430
431     #[test]
432     fn comm_stream() {
433         let (port, chan) = Chan::new();
434         chan.send(10);
435         assert!(port.recv() == 10);
436     }
437
438     #[test]
439     fn comm_shared_chan() {
440         let (port, chan) = SharedChan::new();
441         chan.send(10);
442         assert!(port.recv() == 10);
443     }
444
445     #[test]
446     fn heap_cycles() {
447         use option::{Option, Some, None};
448
449         struct List {
450             next: Option<@mut List>,
451         }
452
453         let a = @mut List { next: None };
454         let b = @mut List { next: Some(a) };
455
456         a.next = Some(b);
457     }
458
459     #[test]
460     #[should_fail]
461     fn test_begin_unwind() {
462         use rt::unwind::begin_unwind;
463         begin_unwind("cause", file!(), line!())
464     }
465
466     // Task blocking tests
467
468     #[test]
469     fn block_and_wake() {
470         let task = ~Task::new();
471         let mut task = BlockedTask::block(task).wake().unwrap();
472         task.destroyed = true;
473     }
474 }