X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Fthread.rs;h=7d6fe8041e98000b7dbea1be756ec1edd80412de;hb=c151af5cf597d66251d4ab56cffc67a2f0507365;hp=eb7af536cf119c34c60ad515c1ff313e2e8f5303;hpb=04abf066f15c2ce2d1a788a1021bb14dcb9ac045;p=rust.git diff --git a/src/thread.rs b/src/thread.rs index eb7af536cf1..7d6fe8041e9 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,57 +1,71 @@ //! Implements threads. use std::cell::RefCell; +use std::collections::hash_map::Entry; use std::convert::TryFrom; -use std::num::NonZeroU32; +use std::num::TryFromIntError; +use std::rc::Rc; +use std::time::{Duration, Instant, SystemTime}; use log::trace; use rustc_data_structures::fx::FxHashMap; use rustc_hir::def_id::DefId; use rustc_index::vec::{Idx, IndexVec}; -use rustc_middle::{ - middle::codegen_fn_attrs::CodegenFnAttrFlags, - mir, - ty::{self, Instance}, -}; +use crate::sync::SynchronizationState; use crate::*; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum SchedulingAction { /// Execute step on the active thread. ExecuteStep, + /// Execute a timeout callback. + ExecuteTimeoutCallback, /// Execute destructors of the active thread. ExecuteDtors, /// Stop the program. Stop, } +/// Timeout callbacks can be created by synchronization primitives to tell the +/// scheduler that they should be called once some period of time passes. +type TimeoutCallback<'mir, 'tcx> = + Box>) -> InterpResult<'tcx> + 'tcx>; + /// A thread identifier. #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] -pub struct ThreadId(usize); +pub struct ThreadId(u32); /// The main thread. When it terminates, the whole application terminates. const MAIN_THREAD: ThreadId = ThreadId(0); +impl ThreadId { + pub fn to_u32(self) -> u32 { + self.0 + } +} + impl Idx for ThreadId { fn new(idx: usize) -> Self { - ThreadId(idx) + ThreadId(u32::try_from(idx).unwrap()) } + fn index(self) -> usize { - self.0 + usize::try_from(self.0).unwrap() } } -impl From for ThreadId { - fn from(id: u64) -> Self { - Self(usize::try_from(id).unwrap()) +impl TryFrom for ThreadId { + type Error = TryFromIntError; + fn try_from(id: u64) -> Result { + u32::try_from(id).map(|id_u32| Self(id_u32)) } } impl From for ThreadId { fn from(id: u32) -> Self { - Self(usize::try_from(id).unwrap()) + Self(id) } } @@ -61,22 +75,6 @@ pub fn to_u32_scalar<'tcx>(&self) -> Scalar { } } -/// An identifier of a set of blocked threads. -#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] -pub struct BlockSetId(NonZeroU32); - -impl From for BlockSetId { - fn from(id: u32) -> Self { - Self(NonZeroU32::new(id).expect("0 is not a valid blockset id")) - } -} - -impl BlockSetId { - pub fn to_u32_scalar<'tcx>(&self) -> Scalar { - Scalar::from_u32(self.0.get()) - } -} - /// The state of a thread. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ThreadState { @@ -85,10 +83,12 @@ pub enum ThreadState { /// The thread tried to join the specified thread and is blocked until that /// thread terminates. BlockedOnJoin(ThreadId), - /// The thread is blocked and belongs to the given blockset. - Blocked(BlockSetId), - /// The thread has terminated its execution (we do not delete terminated - /// threads). + /// The thread is blocked on some synchronization primitive. It is the + /// responsibility of the synchronization primitives to track threads that + /// are blocked by them. + BlockedOnSync, + /// The thread has terminated its execution. We do not delete terminated + /// threads (FIXME: why?). Terminated, } @@ -107,17 +107,28 @@ enum ThreadJoinStatus { /// A thread. pub struct Thread<'mir, 'tcx> { state: ThreadState, + /// Name of the thread. thread_name: Option>, + /// The virtual call stack. stack: Vec>>, + /// The join status. join_status: ThreadJoinStatus, + + /// The temporary used for storing the argument of + /// the call to `miri_start_panic` (the panic payload) when unwinding. + /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`. + pub(crate) panic_payload: Option>, + + /// Last OS error location in memory. It is a 32-bit integer. + pub(crate) last_error: Option>, } impl<'mir, 'tcx> Thread<'mir, 'tcx> { - /// Check if the thread terminated. If yes, change the state to terminated - /// and return `true`. + /// Check if the thread is done executing (no more stack frames). If yes, + /// change the state to terminated and return `true`. fn check_terminated(&mut self) -> bool { if self.state == ThreadState::Enabled { if self.stack.is_empty() { @@ -127,20 +138,22 @@ fn check_terminated(&mut self) -> bool { } false } + + /// Get the name of the current thread, or `` if it was not set. + fn thread_name(&self) -> &[u8] { + if let Some(ref thread_name) = self.thread_name { thread_name } else { b"" } + } } impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(ref name) = self.thread_name { - if let Ok(name_str) = std::str::from_utf8(name) { - write!(f, "{}", name_str)?; - } else { - write!(f, "")?; - } - } else { - write!(f, "unnamed")?; - } - write!(f, "({:?}, {:?})", self.state, self.join_status) + write!( + f, + "{}({:?}, {:?})", + String::from_utf8_lossy(self.thread_name()), + self.state, + self.join_status + ) } } @@ -151,10 +164,47 @@ fn default() -> Self { thread_name: None, stack: Vec::new(), join_status: ThreadJoinStatus::Joinable, + panic_payload: None, + last_error: None, } } } +/// A specific moment in time. +#[derive(Debug)] +pub enum Time { + Monotonic(Instant), + RealTime(SystemTime), +} + +impl Time { + /// How long do we have to wait from now until the specified time? + fn get_wait_time(&self) -> Duration { + match self { + Time::Monotonic(instant) => instant.saturating_duration_since(Instant::now()), + Time::RealTime(time) => + time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)), + } + } +} + +/// Callbacks are used to implement timeouts. For example, waiting on a +/// conditional variable with a timeout creates a callback that is called after +/// the specified time and unblocks the thread. If another thread signals on the +/// conditional variable, the signal handler deletes the callback. +struct TimeoutCallbackInfo<'mir, 'tcx> { + /// The callback should be called no earlier than this time. + call_time: Time, + /// The called function. + callback: TimeoutCallback<'mir, 'tcx>, +} + +impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TimeoutCallback({:?})", self.call_time) + } +} + /// A set of threads. #[derive(Debug)] pub struct ThreadManager<'mir, 'tcx> { @@ -164,13 +214,16 @@ pub struct ThreadManager<'mir, 'tcx> { /// /// Note that this vector also contains terminated threads. threads: IndexVec>, - /// A counter used to generate unique identifiers for blocksets. - blockset_counter: u32, + /// This field is pub(crate) because the synchronization primitives + /// (`crate::sync`) need a way to access it. + pub(crate) sync: SynchronizationState, /// A mapping from a thread-local static to an allocation id of a thread /// specific allocation. thread_local_alloc_ids: RefCell>, /// A flag that indicates that we should change the active thread. yield_active_thread: bool, + /// Callbacks that are called once the specified time passes. + timeout_callbacks: FxHashMap>, } impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { @@ -178,14 +231,16 @@ fn default() -> Self { let mut threads = IndexVec::new(); // Create the main thread and add it to the list of threads. let mut main_thread = Thread::default(); + // The main thread can *not* be joined on. main_thread.join_status = ThreadJoinStatus::Detached; threads.push(main_thread); Self { active_thread: ThreadId::new(0), threads: threads, - blockset_counter: 0, + sync: SynchronizationState::default(), thread_local_alloc_ids: Default::default(), yield_active_thread: false, + timeout_callbacks: FxHashMap::default(), } } } @@ -204,8 +259,8 @@ fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option { fn set_thread_local_alloc_id(&self, def_id: DefId, new_alloc_id: AllocId) { self.thread_local_alloc_ids .borrow_mut() - .insert((def_id, self.active_thread), new_alloc_id) - .unwrap_none(); + .try_insert((def_id, self.active_thread), new_alloc_id) + .unwrap(); } /// Borrow the stack of the active thread. @@ -275,7 +330,11 @@ fn detach_thread(&mut self, id: ThreadId) -> InterpResult<'tcx> { } /// Mark that the active thread tries to join the thread with `joined_thread_id`. - fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { + fn join_thread( + &mut self, + joined_thread_id: ThreadId, + data_race: &Option>, + ) -> InterpResult<'tcx> { if self.threads[joined_thread_id].join_status != ThreadJoinStatus::Joinable { throw_ub_format!("trying to join a detached or already joined thread"); } @@ -286,7 +345,7 @@ fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { self.threads .iter() .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)), - "a joinable thread has threads waiting for its termination" + "a joinable thread already has threads waiting for its termination" ); // Mark the joined thread as being joined so that we detect if other // threads try to join it. @@ -299,6 +358,11 @@ fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { self.active_thread, joined_thread_id ); + } else { + // The thread has already terminated - mark join happens-before + if let Some(data_race) = data_race { + data_race.thread_joined(self.active_thread, joined_thread_id); + } } Ok(()) } @@ -309,43 +373,104 @@ fn set_thread_name(&mut self, new_thread_name: Vec) { } /// Get the name of the active thread. - fn get_thread_name(&self) -> InterpResult<'tcx, &[u8]> { - if let Some(ref thread_name) = self.active_thread_ref().thread_name { - Ok(thread_name) - } else { - throw_ub_format!("thread {:?} has no name set", self.active_thread) - } + fn get_thread_name(&self) -> &[u8] { + self.active_thread_ref().thread_name() } - /// Allocate a new blockset id. - fn create_blockset(&mut self) -> BlockSetId { - self.blockset_counter = self.blockset_counter.checked_add(1).unwrap(); - self.blockset_counter.into() + /// Put the thread into the blocked state. + fn block_thread(&mut self, thread: ThreadId) { + let state = &mut self.threads[thread].state; + assert_eq!(*state, ThreadState::Enabled); + *state = ThreadState::BlockedOnSync; } - /// Block the currently active thread and put it into the given blockset. - fn block_active_thread(&mut self, set: BlockSetId) { - let state = &mut self.active_thread_mut().state; - assert_eq!(*state, ThreadState::Enabled); - *state = ThreadState::Blocked(set); + /// Put the blocked thread into the enabled state. + fn unblock_thread(&mut self, thread: ThreadId) { + let state = &mut self.threads[thread].state; + assert_eq!(*state, ThreadState::BlockedOnSync); + *state = ThreadState::Enabled; } - /// Unblock any one thread from the given blockset if it contains at least - /// one. Return the id of the unblocked thread. - fn unblock_some_thread(&mut self, set: BlockSetId) -> Option { - for (id, thread) in self.threads.iter_enumerated_mut() { - if thread.state == ThreadState::Blocked(set) { - trace!("unblocking {:?} in blockset {:?}", id, set); - thread.state = ThreadState::Enabled; - return Some(id); + /// Change the active thread to some enabled thread. + fn yield_active_thread(&mut self) { + // We do not yield immediately, as swapping out the current stack while executing a MIR statement + // could lead to all sorts of confusion. + // We should only switch stacks between steps. + self.yield_active_thread = true; + } + + /// Register the given `callback` to be called once the `call_time` passes. + /// + /// The callback will be called with `thread` being the active thread, and + /// the callback may not change the active thread. + fn register_timeout_callback( + &mut self, + thread: ThreadId, + call_time: Time, + callback: TimeoutCallback<'mir, 'tcx>, + ) { + self.timeout_callbacks + .try_insert(thread, TimeoutCallbackInfo { call_time, callback }) + .unwrap(); + } + + /// Unregister the callback for the `thread`. + fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { + self.timeout_callbacks.remove(&thread); + } + + /// Get a callback that is ready to be called. + fn get_ready_callback(&mut self) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> { + // We iterate over all threads in the order of their indices because + // this allows us to have a deterministic scheduler. + for thread in self.threads.indices() { + match self.timeout_callbacks.entry(thread) { + Entry::Occupied(entry) => + if entry.get().call_time.get_wait_time() == Duration::new(0, 0) { + return Some((thread, entry.remove().callback)); + }, + Entry::Vacant(_) => {} } } None } - /// Change the active thread to some enabled thread. - fn yield_active_thread(&mut self) { - self.yield_active_thread = true; + /// Wakes up threads joining on the active one and deallocates thread-local statics. + /// The `AllocId` that can now be freed is returned. + fn thread_terminated( + &mut self, + data_race: &Option>, + ) -> Vec { + let mut free_tls_statics = Vec::new(); + { + let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut(); + thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| { + if thread != self.active_thread { + // Keep this static around. + return true; + } + // Delete this static from the map and from memory. + // We cannot free directly here as we cannot use `?` in this context. + free_tls_statics.push(alloc_id); + return false; + }); + } + // Set the thread into a terminated state in the data-race detector + if let Some(data_race) = data_race { + data_race.thread_terminated(); + } + // Check if we need to unblock any threads. + for (i, thread) in self.threads.iter_enumerated_mut() { + if thread.state == ThreadState::BlockedOnJoin(self.active_thread) { + // The thread has terminated, mark happens-before edge to joining thread + if let Some(data_race) = data_race { + data_race.thread_joined(i, self.active_thread); + } + trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); + thread.state = ThreadState::Enabled; + } + } + return free_tls_statics; } /// Decide which action to take next and on which thread. @@ -353,18 +478,18 @@ fn yield_active_thread(&mut self) { /// The currently implemented scheduling policy is the one that is commonly /// used in stateless model checkers such as Loom: run the active thread as /// long as we can and switch only when we have to (the active thread was - /// blocked, terminated, or was explicitly asked to be preempted). - fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { + /// blocked, terminated, or has explicitly asked to be preempted). + fn schedule( + &mut self, + data_race: &Option>, + ) -> InterpResult<'tcx, SchedulingAction> { + // Check whether the thread has **just** terminated (`check_terminated` + // checks whether the thread has popped all its stack and if yes, sets + // the thread state to terminated). if self.threads[self.active_thread].check_terminated() { - // Check if we need to unblock any threads. - for (i, thread) in self.threads.iter_enumerated_mut() { - if thread.state == ThreadState::BlockedOnJoin(self.active_thread) { - trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); - thread.state = ThreadState::Enabled; - } - } return Ok(SchedulingAction::ExecuteDtors); } + // If we get here again and the thread is *still* terminated, there are no more dtors to run. if self.threads[MAIN_THREAD].state == ThreadState::Terminated { // The main thread terminated; stop the program. if self.threads.iter().any(|thread| thread.state != ThreadState::Terminated) { @@ -378,17 +503,33 @@ fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { } return Ok(SchedulingAction::Stop); } + // This thread and the program can keep going. if self.threads[self.active_thread].state == ThreadState::Enabled && !self.yield_active_thread { // The currently active thread is still enabled, just continue with it. return Ok(SchedulingAction::ExecuteStep); } + // The active thread yielded. Let's see if there are any timeouts to take care of. We do + // this *before* running any other thread, to ensure that timeouts "in the past" fire before + // any other thread can take an action. This ensures that for `pthread_cond_timedwait`, "an + // error is returned if [...] the absolute time specified by abstime has already been passed + // at the time of the call". + // + let potential_sleep_time = + self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time()).min(); + if potential_sleep_time == Some(Duration::new(0, 0)) { + return Ok(SchedulingAction::ExecuteTimeoutCallback); + } + // No callbacks scheduled, pick a regular thread to execute. // We need to pick a new thread for execution. for (id, thread) in self.threads.iter_enumerated() { if thread.state == ThreadState::Enabled { if !self.yield_active_thread || id != self.active_thread { self.active_thread = id; + if let Some(data_race) = data_race { + data_race.thread_set_active(self.active_thread); + } break; } } @@ -399,7 +540,13 @@ fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { } // We have not found a thread to execute. if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { - unreachable!(); + unreachable!("all threads terminated without the main thread terminating?!"); + } else if let Some(sleep_time) = potential_sleep_time { + // All threads are currently blocked, but we have unexecuted + // timeout_callbacks, which may unblock some of the threads. Hence, + // sleep until the first callback. + std::thread::sleep(sleep_time); + Ok(SchedulingAction::ExecuteTimeoutCallback) } else { throw_machine_stop!(TerminationInfo::Deadlock); } @@ -409,48 +556,13 @@ fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { // Public interface to thread management. impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {} pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> { - /// A workaround for thread-local statics until - /// https://github.com/rust-lang/rust/issues/70685 is fixed: change the - /// thread-local allocation id with a freshly generated allocation id for - /// the currently active thread. - fn remap_thread_local_alloc_ids( - &self, - val: &mut mir::interpret::ConstValue<'tcx>, - ) -> InterpResult<'tcx> { - let this = self.eval_context_ref(); - match *val { - mir::interpret::ConstValue::Scalar(Scalar::Ptr(ref mut ptr)) => { - let alloc_id = ptr.alloc_id; - let alloc = this.tcx.alloc_map.lock().get(alloc_id); - let tcx = this.tcx; - let is_thread_local = |def_id| { - tcx.codegen_fn_attrs(def_id).flags.contains(CodegenFnAttrFlags::THREAD_LOCAL) - }; - match alloc { - Some(GlobalAlloc::Static(def_id)) if is_thread_local(def_id) => { - ptr.alloc_id = this.get_or_create_thread_local_alloc_id(def_id)?; - } - _ => {} - } - } - _ => { - // FIXME: Handling only `Scalar` seems to work for now, but at - // least in principle thread-locals could be in any constant, so - // we should also consider other cases. However, once - // https://github.com/rust-lang/rust/issues/70685 gets fixed, - // this code will have to be rewritten anyway. - } - } - Ok(()) - } - /// Get a thread-specific allocation id for the given thread-local static. /// If needed, allocate a new one. - /// - /// FIXME: This method should be replaced as soon as - /// https://github.com/rust-lang/rust/issues/70685 gets fixed. - fn get_or_create_thread_local_alloc_id(&self, def_id: DefId) -> InterpResult<'tcx, AllocId> { - let this = self.eval_context_ref(); + fn get_or_create_thread_local_alloc_id( + &mut self, + def_id: DefId, + ) -> InterpResult<'tcx, AllocId> { + let this = self.eval_context_mut(); let tcx = this.tcx; if let Some(new_alloc_id) = this.machine.threads.get_thread_local_alloc_id(def_id) { // We already have a thread-specific allocation id for this @@ -459,45 +571,27 @@ fn get_or_create_thread_local_alloc_id(&self, def_id: DefId) -> InterpResult<'tc } else { // We need to allocate a thread-specific allocation id for this // thread-local static. - // - // At first, we invoke the `const_eval_raw` query and extract the - // allocation from it. Unfortunately, we have to duplicate the code - // from `Memory::get_global_alloc` that does this. - // - // Then we store the retrieved allocation back into the `alloc_map` - // to get a fresh allocation id, which we can use as a - // thread-specific allocation id for the thread-local static. + // First, we compute the initial value for this static. if tcx.is_foreign_item(def_id) { throw_unsup_format!("foreign thread-local statics are not supported"); } - // Invoke the `const_eval_raw` query. - let instance = Instance::mono(tcx.tcx, def_id); - let gid = GlobalId { instance, promoted: None }; - let raw_const = - tcx.const_eval_raw(ty::ParamEnv::reveal_all().and(gid)).map_err(|err| { - // no need to report anything, the const_eval call takes care of that - // for statics - assert!(tcx.is_static(def_id)); - err - })?; - let id = raw_const.alloc_id; - // Extract the allocation from the query result. - let mut alloc_map = tcx.alloc_map.lock(); - let allocation = alloc_map.unwrap_memory(id); - // Create a new allocation id for the same allocation in this hacky - // way. Internally, `alloc_map` deduplicates allocations, but this - // is fine because Miri will make a copy before a first mutable - // access. - let new_alloc_id = alloc_map.create_memory_alloc(allocation); + let allocation = tcx.eval_static_initializer(def_id)?; + // Create a fresh allocation with this content. + let new_alloc_id = + this.memory.allocate_with(allocation.clone(), MiriMemoryKind::Tls.into()).alloc_id; this.machine.threads.set_thread_local_alloc_id(def_id, new_alloc_id); Ok(new_alloc_id) } } #[inline] - fn create_thread(&mut self) -> InterpResult<'tcx, ThreadId> { + fn create_thread(&mut self) -> ThreadId { let this = self.eval_context_mut(); - Ok(this.machine.threads.create_thread()) + let id = this.machine.threads.create_thread(); + if let Some(data_race) = &this.memory.extra.data_race { + data_race.thread_created(id); + } + id } #[inline] @@ -509,38 +603,54 @@ fn detach_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> { #[inline] fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - this.machine.threads.join_thread(joined_thread_id) + let data_race = &this.memory.extra.data_race; + this.machine.threads.join_thread(joined_thread_id, data_race)?; + Ok(()) + } + + #[inline] + fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId { + let this = self.eval_context_mut(); + if let Some(data_race) = &this.memory.extra.data_race { + data_race.thread_set_active(thread_id); + } + this.machine.threads.set_active_thread_id(thread_id) } #[inline] - fn set_active_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx, ThreadId> { + fn get_active_thread(&self) -> ThreadId { + let this = self.eval_context_ref(); + this.machine.threads.get_active_thread_id() + } + + #[inline] + fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> { let this = self.eval_context_mut(); - Ok(this.machine.threads.set_active_thread_id(thread_id)) + this.machine.threads.active_thread_mut() } #[inline] - fn get_active_thread(&self) -> InterpResult<'tcx, ThreadId> { + fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { let this = self.eval_context_ref(); - Ok(this.machine.threads.get_active_thread_id()) + this.machine.threads.active_thread_ref() } #[inline] - fn get_total_thread_count(&self) -> InterpResult<'tcx, usize> { + fn get_total_thread_count(&self) -> usize { let this = self.eval_context_ref(); - Ok(this.machine.threads.get_total_thread_count()) + this.machine.threads.get_total_thread_count() } #[inline] - fn has_terminated(&self, thread_id: ThreadId) -> InterpResult<'tcx, bool> { + fn has_terminated(&self, thread_id: ThreadId) -> bool { let this = self.eval_context_ref(); - Ok(this.machine.threads.has_terminated(thread_id)) + this.machine.threads.has_terminated(thread_id) } #[inline] - fn enable_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> { + fn enable_thread(&mut self, thread_id: ThreadId) { let this = self.eval_context_mut(); this.machine.threads.enable_thread(thread_id); - Ok(()) } #[inline] @@ -556,13 +666,18 @@ fn active_thread_stack_mut(&mut self) -> &mut Vec) -> InterpResult<'tcx, ()> { + fn set_active_thread_name(&mut self, new_thread_name: Vec) { let this = self.eval_context_mut(); - Ok(this.machine.threads.set_thread_name(new_thread_name)) + if let Some(data_race) = &this.memory.extra.data_race { + if let Ok(string) = String::from_utf8(new_thread_name.clone()) { + data_race.thread_set_name(this.machine.threads.active_thread, string); + } + } + this.machine.threads.set_thread_name(new_thread_name); } #[inline] - fn get_active_thread_name<'c>(&'c self) -> InterpResult<'tcx, &'c [u8]> + fn get_active_thread_name<'c>(&'c self) -> &'c [u8] where 'mir: 'c, { @@ -571,27 +686,65 @@ fn get_active_thread_name<'c>(&'c self) -> InterpResult<'tcx, &'c [u8]> } #[inline] - fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> { + fn block_thread(&mut self, thread: ThreadId) { let this = self.eval_context_mut(); - Ok(this.machine.threads.create_blockset()) + this.machine.threads.block_thread(thread); } #[inline] - fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> { + fn unblock_thread(&mut self, thread: ThreadId) { let this = self.eval_context_mut(); - Ok(this.machine.threads.block_active_thread(set)) + this.machine.threads.unblock_thread(thread); } #[inline] - fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option> { + fn yield_active_thread(&mut self) { let this = self.eval_context_mut(); - Ok(this.machine.threads.unblock_some_thread(set)) + this.machine.threads.yield_active_thread(); } #[inline] - fn yield_active_thread(&mut self) -> InterpResult<'tcx> { + fn register_timeout_callback( + &mut self, + thread: ThreadId, + call_time: Time, + callback: TimeoutCallback<'mir, 'tcx>, + ) { let this = self.eval_context_mut(); - this.machine.threads.yield_active_thread(); + this.machine.threads.register_timeout_callback(thread, call_time, callback); + } + + #[inline] + fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { + let this = self.eval_context_mut(); + this.machine.threads.unregister_timeout_callback_if_exists(thread); + } + + /// Execute a timeout callback on the callback's thread. + #[inline] + fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let (thread, callback) = + if let Some((thread, callback)) = this.machine.threads.get_ready_callback() { + (thread, callback) + } else { + // get_ready_callback can return None if the computer's clock + // was shifted after calling the scheduler and before the call + // to get_ready_callback (see issue + // https://github.com/rust-lang/miri/issues/1763). In this case, + // just do nothing, which effectively just returns to the + // scheduler. + return Ok(()); + }; + // This back-and-forth with `set_active_thread` is here because of two + // design decisions: + // 1. Make the caller and not the callback responsible for changing + // thread. + // 2. Make the scheduler the only place that can change the active + // thread. + let old_thread = this.set_active_thread(thread); + callback(this)?; + this.set_active_thread(old_thread); Ok(()) } @@ -599,6 +752,22 @@ fn yield_active_thread(&mut self) -> InterpResult<'tcx> { #[inline] fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { let this = self.eval_context_mut(); - this.machine.threads.schedule() + let data_race = &this.memory.extra.data_race; + this.machine.threads.schedule(data_race) + } + + /// Handles thread termination of the active thread: wakes up threads joining on this one, + /// and deallocated thread-local statics. + /// + /// This is called from `tls.rs` after handling the TLS dtors. + #[inline] + fn thread_terminated(&mut self) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let data_race = &this.memory.extra.data_race; + for alloc_id in this.machine.threads.thread_terminated(data_race) { + let ptr = this.memory.global_base_pointer(alloc_id.into())?; + this.memory.deallocate(ptr, None, MiriMemoryKind::Tls.into())?; + } + Ok(()) } }