]> git.lizzy.rs Git - rust.git/blob - src/libgreen/lib.rs
auto merge of #13049 : alexcrichton/rust/io-fill, r=huonw
[rust.git] / src / libgreen / lib.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The "green scheduling" library
12 //!
13 //! This library provides M:N threading for rust programs. Internally this has
14 //! the implementation of a green scheduler along with context switching and a
15 //! stack-allocation strategy. This can be optionally linked in to rust
16 //! programs in order to provide M:N functionality inside of 1:1 programs.
17 //!
18 //! # Architecture
19 //!
20 //! An M:N scheduling library implies that there are N OS thread upon which M
21 //! "green threads" are multiplexed. In other words, a set of green threads are
22 //! all run inside a pool of OS threads.
23 //!
24 //! With this design, you can achieve _concurrency_ by spawning many green
25 //! threads, and you can achieve _parallelism_ by running the green threads
26 //! simultaneously on multiple OS threads. Each OS thread is a candidate for
27 //! being scheduled on a different core (the source of parallelism), and then
28 //! all of the green threads cooperatively schedule amongst one another (the
29 //! source of concurrency).
30 //!
31 //! ## Schedulers
32 //!
33 //! In order to coordinate among green threads, each OS thread is primarily
34 //! running something which we call a Scheduler. Whenever a reference to a
35 //! Scheduler is made, it is synonymous to referencing one OS thread. Each
36 //! scheduler is bound to one and exactly one OS thread, and the thread that it
37 //! is bound to never changes.
38 //!
39 //! Each scheduler is connected to a pool of other schedulers (a `SchedPool`)
40 //! which is the thread pool term from above. A pool of schedulers all share the
41 //! work that they create. Furthermore, whenever a green thread is created (also
42 //! synonymously referred to as a green task), it is associated with a
43 //! `SchedPool` forevermore. A green thread cannot leave its scheduler pool.
44 //!
45 //! Schedulers can have at most one green thread running on them at a time. When
46 //! a scheduler is asleep on its event loop, there are no green tasks running on
47 //! the OS thread or the scheduler. The term "context switch" is used for when
48 //! the running green thread is swapped out, but this simply changes the one
49 //! green thread which is running on the scheduler.
50 //!
51 //! ## Green Threads
52 //!
53 //! A green thread can largely be summarized by a stack and a register context.
54 //! Whenever a green thread is spawned, it allocates a stack, and then prepares
55 //! a register context for execution. The green task may be executed across
56 //! multiple OS threads, but it will always use the same stack and it will carry
57 //! its register context across OS threads.
58 //!
59 //! Each green thread is cooperatively scheduled with other green threads.
60 //! Primarily, this means that there is no pre-emption of a green thread. The
61 //! major consequence of this design is that a green thread stuck in an infinite
62 //! loop will prevent all other green threads from running on that particular
63 //! scheduler.
64 //!
65 //! Scheduling events for green threads occur on communication and I/O
66 //! boundaries. For example, if a green task blocks waiting for a message on a
67 //! channel some other green thread can now run on the scheduler. This also has
68 //! the consequence that until a green thread performs any form of scheduling
69 //! event, it will be running on the same OS thread (unconditionally).
70 //!
71 //! ## Work Stealing
72 //!
73 //! With a pool of schedulers, a new green task has a number of options when
74 //! deciding where to run initially. The current implementation uses a concept
75 //! called work stealing in order to spread out work among schedulers.
76 //!
77 //! In a work-stealing model, each scheduler maintains a local queue of tasks to
78 //! run, and this queue is stolen from by other schedulers. Implementation-wise,
79 //! work stealing has some hairy parts, but from a user-perspective, work
80 //! stealing simply implies what with M green threads and N schedulers where
81 //! M > N it is very likely that all schedulers will be busy executing work.
82 //!
83 //! # Considerations when using libgreen
84 //!
85 //! An M:N runtime has both pros and cons, and there is no one answer as to
86 //! whether M:N or 1:1 is appropriate to use. As always, there are many
87 //! advantages and disadvantages between the two. Regardless of the workload,
88 //! however, there are some aspects of using green thread which you should be
89 //! aware of:
90 //!
91 //! * The largest concern when using libgreen is interoperating with native
92 //!   code. Care should be taken when calling native code that will block the OS
93 //!   thread as it will prevent further green tasks from being scheduled on the
94 //!   OS thread.
95 //!
96 //! * Native code using thread-local-storage should be approached
97 //!   with care. Green threads may migrate among OS threads at any time, so
98 //!   native libraries using thread-local state may not always work.
99 //!
100 //! * Native synchronization primitives (e.g. pthread mutexes) will also not
101 //!   work for green threads. The reason for this is because native primitives
102 //!   often operate on a _os thread_ granularity whereas green threads are
103 //!   operating on a more granular unit of work.
104 //!
105 //! * A green threading runtime is not fork-safe. If the process forks(), it
106 //!   cannot expect to make reasonable progress by continuing to use green
107 //!   threads.
108 //!
109 //! Note that these concerns do not mean that operating with native code is a
110 //! lost cause. These are simply just concerns which should be considered when
111 //! invoking native code.
112 //!
113 //! # Starting with libgreen
114 //!
115 //! ```rust
116 //! extern crate green;
117 //!
118 //! #[start]
119 //! fn start(argc: int, argv: **u8) -> int { green::start(argc, argv, main) }
120 //!
121 //! fn main() {
122 //!     // this code is running in a pool of schedulers
123 //! }
124 //! ```
125 //!
126 //! # Using a scheduler pool
127 //!
128 //! ```rust
129 //! use std::task::TaskOpts;
130 //! use green::{SchedPool, PoolConfig};
131 //! use green::sched::{PinnedTask, TaskFromFriend};
132 //!
133 //! let config = PoolConfig::new();
134 //! let mut pool = SchedPool::new(config);
135 //!
136 //! // Spawn tasks into the pool of schedulers
137 //! pool.spawn(TaskOpts::new(), proc() {
138 //!     // this code is running inside the pool of schedulers
139 //!
140 //!     spawn(proc() {
141 //!         // this code is also running inside the same scheduler pool
142 //!     });
143 //! });
144 //!
145 //! // Dynamically add a new scheduler to the scheduler pool. This adds another
146 //! // OS thread that green threads can be multiplexed on to.
147 //! let mut handle = pool.spawn_sched();
148 //!
149 //! // Pin a task to the spawned scheduler
150 //! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
151 //! handle.send(PinnedTask(task));
152 //!
153 //! // Schedule a task on this new scheduler
154 //! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
155 //! handle.send(TaskFromFriend(task));
156 //!
157 //! // Handles keep schedulers alive, so be sure to drop all handles before
158 //! // destroying the sched pool
159 //! drop(handle);
160 //!
161 //! // Required to shut down this scheduler pool.
162 //! // The task will fail if `shutdown` is not called.
163 //! pool.shutdown();
164 //! ```
165
166 #[crate_id = "green#0.10-pre"];
167 #[license = "MIT/ASL2"];
168 #[crate_type = "rlib"];
169 #[crate_type = "dylib"];
170 #[doc(html_logo_url = "http://www.rust-lang.org/logos/rust-logo-128x128-blk-v2.png",
171       html_favicon_url = "http://www.rust-lang.org/favicon.ico",
172       html_root_url = "http://static.rust-lang.org/doc/master")];
173
174 // NB this does *not* include globs, please keep it that way.
175 #[feature(macro_rules, phase)];
176 #[allow(visible_private_types)];
177
178 #[cfg(test)] #[phase(syntax, link)] extern crate log;
179 extern crate rand;
180
181 use std::mem::replace;
182 use std::os;
183 use std::rt::crate_map;
184 use std::rt::rtio;
185 use std::rt::thread::Thread;
186 use std::rt;
187 use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
188 use std::sync::deque;
189 use std::task::TaskOpts;
190 use std::slice;
191 use std::sync::arc::UnsafeArc;
192
193 use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
194 use sleeper_list::SleeperList;
195 use stack::StackPool;
196 use task::GreenTask;
197
198 mod macros;
199 mod simple;
200 mod message_queue;
201
202 pub mod basic;
203 pub mod context;
204 pub mod coroutine;
205 pub mod sched;
206 pub mod sleeper_list;
207 pub mod stack;
208 pub mod task;
209
210 #[lang = "start"]
211 #[cfg(not(test), stage0)]
212 pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
213     use std::cast;
214     start(argc, argv, proc() {
215         let main: extern "Rust" fn() = unsafe { cast::transmute(main) };
216         main();
217     })
218 }
219
220 /// Set up a default runtime configuration, given compiler-supplied arguments.
221 ///
222 /// This function will block until the entire pool of M:N schedulers have
223 /// exited. This function also requires a local task to be available.
224 ///
225 /// # Arguments
226 ///
227 /// * `argc` & `argv` - The argument vector. On Unix this information is used
228 ///   by os::args.
229 /// * `main` - The initial procedure to run inside of the M:N scheduling pool.
230 ///            Once this procedure exits, the scheduling pool will begin to shut
231 ///            down. The entire pool (and this function) will only return once
232 ///            all child tasks have finished executing.
233 ///
234 /// # Return value
235 ///
236 /// The return value is used as the process return code. 0 on success, 101 on
237 /// error.
238 pub fn start(argc: int, argv: **u8, main: proc()) -> int {
239     rt::init(argc, argv);
240     let mut main = Some(main);
241     let mut ret = None;
242     simple::task().run(|| {
243         ret = Some(run(main.take_unwrap()));
244     });
245     // unsafe is ok b/c we're sure that the runtime is gone
246     unsafe { rt::cleanup() }
247     ret.unwrap()
248 }
249
250 /// Execute the main function in a pool of M:N schedulers.
251 ///
252 /// Configures the runtime according to the environment, by default using a task
253 /// scheduler with the same number of threads as cores.  Returns a process exit
254 /// code.
255 ///
256 /// This function will not return until all schedulers in the associated pool
257 /// have returned.
258 pub fn run(main: proc()) -> int {
259     // Create a scheduler pool and spawn the main task into this pool. We will
260     // get notified over a channel when the main task exits.
261     let mut pool = SchedPool::new(PoolConfig::new());
262     let (tx, rx) = channel();
263     let mut opts = TaskOpts::new();
264     opts.notify_chan = Some(tx);
265     opts.name = Some("<main>".into_maybe_owned());
266     pool.spawn(opts, main);
267
268     // Wait for the main task to return, and set the process error code
269     // appropriately.
270     if rx.recv().is_err() {
271         os::set_exit_status(rt::DEFAULT_ERROR_CODE);
272     }
273
274     // Now that we're sure all tasks are dead, shut down the pool of schedulers,
275     // waiting for them all to return.
276     pool.shutdown();
277     os::get_exit_status()
278 }
279
280 /// Configuration of how an M:N pool of schedulers is spawned.
281 pub struct PoolConfig {
282     /// The number of schedulers (OS threads) to spawn into this M:N pool.
283     threads: uint,
284     /// A factory function used to create new event loops. If this is not
285     /// specified then the default event loop factory is used.
286     event_loop_factory: Option<fn() -> ~rtio::EventLoop>,
287 }
288
289 impl PoolConfig {
290     /// Returns the default configuration, as determined the environment
291     /// variables of this process.
292     pub fn new() -> PoolConfig {
293         PoolConfig {
294             threads: rt::default_sched_threads(),
295             event_loop_factory: None,
296         }
297     }
298 }
299
300 /// A structure representing a handle to a pool of schedulers. This handle is
301 /// used to keep the pool alive and also reap the status from the pool.
302 pub struct SchedPool {
303     priv id: uint,
304     priv threads: ~[Thread<()>],
305     priv handles: ~[SchedHandle],
306     priv stealers: ~[deque::Stealer<~task::GreenTask>],
307     priv next_friend: uint,
308     priv stack_pool: StackPool,
309     priv deque_pool: deque::BufferPool<~task::GreenTask>,
310     priv sleepers: SleeperList,
311     priv factory: fn() -> ~rtio::EventLoop,
312     priv task_state: TaskState,
313     priv tasks_done: Receiver<()>,
314 }
315
316 /// This is an internal state shared among a pool of schedulers. This is used to
317 /// keep track of how many tasks are currently running in the pool and then
318 /// sending on a channel once the entire pool has been drained of all tasks.
319 #[deriving(Clone)]
320 struct TaskState {
321     cnt: UnsafeArc<AtomicUint>,
322     done: Sender<()>,
323 }
324
325 impl SchedPool {
326     /// Execute the main function in a pool of M:N schedulers.
327     ///
328     /// This will configure the pool according to the `config` parameter, and
329     /// initially run `main` inside the pool of schedulers.
330     pub fn new(config: PoolConfig) -> SchedPool {
331         static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
332
333         let PoolConfig {
334             threads: nscheds,
335             event_loop_factory: factory
336         } = config;
337         let factory = factory.unwrap_or(default_event_loop_factory());
338         assert!(nscheds > 0);
339
340         // The pool of schedulers that will be returned from this function
341         let (p, state) = TaskState::new();
342         let mut pool = SchedPool {
343             threads: ~[],
344             handles: ~[],
345             stealers: ~[],
346             id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
347             sleepers: SleeperList::new(),
348             stack_pool: StackPool::new(),
349             deque_pool: deque::BufferPool::new(),
350             next_friend: 0,
351             factory: factory,
352             task_state: state,
353             tasks_done: p,
354         };
355
356         // Create a work queue for each scheduler, ntimes. Create an extra
357         // for the main thread if that flag is set. We won't steal from it.
358         let arr = slice::from_fn(nscheds, |_| pool.deque_pool.deque());
359         let (workers, stealers) = slice::unzip(arr.move_iter());
360         pool.stealers = stealers;
361
362         // Now that we've got all our work queues, create one scheduler per
363         // queue, spawn the scheduler into a thread, and be sure to keep a
364         // handle to the scheduler and the thread to keep them alive.
365         for worker in workers.move_iter() {
366             rtdebug!("inserting a regular scheduler");
367
368             let mut sched = ~Scheduler::new(pool.id,
369                                             (pool.factory)(),
370                                             worker,
371                                             pool.stealers.clone(),
372                                             pool.sleepers.clone(),
373                                             pool.task_state.clone());
374             pool.handles.push(sched.make_handle());
375             let sched = sched;
376             pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
377         }
378
379         return pool;
380     }
381
382     /// Creates a new task configured to run inside of this pool of schedulers.
383     /// This is useful to create a task which can then be sent to a specific
384     /// scheduler created by `spawn_sched` (and possibly pin it to that
385     /// scheduler).
386     pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask {
387         GreenTask::configure(&mut self.stack_pool, opts, f)
388     }
389
390     /// Spawns a new task into this pool of schedulers, using the specified
391     /// options to configure the new task which is spawned.
392     ///
393     /// New tasks are spawned in a round-robin fashion to the schedulers in this
394     /// pool, but tasks can certainly migrate among schedulers once they're in
395     /// the pool.
396     pub fn spawn(&mut self, opts: TaskOpts, f: proc()) {
397         let task = self.task(opts, f);
398
399         // Figure out someone to send this task to
400         let idx = self.next_friend;
401         self.next_friend += 1;
402         if self.next_friend >= self.handles.len() {
403             self.next_friend = 0;
404         }
405
406         // Jettison the task away!
407         self.handles[idx].send(TaskFromFriend(task));
408     }
409
410     /// Spawns a new scheduler into this M:N pool. A handle is returned to the
411     /// scheduler for use. The scheduler will not exit as long as this handle is
412     /// active.
413     ///
414     /// The scheduler spawned will participate in work stealing with all of the
415     /// other schedulers currently in the scheduler pool.
416     pub fn spawn_sched(&mut self) -> SchedHandle {
417         let (worker, stealer) = self.deque_pool.deque();
418         self.stealers.push(stealer.clone());
419
420         // Tell all existing schedulers about this new scheduler so they can all
421         // steal work from it
422         for handle in self.handles.mut_iter() {
423             handle.send(NewNeighbor(stealer.clone()));
424         }
425
426         // Create the new scheduler, using the same sleeper list as all the
427         // other schedulers as well as having a stealer handle to all other
428         // schedulers.
429         let mut sched = ~Scheduler::new(self.id,
430                                         (self.factory)(),
431                                         worker,
432                                         self.stealers.clone(),
433                                         self.sleepers.clone(),
434                                         self.task_state.clone());
435         let ret = sched.make_handle();
436         self.handles.push(sched.make_handle());
437         let sched = sched;
438         self.threads.push(Thread::start(proc() { sched.bootstrap() }));
439
440         return ret;
441     }
442
443     /// Consumes the pool of schedulers, waiting for all tasks to exit and all
444     /// schedulers to shut down.
445     ///
446     /// This function is required to be called in order to drop a pool of
447     /// schedulers, it is considered an error to drop a pool without calling
448     /// this method.
449     ///
450     /// This only waits for all tasks in *this pool* of schedulers to exit, any
451     /// native tasks or extern pools will not be waited on
452     pub fn shutdown(mut self) {
453         self.stealers = ~[];
454
455         // Wait for everyone to exit. We may have reached a 0-task count
456         // multiple times in the past, meaning there could be several buffered
457         // messages on the `tasks_done` port. We're guaranteed that after *some*
458         // message the current task count will be 0, so we just receive in a
459         // loop until everything is totally dead.
460         while self.task_state.active() {
461             self.tasks_done.recv();
462         }
463
464         // Now that everyone's gone, tell everything to shut down.
465         for mut handle in replace(&mut self.handles, ~[]).move_iter() {
466             handle.send(Shutdown);
467         }
468         for thread in replace(&mut self.threads, ~[]).move_iter() {
469             thread.join();
470         }
471     }
472 }
473
474 impl TaskState {
475     fn new() -> (Receiver<()>, TaskState) {
476         let (tx, rx) = channel();
477         (rx, TaskState {
478             cnt: UnsafeArc::new(AtomicUint::new(0)),
479             done: tx,
480         })
481     }
482
483     fn increment(&mut self) {
484         unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
485     }
486
487     fn active(&self) -> bool {
488         unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
489     }
490
491     fn decrement(&mut self) {
492         let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
493         if prev == 1 {
494             self.done.send(());
495         }
496     }
497 }
498
499 impl Drop for SchedPool {
500     fn drop(&mut self) {
501         if self.threads.len() > 0 {
502             fail!("dropping a M:N scheduler pool that wasn't shut down");
503         }
504     }
505 }
506
507 fn default_event_loop_factory() -> fn() -> ~rtio::EventLoop {
508     match crate_map::get_crate_map() {
509         None => {}
510         Some(map) => {
511             match map.event_loop_factory {
512                 None => {}
513                 Some(factory) => return factory
514             }
515         }
516     }
517
518     // If the crate map didn't specify a factory to create an event loop, then
519     // instead just use a basic event loop missing all I/O services to at least
520     // get the scheduler running.
521     return basic::event_loop;
522 }