]> git.lizzy.rs Git - rust.git/blob - compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs
Auto merge of #100640 - reitermarkus:socket-display-buffer, r=thomcc
[rust.git] / compiler / rustc_codegen_cranelift / src / concurrency_limiter.rs
1 use std::sync::{Arc, Condvar, Mutex};
2
3 use rustc_session::Session;
4
5 use jobserver::HelperThread;
6
7 // FIXME don't panic when a worker thread panics
8
/// Limits the number of concurrently running codegen jobs using the session's
/// jobserver. `acquire` blocks until a token is available and hands out an
/// RAII [`ConcurrencyLimiterToken`] that releases the slot on drop.
pub(super) struct ConcurrencyLimiter {
    /// Jobserver helper thread that asynchronously acquires tokens on request.
    /// Wrapped in `Option` so `Drop` can shut it down via `take()` before
    /// inspecting the shared state.
    helper_thread: Option<HelperThread>,
    /// Shared bookkeeping of pending/active jobs and held tokens; the helper
    /// thread's callback also holds a clone of this `Arc`.
    state: Arc<Mutex<state::ConcurrencyLimiterState>>,
    /// Condvar used to wake a waiter in `acquire` when a token may have become
    /// available (notified by the helper thread and by token drops).
    available_token_condvar: Arc<Condvar>,
}
14
15 impl ConcurrencyLimiter {
16     pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
17         let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
18         let available_token_condvar = Arc::new(Condvar::new());
19
20         let state_helper = state.clone();
21         let available_token_condvar_helper = available_token_condvar.clone();
22         let helper_thread = sess
23             .jobserver
24             .clone()
25             .into_helper_thread(move |token| {
26                 let mut state = state_helper.lock().unwrap();
27                 state.add_new_token(token.unwrap());
28                 available_token_condvar_helper.notify_one();
29             })
30             .unwrap();
31         ConcurrencyLimiter {
32             helper_thread: Some(helper_thread),
33             state,
34             available_token_condvar: Arc::new(Condvar::new()),
35         }
36     }
37
38     pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
39         let mut state = self.state.lock().unwrap();
40         loop {
41             state.assert_invariants();
42
43             if state.try_start_job() {
44                 return ConcurrencyLimiterToken {
45                     state: self.state.clone(),
46                     available_token_condvar: self.available_token_condvar.clone(),
47                 };
48             }
49
50             self.helper_thread.as_mut().unwrap().request_token();
51             state = self.available_token_condvar.wait(state).unwrap();
52         }
53     }
54
55     pub(super) fn job_already_done(&mut self) {
56         let mut state = self.state.lock().unwrap();
57         state.job_already_done();
58     }
59 }
60
impl Drop for ConcurrencyLimiter {
    fn drop(&mut self) {
        // Shut down the jobserver helper thread first. Its callback holds a
        // clone of `self.state`, so this must happen before `Arc::get_mut`
        // below can succeed.
        self.helper_thread.take();

        // Assert that all jobs have finished
        let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
        state.assert_done();
    }
}
71
/// RAII guard for one running job; created by `ConcurrencyLimiter::acquire`.
/// Dropping it marks the job as finished and wakes one waiter.
#[derive(Debug)]
pub(super) struct ConcurrencyLimiterToken {
    /// Shared bookkeeping; `job_finished` is called on it when this drops.
    state: Arc<Mutex<state::ConcurrencyLimiterState>>,
    /// Notified on drop so a thread blocked in `ConcurrencyLimiter::acquire`
    /// can re-check whether a token is now free.
    available_token_condvar: Arc<Condvar>,
}
77
impl Drop for ConcurrencyLimiterToken {
    fn drop(&mut self) {
        // Mark the job as finished while holding the lock, then wake one
        // waiter in `ConcurrencyLimiter::acquire` which may now be able to
        // start its job with the freed token.
        let mut state = self.state.lock().unwrap();
        state.job_finished();
        self.available_token_condvar.notify_one();
    }
}
85
86 mod state {
87     use jobserver::Acquired;
88
89     #[derive(Debug)]
90     pub(super) struct ConcurrencyLimiterState {
91         pending_jobs: usize,
92         active_jobs: usize,
93
94         // None is used to represent the implicit token, Some to represent explicit tokens
95         tokens: Vec<Option<Acquired>>,
96     }
97
98     impl ConcurrencyLimiterState {
99         pub(super) fn new(pending_jobs: usize) -> Self {
100             ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
101         }
102
103         pub(super) fn assert_invariants(&self) {
104             // There must be no excess active jobs
105             assert!(self.active_jobs <= self.pending_jobs);
106
107             // There may not be more active jobs than there are tokens
108             assert!(self.active_jobs <= self.tokens.len());
109         }
110
111         pub(super) fn assert_done(&self) {
112             assert_eq!(self.pending_jobs, 0);
113             assert_eq!(self.active_jobs, 0);
114         }
115
116         pub(super) fn add_new_token(&mut self, token: Acquired) {
117             self.tokens.push(Some(token));
118             self.drop_excess_capacity();
119         }
120
121         pub(super) fn try_start_job(&mut self) -> bool {
122             if self.active_jobs < self.tokens.len() {
123                 // Using existing token
124                 self.job_started();
125                 return true;
126             }
127
128             false
129         }
130
131         pub(super) fn job_started(&mut self) {
132             self.assert_invariants();
133             self.active_jobs += 1;
134             self.drop_excess_capacity();
135             self.assert_invariants();
136         }
137
138         pub(super) fn job_finished(&mut self) {
139             self.assert_invariants();
140             self.pending_jobs -= 1;
141             self.active_jobs -= 1;
142             self.assert_invariants();
143             self.drop_excess_capacity();
144             self.assert_invariants();
145         }
146
147         pub(super) fn job_already_done(&mut self) {
148             self.assert_invariants();
149             self.pending_jobs -= 1;
150             self.assert_invariants();
151             self.drop_excess_capacity();
152             self.assert_invariants();
153         }
154
155         fn drop_excess_capacity(&mut self) {
156             self.assert_invariants();
157
158             // Drop all tokens that can never be used anymore
159             self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));
160
161             // Keep some excess tokens to satisfy requests faster
162             const MAX_EXTRA_CAPACITY: usize = 2;
163             self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
164
165             self.assert_invariants();
166         }
167     }
168 }