]> git.lizzy.rs Git - rust.git/blob - compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs
Rollup merge of #102101 - BelovDV:new-check-lld-version, r=petrochenkov
[rust.git] / compiler / rustc_codegen_cranelift / src / concurrency_limiter.rs
1 use std::sync::{Arc, Condvar, Mutex};
2
3 use rustc_session::Session;
4
5 use jobserver::HelperThread;
6
7 // FIXME don't panic when a worker thread panics
8
9 pub(super) struct ConcurrencyLimiter {
10     helper_thread: Option<HelperThread>,
11     state: Arc<Mutex<state::ConcurrencyLimiterState>>,
12     available_token_condvar: Arc<Condvar>,
13     finished: bool,
14 }
15
16 impl ConcurrencyLimiter {
17     pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
18         let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
19         let available_token_condvar = Arc::new(Condvar::new());
20
21         let state_helper = state.clone();
22         let available_token_condvar_helper = available_token_condvar.clone();
23         let helper_thread = sess
24             .jobserver
25             .clone()
26             .into_helper_thread(move |token| {
27                 let mut state = state_helper.lock().unwrap();
28                 state.add_new_token(token.unwrap());
29                 available_token_condvar_helper.notify_one();
30             })
31             .unwrap();
32         ConcurrencyLimiter {
33             helper_thread: Some(helper_thread),
34             state,
35             available_token_condvar: Arc::new(Condvar::new()),
36             finished: false,
37         }
38     }
39
40     pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
41         let mut state = self.state.lock().unwrap();
42         loop {
43             state.assert_invariants();
44
45             if state.try_start_job() {
46                 return ConcurrencyLimiterToken {
47                     state: self.state.clone(),
48                     available_token_condvar: self.available_token_condvar.clone(),
49                 };
50             }
51
52             self.helper_thread.as_mut().unwrap().request_token();
53             state = self.available_token_condvar.wait(state).unwrap();
54         }
55     }
56
57     pub(super) fn job_already_done(&mut self) {
58         let mut state = self.state.lock().unwrap();
59         state.job_already_done();
60     }
61
62     pub(crate) fn finished(mut self) {
63         self.helper_thread.take();
64
65         // Assert that all jobs have finished
66         let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
67         state.assert_done();
68
69         self.finished = true;
70     }
71 }
72
73 impl Drop for ConcurrencyLimiter {
74     fn drop(&mut self) {
75         if !self.finished && !std::thread::panicking() {
76             panic!("Forgot to call finished() on ConcurrencyLimiter");
77         }
78     }
79 }
80
81 #[derive(Debug)]
82 pub(super) struct ConcurrencyLimiterToken {
83     state: Arc<Mutex<state::ConcurrencyLimiterState>>,
84     available_token_condvar: Arc<Condvar>,
85 }
86
87 impl Drop for ConcurrencyLimiterToken {
88     fn drop(&mut self) {
89         let mut state = self.state.lock().unwrap();
90         state.job_finished();
91         self.available_token_condvar.notify_one();
92     }
93 }
94
95 mod state {
96     use jobserver::Acquired;
97
98     #[derive(Debug)]
99     pub(super) struct ConcurrencyLimiterState {
100         pending_jobs: usize,
101         active_jobs: usize,
102
103         // None is used to represent the implicit token, Some to represent explicit tokens
104         tokens: Vec<Option<Acquired>>,
105     }
106
107     impl ConcurrencyLimiterState {
108         pub(super) fn new(pending_jobs: usize) -> Self {
109             ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
110         }
111
112         pub(super) fn assert_invariants(&self) {
113             // There must be no excess active jobs
114             assert!(self.active_jobs <= self.pending_jobs);
115
116             // There may not be more active jobs than there are tokens
117             assert!(self.active_jobs <= self.tokens.len());
118         }
119
120         pub(super) fn assert_done(&self) {
121             assert_eq!(self.pending_jobs, 0);
122             assert_eq!(self.active_jobs, 0);
123         }
124
125         pub(super) fn add_new_token(&mut self, token: Acquired) {
126             self.tokens.push(Some(token));
127             self.drop_excess_capacity();
128         }
129
130         pub(super) fn try_start_job(&mut self) -> bool {
131             if self.active_jobs < self.tokens.len() {
132                 // Using existing token
133                 self.job_started();
134                 return true;
135             }
136
137             false
138         }
139
140         pub(super) fn job_started(&mut self) {
141             self.assert_invariants();
142             self.active_jobs += 1;
143             self.drop_excess_capacity();
144             self.assert_invariants();
145         }
146
147         pub(super) fn job_finished(&mut self) {
148             self.assert_invariants();
149             self.pending_jobs -= 1;
150             self.active_jobs -= 1;
151             self.assert_invariants();
152             self.drop_excess_capacity();
153             self.assert_invariants();
154         }
155
156         pub(super) fn job_already_done(&mut self) {
157             self.assert_invariants();
158             self.pending_jobs -= 1;
159             self.assert_invariants();
160             self.drop_excess_capacity();
161             self.assert_invariants();
162         }
163
164         fn drop_excess_capacity(&mut self) {
165             self.assert_invariants();
166
167             // Drop all tokens that can never be used anymore
168             self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));
169
170             // Keep some excess tokens to satisfy requests faster
171             const MAX_EXTRA_CAPACITY: usize = 2;
172             self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
173
174             self.assert_invariants();
175         }
176     }
177 }