1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! This is a basic event loop implementation not meant for any "real purposes"
12 //! other than testing the scheduler and proving that it's possible to have a
13 //! pluggable event loop.
15 //! This implementation is also used as the fallback implementation of an event
16 //! loop if no other one is provided (and M:N scheduling is desired).
19 use std::sync::atomics;
21 use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback};
22 use std::rt::rtio::{PausableIdleCallback, Callback};
23 use std::rt::exclusive::Exclusive;
25 /// This is the only exported function from this module.
26 pub fn event_loop() -> Box<EventLoop + Send> {
27 box BasicLoop::new() as Box<EventLoop + Send>
31 work: Vec<proc(): Send>, // pending work
32 remotes: Vec<(uint, Box<Callback + Send>)>,
34 messages: Arc<Exclusive<Vec<Message>>>,
35 idle: Option<Box<Callback + Send>>,
36 idle_active: Option<Arc<atomics::AtomicBool>>,
39 enum Message { RunRemote(uint), RemoveRemote(uint) }
42 fn new() -> BasicLoop {
49 messages: Arc::new(Exclusive::new(Vec::new())),
53 /// Process everything in the work queue (continually)
55 while self.work.len() > 0 {
56 for work in mem::replace(&mut self.work, vec![]).move_iter() {
62 fn remote_work(&mut self) {
63 let messages = unsafe {
64 mem::replace(&mut *self.messages.lock(), Vec::new())
66 for message in messages.move_iter() {
67 self.message(message);
71 fn message(&mut self, message: Message) {
74 match self.remotes.mut_iter().find(|& &(id, _)| id == i) {
75 Some(&(_, ref mut f)) => f.call(),
76 None => unreachable!()
80 match self.remotes.iter().position(|&(id, _)| id == i) {
81 Some(i) => { self.remotes.remove(i).unwrap(); }
82 None => unreachable!()
88 /// Run the idle callback if one is registered
91 Some(ref mut idle) => {
92 if self.idle_active.get_ref().load(atomics::SeqCst) {
100 fn has_idle(&self) -> bool {
101 self.idle.is_some() && self.idle_active.get_ref().load(atomics::SeqCst)
105 impl EventLoop for BasicLoop {
107 // Not exactly efficient, but it gets the job done.
108 while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
119 let mut messages = self.messages.lock();
120 // We block here if we have no messages to process and we may
121 // receive a message at a later date
122 if self.remotes.len() > 0 && messages.len() == 0 &&
123 self.work.len() == 0 {
130 fn callback(&mut self, f: proc():Send) {
134 // FIXME: Seems like a really weird requirement to have an event loop provide.
135 fn pausable_idle_callback(&mut self, cb: Box<Callback + Send>)
136 -> Box<PausableIdleCallback + Send> {
137 rtassert!(self.idle.is_none());
138 self.idle = Some(cb);
139 let a = Arc::new(atomics::AtomicBool::new(true));
140 self.idle_active = Some(a.clone());
141 box BasicPausable { active: a } as Box<PausableIdleCallback + Send>
144 fn remote_callback(&mut self, f: Box<Callback + Send>)
145 -> Box<RemoteCallback + Send> {
146 let id = self.next_remote;
147 self.next_remote += 1;
148 self.remotes.push((id, f));
149 box BasicRemote::new(self.messages.clone(), id) as
150 Box<RemoteCallback + Send>
153 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
155 fn has_active_io(&self) -> bool { false }
159 queue: Arc<Exclusive<Vec<Message>>>,
164 fn new(queue: Arc<Exclusive<Vec<Message>>>, id: uint) -> BasicRemote {
165 BasicRemote { queue: queue, id: id }
169 impl RemoteCallback for BasicRemote {
171 let mut queue = unsafe { self.queue.lock() };
172 queue.push(RunRemote(self.id));
177 impl Drop for BasicRemote {
179 let mut queue = unsafe { self.queue.lock() };
180 queue.push(RemoveRemote(self.id));
185 struct BasicPausable {
186 active: Arc<atomics::AtomicBool>,
189 impl PausableIdleCallback for BasicPausable {
190 fn pause(&mut self) {
191 self.active.store(false, atomics::SeqCst);
193 fn resume(&mut self) {
194 self.active.store(true, atomics::SeqCst);
198 impl Drop for BasicPausable {
200 self.active.store(false, atomics::SeqCst);
206 use std::rt::task::TaskOpts;
212 fn pool() -> SchedPool {
213 SchedPool::new(PoolConfig {
215 event_loop_factory: basic::event_loop,
219 fn run(f: proc():Send) {
220 let mut pool = pool();
221 pool.spawn(TaskOpts::new(), f);
233 let (tx, rx) = channel();
243 let mut pool = SchedPool::new(PoolConfig {
245 event_loop_factory: basic::event_loop,
248 for _ in range(0, 20) {
249 pool.spawn(TaskOpts::new(), proc() {
250 let (tx, rx) = channel();