X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Fthread.rs;h=7d6fe8041e98000b7dbea1be756ec1edd80412de;hb=c151af5cf597d66251d4ab56cffc67a2f0507365;hp=0a83b71665c5cef90e3539dc3b3bee3b02ab9089;hpb=0a4e8caa8c303e8a8b5459bb79c7474eb53619ae;p=rust.git diff --git a/src/thread.rs b/src/thread.rs index 0a83b71665c..7d6fe8041e9 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -4,6 +4,7 @@ use std::collections::hash_map::Entry; use std::convert::TryFrom; use std::num::TryFromIntError; +use std::rc::Rc; use std::time::{Duration, Instant, SystemTime}; use log::trace; @@ -11,7 +12,6 @@ use rustc_data_structures::fx::FxHashMap; use rustc_hir::def_id::DefId; use rustc_index::vec::{Idx, IndexVec}; -use rustc_middle::ty::{self, Instance}; use crate::sync::SynchronizationState; use crate::*; @@ -107,12 +107,23 @@ enum ThreadJoinStatus { /// A thread. pub struct Thread<'mir, 'tcx> { state: ThreadState, + /// Name of the thread. thread_name: Option>, + /// The virtual call stack. stack: Vec>>, + /// The join status. join_status: ThreadJoinStatus, + + /// The temporary used for storing the argument of + /// the call to `miri_start_panic` (the panic payload) when unwinding. + /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`. + pub(crate) panic_payload: Option>, + + /// Last OS error location in memory. It is a 32-bit integer. + pub(crate) last_error: Option>, } impl<'mir, 'tcx> Thread<'mir, 'tcx> { @@ -130,17 +141,19 @@ fn check_terminated(&mut self) -> bool { /// Get the name of the current thread, or `` if it was not set. fn thread_name(&self) -> &[u8] { - if let Some(ref thread_name) = self.thread_name { - thread_name - } else { - b"" - } + if let Some(ref thread_name) = self.thread_name { thread_name } else { b"" } } } impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}({:?}, {:?})", String::from_utf8_lossy(self.thread_name()), self.state, self.join_status) + write!( + f, + "{}({:?}, {:?})", + String::from_utf8_lossy(self.thread_name()), + self.state, + self.join_status + ) } } @@ -151,6 +164,8 @@ fn default() -> Self { thread_name: None, stack: Vec::new(), join_status: ThreadJoinStatus::Joinable, + panic_payload: None, + last_error: None, } } } @@ -244,8 +259,8 @@ fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option { fn set_thread_local_alloc_id(&self, def_id: DefId, new_alloc_id: AllocId) { self.thread_local_alloc_ids .borrow_mut() - .insert((def_id, self.active_thread), new_alloc_id) - .unwrap_none(); + .try_insert((def_id, self.active_thread), new_alloc_id) + .unwrap(); } /// Borrow the stack of the active thread. @@ -315,7 +330,11 @@ fn detach_thread(&mut self, id: ThreadId) -> InterpResult<'tcx> { } /// Mark that the active thread tries to join the thread with `joined_thread_id`. - fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { + fn join_thread( + &mut self, + joined_thread_id: ThreadId, + data_race: &Option>, + ) -> InterpResult<'tcx> { if self.threads[joined_thread_id].join_status != ThreadJoinStatus::Joinable { throw_ub_format!("trying to join a detached or already joined thread"); } @@ -339,6 +358,11 @@ fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { self.active_thread, joined_thread_id ); + } else { + // The thread has already terminated - mark join happens-before + if let Some(data_race) = data_race { + data_race.thread_joined(self.active_thread, joined_thread_id); + } } Ok(()) } @@ -386,8 +410,8 @@ fn register_timeout_callback( callback: TimeoutCallback<'mir, 'tcx>, ) { self.timeout_callbacks - .insert(thread, TimeoutCallbackInfo { call_time, callback }) - .unwrap_none(); + .try_insert(thread, TimeoutCallbackInfo { call_time, callback }) + .unwrap(); } /// Unregister the callback for the `thread`. @@ -411,26 +435,61 @@ fn get_ready_callback(&mut self) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx None } + /// Wakes up threads joining on the active one and deallocates thread-local statics. + /// The `AllocId` that can now be freed is returned. + fn thread_terminated( + &mut self, + data_race: &Option>, + ) -> Vec { + let mut free_tls_statics = Vec::new(); + { + let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut(); + thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| { + if thread != self.active_thread { + // Keep this static around. + return true; + } + // Delete this static from the map and from memory. + // We cannot free directly here as we cannot use `?` in this context. + free_tls_statics.push(alloc_id); + return false; + }); + } + // Set the thread into a terminated state in the data-race detector + if let Some(data_race) = data_race { + data_race.thread_terminated(); + } + // Check if we need to unblock any threads. + for (i, thread) in self.threads.iter_enumerated_mut() { + if thread.state == ThreadState::BlockedOnJoin(self.active_thread) { + // The thread has terminated, mark happens-before edge to joining thread + if let Some(data_race) = data_race { + data_race.thread_joined(i, self.active_thread); + } + trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); + thread.state = ThreadState::Enabled; + } + } + return free_tls_statics; + } + /// Decide which action to take next and on which thread. /// /// The currently implemented scheduling policy is the one that is commonly /// used in stateless model checkers such as Loom: run the active thread as /// long as we can and switch only when we have to (the active thread was /// blocked, terminated, or has explicitly asked to be preempted). - fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { + fn schedule( + &mut self, + data_race: &Option>, + ) -> InterpResult<'tcx, SchedulingAction> { // Check whether the thread has **just** terminated (`check_terminated` // checks whether the thread has popped all its stack and if yes, sets // the thread state to terminated). if self.threads[self.active_thread].check_terminated() { - // Check if we need to unblock any threads. - for (i, thread) in self.threads.iter_enumerated_mut() { - if thread.state == ThreadState::BlockedOnJoin(self.active_thread) { - trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); - thread.state = ThreadState::Enabled; - } - } return Ok(SchedulingAction::ExecuteDtors); } + // If we get here again and the thread is *still* terminated, there are no more dtors to run. if self.threads[MAIN_THREAD].state == ThreadState::Terminated { // The main thread terminated; stop the program. if self.threads.iter().any(|thread| thread.state != ThreadState::Terminated) { @@ -444,31 +503,33 @@ fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { } return Ok(SchedulingAction::Stop); } - // At least for `pthread_cond_timedwait` we need to report timeout when - // the function is called already after the specified time even if a - // signal is received before the thread gets scheduled. Therefore, we - // need to schedule all timeout callbacks before we continue regular - // execution. - // - // Documentation: - // https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html# - let potential_sleep_time = - self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time()).min(); - if potential_sleep_time == Some(Duration::new(0, 0)) { - return Ok(SchedulingAction::ExecuteTimeoutCallback); - } - // No callbacks scheduled, pick a regular thread to execute. + // This thread and the program can keep going. if self.threads[self.active_thread].state == ThreadState::Enabled && !self.yield_active_thread { // The currently active thread is still enabled, just continue with it. return Ok(SchedulingAction::ExecuteStep); } + // The active thread yielded. Let's see if there are any timeouts to take care of. We do + // this *before* running any other thread, to ensure that timeouts "in the past" fire before + // any other thread can take an action. This ensures that for `pthread_cond_timedwait`, "an + // error is returned if [...] the absolute time specified by abstime has already been passed + // at the time of the call". + // + let potential_sleep_time = + self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time()).min(); + if potential_sleep_time == Some(Duration::new(0, 0)) { + return Ok(SchedulingAction::ExecuteTimeoutCallback); + } + // No callbacks scheduled, pick a regular thread to execute. // We need to pick a new thread for execution. for (id, thread) in self.threads.iter_enumerated() { if thread.state == ThreadState::Enabled { if !self.yield_active_thread || id != self.active_thread { self.active_thread = id; + if let Some(data_race) = data_race { + data_race.thread_set_active(self.active_thread); + } break; } } @@ -497,11 +558,11 @@ impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mi pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> { /// Get a thread-specific allocation id for the given thread-local static. /// If needed, allocate a new one. - /// - /// FIXME: This method should be replaced as soon as - /// https://github.com/rust-lang/rust/issues/70685 gets fixed. - fn get_or_create_thread_local_alloc_id(&self, def_id: DefId) -> InterpResult<'tcx, AllocId> { - let this = self.eval_context_ref(); + fn get_or_create_thread_local_alloc_id( + &mut self, + def_id: DefId, + ) -> InterpResult<'tcx, AllocId> { + let this = self.eval_context_mut(); let tcx = this.tcx; if let Some(new_alloc_id) = this.machine.threads.get_thread_local_alloc_id(def_id) { // We already have a thread-specific allocation id for this @@ -510,35 +571,14 @@ fn get_or_create_thread_local_alloc_id(&self, def_id: DefId) -> InterpResult<'tc } else { // We need to allocate a thread-specific allocation id for this // thread-local static. - // - // At first, we invoke the `const_eval_raw` query and extract the - // allocation from it. Unfortunately, we have to duplicate the code - // from `Memory::get_global_alloc` that does this. - // - // Then we store the retrieved allocation back into the `alloc_map` - // to get a fresh allocation id, which we can use as a - // thread-specific allocation id for the thread-local static. + // First, we compute the initial value for this static. if tcx.is_foreign_item(def_id) { throw_unsup_format!("foreign thread-local statics are not supported"); } - // Invoke the `const_eval_raw` query. - let instance = Instance::mono(tcx.tcx, def_id); - let gid = GlobalId { instance, promoted: None }; - let raw_const = - tcx.const_eval_raw(ty::ParamEnv::reveal_all().and(gid)).map_err(|err| { - // no need to report anything, the const_eval call takes care of that - // for statics - assert!(tcx.is_static(def_id)); - err - })?; - let id = raw_const.alloc_id; - // Extract the allocation from the query result. - let allocation = tcx.global_alloc(id).unwrap_memory(); - // Create a new allocation id for the same allocation in this hacky - // way. Internally, `alloc_map` deduplicates allocations, but this - // is fine because Miri will make a copy before a first mutable - // access. - let new_alloc_id = tcx.create_memory_alloc(allocation); + let allocation = tcx.eval_static_initializer(def_id)?; + // Create a fresh allocation with this content. + let new_alloc_id = + this.memory.allocate_with(allocation.clone(), MiriMemoryKind::Tls.into()).alloc_id; this.machine.threads.set_thread_local_alloc_id(def_id, new_alloc_id); Ok(new_alloc_id) } @@ -547,7 +587,11 @@ fn get_or_create_thread_local_alloc_id(&self, def_id: DefId) -> InterpResult<'tc #[inline] fn create_thread(&mut self) -> ThreadId { let this = self.eval_context_mut(); - this.machine.threads.create_thread() + let id = this.machine.threads.create_thread(); + if let Some(data_race) = &this.memory.extra.data_race { + data_race.thread_created(id); + } + id } #[inline] @@ -559,12 +603,17 @@ fn detach_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> { #[inline] fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - this.machine.threads.join_thread(joined_thread_id) + let data_race = &this.memory.extra.data_race; + this.machine.threads.join_thread(joined_thread_id, data_race)?; + Ok(()) } #[inline] fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId { let this = self.eval_context_mut(); + if let Some(data_race) = &this.memory.extra.data_race { + data_race.thread_set_active(thread_id); + } this.machine.threads.set_active_thread_id(thread_id) } @@ -574,6 +623,18 @@ fn get_active_thread(&self) -> ThreadId { this.machine.threads.get_active_thread_id() } + #[inline] + fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> { + let this = self.eval_context_mut(); + this.machine.threads.active_thread_mut() + } + + #[inline] + fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { + let this = self.eval_context_ref(); + this.machine.threads.active_thread_ref() + } + #[inline] fn get_total_thread_count(&self) -> usize { let this = self.eval_context_ref(); @@ -607,6 +668,11 @@ fn active_thread_stack_mut(&mut self) -> &mut Vec) { let this = self.eval_context_mut(); + if let Some(data_race) = &this.memory.extra.data_race { + if let Ok(string) = String::from_utf8(new_thread_name.clone()) { + data_race.thread_set_name(this.machine.threads.active_thread, string); + } + } this.machine.threads.set_thread_name(new_thread_name); } @@ -659,7 +725,17 @@ fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { let this = self.eval_context_mut(); let (thread, callback) = - this.machine.threads.get_ready_callback().expect("no callback found"); + if let Some((thread, callback)) = this.machine.threads.get_ready_callback() { + (thread, callback) + } else { + // get_ready_callback can return None if the computer's clock + // was shifted after calling the scheduler and before the call + // to get_ready_callback (see issue + // https://github.com/rust-lang/miri/issues/1763). In this case, + // just do nothing, which effectively just returns to the + // scheduler. + return Ok(()); + }; // This back-and-forth with `set_active_thread` is here because of two // design decisions: // 1. Make the caller and not the callback responsible for changing @@ -676,6 +752,22 @@ fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { #[inline] fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { let this = self.eval_context_mut(); - this.machine.threads.schedule() + let data_race = &this.memory.extra.data_race; + this.machine.threads.schedule(data_race) + } + + /// Handles thread termination of the active thread: wakes up threads joining on this one, + /// and deallocated thread-local statics. + /// + /// This is called from `tls.rs` after handling the TLS dtors. + #[inline] + fn thread_terminated(&mut self) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let data_race = &this.memory.extra.data_race; + for alloc_id in this.machine.threads.thread_terminated(data_race) { + let ptr = this.memory.global_base_pointer(alloc_id.into())?; + this.memory.deallocate(ptr, None, MiriMemoryKind::Tls.into())?; + } + Ok(()) } }