]> git.lizzy.rs Git - rust.git/blob - library/std/src/thread/scoped.rs
Remove argument from closure in thread::Scope::spawn.
[rust.git] / library / std / src / thread / scoped.rs
1 use super::{current, park, Builder, JoinInner, Result, Thread};
2 use crate::fmt;
3 use crate::io;
4 use crate::marker::PhantomData;
5 use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
6 use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7 use crate::sync::Arc;
8
9 /// A scope to spawn scoped threads in.
10 ///
11 /// See [`scope`] for details.
12 pub struct Scope<'scope, 'env: 'scope> {
13     data: ScopeData,
14     /// Invariance over 'scope, to make sure 'scope cannot shrink,
15     /// which is necessary for soundness.
16     ///
17     /// Without invariance, this would compile fine but be unsound:
18     ///
19     /// ```compile_fail,E0373
20     /// #![feature(scoped_threads)]
21     ///
22     /// std::thread::scope(|s| {
23     ///     s.spawn(|| {
24     ///         let a = String::from("abcd");
25     ///         s.spawn(|| println!("{:?}", a)); // might run after `a` is dropped
26     ///     });
27     /// });
28     /// ```
29     scope: PhantomData<&'scope mut &'scope ()>,
30     env: PhantomData<&'env mut &'env ()>,
31 }
32
33 /// An owned permission to join on a scoped thread (block on its termination).
34 ///
35 /// See [`Scope::spawn`] for details.
36 pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);
37
38 pub(super) struct ScopeData {
39     num_running_threads: AtomicUsize,
40     a_thread_panicked: AtomicBool,
41     main_thread: Thread,
42 }
43
44 impl ScopeData {
45     pub(super) fn increment_num_running_threads(&self) {
46         // We check for 'overflow' with usize::MAX / 2, to make sure there's no
47         // chance it overflows to 0, which would result in unsoundness.
48         if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
49             // This can only reasonably happen by mem::forget()'ing many many ScopedJoinHandles.
50             self.decrement_num_running_threads(false);
51             panic!("too many running threads in thread scope");
52         }
53     }
54     pub(super) fn decrement_num_running_threads(&self, panic: bool) {
55         if panic {
56             self.a_thread_panicked.store(true, Ordering::Relaxed);
57         }
58         if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
59             self.main_thread.unpark();
60         }
61     }
62 }
63
64 /// Create a scope for spawning scoped threads.
65 ///
66 /// The function passed to `scope` will be provided a [`Scope`] object,
67 /// through which scoped threads can be [spawned][`Scope::spawn`].
68 ///
69 /// Unlike non-scoped threads, scoped threads can borrow non-`'static` data,
70 /// as the scope guarantees all threads will be joined at the end of the scope.
71 ///
72 /// All threads spawned within the scope that haven't been manually joined
73 /// will be automatically joined before this function returns.
74 ///
75 /// # Panics
76 ///
77 /// If any of the automatically joined threads panicked, this function will panic.
78 ///
79 /// If you want to handle panics from spawned threads,
80 /// [`join`][ScopedJoinHandle::join] them before the end of the scope.
81 ///
82 /// # Example
83 ///
84 /// ```
85 /// #![feature(scoped_threads)]
86 /// use std::thread;
87 ///
88 /// let mut a = vec![1, 2, 3];
89 /// let mut x = 0;
90 ///
91 /// thread::scope(|s| {
92 ///     s.spawn(|| {
93 ///         println!("hello from the first scoped thread");
94 ///         // We can borrow `a` here.
95 ///         dbg!(&a);
96 ///     });
97 ///     s.spawn(|| {
98 ///         println!("hello from the second scoped thread");
99 ///         // We can even mutably borrow `x` here,
100 ///         // because no other threads are using it.
101 ///         x += a[0] + a[2];
102 ///     });
103 ///     println!("hello from the main thread");
104 /// });
105 ///
106 /// // After the scope, we can modify and access our variables again:
107 /// a.push(4);
108 /// assert_eq!(x, a.len());
109 /// ```
110 #[track_caller]
111 pub fn scope<'env, F, T>(f: F) -> T
112 where
113     F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
114 {
115     let scope = Scope {
116         data: ScopeData {
117             num_running_threads: AtomicUsize::new(0),
118             main_thread: current(),
119             a_thread_panicked: AtomicBool::new(false),
120         },
121         env: PhantomData,
122         scope: PhantomData,
123     };
124
125     // Run `f`, but catch panics so we can make sure to wait for all the threads to join.
126     let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));
127
128     // Wait until all the threads are finished.
129     while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
130         park();
131     }
132
133     // Throw any panic from `f`, or the return value of `f` if no thread panicked.
134     match result {
135         Err(e) => resume_unwind(e),
136         Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
137             panic!("a scoped thread panicked")
138         }
139         Ok(result) => result,
140     }
141 }
142
143 impl<'scope, 'env> Scope<'scope, 'env> {
144     /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
145     ///
146     /// Unlike non-scoped threads, threads spawned with this function may
147     /// borrow non-`'static` data from the outside the scope. See [`scope`] for
148     /// details.
149     ///
150     /// The join handle provides a [`join`] method that can be used to join the spawned
151     /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing
152     /// the panic payload.
153     ///
154     /// If the join handle is dropped, the spawned thread will implicitly joined at the
155     /// end of the scope. In that case, if the spawned thread panics, [`scope`] will
156     /// panic after all threads are joined.
157     ///
158     /// This call will create a thread using default parameters of [`Builder`].
159     /// If you want to specify the stack size or the name of the thread, use
160     /// [`Builder::spawn_scoped`] instead.
161     ///
162     /// # Panics
163     ///
164     /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`]
165     /// to recover from such errors.
166     ///
167     /// [`join`]: ScopedJoinHandle::join
168     pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
169     where
170         F: FnOnce() -> T + Send + 'scope,
171         T: Send + 'scope,
172     {
173         Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
174     }
175 }
176
177 impl Builder {
178     /// Spawns a new scoped thread using the settings set through this `Builder`.
179     ///
180     /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to
181     /// capture any failure to create the thread at the OS level.
182     ///
183     /// [`io::Result`]: crate::io::Result
184     ///
185     /// # Panics
186     ///
187     /// Panics if a thread name was set and it contained null bytes.
188     ///
189     /// # Example
190     ///
191     /// ```
192     /// #![feature(scoped_threads)]
193     /// use std::thread;
194     ///
195     /// let mut a = vec![1, 2, 3];
196     /// let mut x = 0;
197     ///
198     /// thread::scope(|s| {
199     ///     thread::Builder::new()
200     ///         .name("first".to_string())
201     ///         .spawn_scoped(s, ||
202     ///     {
203     ///         println!("hello from the {:?} scoped thread", thread::current().name());
204     ///         // We can borrow `a` here.
205     ///         dbg!(&a);
206     ///     })
207     ///     .unwrap();
208     ///     thread::Builder::new()
209     ///         .name("second".to_string())
210     ///         .spawn_scoped(s, ||
211     ///     {
212     ///         println!("hello from the {:?} scoped thread", thread::current().name());
213     ///         // We can even mutably borrow `x` here,
214     ///         // because no other threads are using it.
215     ///         x += a[0] + a[2];
216     ///     })
217     ///     .unwrap();
218     ///     println!("hello from the main thread");
219     /// });
220     ///
221     /// // After the scope, we can modify and access our variables again:
222     /// a.push(4);
223     /// assert_eq!(x, a.len());
224     /// ```
225     pub fn spawn_scoped<'scope, 'env, F, T>(
226         self,
227         scope: &'scope Scope<'scope, 'env>,
228         f: F,
229     ) -> io::Result<ScopedJoinHandle<'scope, T>>
230     where
231         F: FnOnce() -> T + Send + 'scope,
232         T: Send + 'scope,
233     {
234         Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(|| f(), Some(&scope.data)) }?))
235     }
236 }
237
238 impl<'scope, T> ScopedJoinHandle<'scope, T> {
239     /// Extracts a handle to the underlying thread.
240     ///
241     /// # Examples
242     ///
243     /// ```
244     /// #![feature(scoped_threads)]
245     /// #![feature(thread_is_running)]
246     ///
247     /// use std::thread;
248     ///
249     /// thread::scope(|s| {
250     ///     let t = s.spawn(|| {
251     ///         println!("hello");
252     ///     });
253     ///     println!("thread id: {:?}", t.thread().id());
254     /// });
255     /// ```
256     #[must_use]
257     pub fn thread(&self) -> &Thread {
258         &self.0.thread
259     }
260
261     /// Waits for the associated thread to finish.
262     ///
263     /// This function will return immediately if the associated thread has already finished.
264     ///
265     /// In terms of [atomic memory orderings], the completion of the associated
266     /// thread synchronizes with this function returning.
267     /// In other words, all operations performed by that thread
268     /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
269     /// all operations that happen after `join` returns.
270     ///
271     /// If the associated thread panics, [`Err`] is returned with the panic payload.
272     ///
273     /// [atomic memory orderings]: crate::sync::atomic
274     ///
275     /// # Examples
276     ///
277     /// ```
278     /// #![feature(scoped_threads)]
279     /// #![feature(thread_is_running)]
280     ///
281     /// use std::thread;
282     ///
283     /// thread::scope(|s| {
284     ///     let t = s.spawn(|| {
285     ///         panic!("oh no");
286     ///     });
287     ///     assert!(t.join().is_err());
288     /// });
289     /// ```
290     pub fn join(self) -> Result<T> {
291         self.0.join()
292     }
293
294     /// Checks if the associated thread is still running its main function.
295     ///
296     /// This might return `false` for a brief moment after the thread's main
297     /// function has returned, but before the thread itself has stopped running.
298     #[unstable(feature = "thread_is_running", issue = "90470")]
299     pub fn is_running(&self) -> bool {
300         Arc::strong_count(&self.0.packet) > 1
301     }
302 }
303
304 impl<'scope, 'env> fmt::Debug for Scope<'scope, 'env> {
305     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306         f.debug_struct("Scope")
307             .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
308             .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
309             .field("main_thread", &self.data.main_thread)
310             .finish_non_exhaustive()
311     }
312 }
313
314 impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
315     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316         f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
317     }
318 }