1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Tasks implemented on top of OS threads
13 //! This module contains the implementation of the 1:1 threading model required
14 //! by rust tasks. This implements the necessary API traits laid out by std::rt
15 //! in order to spawn new tasks and deschedule the current task.
19 use std::rt::local::Local;
21 use std::rt::task::{Task, BlockedTask};
22 use std::rt::thread::Thread;
24 use std::task::TaskOpts;
25 use std::unstable::mutex::Mutex;
26 use std::unstable::stack;
32 /// Creates a new Task which is ready to execute as a 1:1 task.
33 pub fn new(stack_bounds: (uint, uint)) -> ~Task {
34 let mut task = ~Task::new();
36 ops.stack_bounds = stack_bounds;
37 task.put_runtime(ops as ~rt::Runtime);
43 lock: unsafe { Mutex::new() },
45 io: io::IoFactory::new(),
46 // these *should* get overwritten
51 /// Spawns a function with the default configuration
52 pub fn spawn(f: proc()) {
53 spawn_opts(TaskOpts::new(), f)
56 /// Spawns a new task given the configuration options and a procedure to run
58 pub fn spawn_opts(opts: TaskOpts, f: proc()) {
61 notify_chan, name, stack_size,
62 logger, stderr, stdout,
65 let mut task = ~Task::new();
72 let on_exit = proc(task_result) { chan.send(task_result) };
73 task.death.on_exit = Some(on_exit);
78 let stack = stack_size.unwrap_or(env::min_stack());
82 // Note that this increment must happen *before* the spawn in order to
83 // guarantee that if this task exits it will always end up waiting for the
84 // spawned task to exit.
85 bookeeping::increment();
87 // Spawning a new OS thread guarantees that __morestack will never get
88 // triggered, but we must manually set up the actual stack bounds once this
89 // function starts executing. This raises the lower limit by a bit because
90 // by the time that this function is executing we've already consumed at
91 // least a little bit of stack (we don't know the exact byte address at
92 // which our stack started).
93 Thread::spawn_stack(stack, proc() {
94 let something_around_the_top_of_the_stack = 1;
95 let addr = &something_around_the_top_of_the_stack as *int;
96 let my_stack = addr as uint;
98 stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
101 ops.stack_bounds = (my_stack - stack + 1024, my_stack);
105 task.put_runtime(ops as ~rt::Runtime);
106 task.run(|| { f.take_unwrap()() });
107 bookeeping::decrement();
111 // This structure is the glue between channels and the 1:1 scheduling mode. This
112 // structure is allocated once per task.
114 lock: Mutex, // native synchronization
115 awoken: bool, // used to prevent spurious wakeups
116 io: io::IoFactory, // local I/O factory
118 // This field holds the known bounds of the stack in (lo, hi) form. Not all
119 // native tasks necessarily know their precise bounds, hence this is
121 stack_bounds: (uint, uint),
124 impl rt::Runtime for Ops {
125 fn yield_now(~self, mut cur_task: ~Task) {
126 // put the task back in TLS and then invoke the OS thread yield
127 cur_task.put_runtime(self as ~rt::Runtime);
128 Local::put(cur_task);
132 fn maybe_yield(~self, mut cur_task: ~Task) {
133 // just put the task back in TLS, on OS threads we never need to
134 // opportunistically yield b/c the OS will do that for us (preemption)
135 cur_task.put_runtime(self as ~rt::Runtime);
136 Local::put(cur_task);
139 fn wrap(~self) -> ~Any {
143 fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
145 // This function gets a little interesting. There are a few safety and
146 // ownership violations going on here, but this is all done in the name of
147 // shared state. Additionally, all of the violations are protected with a
148 // mutex, so in theory there are no races.
150 // The first thing we need to do is to get a pointer to the task's internal
151 // mutex. This address will not be changing (because the task is allocated
152 // on the heap). We must have this handle separately because the task will
153 // have its ownership transferred to the given closure. We're guaranteed,
154 // however, that this memory will remain valid because *this* is the current
155 // task's execution thread.
157 // The next weird part is where ownership of the task actually goes. We
158 // relinquish it to the `f` blocking function, but upon returning this
159 // function needs to replace the task back in TLS. There is no communication
160 // from the wakeup thread back to this thread about the task pointer, and
161 // there's really no need to. In order to get around this, we cast the task
162 // to a `uint` which is then used at the end of this function to cast back
163 // to a `~Task` object. Naturally, this looks like it violates ownership
164 // semantics in that there may be two `~Task` objects.
166 // The fun part is that the wakeup half of this implementation knows to
167 // "forget" the task on the other end. This means that the awakening half of
168 // things silently relinquishes ownership back to this thread, but not in a
169 // way that the compiler can understand. The task's memory is always valid
170 // for both tasks because these operations are all done inside of a mutex.
172 // You'll also find that if blocking fails (the `f` function hands the
173 // BlockedTask back to us), we will `cast::forget` the handles. The
174 // reasoning for this is the same logic as above in that the task silently
175 // transfers ownership via the `uint`, not through normal compiler
178 // On a mildly unrelated note, it should also be pointed out that OS
179 // condition variables are susceptible to spurious wakeups, which we need to
180 // be ready for. In order to accommodate this fact, we have an extra
181 // `awoken` field which indicates whether we were actually woken up via some
182 // invocation of `reawaken`. This flag is only ever accessed inside the
183 // lock, so there's no need to make it atomic.
184 fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
185 f: |BlockedTask| -> Result<(), BlockedTask>) {
186 let me = &mut *self as *mut Ops;
187 cur_task.put_runtime(self as ~rt::Runtime);
190 let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task);
191 let task = BlockedTask::block(cur_task);
195 (*me).awoken = false;
198 while !(*me).awoken {
202 Err(task) => { cast::forget(task.wake()); }
206 let mut iter = task.make_selectable(times);
208 (*me).awoken = false;
209 let success = iter.all(|task| {
213 cast::forget(task.wake());
218 while success && !(*me).awoken {
223 // re-acquire ownership of the task
224 cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
227 // put the task back in TLS, and everything is as it once was.
228 Local::put(cur_task);
231 // See the comments on `deschedule` for why the task is forgotten here, and
232 // why it's valid to do so.
233 fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
235 let me = &mut *self as *mut Ops;
236 to_wake.put_runtime(self as ~rt::Runtime);
237 cast::forget(to_wake);
245 fn spawn_sibling(~self, mut cur_task: ~Task, opts: TaskOpts, f: proc()) {
246 cur_task.put_runtime(self as ~rt::Runtime);
247 Local::put(cur_task);
249 task::spawn_opts(opts, f);
252 fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
253 Some(rtio::LocalIo::new(&mut self.io as &mut rtio::IoFactory))
259 unsafe { self.lock.destroy() }
265 use std::rt::Runtime;
266 use std::rt::local::Local;
267 use std::rt::task::Task;
269 use std::task::TaskOpts;
270 use super::{spawn, spawn_opts, Ops};
274 let (p, c) = Chan::new();
283 let (p, c) = Chan::<()>::new();
288 assert_eq!(p.recv_opt(), None);
293 let mut opts = TaskOpts::new();
294 opts.name = Some(SendStrStatic("test"));
295 opts.stack_size = Some(20 * 4096);
296 let (p, c) = Chan::new();
297 opts.notify_chan = Some(c);
298 spawn_opts(opts, proc() {});
299 assert!(p.recv().is_ok());
303 fn smoke_opts_fail() {
304 let mut opts = TaskOpts::new();
305 let (p, c) = Chan::new();
306 opts.notify_chan = Some(c);
307 spawn_opts(opts, proc() { fail!() });
308 assert!(p.recv().is_err());
313 let (p, c) = Chan::new();
315 10.times(task::deschedule);
322 fn spawn_children() {
323 let (p, c) = Chan::new();
325 let (p, c2) = Chan::new();
327 let (p, c3) = Chan::new();
341 fn spawn_inherits() {
342 let (p, c) = Chan::new();
346 let mut task: ~Task = Local::take();
347 match task.maybe_take_runtime::<Ops>() {
349 task.put_runtime(ops as ~Runtime);