]> git.lizzy.rs Git - rust.git/blob - src/libgreen/lib.rs
auto merge of #15610 : brson/rust/0.12.0, r=alexcrichton
[rust.git] / src / libgreen / lib.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The "green scheduling" library
12 //!
13 //! This library provides M:N threading for rust programs. Internally this has
14 //! the implementation of a green scheduler along with context switching and a
15 //! stack-allocation strategy. This can be optionally linked in to rust
16 //! programs in order to provide M:N functionality inside of 1:1 programs.
17 //!
18 //! # Architecture
19 //!
20 //! An M:N scheduling library implies that there are N OS thread upon which M
21 //! "green threads" are multiplexed. In other words, a set of green threads are
22 //! all run inside a pool of OS threads.
23 //!
24 //! With this design, you can achieve _concurrency_ by spawning many green
25 //! threads, and you can achieve _parallelism_ by running the green threads
26 //! simultaneously on multiple OS threads. Each OS thread is a candidate for
27 //! being scheduled on a different core (the source of parallelism), and then
28 //! all of the green threads cooperatively schedule amongst one another (the
29 //! source of concurrency).
30 //!
31 //! ## Schedulers
32 //!
33 //! In order to coordinate among green threads, each OS thread is primarily
34 //! running something which we call a Scheduler. Whenever a reference to a
35 //! Scheduler is made, it is synonymous to referencing one OS thread. Each
36 //! scheduler is bound to one and exactly one OS thread, and the thread that it
37 //! is bound to never changes.
38 //!
39 //! Each scheduler is connected to a pool of other schedulers (a `SchedPool`)
40 //! which is the thread pool term from above. A pool of schedulers all share the
41 //! work that they create. Furthermore, whenever a green thread is created (also
42 //! synonymously referred to as a green task), it is associated with a
43 //! `SchedPool` forevermore. A green thread cannot leave its scheduler pool.
44 //!
45 //! Schedulers can have at most one green thread running on them at a time. When
46 //! a scheduler is asleep on its event loop, there are no green tasks running on
47 //! the OS thread or the scheduler. The term "context switch" is used for when
48 //! the running green thread is swapped out, but this simply changes the one
49 //! green thread which is running on the scheduler.
50 //!
51 //! ## Green Threads
52 //!
53 //! A green thread can largely be summarized by a stack and a register context.
54 //! Whenever a green thread is spawned, it allocates a stack, and then prepares
55 //! a register context for execution. The green task may be executed across
56 //! multiple OS threads, but it will always use the same stack and it will carry
57 //! its register context across OS threads.
58 //!
59 //! Each green thread is cooperatively scheduled with other green threads.
60 //! Primarily, this means that there is no pre-emption of a green thread. The
61 //! major consequence of this design is that a green thread stuck in an infinite
62 //! loop will prevent all other green threads from running on that particular
63 //! scheduler.
64 //!
65 //! Scheduling events for green threads occur on communication and I/O
66 //! boundaries. For example, if a green task blocks waiting for a message on a
67 //! channel some other green thread can now run on the scheduler. This also has
68 //! the consequence that until a green thread performs any form of scheduling
69 //! event, it will be running on the same OS thread (unconditionally).
70 //!
71 //! ## Work Stealing
72 //!
73 //! With a pool of schedulers, a new green task has a number of options when
74 //! deciding where to run initially. The current implementation uses a concept
75 //! called work stealing in order to spread out work among schedulers.
76 //!
77 //! In a work-stealing model, each scheduler maintains a local queue of tasks to
78 //! run, and this queue is stolen from by other schedulers. Implementation-wise,
79 //! work stealing has some hairy parts, but from a user-perspective, work
80 //! stealing simply implies what with M green threads and N schedulers where
81 //! M > N it is very likely that all schedulers will be busy executing work.
82 //!
83 //! # Considerations when using libgreen
84 //!
85 //! An M:N runtime has both pros and cons, and there is no one answer as to
86 //! whether M:N or 1:1 is appropriate to use. As always, there are many
87 //! advantages and disadvantages between the two. Regardless of the workload,
88 //! however, there are some aspects of using green thread which you should be
89 //! aware of:
90 //!
91 //! * The largest concern when using libgreen is interoperating with native
92 //!   code. Care should be taken when calling native code that will block the OS
93 //!   thread as it will prevent further green tasks from being scheduled on the
94 //!   OS thread.
95 //!
96 //! * Native code using thread-local-storage should be approached
97 //!   with care. Green threads may migrate among OS threads at any time, so
98 //!   native libraries using thread-local state may not always work.
99 //!
100 //! * Native synchronization primitives (e.g. pthread mutexes) will also not
101 //!   work for green threads. The reason for this is because native primitives
102 //!   often operate on a _os thread_ granularity whereas green threads are
103 //!   operating on a more granular unit of work.
104 //!
105 //! * A green threading runtime is not fork-safe. If the process forks(), it
106 //!   cannot expect to make reasonable progress by continuing to use green
107 //!   threads.
108 //!
109 //! Note that these concerns do not mean that operating with native code is a
110 //! lost cause. These are simply just concerns which should be considered when
111 //! invoking native code.
112 //!
113 //! # Starting with libgreen
114 //!
115 //! ```rust
116 //! extern crate green;
117 //!
118 //! #[start]
119 //! fn start(argc: int, argv: *const *const u8) -> int {
120 //!     green::start(argc, argv, green::basic::event_loop, main)
121 //! }
122 //!
123 //! fn main() {
124 //!     // this code is running in a pool of schedulers
125 //! }
126 //! ```
127 //!
128 //! > **Note**: This `main` function in this example does *not* have I/O
129 //! >           support. The basic event loop does not provide any support
130 //!
131 //! # Starting with I/O support in libgreen
132 //!
133 //! ```rust
134 //! extern crate green;
135 //! extern crate rustuv;
136 //!
137 //! #[start]
138 //! fn start(argc: int, argv: *const *const u8) -> int {
139 //!     green::start(argc, argv, rustuv::event_loop, main)
140 //! }
141 //!
142 //! fn main() {
143 //!     // this code is running in a pool of schedulers all powered by libuv
144 //! }
145 //! ```
146 //!
147 //! The above code can also be shortened with a macro from libgreen.
148 //!
149 //! ```
150 //! #![feature(phase)]
151 //! #[phase(plugin)] extern crate green;
152 //!
153 //! green_start!(main)
154 //!
155 //! fn main() {
156 //!     // run inside of a green pool
157 //! }
158 //! ```
159 //!
160 //! # Using a scheduler pool
161 //!
162 //! This library adds a `GreenTaskBuilder` trait that extends the methods
163 //! available on `std::task::TaskBuilder` to allow spawning a green task,
164 //! possibly pinned to a particular scheduler thread:
165 //!
166 //! ```rust
167 //! extern crate green;
168 //! extern crate rustuv;
169 //!
170 //! # fn main() {
171 //! use std::task::TaskBuilder;
172 //! use green::{SchedPool, PoolConfig, GreenTaskBuilder};
173 //!
174 //! let mut config = PoolConfig::new();
175 //!
176 //! // Optional: Set the event loop to be rustuv's to allow I/O to work
177 //! config.event_loop_factory = rustuv::event_loop;
178 //!
179 //! let mut pool = SchedPool::new(config);
180 //!
181 //! // Spawn tasks into the pool of schedulers
182 //! TaskBuilder::new().green(&mut pool).spawn(proc() {
183 //!     // this code is running inside the pool of schedulers
184 //!
185 //!     spawn(proc() {
186 //!         // this code is also running inside the same scheduler pool
187 //!     });
188 //! });
189 //!
190 //! // Dynamically add a new scheduler to the scheduler pool. This adds another
191 //! // OS thread that green threads can be multiplexed on to.
192 //! let mut handle = pool.spawn_sched();
193 //!
194 //! // Pin a task to the spawned scheduler
195 //! TaskBuilder::new().green_pinned(&mut pool, &mut handle).spawn(proc() {
196 //!     /* ... */
197 //! });
198 //!
199 //! // Handles keep schedulers alive, so be sure to drop all handles before
200 //! // destroying the sched pool
201 //! drop(handle);
202 //!
203 //! // Required to shut down this scheduler pool.
204 //! // The task will fail if `shutdown` is not called.
205 //! pool.shutdown();
206 //! # }
207 //! ```
208
209 #![crate_name = "green"]
210 #![experimental]
211 #![license = "MIT/ASL2"]
212 #![crate_type = "rlib"]
213 #![crate_type = "dylib"]
214 #![doc(html_logo_url = "http://www.rust-lang.org/logos/rust-logo-128x128-blk-v2.png",
215        html_favicon_url = "http://www.rust-lang.org/favicon.ico",
216        html_root_url = "http://doc.rust-lang.org/master/",
217        html_playground_url = "http://play.rust-lang.org/")]
218
219 // NB this does *not* include globs, please keep it that way.
220 #![feature(macro_rules, phase, default_type_params)]
221 #![allow(visible_private_types, deprecated)]
222
223 #[cfg(test)] #[phase(plugin, link)] extern crate log;
224 #[cfg(test)] extern crate rustuv;
225 extern crate libc;
226 extern crate alloc;
227
228 use alloc::arc::Arc;
229 use std::mem::replace;
230 use std::os;
231 use std::rt::rtio;
232 use std::rt::thread::Thread;
233 use std::rt::task::TaskOpts;
234 use std::rt;
235 use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
236 use std::sync::deque;
237 use std::task::{TaskBuilder, Spawner};
238
239 use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, PinnedTask, NewNeighbor};
240 use sleeper_list::SleeperList;
241 use stack::StackPool;
242 use task::GreenTask;
243
244 mod macros;
245 mod simple;
246 mod message_queue;
247
248 pub mod basic;
249 pub mod context;
250 pub mod coroutine;
251 pub mod sched;
252 pub mod sleeper_list;
253 pub mod stack;
254 pub mod task;
255
256 /// A helper macro for booting a program with libgreen
257 ///
258 /// # Example
259 ///
260 /// ```
261 /// #![feature(phase)]
262 /// #[phase(plugin)] extern crate green;
263 ///
264 /// green_start!(main)
265 ///
266 /// fn main() {
267 ///     // running with libgreen
268 /// }
269 /// ```
270 #[macro_export]
271 macro_rules! green_start( ($f:ident) => (
272     mod __start {
273         extern crate green;
274         extern crate rustuv;
275
276         #[start]
277         fn start(argc: int, argv: *const *const u8) -> int {
278             green::start(argc, argv, rustuv::event_loop, super::$f)
279         }
280     }
281 ) )
282
283 /// Set up a default runtime configuration, given compiler-supplied arguments.
284 ///
285 /// This function will block until the entire pool of M:N schedulers have
286 /// exited. This function also requires a local task to be available.
287 ///
288 /// # Arguments
289 ///
290 /// * `argc` & `argv` - The argument vector. On Unix this information is used
291 ///   by os::args.
292 /// * `main` - The initial procedure to run inside of the M:N scheduling pool.
293 ///            Once this procedure exits, the scheduling pool will begin to shut
294 ///            down. The entire pool (and this function) will only return once
295 ///            all child tasks have finished executing.
296 ///
297 /// # Return value
298 ///
299 /// The return value is used as the process return code. 0 on success, 101 on
300 /// error.
301 pub fn start(argc: int, argv: *const *const u8,
302              event_loop_factory: fn() -> Box<rtio::EventLoop + Send>,
303              main: proc():Send) -> int {
304     rt::init(argc, argv);
305     let mut main = Some(main);
306     let mut ret = None;
307     simple::task().run(|| {
308         ret = Some(run(event_loop_factory, main.take_unwrap()));
309     }).destroy();
310     // unsafe is ok b/c we're sure that the runtime is gone
311     unsafe { rt::cleanup() }
312     ret.unwrap()
313 }
314
315 /// Execute the main function in a pool of M:N schedulers.
316 ///
317 /// Configures the runtime according to the environment, by default using a task
318 /// scheduler with the same number of threads as cores.  Returns a process exit
319 /// code.
320 ///
321 /// This function will not return until all schedulers in the associated pool
322 /// have returned.
323 pub fn run(event_loop_factory: fn() -> Box<rtio::EventLoop + Send>,
324            main: proc():Send) -> int {
325     // Create a scheduler pool and spawn the main task into this pool. We will
326     // get notified over a channel when the main task exits.
327     let mut cfg = PoolConfig::new();
328     cfg.event_loop_factory = event_loop_factory;
329     let mut pool = SchedPool::new(cfg);
330     let (tx, rx) = channel();
331     let mut opts = TaskOpts::new();
332     opts.on_exit = Some(proc(r) tx.send(r));
333     opts.name = Some("<main>".into_maybe_owned());
334     pool.spawn(opts, main);
335
336     // Wait for the main task to return, and set the process error code
337     // appropriately.
338     if rx.recv().is_err() {
339         os::set_exit_status(rt::DEFAULT_ERROR_CODE);
340     }
341
342     // Now that we're sure all tasks are dead, shut down the pool of schedulers,
343     // waiting for them all to return.
344     pool.shutdown();
345     os::get_exit_status()
346 }
347
348 /// Configuration of how an M:N pool of schedulers is spawned.
349 pub struct PoolConfig {
350     /// The number of schedulers (OS threads) to spawn into this M:N pool.
351     pub threads: uint,
352     /// A factory function used to create new event loops. If this is not
353     /// specified then the default event loop factory is used.
354     pub event_loop_factory: fn() -> Box<rtio::EventLoop + Send>,
355 }
356
357 impl PoolConfig {
358     /// Returns the default configuration, as determined the environment
359     /// variables of this process.
360     pub fn new() -> PoolConfig {
361         PoolConfig {
362             threads: rt::default_sched_threads(),
363             event_loop_factory: basic::event_loop,
364         }
365     }
366 }
367
368 /// A structure representing a handle to a pool of schedulers. This handle is
369 /// used to keep the pool alive and also reap the status from the pool.
370 pub struct SchedPool {
371     id: uint,
372     threads: Vec<Thread<()>>,
373     handles: Vec<SchedHandle>,
374     stealers: Vec<deque::Stealer<Box<task::GreenTask>>>,
375     next_friend: uint,
376     stack_pool: StackPool,
377     deque_pool: deque::BufferPool<Box<task::GreenTask>>,
378     sleepers: SleeperList,
379     factory: fn() -> Box<rtio::EventLoop + Send>,
380     task_state: TaskState,
381     tasks_done: Receiver<()>,
382 }
383
384 /// This is an internal state shared among a pool of schedulers. This is used to
385 /// keep track of how many tasks are currently running in the pool and then
386 /// sending on a channel once the entire pool has been drained of all tasks.
387 #[deriving(Clone)]
388 struct TaskState {
389     cnt: Arc<AtomicUint>,
390     done: Sender<()>,
391 }
392
393 impl SchedPool {
394     /// Execute the main function in a pool of M:N schedulers.
395     ///
396     /// This will configure the pool according to the `config` parameter, and
397     /// initially run `main` inside the pool of schedulers.
398     pub fn new(config: PoolConfig) -> SchedPool {
399         static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
400
401         let PoolConfig {
402             threads: nscheds,
403             event_loop_factory: factory
404         } = config;
405         assert!(nscheds > 0);
406
407         // The pool of schedulers that will be returned from this function
408         let (p, state) = TaskState::new();
409         let mut pool = SchedPool {
410             threads: vec![],
411             handles: vec![],
412             stealers: vec![],
413             id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
414             sleepers: SleeperList::new(),
415             stack_pool: StackPool::new(),
416             deque_pool: deque::BufferPool::new(),
417             next_friend: 0,
418             factory: factory,
419             task_state: state,
420             tasks_done: p,
421         };
422
423         // Create a work queue for each scheduler, ntimes. Create an extra
424         // for the main thread if that flag is set. We won't steal from it.
425         let mut workers = Vec::with_capacity(nscheds);
426         let mut stealers = Vec::with_capacity(nscheds);
427
428         for _ in range(0, nscheds) {
429             let (w, s) = pool.deque_pool.deque();
430             workers.push(w);
431             stealers.push(s);
432         }
433         pool.stealers = stealers;
434
435         // Now that we've got all our work queues, create one scheduler per
436         // queue, spawn the scheduler into a thread, and be sure to keep a
437         // handle to the scheduler and the thread to keep them alive.
438         for worker in workers.move_iter() {
439             rtdebug!("inserting a regular scheduler");
440
441             let mut sched = box Scheduler::new(pool.id,
442                                             (pool.factory)(),
443                                             worker,
444                                             pool.stealers.clone(),
445                                             pool.sleepers.clone(),
446                                             pool.task_state.clone());
447             pool.handles.push(sched.make_handle());
448             pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
449         }
450
451         return pool;
452     }
453
454     /// Creates a new task configured to run inside of this pool of schedulers.
455     /// This is useful to create a task which can then be sent to a specific
456     /// scheduler created by `spawn_sched` (and possibly pin it to that
457     /// scheduler).
458     #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"]
459     pub fn task(&mut self, opts: TaskOpts, f: proc():Send) -> Box<GreenTask> {
460         GreenTask::configure(&mut self.stack_pool, opts, f)
461     }
462
463     /// Spawns a new task into this pool of schedulers, using the specified
464     /// options to configure the new task which is spawned.
465     ///
466     /// New tasks are spawned in a round-robin fashion to the schedulers in this
467     /// pool, but tasks can certainly migrate among schedulers once they're in
468     /// the pool.
469     #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"]
470     pub fn spawn(&mut self, opts: TaskOpts, f: proc():Send) {
471         let task = self.task(opts, f);
472
473         // Figure out someone to send this task to
474         let idx = self.next_friend;
475         self.next_friend += 1;
476         if self.next_friend >= self.handles.len() {
477             self.next_friend = 0;
478         }
479
480         // Jettison the task away!
481         self.handles.get_mut(idx).send(TaskFromFriend(task));
482     }
483
484     /// Spawns a new scheduler into this M:N pool. A handle is returned to the
485     /// scheduler for use. The scheduler will not exit as long as this handle is
486     /// active.
487     ///
488     /// The scheduler spawned will participate in work stealing with all of the
489     /// other schedulers currently in the scheduler pool.
490     pub fn spawn_sched(&mut self) -> SchedHandle {
491         let (worker, stealer) = self.deque_pool.deque();
492         self.stealers.push(stealer.clone());
493
494         // Tell all existing schedulers about this new scheduler so they can all
495         // steal work from it
496         for handle in self.handles.mut_iter() {
497             handle.send(NewNeighbor(stealer.clone()));
498         }
499
500         // Create the new scheduler, using the same sleeper list as all the
501         // other schedulers as well as having a stealer handle to all other
502         // schedulers.
503         let mut sched = box Scheduler::new(self.id,
504                                         (self.factory)(),
505                                         worker,
506                                         self.stealers.clone(),
507                                         self.sleepers.clone(),
508                                         self.task_state.clone());
509         let ret = sched.make_handle();
510         self.handles.push(sched.make_handle());
511         self.threads.push(Thread::start(proc() { sched.bootstrap() }));
512
513         return ret;
514     }
515
516     /// Consumes the pool of schedulers, waiting for all tasks to exit and all
517     /// schedulers to shut down.
518     ///
519     /// This function is required to be called in order to drop a pool of
520     /// schedulers, it is considered an error to drop a pool without calling
521     /// this method.
522     ///
523     /// This only waits for all tasks in *this pool* of schedulers to exit, any
524     /// native tasks or extern pools will not be waited on
525     pub fn shutdown(mut self) {
526         self.stealers = vec![];
527
528         // Wait for everyone to exit. We may have reached a 0-task count
529         // multiple times in the past, meaning there could be several buffered
530         // messages on the `tasks_done` port. We're guaranteed that after *some*
531         // message the current task count will be 0, so we just receive in a
532         // loop until everything is totally dead.
533         while self.task_state.active() {
534             self.tasks_done.recv();
535         }
536
537         // Now that everyone's gone, tell everything to shut down.
538         for mut handle in replace(&mut self.handles, vec![]).move_iter() {
539             handle.send(Shutdown);
540         }
541         for thread in replace(&mut self.threads, vec![]).move_iter() {
542             thread.join();
543         }
544     }
545 }
546
547 impl TaskState {
548     fn new() -> (Receiver<()>, TaskState) {
549         let (tx, rx) = channel();
550         (rx, TaskState {
551             cnt: Arc::new(AtomicUint::new(0)),
552             done: tx,
553         })
554     }
555
556     fn increment(&mut self) {
557         self.cnt.fetch_add(1, SeqCst);
558     }
559
560     fn active(&self) -> bool {
561         self.cnt.load(SeqCst) != 0
562     }
563
564     fn decrement(&mut self) {
565         let prev = self.cnt.fetch_sub(1, SeqCst);
566         if prev == 1 {
567             self.done.send(());
568         }
569     }
570 }
571
572 impl Drop for SchedPool {
573     fn drop(&mut self) {
574         if self.threads.len() > 0 {
575             fail!("dropping a M:N scheduler pool that wasn't shut down");
576         }
577     }
578 }
579
580 /// A spawner for green tasks
581 pub struct GreenSpawner<'a>{
582     pool: &'a mut SchedPool,
583     handle: Option<&'a mut SchedHandle>
584 }
585
586 impl<'a> Spawner for GreenSpawner<'a> {
587     #[inline]
588     fn spawn(self, opts: TaskOpts, f: proc():Send) {
589         let GreenSpawner { pool, handle } = self;
590         match handle {
591             None    => pool.spawn(opts, f),
592             Some(h) => h.send(PinnedTask(pool.task(opts, f)))
593         }
594     }
595 }
596
597 /// An extension trait adding `green` configuration methods to `TaskBuilder`.
598 pub trait GreenTaskBuilder {
599     fn green<'a>(self, &'a mut SchedPool) -> TaskBuilder<GreenSpawner<'a>>;
600     fn green_pinned<'a>(self, &'a mut SchedPool, &'a mut SchedHandle)
601                         -> TaskBuilder<GreenSpawner<'a>>;
602 }
603
604 impl<S: Spawner> GreenTaskBuilder for TaskBuilder<S> {
605     fn green<'a>(self, pool: &'a mut SchedPool) -> TaskBuilder<GreenSpawner<'a>> {
606         self.spawner(GreenSpawner {pool: pool, handle: None})
607     }
608
609     fn green_pinned<'a>(self, pool: &'a mut SchedPool, handle: &'a mut SchedHandle)
610                         -> TaskBuilder<GreenSpawner<'a>> {
611         self.spawner(GreenSpawner {pool: pool, handle: Some(handle)})
612     }
613 }
614
615 #[cfg(test)]
616 mod test {
617     use std::task::TaskBuilder;
618     use super::{SchedPool, PoolConfig, GreenTaskBuilder};
619
620     #[test]
621     fn test_green_builder() {
622         let mut pool = SchedPool::new(PoolConfig::new());
623         let res = TaskBuilder::new().green(&mut pool).try(proc() {
624             "Success!".to_string()
625         });
626         assert_eq!(res.ok().unwrap(), "Success!".to_string());
627         pool.shutdown();
628     }
629 }