]> git.lizzy.rs Git - rust.git/blob - src/libnative/task.rs
auto merge of #14728 : TeXitoi/rust/relicense-shootout-regex-dna, r=brson
[rust.git] / src / libnative / task.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Tasks implemented on top of OS threads
12 //!
13 //! This module contains the implementation of the 1:1 threading module required
14 //! by rust tasks. This implements the necessary API traits laid out by std::rt
15 //! in order to spawn new tasks and deschedule the current task.
16
17 use std::any::Any;
18 use std::mem;
19 use std::rt::bookkeeping;
20 use std::rt::local::Local;
21 use std::rt::mutex::NativeMutex;
22 use std::rt::rtio;
23 use std::rt::stack;
24 use std::rt::task::{Task, BlockedTask, TaskOpts};
25 use std::rt::thread::Thread;
26 use std::rt;
27
28 use io;
29 use task;
30
31 /// Creates a new Task which is ready to execute as a 1:1 task.
32 pub fn new(stack_bounds: (uint, uint)) -> Box<Task> {
33     let mut task = box Task::new();
34     let mut ops = ops();
35     ops.stack_bounds = stack_bounds;
36     task.put_runtime(ops);
37     return task;
38 }
39
40 fn ops() -> Box<Ops> {
41     box Ops {
42         lock: unsafe { NativeMutex::new() },
43         awoken: false,
44         io: io::IoFactory::new(),
45         // these *should* get overwritten
46         stack_bounds: (0, 0),
47     }
48 }
49
50 /// Spawns a function with the default configuration
51 pub fn spawn(f: proc():Send) {
52     spawn_opts(TaskOpts { name: None, stack_size: None, on_exit: None }, f)
53 }
54
55 /// Spawns a new task given the configuration options and a procedure to run
56 /// inside the task.
57 pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
58     let TaskOpts { name, stack_size, on_exit } = opts;
59
60     let mut task = box Task::new();
61     task.name = name;
62     task.death.on_exit = on_exit;
63
64     let stack = stack_size.unwrap_or(rt::min_stack());
65     let task = task;
66     let ops = ops();
67
68     // Note that this increment must happen *before* the spawn in order to
69     // guarantee that if this task exits it will always end up waiting for the
70     // spawned task to exit.
71     bookkeeping::increment();
72
73     // Spawning a new OS thread guarantees that __morestack will never get
74     // triggered, but we must manually set up the actual stack bounds once this
75     // function starts executing. This raises the lower limit by a bit because
76     // by the time that this function is executing we've already consumed at
77     // least a little bit of stack (we don't know the exact byte address at
78     // which our stack started).
79     Thread::spawn_stack(stack, proc() {
80         let something_around_the_top_of_the_stack = 1;
81         let addr = &something_around_the_top_of_the_stack as *int;
82         let my_stack = addr as uint;
83         unsafe {
84             stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
85         }
86         let mut ops = ops;
87         ops.stack_bounds = (my_stack - stack + 1024, my_stack);
88
89         let mut f = Some(f);
90         let mut task = task;
91         task.put_runtime(ops);
92         let t = task.run(|| { f.take_unwrap()() });
93         drop(t);
94         bookkeeping::decrement();
95     })
96 }
97
98 // This structure is the glue between channels and the 1:1 scheduling mode. This
99 // structure is allocated once per task.
100 struct Ops {
101     lock: NativeMutex,       // native synchronization
102     awoken: bool,      // used to prevent spurious wakeups
103     io: io::IoFactory, // local I/O factory
104
105     // This field holds the known bounds of the stack in (lo, hi) form. Not all
106     // native tasks necessarily know their precise bounds, hence this is
107     // optional.
108     stack_bounds: (uint, uint),
109 }
110
111 impl rt::Runtime for Ops {
112     fn yield_now(~self, mut cur_task: Box<Task>) {
113         // put the task back in TLS and then invoke the OS thread yield
114         cur_task.put_runtime(self);
115         Local::put(cur_task);
116         Thread::yield_now();
117     }
118
119     fn maybe_yield(~self, mut cur_task: Box<Task>) {
120         // just put the task back in TLS, on OS threads we never need to
121         // opportunistically yield b/c the OS will do that for us (preemption)
122         cur_task.put_runtime(self);
123         Local::put(cur_task);
124     }
125
126     fn wrap(~self) -> Box<Any> {
127         self as Box<Any>
128     }
129
130     fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
131
132     fn can_block(&self) -> bool { true }
133
134     // This function gets a little interesting. There are a few safety and
135     // ownership violations going on here, but this is all done in the name of
136     // shared state. Additionally, all of the violations are protected with a
137     // mutex, so in theory there are no races.
138     //
139     // The first thing we need to do is to get a pointer to the task's internal
140     // mutex. This address will not be changing (because the task is allocated
141     // on the heap). We must have this handle separately because the task will
142     // have its ownership transferred to the given closure. We're guaranteed,
143     // however, that this memory will remain valid because *this* is the current
144     // task's execution thread.
145     //
146     // The next weird part is where ownership of the task actually goes. We
147     // relinquish it to the `f` blocking function, but upon returning this
148     // function needs to replace the task back in TLS. There is no communication
149     // from the wakeup thread back to this thread about the task pointer, and
150     // there's really no need to. In order to get around this, we cast the task
151     // to a `uint` which is then used at the end of this function to cast back
152     // to a `Box<Task>` object. Naturally, this looks like it violates
153     // ownership semantics in that there may be two `Box<Task>` objects.
154     //
155     // The fun part is that the wakeup half of this implementation knows to
156     // "forget" the task on the other end. This means that the awakening half of
157     // things silently relinquishes ownership back to this thread, but not in a
158     // way that the compiler can understand. The task's memory is always valid
159     // for both tasks because these operations are all done inside of a mutex.
160     //
161     // You'll also find that if blocking fails (the `f` function hands the
162     // BlockedTask back to us), we will `mem::forget` the handles. The
163     // reasoning for this is the same logic as above in that the task silently
164     // transfers ownership via the `uint`, not through normal compiler
165     // semantics.
166     //
167     // On a mildly unrelated note, it should also be pointed out that OS
168     // condition variables are susceptible to spurious wakeups, which we need to
169     // be ready for. In order to accomodate for this fact, we have an extra
170     // `awoken` field which indicates whether we were actually woken up via some
171     // invocation of `reawaken`. This flag is only ever accessed inside the
172     // lock, so there's no need to make it atomic.
173     fn deschedule(mut ~self, times: uint, mut cur_task: Box<Task>,
174                   f: |BlockedTask| -> Result<(), BlockedTask>) {
175         let me = &mut *self as *mut Ops;
176         cur_task.put_runtime(self);
177
178         unsafe {
179             let cur_task_dupe = &*cur_task as *Task;
180             let task = BlockedTask::block(cur_task);
181
182             if times == 1 {
183                 let guard = (*me).lock.lock();
184                 (*me).awoken = false;
185                 match f(task) {
186                     Ok(()) => {
187                         while !(*me).awoken {
188                             guard.wait();
189                         }
190                     }
191                     Err(task) => { mem::forget(task.wake()); }
192                 }
193             } else {
194                 let iter = task.make_selectable(times);
195                 let guard = (*me).lock.lock();
196                 (*me).awoken = false;
197
198                 // Apply the given closure to all of the "selectable tasks",
199                 // bailing on the first one that produces an error. Note that
200                 // care must be taken such that when an error is occurred, we
201                 // may not own the task, so we may still have to wait for the
202                 // task to become available. In other words, if task.wake()
203                 // returns `None`, then someone else has ownership and we must
204                 // wait for their signal.
205                 match iter.map(f).filter_map(|a| a.err()).next() {
206                     None => {}
207                     Some(task) => {
208                         match task.wake() {
209                             Some(task) => {
210                                 mem::forget(task);
211                                 (*me).awoken = true;
212                             }
213                             None => {}
214                         }
215                     }
216                 }
217                 while !(*me).awoken {
218                     guard.wait();
219                 }
220             }
221             // re-acquire ownership of the task
222             cur_task = mem::transmute(cur_task_dupe);
223         }
224
225         // put the task back in TLS, and everything is as it once was.
226         Local::put(cur_task);
227     }
228
229     // See the comments on `deschedule` for why the task is forgotten here, and
230     // why it's valid to do so.
231     fn reawaken(mut ~self, mut to_wake: Box<Task>) {
232         unsafe {
233             let me = &mut *self as *mut Ops;
234             to_wake.put_runtime(self);
235             mem::forget(to_wake);
236             let guard = (*me).lock.lock();
237             (*me).awoken = true;
238             guard.signal();
239         }
240     }
241
242     fn spawn_sibling(~self,
243                      mut cur_task: Box<Task>,
244                      opts: TaskOpts,
245                      f: proc():Send) {
246         cur_task.put_runtime(self);
247         Local::put(cur_task);
248
249         task::spawn_opts(opts, f);
250     }
251
252     fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
253         Some(rtio::LocalIo::new(&mut self.io as &mut rtio::IoFactory))
254     }
255 }
256
257 #[cfg(test)]
258 mod tests {
259     use std::rt::local::Local;
260     use std::rt::task::{Task, TaskOpts};
261     use std::task;
262     use super::{spawn, spawn_opts, Ops};
263
264     #[test]
265     fn smoke() {
266         let (tx, rx) = channel();
267         spawn(proc() {
268             tx.send(());
269         });
270         rx.recv();
271     }
272
273     #[test]
274     fn smoke_fail() {
275         let (tx, rx) = channel::<()>();
276         spawn(proc() {
277             let _tx = tx;
278             fail!()
279         });
280         assert_eq!(rx.recv_opt(), Err(()));
281     }
282
283     #[test]
284     fn smoke_opts() {
285         let mut opts = TaskOpts::new();
286         opts.name = Some("test".into_maybe_owned());
287         opts.stack_size = Some(20 * 4096);
288         let (tx, rx) = channel();
289         opts.on_exit = Some(proc(r) tx.send(r));
290         spawn_opts(opts, proc() {});
291         assert!(rx.recv().is_ok());
292     }
293
294     #[test]
295     fn smoke_opts_fail() {
296         let mut opts = TaskOpts::new();
297         let (tx, rx) = channel();
298         opts.on_exit = Some(proc(r) tx.send(r));
299         spawn_opts(opts, proc() { fail!() });
300         assert!(rx.recv().is_err());
301     }
302
303     #[test]
304     fn yield_test() {
305         let (tx, rx) = channel();
306         spawn(proc() {
307             for _ in range(0, 10) { task::deschedule(); }
308             tx.send(());
309         });
310         rx.recv();
311     }
312
313     #[test]
314     fn spawn_children() {
315         let (tx1, rx) = channel();
316         spawn(proc() {
317             let (tx2, rx) = channel();
318             spawn(proc() {
319                 let (tx3, rx) = channel();
320                 spawn(proc() {
321                     tx3.send(());
322                 });
323                 rx.recv();
324                 tx2.send(());
325             });
326             rx.recv();
327             tx1.send(());
328         });
329         rx.recv();
330     }
331
332     #[test]
333     fn spawn_inherits() {
334         let (tx, rx) = channel();
335         spawn(proc() {
336             spawn(proc() {
337                 let mut task: Box<Task> = Local::take();
338                 match task.maybe_take_runtime::<Ops>() {
339                     Some(ops) => {
340                         task.put_runtime(ops);
341                     }
342                     None => fail!(),
343                 }
344                 Local::put(task);
345                 tx.send(());
346             });
347         });
348         rx.recv();
349     }
350 }