]> git.lizzy.rs Git - rust.git/blob - src/libgreen/basic.rs
2381626b7c87697d8cd0e97ebe8b12b7fc7fa9b2
[rust.git] / src / libgreen / basic.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! This is a basic event loop implementation not meant for any "real purposes"
12 //! other than testing the scheduler and proving that it's possible to have a
13 //! pluggable event loop.
14 //!
15 //! This implementation is also used as the fallback implementation of an event
16 //! loop if no other one is provided (and M:N scheduling is desired).
17
18 use alloc::arc::Arc;
19 use std::sync::atomics;
20 use std::mem;
21 use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback};
22 use std::rt::rtio::{PausableIdleCallback, Callback};
23 use std::rt::exclusive::Exclusive;
24
25 /// This is the only exported function from this module.
26 pub fn event_loop() -> Box<EventLoop + Send> {
27     box BasicLoop::new() as Box<EventLoop + Send>
28 }
29
30 struct BasicLoop {
31     work: Vec<proc(): Send>,             // pending work
32     remotes: Vec<(uint, Box<Callback + Send>)>,
33     next_remote: uint,
34     messages: Arc<Exclusive<Vec<Message>>>,
35     idle: Option<Box<Callback + Send>>,
36     idle_active: Option<Arc<atomics::AtomicBool>>,
37 }
38
39 enum Message { RunRemote(uint), RemoveRemote(uint) }
40
41 impl BasicLoop {
42     fn new() -> BasicLoop {
43         BasicLoop {
44             work: vec![],
45             idle: None,
46             idle_active: None,
47             next_remote: 0,
48             remotes: vec![],
49             messages: Arc::new(Exclusive::new(Vec::new())),
50         }
51     }
52
53     /// Process everything in the work queue (continually)
54     fn work(&mut self) {
55         while self.work.len() > 0 {
56             for work in mem::replace(&mut self.work, vec![]).move_iter() {
57                 work();
58             }
59         }
60     }
61
62     fn remote_work(&mut self) {
63         let messages = unsafe {
64             mem::replace(&mut *self.messages.lock(), Vec::new())
65         };
66         for message in messages.move_iter() {
67             self.message(message);
68         }
69     }
70
71     fn message(&mut self, message: Message) {
72         match message {
73             RunRemote(i) => {
74                 match self.remotes.mut_iter().find(|& &(id, _)| id == i) {
75                     Some(&(_, ref mut f)) => f.call(),
76                     None => unreachable!()
77                 }
78             }
79             RemoveRemote(i) => {
80                 match self.remotes.iter().position(|&(id, _)| id == i) {
81                     Some(i) => { self.remotes.remove(i).unwrap(); }
82                     None => unreachable!()
83                 }
84             }
85         }
86     }
87
88     /// Run the idle callback if one is registered
89     fn idle(&mut self) {
90         match self.idle {
91             Some(ref mut idle) => {
92                 if self.idle_active.get_ref().load(atomics::SeqCst) {
93                     idle.call();
94                 }
95             }
96             None => {}
97         }
98     }
99
100     fn has_idle(&self) -> bool {
101         self.idle.is_some() && self.idle_active.get_ref().load(atomics::SeqCst)
102     }
103 }
104
105 impl EventLoop for BasicLoop {
106     fn run(&mut self) {
107         // Not exactly efficient, but it gets the job done.
108         while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
109
110             self.work();
111             self.remote_work();
112
113             if self.has_idle() {
114                 self.idle();
115                 continue
116             }
117
118             unsafe {
119                 let mut messages = self.messages.lock();
120                 // We block here if we have no messages to process and we may
121                 // receive a message at a later date
122                 if self.remotes.len() > 0 && messages.len() == 0 &&
123                    self.work.len() == 0 {
124                     messages.wait()
125                 }
126             }
127         }
128     }
129
130     fn callback(&mut self, f: proc():Send) {
131         self.work.push(f);
132     }
133
134     // FIXME: Seems like a really weird requirement to have an event loop provide.
135     fn pausable_idle_callback(&mut self, cb: Box<Callback + Send>)
136                               -> Box<PausableIdleCallback + Send> {
137         rtassert!(self.idle.is_none());
138         self.idle = Some(cb);
139         let a = Arc::new(atomics::AtomicBool::new(true));
140         self.idle_active = Some(a.clone());
141         box BasicPausable { active: a } as Box<PausableIdleCallback + Send>
142     }
143
144     fn remote_callback(&mut self, f: Box<Callback + Send>)
145                        -> Box<RemoteCallback + Send> {
146         let id = self.next_remote;
147         self.next_remote += 1;
148         self.remotes.push((id, f));
149         box BasicRemote::new(self.messages.clone(), id) as
150             Box<RemoteCallback + Send>
151     }
152
153     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
154
155     fn has_active_io(&self) -> bool { false }
156 }
157
158 struct BasicRemote {
159     queue: Arc<Exclusive<Vec<Message>>>,
160     id: uint,
161 }
162
163 impl BasicRemote {
164     fn new(queue: Arc<Exclusive<Vec<Message>>>, id: uint) -> BasicRemote {
165         BasicRemote { queue: queue, id: id }
166     }
167 }
168
169 impl RemoteCallback for BasicRemote {
170     fn fire(&mut self) {
171         let mut queue = unsafe { self.queue.lock() };
172         queue.push(RunRemote(self.id));
173         queue.signal();
174     }
175 }
176
177 impl Drop for BasicRemote {
178     fn drop(&mut self) {
179         let mut queue = unsafe { self.queue.lock() };
180         queue.push(RemoveRemote(self.id));
181         queue.signal();
182     }
183 }
184
185 struct BasicPausable {
186     active: Arc<atomics::AtomicBool>,
187 }
188
189 impl PausableIdleCallback for BasicPausable {
190     fn pause(&mut self) {
191         self.active.store(false, atomics::SeqCst);
192     }
193     fn resume(&mut self) {
194         self.active.store(true, atomics::SeqCst);
195     }
196 }
197
198 impl Drop for BasicPausable {
199     fn drop(&mut self) {
200         self.active.store(false, atomics::SeqCst);
201     }
202 }
203
204 #[cfg(test)]
205 mod test {
206     use std::rt::task::TaskOpts;
207
208     use basic;
209     use PoolConfig;
210     use SchedPool;
211
212     fn pool() -> SchedPool {
213         SchedPool::new(PoolConfig {
214             threads: 1,
215             event_loop_factory: basic::event_loop,
216         })
217     }
218
219     fn run(f: proc():Send) {
220         let mut pool = pool();
221         pool.spawn(TaskOpts::new(), f);
222         pool.shutdown();
223     }
224
225     #[test]
226     fn smoke() {
227         run(proc() {});
228     }
229
230     #[test]
231     fn some_channels() {
232         run(proc() {
233             let (tx, rx) = channel();
234             spawn(proc() {
235                 tx.send(());
236             });
237             rx.recv();
238         });
239     }
240
241     #[test]
242     fn multi_thread() {
243         let mut pool = SchedPool::new(PoolConfig {
244             threads: 2,
245             event_loop_factory: basic::event_loop,
246         });
247
248         for _ in range(0, 20) {
249             pool.spawn(TaskOpts::new(), proc() {
250                 let (tx, rx) = channel();
251                 spawn(proc() {
252                     tx.send(());
253                 });
254                 rx.recv();
255             });
256         }
257
258         pool.shutdown();
259     }
260 }