]> git.lizzy.rs Git - rust.git/blob - src/libnative/task.rs
auto merge of #11360 : huonw/rust/stack_bounds, r=alexcrichton
[rust.git] / src / libnative / task.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Tasks implemented on top of OS threads
12 //!
13 //! This module contains the implementation of the 1:1 threading module required
14 //! by rust tasks. This implements the necessary API traits laid out by std::rt
15 //! in order to spawn new tasks and deschedule the current task.
16
17 use std::cast;
18 use std::rt::env;
19 use std::rt::local::Local;
20 use std::rt::rtio;
21 use std::rt::task::{Task, BlockedTask};
22 use std::rt::thread::Thread;
23 use std::rt;
24 use std::task::TaskOpts;
25 use std::unstable::mutex::Mutex;
26 use std::unstable::stack;
27
28 use io;
29 use task;
30 use bookeeping;
31
32 /// Creates a new Task which is ready to execute as a 1:1 task.
33 pub fn new(stack_bounds: (uint, uint)) -> ~Task {
34     let mut task = ~Task::new();
35     let mut ops = ops();
36     ops.stack_bounds = stack_bounds;
37     task.put_runtime(ops as ~rt::Runtime);
38     return task;
39 }
40
41 fn ops() -> ~Ops {
42     ~Ops {
43         lock: unsafe { Mutex::new() },
44         awoken: false,
45         io: io::IoFactory::new(),
46         // these *should* get overwritten
47         stack_bounds: (0, 0),
48     }
49 }
50
51 /// Spawns a function with the default configuration
52 pub fn spawn(f: proc()) {
53     spawn_opts(TaskOpts::new(), f)
54 }
55
56 /// Spawns a new task given the configuration options and a procedure to run
57 /// inside the task.
58 pub fn spawn_opts(opts: TaskOpts, f: proc()) {
59     let TaskOpts {
60         watched: _watched,
61         notify_chan, name, stack_size,
62         logger, stderr, stdout,
63     } = opts;
64
65     let mut task = ~Task::new();
66     task.name = name;
67     task.logger = logger;
68     task.stderr = stderr;
69     task.stdout = stdout;
70     match notify_chan {
71         Some(chan) => {
72             let on_exit = proc(task_result) { chan.send(task_result) };
73             task.death.on_exit = Some(on_exit);
74         }
75         None => {}
76     }
77
78     let stack = stack_size.unwrap_or(env::min_stack());
79     let task = task;
80     let ops = ops();
81
82     // Note that this increment must happen *before* the spawn in order to
83     // guarantee that if this task exits it will always end up waiting for the
84     // spawned task to exit.
85     bookeeping::increment();
86
87     // Spawning a new OS thread guarantees that __morestack will never get
88     // triggered, but we must manually set up the actual stack bounds once this
89     // function starts executing. This raises the lower limit by a bit because
90     // by the time that this function is executing we've already consumed at
91     // least a little bit of stack (we don't know the exact byte address at
92     // which our stack started).
93     Thread::spawn_stack(stack, proc() {
94         let something_around_the_top_of_the_stack = 1;
95         let addr = &something_around_the_top_of_the_stack as *int;
96         let my_stack = addr as uint;
97         unsafe {
98             stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
99         }
100         let mut ops = ops;
101         ops.stack_bounds = (my_stack - stack + 1024, my_stack);
102
103         let mut f = Some(f);
104         let mut task = task;
105         task.put_runtime(ops as ~rt::Runtime);
106         task.run(|| { f.take_unwrap()() });
107         bookeeping::decrement();
108     })
109 }
110
111 // This structure is the glue between channels and the 1:1 scheduling mode. This
112 // structure is allocated once per task.
113 struct Ops {
114     lock: Mutex,       // native synchronization
115     awoken: bool,      // used to prevent spurious wakeups
116     io: io::IoFactory, // local I/O factory
117
118     // This field holds the known bounds of the stack in (lo, hi) form. Not all
119     // native tasks necessarily know their precise bounds, hence this is
120     // optional.
121     stack_bounds: (uint, uint),
122 }
123
124 impl rt::Runtime for Ops {
125     fn yield_now(~self, mut cur_task: ~Task) {
126         // put the task back in TLS and then invoke the OS thread yield
127         cur_task.put_runtime(self as ~rt::Runtime);
128         Local::put(cur_task);
129         Thread::yield_now();
130     }
131
132     fn maybe_yield(~self, mut cur_task: ~Task) {
133         // just put the task back in TLS, on OS threads we never need to
134         // opportunistically yield b/c the OS will do that for us (preemption)
135         cur_task.put_runtime(self as ~rt::Runtime);
136         Local::put(cur_task);
137     }
138
139     fn wrap(~self) -> ~Any {
140         self as ~Any
141     }
142
143     fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
144
145     // This function gets a little interesting. There are a few safety and
146     // ownership violations going on here, but this is all done in the name of
147     // shared state. Additionally, all of the violations are protected with a
148     // mutex, so in theory there are no races.
149     //
150     // The first thing we need to do is to get a pointer to the task's internal
151     // mutex. This address will not be changing (because the task is allocated
152     // on the heap). We must have this handle separately because the task will
153     // have its ownership transferred to the given closure. We're guaranteed,
154     // however, that this memory will remain valid because *this* is the current
155     // task's execution thread.
156     //
157     // The next weird part is where ownership of the task actually goes. We
158     // relinquish it to the `f` blocking function, but upon returning this
159     // function needs to replace the task back in TLS. There is no communication
160     // from the wakeup thread back to this thread about the task pointer, and
161     // there's really no need to. In order to get around this, we cast the task
162     // to a `uint` which is then used at the end of this function to cast back
163     // to a `~Task` object. Naturally, this looks like it violates ownership
164     // semantics in that there may be two `~Task` objects.
165     //
166     // The fun part is that the wakeup half of this implementation knows to
167     // "forget" the task on the other end. This means that the awakening half of
168     // things silently relinquishes ownership back to this thread, but not in a
169     // way that the compiler can understand. The task's memory is always valid
170     // for both tasks because these operations are all done inside of a mutex.
171     //
172     // You'll also find that if blocking fails (the `f` function hands the
173     // BlockedTask back to us), we will `cast::forget` the handles. The
174     // reasoning for this is the same logic as above in that the task silently
175     // transfers ownership via the `uint`, not through normal compiler
176     // semantics.
177     //
178     // On a mildly unrelated note, it should also be pointed out that OS
179     // condition variables are susceptible to spurious wakeups, which we need to
180     // be ready for. In order to accomodate for this fact, we have an extra
181     // `awoken` field which indicates whether we were actually woken up via some
182     // invocation of `reawaken`. This flag is only ever accessed inside the
183     // lock, so there's no need to make it atomic.
184     fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
185                   f: |BlockedTask| -> Result<(), BlockedTask>) {
186         let me = &mut *self as *mut Ops;
187         cur_task.put_runtime(self as ~rt::Runtime);
188
189         unsafe {
190             let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task);
191             let task = BlockedTask::block(cur_task);
192
193             if times == 1 {
194                 (*me).lock.lock();
195                 (*me).awoken = false;
196                 match f(task) {
197                     Ok(()) => {
198                         while !(*me).awoken {
199                             (*me).lock.wait();
200                         }
201                     }
202                     Err(task) => { cast::forget(task.wake()); }
203                 }
204                 (*me).lock.unlock();
205             } else {
206                 let mut iter = task.make_selectable(times);
207                 (*me).lock.lock();
208                 (*me).awoken = false;
209                 let success = iter.all(|task| {
210                     match f(task) {
211                         Ok(()) => true,
212                         Err(task) => {
213                             cast::forget(task.wake());
214                             false
215                         }
216                     }
217                 });
218                 while success && !(*me).awoken {
219                     (*me).lock.wait();
220                 }
221                 (*me).lock.unlock();
222             }
223             // re-acquire ownership of the task
224             cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
225         }
226
227         // put the task back in TLS, and everything is as it once was.
228         Local::put(cur_task);
229     }
230
231     // See the comments on `deschedule` for why the task is forgotten here, and
232     // why it's valid to do so.
233     fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
234         unsafe {
235             let me = &mut *self as *mut Ops;
236             to_wake.put_runtime(self as ~rt::Runtime);
237             cast::forget(to_wake);
238             (*me).lock.lock();
239             (*me).awoken = true;
240             (*me).lock.signal();
241             (*me).lock.unlock();
242         }
243     }
244
245     fn spawn_sibling(~self, mut cur_task: ~Task, opts: TaskOpts, f: proc()) {
246         cur_task.put_runtime(self as ~rt::Runtime);
247         Local::put(cur_task);
248
249         task::spawn_opts(opts, f);
250     }
251
252     fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
253         Some(rtio::LocalIo::new(&mut self.io as &mut rtio::IoFactory))
254     }
255 }
256
257 impl Drop for Ops {
258     fn drop(&mut self) {
259         unsafe { self.lock.destroy() }
260     }
261 }
262
263 #[cfg(test)]
264 mod tests {
265     use std::rt::Runtime;
266     use std::rt::local::Local;
267     use std::rt::task::Task;
268     use std::task;
269     use std::task::TaskOpts;
270     use super::{spawn, spawn_opts, Ops};
271
272     #[test]
273     fn smoke() {
274         let (p, c) = Chan::new();
275         do spawn {
276             c.send(());
277         }
278         p.recv();
279     }
280
281     #[test]
282     fn smoke_fail() {
283         let (p, c) = Chan::<()>::new();
284         do spawn {
285             let _c = c;
286             fail!()
287         }
288         assert_eq!(p.recv_opt(), None);
289     }
290
291     #[test]
292     fn smoke_opts() {
293         let mut opts = TaskOpts::new();
294         opts.name = Some(SendStrStatic("test"));
295         opts.stack_size = Some(20 * 4096);
296         let (p, c) = Chan::new();
297         opts.notify_chan = Some(c);
298         spawn_opts(opts, proc() {});
299         assert!(p.recv().is_ok());
300     }
301
302     #[test]
303     fn smoke_opts_fail() {
304         let mut opts = TaskOpts::new();
305         let (p, c) = Chan::new();
306         opts.notify_chan = Some(c);
307         spawn_opts(opts, proc() { fail!() });
308         assert!(p.recv().is_err());
309     }
310
311     #[test]
312     fn yield_test() {
313         let (p, c) = Chan::new();
314         do spawn {
315             10.times(task::deschedule);
316             c.send(());
317         }
318         p.recv();
319     }
320
321     #[test]
322     fn spawn_children() {
323         let (p, c) = Chan::new();
324         do spawn {
325             let (p, c2) = Chan::new();
326             do spawn {
327                 let (p, c3) = Chan::new();
328                 do spawn {
329                     c3.send(());
330                 }
331                 p.recv();
332                 c2.send(());
333             }
334             p.recv();
335             c.send(());
336         }
337         p.recv();
338     }
339
340     #[test]
341     fn spawn_inherits() {
342         let (p, c) = Chan::new();
343         do spawn {
344             let c = c;
345             do spawn {
346                 let mut task: ~Task = Local::take();
347                 match task.maybe_take_runtime::<Ops>() {
348                     Some(ops) => {
349                         task.put_runtime(ops as ~Runtime);
350                     }
351                     None => fail!(),
352                 }
353                 Local::put(task);
354                 c.send(());
355             }
356         }
357         p.recv();
358     }
359 }