1 //! Thread-local channel context.
3 use super::select::Selected;
4 use super::waker::current_thread_id;
8 use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
10 use crate::thread::{self, Thread};
11 use crate::time::Instant;
13 /// Thread-local context.
14 #[derive(Debug, Clone)]
19 /// Inner representation of `Context`.
22 /// Selected operation.
25 /// A slot into which another thread may store a pointer to its `Packet`.
26 packet: AtomicPtr<()>,
36 /// Creates a new context for the duration of the closure.
// `f` receives a shared reference to a `Context`; its result `R` is the result of `with`.
38 pub fn with<F, R>(f: F) -> R
40 F: FnOnce(&Context) -> R,
43 /// Cached thread-local context.
// The cell is initialised holding `Some(Context::new())`.
44 static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
// Adapter closure: it is called with whichever context ends up being used.
48 let mut f = |cx: &Context| -> R {
// Move the callback out of its slot; `unwrap` panics if the slot was already emptied.
// NOTE(review): the line that wraps `f` in an `Option` is not visible in this excerpt.
49 let f = f.take().unwrap();
54 .try_with(|cell| match cell.take() {
// The cell was empty, so run the callback against a freshly built context.
55 None => f(&Context::new()),
// `try_with` returned an error: fall back to running the callback on a fresh context.
// Presumably this covers the thread-local being unavailable during teardown; confirm.
63 .unwrap_or_else(|_| f(&Context::new()))
66 /// Creates a new `Context`.
70 inner: Arc::new(Inner {
// Initial state: `select` is `Waiting` and the packet slot holds a null pointer.
71 select: AtomicUsize::new(Selected::Waiting.into()),
72 packet: AtomicPtr::new(ptr::null_mut()),
// Record the handle of the thread running this constructor, plus the id reported
// for it by `current_thread_id()`.
73 thread: thread::current(),
74 thread_id: current_thread_id(),
79 /// Resets `select` and `packet`.
// Restores the same values `new` starts with: `Waiting` and a null packet pointer.
// Both writes are `Release` stores.
82 self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
83 self.inner.packet.store(ptr::null_mut(), Ordering::Release);
86 /// Attempts to select an operation.
88 /// On failure, the previously selected operation is returned.
90 pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
// NOTE(review): `Waiting` is passed as an argument to a call whose opening line is not
// visible in this excerpt; presumably it is the expected current value of an atomic
// compare-and-swap on `select`. Confirm against the full file.
94 Selected::Waiting.into(),
// Convert the error payload into the `Selected` carried by the `Err` variant.
100 .map_err(|e| e.into())
105 /// This method must be called after `try_select` succeeds and there is a packet to provide.
107 pub fn store_packet(&self, packet: *mut ()) {
// A null `packet` is ignored, so the slot keeps whatever value it already holds.
108 if !packet.is_null() {
// Publish the pointer with a `Release` store.
109 self.inner.packet.store(packet, Ordering::Release);
113 /// Waits until an operation is selected and returns it.
115 /// If the deadline is reached, `Selected::Aborted` will be selected.
117 pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
119 // Check whether an operation has been selected.
// `Acquire` load of `select`, converted back into a `Selected` value.
120 let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
121 if sel != Selected::Waiting {
125 // If there's a deadline, park the current thread until the deadline is reached.
126 if let Some(end) = deadline {
127 let now = Instant::now();
// Park for the time remaining until `end`.
// NOTE(review): assumes `now < end` was checked on a line not shown here; confirm.
130 thread::park_timeout(end - now);
132 // The deadline has been reached. Try aborting select.
133 return match self.try_select(Selected::Aborted) {
// `try_select` succeeded, so `Aborted` is now the selected operation.
134 Ok(()) => Selected::Aborted,
144 /// Unparks the thread this context belongs to.
146 pub fn unpark(&self) {
// Delegates to `unpark` on the thread handle stored in `inner.thread`.
147 self.inner.thread.unpark();
150 /// Returns the id of the thread this context belongs to.
152 pub fn thread_id(&self) -> usize {