]> git.lizzy.rs Git - rust.git/blob - src/tools/miri/src/concurrency/thread.rs
Rollup merge of #106958 - jyn514:labels, r=m-ou-se
[rust.git] / src / tools / miri / src / concurrency / thread.rs
1 //! Implements threads.
2
3 use std::cell::RefCell;
4 use std::collections::hash_map::Entry;
5 use std::num::TryFromIntError;
6 use std::task::Poll;
7 use std::time::{Duration, SystemTime};
8
9 use log::trace;
10
11 use rustc_data_structures::fx::FxHashMap;
12 use rustc_hir::def_id::DefId;
13 use rustc_index::vec::{Idx, IndexVec};
14 use rustc_middle::mir::Mutability;
15 use rustc_middle::ty::layout::TyAndLayout;
16 use rustc_span::Span;
17 use rustc_target::spec::abi::Abi;
18
19 use crate::concurrency::data_race;
20 use crate::concurrency::sync::SynchronizationState;
21 use crate::shims::tls;
22 use crate::*;
23
24 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
25 enum SchedulingAction {
26     /// Execute step on the active thread.
27     ExecuteStep,
28     /// Execute a timeout callback.
29     ExecuteTimeoutCallback,
30     /// Wait for a bit, until there is a timeout to be called.
31     Sleep(Duration),
32 }
33
34 /// Trait for callbacks that can be executed when some event happens, such as after a timeout.
35 pub trait MachineCallback<'mir, 'tcx>: VisitTags {
36     fn call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>;
37 }
38
39 type TimeoutCallback<'mir, 'tcx> = Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>;
40
41 /// A thread identifier.
42 #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
43 pub struct ThreadId(u32);
44
45 impl ThreadId {
46     pub fn to_u32(self) -> u32 {
47         self.0
48     }
49 }
50
51 impl Idx for ThreadId {
52     fn new(idx: usize) -> Self {
53         ThreadId(u32::try_from(idx).unwrap())
54     }
55
56     fn index(self) -> usize {
57         usize::try_from(self.0).unwrap()
58     }
59 }
60
61 impl TryFrom<u64> for ThreadId {
62     type Error = TryFromIntError;
63     fn try_from(id: u64) -> Result<Self, Self::Error> {
64         u32::try_from(id).map(Self)
65     }
66 }
67
68 impl From<u32> for ThreadId {
69     fn from(id: u32) -> Self {
70         Self(id)
71     }
72 }
73
74 impl From<ThreadId> for u64 {
75     fn from(t: ThreadId) -> Self {
76         t.0.into()
77     }
78 }
79
80 /// The state of a thread.
81 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
82 pub enum ThreadState {
83     /// The thread is enabled and can be executed.
84     Enabled,
85     /// The thread tried to join the specified thread and is blocked until that
86     /// thread terminates.
87     BlockedOnJoin(ThreadId),
88     /// The thread is blocked on some synchronization primitive. It is the
89     /// responsibility of the synchronization primitives to track threads that
90     /// are blocked by them.
91     BlockedOnSync,
92     /// The thread has terminated its execution. We do not delete terminated
93     /// threads (FIXME: why?).
94     Terminated,
95 }
96
97 /// The join status of a thread.
98 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
99 enum ThreadJoinStatus {
100     /// The thread can be joined.
101     Joinable,
102     /// A thread is detached if its join handle was destroyed and no other
103     /// thread can join it.
104     Detached,
105     /// The thread was already joined by some thread and cannot be joined again.
106     Joined,
107 }
108
109 /// A thread.
110 pub struct Thread<'mir, 'tcx> {
111     state: ThreadState,
112
113     /// Name of the thread.
114     thread_name: Option<Vec<u8>>,
115
116     /// The virtual call stack.
117     stack: Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>>,
118
119     /// The function to call when the stack ran empty, to figure out what to do next.
120     /// Conceptually, this is the interpreter implementation of the things that happen 'after' the
121     /// Rust language entry point for this thread returns (usually implemented by the C or OS runtime).
122     /// (`None` is an error, it means the callback has not been set up yet or is actively running.)
123     pub(crate) on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>,
124
125     /// The index of the topmost user-relevant frame in `stack`. This field must contain
126     /// the value produced by `get_top_user_relevant_frame`.
127     /// The `None` state here represents
128     /// This field is a cache to reduce how often we call that method. The cache is manually
129     /// maintained inside `MiriMachine::after_stack_push` and `MiriMachine::after_stack_pop`.
130     top_user_relevant_frame: Option<usize>,
131
132     /// The join status.
133     join_status: ThreadJoinStatus,
134
135     /// The temporary used for storing the argument of
136     /// the call to `miri_start_panic` (the panic payload) when unwinding.
137     /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`.
138     pub(crate) panic_payload: Option<Scalar<Provenance>>,
139
140     /// Last OS error location in memory. It is a 32-bit integer.
141     pub(crate) last_error: Option<MPlaceTy<'tcx, Provenance>>,
142 }
143
144 pub type StackEmptyCallback<'mir, 'tcx> =
145     Box<dyn FnMut(&mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx, Poll<()>>>;
146
147 impl<'mir, 'tcx> Thread<'mir, 'tcx> {
148     /// Get the name of the current thread, or `<unnamed>` if it was not set.
149     fn thread_name(&self) -> &[u8] {
150         if let Some(ref thread_name) = self.thread_name { thread_name } else { b"<unnamed>" }
151     }
152
153     /// Return the top user-relevant frame, if there is one.
154     /// Note that the choice to return `None` here when there is no user-relevant frame is part of
155     /// justifying the optimization that only pushes of user-relevant frames require updating the
156     /// `top_user_relevant_frame` field.
157     fn compute_top_user_relevant_frame(&self) -> Option<usize> {
158         self.stack
159             .iter()
160             .enumerate()
161             .rev()
162             .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
163     }
164
165     /// Re-compute the top user-relevant frame from scratch.
166     pub fn recompute_top_user_relevant_frame(&mut self) {
167         self.top_user_relevant_frame = self.compute_top_user_relevant_frame();
168     }
169
170     /// Set the top user-relevant frame to the given value. Must be equal to what
171     /// `get_top_user_relevant_frame` would return!
172     pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
173         debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame());
174         self.top_user_relevant_frame = Some(frame_idx);
175     }
176
177     /// Returns the topmost frame that is considered user-relevant, or the
178     /// top of the stack if there is no such frame, or `None` if the stack is empty.
179     pub fn top_user_relevant_frame(&self) -> Option<usize> {
180         debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame());
181         // This can be called upon creation of an allocation. We create allocations while setting up
182         // parts of the Rust runtime when we do not have any stack frames yet, so we need to handle
183         // empty stacks.
184         self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
185     }
186 }
187
188 impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> {
189     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190         write!(
191             f,
192             "{}({:?}, {:?})",
193             String::from_utf8_lossy(self.thread_name()),
194             self.state,
195             self.join_status
196         )
197     }
198 }
199
200 impl<'mir, 'tcx> Thread<'mir, 'tcx> {
201     fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'mir, 'tcx>>) -> Self {
202         Self {
203             state: ThreadState::Enabled,
204             thread_name: name.map(|name| Vec::from(name.as_bytes())),
205             stack: Vec::new(),
206             top_user_relevant_frame: None,
207             join_status: ThreadJoinStatus::Joinable,
208             panic_payload: None,
209             last_error: None,
210             on_stack_empty,
211         }
212     }
213 }
214
215 impl VisitTags for Thread<'_, '_> {
216     fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
217         let Thread {
218             panic_payload,
219             last_error,
220             stack,
221             top_user_relevant_frame: _,
222             state: _,
223             thread_name: _,
224             join_status: _,
225             on_stack_empty: _, // we assume the closure captures no GC-relevant state
226         } = self;
227
228         panic_payload.visit_tags(visit);
229         last_error.visit_tags(visit);
230         for frame in stack {
231             frame.visit_tags(visit)
232         }
233     }
234 }
235
236 impl VisitTags for Frame<'_, '_, Provenance, FrameExtra<'_>> {
237     fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
238         let Frame {
239             return_place,
240             locals,
241             extra,
242             body: _,
243             instance: _,
244             return_to_block: _,
245             loc: _,
246             // There are some private fields we cannot access; they contain no tags.
247             ..
248         } = self;
249
250         // Return place.
251         return_place.visit_tags(visit);
252         // Locals.
253         for local in locals.iter() {
254             if let LocalValue::Live(value) = &local.value {
255                 value.visit_tags(visit);
256             }
257         }
258
259         extra.visit_tags(visit);
260     }
261 }
262
263 /// A specific moment in time.
264 #[derive(Debug)]
265 pub enum Time {
266     Monotonic(Instant),
267     RealTime(SystemTime),
268 }
269
270 impl Time {
271     /// How long do we have to wait from now until the specified time?
272     fn get_wait_time(&self, clock: &Clock) -> Duration {
273         match self {
274             Time::Monotonic(instant) => instant.duration_since(clock.now()),
275             Time::RealTime(time) =>
276                 time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
277         }
278     }
279 }
280
281 /// Callbacks are used to implement timeouts. For example, waiting on a
282 /// conditional variable with a timeout creates a callback that is called after
283 /// the specified time and unblocks the thread. If another thread signals on the
284 /// conditional variable, the signal handler deletes the callback.
285 struct TimeoutCallbackInfo<'mir, 'tcx> {
286     /// The callback should be called no earlier than this time.
287     call_time: Time,
288     /// The called function.
289     callback: TimeoutCallback<'mir, 'tcx>,
290 }
291
292 impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> {
293     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294         write!(f, "TimeoutCallback({:?})", self.call_time)
295     }
296 }
297
298 /// A set of threads.
299 #[derive(Debug)]
300 pub struct ThreadManager<'mir, 'tcx> {
301     /// Identifier of the currently active thread.
302     active_thread: ThreadId,
303     /// Threads used in the program.
304     ///
305     /// Note that this vector also contains terminated threads.
306     threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
307     /// This field is pub(crate) because the synchronization primitives
308     /// (`crate::sync`) need a way to access it.
309     pub(crate) sync: SynchronizationState<'mir, 'tcx>,
310     /// A mapping from a thread-local static to an allocation id of a thread
311     /// specific allocation.
312     thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), Pointer<Provenance>>>,
313     /// A flag that indicates that we should change the active thread.
314     yield_active_thread: bool,
315     /// Callbacks that are called once the specified time passes.
316     timeout_callbacks: FxHashMap<ThreadId, TimeoutCallbackInfo<'mir, 'tcx>>,
317 }
318
319 impl VisitTags for ThreadManager<'_, '_> {
320     fn visit_tags(&self, visit: &mut dyn FnMut(BorTag)) {
321         let ThreadManager {
322             threads,
323             thread_local_alloc_ids,
324             timeout_callbacks,
325             active_thread: _,
326             yield_active_thread: _,
327             sync,
328         } = self;
329
330         for thread in threads {
331             thread.visit_tags(visit);
332         }
333         for ptr in thread_local_alloc_ids.borrow().values() {
334             ptr.visit_tags(visit);
335         }
336         for callback in timeout_callbacks.values() {
337             callback.callback.visit_tags(visit);
338         }
339         sync.visit_tags(visit);
340     }
341 }
342
343 impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
344     fn default() -> Self {
345         let mut threads = IndexVec::new();
346         // Create the main thread and add it to the list of threads.
347         threads.push(Thread::new(Some("main"), None));
348         Self {
349             active_thread: ThreadId::new(0),
350             threads,
351             sync: SynchronizationState::default(),
352             thread_local_alloc_ids: Default::default(),
353             yield_active_thread: false,
354             timeout_callbacks: FxHashMap::default(),
355         }
356     }
357 }
358
359 impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
360     pub(crate) fn init(
361         ecx: &mut MiriInterpCx<'mir, 'tcx>,
362         on_main_stack_empty: StackEmptyCallback<'mir, 'tcx>,
363     ) {
364         ecx.machine.threads.threads[ThreadId::new(0)].on_stack_empty = Some(on_main_stack_empty);
365         if ecx.tcx.sess.target.os.as_ref() != "windows" {
366             // The main thread can *not* be joined on except on windows.
367             ecx.machine.threads.threads[ThreadId::new(0)].join_status = ThreadJoinStatus::Detached;
368         }
369     }
370
371     /// Check if we have an allocation for the given thread local static for the
372     /// active thread.
373     fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>> {
374         self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned()
375     }
376
377     /// Set the pointer for the allocation of the given thread local
378     /// static for the active thread.
379     ///
380     /// Panics if a thread local is initialized twice for the same thread.
381     fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>) {
382         self.thread_local_alloc_ids
383             .borrow_mut()
384             .try_insert((def_id, self.active_thread), ptr)
385             .unwrap();
386     }
387
388     /// Borrow the stack of the active thread.
389     pub fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] {
390         &self.threads[self.active_thread].stack
391     }
392
393     /// Mutably borrow the stack of the active thread.
394     fn active_thread_stack_mut(
395         &mut self,
396     ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> {
397         &mut self.threads[self.active_thread].stack
398     }
399
400     pub fn all_stacks(
401         &self,
402     ) -> impl Iterator<Item = &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>]> {
403         self.threads.iter().map(|t| &t.stack[..])
404     }
405
406     /// Create a new thread and returns its id.
407     fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'mir, 'tcx>) -> ThreadId {
408         let new_thread_id = ThreadId::new(self.threads.len());
409         self.threads.push(Thread::new(None, Some(on_stack_empty)));
410         new_thread_id
411     }
412
413     /// Set an active thread and return the id of the thread that was active before.
414     fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
415         let active_thread_id = self.active_thread;
416         self.active_thread = id;
417         assert!(self.active_thread.index() < self.threads.len());
418         active_thread_id
419     }
420
421     /// Get the id of the currently active thread.
422     pub fn get_active_thread_id(&self) -> ThreadId {
423         self.active_thread
424     }
425
426     /// Get the total number of threads that were ever spawn by this program.
427     pub fn get_total_thread_count(&self) -> usize {
428         self.threads.len()
429     }
430
431     /// Get the total of threads that are currently live, i.e., not yet terminated.
432     /// (They might be blocked.)
433     pub fn get_live_thread_count(&self) -> usize {
434         self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count()
435     }
436
437     /// Has the given thread terminated?
438     fn has_terminated(&self, thread_id: ThreadId) -> bool {
439         self.threads[thread_id].state == ThreadState::Terminated
440     }
441
442     /// Have all threads terminated?
443     fn have_all_terminated(&self) -> bool {
444         self.threads.iter().all(|thread| thread.state == ThreadState::Terminated)
445     }
446
447     /// Enable the thread for execution. The thread must be terminated.
448     fn enable_thread(&mut self, thread_id: ThreadId) {
449         assert!(self.has_terminated(thread_id));
450         self.threads[thread_id].state = ThreadState::Enabled;
451     }
452
453     /// Get a mutable borrow of the currently active thread.
454     pub fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
455         &mut self.threads[self.active_thread]
456     }
457
458     /// Get a shared borrow of the currently active thread.
459     pub fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> {
460         &self.threads[self.active_thread]
461     }
462
463     /// Mark the thread as detached, which means that no other thread will try
464     /// to join it and the thread is responsible for cleaning up.
465     ///
466     /// `allow_terminated_joined` allows detaching joined threads that have already terminated.
467     /// This matches Windows's behavior for `CloseHandle`.
468     ///
469     /// See <https://docs.microsoft.com/en-us/windows/win32/procthread/thread-handles-and-identifiers>:
470     /// > The handle is valid until closed, even after the thread it represents has been terminated.
471     fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
472         trace!("detaching {:?}", id);
473
474         let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated
475         {
476             // "Detached" in particular means "not yet joined". Redundant detaching is still UB.
477             self.threads[id].join_status == ThreadJoinStatus::Detached
478         } else {
479             self.threads[id].join_status != ThreadJoinStatus::Joinable
480         };
481         if is_ub {
482             throw_ub_format!("trying to detach thread that was already detached or joined");
483         }
484
485         self.threads[id].join_status = ThreadJoinStatus::Detached;
486         Ok(())
487     }
488
489     /// Mark that the active thread tries to join the thread with `joined_thread_id`.
490     fn join_thread(
491         &mut self,
492         joined_thread_id: ThreadId,
493         data_race: Option<&mut data_race::GlobalState>,
494     ) -> InterpResult<'tcx> {
495         if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
496             // On Windows this corresponds to joining on a closed handle.
497             throw_ub_format!("trying to join a detached thread");
498         }
499
500         // Mark the joined thread as being joined so that we detect if other
501         // threads try to join it.
502         self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
503         if self.threads[joined_thread_id].state != ThreadState::Terminated {
504             // The joined thread is still running, we need to wait for it.
505             self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
506             trace!(
507                 "{:?} blocked on {:?} when trying to join",
508                 self.active_thread,
509                 joined_thread_id
510             );
511         } else {
512             // The thread has already terminated - mark join happens-before
513             if let Some(data_race) = data_race {
514                 data_race.thread_joined(self, self.active_thread, joined_thread_id);
515             }
516         }
517         Ok(())
518     }
519
520     /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
521     /// If the thread is already joined by another thread, it will throw UB
522     fn join_thread_exclusive(
523         &mut self,
524         joined_thread_id: ThreadId,
525         data_race: Option<&mut data_race::GlobalState>,
526     ) -> InterpResult<'tcx> {
527         if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
528             throw_ub_format!("trying to join an already joined thread");
529         }
530
531         if joined_thread_id == self.active_thread {
532             throw_ub_format!("trying to join itself");
533         }
534
535         assert!(
536             self.threads
537                 .iter()
538                 .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
539             "this thread already has threads waiting for its termination"
540         );
541
542         self.join_thread(joined_thread_id, data_race)
543     }
544
545     /// Set the name of the given thread.
546     pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
547         self.threads[thread].thread_name = Some(new_thread_name);
548     }
549
550     /// Get the name of the given thread.
551     pub fn get_thread_name(&self, thread: ThreadId) -> &[u8] {
552         self.threads[thread].thread_name()
553     }
554
555     /// Put the thread into the blocked state.
556     fn block_thread(&mut self, thread: ThreadId) {
557         let state = &mut self.threads[thread].state;
558         assert_eq!(*state, ThreadState::Enabled);
559         *state = ThreadState::BlockedOnSync;
560     }
561
562     /// Put the blocked thread into the enabled state.
563     fn unblock_thread(&mut self, thread: ThreadId) {
564         let state = &mut self.threads[thread].state;
565         assert_eq!(*state, ThreadState::BlockedOnSync);
566         *state = ThreadState::Enabled;
567     }
568
569     /// Change the active thread to some enabled thread.
570     fn yield_active_thread(&mut self) {
571         // We do not yield immediately, as swapping out the current stack while executing a MIR statement
572         // could lead to all sorts of confusion.
573         // We should only switch stacks between steps.
574         self.yield_active_thread = true;
575     }
576
577     /// Register the given `callback` to be called once the `call_time` passes.
578     ///
579     /// The callback will be called with `thread` being the active thread, and
580     /// the callback may not change the active thread.
581     fn register_timeout_callback(
582         &mut self,
583         thread: ThreadId,
584         call_time: Time,
585         callback: TimeoutCallback<'mir, 'tcx>,
586     ) {
587         self.timeout_callbacks
588             .try_insert(thread, TimeoutCallbackInfo { call_time, callback })
589             .unwrap();
590     }
591
592     /// Unregister the callback for the `thread`.
593     fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
594         self.timeout_callbacks.remove(&thread);
595     }
596
597     /// Get a callback that is ready to be called.
598     fn get_ready_callback(
599         &mut self,
600         clock: &Clock,
601     ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> {
602         // We iterate over all threads in the order of their indices because
603         // this allows us to have a deterministic scheduler.
604         for thread in self.threads.indices() {
605             match self.timeout_callbacks.entry(thread) {
606                 Entry::Occupied(entry) =>
607                     if entry.get().call_time.get_wait_time(clock) == Duration::new(0, 0) {
608                         return Some((thread, entry.remove().callback));
609                     },
610                 Entry::Vacant(_) => {}
611             }
612         }
613         None
614     }
615
616     /// Wakes up threads joining on the active one and deallocates thread-local statics.
617     /// The `AllocId` that can now be freed are returned.
618     fn thread_terminated(
619         &mut self,
620         mut data_race: Option<&mut data_race::GlobalState>,
621         current_span: Span,
622     ) -> Vec<Pointer<Provenance>> {
623         let mut free_tls_statics = Vec::new();
624         {
625             let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut();
626             thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| {
627                 if thread != self.active_thread {
628                     // Keep this static around.
629                     return true;
630                 }
631                 // Delete this static from the map and from memory.
632                 // We cannot free directly here as we cannot use `?` in this context.
633                 free_tls_statics.push(alloc_id);
634                 false
635             });
636         }
637         // Set the thread into a terminated state in the data-race detector.
638         if let Some(ref mut data_race) = data_race {
639             data_race.thread_terminated(self, current_span);
640         }
641         // Check if we need to unblock any threads.
642         let mut joined_threads = vec![]; // store which threads joined, we'll need it
643         for (i, thread) in self.threads.iter_enumerated_mut() {
644             if thread.state == ThreadState::BlockedOnJoin(self.active_thread) {
645                 // The thread has terminated, mark happens-before edge to joining thread
646                 if data_race.is_some() {
647                     joined_threads.push(i);
648                 }
649                 trace!("unblocking {:?} because {:?} terminated", i, self.active_thread);
650                 thread.state = ThreadState::Enabled;
651             }
652         }
653         for &i in &joined_threads {
654             data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread);
655         }
656         free_tls_statics
657     }
658
659     /// Decide which action to take next and on which thread.
660     ///
661     /// The currently implemented scheduling policy is the one that is commonly
662     /// used in stateless model checkers such as Loom: run the active thread as
663     /// long as we can and switch only when we have to (the active thread was
664     /// blocked, terminated, or has explicitly asked to be preempted).
665     fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> {
666         // This thread and the program can keep going.
667         if self.threads[self.active_thread].state == ThreadState::Enabled
668             && !self.yield_active_thread
669         {
670             // The currently active thread is still enabled, just continue with it.
671             return Ok(SchedulingAction::ExecuteStep);
672         }
673         // The active thread yielded or got terminated. Let's see if there are any timeouts to take
674         // care of. We do this *before* running any other thread, to ensure that timeouts "in the
675         // past" fire before any other thread can take an action. This ensures that for
676         // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
677         // abstime has already been passed at the time of the call".
678         // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
679         let potential_sleep_time =
680             self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time(clock)).min();
681         if potential_sleep_time == Some(Duration::new(0, 0)) {
682             return Ok(SchedulingAction::ExecuteTimeoutCallback);
683         }
684         // No callbacks immediately scheduled, pick a regular thread to execute.
685         // The active thread blocked or yielded. So we go search for another enabled thread.
686         // Crucially, we start searching at the current active thread ID, rather than at 0, since we
687         // want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2.
688         //
689         // `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after*
690         // the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the
691         // active thread.
692         let threads = self
693             .threads
694             .iter_enumerated()
695             .skip(self.active_thread.index() + 1)
696             .chain(self.threads.iter_enumerated().take(self.active_thread.index()));
697         for (id, thread) in threads {
698             debug_assert_ne!(self.active_thread, id);
699             if thread.state == ThreadState::Enabled {
700                 self.active_thread = id;
701                 break;
702             }
703         }
704         self.yield_active_thread = false;
705         if self.threads[self.active_thread].state == ThreadState::Enabled {
706             return Ok(SchedulingAction::ExecuteStep);
707         }
708         // We have not found a thread to execute.
709         if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
710             unreachable!("all threads terminated without the main thread terminating?!");
711         } else if let Some(sleep_time) = potential_sleep_time {
712             // All threads are currently blocked, but we have unexecuted
713             // timeout_callbacks, which may unblock some of the threads. Hence,
714             // sleep until the first callback.
715             Ok(SchedulingAction::Sleep(sleep_time))
716         } else {
717             throw_machine_stop!(TerminationInfo::Deadlock);
718         }
719     }
720 }
721
722 impl<'mir, 'tcx: 'mir> EvalContextPrivExt<'mir, 'tcx> for MiriInterpCx<'mir, 'tcx> {}
723 trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
724     /// Execute a timeout callback on the callback's thread.
725     #[inline]
726     fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
727         let this = self.eval_context_mut();
728         let (thread, callback) = if let Some((thread, callback)) =
729             this.machine.threads.get_ready_callback(&this.machine.clock)
730         {
731             (thread, callback)
732         } else {
733             // get_ready_callback can return None if the computer's clock
734             // was shifted after calling the scheduler and before the call
735             // to get_ready_callback (see issue
736             // https://github.com/rust-lang/miri/issues/1763). In this case,
737             // just do nothing, which effectively just returns to the
738             // scheduler.
739             return Ok(());
740         };
741         // This back-and-forth with `set_active_thread` is here because of two
742         // design decisions:
743         // 1. Make the caller and not the callback responsible for changing
744         //    thread.
745         // 2. Make the scheduler the only place that can change the active
746         //    thread.
747         let old_thread = this.set_active_thread(thread);
748         callback.call(this)?;
749         this.set_active_thread(old_thread);
750         Ok(())
751     }
752
753     #[inline]
754     fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
755         let this = self.eval_context_mut();
756         let mut callback = this
757             .active_thread_mut()
758             .on_stack_empty
759             .take()
760             .expect("`on_stack_empty` not set up, or already running");
761         let res = callback(this)?;
762         this.active_thread_mut().on_stack_empty = Some(callback);
763         Ok(res)
764     }
765 }
766
767 // Public interface to thread management.
768 impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
769 pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
770     /// Get a thread-specific allocation id for the given thread-local static.
771     /// If needed, allocate a new one.
772     fn get_or_create_thread_local_alloc(
773         &mut self,
774         def_id: DefId,
775     ) -> InterpResult<'tcx, Pointer<Provenance>> {
776         let this = self.eval_context_mut();
777         let tcx = this.tcx;
778         if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
779             // We already have a thread-specific allocation id for this
780             // thread-local static.
781             Ok(old_alloc)
782         } else {
783             // We need to allocate a thread-specific allocation id for this
784             // thread-local static.
785             // First, we compute the initial value for this static.
786             if tcx.is_foreign_item(def_id) {
787                 throw_unsup_format!("foreign thread-local statics are not supported");
788             }
789             // We don't give a span -- statics don't need that, they cannot be generic or associated.
790             let allocation = this.ctfe_query(None, |tcx| tcx.eval_static_initializer(def_id))?;
791             let mut allocation = allocation.inner().clone();
792             // This allocation will be deallocated when the thread dies, so it is not in read-only memory.
793             allocation.mutability = Mutability::Mut;
794             // Create a fresh allocation with this content.
795             let new_alloc = this.allocate_raw_ptr(allocation, MiriMemoryKind::Tls.into())?;
796             this.machine.threads.set_thread_local_alloc(def_id, new_alloc);
797             Ok(new_alloc)
798         }
799     }
800
801     /// Start a regular (non-main) thread.
802     #[inline]
803     fn start_regular_thread(
804         &mut self,
805         thread: Option<MPlaceTy<'tcx, Provenance>>,
806         start_routine: Pointer<Option<Provenance>>,
807         start_abi: Abi,
808         func_arg: ImmTy<'tcx, Provenance>,
809         ret_layout: TyAndLayout<'tcx>,
810     ) -> InterpResult<'tcx, ThreadId> {
811         let this = self.eval_context_mut();
812
813         // Create the new thread
814         let new_thread_id = this.machine.threads.create_thread({
815             let mut state = tls::TlsDtorsState::default();
816             Box::new(move |m| state.on_stack_empty(m))
817         });
818         let current_span = this.machine.current_span();
819         if let Some(data_race) = &mut this.machine.data_race {
820             data_race.thread_created(&this.machine.threads, new_thread_id, current_span);
821         }
822
823         // Write the current thread-id, switch to the next thread later
824         // to treat this write operation as occuring on the current thread.
825         if let Some(thread_info_place) = thread {
826             this.write_scalar(
827                 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
828                 &thread_info_place.into(),
829             )?;
830         }
831
832         // Finally switch to new thread so that we can push the first stackframe.
833         // After this all accesses will be treated as occuring in the new thread.
834         let old_thread_id = this.set_active_thread(new_thread_id);
835
836         // Perform the function pointer load in the new thread frame.
837         let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
838
839         // Note: the returned value is currently ignored (see the FIXME in
840         // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use
841         // it.
842         let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
843
844         this.call_function(
845             instance,
846             start_abi,
847             &[*func_arg],
848             Some(&ret_place.into()),
849             StackPopCleanup::Root { cleanup: true },
850         )?;
851
852         // Restore the old active thread frame.
853         this.set_active_thread(old_thread_id);
854
855         Ok(new_thread_id)
856     }
857
858     #[inline]
859     fn detach_thread(
860         &mut self,
861         thread_id: ThreadId,
862         allow_terminated_joined: bool,
863     ) -> InterpResult<'tcx> {
864         let this = self.eval_context_mut();
865         this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
866     }
867
868     #[inline]
869     fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
870         let this = self.eval_context_mut();
871         this.machine.threads.join_thread(joined_thread_id, this.machine.data_race.as_mut())?;
872         Ok(())
873     }
874
875     #[inline]
876     fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
877         let this = self.eval_context_mut();
878         this.machine
879             .threads
880             .join_thread_exclusive(joined_thread_id, this.machine.data_race.as_mut())?;
881         Ok(())
882     }
883
884     #[inline]
885     fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId {
886         let this = self.eval_context_mut();
887         this.machine.threads.set_active_thread_id(thread_id)
888     }
889
890     #[inline]
891     fn get_active_thread(&self) -> ThreadId {
892         let this = self.eval_context_ref();
893         this.machine.threads.get_active_thread_id()
894     }
895
896     #[inline]
897     fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
898         let this = self.eval_context_mut();
899         this.machine.threads.active_thread_mut()
900     }
901
902     #[inline]
903     fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> {
904         let this = self.eval_context_ref();
905         this.machine.threads.active_thread_ref()
906     }
907
908     #[inline]
909     fn get_total_thread_count(&self) -> usize {
910         let this = self.eval_context_ref();
911         this.machine.threads.get_total_thread_count()
912     }
913
914     #[inline]
915     fn have_all_terminated(&self) -> bool {
916         let this = self.eval_context_ref();
917         this.machine.threads.have_all_terminated()
918     }
919
920     #[inline]
921     fn enable_thread(&mut self, thread_id: ThreadId) {
922         let this = self.eval_context_mut();
923         this.machine.threads.enable_thread(thread_id);
924     }
925
926     #[inline]
927     fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>] {
928         let this = self.eval_context_ref();
929         this.machine.threads.active_thread_stack()
930     }
931
932     #[inline]
933     fn active_thread_stack_mut(
934         &mut self,
935     ) -> &mut Vec<Frame<'mir, 'tcx, Provenance, FrameExtra<'tcx>>> {
936         let this = self.eval_context_mut();
937         this.machine.threads.active_thread_stack_mut()
938     }
939
940     /// Set the name of the current thread. The buffer must not include the null terminator.
941     #[inline]
942     fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
943         let this = self.eval_context_mut();
944         this.machine.threads.set_thread_name(thread, new_thread_name);
945     }
946
947     #[inline]
948     fn set_thread_name_wide(&mut self, thread: ThreadId, new_thread_name: &[u16]) {
949         let this = self.eval_context_mut();
950
951         // The Windows `GetThreadDescription` shim to get the thread name isn't implemented, so being lossy is okay.
952         // This is only read by diagnostics, which already use `from_utf8_lossy`.
953         this.machine
954             .threads
955             .set_thread_name(thread, String::from_utf16_lossy(new_thread_name).into_bytes());
956     }
957
958     #[inline]
959     fn get_thread_name<'c>(&'c self, thread: ThreadId) -> &'c [u8]
960     where
961         'mir: 'c,
962     {
963         self.eval_context_ref().machine.threads.get_thread_name(thread)
964     }
965
966     #[inline]
967     fn block_thread(&mut self, thread: ThreadId) {
968         self.eval_context_mut().machine.threads.block_thread(thread);
969     }
970
971     #[inline]
972     fn unblock_thread(&mut self, thread: ThreadId) {
973         self.eval_context_mut().machine.threads.unblock_thread(thread);
974     }
975
976     #[inline]
977     fn yield_active_thread(&mut self) {
978         self.eval_context_mut().machine.threads.yield_active_thread();
979     }
980
981     #[inline]
982     fn maybe_preempt_active_thread(&mut self) {
983         use rand::Rng as _;
984
985         let this = self.eval_context_mut();
986         if this.machine.rng.get_mut().gen_bool(this.machine.preemption_rate) {
987             this.yield_active_thread();
988         }
989     }
990
991     #[inline]
992     fn register_timeout_callback(
993         &mut self,
994         thread: ThreadId,
995         call_time: Time,
996         callback: TimeoutCallback<'mir, 'tcx>,
997     ) {
998         let this = self.eval_context_mut();
999         if !this.machine.communicate() && matches!(call_time, Time::RealTime(..)) {
1000             panic!("cannot have `RealTime` callback with isolation enabled!")
1001         }
1002         this.machine.threads.register_timeout_callback(thread, call_time, callback);
1003     }
1004
1005     #[inline]
1006     fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
1007         let this = self.eval_context_mut();
1008         this.machine.threads.unregister_timeout_callback_if_exists(thread);
1009     }
1010
1011     /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
1012     /// termination).
1013     fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1014         let this = self.eval_context_mut();
1015         loop {
1016             match this.machine.threads.schedule(&this.machine.clock)? {
1017                 SchedulingAction::ExecuteStep => {
1018                     if !this.step()? {
1019                         // See if this thread can do something else.
1020                         match this.run_on_stack_empty()? {
1021                             Poll::Pending => {} // keep going
1022                             Poll::Ready(()) => this.terminate_active_thread()?,
1023                         }
1024                     }
1025                 }
1026                 SchedulingAction::ExecuteTimeoutCallback => {
1027                     this.run_timeout_callback()?;
1028                 }
1029                 SchedulingAction::Sleep(duration) => {
1030                     this.machine.clock.sleep(duration);
1031                 }
1032             }
1033         }
1034     }
1035
1036     /// Handles thread termination of the active thread: wakes up threads joining on this one,
1037     /// and deallocated thread-local statics.
1038     ///
1039     /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
1040     #[inline]
1041     fn terminate_active_thread(&mut self) -> InterpResult<'tcx> {
1042         let this = self.eval_context_mut();
1043         let thread = this.active_thread_mut();
1044         assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
1045         thread.state = ThreadState::Terminated;
1046
1047         let current_span = this.machine.current_span();
1048         for ptr in
1049             this.machine.threads.thread_terminated(this.machine.data_race.as_mut(), current_span)
1050         {
1051             this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?;
1052         }
1053         Ok(())
1054     }
1055 }