]> git.lizzy.rs Git - rust.git/blob - src/librustrt/task.rs
auto merge of #16074 : nham/rust/bitflags_traits, r=alexcrichton
[rust.git] / src / librustrt / task.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Language-level runtime services that should reasonably expected
12 //! to be available 'everywhere'. Local heaps, GC, unwinding,
13 //! local storage, and logging. Even a 'freestanding' Rust would likely want
14 //! to implement this.
15
16 use core::prelude::*;
17
18 use alloc::arc::Arc;
19 use alloc::boxed::{BoxAny, Box};
20 use core::any::Any;
21 use core::atomics::{AtomicUint, SeqCst};
22 use core::iter::Take;
23 use core::kinds::marker;
24 use core::mem;
25 use core::raw;
26
27 use local_data;
28 use Runtime;
29 use local::Local;
30 use local_heap::LocalHeap;
31 use rtio::LocalIo;
32 use unwind;
33 use unwind::Unwinder;
34 use collections::str::SendStr;
35
36 /// State associated with Rust tasks.
37 ///
38 /// Rust tasks are primarily built with two separate components. One is this
39 /// structure which handles standard services such as TLD, unwinding support,
40 /// naming of a task, etc. The second component is the runtime of this task, a
41 /// `Runtime` trait object.
42 ///
43 /// The `Runtime` object instructs this task how it can perform critical
44 /// operations such as blocking, rescheduling, I/O constructors, etc. The two
45 /// halves are separately owned, but one is often found contained in the other.
46 /// A task's runtime can be reflected upon with the `maybe_take_runtime` method,
47 /// and otherwise its ownership is managed with `take_runtime` and
48 /// `put_runtime`.
49 ///
50 /// In general, this structure should not be used. This is meant to be an
51 /// unstable internal detail of the runtime itself. From time-to-time, however,
52 /// it is useful to manage tasks directly. An example of this would be
53 /// interoperating with the Rust runtime from FFI callbacks or such. For this
54 /// reason, there are two methods of note with the `Task` structure.
55 ///
56 /// * `run` - This function will execute a closure inside the context of a task.
57 ///           Failure is caught and handled via the task's on_exit callback. If
58 ///           this fails, the task is still returned, but it can no longer be
59 ///           used, it is poisoned.
60 ///
61 /// * `destroy` - This is a required function to call to destroy a task. If a
62 ///               task falls out of scope without calling `destroy`, its
63 ///               destructor bomb will go off, aborting the process.
64 ///
65 /// With these two methods, tasks can be re-used to execute code inside of its
66 /// context while having a point in the future where destruction is allowed.
67 /// More information can be found on these specific methods.
68 ///
69 /// # Example
70 ///
71 /// ```no_run
72 /// extern crate native;
73 /// use std::uint;
74 /// # fn main() {
75 ///
76 /// // Create a task using a native runtime
77 /// let task = native::task::new((0, uint::MAX));
78 ///
79 /// // Run some code, catching any possible failures
80 /// let task = task.run(|| {
81 ///     // Run some code inside this task
82 ///     println!("Hello with a native runtime!");
83 /// });
84 ///
85 /// // Run some code again, catching the failure
86 /// let task = task.run(|| {
87 ///     fail!("oh no, what to do!");
88 /// });
89 ///
90 /// // Now that the task is failed, it can never be used again
91 /// assert!(task.is_destroyed());
92 ///
93 /// // Deallocate the resources associated with this task
94 /// task.destroy();
95 /// # }
96 /// ```
97 pub struct Task {
98     pub heap: LocalHeap,
99     pub gc: GarbageCollector,
100     pub storage: LocalStorage,
101     pub unwinder: Unwinder,
102     pub death: Death,
103     pub name: Option<SendStr>,
104
105     state: TaskState,
106     imp: Option<Box<Runtime + Send>>,
107 }
108
109 // Once a task has entered the `Armed` state it must be destroyed via `drop`,
110 // and no other method. This state is used to track this transition.
111 #[deriving(PartialEq)]
112 enum TaskState {
113     New,
114     Armed,
115     Destroyed,
116 }
117
118 pub struct TaskOpts {
119     /// Invoke this procedure with the result of the task when it finishes.
120     pub on_exit: Option<proc(Result): Send>,
121     /// A name for the task-to-be, for identification in failure messages
122     pub name: Option<SendStr>,
123     /// The size of the stack for the spawned task
124     pub stack_size: Option<uint>,
125 }
126
127 /// Indicates the manner in which a task exited.
128 ///
129 /// A task that completes without failing is considered to exit successfully.
130 ///
131 /// If you wish for this result's delivery to block until all
132 /// children tasks complete, recommend using a result future.
133 pub type Result = ::core::result::Result<(), Box<Any + Send>>;
134
135 pub struct GarbageCollector;
136 pub struct LocalStorage(pub Option<local_data::Map>);
137
138 /// A handle to a blocked task. Usually this means having the Box<Task>
139 /// pointer by ownership, but if the task is killable, a killer can steal it
140 /// at any time.
141 pub enum BlockedTask {
142     Owned(Box<Task>),
143     Shared(Arc<AtomicUint>),
144 }
145
146 /// Per-task state related to task death, killing, failure, etc.
147 pub struct Death {
148     pub on_exit: Option<proc(Result):Send>,
149     marker: marker::NoCopy,
150 }
151
152 pub struct BlockedTasks {
153     inner: Arc<AtomicUint>,
154 }
155
156 impl Task {
157     /// Creates a new uninitialized task.
158     ///
159     /// This method cannot be used to immediately invoke `run` because the task
160     /// itself will likely require a runtime to be inserted via `put_runtime`.
161     ///
162     /// Note that you likely don't want to call this function, but rather the
163     /// task creation functions through libnative or libgreen.
164     pub fn new() -> Task {
165         Task {
166             heap: LocalHeap::new(),
167             gc: GarbageCollector,
168             storage: LocalStorage(None),
169             unwinder: Unwinder::new(),
170             death: Death::new(),
171             state: New,
172             name: None,
173             imp: None,
174         }
175     }
176
177     /// Consumes ownership of a task, runs some code, and returns the task back.
178     ///
179     /// This function can be used as an emulated "try/catch" to interoperate
180     /// with the rust runtime at the outermost boundary. It is not possible to
181     /// use this function in a nested fashion (a try/catch inside of another
182     /// try/catch). Invoking this function is quite cheap.
183     ///
184     /// If the closure `f` succeeds, then the returned task can be used again
185     /// for another invocation of `run`. If the closure `f` fails then `self`
186     /// will be internally destroyed along with all of the other associated
187     /// resources of this task. The `on_exit` callback is invoked with the
188     /// cause of failure (not returned here). This can be discovered by querying
189     /// `is_destroyed()`.
190     ///
191     /// Note that it is possible to view partial execution of the closure `f`
192     /// because it is not guaranteed to run to completion, but this function is
193     /// guaranteed to return if it fails. Care should be taken to ensure that
194     /// stack references made by `f` are handled appropriately.
195     ///
196     /// It is invalid to call this function with a task that has been previously
197     /// destroyed via a failed call to `run`.
198     ///
199     /// # Example
200     ///
201     /// ```no_run
202     /// extern crate native;
203     /// use std::uint;
204     /// # fn main() {
205     ///
206     /// // Create a new native task
207     /// let task = native::task::new((0, uint::MAX));
208     ///
209     /// // Run some code once and then destroy this task
210     /// task.run(|| {
211     ///     println!("Hello with a native runtime!");
212     /// }).destroy();
213     /// # }
214     /// ```
215     pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
216         assert!(!self.is_destroyed(), "cannot re-use a destroyed task");
217
218         // First, make sure that no one else is in TLS. This does not allow
219         // recursive invocations of run(). If there's no one else, then
220         // relinquish ownership of ourselves back into TLS.
221         if Local::exists(None::<Task>) {
222             fail!("cannot run a task recursively inside another");
223         }
224         self.state = Armed;
225         Local::put(self);
226
227         // There are two primary reasons that general try/catch is unsafe. The
228         // first is that we do not support nested try/catch. The above check for
229         // an existing task in TLS is sufficient for this invariant to be
230         // upheld. The second is that unwinding while unwinding is not defined.
231         // We take care of that by having an 'unwinding' flag in the task
232         // itself. For these reasons, this unsafety should be ok.
233         let result = unsafe { unwind::try(f) };
234
235         // After running the closure given return the task back out if it ran
236         // successfully, or clean up the task if it failed.
237         let task: Box<Task> = Local::take();
238         match result {
239             Ok(()) => task,
240             Err(cause) => { task.cleanup(Err(cause)) }
241         }
242     }
243
244     /// Destroy all associated resources of this task.
245     ///
246     /// This function will perform any necessary clean up to prepare the task
247     /// for destruction. It is required that this is called before a `Task`
248     /// falls out of scope.
249     ///
250     /// The returned task cannot be used for running any more code, but it may
251     /// be used to extract the runtime as necessary.
252     pub fn destroy(self: Box<Task>) -> Box<Task> {
253         if self.is_destroyed() {
254             self
255         } else {
256             self.cleanup(Ok(()))
257         }
258     }
259
260     /// Cleans up a task, processing the result of the task as appropriate.
261     ///
262     /// This function consumes ownership of the task, deallocating it once it's
263     /// done being processed. It is assumed that TLD and the local heap have
264     /// already been destroyed and/or annihilated.
265     fn cleanup(self: Box<Task>, result: Result) -> Box<Task> {
266         // The first thing to do when cleaning up is to deallocate our local
267         // resources, such as TLD and GC data.
268         //
269         // FIXME: there are a number of problems with this code
270         //
271         // 1. If any TLD object fails destruction, then all of TLD will leak.
272         //    This appears to be a consequence of #14875.
273         //
274         // 2. Failing during GC annihilation aborts the runtime #14876.
275         //
276         // 3. Setting a TLD key while destroying TLD or while destroying GC will
277         //    abort the runtime #14807.
278         //
279         // 4. Invoking GC in GC destructors will abort the runtime #6996.
280         //
281         // 5. The order of destruction of TLD and GC matters, but either way is
282         //    susceptible to leaks (see 3/4) #8302.
283         //
284         // That being said, there are a few upshots to this code
285         //
286         // 1. If TLD destruction fails, heap destruction will be attempted.
287         //    There is a test for this at fail-during-tld-destroy.rs. Sadly the
288         //    other way can't be tested due to point 2 above. Note that we must
289         //    immortalize the heap first because if any deallocations are
290         //    attempted while TLD is being dropped it will attempt to free the
291         //    allocation from the wrong heap (because the current one has been
292         //    replaced).
293         //
294         // 2. One failure in destruction is tolerable, so long as the task
295         //    didn't originally fail while it was running.
296         //
297         // And with all that in mind, we attempt to clean things up!
298         let mut task = self.run(|| {
299             let mut task = Local::borrow(None::<Task>);
300             let tld = {
301                 let &LocalStorage(ref mut optmap) = &mut task.storage;
302                 optmap.take()
303             };
304             let mut heap = mem::replace(&mut task.heap, LocalHeap::new());
305             unsafe { heap.immortalize() }
306             drop(task);
307
308             // First, destroy task-local storage. This may run user dtors.
309             drop(tld);
310
311             // Destroy remaining boxes. Also may run user dtors.
312             drop(heap);
313         });
314
315         // If the above `run` block failed, then it must be the case that the
316         // task had previously succeeded. This also means that the code below
317         // was recursively run via the `run` method invoking this method. In
318         // this case, we just make sure the world is as we thought, and return.
319         if task.is_destroyed() {
320             rtassert!(result.is_ok())
321             return task
322         }
323
324         // After taking care of the data above, we need to transmit the result
325         // of this task.
326         let what_to_do = task.death.on_exit.take();
327         Local::put(task);
328
329         // FIXME: this is running in a seriously constrained context. If this
330         //        allocates GC or allocates TLD then it will likely abort the
331         //        runtime. Similarly, if this fails, this will also likely abort
332         //        the runtime.
333         //
334         //        This closure is currently limited to a channel send via the
335         //        standard library's task interface, but this needs
336         //        reconsideration to whether it's a reasonable thing to let a
337         //        task to do or not.
338         match what_to_do {
339             Some(f) => { f(result) }
340             None => { drop(result) }
341         }
342
343         // Now that we're done, we remove the task from TLS and flag it for
344         // destruction.
345         let mut task: Box<Task> = Local::take();
346         task.state = Destroyed;
347         return task;
348     }
349
350     /// Queries whether this can be destroyed or not.
351     pub fn is_destroyed(&self) -> bool { self.state == Destroyed }
352
353     /// Inserts a runtime object into this task, transferring ownership to the
354     /// task. It is illegal to replace a previous runtime object in this task
355     /// with this argument.
356     pub fn put_runtime(&mut self, ops: Box<Runtime + Send>) {
357         assert!(self.imp.is_none());
358         self.imp = Some(ops);
359     }
360
361     /// Removes the runtime from this task, transferring ownership to the
362     /// caller.
363     pub fn take_runtime(&mut self) -> Box<Runtime + Send> {
364         assert!(self.imp.is_some());
365         self.imp.take().unwrap()
366     }
367
368     /// Attempts to extract the runtime as a specific type. If the runtime does
369     /// not have the provided type, then the runtime is not removed. If the
370     /// runtime does have the specified type, then it is removed and returned
371     /// (transfer of ownership).
372     ///
373     /// It is recommended to only use this method when *absolutely necessary*.
374     /// This function may not be available in the future.
375     pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<Box<T>> {
376         // This is a terrible, terrible function. The general idea here is to
377         // take the runtime, cast it to Box<Any>, check if it has the right
378         // type, and then re-cast it back if necessary. The method of doing
379         // this is pretty sketchy and involves shuffling vtables of trait
380         // objects around, but it gets the job done.
381         //
382         // FIXME: This function is a serious code smell and should be avoided at
383         //      all costs. I have yet to think of a method to avoid this
384         //      function, and I would be saddened if more usage of the function
385         //      crops up.
386         unsafe {
387             let imp = self.imp.take_unwrap();
388             let vtable = mem::transmute::<_, &raw::TraitObject>(&imp).vtable;
389             match imp.wrap().downcast::<T>() {
390                 Ok(t) => Some(t),
391                 Err(t) => {
392                     let data = mem::transmute::<_, raw::TraitObject>(t).data;
393                     let obj: Box<Runtime + Send> =
394                         mem::transmute(raw::TraitObject {
395                             vtable: vtable,
396                             data: data,
397                         });
398                     self.put_runtime(obj);
399                     None
400                 }
401             }
402         }
403     }
404
405     /// Spawns a sibling to this task. The newly spawned task is configured with
406     /// the `opts` structure and will run `f` as the body of its code.
407     pub fn spawn_sibling(mut self: Box<Task>,
408                          opts: TaskOpts,
409                          f: proc(): Send) {
410         let ops = self.imp.take_unwrap();
411         ops.spawn_sibling(self, opts, f)
412     }
413
414     /// Deschedules the current task, invoking `f` `amt` times. It is not
415     /// recommended to use this function directly, but rather communication
416     /// primitives in `std::comm` should be used.
417     pub fn deschedule(mut self: Box<Task>,
418                       amt: uint,
419                       f: |BlockedTask| -> ::core::result::Result<(), BlockedTask>) {
420         let ops = self.imp.take_unwrap();
421         ops.deschedule(amt, self, f)
422     }
423
424     /// Wakes up a previously blocked task, optionally specifying whether the
425     /// current task can accept a change in scheduling. This function can only
426     /// be called on tasks that were previously blocked in `deschedule`.
427     pub fn reawaken(mut self: Box<Task>) {
428         let ops = self.imp.take_unwrap();
429         ops.reawaken(self);
430     }
431
432     /// Yields control of this task to another task. This function will
433     /// eventually return, but possibly not immediately. This is used as an
434     /// opportunity to allow other tasks a chance to run.
435     pub fn yield_now(mut self: Box<Task>) {
436         let ops = self.imp.take_unwrap();
437         ops.yield_now(self);
438     }
439
440     /// Similar to `yield_now`, except that this function may immediately return
441     /// without yielding (depending on what the runtime decides to do).
442     pub fn maybe_yield(mut self: Box<Task>) {
443         let ops = self.imp.take_unwrap();
444         ops.maybe_yield(self);
445     }
446
447     /// Acquires a handle to the I/O factory that this task contains, normally
448     /// stored in the task's runtime. This factory may not always be available,
449     /// which is why the return type is `Option`
450     pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
451         self.imp.get_mut_ref().local_io()
452     }
453
454     /// Returns the stack bounds for this task in (lo, hi) format. The stack
455     /// bounds may not be known for all tasks, so the return value may be
456     /// `None`.
457     pub fn stack_bounds(&self) -> (uint, uint) {
458         self.imp.get_ref().stack_bounds()
459     }
460
461     /// Returns whether it is legal for this task to block the OS thread that it
462     /// is running on.
463     pub fn can_block(&self) -> bool {
464         self.imp.get_ref().can_block()
465     }
466
467     /// Consume this task, flagging it as a candidate for destruction.
468     ///
469     /// This function is required to be invoked to destroy a task. A task
470     /// destroyed through a normal drop will abort.
471     pub fn drop(mut self) {
472         self.state = Destroyed;
473     }
474 }
475
476 impl Drop for Task {
477     fn drop(&mut self) {
478         rtdebug!("called drop for a task: {}", self as *mut Task as uint);
479         rtassert!(self.state != Armed);
480     }
481 }
482
483 impl TaskOpts {
484     pub fn new() -> TaskOpts {
485         TaskOpts { on_exit: None, name: None, stack_size: None }
486     }
487 }
488
489 impl Iterator<BlockedTask> for BlockedTasks {
490     fn next(&mut self) -> Option<BlockedTask> {
491         Some(Shared(self.inner.clone()))
492     }
493 }
494
495 impl BlockedTask {
496     /// Returns Some if the task was successfully woken; None if already killed.
497     pub fn wake(self) -> Option<Box<Task>> {
498         match self {
499             Owned(task) => Some(task),
500             Shared(arc) => {
501                 match arc.swap(0, SeqCst) {
502                     0 => None,
503                     n => Some(unsafe { mem::transmute(n) }),
504                 }
505             }
506         }
507     }
508
509     /// Reawakens this task if ownership is acquired. If finer-grained control
510     /// is desired, use `wake` instead.
511     pub fn reawaken(self) {
512         self.wake().map(|t| t.reawaken());
513     }
514
515     // This assertion has two flavours because the wake involves an atomic op.
516     // In the faster version, destructors will fail dramatically instead.
517     #[cfg(not(test))] pub fn trash(self) { }
518     #[cfg(test)]      pub fn trash(self) { assert!(self.wake().is_none()); }
519
520     /// Create a blocked task, unless the task was already killed.
521     pub fn block(task: Box<Task>) -> BlockedTask {
522         Owned(task)
523     }
524
525     /// Converts one blocked task handle to a list of many handles to the same.
526     pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
527         let arc = match self {
528             Owned(task) => {
529                 let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
530                 Arc::new(flag)
531             }
532             Shared(arc) => arc.clone(),
533         };
534         BlockedTasks{ inner: arc }.take(num_handles)
535     }
536
537     /// Convert to an unsafe uint value. Useful for storing in a pipe's state
538     /// flag.
539     #[inline]
540     pub unsafe fn cast_to_uint(self) -> uint {
541         match self {
542             Owned(task) => {
543                 let blocked_task_ptr: uint = mem::transmute(task);
544                 rtassert!(blocked_task_ptr & 0x1 == 0);
545                 blocked_task_ptr
546             }
547             Shared(arc) => {
548                 let blocked_task_ptr: uint = mem::transmute(box arc);
549                 rtassert!(blocked_task_ptr & 0x1 == 0);
550                 blocked_task_ptr | 0x1
551             }
552         }
553     }
554
555     /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
556     /// flag.
557     #[inline]
558     pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
559         if blocked_task_ptr & 0x1 == 0 {
560             Owned(mem::transmute(blocked_task_ptr))
561         } else {
562             let ptr: Box<Arc<AtomicUint>> =
563                 mem::transmute(blocked_task_ptr & !1);
564             Shared(*ptr)
565         }
566     }
567 }
568
569 impl Death {
570     pub fn new() -> Death {
571         Death { on_exit: None, marker: marker::NoCopy }
572     }
573 }
574
575 #[cfg(test)]
576 mod test {
577     use super::*;
578     use std::prelude::*;
579     use std::task;
580     use std::gc::{Gc, GC};
581
582     #[test]
583     fn local_heap() {
584         let a = box(GC) 5i;
585         let b = a;
586         assert!(*a == 5);
587         assert!(*b == 5);
588     }
589
590     #[test]
591     fn tls() {
592         local_data_key!(key: Gc<String>)
593         key.replace(Some(box(GC) "data".to_string()));
594         assert_eq!(key.get().unwrap().as_slice(), "data");
595         local_data_key!(key2: Gc<String>)
596         key2.replace(Some(box(GC) "data".to_string()));
597         assert_eq!(key2.get().unwrap().as_slice(), "data");
598     }
599
600     #[test]
601     fn unwind() {
602         let result = task::try(proc()());
603         rtdebug!("trying first assert");
604         assert!(result.is_ok());
605         let result = task::try::<()>(proc() fail!());
606         rtdebug!("trying second assert");
607         assert!(result.is_err());
608     }
609
610     #[test]
611     fn rng() {
612         use std::rand::{StdRng, Rng};
613         let mut r = StdRng::new().ok().unwrap();
614         let _ = r.next_u32();
615     }
616
617     #[test]
618     fn comm_stream() {
619         let (tx, rx) = channel();
620         tx.send(10i);
621         assert!(rx.recv() == 10);
622     }
623
624     #[test]
625     fn comm_shared_chan() {
626         let (tx, rx) = channel();
627         tx.send(10i);
628         assert!(rx.recv() == 10);
629     }
630
631     #[test]
632     fn heap_cycles() {
633         use std::cell::RefCell;
634
635         struct List {
636             next: Option<Gc<RefCell<List>>>,
637         }
638
639         let a = box(GC) RefCell::new(List { next: None });
640         let b = box(GC) RefCell::new(List { next: Some(a) });
641
642         {
643             let mut a = a.borrow_mut();
644             a.next = Some(b);
645         }
646     }
647
648     #[test]
649     #[should_fail]
650     fn test_begin_unwind() {
651         use std::rt::unwind::begin_unwind;
652         begin_unwind("cause", file!(), line!())
653     }
654
655     #[test]
656     fn drop_new_task_ok() {
657         drop(Task::new());
658     }
659
660     // Task blocking tests
661
662     #[test]
663     fn block_and_wake() {
664         let task = box Task::new();
665         let mut task = BlockedTask::block(task).wake().unwrap();
666         task.drop();
667     }
668 }