]> git.lizzy.rs Git - rust.git/blob - src/thread.rs
376920e225ba7f5d4cfc95e0b1cc5eea59bc83c7
[rust.git] / src / thread.rs
1 //! Implements threads.
2
3 use std::cell::RefCell;
4 use std::convert::TryFrom;
5 use std::num::{NonZeroU32, TryFromIntError};
6
7 use log::trace;
8
9 use rustc_data_structures::fx::FxHashMap;
10 use rustc_hir::def_id::DefId;
11 use rustc_index::vec::{Idx, IndexVec};
12 use rustc_middle::{
13     middle::codegen_fn_attrs::CodegenFnAttrFlags,
14     mir,
15     ty::{self, Instance},
16 };
17
18 use crate::*;
19
20 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
21 pub enum SchedulingAction {
22     /// Execute step on the active thread.
23     ExecuteStep,
24     /// Execute destructors of the active thread.
25     ExecuteDtors,
26     /// Stop the program.
27     Stop,
28 }
29
30 /// A thread identifier.
31 #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
32 pub struct ThreadId(u32);
33
34 /// The main thread. When it terminates, the whole application terminates.
35 const MAIN_THREAD: ThreadId = ThreadId(0);
36
37 impl ThreadId {
38     pub fn to_u32(self) -> u32 {
39         self.0
40     }
41 }
42
43 impl Idx for ThreadId {
44     fn new(idx: usize) -> Self {
45         ThreadId(u32::try_from(idx).unwrap())
46     }
47
48     fn index(self) -> usize {
49         usize::try_from(self.0).unwrap()
50     }
51 }
52
53 impl TryFrom<u64> for ThreadId {
54     type Error = TryFromIntError;
55     fn try_from(id: u64) -> Result<Self, Self::Error> {
56         u32::try_from(id).map(|id_u32| Self(id_u32))
57     }
58 }
59
60 impl From<u32> for ThreadId {
61     fn from(id: u32) -> Self {
62         Self(id)
63     }
64 }
65
66 impl ThreadId {
67     pub fn to_u32_scalar<'tcx>(&self) -> Scalar<Tag> {
68         Scalar::from_u32(u32::try_from(self.0).unwrap())
69     }
70 }
71
72 /// An identifier of a set of blocked threads. 0 is used to indicate the absence
73 /// of a blockset identifier and, therefore, is not a valid identifier.
74 #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
75 pub struct BlockSetId(NonZeroU32);
76
77 impl BlockSetId {
78     /// Panics if `id` is 0.
79     pub fn new(id: u32) -> Self {
80         Self(NonZeroU32::new(id).expect("0 is not a valid blockset id"))
81     }
82     pub fn to_u32_scalar<'tcx>(&self) -> Scalar<Tag> {
83         Scalar::from_u32(self.0.get())
84     }
85 }
86
87 /// The state of a thread.
88 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
89 pub enum ThreadState {
90     /// The thread is enabled and can be executed.
91     Enabled,
92     /// The thread tried to join the specified thread and is blocked until that
93     /// thread terminates.
94     BlockedOnJoin(ThreadId),
95     /// The thread is blocked and belongs to the given blockset.
96     Blocked(BlockSetId),
97     /// The thread has terminated its execution (we do not delete terminated
98     /// threads).
99     Terminated,
100 }
101
102 /// The join status of a thread.
103 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
104 enum ThreadJoinStatus {
105     /// The thread can be joined.
106     Joinable,
107     /// A thread is detached if its join handle was destroyed and no other
108     /// thread can join it.
109     Detached,
110     /// The thread was already joined by some thread and cannot be joined again.
111     Joined,
112 }
113
114 /// A thread.
115 pub struct Thread<'mir, 'tcx> {
116     state: ThreadState,
117     /// Name of the thread.
118     thread_name: Option<Vec<u8>>,
119     /// The virtual call stack.
120     stack: Vec<Frame<'mir, 'tcx, Tag, FrameData<'tcx>>>,
121     /// The join status.
122     join_status: ThreadJoinStatus,
123 }
124
125 impl<'mir, 'tcx> Thread<'mir, 'tcx> {
126     /// Check if the thread is done executing (no more stack frames). If yes,
127     /// change the state to terminated and return `true`.
128     fn check_terminated(&mut self) -> bool {
129         if self.state == ThreadState::Enabled {
130             if self.stack.is_empty() {
131                 self.state = ThreadState::Terminated;
132                 return true;
133             }
134         }
135         false
136     }
137 }
138
139 impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> {
140     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141         if let Some(ref name) = self.thread_name {
142             write!(f, "{}", String::from_utf8_lossy(name))?;
143         } else {
144             write!(f, "<unnamed>")?;
145         }
146         write!(f, "({:?}, {:?})", self.state, self.join_status)
147     }
148 }
149
150 impl<'mir, 'tcx> Default for Thread<'mir, 'tcx> {
151     fn default() -> Self {
152         Self {
153             state: ThreadState::Enabled,
154             thread_name: None,
155             stack: Vec::new(),
156             join_status: ThreadJoinStatus::Joinable,
157         }
158     }
159 }
160
161 /// A set of threads.
162 #[derive(Debug)]
163 pub struct ThreadManager<'mir, 'tcx> {
164     /// Identifier of the currently active thread.
165     active_thread: ThreadId,
166     /// Threads used in the program.
167     ///
168     /// Note that this vector also contains terminated threads.
169     threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
170     /// A counter used to generate unique identifiers for blocksets.
171     blockset_counter: u32,
172     /// A mapping from a thread-local static to an allocation id of a thread
173     /// specific allocation.
174     thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), AllocId>>,
175     /// A flag that indicates that we should change the active thread.
176     yield_active_thread: bool,
177 }
178
179 impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
180     fn default() -> Self {
181         let mut threads = IndexVec::new();
182         // Create the main thread and add it to the list of threads.
183         let mut main_thread = Thread::default();
184         // The main thread can *not* be joined on.
185         main_thread.join_status = ThreadJoinStatus::Detached;
186         threads.push(main_thread);
187         Self {
188             active_thread: ThreadId::new(0),
189             threads: threads,
190             blockset_counter: 0,
191             thread_local_alloc_ids: Default::default(),
192             yield_active_thread: false,
193         }
194     }
195 }
196
197 impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
198     /// Check if we have an allocation for the given thread local static for the
199     /// active thread.
200     fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<AllocId> {
201         self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned()
202     }
203
204     /// Set the allocation id as the allocation id of the given thread local
205     /// static for the active thread.
206     ///
207     /// Panics if a thread local is initialized twice for the same thread.
208     fn set_thread_local_alloc_id(&self, def_id: DefId, new_alloc_id: AllocId) {
209         self.thread_local_alloc_ids
210             .borrow_mut()
211             .insert((def_id, self.active_thread), new_alloc_id)
212             .unwrap_none();
213     }
214
215     /// Borrow the stack of the active thread.
216     fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Tag, FrameData<'tcx>>] {
217         &self.threads[self.active_thread].stack
218     }
219
220     /// Mutably borrow the stack of the active thread.
221     fn active_thread_stack_mut(&mut self) -> &mut Vec<Frame<'mir, 'tcx, Tag, FrameData<'tcx>>> {
222         &mut self.threads[self.active_thread].stack
223     }
224
225     /// Create a new thread and returns its id.
226     fn create_thread(&mut self) -> ThreadId {
227         let new_thread_id = ThreadId::new(self.threads.len());
228         self.threads.push(Default::default());
229         new_thread_id
230     }
231
232     /// Set an active thread and return the id of the thread that was active before.
233     fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
234         let active_thread_id = self.active_thread;
235         self.active_thread = id;
236         assert!(self.active_thread.index() < self.threads.len());
237         active_thread_id
238     }
239
240     /// Get the id of the currently active thread.
241     fn get_active_thread_id(&self) -> ThreadId {
242         self.active_thread
243     }
244
245     /// Get the total number of threads that were ever spawn by this program.
246     fn get_total_thread_count(&self) -> usize {
247         self.threads.len()
248     }
249
250     /// Has the given thread terminated?
251     fn has_terminated(&self, thread_id: ThreadId) -> bool {
252         self.threads[thread_id].state == ThreadState::Terminated
253     }
254
255     /// Enable the thread for execution. The thread must be terminated.
256     fn enable_thread(&mut self, thread_id: ThreadId) {
257         assert!(self.has_terminated(thread_id));
258         self.threads[thread_id].state = ThreadState::Enabled;
259     }
260
261     /// Get a mutable borrow of the currently active thread.
262     fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> {
263         &mut self.threads[self.active_thread]
264     }
265
266     /// Get a shared borrow of the currently active thread.
267     fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> {
268         &self.threads[self.active_thread]
269     }
270
271     /// Mark the thread as detached, which means that no other thread will try
272     /// to join it and the thread is responsible for cleaning up.
273     fn detach_thread(&mut self, id: ThreadId) -> InterpResult<'tcx> {
274         if self.threads[id].join_status != ThreadJoinStatus::Joinable {
275             throw_ub_format!("trying to detach thread that was already detached or joined");
276         }
277         self.threads[id].join_status = ThreadJoinStatus::Detached;
278         Ok(())
279     }
280
281     /// Mark that the active thread tries to join the thread with `joined_thread_id`.
282     fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
283         if self.threads[joined_thread_id].join_status != ThreadJoinStatus::Joinable {
284             throw_ub_format!("trying to join a detached or already joined thread");
285         }
286         if joined_thread_id == self.active_thread {
287             throw_ub_format!("trying to join itself");
288         }
289         assert!(
290             self.threads
291                 .iter()
292                 .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
293             "a joinable thread already has threads waiting for its termination"
294         );
295         // Mark the joined thread as being joined so that we detect if other
296         // threads try to join it.
297         self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
298         if self.threads[joined_thread_id].state != ThreadState::Terminated {
299             // The joined thread is still running, we need to wait for it.
300             self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
301             trace!(
302                 "{:?} blocked on {:?} when trying to join",
303                 self.active_thread,
304                 joined_thread_id
305             );
306         }
307         Ok(())
308     }
309
310     /// Set the name of the active thread.
311     fn set_thread_name(&mut self, new_thread_name: Vec<u8>) {
312         self.active_thread_mut().thread_name = Some(new_thread_name);
313     }
314
315     /// Get the name of the active thread.
316     fn get_thread_name(&self) -> &[u8] {
317         if let Some(ref thread_name) = self.active_thread_ref().thread_name {
318             thread_name
319         } else {
320             b"<unnamed>"
321         }
322     }
323
324     /// Allocate a new blockset id.
325     fn create_blockset(&mut self) -> BlockSetId {
326         self.blockset_counter = self.blockset_counter.checked_add(1).unwrap();
327         BlockSetId::new(self.blockset_counter)
328     }
329
330     /// Block the currently active thread and put it into the given blockset.
331     fn block_active_thread(&mut self, set: BlockSetId) {
332         let state = &mut self.active_thread_mut().state;
333         assert_eq!(*state, ThreadState::Enabled);
334         *state = ThreadState::Blocked(set);
335     }
336
337     /// Unblock any one thread from the given blockset if it contains at least
338     /// one. Return the id of the unblocked thread.
339     fn unblock_some_thread(&mut self, set: BlockSetId) -> Option<ThreadId> {
340         for (id, thread) in self.threads.iter_enumerated_mut() {
341             if thread.state == ThreadState::Blocked(set) {
342                 trace!("unblocking {:?} in blockset {:?}", id, set);
343                 thread.state = ThreadState::Enabled;
344                 return Some(id);
345             }
346         }
347         None
348     }
349
350     /// Change the active thread to some enabled thread.
351     fn yield_active_thread(&mut self) {
352         self.yield_active_thread = true;
353     }
354
355     /// Decide which action to take next and on which thread.
356     ///
357     /// The currently implemented scheduling policy is the one that is commonly
358     /// used in stateless model checkers such as Loom: run the active thread as
359     /// long as we can and switch only when we have to (the active thread was
360     /// blocked, terminated, or has explicitly asked to be preempted).
361     fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
362         // Check whether the thread has **just** terminated (`check_terminated`
363         // checks whether the thread has popped all its stack and if yes, sets
364         // the thread state to terminated).
365         if self.threads[self.active_thread].check_terminated() {
366             // Check if we need to unblock any threads.
367             for (i, thread) in self.threads.iter_enumerated_mut() {
368                 if thread.state == ThreadState::BlockedOnJoin(self.active_thread) {
369                     trace!("unblocking {:?} because {:?} terminated", i, self.active_thread);
370                     thread.state = ThreadState::Enabled;
371                 }
372             }
373             return Ok(SchedulingAction::ExecuteDtors);
374         }
375         if self.threads[MAIN_THREAD].state == ThreadState::Terminated {
376             // The main thread terminated; stop the program.
377             if self.threads.iter().any(|thread| thread.state != ThreadState::Terminated) {
378                 // FIXME: This check should be either configurable or just emit
379                 // a warning. For example, it seems normal for a program to
380                 // terminate without waiting for its detached threads to
381                 // terminate. However, this case is not trivial to support
382                 // because we also probably do not want to consider the memory
383                 // owned by these threads as leaked.
384                 throw_unsup_format!("the main thread terminated without waiting for other threads");
385             }
386             return Ok(SchedulingAction::Stop);
387         }
388         if self.threads[self.active_thread].state == ThreadState::Enabled
389             && !self.yield_active_thread
390         {
391             // The currently active thread is still enabled, just continue with it.
392             return Ok(SchedulingAction::ExecuteStep);
393         }
394         // We need to pick a new thread for execution.
395         for (id, thread) in self.threads.iter_enumerated() {
396             if thread.state == ThreadState::Enabled {
397                 if !self.yield_active_thread || id != self.active_thread {
398                     self.active_thread = id;
399                     break;
400                 }
401             }
402         }
403         self.yield_active_thread = false;
404         if self.threads[self.active_thread].state == ThreadState::Enabled {
405             return Ok(SchedulingAction::ExecuteStep);
406         }
407         // We have not found a thread to execute.
408         if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
409             unreachable!();
410         } else {
411             throw_machine_stop!(TerminationInfo::Deadlock);
412         }
413     }
414 }
415
416 // Public interface to thread management.
417 impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {}
418 pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> {
419     /// A workaround for thread-local statics until
420     /// https://github.com/rust-lang/rust/issues/70685 is fixed: change the
421     /// thread-local allocation id with a freshly generated allocation id for
422     /// the currently active thread.
423     fn remap_thread_local_alloc_ids(
424         &self,
425         val: &mut mir::interpret::ConstValue<'tcx>,
426     ) -> InterpResult<'tcx> {
427         let this = self.eval_context_ref();
428         match *val {
429             mir::interpret::ConstValue::Scalar(Scalar::Ptr(ref mut ptr)) => {
430                 let alloc_id = ptr.alloc_id;
431                 let alloc = this.tcx.alloc_map.lock().get(alloc_id);
432                 let tcx = this.tcx;
433                 let is_thread_local = |def_id| {
434                     tcx.codegen_fn_attrs(def_id).flags.contains(CodegenFnAttrFlags::THREAD_LOCAL)
435                 };
436                 match alloc {
437                     Some(GlobalAlloc::Static(def_id)) if is_thread_local(def_id) => {
438                         ptr.alloc_id = this.get_or_create_thread_local_alloc_id(def_id)?;
439                     }
440                     _ => {}
441                 }
442             }
443             _ => {
444                 // FIXME: Handling only `Scalar` seems to work for now, but at
445                 // least in principle thread-locals could be in any constant, so
446                 // we should also consider other cases. However, once
447                 // https://github.com/rust-lang/rust/issues/70685 gets fixed,
448                 // this code will have to be rewritten anyway.
449             }
450         }
451         Ok(())
452     }
453
454     /// Get a thread-specific allocation id for the given thread-local static.
455     /// If needed, allocate a new one.
456     ///
457     /// FIXME: This method should be replaced as soon as
458     /// https://github.com/rust-lang/rust/issues/70685 gets fixed.
459     fn get_or_create_thread_local_alloc_id(&self, def_id: DefId) -> InterpResult<'tcx, AllocId> {
460         let this = self.eval_context_ref();
461         let tcx = this.tcx;
462         if let Some(new_alloc_id) = this.machine.threads.get_thread_local_alloc_id(def_id) {
463             // We already have a thread-specific allocation id for this
464             // thread-local static.
465             Ok(new_alloc_id)
466         } else {
467             // We need to allocate a thread-specific allocation id for this
468             // thread-local static.
469             //
470             // At first, we invoke the `const_eval_raw` query and extract the
471             // allocation from it. Unfortunately, we have to duplicate the code
472             // from `Memory::get_global_alloc` that does this.
473             //
474             // Then we store the retrieved allocation back into the `alloc_map`
475             // to get a fresh allocation id, which we can use as a
476             // thread-specific allocation id for the thread-local static.
477             if tcx.is_foreign_item(def_id) {
478                 throw_unsup_format!("foreign thread-local statics are not supported");
479             }
480             // Invoke the `const_eval_raw` query.
481             let instance = Instance::mono(tcx.tcx, def_id);
482             let gid = GlobalId { instance, promoted: None };
483             let raw_const =
484                 tcx.const_eval_raw(ty::ParamEnv::reveal_all().and(gid)).map_err(|err| {
485                     // no need to report anything, the const_eval call takes care of that
486                     // for statics
487                     assert!(tcx.is_static(def_id));
488                     err
489                 })?;
490             let id = raw_const.alloc_id;
491             // Extract the allocation from the query result.
492             let mut alloc_map = tcx.alloc_map.lock();
493             let allocation = alloc_map.unwrap_memory(id);
494             // Create a new allocation id for the same allocation in this hacky
495             // way. Internally, `alloc_map` deduplicates allocations, but this
496             // is fine because Miri will make a copy before a first mutable
497             // access.
498             let new_alloc_id = alloc_map.create_memory_alloc(allocation);
499             this.machine.threads.set_thread_local_alloc_id(def_id, new_alloc_id);
500             Ok(new_alloc_id)
501         }
502     }
503
504     #[inline]
505     fn create_thread(&mut self) -> InterpResult<'tcx, ThreadId> {
506         let this = self.eval_context_mut();
507         Ok(this.machine.threads.create_thread())
508     }
509
510     #[inline]
511     fn detach_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> {
512         let this = self.eval_context_mut();
513         this.machine.threads.detach_thread(thread_id)
514     }
515
516     #[inline]
517     fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
518         let this = self.eval_context_mut();
519         this.machine.threads.join_thread(joined_thread_id)
520     }
521
522     #[inline]
523     fn set_active_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx, ThreadId> {
524         let this = self.eval_context_mut();
525         Ok(this.machine.threads.set_active_thread_id(thread_id))
526     }
527
528     #[inline]
529     fn get_active_thread(&self) -> InterpResult<'tcx, ThreadId> {
530         let this = self.eval_context_ref();
531         Ok(this.machine.threads.get_active_thread_id())
532     }
533
534     #[inline]
535     fn get_total_thread_count(&self) -> InterpResult<'tcx, usize> {
536         let this = self.eval_context_ref();
537         Ok(this.machine.threads.get_total_thread_count())
538     }
539
540     #[inline]
541     fn has_terminated(&self, thread_id: ThreadId) -> InterpResult<'tcx, bool> {
542         let this = self.eval_context_ref();
543         Ok(this.machine.threads.has_terminated(thread_id))
544     }
545
546     #[inline]
547     fn enable_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> {
548         let this = self.eval_context_mut();
549         this.machine.threads.enable_thread(thread_id);
550         Ok(())
551     }
552
553     #[inline]
554     fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Tag, FrameData<'tcx>>] {
555         let this = self.eval_context_ref();
556         this.machine.threads.active_thread_stack()
557     }
558
559     #[inline]
560     fn active_thread_stack_mut(&mut self) -> &mut Vec<Frame<'mir, 'tcx, Tag, FrameData<'tcx>>> {
561         let this = self.eval_context_mut();
562         this.machine.threads.active_thread_stack_mut()
563     }
564
565     #[inline]
566     fn set_active_thread_name(&mut self, new_thread_name: Vec<u8>) -> InterpResult<'tcx, ()> {
567         let this = self.eval_context_mut();
568         Ok(this.machine.threads.set_thread_name(new_thread_name))
569     }
570
571     #[inline]
572     fn get_active_thread_name<'c>(&'c self) -> InterpResult<'tcx, &'c [u8]>
573     where
574         'mir: 'c,
575     {
576         let this = self.eval_context_ref();
577         Ok(this.machine.threads.get_thread_name())
578     }
579
580     #[inline]
581     fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> {
582         let this = self.eval_context_mut();
583         Ok(this.machine.threads.create_blockset())
584     }
585
586     #[inline]
587     fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> {
588         let this = self.eval_context_mut();
589         Ok(this.machine.threads.block_active_thread(set))
590     }
591
592     #[inline]
593     fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option<ThreadId>> {
594         let this = self.eval_context_mut();
595         Ok(this.machine.threads.unblock_some_thread(set))
596     }
597
598     #[inline]
599     fn yield_active_thread(&mut self) -> InterpResult<'tcx> {
600         let this = self.eval_context_mut();
601         this.machine.threads.yield_active_thread();
602         Ok(())
603     }
604
605     /// Decide which action to take next and on which thread.
606     #[inline]
607     fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
608         let this = self.eval_context_mut();
609         this.machine.threads.schedule()
610     }
611 }