]> git.lizzy.rs Git - rust.git/blob - src/libnative/task.rs
auto merge of #13932 : MrAlert/rust/win-compat, r=brson
[rust.git] / src / libnative / task.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Tasks implemented on top of OS threads
12 //!
13 //! This module contains the implementation of the 1:1 threading module required
14 //! by rust tasks. This implements the necessary API traits laid out by std::rt
15 //! in order to spawn new tasks and deschedule the current task.
16
17 use std::any::Any;
18 use std::mem;
19 use std::rt::bookkeeping;
20 use std::rt::env;
21 use std::rt::local::Local;
22 use std::rt::rtio;
23 use std::rt::stack;
24 use std::rt::task::{Task, BlockedTask, SendMessage};
25 use std::rt::thread::Thread;
26 use std::rt;
27 use std::task::TaskOpts;
28 use std::unstable::mutex::NativeMutex;
29
30 use io;
31 use task;
32
33 /// Creates a new Task which is ready to execute as a 1:1 task.
34 pub fn new(stack_bounds: (uint, uint)) -> Box<Task> {
35     let mut task = box Task::new();
36     let mut ops = ops();
37     ops.stack_bounds = stack_bounds;
38     task.put_runtime(ops);
39     return task;
40 }
41
42 fn ops() -> Box<Ops> {
43     box Ops {
44         lock: unsafe { NativeMutex::new() },
45         awoken: false,
46         io: io::IoFactory::new(),
47         // these *should* get overwritten
48         stack_bounds: (0, 0),
49     }
50 }
51
52 /// Spawns a function with the default configuration
53 pub fn spawn(f: proc():Send) {
54     spawn_opts(TaskOpts::new(), f)
55 }
56
57 /// Spawns a new task given the configuration options and a procedure to run
58 /// inside the task.
59 pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
60     let TaskOpts {
61         notify_chan, name, stack_size,
62         stderr, stdout,
63     } = opts;
64
65     let mut task = box Task::new();
66     task.name = name;
67     task.stderr = stderr;
68     task.stdout = stdout;
69     match notify_chan {
70         Some(chan) => { task.death.on_exit = Some(SendMessage(chan)); }
71         None => {}
72     }
73
74     let stack = stack_size.unwrap_or(env::min_stack());
75     let task = task;
76     let ops = ops();
77
78     // Note that this increment must happen *before* the spawn in order to
79     // guarantee that if this task exits it will always end up waiting for the
80     // spawned task to exit.
81     bookkeeping::increment();
82
83     // Spawning a new OS thread guarantees that __morestack will never get
84     // triggered, but we must manually set up the actual stack bounds once this
85     // function starts executing. This raises the lower limit by a bit because
86     // by the time that this function is executing we've already consumed at
87     // least a little bit of stack (we don't know the exact byte address at
88     // which our stack started).
89     Thread::spawn_stack(stack, proc() {
90         let something_around_the_top_of_the_stack = 1;
91         let addr = &something_around_the_top_of_the_stack as *int;
92         let my_stack = addr as uint;
93         unsafe {
94             stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
95         }
96         let mut ops = ops;
97         ops.stack_bounds = (my_stack - stack + 1024, my_stack);
98
99         let mut f = Some(f);
100         let mut task = task;
101         task.put_runtime(ops);
102         let t = task.run(|| { f.take_unwrap()() });
103         drop(t);
104         bookkeeping::decrement();
105     })
106 }
107
108 // This structure is the glue between channels and the 1:1 scheduling mode. This
109 // structure is allocated once per task.
110 struct Ops {
111     lock: NativeMutex,       // native synchronization
112     awoken: bool,      // used to prevent spurious wakeups
113     io: io::IoFactory, // local I/O factory
114
115     // This field holds the known bounds of the stack in (lo, hi) form. Not all
116     // native tasks necessarily know their precise bounds, hence this is
117     // optional.
118     stack_bounds: (uint, uint),
119 }
120
121 impl rt::Runtime for Ops {
122     fn yield_now(~self, mut cur_task: Box<Task>) {
123         // put the task back in TLS and then invoke the OS thread yield
124         cur_task.put_runtime(self);
125         Local::put(cur_task);
126         Thread::yield_now();
127     }
128
129     fn maybe_yield(~self, mut cur_task: Box<Task>) {
130         // just put the task back in TLS, on OS threads we never need to
131         // opportunistically yield b/c the OS will do that for us (preemption)
132         cur_task.put_runtime(self);
133         Local::put(cur_task);
134     }
135
136     fn wrap(~self) -> Box<Any> {
137         self as Box<Any>
138     }
139
140     fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
141
142     fn can_block(&self) -> bool { true }
143
144     // This function gets a little interesting. There are a few safety and
145     // ownership violations going on here, but this is all done in the name of
146     // shared state. Additionally, all of the violations are protected with a
147     // mutex, so in theory there are no races.
148     //
149     // The first thing we need to do is to get a pointer to the task's internal
150     // mutex. This address will not be changing (because the task is allocated
151     // on the heap). We must have this handle separately because the task will
152     // have its ownership transferred to the given closure. We're guaranteed,
153     // however, that this memory will remain valid because *this* is the current
154     // task's execution thread.
155     //
156     // The next weird part is where ownership of the task actually goes. We
157     // relinquish it to the `f` blocking function, but upon returning this
158     // function needs to replace the task back in TLS. There is no communication
159     // from the wakeup thread back to this thread about the task pointer, and
160     // there's really no need to. In order to get around this, we cast the task
161     // to a `uint` which is then used at the end of this function to cast back
162     // to a `Box<Task>` object. Naturally, this looks like it violates
163     // ownership semantics in that there may be two `Box<Task>` objects.
164     //
165     // The fun part is that the wakeup half of this implementation knows to
166     // "forget" the task on the other end. This means that the awakening half of
167     // things silently relinquishes ownership back to this thread, but not in a
168     // way that the compiler can understand. The task's memory is always valid
169     // for both tasks because these operations are all done inside of a mutex.
170     //
171     // You'll also find that if blocking fails (the `f` function hands the
172     // BlockedTask back to us), we will `mem::forget` the handles. The
173     // reasoning for this is the same logic as above in that the task silently
174     // transfers ownership via the `uint`, not through normal compiler
175     // semantics.
176     //
177     // On a mildly unrelated note, it should also be pointed out that OS
178     // condition variables are susceptible to spurious wakeups, which we need to
179     // be ready for. In order to accomodate for this fact, we have an extra
180     // `awoken` field which indicates whether we were actually woken up via some
181     // invocation of `reawaken`. This flag is only ever accessed inside the
182     // lock, so there's no need to make it atomic.
183     fn deschedule(mut ~self, times: uint, mut cur_task: Box<Task>,
184                   f: |BlockedTask| -> Result<(), BlockedTask>) {
185         let me = &mut *self as *mut Ops;
186         cur_task.put_runtime(self);
187
188         unsafe {
189             let cur_task_dupe = &*cur_task as *Task;
190             let task = BlockedTask::block(cur_task);
191
192             if times == 1 {
193                 let guard = (*me).lock.lock();
194                 (*me).awoken = false;
195                 match f(task) {
196                     Ok(()) => {
197                         while !(*me).awoken {
198                             guard.wait();
199                         }
200                     }
201                     Err(task) => { mem::forget(task.wake()); }
202                 }
203             } else {
204                 let iter = task.make_selectable(times);
205                 let guard = (*me).lock.lock();
206                 (*me).awoken = false;
207
208                 // Apply the given closure to all of the "selectable tasks",
209                 // bailing on the first one that produces an error. Note that
210                 // care must be taken such that when an error is occurred, we
211                 // may not own the task, so we may still have to wait for the
212                 // task to become available. In other words, if task.wake()
213                 // returns `None`, then someone else has ownership and we must
214                 // wait for their signal.
215                 match iter.map(f).filter_map(|a| a.err()).next() {
216                     None => {}
217                     Some(task) => {
218                         match task.wake() {
219                             Some(task) => {
220                                 mem::forget(task);
221                                 (*me).awoken = true;
222                             }
223                             None => {}
224                         }
225                     }
226                 }
227                 while !(*me).awoken {
228                     guard.wait();
229                 }
230             }
231             // re-acquire ownership of the task
232             cur_task = mem::transmute(cur_task_dupe);
233         }
234
235         // put the task back in TLS, and everything is as it once was.
236         Local::put(cur_task);
237     }
238
239     // See the comments on `deschedule` for why the task is forgotten here, and
240     // why it's valid to do so.
241     fn reawaken(mut ~self, mut to_wake: Box<Task>) {
242         unsafe {
243             let me = &mut *self as *mut Ops;
244             to_wake.put_runtime(self);
245             mem::forget(to_wake);
246             let guard = (*me).lock.lock();
247             (*me).awoken = true;
248             guard.signal();
249         }
250     }
251
252     fn spawn_sibling(~self,
253                      mut cur_task: Box<Task>,
254                      opts: TaskOpts,
255                      f: proc():Send) {
256         cur_task.put_runtime(self);
257         Local::put(cur_task);
258
259         task::spawn_opts(opts, f);
260     }
261
262     fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
263         Some(rtio::LocalIo::new(&mut self.io as &mut rtio::IoFactory))
264     }
265 }
266
267 #[cfg(test)]
268 mod tests {
269     use std::rt::local::Local;
270     use std::rt::task::Task;
271     use std::task;
272     use std::task::TaskOpts;
273     use super::{spawn, spawn_opts, Ops};
274
275     #[test]
276     fn smoke() {
277         let (tx, rx) = channel();
278         spawn(proc() {
279             tx.send(());
280         });
281         rx.recv();
282     }
283
284     #[test]
285     fn smoke_fail() {
286         let (tx, rx) = channel::<()>();
287         spawn(proc() {
288             let _tx = tx;
289             fail!()
290         });
291         assert_eq!(rx.recv_opt(), Err(()));
292     }
293
294     #[test]
295     fn smoke_opts() {
296         let mut opts = TaskOpts::new();
297         opts.name = Some("test".into_maybe_owned());
298         opts.stack_size = Some(20 * 4096);
299         let (tx, rx) = channel();
300         opts.notify_chan = Some(tx);
301         spawn_opts(opts, proc() {});
302         assert!(rx.recv().is_ok());
303     }
304
305     #[test]
306     fn smoke_opts_fail() {
307         let mut opts = TaskOpts::new();
308         let (tx, rx) = channel();
309         opts.notify_chan = Some(tx);
310         spawn_opts(opts, proc() { fail!() });
311         assert!(rx.recv().is_err());
312     }
313
314     #[test]
315     fn yield_test() {
316         let (tx, rx) = channel();
317         spawn(proc() {
318             for _ in range(0, 10) { task::deschedule(); }
319             tx.send(());
320         });
321         rx.recv();
322     }
323
324     #[test]
325     fn spawn_children() {
326         let (tx1, rx) = channel();
327         spawn(proc() {
328             let (tx2, rx) = channel();
329             spawn(proc() {
330                 let (tx3, rx) = channel();
331                 spawn(proc() {
332                     tx3.send(());
333                 });
334                 rx.recv();
335                 tx2.send(());
336             });
337             rx.recv();
338             tx1.send(());
339         });
340         rx.recv();
341     }
342
343     #[test]
344     fn spawn_inherits() {
345         let (tx, rx) = channel();
346         spawn(proc() {
347             spawn(proc() {
348                 let mut task: Box<Task> = Local::take();
349                 match task.maybe_take_runtime::<Ops>() {
350                     Some(ops) => {
351                         task.put_runtime(ops);
352                     }
353                     None => fail!(),
354                 }
355                 Local::put(task);
356                 tx.send(());
357             });
358         });
359         rx.recv();
360     }
361 }