]> git.lizzy.rs Git - rust.git/blob - library/std/src/thread/scoped.rs
Rollup merge of #93365 - nnethercote:more-arena-cleanups, r=oli-obk
[rust.git] / library / std / src / thread / scoped.rs
1 use super::{current, park, Builder, JoinInner, Result, Thread};
2 use crate::fmt;
3 use crate::io;
4 use crate::marker::PhantomData;
5 use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
6 use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7 use crate::sync::Arc;
8
9 /// A scope to spawn scoped threads in.
10 ///
11 /// See [`scope`] for details.
12 pub struct Scope<'env> {
13     data: ScopeData,
14     /// Invariance over 'env, to make sure 'env cannot shrink,
15     /// which is necessary for soundness.
16     ///
17     /// Without invariance, this would compile fine but be unsound:
18     ///
19     /// ```compile_fail
20     /// #![feature(scoped_threads)]
21     ///
22     /// std::thread::scope(|s| {
23     ///     s.spawn(|s| {
24     ///         let a = String::from("abcd");
25     ///         s.spawn(|_| println!("{:?}", a)); // might run after `a` is dropped
26     ///     });
27     /// });
28     /// ```
29     env: PhantomData<&'env mut &'env ()>,
30 }
31
32 /// An owned permission to join on a scoped thread (block on its termination).
33 ///
34 /// See [`Scope::spawn`] for details.
35 pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);
36
37 pub(super) struct ScopeData {
38     num_running_threads: AtomicUsize,
39     a_thread_panicked: AtomicBool,
40     main_thread: Thread,
41 }
42
43 impl ScopeData {
44     pub(super) fn increment_num_running_threads(&self) {
45         // We check for 'overflow' with usize::MAX / 2, to make sure there's no
46         // chance it overflows to 0, which would result in unsoundness.
47         if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
48             // This can only reasonably happen by mem::forget()'ing many many ScopedJoinHandles.
49             self.decrement_num_running_threads(false);
50             panic!("too many running threads in thread scope");
51         }
52     }
53     pub(super) fn decrement_num_running_threads(&self, panic: bool) {
54         if panic {
55             self.a_thread_panicked.store(true, Ordering::Relaxed);
56         }
57         if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
58             self.main_thread.unpark();
59         }
60     }
61 }
62
63 /// Create a scope for spawning scoped threads.
64 ///
65 /// The function passed to `scope` will be provided a [`Scope`] object,
66 /// through which scoped threads can be [spawned][`Scope::spawn`].
67 ///
68 /// Unlike non-scoped threads, scoped threads can borrow non-`'static` data,
69 /// as the scope guarantees all threads will be joined at the end of the scope.
70 ///
71 /// All threads spawned within the scope that haven't been manually joined
72 /// will be automatically joined before this function returns.
73 ///
74 /// # Panics
75 ///
76 /// If any of the automatically joined threads panicked, this function will panic.
77 ///
78 /// If you want to handle panics from spawned threads,
79 /// [`join`][ScopedJoinHandle::join] them before the end of the scope.
80 ///
81 /// # Example
82 ///
83 /// ```
84 /// #![feature(scoped_threads)]
85 /// use std::thread;
86 ///
87 /// let mut a = vec![1, 2, 3];
88 /// let mut x = 0;
89 ///
90 /// thread::scope(|s| {
91 ///     s.spawn(|_| {
92 ///         println!("hello from the first scoped thread");
93 ///         // We can borrow `a` here.
94 ///         dbg!(&a);
95 ///     });
96 ///     s.spawn(|_| {
97 ///         println!("hello from the second scoped thread");
98 ///         // We can even mutably borrow `x` here,
99 ///         // because no other threads are using it.
100 ///         x += a[0] + a[2];
101 ///     });
102 ///     println!("hello from the main thread");
103 /// });
104 ///
105 /// // After the scope, we can modify and access our variables again:
106 /// a.push(4);
107 /// assert_eq!(x, a.len());
108 /// ```
109 #[track_caller]
110 pub fn scope<'env, F, T>(f: F) -> T
111 where
112     F: FnOnce(&Scope<'env>) -> T,
113 {
114     let scope = Scope {
115         data: ScopeData {
116             num_running_threads: AtomicUsize::new(0),
117             main_thread: current(),
118             a_thread_panicked: AtomicBool::new(false),
119         },
120         env: PhantomData,
121     };
122
123     // Run `f`, but catch panics so we can make sure to wait for all the threads to join.
124     let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));
125
126     // Wait until all the threads are finished.
127     while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
128         park();
129     }
130
131     // Throw any panic from `f`, or the return value of `f` if no thread panicked.
132     match result {
133         Err(e) => resume_unwind(e),
134         Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
135             panic!("a scoped thread panicked")
136         }
137         Ok(result) => result,
138     }
139 }
140
141 impl<'env> Scope<'env> {
142     /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
143     ///
144     /// Unlike non-scoped threads, threads spawned with this function may
145     /// borrow non-`'static` data from the outside the scope. See [`scope`] for
146     /// details.
147     ///
148     /// The join handle provides a [`join`] method that can be used to join the spawned
149     /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing
150     /// the panic payload.
151     ///
152     /// If the join handle is dropped, the spawned thread will implicitly joined at the
153     /// end of the scope. In that case, if the spawned thread panics, [`scope`] will
154     /// panic after all threads are joined.
155     ///
156     /// This call will create a thread using default parameters of [`Builder`].
157     /// If you want to specify the stack size or the name of the thread, use
158     /// [`Builder::spawn_scoped`] instead.
159     ///
160     /// # Panics
161     ///
162     /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`]
163     /// to recover from such errors.
164     ///
165     /// [`join`]: ScopedJoinHandle::join
166     pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
167     where
168         F: FnOnce(&Scope<'env>) -> T + Send + 'env,
169         T: Send + 'env,
170     {
171         Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
172     }
173 }
174
175 impl Builder {
176     /// Spawns a new scoped thread using the settings set through this `Builder`.
177     ///
178     /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to
179     /// capture any failure to create the thread at the OS level.
180     ///
181     /// [`io::Result`]: crate::io::Result
182     ///
183     /// # Panics
184     ///
185     /// Panics if a thread name was set and it contained null bytes.
186     ///
187     /// # Example
188     ///
189     /// ```
190     /// #![feature(scoped_threads)]
191     /// use std::thread;
192     ///
193     /// let mut a = vec![1, 2, 3];
194     /// let mut x = 0;
195     ///
196     /// thread::scope(|s| {
197     ///     thread::Builder::new()
198     ///         .name("first".to_string())
199     ///         .spawn_scoped(s, |_|
200     ///     {
201     ///         println!("hello from the {:?} scoped thread", thread::current().name());
202     ///         // We can borrow `a` here.
203     ///         dbg!(&a);
204     ///     })
205     ///     .unwrap();
206     ///     thread::Builder::new()
207     ///         .name("second".to_string())
208     ///         .spawn_scoped(s, |_|
209     ///     {
210     ///         println!("hello from the {:?} scoped thread", thread::current().name());
211     ///         // We can even mutably borrow `x` here,
212     ///         // because no other threads are using it.
213     ///         x += a[0] + a[2];
214     ///     })
215     ///     .unwrap();
216     ///     println!("hello from the main thread");
217     /// });
218     ///
219     /// // After the scope, we can modify and access our variables again:
220     /// a.push(4);
221     /// assert_eq!(x, a.len());
222     /// ```
223     pub fn spawn_scoped<'scope, 'env, F, T>(
224         self,
225         scope: &'scope Scope<'env>,
226         f: F,
227     ) -> io::Result<ScopedJoinHandle<'scope, T>>
228     where
229         F: FnOnce(&Scope<'env>) -> T + Send + 'env,
230         T: Send + 'env,
231     {
232         Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(|| f(scope), Some(&scope.data)) }?))
233     }
234 }
235
236 impl<'scope, T> ScopedJoinHandle<'scope, T> {
237     /// Extracts a handle to the underlying thread.
238     ///
239     /// # Examples
240     ///
241     /// ```
242     /// #![feature(scoped_threads)]
243     /// #![feature(thread_is_running)]
244     ///
245     /// use std::thread;
246     ///
247     /// thread::scope(|s| {
248     ///     let t = s.spawn(|_| {
249     ///         println!("hello");
250     ///     });
251     ///     println!("thread id: {:?}", t.thread().id());
252     /// });
253     /// ```
254     #[must_use]
255     pub fn thread(&self) -> &Thread {
256         &self.0.thread
257     }
258
259     /// Waits for the associated thread to finish.
260     ///
261     /// This function will return immediately if the associated thread has already finished.
262     ///
263     /// In terms of [atomic memory orderings], the completion of the associated
264     /// thread synchronizes with this function returning.
265     /// In other words, all operations performed by that thread
266     /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
267     /// all operations that happen after `join` returns.
268     ///
269     /// If the associated thread panics, [`Err`] is returned with the panic payload.
270     ///
271     /// [atomic memory orderings]: crate::sync::atomic
272     ///
273     /// # Examples
274     ///
275     /// ```
276     /// #![feature(scoped_threads)]
277     /// #![feature(thread_is_running)]
278     ///
279     /// use std::thread;
280     ///
281     /// thread::scope(|s| {
282     ///     let t = s.spawn(|_| {
283     ///         panic!("oh no");
284     ///     });
285     ///     assert!(t.join().is_err());
286     /// });
287     /// ```
288     pub fn join(self) -> Result<T> {
289         self.0.join()
290     }
291
292     /// Checks if the associated thread is still running its main function.
293     ///
294     /// This might return `false` for a brief moment after the thread's main
295     /// function has returned, but before the thread itself has stopped running.
296     #[unstable(feature = "thread_is_running", issue = "90470")]
297     pub fn is_running(&self) -> bool {
298         Arc::strong_count(&self.0.packet) > 1
299     }
300 }
301
302 impl<'env> fmt::Debug for Scope<'env> {
303     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304         f.debug_struct("Scope")
305             .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
306             .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
307             .field("main_thread", &self.data.main_thread)
308             .finish_non_exhaustive()
309     }
310 }
311
312 impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
313     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
314         f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
315     }
316 }