]> git.lizzy.rs Git - rust.git/blobdiff - src/librustc/ty/maps/job.rs
Make queries block and handle query cycles
[rust.git] / src / librustc / ty / maps / job.rs
index 3b6af018d6b789d839c4bb2a2f84781bc867cc9a..3fe22dba6e153769866961d85de2217cca32283d 100644 (file)
@@ -8,13 +8,31 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use rustc_data_structures::sync::{Lock, Lrc};
+#![allow(warnings)]
+
+use std::mem;
+use rustc_data_structures::sync::{Lock, LockGuard, Lrc, Weak};
+use rustc_data_structures::OnDrop;
 use syntax_pos::Span;
 use ty::tls;
 use ty::maps::Query;
 use ty::maps::plumbing::CycleError;
 use ty::context::TyCtxt;
 use errors::Diagnostic;
+use std::process;
+use std::fmt;
+use std::collections::HashSet;
+#[cfg(parallel_queries)]
+use {
+    rayon_core,
+    parking_lot::{Mutex, Condvar},
+    std::sync::atomic::Ordering,
+    std::thread,
+    std::iter,
+    std::iter::FromIterator,
+    syntax_pos::DUMMY_SP,
+    rustc_data_structures::stable_hasher::{StableHasherResult, StableHasher, HashStable},
+};
 
 /// Indicates the state of a query for a given key in a query map
 pub(super) enum QueryResult<'tcx> {
@@ -42,6 +60,9 @@ pub struct QueryJob<'tcx> {
 
     /// Diagnostic messages which are emitted while the query executes
     pub diagnostics: Lock<Vec<Diagnostic>>,
+
+    #[cfg(parallel_queries)]
+    latch: QueryLatch,
 }
 
 impl<'tcx> QueryJob<'tcx> {
@@ -51,6 +72,8 @@ pub fn new(info: QueryInfo<'tcx>, parent: Option<Lrc<QueryJob<'tcx>>>) -> Self {
             diagnostics: Lock::new(Vec::new()),
             info,
             parent,
+            #[cfg(parallel_queries)]
+            latch: QueryLatch::new(),
         }
     }
 
@@ -62,6 +85,36 @@ pub(super) fn await<'lcx>(
         &self,
         tcx: TyCtxt<'_, 'tcx, 'lcx>,
         span: Span,
+    ) -> Result<(), CycleError<'tcx>> {
+        #[cfg(not(parallel_queries))]
+        {
+            self.find_cycle_in_stack(tcx, span)
+        }
+
+        #[cfg(parallel_queries)]
+        {
+            tls::with_related_context(tcx, move |icx| {
+                let mut waiter = QueryWaiter {
+                    query: &icx.query,
+                    span,
+                    cycle: None,
+                    condvar: Condvar::new(),
+                };
+                self.latch.await(&mut waiter);
+
+                match waiter.cycle {
+                    None => Ok(()),
+                    Some(cycle) => Err(cycle)
+                }
+            })
+        }
+    }
+
+    #[cfg(not(parallel_queries))]
+    fn find_cycle_in_stack<'lcx>(
+        &self,
+        tcx: TyCtxt<'_, 'tcx, 'lcx>,
+        span: Span,
     ) -> Result<(), CycleError<'tcx>> {
         // Get the current executing query (waiter) and find the waitee amongst its parents
         let mut current_job = tls::with_related_context(tcx, |icx| icx.query.clone());
@@ -93,5 +146,315 @@ pub(super) fn await<'lcx>(
     ///
     /// This does nothing for single threaded rustc,
     /// as there are no concurrent jobs which could be waiting on us
-    pub fn signal_complete(&self) {}
+    pub fn signal_complete(&self, tcx: TyCtxt<'_, 'tcx, '_>) {
+        #[cfg(parallel_queries)]
+        self.latch.set(tcx);
+    }
+}
+
+#[cfg(parallel_queries)]
+struct QueryWaiter<'a, 'tcx: 'a> {
+    query: &'a Option<Lrc<QueryJob<'tcx>>>,
+    condvar: Condvar,
+    span: Span,
+    cycle: Option<CycleError<'tcx>>,
+}
+
+#[cfg(parallel_queries)]
+impl<'a, 'tcx> QueryWaiter<'a, 'tcx> {
+    fn notify(&self, tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
+        rayon_core::mark_unblocked(registry);
+        self.condvar.notify_one();
+    }
+}
+
+#[cfg(parallel_queries)]
+struct QueryLatchInfo {
+    complete: bool,
+    waiters: Vec<&'static mut QueryWaiter<'static, 'static>>,
+}
+
+#[cfg(parallel_queries)]
+struct QueryLatch {
+    info: Mutex<QueryLatchInfo>,
+}
+
+#[cfg(parallel_queries)]
+impl QueryLatch {
+    fn new() -> Self {
+        QueryLatch {
+            info: Mutex::new(QueryLatchInfo {
+                complete: false,
+                waiters: Vec::new(),
+            }),
+        }
+    }
+
+    fn await(&self, waiter: &mut QueryWaiter<'_, '_>) {
+        let mut info = self.info.lock();
+        if !info.complete {
+            let waiter = &*waiter;
+            unsafe {
+                #[allow(mutable_transmutes)]
+                info.waiters.push(mem::transmute(waiter));
+            }
+            // If this detects a deadlock and the deadlock handler want to resume this thread
+            // we have to be in the `wait` call. This is ensured by the deadlock handler
+            // getting the self.info lock.
+            rayon_core::mark_blocked();
+            waiter.condvar.wait(&mut info);
+        }
+    }
+
+    fn set(&self, tcx: TyCtxt<'_, '_, '_>) {
+        let mut info = self.info.lock();
+        debug_assert!(!info.complete);
+        info.complete = true;
+        let registry = rayon_core::Registry::current();
+        for waiter in info.waiters.drain(..) {
+            waiter.notify(tcx, &registry);
+        }
+    }
+
+    fn resume_waiter(
+        &self,
+        waiter: usize,
+        error: CycleError
+    ) -> &'static mut QueryWaiter<'static, 'static> {
+        let mut info = self.info.lock();
+        debug_assert!(!info.complete);
+        // Remove the waiter from the list of waiters
+        let waiter = info.waiters.remove(waiter);
+
+        // Set the cycle error it will be picked it up when resumed
+        waiter.cycle = unsafe { Some(mem::transmute(error)) };
+
+        waiter
+    }
+}
+
+#[cfg(parallel_queries)]
+type Ref<'tcx> = *const QueryJob<'tcx>;
+
+#[cfg(parallel_queries)]
+type Waiter<'tcx> = (Ref<'tcx>, usize);
+
+#[cfg(parallel_queries)]
+fn visit_waiters<'tcx, F>(query_ref: Ref<'tcx>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
+where
+    F: FnMut(Span, Ref<'tcx>) -> Option<Option<Waiter<'tcx>>>
+{
+    let query = unsafe { &*query_ref };
+    if let Some(ref parent) = query.parent {
+        if let Some(cycle) = visit(query.info.span, &**parent as Ref) {
+            return Some(cycle);
+        }
+    }
+    for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() {
+        if let Some(ref waiter_query) = waiter.query {
+            if visit(waiter.span, &**waiter_query as Ref).is_some() {
+                return Some(Some((query_ref, i)));
+            }
+        }
+    }
+    None
+}
+
+#[cfg(parallel_queries)]
+fn cycle_check<'tcx>(query: Ref<'tcx>,
+                     span: Span,
+                     stack: &mut Vec<(Span, Ref<'tcx>)>,
+                     visited: &mut HashSet<Ref<'tcx>>) -> Option<Option<Waiter<'tcx>>> {
+    if visited.contains(&query) {
+        return if let Some(p) = stack.iter().position(|q| q.1 == query) {
+            // Remove previous stack entries
+            stack.splice(0..p, iter::empty());
+            // Replace the span for the first query with the cycle cause
+            stack[0].0 = span;
+            Some(None)
+        } else {
+            None
+        }
+    }
+
+    visited.insert(query);
+    stack.push((span, query));
+
+    let r = visit_waiters(query, |span, successor| {
+        cycle_check(successor, span, stack, visited)
+    });
+
+    if r.is_none() {
+        stack.pop();
+    }
+
+    r
+}
+
+#[cfg(parallel_queries)]
+fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -> bool {
+    if visited.contains(&query) {
+        return false;
+    }
+
+    if unsafe { (*query).parent.is_none() } {
+        return true;
+    }
+
+    visited.insert(query);
+
+    let mut connected = false;
+
+    visit_waiters(query, |_, successor| {
+        if connected_to_root(successor, visited) {
+            Some(None)
+        } else {
+            None
+        }
+    }).is_some()
+}
+
+#[cfg(parallel_queries)]
+fn query_entry<'tcx>(r: Ref<'tcx>) -> QueryInfo<'tcx> {
+    unsafe { (*r).info.clone() }
+}
+
+#[cfg(parallel_queries)]
+fn remove_cycle<'tcx>(
+    jobs: &mut Vec<Ref<'tcx>>,
+    wakelist: &mut Vec<&'static mut QueryWaiter<'static, 'static>>,
+    tcx: TyCtxt<'_, 'tcx, '_>
+) {
+    let mut visited = HashSet::new();
+    let mut stack = Vec::new();
+    if let Some(waiter) = cycle_check(jobs.pop().unwrap(),
+                                      DUMMY_SP,
+                                      &mut stack,
+                                      &mut visited) {
+        // Reverse the stack so earlier entries require later entries
+        stack.reverse();
+
+        let mut spans: Vec<_> = stack.iter().map(|e| e.0).collect();
+        let queries = stack.iter().map(|e| e.1);
+
+        // Shift the spans so that a query is matched the span for its waitee
+        let last = spans.pop().unwrap();
+        spans.insert(0, last);
+
+        let mut stack: Vec<_> = spans.into_iter().zip(queries).collect();
+
+        // Remove the queries in our cycle from the list of jobs to look at
+        for r in &stack {
+            jobs.remove_item(&r.1);
+        }
+
+        let (waitee_query, waiter_idx) = waiter.unwrap();
+        let waitee_query = unsafe { &*waitee_query };
+
+        // Find the queries in the cycle which are
+        // connected to queries outside the cycle
+        let entry_points: Vec<Ref<'_>> = stack.iter().filter_map(|query| {
+            // Mark all the other queries in the cycle as already visited
+            let mut visited = HashSet::from_iter(stack.iter().filter_map(|q| {
+                if q.1 != query.1 {
+                    Some(q.1)
+                } else {
+                    None
+                }
+            }));
+
+            if connected_to_root(query.1, &mut visited) {
+                Some(query.1)
+            } else {
+                None
+            }
+        }).collect();
+
+        // Deterministically pick an entry point
+        // FIXME: Sort this instead
+        let mut hcx = tcx.create_stable_hashing_context();
+        let entry_point = *entry_points.iter().min_by_key(|&&q| {
+            let mut stable_hasher = StableHasher::<u64>::new();
+            unsafe { (*q).info.query.hash_stable(&mut hcx, &mut stable_hasher); }
+            stable_hasher.finish()
+        }).unwrap();
+
+        // Shift the stack until our entry point is first
+        while stack[0].1 != entry_point {
+            let last = stack.pop().unwrap();
+            stack.insert(0, last);
+        }
+
+        let mut error = CycleError {
+            usage: None,
+            cycle: stack.iter().map(|&(s, q)| QueryInfo {
+                span: s,
+                query: unsafe { (*q).info.query.clone() },
+            } ).collect(),
+        };
+
+        wakelist.push(waitee_query.latch.resume_waiter(waiter_idx, error));
+    }
+}
+
+#[cfg(parallel_queries)]
+pub fn handle_deadlock() {
+    use syntax;
+    use syntax_pos;
+
+    let registry = rayon_core::Registry::current();
+
+    let gcx_ptr = tls::GCX_PTR.with(|gcx_ptr| {
+        gcx_ptr as *const _
+    });
+    let gcx_ptr = unsafe { &*gcx_ptr };
+
+    let syntax_globals = syntax::GLOBALS.with(|syntax_globals| {
+        syntax_globals as *const _
+    });
+    let syntax_globals = unsafe { &*syntax_globals };
+
+    let syntax_pos_globals = syntax_pos::GLOBALS.with(|syntax_pos_globals| {
+        syntax_pos_globals as *const _
+    });
+    let syntax_pos_globals = unsafe { &*syntax_pos_globals };
+    thread::spawn(move || {
+        tls::GCX_PTR.set(gcx_ptr, || {
+            syntax_pos::GLOBALS.set(syntax_pos_globals, || {
+                syntax_pos::GLOBALS.set(syntax_pos_globals, || {
+                    tls::with_thread_locals(|| {
+                        unsafe {
+                            tls::with_global(|tcx| deadlock(tcx, &registry))
+                        }
+                    })
+                })
+            })
+        })
+    });
+}
+
+#[cfg(parallel_queries)]
+fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
+    let on_panic = OnDrop(|| {
+        eprintln!("deadlock handler panicked, aborting process");
+        process::abort();
+    });
+
+    let mut wakelist = Vec::new();
+    let mut jobs: Vec<_> = tcx.maps.collect_active_jobs().iter().map(|j| &**j as Ref).collect();
+
+    while jobs.len() > 0 {
+        remove_cycle(&mut jobs, &mut wakelist, tcx);
+    }
+
+    // FIXME: Panic if no cycle is detected
+
+    // FIXME: Write down the conditions when a deadlock happens without a cycle
+
+    // FIXME: Ensure this won't cause a deadlock before we return
+    for waiter in wakelist.into_iter() {
+        waiter.notify(tcx, registry);
+    }
+
+    mem::forget(on_panic);
 }