]> git.lizzy.rs Git - rust.git/blob - src/librustc/ty/query/job.rs
Rollup merge of #67102 - Aaron1011:patch-3, r=Mark-Simulacrum
[rust.git] / src / librustc / ty / query / job.rs
1 use crate::ty::context::TyCtxt;
2 use crate::ty::query::plumbing::CycleError;
3 use crate::ty::query::Query;
4 use crate::ty::tls;
5
6 use rustc_data_structures::sync::Lrc;
7 use syntax_pos::Span;
8
9 #[cfg(not(parallel_compiler))]
10 use std::ptr;
11
12 #[cfg(parallel_compiler)]
13 use {
14     parking_lot::{Mutex, Condvar},
15     rustc_data_structures::{jobserver, OnDrop},
16     rustc_data_structures::fx::FxHashSet,
17     rustc_data_structures::stable_hasher::{StableHasher, HashStable},
18     rustc_data_structures::sync::Lock,
19     rustc_rayon_core as rayon_core,
20     syntax_pos::DUMMY_SP,
21     std::{mem, process, thread},
22     std::iter::FromIterator,
23 };
24
25 /// Represents a span and a query key.
26 #[derive(Clone, Debug)]
27 pub struct QueryInfo<'tcx> {
28     /// The span corresponding to the reason for which this query was required.
29     pub span: Span,
30     pub query: Query<'tcx>,
31 }
32
33 /// Representss an object representing an active query job.
34 pub struct QueryJob<'tcx> {
35     pub info: QueryInfo<'tcx>,
36
37     /// The parent query job which created this job and is implicitly waiting on it.
38     pub parent: Option<Lrc<QueryJob<'tcx>>>,
39
40     /// The latch that is used to wait on this job.
41     #[cfg(parallel_compiler)]
42     latch: QueryLatch<'tcx>,
43 }
44
45 impl<'tcx> QueryJob<'tcx> {
46     /// Creates a new query job.
47     pub fn new(info: QueryInfo<'tcx>, parent: Option<Lrc<QueryJob<'tcx>>>) -> Self {
48         QueryJob {
49             info,
50             parent,
51             #[cfg(parallel_compiler)]
52             latch: QueryLatch::new(),
53         }
54     }
55
56     /// Awaits for the query job to complete.
57     #[cfg(parallel_compiler)]
58     pub(super) fn r#await(
59         &self,
60         tcx: TyCtxt<'tcx>,
61         span: Span,
62     ) -> Result<(), CycleError<'tcx>> {
63         tls::with_related_context(tcx, move |icx| {
64             let waiter = Lrc::new(QueryWaiter {
65                 query: icx.query.clone(),
66                 span,
67                 cycle: Lock::new(None),
68                 condvar: Condvar::new(),
69             });
70             self.latch.r#await(&waiter);
71             // FIXME: Get rid of this lock. We have ownership of the QueryWaiter
72             // although another thread may still have a Lrc reference so we cannot
73             // use Lrc::get_mut
74             let mut cycle = waiter.cycle.lock();
75             match cycle.take() {
76                 None => Ok(()),
77                 Some(cycle) => Err(cycle)
78             }
79         })
80     }
81
82     #[cfg(not(parallel_compiler))]
83     pub(super) fn find_cycle_in_stack(&self, tcx: TyCtxt<'tcx>, span: Span) -> CycleError<'tcx> {
84         // Get the current executing query (waiter) and find the waitee amongst its parents
85         let mut current_job = tls::with_related_context(tcx, |icx| icx.query.clone());
86         let mut cycle = Vec::new();
87
88         while let Some(job) = current_job {
89             cycle.push(job.info.clone());
90
91             if ptr::eq(&*job, self) {
92                 cycle.reverse();
93
94                 // This is the end of the cycle
95                 // The span entry we included was for the usage
96                 // of the cycle itself, and not part of the cycle
97                 // Replace it with the span which caused the cycle to form
98                 cycle[0].span = span;
99                 // Find out why the cycle itself was used
100                 let usage = job.parent.as_ref().map(|parent| {
101                     (job.info.span, parent.info.query.clone())
102                 });
103                 return CycleError { usage, cycle };
104             }
105
106             current_job = job.parent.clone();
107         }
108
109         panic!("did not find a cycle")
110     }
111
112     /// Signals to waiters that the query is complete.
113     ///
114     /// This does nothing for single threaded rustc,
115     /// as there are no concurrent jobs which could be waiting on us
116     pub fn signal_complete(&self) {
117         #[cfg(parallel_compiler)]
118         self.latch.set();
119     }
120
121     #[cfg(parallel_compiler)]
122     fn as_ptr(&self) -> *const QueryJob<'tcx> {
123         self as *const _
124     }
125 }
126
127 #[cfg(parallel_compiler)]
128 struct QueryWaiter<'tcx> {
129     query: Option<Lrc<QueryJob<'tcx>>>,
130     condvar: Condvar,
131     span: Span,
132     cycle: Lock<Option<CycleError<'tcx>>>,
133 }
134
135 #[cfg(parallel_compiler)]
136 impl<'tcx> QueryWaiter<'tcx> {
137     fn notify(&self, registry: &rayon_core::Registry) {
138         rayon_core::mark_unblocked(registry);
139         self.condvar.notify_one();
140     }
141 }
142
143 #[cfg(parallel_compiler)]
144 struct QueryLatchInfo<'tcx> {
145     complete: bool,
146     waiters: Vec<Lrc<QueryWaiter<'tcx>>>,
147 }
148
149 #[cfg(parallel_compiler)]
150 struct QueryLatch<'tcx> {
151     info: Mutex<QueryLatchInfo<'tcx>>,
152 }
153
154 #[cfg(parallel_compiler)]
155 impl<'tcx> QueryLatch<'tcx> {
156     fn new() -> Self {
157         QueryLatch {
158             info: Mutex::new(QueryLatchInfo {
159                 complete: false,
160                 waiters: Vec::new(),
161             }),
162         }
163     }
164
165     /// Awaits the caller on this latch by blocking the current thread.
166     fn r#await(&self, waiter: &Lrc<QueryWaiter<'tcx>>) {
167         let mut info = self.info.lock();
168         if !info.complete {
169             // We push the waiter on to the `waiters` list. It can be accessed inside
170             // the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
171             // Both of these will remove it from the `waiters` list before resuming
172             // this thread.
173             info.waiters.push(waiter.clone());
174
175             // If this detects a deadlock and the deadlock handler wants to resume this thread
176             // we have to be in the `wait` call. This is ensured by the deadlock handler
177             // getting the self.info lock.
178             rayon_core::mark_blocked();
179             jobserver::release_thread();
180             waiter.condvar.wait(&mut info);
181             // Release the lock before we potentially block in `acquire_thread`
182             mem::drop(info);
183             jobserver::acquire_thread();
184         }
185     }
186
187     /// Sets the latch and resumes all waiters on it
188     fn set(&self) {
189         let mut info = self.info.lock();
190         debug_assert!(!info.complete);
191         info.complete = true;
192         let registry = rayon_core::Registry::current();
193         for waiter in info.waiters.drain(..) {
194             waiter.notify(&registry);
195         }
196     }
197
198     /// Removes a single waiter from the list of waiters.
199     /// This is used to break query cycles.
200     fn extract_waiter(
201         &self,
202         waiter: usize,
203     ) -> Lrc<QueryWaiter<'tcx>> {
204         let mut info = self.info.lock();
205         debug_assert!(!info.complete);
206         // Remove the waiter from the list of waiters
207         info.waiters.remove(waiter)
208     }
209 }
210
211 /// A resumable waiter of a query. The usize is the index into waiters in the query's latch
212 #[cfg(parallel_compiler)]
213 type Waiter<'tcx> = (Lrc<QueryJob<'tcx>>, usize);
214
215 /// Visits all the non-resumable and resumable waiters of a query.
216 /// Only waiters in a query are visited.
217 /// `visit` is called for every waiter and is passed a query waiting on `query_ref`
218 /// and a span indicating the reason the query waited on `query_ref`.
219 /// If `visit` returns Some, this function returns.
220 /// For visits of non-resumable waiters it returns the return value of `visit`.
221 /// For visits of resumable waiters it returns Some(Some(Waiter)) which has the
222 /// required information to resume the waiter.
223 /// If all `visit` calls returns None, this function also returns None.
224 #[cfg(parallel_compiler)]
225 fn visit_waiters<'tcx, F>(query: Lrc<QueryJob<'tcx>>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
226 where
227     F: FnMut(Span, Lrc<QueryJob<'tcx>>) -> Option<Option<Waiter<'tcx>>>
228 {
229     // Visit the parent query which is a non-resumable waiter since it's on the same stack
230     if let Some(ref parent) = query.parent {
231         if let Some(cycle) = visit(query.info.span, parent.clone()) {
232             return Some(cycle);
233         }
234     }
235
236     // Visit the explicit waiters which use condvars and are resumable
237     for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() {
238         if let Some(ref waiter_query) = waiter.query {
239             if visit(waiter.span, waiter_query.clone()).is_some() {
240                 // Return a value which indicates that this waiter can be resumed
241                 return Some(Some((query.clone(), i)));
242             }
243         }
244     }
245     None
246 }
247
248 /// Look for query cycles by doing a depth first search starting at `query`.
249 /// `span` is the reason for the `query` to execute. This is initially DUMMY_SP.
250 /// If a cycle is detected, this initial value is replaced with the span causing
251 /// the cycle.
252 #[cfg(parallel_compiler)]
253 fn cycle_check<'tcx>(query: Lrc<QueryJob<'tcx>>,
254                      span: Span,
255                      stack: &mut Vec<(Span, Lrc<QueryJob<'tcx>>)>,
256                      visited: &mut FxHashSet<*const QueryJob<'tcx>>
257 ) -> Option<Option<Waiter<'tcx>>> {
258     if !visited.insert(query.as_ptr()) {
259         return if let Some(p) = stack.iter().position(|q| q.1.as_ptr() == query.as_ptr()) {
260             // We detected a query cycle, fix up the initial span and return Some
261
262             // Remove previous stack entries
263             stack.drain(0..p);
264             // Replace the span for the first query with the cycle cause
265             stack[0].0 = span;
266             Some(None)
267         } else {
268             None
269         }
270     }
271
272     // Query marked as visited is added it to the stack
273     stack.push((span, query.clone()));
274
275     // Visit all the waiters
276     let r = visit_waiters(query, |span, successor| {
277         cycle_check(successor, span, stack, visited)
278     });
279
280     // Remove the entry in our stack if we didn't find a cycle
281     if r.is_none() {
282         stack.pop();
283     }
284
285     r
286 }
287
288 /// Finds out if there's a path to the compiler root (aka. code which isn't in a query)
289 /// from `query` without going through any of the queries in `visited`.
290 /// This is achieved with a depth first search.
291 #[cfg(parallel_compiler)]
292 fn connected_to_root<'tcx>(
293     query: Lrc<QueryJob<'tcx>>,
294     visited: &mut FxHashSet<*const QueryJob<'tcx>>
295 ) -> bool {
296     // We already visited this or we're deliberately ignoring it
297     if !visited.insert(query.as_ptr()) {
298         return false;
299     }
300
301     // This query is connected to the root (it has no query parent), return true
302     if query.parent.is_none() {
303         return true;
304     }
305
306     visit_waiters(query, |_, successor| connected_to_root(successor, visited).then_some(None))
307         .is_some()
308 }
309
310 // Deterministically pick an query from a list
311 #[cfg(parallel_compiler)]
312 fn pick_query<'a, 'tcx, T, F: Fn(&T) -> (Span, Lrc<QueryJob<'tcx>>)>(
313     tcx: TyCtxt<'tcx>,
314     queries: &'a [T],
315     f: F,
316 ) -> &'a T {
317     // Deterministically pick an entry point
318     // FIXME: Sort this instead
319     let mut hcx = tcx.create_stable_hashing_context();
320     queries.iter().min_by_key(|v| {
321         let (span, query) = f(v);
322         let mut stable_hasher = StableHasher::new();
323         query.info.query.hash_stable(&mut hcx, &mut stable_hasher);
324         // Prefer entry points which have valid spans for nicer error messages
325         // We add an integer to the tuple ensuring that entry points
326         // with valid spans are picked first
327         let span_cmp = if span == DUMMY_SP { 1 } else { 0 };
328         (span_cmp, stable_hasher.finish::<u64>())
329     }).unwrap()
330 }
331
332 /// Looks for query cycles starting from the last query in `jobs`.
333 /// If a cycle is found, all queries in the cycle is removed from `jobs` and
334 /// the function return true.
335 /// If a cycle was not found, the starting query is removed from `jobs` and
336 /// the function returns false.
337 #[cfg(parallel_compiler)]
338 fn remove_cycle<'tcx>(
339     jobs: &mut Vec<Lrc<QueryJob<'tcx>>>,
340     wakelist: &mut Vec<Lrc<QueryWaiter<'tcx>>>,
341     tcx: TyCtxt<'tcx>,
342 ) -> bool {
343     let mut visited = FxHashSet::default();
344     let mut stack = Vec::new();
345     // Look for a cycle starting with the last query in `jobs`
346     if let Some(waiter) = cycle_check(jobs.pop().unwrap(),
347                                       DUMMY_SP,
348                                       &mut stack,
349                                       &mut visited) {
350         // The stack is a vector of pairs of spans and queries; reverse it so that
351         // the earlier entries require later entries
352         let (mut spans, queries): (Vec<_>, Vec<_>) = stack.into_iter().rev().unzip();
353
354         // Shift the spans so that queries are matched with the span for their waitee
355         spans.rotate_right(1);
356
357         // Zip them back together
358         let mut stack: Vec<_> = spans.into_iter().zip(queries).collect();
359
360         // Remove the queries in our cycle from the list of jobs to look at
361         for r in &stack {
362             if let Some(pos) = jobs.iter().position(|j| j.as_ptr() == r.1.as_ptr()) {
363                 jobs.remove(pos);
364             }
365         }
366
367         // Find the queries in the cycle which are
368         // connected to queries outside the cycle
369         let entry_points = stack.iter().filter_map(|(span, query)| {
370             if query.parent.is_none() {
371                 // This query is connected to the root (it has no query parent)
372                 Some((*span, query.clone(), None))
373             } else {
374                 let mut waiters = Vec::new();
375                 // Find all the direct waiters who lead to the root
376                 visit_waiters(query.clone(), |span, waiter| {
377                     // Mark all the other queries in the cycle as already visited
378                     let mut visited = FxHashSet::from_iter(stack.iter().map(|q| q.1.as_ptr()));
379
380                     if connected_to_root(waiter.clone(), &mut visited) {
381                         waiters.push((span, waiter));
382                     }
383
384                     None
385                 });
386                 if waiters.is_empty() {
387                     None
388                 } else {
389                     // Deterministically pick one of the waiters to show to the user
390                     let waiter = pick_query(tcx, &waiters, |s| s.clone()).clone();
391                     Some((*span, query.clone(), Some(waiter)))
392                 }
393             }
394         }).collect::<Vec<(Span, Lrc<QueryJob<'tcx>>, Option<(Span, Lrc<QueryJob<'tcx>>)>)>>();
395
396         // Deterministically pick an entry point
397         let (_, entry_point, usage) = pick_query(tcx, &entry_points, |e| (e.0, e.1.clone()));
398
399         // Shift the stack so that our entry point is first
400         let entry_point_pos = stack.iter().position(|(_, query)| {
401             query.as_ptr() == entry_point.as_ptr()
402         });
403         if let Some(pos) = entry_point_pos {
404             stack.rotate_left(pos);
405         }
406
407         let usage = usage.as_ref().map(|(span, query)| (*span, query.info.query.clone()));
408
409         // Create the cycle error
410         let error = CycleError {
411             usage,
412             cycle: stack.iter().map(|&(s, ref q)| QueryInfo {
413                 span: s,
414                 query: q.info.query.clone(),
415             } ).collect(),
416         };
417
418         // We unwrap `waiter` here since there must always be one
419         // edge which is resumeable / waited using a query latch
420         let (waitee_query, waiter_idx) = waiter.unwrap();
421
422         // Extract the waiter we want to resume
423         let waiter = waitee_query.latch.extract_waiter(waiter_idx);
424
425         // Set the cycle error so it will be picked up when resumed
426         *waiter.cycle.lock() = Some(error);
427
428         // Put the waiter on the list of things to resume
429         wakelist.push(waiter);
430
431         true
432     } else {
433         false
434     }
435 }
436
437 /// Creates a new thread and forwards information in thread locals to it.
438 /// The new thread runs the deadlock handler.
439 /// Must only be called when a deadlock is about to happen.
440 #[cfg(parallel_compiler)]
441 pub unsafe fn handle_deadlock() {
442     let registry = rayon_core::Registry::current();
443
444     let gcx_ptr = tls::GCX_PTR.with(|gcx_ptr| {
445         gcx_ptr as *const _
446     });
447     let gcx_ptr = &*gcx_ptr;
448
449     let syntax_pos_globals = syntax_pos::GLOBALS.with(|syntax_pos_globals| {
450         syntax_pos_globals as *const _
451     });
452     let syntax_pos_globals = &*syntax_pos_globals;
453     thread::spawn(move || {
454         tls::GCX_PTR.set(gcx_ptr, || {
455             syntax_pos::GLOBALS.set(syntax_pos_globals, || {
456                 syntax_pos::GLOBALS.set(syntax_pos_globals, || {
457                     tls::with_thread_locals(|| {
458                         tls::with_global(|tcx| deadlock(tcx, &registry))
459                     })
460                 })
461             })
462         })
463     });
464 }
465
466 /// Detects query cycles by using depth first search over all active query jobs.
467 /// If a query cycle is found it will break the cycle by finding an edge which
468 /// uses a query latch and then resuming that waiter.
469 /// There may be multiple cycles involved in a deadlock, so this searches
470 /// all active queries for cycles before finally resuming all the waiters at once.
471 #[cfg(parallel_compiler)]
472 fn deadlock(tcx: TyCtxt<'_>, registry: &rayon_core::Registry) {
473     let on_panic = OnDrop(|| {
474         eprintln!("deadlock handler panicked, aborting process");
475         process::abort();
476     });
477
478     let mut wakelist = Vec::new();
479     let mut jobs: Vec<_> = tcx.queries.collect_active_jobs();
480
481     let mut found_cycle = false;
482
483     while jobs.len() > 0 {
484         if remove_cycle(&mut jobs, &mut wakelist, tcx) {
485             found_cycle = true;
486         }
487     }
488
489     // Check that a cycle was found. It is possible for a deadlock to occur without
490     // a query cycle if a query which can be waited on uses Rayon to do multithreading
491     // internally. Such a query (X) may be executing on 2 threads (A and B) and A may
492     // wait using Rayon on B. Rayon may then switch to executing another query (Y)
493     // which in turn will wait on X causing a deadlock. We have a false dependency from
494     // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here
495     // only considers the true dependency and won't detect a cycle.
496     assert!(found_cycle);
497
498     // FIXME: Ensure this won't cause a deadlock before we return
499     for waiter in wakelist.into_iter() {
500         waiter.notify(registry);
501     }
502
503     on_panic.disable();
504 }