// Source: git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpmc/context.rs
// Rollup merge of #103439 - Nilstrieb:help-me-with-my-macro, r=estebank
// [rust.git] / library / std / src / sync / mpmc / context.rs
//! Thread-local channel context.
2
3 use super::select::Selected;
4 use super::waker::current_thread_id;
5
6 use crate::cell::Cell;
7 use crate::ptr;
8 use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
9 use crate::sync::Arc;
10 use crate::thread::{self, Thread};
11 use crate::time::Instant;
12
13 /// Thread-local context.
14 #[derive(Debug, Clone)]
15 pub struct Context {
16     inner: Arc<Inner>,
17 }
18
/// Inner representation of `Context`.
#[derive(Debug)]
struct Inner {
    /// Selected operation.
    ///
    /// Holds the `usize` encoding of a `Selected` value.
    select: AtomicUsize,

    /// A slot into which another thread may store a pointer to its `Packet`.
    ///
    /// Null while no packet has been provided.
    packet: AtomicPtr<()>,

    /// Thread handle.
    ///
    /// Used to unpark the thread this context belongs to.
    thread: Thread,

    /// Thread id.
    thread_id: usize,
}
34
35 impl Context {
36     /// Creates a new context for the duration of the closure.
37     #[inline]
38     pub fn with<F, R>(f: F) -> R
39     where
40         F: FnOnce(&Context) -> R,
41     {
42         thread_local! {
43             /// Cached thread-local context.
44             static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
45         }
46
47         let mut f = Some(f);
48         let mut f = |cx: &Context| -> R {
49             let f = f.take().unwrap();
50             f(cx)
51         };
52
53         CONTEXT
54             .try_with(|cell| match cell.take() {
55                 None => f(&Context::new()),
56                 Some(cx) => {
57                     cx.reset();
58                     let res = f(&cx);
59                     cell.set(Some(cx));
60                     res
61                 }
62             })
63             .unwrap_or_else(|_| f(&Context::new()))
64     }
65
66     /// Creates a new `Context`.
67     #[cold]
68     fn new() -> Context {
69         Context {
70             inner: Arc::new(Inner {
71                 select: AtomicUsize::new(Selected::Waiting.into()),
72                 packet: AtomicPtr::new(ptr::null_mut()),
73                 thread: thread::current(),
74                 thread_id: current_thread_id(),
75             }),
76         }
77     }
78
79     /// Resets `select` and `packet`.
80     #[inline]
81     fn reset(&self) {
82         self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
83         self.inner.packet.store(ptr::null_mut(), Ordering::Release);
84     }
85
86     /// Attempts to select an operation.
87     ///
88     /// On failure, the previously selected operation is returned.
89     #[inline]
90     pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
91         self.inner
92             .select
93             .compare_exchange(
94                 Selected::Waiting.into(),
95                 select.into(),
96                 Ordering::AcqRel,
97                 Ordering::Acquire,
98             )
99             .map(|_| ())
100             .map_err(|e| e.into())
101     }
102
103     /// Stores a packet.
104     ///
105     /// This method must be called after `try_select` succeeds and there is a packet to provide.
106     #[inline]
107     pub fn store_packet(&self, packet: *mut ()) {
108         if !packet.is_null() {
109             self.inner.packet.store(packet, Ordering::Release);
110         }
111     }
112
113     /// Waits until an operation is selected and returns it.
114     ///
115     /// If the deadline is reached, `Selected::Aborted` will be selected.
116     #[inline]
117     pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
118         loop {
119             // Check whether an operation has been selected.
120             let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
121             if sel != Selected::Waiting {
122                 return sel;
123             }
124
125             // If there's a deadline, park the current thread until the deadline is reached.
126             if let Some(end) = deadline {
127                 let now = Instant::now();
128
129                 if now < end {
130                     thread::park_timeout(end - now);
131                 } else {
132                     // The deadline has been reached. Try aborting select.
133                     return match self.try_select(Selected::Aborted) {
134                         Ok(()) => Selected::Aborted,
135                         Err(s) => s,
136                     };
137                 }
138             } else {
139                 thread::park();
140             }
141         }
142     }
143
144     /// Unparks the thread this context belongs to.
145     #[inline]
146     pub fn unpark(&self) {
147         self.inner.thread.unpark();
148     }
149
150     /// Returns the id of the thread this context belongs to.
151     #[inline]
152     pub fn thread_id(&self) -> usize {
153         self.inner.thread_id
154     }
155 }