1 use std::sync::{Arc, Condvar, Mutex};
3 use rustc_session::Session;
5 use jobserver::HelperThread;
7 // FIXME don't panic when a worker thread panics
9 pub(super) struct ConcurrencyLimiter {
10 helper_thread: Option<HelperThread>,
11 state: Arc<Mutex<state::ConcurrencyLimiterState>>,
12 available_token_condvar: Arc<Condvar>,
16 impl ConcurrencyLimiter {
17 pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
18 let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
19 let available_token_condvar = Arc::new(Condvar::new());
21 let state_helper = state.clone();
22 let available_token_condvar_helper = available_token_condvar.clone();
23 let helper_thread = sess
26 .into_helper_thread(move |token| {
27 let mut state = state_helper.lock().unwrap();
28 state.add_new_token(token.unwrap());
29 available_token_condvar_helper.notify_one();
33 helper_thread: Some(helper_thread),
35 available_token_condvar: Arc::new(Condvar::new()),
40 pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
41 let mut state = self.state.lock().unwrap();
43 state.assert_invariants();
45 if state.try_start_job() {
46 return ConcurrencyLimiterToken {
47 state: self.state.clone(),
48 available_token_condvar: self.available_token_condvar.clone(),
52 self.helper_thread.as_mut().unwrap().request_token();
53 state = self.available_token_condvar.wait(state).unwrap();
57 pub(super) fn job_already_done(&mut self) {
58 let mut state = self.state.lock().unwrap();
59 state.job_already_done();
62 pub(crate) fn finished(mut self) {
63 self.helper_thread.take();
65 // Assert that all jobs have finished
66 let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
73 impl Drop for ConcurrencyLimiter {
75 if !self.finished && !std::thread::panicking() {
76 panic!("Forgot to call finished() on ConcurrencyLimiter");
82 pub(super) struct ConcurrencyLimiterToken {
83 state: Arc<Mutex<state::ConcurrencyLimiterState>>,
84 available_token_condvar: Arc<Condvar>,
87 impl Drop for ConcurrencyLimiterToken {
89 let mut state = self.state.lock().unwrap();
91 self.available_token_condvar.notify_one();
96 use jobserver::Acquired;
99 pub(super) struct ConcurrencyLimiterState {
103 // None is used to represent the implicit token, Some to represent explicit tokens
104 tokens: Vec<Option<Acquired>>,
107 impl ConcurrencyLimiterState {
108 pub(super) fn new(pending_jobs: usize) -> Self {
109 ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
112 pub(super) fn assert_invariants(&self) {
113 // There must be no excess active jobs
114 assert!(self.active_jobs <= self.pending_jobs);
116 // There may not be more active jobs than there are tokens
117 assert!(self.active_jobs <= self.tokens.len());
120 pub(super) fn assert_done(&self) {
121 assert_eq!(self.pending_jobs, 0);
122 assert_eq!(self.active_jobs, 0);
125 pub(super) fn add_new_token(&mut self, token: Acquired) {
126 self.tokens.push(Some(token));
127 self.drop_excess_capacity();
130 pub(super) fn try_start_job(&mut self) -> bool {
131 if self.active_jobs < self.tokens.len() {
132 // Using existing token
140 pub(super) fn job_started(&mut self) {
141 self.assert_invariants();
142 self.active_jobs += 1;
143 self.drop_excess_capacity();
144 self.assert_invariants();
147 pub(super) fn job_finished(&mut self) {
148 self.assert_invariants();
149 self.pending_jobs -= 1;
150 self.active_jobs -= 1;
151 self.assert_invariants();
152 self.drop_excess_capacity();
153 self.assert_invariants();
156 pub(super) fn job_already_done(&mut self) {
157 self.assert_invariants();
158 self.pending_jobs -= 1;
159 self.assert_invariants();
160 self.drop_excess_capacity();
161 self.assert_invariants();
164 fn drop_excess_capacity(&mut self) {
165 self.assert_invariants();
167 // Drop all tokens that can never be used anymore
168 self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));
170 // Keep some excess tokens to satisfy requests faster
171 const MAX_EXTRA_CAPACITY: usize = 2;
172 self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
174 self.assert_invariants();