]> git.lizzy.rs Git - rust.git/blob - src/librustc_query_system/query/job.rs
Move QueryContext to the parent module.
[rust.git] / src / librustc_query_system / query / job.rs
1 use crate::dep_graph::{DepContext, DepKind};
2 use crate::query::plumbing::CycleError;
3 use crate::query::QueryContext;
4
5 use rustc_data_structures::fx::FxHashMap;
6 use rustc_span::Span;
7
8 use std::convert::TryFrom;
9 use std::marker::PhantomData;
10 use std::num::NonZeroU32;
11
12 #[cfg(parallel_compiler)]
13 use {
14     parking_lot::{Condvar, Mutex},
15     rustc_data_structures::fx::FxHashSet,
16     rustc_data_structures::stable_hasher::{HashStable, StableHasher},
17     rustc_data_structures::sync::Lock,
18     rustc_data_structures::sync::Lrc,
19     rustc_data_structures::{jobserver, OnDrop},
20     rustc_rayon_core as rayon_core,
21     rustc_span::DUMMY_SP,
22     std::iter::FromIterator,
23     std::{mem, process},
24 };
25
26 /// Represents a span and a query key.
27 #[derive(Clone, Debug)]
28 pub struct QueryInfo<Q> {
29     /// The span corresponding to the reason for which this query was required.
30     pub span: Span,
31     pub query: Q,
32 }
33
34 type QueryMap<CTX> = FxHashMap<QueryJobId<<CTX as DepContext>::DepKind>, QueryJobInfo<CTX>>;
35
36 /// A value uniquely identifiying an active query job within a shard in the query cache.
37 #[derive(Copy, Clone, Eq, PartialEq, Hash)]
38 pub struct QueryShardJobId(pub NonZeroU32);
39
40 /// A value uniquely identifiying an active query job.
41 #[derive(Copy, Clone, Eq, PartialEq, Hash)]
42 pub struct QueryJobId<K> {
43     /// Which job within a shard is this
44     pub job: QueryShardJobId,
45
46     /// In which shard is this job
47     pub shard: u16,
48
49     /// What kind of query this job is
50     pub kind: K,
51 }
52
53 impl<K: DepKind> QueryJobId<K> {
54     pub fn new(job: QueryShardJobId, shard: usize, kind: K) -> Self {
55         QueryJobId { job, shard: u16::try_from(shard).unwrap(), kind }
56     }
57
58     fn query<CTX: QueryContext<DepKind = K>>(self, map: &QueryMap<CTX>) -> CTX::Query {
59         map.get(&self).unwrap().info.query.clone()
60     }
61
62     #[cfg(parallel_compiler)]
63     fn span<CTX: QueryContext<DepKind = K>>(self, map: &QueryMap<CTX>) -> Span {
64         map.get(&self).unwrap().job.span
65     }
66
67     #[cfg(parallel_compiler)]
68     fn parent<CTX: QueryContext<DepKind = K>>(self, map: &QueryMap<CTX>) -> Option<QueryJobId<K>> {
69         map.get(&self).unwrap().job.parent
70     }
71
72     #[cfg(parallel_compiler)]
73     fn latch<'a, CTX: QueryContext<DepKind = K>>(
74         self,
75         map: &'a QueryMap<CTX>,
76     ) -> Option<&'a QueryLatch<CTX>> {
77         map.get(&self).unwrap().job.latch.as_ref()
78     }
79 }
80
81 pub struct QueryJobInfo<CTX: QueryContext> {
82     pub info: QueryInfo<CTX::Query>,
83     pub job: QueryJob<CTX>,
84 }
85
86 /// Represents an active query job.
87 #[derive(Clone)]
88 pub struct QueryJob<CTX: QueryContext> {
89     pub id: QueryShardJobId,
90
91     /// The span corresponding to the reason for which this query was required.
92     pub span: Span,
93
94     /// The parent query job which created this job and is implicitly waiting on it.
95     pub parent: Option<QueryJobId<CTX::DepKind>>,
96
97     /// The latch that is used to wait on this job.
98     #[cfg(parallel_compiler)]
99     latch: Option<QueryLatch<CTX>>,
100
101     dummy: PhantomData<QueryLatch<CTX>>,
102 }
103
104 impl<CTX: QueryContext> QueryJob<CTX> {
105     /// Creates a new query job.
106     pub fn new(id: QueryShardJobId, span: Span, parent: Option<QueryJobId<CTX::DepKind>>) -> Self {
107         QueryJob {
108             id,
109             span,
110             parent,
111             #[cfg(parallel_compiler)]
112             latch: None,
113             dummy: PhantomData,
114         }
115     }
116
117     #[cfg(parallel_compiler)]
118     pub(super) fn latch(&mut self, _id: QueryJobId<CTX::DepKind>) -> QueryLatch<CTX> {
119         if self.latch.is_none() {
120             self.latch = Some(QueryLatch::new());
121         }
122         self.latch.as_ref().unwrap().clone()
123     }
124
125     #[cfg(not(parallel_compiler))]
126     pub(super) fn latch(&mut self, id: QueryJobId<CTX::DepKind>) -> QueryLatch<CTX> {
127         QueryLatch { id, dummy: PhantomData }
128     }
129
130     /// Signals to waiters that the query is complete.
131     ///
132     /// This does nothing for single threaded rustc,
133     /// as there are no concurrent jobs which could be waiting on us
134     pub fn signal_complete(self) {
135         #[cfg(parallel_compiler)]
136         self.latch.map(|latch| latch.set());
137     }
138 }
139
140 #[cfg(not(parallel_compiler))]
141 #[derive(Clone)]
142 pub(super) struct QueryLatch<CTX: QueryContext> {
143     id: QueryJobId<CTX::DepKind>,
144     dummy: PhantomData<CTX>,
145 }
146
147 #[cfg(not(parallel_compiler))]
148 impl<CTX: QueryContext> QueryLatch<CTX> {
149     pub(super) fn find_cycle_in_stack(&self, tcx: CTX, span: Span) -> CycleError<CTX::Query> {
150         let query_map = tcx.try_collect_active_jobs().unwrap();
151
152         // Get the current executing query (waiter) and find the waitee amongst its parents
153         let mut current_job = tcx.current_query_job();
154         let mut cycle = Vec::new();
155
156         while let Some(job) = current_job {
157             let info = query_map.get(&job).unwrap();
158             cycle.push(info.info.clone());
159
160             if job == self.id {
161                 cycle.reverse();
162
163                 // This is the end of the cycle
164                 // The span entry we included was for the usage
165                 // of the cycle itself, and not part of the cycle
166                 // Replace it with the span which caused the cycle to form
167                 cycle[0].span = span;
168                 // Find out why the cycle itself was used
169                 let usage = info
170                     .job
171                     .parent
172                     .as_ref()
173                     .map(|parent| (info.info.span, parent.query(&query_map)));
174                 return CycleError { usage, cycle };
175             }
176
177             current_job = info.job.parent;
178         }
179
180         panic!("did not find a cycle")
181     }
182 }
183
184 #[cfg(parallel_compiler)]
185 struct QueryWaiter<CTX: QueryContext> {
186     query: Option<QueryJobId<CTX::DepKind>>,
187     condvar: Condvar,
188     span: Span,
189     cycle: Lock<Option<CycleError<CTX::Query>>>,
190 }
191
192 #[cfg(parallel_compiler)]
193 impl<CTX: QueryContext> QueryWaiter<CTX> {
194     fn notify(&self, registry: &rayon_core::Registry) {
195         rayon_core::mark_unblocked(registry);
196         self.condvar.notify_one();
197     }
198 }
199
200 #[cfg(parallel_compiler)]
201 struct QueryLatchInfo<CTX: QueryContext> {
202     complete: bool,
203     waiters: Vec<Lrc<QueryWaiter<CTX>>>,
204 }
205
206 #[cfg(parallel_compiler)]
207 #[derive(Clone)]
208 pub(super) struct QueryLatch<CTX: QueryContext> {
209     info: Lrc<Mutex<QueryLatchInfo<CTX>>>,
210 }
211
212 #[cfg(parallel_compiler)]
213 impl<CTX: QueryContext> QueryLatch<CTX> {
214     fn new() -> Self {
215         QueryLatch {
216             info: Lrc::new(Mutex::new(QueryLatchInfo { complete: false, waiters: Vec::new() })),
217         }
218     }
219 }
220
221 #[cfg(parallel_compiler)]
222 impl<CTX: QueryContext> QueryLatch<CTX> {
223     /// Awaits for the query job to complete.
224     pub(super) fn wait_on(&self, tcx: CTX, span: Span) -> Result<(), CycleError<CTX::Query>> {
225         let query = tcx.current_query_job();
226         let waiter =
227             Lrc::new(QueryWaiter { query, span, cycle: Lock::new(None), condvar: Condvar::new() });
228         self.wait_on_inner(&waiter);
229         // FIXME: Get rid of this lock. We have ownership of the QueryWaiter
230         // although another thread may still have a Lrc reference so we cannot
231         // use Lrc::get_mut
232         let mut cycle = waiter.cycle.lock();
233         match cycle.take() {
234             None => Ok(()),
235             Some(cycle) => Err(cycle),
236         }
237     }
238 }
239
240 #[cfg(parallel_compiler)]
241 impl<CTX: QueryContext> QueryLatch<CTX> {
242     /// Awaits the caller on this latch by blocking the current thread.
243     fn wait_on_inner(&self, waiter: &Lrc<QueryWaiter<CTX>>) {
244         let mut info = self.info.lock();
245         if !info.complete {
246             // We push the waiter on to the `waiters` list. It can be accessed inside
247             // the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
248             // Both of these will remove it from the `waiters` list before resuming
249             // this thread.
250             info.waiters.push(waiter.clone());
251
252             // If this detects a deadlock and the deadlock handler wants to resume this thread
253             // we have to be in the `wait` call. This is ensured by the deadlock handler
254             // getting the self.info lock.
255             rayon_core::mark_blocked();
256             jobserver::release_thread();
257             waiter.condvar.wait(&mut info);
258             // Release the lock before we potentially block in `acquire_thread`
259             mem::drop(info);
260             jobserver::acquire_thread();
261         }
262     }
263
264     /// Sets the latch and resumes all waiters on it
265     fn set(&self) {
266         let mut info = self.info.lock();
267         debug_assert!(!info.complete);
268         info.complete = true;
269         let registry = rayon_core::Registry::current();
270         for waiter in info.waiters.drain(..) {
271             waiter.notify(&registry);
272         }
273     }
274
275     /// Removes a single waiter from the list of waiters.
276     /// This is used to break query cycles.
277     fn extract_waiter(&self, waiter: usize) -> Lrc<QueryWaiter<CTX>> {
278         let mut info = self.info.lock();
279         debug_assert!(!info.complete);
280         // Remove the waiter from the list of waiters
281         info.waiters.remove(waiter)
282     }
283 }
284
285 /// A resumable waiter of a query. The usize is the index into waiters in the query's latch
286 #[cfg(parallel_compiler)]
287 type Waiter<K> = (QueryJobId<K>, usize);
288
289 /// Visits all the non-resumable and resumable waiters of a query.
290 /// Only waiters in a query are visited.
291 /// `visit` is called for every waiter and is passed a query waiting on `query_ref`
292 /// and a span indicating the reason the query waited on `query_ref`.
293 /// If `visit` returns Some, this function returns.
294 /// For visits of non-resumable waiters it returns the return value of `visit`.
295 /// For visits of resumable waiters it returns Some(Some(Waiter)) which has the
296 /// required information to resume the waiter.
297 /// If all `visit` calls returns None, this function also returns None.
298 #[cfg(parallel_compiler)]
299 fn visit_waiters<CTX: QueryContext, F>(
300     query_map: &QueryMap<CTX>,
301     query: QueryJobId<CTX::DepKind>,
302     mut visit: F,
303 ) -> Option<Option<Waiter<CTX::DepKind>>>
304 where
305     F: FnMut(Span, QueryJobId<CTX::DepKind>) -> Option<Option<Waiter<CTX::DepKind>>>,
306 {
307     // Visit the parent query which is a non-resumable waiter since it's on the same stack
308     if let Some(parent) = query.parent(query_map) {
309         if let Some(cycle) = visit(query.span(query_map), parent) {
310             return Some(cycle);
311         }
312     }
313
314     // Visit the explicit waiters which use condvars and are resumable
315     if let Some(latch) = query.latch(query_map) {
316         for (i, waiter) in latch.info.lock().waiters.iter().enumerate() {
317             if let Some(waiter_query) = waiter.query {
318                 if visit(waiter.span, waiter_query).is_some() {
319                     // Return a value which indicates that this waiter can be resumed
320                     return Some(Some((query, i)));
321                 }
322             }
323         }
324     }
325
326     None
327 }
328
329 /// Look for query cycles by doing a depth first search starting at `query`.
330 /// `span` is the reason for the `query` to execute. This is initially DUMMY_SP.
331 /// If a cycle is detected, this initial value is replaced with the span causing
332 /// the cycle.
333 #[cfg(parallel_compiler)]
334 fn cycle_check<CTX: QueryContext>(
335     query_map: &QueryMap<CTX>,
336     query: QueryJobId<CTX::DepKind>,
337     span: Span,
338     stack: &mut Vec<(Span, QueryJobId<CTX::DepKind>)>,
339     visited: &mut FxHashSet<QueryJobId<CTX::DepKind>>,
340 ) -> Option<Option<Waiter<CTX::DepKind>>> {
341     if !visited.insert(query) {
342         return if let Some(p) = stack.iter().position(|q| q.1 == query) {
343             // We detected a query cycle, fix up the initial span and return Some
344
345             // Remove previous stack entries
346             stack.drain(0..p);
347             // Replace the span for the first query with the cycle cause
348             stack[0].0 = span;
349             Some(None)
350         } else {
351             None
352         };
353     }
354
355     // Query marked as visited is added it to the stack
356     stack.push((span, query));
357
358     // Visit all the waiters
359     let r = visit_waiters(query_map, query, |span, successor| {
360         cycle_check(query_map, successor, span, stack, visited)
361     });
362
363     // Remove the entry in our stack if we didn't find a cycle
364     if r.is_none() {
365         stack.pop();
366     }
367
368     r
369 }
370
371 /// Finds out if there's a path to the compiler root (aka. code which isn't in a query)
372 /// from `query` without going through any of the queries in `visited`.
373 /// This is achieved with a depth first search.
374 #[cfg(parallel_compiler)]
375 fn connected_to_root<CTX: QueryContext>(
376     query_map: &QueryMap<CTX>,
377     query: QueryJobId<CTX::DepKind>,
378     visited: &mut FxHashSet<QueryJobId<CTX::DepKind>>,
379 ) -> bool {
380     // We already visited this or we're deliberately ignoring it
381     if !visited.insert(query) {
382         return false;
383     }
384
385     // This query is connected to the root (it has no query parent), return true
386     if query.parent(query_map).is_none() {
387         return true;
388     }
389
390     visit_waiters(query_map, query, |_, successor| {
391         connected_to_root(query_map, successor, visited).then_some(None)
392     })
393     .is_some()
394 }
395
396 // Deterministically pick an query from a list
397 #[cfg(parallel_compiler)]
398 fn pick_query<'a, CTX, T, F>(query_map: &QueryMap<CTX>, tcx: CTX, queries: &'a [T], f: F) -> &'a T
399 where
400     CTX: QueryContext,
401     F: Fn(&T) -> (Span, QueryJobId<CTX::DepKind>),
402 {
403     // Deterministically pick an entry point
404     // FIXME: Sort this instead
405     let mut hcx = tcx.create_stable_hashing_context();
406     queries
407         .iter()
408         .min_by_key(|v| {
409             let (span, query) = f(v);
410             let mut stable_hasher = StableHasher::new();
411             query.query(query_map).hash_stable(&mut hcx, &mut stable_hasher);
412             // Prefer entry points which have valid spans for nicer error messages
413             // We add an integer to the tuple ensuring that entry points
414             // with valid spans are picked first
415             let span_cmp = if span == DUMMY_SP { 1 } else { 0 };
416             (span_cmp, stable_hasher.finish::<u64>())
417         })
418         .unwrap()
419 }
420
421 /// Looks for query cycles starting from the last query in `jobs`.
422 /// If a cycle is found, all queries in the cycle is removed from `jobs` and
423 /// the function return true.
424 /// If a cycle was not found, the starting query is removed from `jobs` and
425 /// the function returns false.
426 #[cfg(parallel_compiler)]
427 fn remove_cycle<CTX: QueryContext>(
428     query_map: &QueryMap<CTX>,
429     jobs: &mut Vec<QueryJobId<CTX::DepKind>>,
430     wakelist: &mut Vec<Lrc<QueryWaiter<CTX>>>,
431     tcx: CTX,
432 ) -> bool {
433     let mut visited = FxHashSet::default();
434     let mut stack = Vec::new();
435     // Look for a cycle starting with the last query in `jobs`
436     if let Some(waiter) =
437         cycle_check(query_map, jobs.pop().unwrap(), DUMMY_SP, &mut stack, &mut visited)
438     {
439         // The stack is a vector of pairs of spans and queries; reverse it so that
440         // the earlier entries require later entries
441         let (mut spans, queries): (Vec<_>, Vec<_>) = stack.into_iter().rev().unzip();
442
443         // Shift the spans so that queries are matched with the span for their waitee
444         spans.rotate_right(1);
445
446         // Zip them back together
447         let mut stack: Vec<_> = spans.into_iter().zip(queries).collect();
448
449         // Remove the queries in our cycle from the list of jobs to look at
450         for r in &stack {
451             jobs.remove_item(&r.1);
452         }
453
454         // Find the queries in the cycle which are
455         // connected to queries outside the cycle
456         let entry_points = stack
457             .iter()
458             .filter_map(|&(span, query)| {
459                 if query.parent(query_map).is_none() {
460                     // This query is connected to the root (it has no query parent)
461                     Some((span, query, None))
462                 } else {
463                     let mut waiters = Vec::new();
464                     // Find all the direct waiters who lead to the root
465                     visit_waiters(query_map, query, |span, waiter| {
466                         // Mark all the other queries in the cycle as already visited
467                         let mut visited = FxHashSet::from_iter(stack.iter().map(|q| q.1));
468
469                         if connected_to_root(query_map, waiter, &mut visited) {
470                             waiters.push((span, waiter));
471                         }
472
473                         None
474                     });
475                     if waiters.is_empty() {
476                         None
477                     } else {
478                         // Deterministically pick one of the waiters to show to the user
479                         let waiter = *pick_query(query_map, tcx, &waiters, |s| *s);
480                         Some((span, query, Some(waiter)))
481                     }
482                 }
483             })
484             .collect::<Vec<(Span, QueryJobId<CTX::DepKind>, Option<(Span, QueryJobId<CTX::DepKind>)>)>>();
485
486         // Deterministically pick an entry point
487         let (_, entry_point, usage) = pick_query(query_map, tcx, &entry_points, |e| (e.0, e.1));
488
489         // Shift the stack so that our entry point is first
490         let entry_point_pos = stack.iter().position(|(_, query)| query == entry_point);
491         if let Some(pos) = entry_point_pos {
492             stack.rotate_left(pos);
493         }
494
495         let usage = usage.as_ref().map(|(span, query)| (*span, query.query(query_map)));
496
497         // Create the cycle error
498         let error = CycleError {
499             usage,
500             cycle: stack
501                 .iter()
502                 .map(|&(s, ref q)| QueryInfo { span: s, query: q.query(query_map) })
503                 .collect(),
504         };
505
506         // We unwrap `waiter` here since there must always be one
507         // edge which is resumeable / waited using a query latch
508         let (waitee_query, waiter_idx) = waiter.unwrap();
509
510         // Extract the waiter we want to resume
511         let waiter = waitee_query.latch(query_map).unwrap().extract_waiter(waiter_idx);
512
513         // Set the cycle error so it will be picked up when resumed
514         *waiter.cycle.lock() = Some(error);
515
516         // Put the waiter on the list of things to resume
517         wakelist.push(waiter);
518
519         true
520     } else {
521         false
522     }
523 }
524
525 /// Detects query cycles by using depth first search over all active query jobs.
526 /// If a query cycle is found it will break the cycle by finding an edge which
527 /// uses a query latch and then resuming that waiter.
528 /// There may be multiple cycles involved in a deadlock, so this searches
529 /// all active queries for cycles before finally resuming all the waiters at once.
530 #[cfg(parallel_compiler)]
531 pub fn deadlock<CTX: QueryContext>(tcx: CTX, registry: &rayon_core::Registry) {
532     let on_panic = OnDrop(|| {
533         eprintln!("deadlock handler panicked, aborting process");
534         process::abort();
535     });
536
537     let mut wakelist = Vec::new();
538     let query_map = tcx.try_collect_active_jobs().unwrap();
539     let mut jobs: Vec<QueryJobId<CTX::DepKind>> = query_map.keys().cloned().collect();
540
541     let mut found_cycle = false;
542
543     while jobs.len() > 0 {
544         if remove_cycle(&query_map, &mut jobs, &mut wakelist, tcx) {
545             found_cycle = true;
546         }
547     }
548
549     // Check that a cycle was found. It is possible for a deadlock to occur without
550     // a query cycle if a query which can be waited on uses Rayon to do multithreading
551     // internally. Such a query (X) may be executing on 2 threads (A and B) and A may
552     // wait using Rayon on B. Rayon may then switch to executing another query (Y)
553     // which in turn will wait on X causing a deadlock. We have a false dependency from
554     // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here
555     // only considers the true dependency and won't detect a cycle.
556     assert!(found_cycle);
557
558     // FIXME: Ensure this won't cause a deadlock before we return
559     for waiter in wakelist.into_iter() {
560         waiter.notify(registry);
561     }
562
563     on_panic.disable();
564 }