]> git.lizzy.rs Git - rust.git/blob - library/std/src/thread/scoped.rs
Mention *scoped* thread in panic message.
[rust.git] / library / std / src / thread / scoped.rs
1 use super::{current, park, Builder, JoinInner, Result, Thread};
2 use crate::fmt;
3 use crate::io;
4 use crate::marker::PhantomData;
5 use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
6 use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7 use crate::sync::Arc;
8
9 /// A scope to spawn scoped threads in.
10 ///
11 /// See [`scope`] for details.
12 pub struct Scope<'env> {
13     data: ScopeData,
14     /// Invariance over 'env, to make sure 'env cannot shrink,
15     /// which is necessary for soundness.
16     env: PhantomData<&'env mut &'env ()>,
17 }
18
19 /// An owned permission to join on a scoped thread (block on its termination).
20 ///
21 /// See [`Scope::spawn`] for details.
22 pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);
23
24 pub(super) struct ScopeData {
25     num_running_threads: AtomicUsize,
26     a_thread_panicked: AtomicBool,
27     main_thread: Thread,
28 }
29
30 impl ScopeData {
31     pub(super) fn increment_num_running_threads(&self) {
32         // We check for 'overflow' with usize::MAX / 2, to make sure there's no
33         // chance it overflows to 0, which would result in unsoundness.
34         if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
35             // This can only reasonably happen by mem::forget()'ing many many ScopedJoinHandles.
36             self.decrement_num_running_threads(false);
37             panic!("too many running threads in thread scope");
38         }
39     }
40     pub(super) fn decrement_num_running_threads(&self, panic: bool) {
41         if panic {
42             self.a_thread_panicked.store(true, Ordering::Relaxed);
43         }
44         if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
45             self.main_thread.unpark();
46         }
47     }
48 }
49
50 /// Create a scope for spawning scoped threads.
51 ///
52 /// The function passed to `scope` will be provided a [`Scope`] object,
53 /// through which scoped threads can be [spawned][`Scope::spawn`].
54 ///
55 /// Unlike non-scoped threads, scoped threads can borrow non-`'static` data,
56 /// as the scope guarantees all threads will be joined at the end of the scope.
57 ///
58 /// All threads spawned within the scope that haven't been manually joined
59 /// will be automatically joined before this function returns.
60 ///
61 /// # Panics
62 ///
63 /// If any of the automatically joined threads panicked, this function will panic.
64 ///
65 /// If you want to handle panics from spawned threads,
66 /// [`join`][ScopedJoinHandle::join] them before the end of the scope.
67 ///
68 /// # Example
69 ///
70 /// ```
71 /// #![feature(scoped_threads)]
72 /// use std::thread;
73 ///
74 /// let mut a = vec![1, 2, 3];
75 /// let mut x = 0;
76 ///
77 /// thread::scope(|s| {
78 ///     s.spawn(|_| {
79 ///         println!("hello from the first scoped thread");
80 ///         // We can borrow `a` here.
81 ///         dbg!(&a);
82 ///     });
83 ///     s.spawn(|_| {
84 ///         println!("hello from the second scoped thread");
85 ///         // We can even mutably borrow `x` here,
86 ///         // because no other threads are using it.
87 ///         x += a[0] + a[2];
88 ///     });
89 ///     println!("hello from the main thread");
90 /// });
91 ///
92 /// // After the scope, we can modify and access our variables again:
93 /// a.push(4);
94 /// assert_eq!(x, a.len());
95 /// ```
96 #[track_caller]
97 pub fn scope<'env, F, T>(f: F) -> T
98 where
99     F: FnOnce(&Scope<'env>) -> T,
100 {
101     let scope = Scope {
102         data: ScopeData {
103             num_running_threads: AtomicUsize::new(0),
104             main_thread: current(),
105             a_thread_panicked: AtomicBool::new(false),
106         },
107         env: PhantomData,
108     };
109
110     // Run `f`, but catch panics so we can make sure to wait for all the threads to join.
111     let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));
112
113     // Wait until all the threads are finished.
114     while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
115         park();
116     }
117
118     // Throw any panic from `f`, or the return value of `f` if no thread panicked.
119     match result {
120         Err(e) => resume_unwind(e),
121         Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
122             panic!("a scoped thread panicked")
123         }
124         Ok(result) => result,
125     }
126 }
127
128 impl<'env> Scope<'env> {
129     /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
130     ///
131     /// Unlike non-scoped threads, threads spawned with this function may
132     /// borrow non-`'static` data from the outside the scope. See [`scope`] for
133     /// details.
134     ///
135     /// The join handle provides a [`join`] method that can be used to join the spawned
136     /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing
137     /// the panic payload.
138     ///
139     /// If the join handle is dropped, the spawned thread will implicitly joined at the
140     /// end of the scope. In that case, if the spawned thread panics, [`scope`] will
141     /// panic after all threads are joined.
142     ///
143     /// This call will create a thread using default parameters of [`Builder`].
144     /// If you want to specify the stack size or the name of the thread, use
145     /// [`Builder::spawn_scoped`] instead.
146     ///
147     /// # Panics
148     ///
149     /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`]
150     /// to recover from such errors.
151     ///
152     /// [`join`]: ScopedJoinHandle::join
153     pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
154     where
155         F: FnOnce(&Scope<'env>) -> T + Send + 'env,
156         T: Send + 'env,
157     {
158         Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
159     }
160 }
161
162 impl Builder {
163     /// Spawns a new scoped thread using the settings set through this `Builder`.
164     ///
165     /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to
166     /// capture any failure to create the thread at the OS level.
167     ///
168     /// [`io::Result`]: crate::io::Result
169     ///
170     /// # Panics
171     ///
172     /// Panics if a thread name was set and it contained null bytes.
173     ///
174     /// # Example
175     ///
176     /// ```
177     /// #![feature(scoped_threads)]
178     /// use std::thread;
179     ///
180     /// let mut a = vec![1, 2, 3];
181     /// let mut x = 0;
182     ///
183     /// thread::scope(|s| {
184     ///     thread::Builder::new()
185     ///         .name("first".to_string())
186     ///         .spawn_scoped(s, |_|
187     ///     {
188     ///         println!("hello from the {:?} scoped thread", thread::current().name());
189     ///         // We can borrow `a` here.
190     ///         dbg!(&a);
191     ///     })
192     ///     .unwrap();
193     ///     thread::Builder::new()
194     ///         .name("second".to_string())
195     ///         .spawn_scoped(s, |_|
196     ///     {
197     ///         println!("hello from the {:?} scoped thread", thread::current().name());
198     ///         // We can even mutably borrow `x` here,
199     ///         // because no other threads are using it.
200     ///         x += a[0] + a[2];
201     ///     })
202     ///     .unwrap();
203     ///     println!("hello from the main thread");
204     /// });
205     ///
206     /// // After the scope, we can modify and access our variables again:
207     /// a.push(4);
208     /// assert_eq!(x, a.len());
209     /// ```
210     pub fn spawn_scoped<'scope, 'env, F, T>(
211         self,
212         scope: &'scope Scope<'env>,
213         f: F,
214     ) -> io::Result<ScopedJoinHandle<'scope, T>>
215     where
216         F: FnOnce(&Scope<'env>) -> T + Send + 'env,
217         T: Send + 'env,
218     {
219         Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(|| f(scope), Some(&scope.data)) }?))
220     }
221 }
222
223 impl<'scope, T> ScopedJoinHandle<'scope, T> {
224     /// Extracts a handle to the underlying thread.
225     ///
226     /// # Examples
227     ///
228     /// ```
229     /// #![feature(scoped_threads)]
230     /// #![feature(thread_is_running)]
231     ///
232     /// use std::thread;
233     ///
234     /// thread::scope(|s| {
235     ///     let t = s.spawn(|_| {
236     ///         println!("hello");
237     ///     });
238     ///     println!("thread id: {:?}", t.thread().id());
239     /// });
240     /// ```
241     #[must_use]
242     pub fn thread(&self) -> &Thread {
243         &self.0.thread
244     }
245
246     /// Waits for the associated thread to finish.
247     ///
248     /// This function will return immediately if the associated thread has already finished.
249     ///
250     /// In terms of [atomic memory orderings], the completion of the associated
251     /// thread synchronizes with this function returning.
252     /// In other words, all operations performed by that thread
253     /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
254     /// all operations that happen after `join` returns.
255     ///
256     /// If the associated thread panics, [`Err`] is returned with the panic payload.
257     ///
258     /// [atomic memory orderings]: crate::sync::atomic
259     ///
260     /// # Examples
261     ///
262     /// ```
263     /// #![feature(scoped_threads)]
264     /// #![feature(thread_is_running)]
265     ///
266     /// use std::thread;
267     ///
268     /// thread::scope(|s| {
269     ///     let t = s.spawn(|_| {
270     ///         panic!("oh no");
271     ///     });
272     ///     assert!(t.join().is_err());
273     /// });
274     /// ```
275     pub fn join(self) -> Result<T> {
276         self.0.join()
277     }
278
279     /// Checks if the associated thread is still running its main function.
280     ///
281     /// This might return `false` for a brief moment after the thread's main
282     /// function has returned, but before the thread itself has stopped running.
283     #[unstable(feature = "thread_is_running", issue = "90470")]
284     pub fn is_running(&self) -> bool {
285         Arc::strong_count(&self.0.packet) > 1
286     }
287 }
288
289 impl<'env> fmt::Debug for Scope<'env> {
290     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291         f.debug_struct("Scope")
292             .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
293             .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
294             .field("main_thread", &self.data.main_thread)
295             .finish_non_exhaustive()
296     }
297 }
298
299 impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
300     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301         f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
302     }
303 }