1 use std::sync::{Arc, Condvar, Mutex};
3 use rustc_session::Session;
5 use jobserver::HelperThread;
7 // FIXME don't panic when a worker thread panics
9 pub(super) struct ConcurrencyLimiter {
10 helper_thread: Option<HelperThread>,
11 state: Arc<Mutex<state::ConcurrencyLimiterState>>,
12 available_token_condvar: Arc<Condvar>,
15 impl ConcurrencyLimiter {
16 pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
17 let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
18 let available_token_condvar = Arc::new(Condvar::new());
20 let state_helper = state.clone();
21 let available_token_condvar_helper = available_token_condvar.clone();
22 let helper_thread = sess
25 .into_helper_thread(move |token| {
26 let mut state = state_helper.lock().unwrap();
27 state.add_new_token(token.unwrap());
28 available_token_condvar_helper.notify_one();
32 helper_thread: Some(helper_thread),
34 available_token_condvar: Arc::new(Condvar::new()),
38 pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
39 let mut state = self.state.lock().unwrap();
41 state.assert_invariants();
43 if state.try_start_job() {
44 return ConcurrencyLimiterToken {
45 state: self.state.clone(),
46 available_token_condvar: self.available_token_condvar.clone(),
50 self.helper_thread.as_mut().unwrap().request_token();
51 state = self.available_token_condvar.wait(state).unwrap();
55 pub(super) fn job_already_done(&mut self) {
56 let mut state = self.state.lock().unwrap();
57 state.job_already_done();
61 impl Drop for ConcurrencyLimiter {
64 self.helper_thread.take();
66 // Assert that all jobs have finished
67 let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
73 pub(super) struct ConcurrencyLimiterToken {
74 state: Arc<Mutex<state::ConcurrencyLimiterState>>,
75 available_token_condvar: Arc<Condvar>,
78 impl Drop for ConcurrencyLimiterToken {
80 let mut state = self.state.lock().unwrap();
82 self.available_token_condvar.notify_one();
87 use jobserver::Acquired;
90 pub(super) struct ConcurrencyLimiterState {
94 // None is used to represent the implicit token, Some to represent explicit tokens
95 tokens: Vec<Option<Acquired>>,
98 impl ConcurrencyLimiterState {
99 pub(super) fn new(pending_jobs: usize) -> Self {
100 ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
103 pub(super) fn assert_invariants(&self) {
104 // There must be no excess active jobs
105 assert!(self.active_jobs <= self.pending_jobs);
107 // There may not be more active jobs than there are tokens
108 assert!(self.active_jobs <= self.tokens.len());
111 pub(super) fn assert_done(&self) {
112 assert_eq!(self.pending_jobs, 0);
113 assert_eq!(self.active_jobs, 0);
116 pub(super) fn add_new_token(&mut self, token: Acquired) {
117 self.tokens.push(Some(token));
118 self.drop_excess_capacity();
121 pub(super) fn try_start_job(&mut self) -> bool {
122 if self.active_jobs < self.tokens.len() {
123 // Using existing token
131 pub(super) fn job_started(&mut self) {
132 self.assert_invariants();
133 self.active_jobs += 1;
134 self.drop_excess_capacity();
135 self.assert_invariants();
138 pub(super) fn job_finished(&mut self) {
139 self.assert_invariants();
140 self.pending_jobs -= 1;
141 self.active_jobs -= 1;
142 self.assert_invariants();
143 self.drop_excess_capacity();
144 self.assert_invariants();
147 pub(super) fn job_already_done(&mut self) {
148 self.assert_invariants();
149 self.pending_jobs -= 1;
150 self.assert_invariants();
151 self.drop_excess_capacity();
152 self.assert_invariants();
155 fn drop_excess_capacity(&mut self) {
156 self.assert_invariants();
158 // Drop all tokens that can never be used anymore
159 self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));
161 // Keep some excess tokens to satisfy requests faster
162 const MAX_EXTRA_CAPACITY: usize = 2;
163 self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
165 self.assert_invariants();