]> git.lizzy.rs Git - rust.git/blob - src/libnative/task.rs
auto merge of #15014 : brson/rust/all-crates-experimental, r=cmr
[rust.git] / src / libnative / task.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Tasks implemented on top of OS threads
12 //!
13 //! This module contains the implementation of the 1:1 threading module required
14 //! by rust tasks. This implements the necessary API traits laid out by std::rt
15 //! in order to spawn new tasks and deschedule the current task.
16
17 use std::any::Any;
18 use std::mem;
19 use std::rt::bookkeeping;
20 use std::rt::local::Local;
21 use std::rt::mutex::NativeMutex;
22 use std::rt::rtio;
23 use std::rt::stack;
24 use std::rt::task::{Task, BlockedTask, TaskOpts};
25 use std::rt::thread::Thread;
26 use std::rt;
27
28 use io;
29 use task;
30 use std::task::{TaskBuilder, Spawner};
31
32 /// Creates a new Task which is ready to execute as a 1:1 task.
33 pub fn new(stack_bounds: (uint, uint)) -> Box<Task> {
34     let mut task = box Task::new();
35     let mut ops = ops();
36     ops.stack_bounds = stack_bounds;
37     task.put_runtime(ops);
38     return task;
39 }
40
41 fn ops() -> Box<Ops> {
42     box Ops {
43         lock: unsafe { NativeMutex::new() },
44         awoken: false,
45         io: io::IoFactory::new(),
46         // these *should* get overwritten
47         stack_bounds: (0, 0),
48     }
49 }
50
51 /// Spawns a function with the default configuration
52 #[deprecated = "use the native method of NativeTaskBuilder instead"]
53 pub fn spawn(f: proc():Send) {
54     spawn_opts(TaskOpts { name: None, stack_size: None, on_exit: None }, f)
55 }
56
57 /// Spawns a new task given the configuration options and a procedure to run
58 /// inside the task.
59 #[deprecated = "use the native method of NativeTaskBuilder instead"]
60 pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
61     let TaskOpts { name, stack_size, on_exit } = opts;
62
63     let mut task = box Task::new();
64     task.name = name;
65     task.death.on_exit = on_exit;
66
67     let stack = stack_size.unwrap_or(rt::min_stack());
68     let task = task;
69     let ops = ops();
70
71     // Note that this increment must happen *before* the spawn in order to
72     // guarantee that if this task exits it will always end up waiting for the
73     // spawned task to exit.
74     bookkeeping::increment();
75
76     // Spawning a new OS thread guarantees that __morestack will never get
77     // triggered, but we must manually set up the actual stack bounds once this
78     // function starts executing. This raises the lower limit by a bit because
79     // by the time that this function is executing we've already consumed at
80     // least a little bit of stack (we don't know the exact byte address at
81     // which our stack started).
82     Thread::spawn_stack(stack, proc() {
83         let something_around_the_top_of_the_stack = 1;
84         let addr = &something_around_the_top_of_the_stack as *int;
85         let my_stack = addr as uint;
86         unsafe {
87             stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
88         }
89         let mut ops = ops;
90         ops.stack_bounds = (my_stack - stack + 1024, my_stack);
91
92         let mut f = Some(f);
93         let mut task = task;
94         task.put_runtime(ops);
95         let t = task.run(|| { f.take_unwrap()() });
96         drop(t);
97         bookkeeping::decrement();
98     })
99 }
100
101 /// A spawner for native tasks
102 pub struct NativeSpawner;
103
104 impl Spawner for NativeSpawner {
105     fn spawn(self, opts: TaskOpts, f: proc():Send) {
106         spawn_opts(opts, f)
107     }
108 }
109
110 /// An extension trait adding a `native` configuration method to `TaskBuilder`.
111 pub trait NativeTaskBuilder {
112     fn native(self) -> TaskBuilder<NativeSpawner>;
113 }
114
115 impl<S: Spawner> NativeTaskBuilder for TaskBuilder<S> {
116     fn native(self) -> TaskBuilder<NativeSpawner> {
117         self.spawner(NativeSpawner)
118     }
119 }
120
121 // This structure is the glue between channels and the 1:1 scheduling mode. This
122 // structure is allocated once per task.
123 struct Ops {
124     lock: NativeMutex,       // native synchronization
125     awoken: bool,      // used to prevent spurious wakeups
126     io: io::IoFactory, // local I/O factory
127
128     // This field holds the known bounds of the stack in (lo, hi) form. Not all
129     // native tasks necessarily know their precise bounds, hence this is
130     // optional.
131     stack_bounds: (uint, uint),
132 }
133
134 impl rt::Runtime for Ops {
135     fn yield_now(~self, mut cur_task: Box<Task>) {
136         // put the task back in TLS and then invoke the OS thread yield
137         cur_task.put_runtime(self);
138         Local::put(cur_task);
139         Thread::yield_now();
140     }
141
142     fn maybe_yield(~self, mut cur_task: Box<Task>) {
143         // just put the task back in TLS, on OS threads we never need to
144         // opportunistically yield b/c the OS will do that for us (preemption)
145         cur_task.put_runtime(self);
146         Local::put(cur_task);
147     }
148
149     fn wrap(~self) -> Box<Any> {
150         self as Box<Any>
151     }
152
153     fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
154
155     fn can_block(&self) -> bool { true }
156
157     // This function gets a little interesting. There are a few safety and
158     // ownership violations going on here, but this is all done in the name of
159     // shared state. Additionally, all of the violations are protected with a
160     // mutex, so in theory there are no races.
161     //
162     // The first thing we need to do is to get a pointer to the task's internal
163     // mutex. This address will not be changing (because the task is allocated
164     // on the heap). We must have this handle separately because the task will
165     // have its ownership transferred to the given closure. We're guaranteed,
166     // however, that this memory will remain valid because *this* is the current
167     // task's execution thread.
168     //
169     // The next weird part is where ownership of the task actually goes. We
170     // relinquish it to the `f` blocking function, but upon returning this
171     // function needs to replace the task back in TLS. There is no communication
172     // from the wakeup thread back to this thread about the task pointer, and
173     // there's really no need to. In order to get around this, we cast the task
174     // to a `uint` which is then used at the end of this function to cast back
175     // to a `Box<Task>` object. Naturally, this looks like it violates
176     // ownership semantics in that there may be two `Box<Task>` objects.
177     //
178     // The fun part is that the wakeup half of this implementation knows to
179     // "forget" the task on the other end. This means that the awakening half of
180     // things silently relinquishes ownership back to this thread, but not in a
181     // way that the compiler can understand. The task's memory is always valid
182     // for both tasks because these operations are all done inside of a mutex.
183     //
184     // You'll also find that if blocking fails (the `f` function hands the
185     // BlockedTask back to us), we will `mem::forget` the handles. The
186     // reasoning for this is the same logic as above in that the task silently
187     // transfers ownership via the `uint`, not through normal compiler
188     // semantics.
189     //
190     // On a mildly unrelated note, it should also be pointed out that OS
191     // condition variables are susceptible to spurious wakeups, which we need to
192     // be ready for. In order to accommodate for this fact, we have an extra
193     // `awoken` field which indicates whether we were actually woken up via some
194     // invocation of `reawaken`. This flag is only ever accessed inside the
195     // lock, so there's no need to make it atomic.
196     fn deschedule(mut ~self, times: uint, mut cur_task: Box<Task>,
197                   f: |BlockedTask| -> Result<(), BlockedTask>) {
198         let me = &mut *self as *mut Ops;
199         cur_task.put_runtime(self);
200
201         unsafe {
202             let cur_task_dupe = &*cur_task as *Task;
203             let task = BlockedTask::block(cur_task);
204
205             if times == 1 {
206                 let guard = (*me).lock.lock();
207                 (*me).awoken = false;
208                 match f(task) {
209                     Ok(()) => {
210                         while !(*me).awoken {
211                             guard.wait();
212                         }
213                     }
214                     Err(task) => { mem::forget(task.wake()); }
215                 }
216             } else {
217                 let iter = task.make_selectable(times);
218                 let guard = (*me).lock.lock();
219                 (*me).awoken = false;
220
221                 // Apply the given closure to all of the "selectable tasks",
222                 // bailing on the first one that produces an error. Note that
223                 // care must be taken such that when an error is occurred, we
224                 // may not own the task, so we may still have to wait for the
225                 // task to become available. In other words, if task.wake()
226                 // returns `None`, then someone else has ownership and we must
227                 // wait for their signal.
228                 match iter.map(f).filter_map(|a| a.err()).next() {
229                     None => {}
230                     Some(task) => {
231                         match task.wake() {
232                             Some(task) => {
233                                 mem::forget(task);
234                                 (*me).awoken = true;
235                             }
236                             None => {}
237                         }
238                     }
239                 }
240                 while !(*me).awoken {
241                     guard.wait();
242                 }
243             }
244             // re-acquire ownership of the task
245             cur_task = mem::transmute(cur_task_dupe);
246         }
247
248         // put the task back in TLS, and everything is as it once was.
249         Local::put(cur_task);
250     }
251
252     // See the comments on `deschedule` for why the task is forgotten here, and
253     // why it's valid to do so.
254     fn reawaken(mut ~self, mut to_wake: Box<Task>) {
255         unsafe {
256             let me = &mut *self as *mut Ops;
257             to_wake.put_runtime(self);
258             mem::forget(to_wake);
259             let guard = (*me).lock.lock();
260             (*me).awoken = true;
261             guard.signal();
262         }
263     }
264
265     fn spawn_sibling(~self,
266                      mut cur_task: Box<Task>,
267                      opts: TaskOpts,
268                      f: proc():Send) {
269         cur_task.put_runtime(self);
270         Local::put(cur_task);
271
272         task::spawn_opts(opts, f);
273     }
274
275     fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
276         Some(rtio::LocalIo::new(&mut self.io as &mut rtio::IoFactory))
277     }
278 }
279
280 #[cfg(test)]
281 mod tests {
282     use std::rt::local::Local;
283     use std::rt::task::{Task, TaskOpts};
284     use std::task;
285     use std::task::TaskBuilder;
286     use super::{spawn, spawn_opts, Ops, NativeTaskBuilder};
287
288     #[test]
289     fn smoke() {
290         let (tx, rx) = channel();
291         spawn(proc() {
292             tx.send(());
293         });
294         rx.recv();
295     }
296
297     #[test]
298     fn smoke_fail() {
299         let (tx, rx) = channel::<()>();
300         spawn(proc() {
301             let _tx = tx;
302             fail!()
303         });
304         assert_eq!(rx.recv_opt(), Err(()));
305     }
306
307     #[test]
308     fn smoke_opts() {
309         let mut opts = TaskOpts::new();
310         opts.name = Some("test".into_maybe_owned());
311         opts.stack_size = Some(20 * 4096);
312         let (tx, rx) = channel();
313         opts.on_exit = Some(proc(r) tx.send(r));
314         spawn_opts(opts, proc() {});
315         assert!(rx.recv().is_ok());
316     }
317
318     #[test]
319     fn smoke_opts_fail() {
320         let mut opts = TaskOpts::new();
321         let (tx, rx) = channel();
322         opts.on_exit = Some(proc(r) tx.send(r));
323         spawn_opts(opts, proc() { fail!() });
324         assert!(rx.recv().is_err());
325     }
326
327     #[test]
328     fn yield_test() {
329         let (tx, rx) = channel();
330         spawn(proc() {
331             for _ in range(0, 10) { task::deschedule(); }
332             tx.send(());
333         });
334         rx.recv();
335     }
336
337     #[test]
338     fn spawn_children() {
339         let (tx1, rx) = channel();
340         spawn(proc() {
341             let (tx2, rx) = channel();
342             spawn(proc() {
343                 let (tx3, rx) = channel();
344                 spawn(proc() {
345                     tx3.send(());
346                 });
347                 rx.recv();
348                 tx2.send(());
349             });
350             rx.recv();
351             tx1.send(());
352         });
353         rx.recv();
354     }
355
356     #[test]
357     fn spawn_inherits() {
358         let (tx, rx) = channel();
359         spawn(proc() {
360             spawn(proc() {
361                 let mut task: Box<Task> = Local::take();
362                 match task.maybe_take_runtime::<Ops>() {
363                     Some(ops) => {
364                         task.put_runtime(ops);
365                     }
366                     None => fail!(),
367                 }
368                 Local::put(task);
369                 tx.send(());
370             });
371         });
372         rx.recv();
373     }
374
375     #[test]
376     fn test_native_builder() {
377         let res = TaskBuilder::new().native().try(proc() {
378             "Success!".to_string()
379         });
380         assert_eq!(res.ok().unwrap(), "Success!".to_string());
381     }
382 }