]> git.lizzy.rs Git - rust.git/blob - library/std/src/thread/scoped.rs
Improve docs for `is_running` to explain use case
[rust.git] / library / std / src / thread / scoped.rs
1 use super::{current, park, Builder, JoinInner, Result, Thread};
2 use crate::fmt;
3 use crate::io;
4 use crate::marker::PhantomData;
5 use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
6 use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7 use crate::sync::Arc;
8
9 /// A scope to spawn scoped threads in.
10 ///
11 /// See [`scope`] for details.
12 #[stable(feature = "scoped_threads", since = "1.63.0")]
13 pub struct Scope<'scope, 'env: 'scope> {
14     data: ScopeData,
15     /// Invariance over 'scope, to make sure 'scope cannot shrink,
16     /// which is necessary for soundness.
17     ///
18     /// Without invariance, this would compile fine but be unsound:
19     ///
20     /// ```compile_fail,E0373
21     /// std::thread::scope(|s| {
22     ///     s.spawn(|| {
23     ///         let a = String::from("abcd");
24     ///         s.spawn(|| println!("{a:?}")); // might run after `a` is dropped
25     ///     });
26     /// });
27     /// ```
28     scope: PhantomData<&'scope mut &'scope ()>,
29     env: PhantomData<&'env mut &'env ()>,
30 }
31
32 /// An owned permission to join on a scoped thread (block on its termination).
33 ///
34 /// See [`Scope::spawn`] for details.
35 #[stable(feature = "scoped_threads", since = "1.63.0")]
36 pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);
37
38 pub(super) struct ScopeData {
39     num_running_threads: AtomicUsize,
40     a_thread_panicked: AtomicBool,
41     main_thread: Thread,
42 }
43
44 impl ScopeData {
45     pub(super) fn increment_num_running_threads(&self) {
46         // We check for 'overflow' with usize::MAX / 2, to make sure there's no
47         // chance it overflows to 0, which would result in unsoundness.
48         if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
49             // This can only reasonably happen by mem::forget()'ing many many ScopedJoinHandles.
50             self.decrement_num_running_threads(false);
51             panic!("too many running threads in thread scope");
52         }
53     }
54     pub(super) fn decrement_num_running_threads(&self, panic: bool) {
55         if panic {
56             self.a_thread_panicked.store(true, Ordering::Relaxed);
57         }
58         if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
59             self.main_thread.unpark();
60         }
61     }
62 }
63
64 /// Create a scope for spawning scoped threads.
65 ///
66 /// The function passed to `scope` will be provided a [`Scope`] object,
67 /// through which scoped threads can be [spawned][`Scope::spawn`].
68 ///
69 /// Unlike non-scoped threads, scoped threads can borrow non-`'static` data,
70 /// as the scope guarantees all threads will be joined at the end of the scope.
71 ///
72 /// All threads spawned within the scope that haven't been manually joined
73 /// will be automatically joined before this function returns.
74 ///
75 /// # Panics
76 ///
77 /// If any of the automatically joined threads panicked, this function will panic.
78 ///
79 /// If you want to handle panics from spawned threads,
80 /// [`join`][ScopedJoinHandle::join] them before the end of the scope.
81 ///
82 /// # Example
83 ///
84 /// ```
85 /// use std::thread;
86 ///
87 /// let mut a = vec![1, 2, 3];
88 /// let mut x = 0;
89 ///
90 /// thread::scope(|s| {
91 ///     s.spawn(|| {
92 ///         println!("hello from the first scoped thread");
93 ///         // We can borrow `a` here.
94 ///         dbg!(&a);
95 ///     });
96 ///     s.spawn(|| {
97 ///         println!("hello from the second scoped thread");
98 ///         // We can even mutably borrow `x` here,
99 ///         // because no other threads are using it.
100 ///         x += a[0] + a[2];
101 ///     });
102 ///     println!("hello from the main thread");
103 /// });
104 ///
105 /// // After the scope, we can modify and access our variables again:
106 /// a.push(4);
107 /// assert_eq!(x, a.len());
108 /// ```
109 ///
110 /// # Lifetimes
111 ///
112 /// Scoped threads involve two lifetimes: `'scope` and `'env`.
113 ///
114 /// The `'scope` lifetime represents the lifetime of the scope itself.
115 /// That is: the time during which new scoped threads may be spawned,
116 /// and also the time during which they might still be running.
117 /// Once this lifetime ends, all scoped threads are joined.
118 /// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts.
119 /// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns.
120 ///
121 /// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads.
122 /// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`.
123 /// It can be as small as the call to `scope`, meaning that anything that outlives this call,
124 /// such as local variables defined right before the scope, can be borrowed by the scoped threads.
125 ///
126 /// The `'env: 'scope` bound is part of the definition of the `Scope` type.
127 #[track_caller]
128 #[stable(feature = "scoped_threads", since = "1.63.0")]
129 pub fn scope<'env, F, T>(f: F) -> T
130 where
131     F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
132 {
133     let scope = Scope {
134         data: ScopeData {
135             num_running_threads: AtomicUsize::new(0),
136             main_thread: current(),
137             a_thread_panicked: AtomicBool::new(false),
138         },
139         env: PhantomData,
140         scope: PhantomData,
141     };
142
143     // Run `f`, but catch panics so we can make sure to wait for all the threads to join.
144     let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));
145
146     // Wait until all the threads are finished.
147     while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
148         park();
149     }
150
151     // Throw any panic from `f`, or the return value of `f` if no thread panicked.
152     match result {
153         Err(e) => resume_unwind(e),
154         Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
155             panic!("a scoped thread panicked")
156         }
157         Ok(result) => result,
158     }
159 }
160
161 impl<'scope, 'env> Scope<'scope, 'env> {
162     /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
163     ///
164     /// Unlike non-scoped threads, threads spawned with this function may
165     /// borrow non-`'static` data from the outside the scope. See [`scope`] for
166     /// details.
167     ///
168     /// The join handle provides a [`join`] method that can be used to join the spawned
169     /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing
170     /// the panic payload.
171     ///
172     /// If the join handle is dropped, the spawned thread will implicitly joined at the
173     /// end of the scope. In that case, if the spawned thread panics, [`scope`] will
174     /// panic after all threads are joined.
175     ///
176     /// This call will create a thread using default parameters of [`Builder`].
177     /// If you want to specify the stack size or the name of the thread, use
178     /// [`Builder::spawn_scoped`] instead.
179     ///
180     /// # Panics
181     ///
182     /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`]
183     /// to recover from such errors.
184     ///
185     /// [`join`]: ScopedJoinHandle::join
186     #[stable(feature = "scoped_threads", since = "1.63.0")]
187     pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
188     where
189         F: FnOnce() -> T + Send + 'scope,
190         T: Send + 'scope,
191     {
192         Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
193     }
194 }
195
196 impl Builder {
197     /// Spawns a new scoped thread using the settings set through this `Builder`.
198     ///
199     /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to
200     /// capture any failure to create the thread at the OS level.
201     ///
202     /// [`io::Result`]: crate::io::Result
203     ///
204     /// # Panics
205     ///
206     /// Panics if a thread name was set and it contained null bytes.
207     ///
208     /// # Example
209     ///
210     /// ```
211     /// use std::thread;
212     ///
213     /// let mut a = vec![1, 2, 3];
214     /// let mut x = 0;
215     ///
216     /// thread::scope(|s| {
217     ///     thread::Builder::new()
218     ///         .name("first".to_string())
219     ///         .spawn_scoped(s, ||
220     ///     {
221     ///         println!("hello from the {:?} scoped thread", thread::current().name());
222     ///         // We can borrow `a` here.
223     ///         dbg!(&a);
224     ///     })
225     ///     .unwrap();
226     ///     thread::Builder::new()
227     ///         .name("second".to_string())
228     ///         .spawn_scoped(s, ||
229     ///     {
230     ///         println!("hello from the {:?} scoped thread", thread::current().name());
231     ///         // We can even mutably borrow `x` here,
232     ///         // because no other threads are using it.
233     ///         x += a[0] + a[2];
234     ///     })
235     ///     .unwrap();
236     ///     println!("hello from the main thread");
237     /// });
238     ///
239     /// // After the scope, we can modify and access our variables again:
240     /// a.push(4);
241     /// assert_eq!(x, a.len());
242     /// ```
243     #[stable(feature = "scoped_threads", since = "1.63.0")]
244     pub fn spawn_scoped<'scope, 'env, F, T>(
245         self,
246         scope: &'scope Scope<'scope, 'env>,
247         f: F,
248     ) -> io::Result<ScopedJoinHandle<'scope, T>>
249     where
250         F: FnOnce() -> T + Send + 'scope,
251         T: Send + 'scope,
252     {
253         Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(&scope.data)) }?))
254     }
255 }
256
257 impl<'scope, T> ScopedJoinHandle<'scope, T> {
258     /// Extracts a handle to the underlying thread.
259     ///
260     /// # Examples
261     ///
262     /// ```
263     /// use std::thread;
264     ///
265     /// thread::scope(|s| {
266     ///     let t = s.spawn(|| {
267     ///         println!("hello");
268     ///     });
269     ///     println!("thread id: {:?}", t.thread().id());
270     /// });
271     /// ```
272     #[must_use]
273     #[stable(feature = "scoped_threads", since = "1.63.0")]
274     pub fn thread(&self) -> &Thread {
275         &self.0.thread
276     }
277
278     /// Waits for the associated thread to finish.
279     ///
280     /// This function will return immediately if the associated thread has already finished.
281     ///
282     /// In terms of [atomic memory orderings], the completion of the associated
283     /// thread synchronizes with this function returning.
284     /// In other words, all operations performed by that thread
285     /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
286     /// all operations that happen after `join` returns.
287     ///
288     /// If the associated thread panics, [`Err`] is returned with the panic payload.
289     ///
290     /// [atomic memory orderings]: crate::sync::atomic
291     ///
292     /// # Examples
293     ///
294     /// ```
295     /// use std::thread;
296     ///
297     /// thread::scope(|s| {
298     ///     let t = s.spawn(|| {
299     ///         panic!("oh no");
300     ///     });
301     ///     assert!(t.join().is_err());
302     /// });
303     /// ```
304     #[stable(feature = "scoped_threads", since = "1.63.0")]
305     pub fn join(self) -> Result<T> {
306         self.0.join()
307     }
308
309     /// Checks if the associated thread has finished running its main function.
310     ///
311     /// `is_finished` supports implementing a non-blocking join operation, by checking
312     /// `is_finished`, and calling `join` if it returns `false`. This function does not block. To
313     /// block while waiting on the thread to finish, use [`join`][Self::join].
314     ///
315     /// This might return `true` for a brief moment after the thread's main
316     /// function has returned, but before the thread itself has stopped running.
317     /// However, once this returns `true`, [`join`][Self::join] can be expected
318     /// to return quickly, without blocking for any significant amount of time.
319     #[stable(feature = "scoped_threads", since = "1.63.0")]
320     pub fn is_finished(&self) -> bool {
321         Arc::strong_count(&self.0.packet) == 1
322     }
323 }
324
325 #[stable(feature = "scoped_threads", since = "1.63.0")]
326 impl fmt::Debug for Scope<'_, '_> {
327     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
328         f.debug_struct("Scope")
329             .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
330             .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
331             .field("main_thread", &self.data.main_thread)
332             .finish_non_exhaustive()
333     }
334 }
335
336 #[stable(feature = "scoped_threads", since = "1.63.0")]
337 impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
338     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
339         f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
340     }
341 }