1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Tasks implemented on top of OS threads
13 //! This module contains the implementation of the 1:1 threading model required
14 //! by rust tasks. This implements the necessary API traits laid out by std::rt
15 //! in order to spawn new tasks and deschedule the current task.
19 use std::rt::bookkeeping;
20 use std::rt::local::Local;
21 use std::rt::mutex::NativeMutex;
24 use std::rt::task::{Task, BlockedTask, TaskOpts};
25 use std::rt::thread::Thread;
30 use std::task::{TaskBuilder, Spawner};
32 /// Creates a new Task which is ready to execute as a 1:1 task.
///
/// `stack_bounds` is the `(lo, hi)` pair stored on the runtime `ops` value,
/// which is then installed into the freshly boxed task with `put_runtime`.
33 pub fn new(stack_bounds: (uint, uint)) -> Box<Task> {
34 let mut task = box Task::new();
36 ops.stack_bounds = stack_bounds;
37 task.put_runtime(ops);
// Builds the boxed `Ops` runtime value for a native task, giving it a newly
// created native mutex and a newly created I/O factory.
41 fn ops() -> Box<Ops> {
43 lock: unsafe { NativeMutex::new() },
45 io: io::IoFactory::new(),
46 // these *should* get overwritten
51 /// Spawns a function with the default configuration
///
/// This forwards to `spawn_opts` with a `TaskOpts` whose `name`,
/// `stack_size` and `on_exit` fields are all `None`.
52 #[deprecated = "use the native method of NativeTaskBuilder instead"]
53 pub fn spawn(f: proc():Send) {
54 spawn_opts(TaskOpts { name: None, stack_size: None, on_exit: None }, f)
57 /// Spawns a new task given the configuration options and a procedure to run
///
/// The `on_exit` callback from `opts` is installed on the new task's `death`
/// record. When `opts.stack_size` is `None`, `rt::min_stack()` is used as the
/// stack size of the spawned thread.
59 #[deprecated = "use the native method of NativeTaskBuilder instead"]
60 pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
61 let TaskOpts { name, stack_size, on_exit } = opts;
63 let mut task = box Task::new();
65 task.death.on_exit = on_exit;
67 let stack = stack_size.unwrap_or(rt::min_stack());
71 // Note that this increment must happen *before* the spawn in order to
72 // guarantee that if this task exits it will always end up waiting for the
73 // spawned task to exit.
74 bookkeeping::increment();
76 // Spawning a new OS thread guarantees that __morestack will never get
77 // triggered, but we must manually set up the actual stack bounds once this
78 // function starts executing. This raises the lower limit by a bit because
79 // by the time that this function is executing we've already consumed at
80 // least a little bit of stack (we don't know the exact byte address at
81 // which our stack started).
82 Thread::spawn_stack(stack, proc() {
// The address of a local declared first thing in the new thread stands in
// for the top of its stack.
83 let something_around_the_top_of_the_stack = 1;
84 let addr = &something_around_the_top_of_the_stack as *int;
85 let my_stack = addr as uint;
// The lower bound is the estimated top minus the requested stack size, raised
// by 1024 bytes; the same pair is recorded with `stack` and stored on `ops`.
87 stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
90 ops.stack_bounds = (my_stack - stack + 1024, my_stack);
94 task.put_runtime(ops);
95 let t = task.run(|| { f.take_unwrap()() });
// Pairs with the `bookkeeping::increment()` performed before the spawn.
97 bookkeeping::decrement();
101 /// A spawner for native tasks
///
/// This is a unit struct. `NativeTaskBuilder::native` passes it to
/// `TaskBuilder::spawner` to obtain a `TaskBuilder<NativeSpawner>`.
102 pub struct NativeSpawner;
104 impl Spawner for NativeSpawner {
105 fn spawn(self, opts: TaskOpts, f: proc():Send) {
110 /// An extension trait adding a `native` configuration method to `TaskBuilder`.
111 pub trait NativeTaskBuilder {
/// Consumes the builder and returns a `TaskBuilder` parameterized over
/// `NativeSpawner`.
112 fn native(self) -> TaskBuilder<NativeSpawner>;
// Implemented for every `TaskBuilder<S>`, whatever spawner `S` it currently
// carries.
115 impl<S: Spawner> NativeTaskBuilder for TaskBuilder<S> {
116 fn native(self) -> TaskBuilder<NativeSpawner> {
// Hand `NativeSpawner` to the builder through its `spawner` method.
117 self.spawner(NativeSpawner)
121 // This structure is the glue between channels and the 1:1 scheduling mode. This
122 // structure is allocated once per task.
124 lock: NativeMutex, // native synchronization
// `awoken` is reset to `false` under `lock` before waiting in `deschedule`; it
// is only ever accessed while `lock` is held.
125 awoken: bool, // used to prevent spurious wakeups
126 io: io::IoFactory, // local I/O factory
// The value of `stack_bounds` below is what the `stack_bounds` method of the
// `rt::Runtime` impl returns.
128 // This field holds the known bounds of the stack in (lo, hi) form. Not all
129 // native tasks necessarily know their precise bounds, hence this is
131 stack_bounds: (uint, uint),
134 impl rt::Runtime for Ops {
// Consumes this runtime and the current task: the runtime is moved back into
// the task, and the task is stored again with `Local::put`.
135 fn yield_now(~self, mut cur_task: Box<Task>) {
136 // put the task back in TLS and then invoke the OS thread yield
137 cur_task.put_runtime(self);
138 Local::put(cur_task);
// Restores the task (runtime first, then `Local::put`) without yielding.
142 fn maybe_yield(~self, mut cur_task: Box<Task>) {
143 // just put the task back in TLS, on OS threads we never need to
144 // opportunistically yield b/c the OS will do that for us (preemption)
145 cur_task.put_runtime(self);
146 Local::put(cur_task);
// Consumes this runtime and yields a `Box<Any>`; presumably the runtime itself,
// type-erased -- TODO confirm against the body.
149 fn wrap(~self) -> Box<Any> {
// Returns a copy of the stored `(lo, hi)` stack bounds.
153 fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
// Always reports `true`.
155 fn can_block(&self) -> bool { true }
157 // This function gets a little interesting. There are a few safety and
158 // ownership violations going on here, but this is all done in the name of
159 // shared state. Additionally, all of the violations are protected with a
160 // mutex, so in theory there are no races.
162 // The first thing we need to do is to get a pointer to the task's internal
163 // mutex. This address will not be changing (because the task is allocated
164 // on the heap). We must have this handle separately because the task will
165 // have its ownership transferred to the given closure. We're guaranteed,
166 // however, that this memory will remain valid because *this* is the current
167 // task's execution thread.
169 // The next weird part is where ownership of the task actually goes. We
170 // relinquish it to the `f` blocking function, but upon returning this
171 // function needs to replace the task back in TLS. There is no communication
172 // from the wakeup thread back to this thread about the task pointer, and
173 // there's really no need to. In order to get around this, we cast the task
174 // to a `uint` which is then used at the end of this function to cast back
175 // to a `Box<Task>` object. Naturally, this looks like it violates
176 // ownership semantics in that there may be two `Box<Task>` objects.
178 // The fun part is that the wakeup half of this implementation knows to
179 // "forget" the task on the other end. This means that the awakening half of
180 // things silently relinquishes ownership back to this thread, but not in a
181 // way that the compiler can understand. The task's memory is always valid
182 // for both tasks because these operations are all done inside of a mutex.
184 // You'll also find that if blocking fails (the `f` function hands the
185 // BlockedTask back to us), we will `mem::forget` the handles. The
186 // reasoning for this is the same logic as above in that the task silently
187 // transfers ownership via the `uint`, not through normal compiler
190 // On a mildly unrelated note, it should also be pointed out that OS
191 // condition variables are susceptible to spurious wakeups, which we need to
192 // be ready for. In order to accommodate this fact, we have an extra
193 // `awoken` field which indicates whether we were actually woken up via some
194 // invocation of `reawaken`. This flag is only ever accessed inside the
195 // lock, so there's no need to make it atomic.
196 fn deschedule(mut ~self, times: uint, mut cur_task: Box<Task>,
197 f: |BlockedTask| -> Result<(), BlockedTask>) {
// Keep a raw pointer to this `Ops` before `self` is moved into the task on the
// next line; the lock and the `awoken` flag are reached through it from here on.
198 let me = &mut *self as *mut Ops;
199 cur_task.put_runtime(self);
// Raw alias of the task; it is turned back into a `Box<Task>` at the end of
// this function, after ownership has been handed to `BlockedTask::block`.
202 let cur_task_dupe = &*cur_task as *Task;
203 let task = BlockedTask::block(cur_task);
// Take the lock and clear `awoken` before waiting, so that only a wakeup that
// sets the flag ends the wait loop.
206 let guard = (*me).lock.lock();
207 (*me).awoken = false;
210 while !(*me).awoken {
214 Err(task) => { mem::forget(task.wake()); }
217 let iter = task.make_selectable(times);
218 let guard = (*me).lock.lock();
219 (*me).awoken = false;
221 // Apply the given closure to all of the "selectable tasks",
222 // bailing on the first one that produces an error. Note that
223 // care must be taken such that when an error occurs, we
224 // may not own the task, so we may still have to wait for the
225 // task to become available. In other words, if task.wake()
226 // returns `None`, then someone else has ownership and we must
227 // wait for their signal.
228 match iter.map(f).filter_map(|a| a.err()).next() {
240 while !(*me).awoken {
244 // re-acquire ownership of the task
245 cur_task = mem::transmute(cur_task_dupe);
248 // put the task back in TLS, and everything is as it once was.
249 Local::put(cur_task);
252 // See the comments on `deschedule` for why the task is forgotten here, and
253 // why it's valid to do so.
254 fn reawaken(mut ~self, mut to_wake: Box<Task>) {
// Keep a raw pointer to this `Ops` so that its lock can still be reached after
// `self` has been moved into `to_wake` and `to_wake` has been forgotten.
256 let me = &mut *self as *mut Ops;
257 to_wake.put_runtime(self);
258 mem::forget(to_wake);
259 let guard = (*me).lock.lock();
// Restores the current task (runtime first, then `Local::put`) and then spawns
// the sibling through `task::spawn_opts`.
265 fn spawn_sibling(~self,
266 mut cur_task: Box<Task>,
269 cur_task.put_runtime(self);
270 Local::put(cur_task);
272 task::spawn_opts(opts, f);
// Wraps this runtime's `io` factory in an `rtio::LocalIo`; the result is
// always `Some`.
275 fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
276 Some(rtio::LocalIo::new(&mut self.io as &mut rtio::IoFactory))
282 use std::rt::local::Local;
283 use std::rt::task::{Task, TaskOpts};
285 use std::task::TaskBuilder;
286 use super::{spawn, spawn_opts, Ops, NativeTaskBuilder};
290 let (tx, rx) = channel();
299 let (tx, rx) = channel::<()>();
304 assert_eq!(rx.recv_opt(), Err(()));
309 let mut opts = TaskOpts::new();
310 opts.name = Some("test".into_maybe_owned());
311 opts.stack_size = Some(20 * 4096);
312 let (tx, rx) = channel();
313 opts.on_exit = Some(proc(r) tx.send(r));
314 spawn_opts(opts, proc() {});
315 assert!(rx.recv().is_ok());
// A task whose body calls `fail!()` must hand an `Err` result to its `on_exit`
// callback; the callback forwards that result over a channel for the assertion.
319 fn smoke_opts_fail() {
320 let mut opts = TaskOpts::new();
321 let (tx, rx) = channel();
322 opts.on_exit = Some(proc(r) tx.send(r));
323 spawn_opts(opts, proc() { fail!() });
324 assert!(rx.recv().is_err());
329 let (tx, rx) = channel();
331 for _ in range(0, 10) { task::deschedule(); }
// Sets up three channels with distinct senders (`tx1`, `tx2`, `tx3`); the
// receiver binding `rx` is shadowed each time.
338 fn spawn_children() {
339 let (tx1, rx) = channel();
341 let (tx2, rx) = channel();
343 let (tx3, rx) = channel();
// Takes the current task out of `Local` and tries to take its runtime as an
// `Ops`; when that yields `ops`, the runtime is put back on the task.
// NOTE(review): presumably this asserts that a spawned task inherits the
// native runtime -- confirm against the full match arms.
357 fn spawn_inherits() {
358 let (tx, rx) = channel();
361 let mut task: Box<Task> = Local::take();
362 match task.maybe_take_runtime::<Ops>() {
364 task.put_runtime(ops);
// A task configured with `.native()` and run through `try` must return the
// value produced by its body.
376 fn test_native_builder() {
377 let res = TaskBuilder::new().native().try(proc() {
378 "Success!".to_string()
380 assert_eq!(res.ok().unwrap(), "Success!".to_string());