From f9e6fbc4f751a5fd4befaf91ad4b1022efb99501 Mon Sep 17 00:00:00 2001
From: =?utf8?q?John=20K=C3=A5re=20Alsaker?=
Date: Fri, 6 Apr 2018 12:56:59 +0200
Subject: [PATCH] Make queries block and handle query cycles

---
 src/librustc/Cargo.toml          |   4 +
 src/librustc/lib.rs              |   5 +
 src/librustc/ty/context.rs       |  50 ++++-
 src/librustc/ty/maps/job.rs      | 367 ++++++++++++++++++++++++++++++-
 src/librustc/ty/maps/mod.rs      |   4 +-
 src/librustc/ty/maps/plumbing.rs |  36 ++-
 src/librustc_driver/driver.rs    |  13 +-
 7 files changed, 469 insertions(+), 10 deletions(-)

diff --git a/src/librustc/Cargo.toml b/src/librustc/Cargo.toml
index 1d1166ad2c4..4dc818c650e 100644
--- a/src/librustc/Cargo.toml
+++ b/src/librustc/Cargo.toml
@@ -15,9 +15,12 @@ fmt_macros = { path = "../libfmt_macros" }
 graphviz = { path = "../libgraphviz" }
 jobserver = "0.1"
 lazy_static = "1.0.0"
+scoped-tls = { version = "0.1.1", features = ["nightly"] }
 log = { version = "0.4", features = ["release_max_level_info", "std"] }
 polonius-engine = "0.5.0"
 proc_macro = { path = "../libproc_macro" }
+rustc-rayon = "0.1.0"
+rustc-rayon-core = "0.1.0"
 rustc_apfloat = { path = "../librustc_apfloat" }
 rustc_target = { path = "../librustc_target" }
 rustc_data_structures = { path = "../librustc_data_structures" }
@@ -26,6 +29,7 @@ serialize = { path = "../libserialize" }
 syntax = { path = "../libsyntax" }
 syntax_pos = { path = "../libsyntax_pos" }
 backtrace = "0.3.3"
+parking_lot = "0.5.5"
 byteorder = { version = "1.1", features = ["i128"]}
 chalk-engine = { version = "0.6.0", default-features=false }
diff --git a/src/librustc/lib.rs b/src/librustc/lib.rs
index 10e8905054d..a006856f58b 100644
--- a/src/librustc/lib.rs
+++ b/src/librustc/lib.rs
@@ -67,6 +67,7 @@
 #![feature(unboxed_closures)]
 #![feature(trace_macros)]
 #![feature(trusted_len)]
+#![feature(vec_remove_item)]
 #![feature(catch_expr)]
 #![feature(integer_atomics)]
 #![feature(test)]
@@ -83,13 +84,17 @@ extern crate getopts;
 extern crate graphviz;
 #[macro_use] extern crate lazy_static;
+#[macro_use] extern crate scoped_tls;
 #[cfg(windows)]
 extern crate libc;
 extern crate polonius_engine;
 extern crate rustc_target;
 #[macro_use] extern crate rustc_data_structures;
 extern crate serialize;
+extern crate parking_lot;
 extern crate rustc_errors as errors;
+extern crate rustc_rayon as rayon;
+extern crate rustc_rayon_core as rayon_core;
 #[macro_use] extern crate log;
 #[macro_use] extern crate syntax;
 extern crate syntax_pos;
diff --git a/src/librustc/ty/context.rs b/src/librustc/ty/context.rs
index 35b2ce50da7..ef584774f69 100644
--- a/src/librustc/ty/context.rs
+++ b/src/librustc/ty/context.rs
@@ -1699,16 +1699,21 @@ fn lift_to_tcx<'b, 'gcx>(&self, tcx: TyCtxt<'b, 'gcx, 'tcx>) -> Option<Self::Lifted> {
     pub task: &'a OpenTask,
 }

+#[cfg(parallel_queries)]
+fn set_tlv<F: FnOnce() -> R, R>(value: usize, f: F) -> R {
+    rayon_core::tlv::with(value, f)
+}
+
+#[cfg(parallel_queries)]
+fn get_tlv() -> usize {
+    rayon_core::tlv::get()
+}
+
 // A thread local value which stores a pointer to the current ImplicitCtxt
+#[cfg(not(parallel_queries))]
 thread_local!(static TLV: Cell<usize> = Cell::new(0));

+#[cfg(not(parallel_queries))]
 fn set_tlv<F: FnOnce() -> R, R>(value: usize, f: F) -> R {
     let old = get_tlv();
     let _reset = OnDrop(move || TLV.with(|tlv| tlv.set(old)));
@@ -1742,6 +1759,7 @@ fn set_tlv<F: FnOnce() -> R, R>(value: usize, f: F) -> R {
     f()
 }

+#[cfg(not(parallel_queries))]
 fn get_tlv() -> usize {
     TLV.with(|tlv| tlv.get())
 }
@@ -1810,6 +1828,13 @@ pub fn enter_global<'gcx, F, R>(gcx: &GlobalCtxt<'gcx>, f: F) -> R
     where F: for<'a> FnOnce(TyCtxt<'a, 'gcx, 'gcx>) -> R
 {
     with_thread_locals(|| {
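+        // Store a pointer to the GlobalCtxt in the GCX_PTR thread-local so
+        // that the deadlock handler thread can reconstruct a TyCtxt through
+        // tls::with_global below.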
+        GCX_PTR.with(|lock| {
+            *lock.lock() = gcx as *const _ as usize;
+        });
+        let _on_drop = OnDrop(move || {
+            GCX_PTR.with(|lock| *lock.lock() = 0);
+        });
+
         let tcx = TyCtxt {
             gcx,
             interners: &gcx.global_interners,
@@ -1826,6 +1851,27 @@ pub fn enter_global<'gcx, F, R>(gcx: &GlobalCtxt<'gcx>, f: F) -> R
     })
 }

+scoped_thread_local!(pub static GCX_PTR: Lock<usize>);
+
+pub unsafe fn with_global<F, R>(f: F) -> R
+    where F: for<'a, 'gcx, 'tcx> FnOnce(TyCtxt<'a, 'gcx, 'tcx>) -> R
+{
+    let gcx = GCX_PTR.with(|lock| *lock.lock());
+    assert!(gcx != 0);
+    let gcx = &*(gcx as *const GlobalCtxt<'_>);
+    let tcx = TyCtxt {
+        gcx,
+        interners: &gcx.global_interners,
+    };
+    let icx = ImplicitCtxt {
+        query: None,
+        tcx,
+        layout_depth: 0,
+        task: &OpenTask::Ignore,
+    };
+    enter_context(&icx, |_| f(tcx))
+}
+
 /// Allows access to the current ImplicitCtxt in a closure if one is available
 pub fn with_context_opt<F, R>(f: F) -> R
     where F: for<'a, 'gcx, 'tcx> FnOnce(Option<&ImplicitCtxt<'a, 'gcx, 'tcx>>) -> R
diff --git a/src/librustc/ty/maps/job.rs b/src/librustc/ty/maps/job.rs
index 3b6af018d6b..3fe22dba6e1 100644
--- a/src/librustc/ty/maps/job.rs
+++ b/src/librustc/ty/maps/job.rs
@@ -8,13 +8,31 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.

-use rustc_data_structures::sync::{Lock, Lrc};
+#![allow(warnings)]
+
+use std::mem;
+use rustc_data_structures::sync::{Lock, LockGuard, Lrc, Weak};
+use rustc_data_structures::OnDrop;
 use syntax_pos::Span;
 use ty::tls;
 use ty::maps::Query;
 use ty::maps::plumbing::CycleError;
 use ty::context::TyCtxt;
 use errors::Diagnostic;
+use std::process;
+use std::fmt;
+use std::collections::HashSet;
+#[cfg(parallel_queries)]
+use {
+    rayon_core,
+    parking_lot::{Mutex, Condvar},
+    std::sync::atomic::Ordering,
+    std::thread,
+    std::iter,
+    std::iter::FromIterator,
+    syntax_pos::DUMMY_SP,
+    rustc_data_structures::stable_hasher::{StableHasherResult, StableHasher, HashStable},
+};

 /// Indicates the state of a query for a given key in a query map
 pub(super) enum QueryResult<'tcx> {
@@ -42,6 +60,9 @@ pub struct QueryJob<'tcx> {

     /// Diagnostic messages which are emitted while the query executes
     pub diagnostics: Lock<Vec<Diagnostic>>,
+
+    #[cfg(parallel_queries)]
+    latch: QueryLatch,
 }

 impl<'tcx> QueryJob<'tcx> {
@@ -51,6 +72,8 @@ pub fn new(info: QueryInfo<'tcx>, parent: Option<Lrc<QueryJob<'tcx>>>) -> Self {
             diagnostics: Lock::new(Vec::new()),
             info,
             parent,
+            #[cfg(parallel_queries)]
+            latch: QueryLatch::new(),
         }
     }

@@ -62,6 +85,36 @@ pub(super) fn await<'lcx>(
         &self,
         tcx: TyCtxt<'_, 'tcx, 'lcx>,
         span: Span,
+    ) -> Result<(), CycleError<'tcx>> {
+        #[cfg(not(parallel_queries))]
+        {
+            self.find_cycle_in_stack(tcx, span)
+        }
+
+        #[cfg(parallel_queries)]
+        {
+            tls::with_related_context(tcx, move |icx| {
+                let mut waiter = QueryWaiter {
+                    query: &icx.query,
+                    span,
+                    cycle: None,
+                    condvar: Condvar::new(),
+                };
+                self.latch.await(&mut waiter);
+
+                match waiter.cycle {
+                    None => Ok(()),
+                    Some(cycle) => Err(cycle)
+                }
+            })
+        }
+    }
+
+    #[cfg(not(parallel_queries))]
+    fn find_cycle_in_stack<'lcx>(
+        &self,
+        tcx: TyCtxt<'_, 'tcx, 'lcx>,
+        span: Span,
     ) -> Result<(), CycleError<'tcx>> {
         // Get the current executing query (waiter) and find the waitee amongst its parents
         let mut current_job = tls::with_related_context(tcx, |icx| icx.query.clone());
@@ -93,5 +146,315 @@ pub(super) fn await<'lcx>(
     ///
     /// This does nothing for single threaded rustc,
     /// as there are no concurrent jobs which could be waiting on us
-    pub fn signal_complete(&self) {}
+    pub fn signal_complete(&self, tcx: TyCtxt<'_, 'tcx, '_>) {
+        #[cfg(parallel_queries)]
+        self.latch.set(tcx);
+    }
+}
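+
+// A QueryWaiter is created on the stack of a thread that blocks waiting on
+// another query's result. The thread parks on `condvar`; if the deadlock
+// handler resumes it to break a cycle, `cycle` holds the cycle error to
+// return instead of a result.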
+#[cfg(parallel_queries)]
+struct QueryWaiter<'a, 'tcx: 'a> {
+    query: &'a Option<Lrc<QueryJob<'tcx>>>,
+    condvar: Condvar,
+    span: Span,
+    cycle: Option<CycleError<'tcx>>,
+}
+
+#[cfg(parallel_queries)]
+impl<'a, 'tcx> QueryWaiter<'a, 'tcx> {
+    fn notify(&self, tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
+        rayon_core::mark_unblocked(registry);
+        self.condvar.notify_one();
+    }
+}
+
+#[cfg(parallel_queries)]
+struct QueryLatchInfo {
+    complete: bool,
+    waiters: Vec<&'static mut QueryWaiter<'static, 'static>>,
+}
+
+#[cfg(parallel_queries)]
+struct QueryLatch {
+    info: Mutex<QueryLatchInfo>,
+}
+
+#[cfg(parallel_queries)]
+impl QueryLatch {
+    fn new() -> Self {
+        QueryLatch {
+            info: Mutex::new(QueryLatchInfo {
+                complete: false,
+                waiters: Vec::new(),
+            }),
+        }
+    }
+
+    fn await(&self, waiter: &mut QueryWaiter<'_, '_>) {
+        let mut info = self.info.lock();
+        if !info.complete {
+            let waiter = &*waiter;
+            unsafe {
+                #[allow(mutable_transmutes)]
+                info.waiters.push(mem::transmute(waiter));
+            }
+            // If this detects a deadlock and the deadlock handler wants to resume this
+            // thread, we have to be in the `wait` call. This is ensured by the deadlock
+            // handler getting the self.info lock.
+            rayon_core::mark_blocked();
+            waiter.condvar.wait(&mut info);
+        }
+    }
+
+    fn set(&self, tcx: TyCtxt<'_, '_, '_>) {
+        let mut info = self.info.lock();
+        debug_assert!(!info.complete);
+        info.complete = true;
+        let registry = rayon_core::Registry::current();
+        for waiter in info.waiters.drain(..) {
+            waiter.notify(tcx, &registry);
+        }
+    }
+
+    fn resume_waiter(
+        &self,
+        waiter: usize,
+        error: CycleError
+    ) -> &'static mut QueryWaiter<'static, 'static> {
+        let mut info = self.info.lock();
+        debug_assert!(!info.complete);
+        // Remove the waiter from the list of waiters
+        let waiter = info.waiters.remove(waiter);
+
+        // Set the cycle error so it will be picked up when the waiter is resumed
+        waiter.cycle = unsafe { Some(mem::transmute(error)) };
+
+        waiter
+    }
+}
+
+#[cfg(parallel_queries)]
+type Ref<'tcx> = *const QueryJob<'tcx>;
+
+#[cfg(parallel_queries)]
+type Waiter<'tcx> = (Ref<'tcx>, usize);
+
+#[cfg(parallel_queries)]
+fn visit_waiters<'tcx, F>(query_ref: Ref<'tcx>, mut visit: F) -> Option<Option<Waiter<'tcx>>>
+where
+    F: FnMut(Span, Ref<'tcx>) -> Option<Option<Waiter<'tcx>>>
+{
+    let query = unsafe { &*query_ref };
+    if let Some(ref parent) = query.parent {
+        if let Some(cycle) = visit(query.info.span, &**parent as Ref) {
+            return Some(cycle);
+        }
+    }
+    for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() {
+        if let Some(ref waiter_query) = waiter.query {
+            if visit(waiter.span, &**waiter_query as Ref).is_some() {
+                return Some(Some((query_ref, i)));
+            }
+        }
+    }
+    None
+}
+
+#[cfg(parallel_queries)]
+fn cycle_check<'tcx>(query: Ref<'tcx>,
+                     span: Span,
+                     stack: &mut Vec<(Span, Ref<'tcx>)>,
+                     visited: &mut HashSet<Ref<'tcx>>) -> Option<Option<Waiter<'tcx>>> {
+    if visited.contains(&query) {
+        return if let Some(p) = stack.iter().position(|q| q.1 == query) {
+            // Remove previous stack entries
+            stack.splice(0..p, iter::empty());
+            // Replace the span for the first query with the cycle cause
+            stack[0].0 = span;
+            Some(None)
+        } else {
+            None
+        }
+    }
+
+    visited.insert(query);
+    stack.push((span, query));
+
+    let r = visit_waiters(query, |span, successor| {
+        cycle_check(successor, span, stack, visited)
+    });
+
+    if r.is_none() {
+        stack.pop();
+    }
+
+    r
+}
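+
+// Returns true if `query` can reach a root query (one with no parent) through
+// parent and waiter edges without passing through a query in `visited`. In
+// remove_cycle below, this identifies cycle members that the rest of the
+// compilation is waiting on.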
+#[cfg(parallel_queries)]
+fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -> bool {
+    if visited.contains(&query) {
+        return false;
+    }
+
+    if unsafe { (*query).parent.is_none() } {
+        return true;
+    }
+
+    visited.insert(query);
+
+    visit_waiters(query, |_, successor| {
+        if connected_to_root(successor, visited) {
+            Some(None)
+        } else {
+            None
+        }
+    }).is_some()
+}
+
+#[cfg(parallel_queries)]
+fn query_entry<'tcx>(r: Ref<'tcx>) -> QueryInfo<'tcx> {
+    unsafe { (*r).info.clone() }
+}
+
+#[cfg(parallel_queries)]
+fn remove_cycle<'tcx>(
+    jobs: &mut Vec<Ref<'tcx>>,
+    wakelist: &mut Vec<&'static mut QueryWaiter<'static, 'static>>,
+    tcx: TyCtxt<'_, 'tcx, '_>
+) {
+    let mut visited = HashSet::new();
+    let mut stack = Vec::new();
+    if let Some(waiter) = cycle_check(jobs.pop().unwrap(),
+                                      DUMMY_SP,
+                                      &mut stack,
+                                      &mut visited) {
+        // Reverse the stack so earlier entries require later entries
+        stack.reverse();
+
+        let mut spans: Vec<_> = stack.iter().map(|e| e.0).collect();
+        let queries = stack.iter().map(|e| e.1);
+
+        // Shift the spans so that each query is matched with the span of its waitee
+        let last = spans.pop().unwrap();
+        spans.insert(0, last);
+
+        let mut stack: Vec<_> = spans.into_iter().zip(queries).collect();
+
+        // Remove the queries in our cycle from the list of jobs to look at
+        for r in &stack {
+            jobs.remove_item(&r.1);
+        }
+
+        let (waitee_query, waiter_idx) = waiter.unwrap();
+        let waitee_query = unsafe { &*waitee_query };
+
+        // Find the queries in the cycle which are
+        // connected to queries outside the cycle
+        let entry_points: Vec<Ref<'tcx>> = stack.iter().filter_map(|query| {
+            // Mark all the other queries in the cycle as already visited
+            let mut visited = HashSet::from_iter(stack.iter().filter_map(|q| {
+                if q.1 != query.1 {
+                    Some(q.1)
+                } else {
+                    None
+                }
+            }));
+
+            if connected_to_root(query.1, &mut visited) {
+                Some(query.1)
+            } else {
+                None
+            }
+        }).collect();
+
+        // Deterministically pick an entry point
+        // FIXME: Sort this instead
+        let mut hcx = tcx.create_stable_hashing_context();
+        let entry_point = *entry_points.iter().min_by_key(|&&q| {
+            let mut stable_hasher = StableHasher::<u64>::new();
+            unsafe { (*q).info.query.hash_stable(&mut hcx, &mut stable_hasher); }
+            stable_hasher.finish()
+        }).unwrap();
+
+        // Shift the stack until our entry point is first
+        while stack[0].1 != entry_point {
+            let last = stack.pop().unwrap();
+            stack.insert(0, last);
+        }
+
+        let error = CycleError {
+            usage: None,
+            cycle: stack.iter().map(|&(s, q)| QueryInfo {
+                span: s,
+                query: unsafe { (*q).info.query.clone() },
+            }).collect(),
+        };
+
+        wakelist.push(waitee_query.latch.resume_waiter(waiter_idx, error));
+    }
+}
+
+#[cfg(parallel_queries)]
+pub fn handle_deadlock() {
+    use syntax;
+    use syntax_pos;
+
+    let registry = rayon_core::Registry::current();
+
+    let gcx_ptr = tls::GCX_PTR.with(|gcx_ptr| {
+        gcx_ptr as *const _
+    });
+    let gcx_ptr = unsafe { &*gcx_ptr };
+
+    let syntax_globals = syntax::GLOBALS.with(|syntax_globals| {
+        syntax_globals as *const _
+    });
+    let syntax_globals = unsafe { &*syntax_globals };
+
+    let syntax_pos_globals = syntax_pos::GLOBALS.with(|syntax_pos_globals| {
+        syntax_pos_globals as *const _
+    });
+    let syntax_pos_globals = unsafe { &*syntax_pos_globals };
+    thread::spawn(move || {
+        tls::GCX_PTR.set(gcx_ptr, || {
+            syntax::GLOBALS.set(syntax_globals, || {
+                syntax_pos::GLOBALS.set(syntax_pos_globals, || {
+                    tls::with_thread_locals(|| {
+                        unsafe {
+                            tls::with_global(|tcx| deadlock(tcx, &registry))
+                        }
+                    })
+                })
+            })
+        })
+    });
+}
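+
+// Runs on the thread spawned by handle_deadlock once rayon's worker threads
+// are all blocked. It repeatedly calls remove_cycle on the active jobs,
+// collecting the waiters to resume, and then wakes them with their cycle
+// errors.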
+#[cfg(parallel_queries)]
+fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
+    let on_panic = OnDrop(|| {
+        eprintln!("deadlock handler panicked, aborting process");
+        process::abort();
+    });
+
+    let mut wakelist = Vec::new();
+    let mut jobs: Vec<_> = tcx.maps.collect_active_jobs().iter().map(|j| &**j as Ref).collect();
+
+    while jobs.len() > 0 {
+        remove_cycle(&mut jobs, &mut wakelist, tcx);
+    }
+
+    // FIXME: Panic if no cycle is detected
+
+    // FIXME: Write down the conditions when a deadlock happens without a cycle
+
+    // FIXME: Ensure this won't cause a deadlock before we return
+    for waiter in wakelist.into_iter() {
+        waiter.notify(tcx, registry);
+    }
+
+    mem::forget(on_panic);
 }
diff --git a/src/librustc/ty/maps/mod.rs b/src/librustc/ty/maps/mod.rs
index 6556e47720c..b50b43aace7 100644
--- a/src/librustc/ty/maps/mod.rs
+++ b/src/librustc/ty/maps/mod.rs
@@ -63,10 +63,12 @@
 #[macro_use]
 mod plumbing;
 use self::plumbing::*;
-pub use self::plumbing::force_from_dep_node;
+pub use self::plumbing::{force_from_dep_node, CycleError};

 mod job;
 pub use self::job::{QueryJob, QueryInfo};
+#[cfg(parallel_queries)]
+pub use self::job::handle_deadlock;

 mod keys;
 pub use self::keys::Key;
diff --git a/src/librustc/ty/maps/plumbing.rs b/src/librustc/ty/maps/plumbing.rs
index 4a9d44b7403..2ab8d18e3e7 100644
--- a/src/librustc/ty/maps/plumbing.rs
+++ b/src/librustc/ty/maps/plumbing.rs
@@ -223,7 +223,7 @@ fn drop(&mut self) {
 }

 #[derive(Clone)]
-pub(super) struct CycleError<'tcx> {
+pub struct CycleError<'tcx> {
     /// The query and related span which uses the cycle
     pub(super) usage: Option<(Span, Query<'tcx>)>,
     pub(super) cycle: Vec<QueryInfo<'tcx>>,
@@ -632,7 +632,15 @@ macro_rules! define_maps {
     (<$tcx:tt>
      $($(#[$attr:meta])*
        [$($modifiers:tt)*] fn $name:ident: $node:ident($K:ty) -> $V:ty,)*) => {
+        use std::mem;
+        use ty::maps::job::QueryResult;
         use rustc_data_structures::sync::Lock;
+        use {
+            rustc_data_structures::stable_hasher::HashStable,
+            rustc_data_structures::stable_hasher::StableHasherResult,
+            rustc_data_structures::stable_hasher::StableHasher,
+            ich::StableHashingContext
+        };
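+
+        // Imported so the macro-generated code below can give `Maps` a
+        // `collect_active_jobs` method and implement `HashStable` for `Query`,
+        // which the deadlock handler uses to pick a deterministic cycle entry
+        // point.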

         define_map_struct! {
             tcx: $tcx,
@@ -647,10 +655,23 @@ pub fn new(providers: IndexVec<CrateNum, Providers<$tcx>>)
                 $($name: Lock::new(QueryMap::new())),*
             }
         }
+
+        pub fn collect_active_jobs(&self) -> Vec<Lrc<QueryJob<$tcx>>> {
+            let mut jobs = Vec::new();
+
+            $(for v in self.$name.lock().active.values() {
+                match *v {
+                    QueryResult::Started(ref job) => jobs.push(job.clone()),
+                    _ => (),
+                }
+            })*
+
+            return jobs;
+        }
     }

     #[allow(bad_style)]
-    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
+    #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
     pub enum Query<$tcx> {
         $($(#[$attr])* $name($K)),*
     }
@@ -692,6 +713,17 @@ pub fn default_span(&self, tcx: TyCtxt<'_, $tcx, '_>, span: Span) -> Span {
         }
     }

+    impl<'a, $tcx> HashStable<StableHashingContext<'a>> for Query<$tcx> {
+        fn hash_stable<W: StableHasherResult>(&self,
+                                              hcx: &mut StableHashingContext<'a>,
+                                              hasher: &mut StableHasher<W>) {
+            mem::discriminant(self).hash_stable(hcx, hasher);
+            match *self {
+                $(Query::$name(key) => key.hash_stable(hcx, hasher),)*
+            }
+        }
+    }
+
     pub mod queries {
         use std::marker::PhantomData;
diff --git a/src/librustc_driver/driver.rs b/src/librustc_driver/driver.rs
index 8dcbda917b2..c2a450a1122 100644
--- a/src/librustc_driver/driver.rs
+++ b/src/librustc_driver/driver.rs
@@ -49,7 +49,7 @@ use std::io::{self, Write};
 use std::iter;
 use std::path::{Path, PathBuf};
-use rustc_data_structures::sync::{self, Lrc};
+use rustc_data_structures::sync::{self, Lrc, Lock};
 use std::sync::mpsc;
 use syntax::{self, ast, attr, diagnostics, visit};
 use syntax::ext::base::ExtCtxt;
@@ -69,7 +69,9 @@ pub fn spawn_thread_pool<F: FnOnce(config::Options) -> R + sync::Send, R: sync::Send>(
     opts: config::Options,
     f: F
 ) -> R {
-    f(opts)
+    ty::tls::GCX_PTR.set(&Lock::new(0), || {
+        f(opts)
+    })
 }

 #[cfg(parallel_queries)]
@@ -81,7 +83,10 @@ pub fn spawn_thread_pool<F: FnOnce(config::Options) -> R + sync::Send, R: sync::Send>(
     use syntax_pos;
     use rayon::{ThreadPoolBuilder, ThreadPool};

+    let gcx_ptr = &Lock::new(0);
+
     let config = ThreadPoolBuilder::new().num_threads(Session::query_threads_from_opts(&opts))
+        .deadlock_handler(ty::maps::handle_deadlock)
         .stack_size(16 * 1024 * 1024);

     let with_pool = move |pool: &ThreadPool| {
@@ -98,7 +103,9 @@ pub fn spawn_thread_pool<F: FnOnce(config::Options) -> R + sync::Send, R: sync::Send>(
         syntax::GLOBALS.set(syntax_globals, || {
             syntax_pos::GLOBALS.set(syntax_pos_globals, || {
                 ty::tls::with_thread_locals(|| {
-                    worker()
+                    ty::tls::GCX_PTR.set(gcx_ptr, || {
+                        worker()
+                    })
                 })
             })
         })
-- 
2.44.0
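
A note for readers following the synchronization logic: the heart of the patch
is QueryLatch. A thread that requests a query already being executed elsewhere
parks itself on the query's latch via QueryJob::await, and signal_complete (or,
for a cycle, QueryLatch::resume_waiter driven by the deadlock handler) wakes it
up again. Below is a minimal, self-contained sketch of that latch pattern only;
it is not the patch's code. It uses std::sync instead of parking_lot, a single
condvar shared by all waiters instead of one condvar per waiter, and it omits
rayon-core's mark_blocked/mark_unblocked bookkeeping and the cycle detection
entirely.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// One latch per in-flight query: waiters block until the result is ready.
struct Latch {
    complete: Mutex<bool>,
    condvar: Condvar,
}

impl Latch {
    fn new() -> Self {
        Latch { complete: Mutex::new(false), condvar: Condvar::new() }
    }

    // Corresponds to QueryLatch::await: park until `set` is called.
    // (Named `wait` here since `await` is reserved in later Rust editions.)
    fn wait(&self) {
        let mut complete = self.complete.lock().unwrap();
        while !*complete {
            complete = self.condvar.wait(complete).unwrap();
        }
    }

    // Corresponds to QueryLatch::set, called from QueryJob::signal_complete:
    // mark the query as done and wake every parked waiter.
    fn set(&self) {
        *self.complete.lock().unwrap() = true;
        self.condvar.notify_all();
    }
}

fn main() {
    let latch = Arc::new(Latch::new());

    let waiter = {
        let latch = Arc::clone(&latch);
        thread::spawn(move || {
            // A second thread asking for the same query key ends up here.
            latch.wait();
            println!("waiter resumed: query result is now available");
        })
    };

    // The thread actually executing the query finishes and signals completion.
    latch.set();
    waiter.join().unwrap();
}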