1 //! Implements threads.
3 use std::cell::RefCell;
4 use std::collections::hash_map::Entry;
5 use std::num::TryFromIntError;
7 use std::time::{Duration, SystemTime};
11 use rustc_data_structures::fx::FxHashMap;
12 use rustc_hir::def_id::DefId;
13 use rustc_index::vec::{Idx, IndexVec};
14 use rustc_middle::mir::Mutability;
15 use rustc_middle::ty::layout::TyAndLayout;
17 use rustc_target::spec::abi::Abi;
19 use crate::concurrency::data_race;
20 use crate::concurrency::sync::SynchronizationState;
21 use crate::shims::tls;
24 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
25 enum SchedulingAction {
26 /// Execute step on the active thread.
28 /// Execute a timeout callback.
29 ExecuteTimeoutCallback,
30 /// Wait for a bit, until there is a timeout to be called.
34 /// Trait for callbacks that can be executed when some event happens, such as after a timeout.
35 pub trait MachineCallback<'mir, 'tcx>: VisitTags {
36 fn call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>;
39 type TimeoutCallback<'mir, 'tcx> = Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>;
41 /// A thread identifier.
42 #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
43 pub struct ThreadId(u32);
46 pub fn to_u32(self) -> u32 {
51 impl Idx for ThreadId {
52 fn new(idx: usize) -> Self {
53 ThreadId(u32::try_from(idx).unwrap())
56 fn index(self) -> usize {
57 usize::try_from(self.0).unwrap()
61 impl TryFrom<u64> for ThreadId {
62 type Error = TryFromIntError;
63 fn try_from(id: u64) -> Result<Self, Self::Error> {
64 u32::try_from(id).map(Self)
68 impl From<u32> for ThreadId {
69 fn from(id: u32) -> Self {
74 impl From<ThreadId> for u64 {
75 fn from(t: ThreadId) -> Self {
80 /// The state of a thread.
81 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
82 pub enum ThreadState {
83 /// The thread is enabled and can be executed.
85 /// The thread tried to join the specified thread and is blocked until that
86 /// thread terminates.
87 BlockedOnJoin(ThreadId),
88 /// The thread is blocked on some synchronization primitive. It is the
89 /// responsibility of the synchronization primitives to track threads that
90 /// are blocked by them.
92 /// The thread has terminated its execution. We do not delete terminated
93 /// threads (FIXME: why?).
97 /// The join status of a thread.
98 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
99 enum ThreadJoinStatus {
100 /// The thread can be joined.
102 /// A thread is detached if its join handle was destroyed and no other
103 /// thread can join it.
105 /// The thread was already joined by some thread and cannot be joined again.
110 pub struct Thread<'mir, 'tcx> {
113 /// Name of the thread.
114 thread_name: Option<Vec<u8>>,
116 /// The virtual call stack.
117 stack: Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>>,
119 /// The function to call when the stack ran empty, to figure out what to do next.
120 /// Conceptually, this is the interpreter implementation of the things that happen 'after' the
121 /// Rust language entry point for this thread returns (usually implemented by the C or OS runtime).
122 /// (`None` is an error, it means the callback has not been set up yet or is actively running.)
123 pub(crate) on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>,
125 /// The index of the topmost user-relevant frame in `stack`. This field must contain
126 /// the value produced by `get_top_user_relevant_frame`.
127 /// The `None` state here represents
128 /// This field is a cache to reduce how often we call that method. The cache is manually
129 /// maintained inside `MiriMachine::after_stack_push` and `MiriMachine::after_stack_pop`.
130 top_user_relevant_frame: Option<usize>,
133 join_status: ThreadJoinStatus,
135 /// The temporary used for storing the argument of
136 /// the call to `miri_start_panic` (the panic payload) when unwinding.
137 /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`.
138 pub(crate) panic_payload: Option<Scalar<Provenance>>,
140 /// Last OS error location in memory. It is a 32-bit integer.
141 pub(crate) last_error: Option<MPlaceTy<'tcx, Provenance>>,
144 pub type StackEmptyCallback<'mir, 'tcx> =
145 Box<dyn FnMut(&mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx, Poll<()>>>;
147 impl<'mir, 'tcx> Thread<'mir, 'tcx> {
148 /// Get the name of the current thread, or `<unnamed>` if it was not set.
149 fn thread_name(&self) -> &[u8] {
150 if let Some(ref thread_name) = self.thread_name { thread_name } else { b"<unnamed>" }
153 /// Return the top user-relevant frame, if there is one.
154 /// Note that the choice to return `None` here when there is no user-relevant frame is part of
155 /// justifying the optimization that only pushes of user-relevant frames require updating the
156 /// `top_user_relevant_frame` field.
157 fn compute_top_user_relevant_frame(&self) -> Option<usize> {
162 .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
165 /// Re-compute the top user-relevant frame from scratch.
166 pub fn recompute_top_user_relevant_frame(&mut self) {
167 self.top_user_relevant_frame = self.compute_top_user_relevant_frame();
170 /// Set the top user-relevant frame to the given value. Must be equal to what
171 /// `get_top_user_relevant_frame` would return!
172 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
173 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame());
174 self.top_user_relevant_frame = Some(frame_idx);
177 /// Returns the topmost frame that is considered user-relevant, or the
178 /// top of the stack if there is no such frame, or `None` if the stack is empty.
179 pub fn top_user_relevant_frame(&self) -> Option<usize> {
180 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame());
181 // This can be called upon creation of an allocation. We create allocations while setting up
182 // parts of the Rust runtime when we do not have any stack frames yet, so we need to handle
184 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
188 impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 String::from_utf8_lossy(self.thread_name()),
200 impl<'mir, 'tcx> Thread<'mir, 'tcx> {
201 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>) -> Self {
203 state: ThreadState::Enabled,
204 thread_name: name.map(|name| Vec::from(name.as_bytes())),
206 top_user_relevant_frame: None,
207 join_status: ThreadJoinStatus::Joinable,
215 impl VisitTags for Thread<'_, '_> {
216 fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
221 top_user_relevant_frame: _,
225 on_stack_empty: _, // we assume the closure captures no GC-relevant state
228 panic_payload.visit_tags(visit);
229 last_error.visit_tags(visit);
231 frame.visit_tags(visit)
236 impl VisitTags for Frame<'_, '_, Provenance, FrameExtra<'_>> {
237 fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
246 // There are some private fields we cannot access; they contain no tags.
251 return_place.visit_tags(visit);
253 for local in locals.iter() {
254 if let LocalValue::Live(value) = &local.value {
255 value.visit_tags(visit);
259 extra.visit_tags(visit);
263 /// A specific moment in time.
267 RealTime(SystemTime),
271 /// How long do we have to wait from now until the specified time?
272 fn get_wait_time(&self, clock: &Clock) -> Duration {
274 Time::Monotonic(instant) => instant.duration_since(clock.now()),
275 Time::RealTime(time) =>
276 time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
281 /// Callbacks are used to implement timeouts. For example, waiting on a
282 /// conditional variable with a timeout creates a callback that is called after
283 /// the specified time and unblocks the thread. If another thread signals on the
284 /// conditional variable, the signal handler deletes the callback.
285 struct TimeoutCallbackInfo<'mir, 'tcx> {
286 /// The callback should be called no earlier than this time.
288 /// The called function.
289 callback: TimeoutCallback<'mir, 'tcx>,
292 impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 write!(f, "TimeoutCallback({:?})", self.call_time)
298 /// A set of threads.
300 pub struct ThreadManager<'mir, 'tcx> {
301 /// Identifier of the currently active thread.
302 active_thread: ThreadId,
303 /// Threads used in the program.
305 /// Note that this vector also contains terminated threads.
306 threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
307 /// This field is pub(crate) because the synchronization primitives
308 /// (`crate::sync`) need a way to access it.
309 pub(crate) sync: SynchronizationState<'mir, 'tcx>,
310 /// A mapping from a thread-local static to an allocation id of a thread
311 /// specific allocation.
312 thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), Pointer<Provenance>>>,
313 /// A flag that indicates that we should change the active thread.
314 yield_active_thread: bool,
315 /// Callbacks that are called once the specified time passes.
316 timeout_callbacks: FxHashMap<ThreadId, TimeoutCallbackInfo<'mir, 'tcx>>,
319 impl VisitTags for ThreadManager<'_, '_> {
320 fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
323 thread_local_alloc_ids,
326 yield_active_thread: _,
330 for thread in threads {
331 thread.visit_tags(visit);
333 for ptr in thread_local_alloc_ids.borrow().values() {
334 ptr.visit_tags(visit);
336 for callback in timeout_callbacks.values() {
337 callback.callback.visit_tags(visit);
339 sync.visit_tags(visit);
343 impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
344 fn default() -> Self {
345 let mut threads = IndexVec::new();
346 // Create the main thread and add it to the list of threads.
347 threads.push(Thread::new(Some("main"), None));
349 active_thread: ThreadId::new(0),
351 sync: SynchronizationState::default(),
352 thread_local_alloc_ids: Default::default(),
353 yield_active_thread: false,
354 timeout_callbacks: FxHashMap::default(),
359 impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
361 ecx: &mut MiriInterpCx<'mir, 'tcx>,
362 on_main_stack_empty: StackEmptyCallback<'mir, 'tcx>,
364 ecx.machine.threads.threads[ThreadId::new(0)].on_stack_empty = Some(on_main_stack_empty);
365 if ecx.tcx.sess.target.os.as_ref() != "windows" {
366 // The main thread can *not* be joined on except on windows.
367 ecx.machine.threads.threads[ThreadId::new(0)].join_status = ThreadJoinStatus::Detached;
371 /// Check if we have an allocation for the given thread local static for the
373 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>> {
374 self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned()
377 /// Set the pointer for the allocation of the given thread local
378 /// static for the active thread.
380 /// Panics if a thread local is initialized twice for the same thread.
381 fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>) {
382 self.thread_local_alloc_ids
384 .try_insert((def_id, self.active_thread), ptr)
388 /// Borrow the stack of the active thread.
389 pub fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] {
390 &self.threads[self.active_thread].stack
393 /// Mutably borrow the stack of the active thread.
394 fn active_thread_stack_mut(
396 ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> {
397 &mut self.threads[self.active_thread].stack
402 ) -> impl Iterator<Item = &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]> {
403 self.threads.iter().map(|t| &t.stack[..])
406 /// Create a new thread and returns its id.
407 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'mir, 'tcx>) -> ThreadId {
408 let new_thread_id = ThreadId::new(self.threads.len());
409 self.threads.push(Thread::new(None, Some(on_stack_empty)));
413 /// Set an active thread and return the id of the thread that was active before.
414 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
415 let active_thread_id = self.active_thread;
416 self.active_thread = id;
417 assert!(self.active_thread.index() < self.threads.len());
421 /// Get the id of the currently active thread.
422 pub fn get_active_thread_id(&self) -> ThreadId {
426 /// Get the total number of threads that were ever spawn by this program.
427 pub fn get_total_thread_count(&self) -> usize {
431 /// Get the total of threads that are currently live, i.e., not yet terminated.
432 /// (They might be blocked.)
433 pub fn get_live_thread_count(&self) -> usize {
434 self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count()
437 /// Has the given thread terminated?
438 fn has_terminated(&self, thread_id: ThreadId) -> bool {
439 self.threads[thread_id].state == ThreadState::Terminated
442 /// Have all threads terminated?
443 fn have_all_terminated(&self) -> bool {
444 self.threads.iter().all(|thread| thread.state == ThreadState::Terminated)
447 /// Enable the thread for execution. The thread must be terminated.
448 fn enable_thread(&mut self, thread_id: ThreadId) {
449 assert!(self.has_terminated(thread_id));
450 self.threads[thread_id].state = ThreadState::Enabled;
453 /// Get a mutable borrow of the currently active thread.
454 pub fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
455 &mut self.threads[self.active_thread]
458 /// Get a shared borrow of the currently active thread.
459 pub fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> {
460 &self.threads[self.active_thread]
463 /// Mark the thread as detached, which means that no other thread will try
464 /// to join it and the thread is responsible for cleaning up.
466 /// `allow_terminated_joined` allows detaching joined threads that have already terminated.
467 /// This matches Windows's behavior for `CloseHandle`.
469 /// See <https://docs.microsoft.com/en-us/windows/win32/procthread/thread-handles-and-identifiers>:
470 /// > The handle is valid until closed, even after the thread it represents has been terminated.
471 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
472 trace!("detaching {:?}", id);
474 let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated
476 // "Detached" in particular means "not yet joined". Redundant detaching is still UB.
477 self.threads[id].join_status == ThreadJoinStatus::Detached
479 self.threads[id].join_status != ThreadJoinStatus::Joinable
482 throw_ub_format!("trying to detach thread that was already detached or joined");
485 self.threads[id].join_status = ThreadJoinStatus::Detached;
489 /// Mark that the active thread tries to join the thread with `joined_thread_id`.
492 joined_thread_id: ThreadId,
493 data_race: Option<&mut data_race::GlobalState>,
494 ) -> InterpResult<'tcx> {
495 if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
496 // On Windows this corresponds to joining on a closed handle.
497 throw_ub_format!("trying to join a detached thread");
500 // Mark the joined thread as being joined so that we detect if other
501 // threads try to join it.
502 self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
503 if self.threads[joined_thread_id].state != ThreadState::Terminated {
504 // The joined thread is still running, we need to wait for it.
505 self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
507 "{:?} blocked on {:?} when trying to join",
512 // The thread has already terminated - mark join happens-before
513 if let Some(data_race) = data_race {
514 data_race.thread_joined(self, self.active_thread, joined_thread_id);
520 /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
521 /// If the thread is already joined by another thread, it will throw UB
522 fn join_thread_exclusive(
524 joined_thread_id: ThreadId,
525 data_race: Option<&mut data_race::GlobalState>,
526 ) -> InterpResult<'tcx> {
527 if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
528 throw_ub_format!("trying to join an already joined thread");
531 if joined_thread_id == self.active_thread {
532 throw_ub_format!("trying to join itself");
538 .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
539 "this thread already has threads waiting for its termination"
542 self.join_thread(joined_thread_id, data_race)
545 /// Set the name of the given thread.
546 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
547 self.threads[thread].thread_name = Some(new_thread_name);
550 /// Get the name of the given thread.
551 pub fn get_thread_name(&self, thread: ThreadId) -> &[u8] {
552 self.threads[thread].thread_name()
555 /// Put the thread into the blocked state.
556 fn block_thread(&mut self, thread: ThreadId) {
557 let state = &mut self.threads[thread].state;
558 assert_eq!(*state, ThreadState::Enabled);
559 *state = ThreadState::BlockedOnSync;
562 /// Put the blocked thread into the enabled state.
563 fn unblock_thread(&mut self, thread: ThreadId) {
564 let state = &mut self.threads[thread].state;
565 assert_eq!(*state, ThreadState::BlockedOnSync);
566 *state = ThreadState::Enabled;
569 /// Change the active thread to some enabled thread.
570 fn yield_active_thread(&mut self) {
571 // We do not yield immediately, as swapping out the current stack while executing a MIR statement
572 // could lead to all sorts of confusion.
573 // We should only switch stacks between steps.
574 self.yield_active_thread = true;
577 /// Register the given `callback` to be called once the `call_time` passes.
579 /// The callback will be called with `thread` being the active thread, and
580 /// the callback may not change the active thread.
581 fn register_timeout_callback(
585 callback: TimeoutCallback<'mir, 'tcx>,
587 self.timeout_callbacks
588 .try_insert(thread, TimeoutCallbackInfo { call_time, callback })
592 /// Unregister the callback for the `thread`.
593 fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
594 self.timeout_callbacks.remove(&thread);
597 /// Get a callback that is ready to be called.
598 fn get_ready_callback(
601 ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> {
602 // We iterate over all threads in the order of their indices because
603 // this allows us to have a deterministic scheduler.
604 for thread in self.threads.indices() {
605 match self.timeout_callbacks.entry(thread) {
606 Entry::Occupied(entry) =>
607 if entry.get().call_time.get_wait_time(clock) == Duration::new(0, 0) {
608 return Some((thread, entry.remove().callback));
610 Entry::Vacant(_) => {}
616 /// Wakes up threads joining on the active one and deallocates thread-local statics.
617 /// The `AllocId` that can now be freed are returned.
618 fn thread_terminated(
620 mut data_race: Option<&mut data_race::GlobalState>,
622 ) -> Vec<Pointer<Provenance>> {
623 let mut free_tls_statics = Vec::new();
625 let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut();
626 thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| {
627 if thread != self.active_thread {
628 // Keep this static around.
631 // Delete this static from the map and from memory.
632 // We cannot free directly here as we cannot use `?` in this context.
633 free_tls_statics.push(alloc_id);
637 // Set the thread into a terminated state in the data-race detector.
638 if let Some(ref mut data_race) = data_race {
639 data_race.thread_terminated(self, current_span);
641 // Check if we need to unblock any threads.
642 let mut joined_threads = vec![]; // store which threads joined, we'll need it
643 for (i, thread) in self.threads.iter_enumerated_mut() {
644 if thread.state == ThreadState::BlockedOnJoin(self.active_thread) {
645 // The thread has terminated, mark happens-before edge to joining thread
646 if data_race.is_some() {
647 joined_threads.push(i);
649 trace!("unblocking {:?} because {:?} terminated", i, self.active_thread);
650 thread.state = ThreadState::Enabled;
653 for &i in &joined_threads {
654 data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread);
659 /// Decide which action to take next and on which thread.
661 /// The currently implemented scheduling policy is the one that is commonly
662 /// used in stateless model checkers such as Loom: run the active thread as
663 /// long as we can and switch only when we have to (the active thread was
664 /// blocked, terminated, or has explicitly asked to be preempted).
665 fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> {
666 // This thread and the program can keep going.
667 if self.threads[self.active_thread].state == ThreadState::Enabled
668 && !self.yield_active_thread
670 // The currently active thread is still enabled, just continue with it.
671 return Ok(SchedulingAction::ExecuteStep);
673 // The active thread yielded or got terminated. Let's see if there are any timeouts to take
674 // care of. We do this *before* running any other thread, to ensure that timeouts "in the
675 // past" fire before any other thread can take an action. This ensures that for
676 // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
677 // abstime has already been passed at the time of the call".
678 // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
679 let potential_sleep_time =
680 self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time(clock)).min();
681 if potential_sleep_time == Some(Duration::new(0, 0)) {
682 return Ok(SchedulingAction::ExecuteTimeoutCallback);
684 // No callbacks immediately scheduled, pick a regular thread to execute.
685 // The active thread blocked or yielded. So we go search for another enabled thread.
686 // Crucially, we start searching at the current active thread ID, rather than at 0, since we
687 // want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2.
689 // `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after*
690 // the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the
695 .skip(self.active_thread.index() + 1)
696 .chain(self.threads.iter_enumerated().take(self.active_thread.index()));
697 for (id, thread) in threads {
698 debug_assert_ne!(self.active_thread, id);
699 if thread.state == ThreadState::Enabled {
700 self.active_thread = id;
704 self.yield_active_thread = false;
705 if self.threads[self.active_thread].state == ThreadState::Enabled {
706 return Ok(SchedulingAction::ExecuteStep);
708 // We have not found a thread to execute.
709 if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
710 unreachable!("all threads terminated without the main thread terminating?!");
711 } else if let Some(sleep_time) = potential_sleep_time {
712 // All threads are currently blocked, but we have unexecuted
713 // timeout_callbacks, which may unblock some of the threads. Hence,
714 // sleep until the first callback.
715 Ok(SchedulingAction::Sleep(sleep_time))
717 throw_machine_stop!(TerminationInfo::Deadlock);
722 impl<'mir, 'tcx: 'mir> EvalContextPrivExt<'mir, 'tcx> for MiriInterpCx<'mir, 'tcx> {}
723 trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
724 /// Execute a timeout callback on the callback's thread.
726 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
727 let this = self.eval_context_mut();
728 let (thread, callback) = if let Some((thread, callback)) =
729 this.machine.threads.get_ready_callback(&this.machine.clock)
733 // get_ready_callback can return None if the computer's clock
734 // was shifted after calling the scheduler and before the call
735 // to get_ready_callback (see issue
736 // https://github.com/rust-lang/miri/issues/1763). In this case,
737 // just do nothing, which effectively just returns to the
741 // This back-and-forth with `set_active_thread` is here because of two
743 // 1. Make the caller and not the callback responsible for changing
745 // 2. Make the scheduler the only place that can change the active
747 let old_thread = this.set_active_thread(thread);
748 callback.call(this)?;
749 this.set_active_thread(old_thread);
754 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
755 let this = self.eval_context_mut();
756 let mut callback = this
760 .expect("`on_stack_empty` not set up, or already running");
761 let res = callback(this)?;
762 this.active_thread_mut().on_stack_empty = Some(callback);
767 // Public interface to thread management.
768 impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
769 pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
770 /// Get a thread-specific allocation id for the given thread-local static.
771 /// If needed, allocate a new one.
772 fn get_or_create_thread_local_alloc(
775 ) -> InterpResult<'tcx, Pointer<Provenance>> {
776 let this = self.eval_context_mut();
778 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
779 // We already have a thread-specific allocation id for this
780 // thread-local static.
783 // We need to allocate a thread-specific allocation id for this
784 // thread-local static.
785 // First, we compute the initial value for this static.
786 if tcx.is_foreign_item(def_id) {
787 throw_unsup_format!("foreign thread-local statics are not supported");
789 // We don't give a span -- statics don't need that, they cannot be generic or associated.
790 let allocation = this.ctfe_query(None, |tcx| tcx.eval_static_initializer(def_id))?;
791 let mut allocation = allocation.inner().clone();
792 // This allocation will be deallocated when the thread dies, so it is not in read-only memory.
793 allocation.mutability = Mutability::Mut;
794 // Create a fresh allocation with this content.
795 let new_alloc = this.allocate_raw_ptr(allocation, MiriMemoryKind::Tls.into())?;
796 this.machine.threads.set_thread_local_alloc(def_id, new_alloc);
801 /// Start a regular (non-main) thread.
803 fn start_regular_thread(
805 thread: Option<MPlaceTy<'tcx, Provenance>>,
806 start_routine: Pointer<Option<Provenance>>,
808 func_arg: ImmTy<'tcx, Provenance>,
809 ret_layout: TyAndLayout<'tcx>,
810 ) -> InterpResult<'tcx, ThreadId> {
811 let this = self.eval_context_mut();
813 // Create the new thread
814 let new_thread_id = this.machine.threads.create_thread({
815 let mut state = tls::TlsDtorsState::default();
816 Box::new(move |m| state.on_stack_empty(m))
818 let current_span = this.machine.current_span();
819 if let Some(data_race) = &mut this.machine.data_race {
820 data_race.thread_created(&this.machine.threads, new_thread_id, current_span);
823 // Write the current thread-id, switch to the next thread later
824 // to treat this write operation as occuring on the current thread.
825 if let Some(thread_info_place) = thread {
827 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
828 &thread_info_place.into(),
832 // Finally switch to new thread so that we can push the first stackframe.
833 // After this all accesses will be treated as occuring in the new thread.
834 let old_thread_id = this.set_active_thread(new_thread_id);
836 // Perform the function pointer load in the new thread frame.
837 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
839 // Note: the returned value is currently ignored (see the FIXME in
840 // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use
842 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
848 Some(&ret_place.into()),
849 StackPopCleanup::Root { cleanup: true },
852 // Restore the old active thread frame.
853 this.set_active_thread(old_thread_id);
862 allow_terminated_joined: bool,
863 ) -> InterpResult<'tcx> {
864 let this = self.eval_context_mut();
865 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
869 fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
870 let this = self.eval_context_mut();
871 this.machine.threads.join_thread(joined_thread_id, this.machine.data_race.as_mut())?;
876 fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
877 let this = self.eval_context_mut();
880 .join_thread_exclusive(joined_thread_id, this.machine.data_race.as_mut())?;
885 fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId {
886 let this = self.eval_context_mut();
887 this.machine.threads.set_active_thread_id(thread_id)
891 fn get_active_thread(&self) -> ThreadId {
892 let this = self.eval_context_ref();
893 this.machine.threads.get_active_thread_id()
897 fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
898 let this = self.eval_context_mut();
899 this.machine.threads.active_thread_mut()
903 fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> {
904 let this = self.eval_context_ref();
905 this.machine.threads.active_thread_ref()
909 fn get_total_thread_count(&self) -> usize {
910 let this = self.eval_context_ref();
911 this.machine.threads.get_total_thread_count()
915 fn have_all_terminated(&self) -> bool {
916 let this = self.eval_context_ref();
917 this.machine.threads.have_all_terminated()
921 fn enable_thread(&mut self, thread_id: ThreadId) {
922 let this = self.eval_context_mut();
923 this.machine.threads.enable_thread(thread_id);
927 fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] {
928 let this = self.eval_context_ref();
929 this.machine.threads.active_thread_stack()
933 fn active_thread_stack_mut(
935 ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> {
936 let this = self.eval_context_mut();
937 this.machine.threads.active_thread_stack_mut()
940 /// Set the name of the current thread. The buffer must not include the null terminator.
942 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
943 let this = self.eval_context_mut();
944 this.machine.threads.set_thread_name(thread, new_thread_name);
948 fn set_thread_name_wide(&mut self, thread: ThreadId, new_thread_name: &[u16]) {
949 let this = self.eval_context_mut();
951 // The Windows `GetThreadDescription` shim to get the thread name isn't implemented, so being lossy is okay.
952 // This is only read by diagnostics, which already use `from_utf8_lossy`.
955 .set_thread_name(thread, String::from_utf16_lossy(new_thread_name).into_bytes());
959 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> &'c [u8]
963 self.eval_context_ref().machine.threads.get_thread_name(thread)
967 fn block_thread(&mut self, thread: ThreadId) {
968 self.eval_context_mut().machine.threads.block_thread(thread);
972 fn unblock_thread(&mut self, thread: ThreadId) {
973 self.eval_context_mut().machine.threads.unblock_thread(thread);
977 fn yield_active_thread(&mut self) {
978 self.eval_context_mut().machine.threads.yield_active_thread();
982 fn maybe_preempt_active_thread(&mut self) {
985 let this = self.eval_context_mut();
986 if this.machine.rng.get_mut().gen_bool(this.machine.preemption_rate) {
987 this.yield_active_thread();
992 fn register_timeout_callback(
996 callback: TimeoutCallback<'mir, 'tcx>,
998 let this = self.eval_context_mut();
999 if !this.machine.communicate() && matches!(call_time, Time::RealTime(..)) {
1000 panic!("cannot have `RealTime` callback with isolation enabled!")
1002 this.machine.threads.register_timeout_callback(thread, call_time, callback);
1006 fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
1007 let this = self.eval_context_mut();
1008 this.machine.threads.unregister_timeout_callback_if_exists(thread);
1011 /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
1013 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1014 let this = self.eval_context_mut();
1016 match this.machine.threads.schedule(&this.machine.clock)? {
1017 SchedulingAction::ExecuteStep => {
1019 // See if this thread can do something else.
1020 match this.run_on_stack_empty()? {
1021 Poll::Pending => {} // keep going
1022 Poll::Ready(()) => this.terminate_active_thread()?,
1026 SchedulingAction::ExecuteTimeoutCallback => {
1027 this.run_timeout_callback()?;
1029 SchedulingAction::Sleep(duration) => {
1030 this.machine.clock.sleep(duration);
1036 /// Handles thread termination of the active thread: wakes up threads joining on this one,
1037 /// and deallocated thread-local statics.
1039 /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
1041 fn terminate_active_thread(&mut self) -> InterpResult<'tcx> {
1042 let this = self.eval_context_mut();
1043 let thread = this.active_thread_mut();
1044 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
1045 thread.state = ThreadState::Terminated;
1047 let current_span = this.machine.current_span();
1049 this.machine.threads.thread_terminated(this.machine.data_race.as_mut(), current_span)
1051 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?;