1 /// Oneshot channels/ports
3 /// This is the initial flavor of channels/ports used for the comm module. This is
4 /// an optimization for the one-use case of a channel. The major optimization of
5 /// this type is to have one and exactly one allocation when the chan/port pair
8 /// Another possible optimization would be to not use an Arc box because
9 /// in theory we know when the shared packet can be deallocated (no real need
10 /// for the atomic reference counting), but I was having trouble figuring out how to destroy
11 /// the data early in a drop of a Port.
15 /// Oneshots are implemented around one atomic usize variable. This variable
16 /// indicates the state of the port/chan and also stores any thread
17 /// blocked on the port. All atomic operations happen on this one word.
19 /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
20 /// on behalf of the channel side of things (it can be mentally thought of as
21 /// consuming the port). This upgrade is then also stored in the shared packet.
22 /// The one caveat to consider is that when a port sees a disconnected channel
23 /// it must check for data because there is no "data plus upgrade" state.
25 pub use self::Failure::*;
26 pub use self::UpgradeResult::*;
27 pub use self::SelectionResult::*;
28 use self::MyUpgrade::*;
30 use crate::sync::mpsc::Receiver;
31 use crate::sync::mpsc::blocking::{self, SignalToken};
32 use crate::cell::UnsafeCell;
34 use crate::sync::atomic::{AtomicUsize, Ordering};
35 use crate::time::Instant;
37 // Various states you can find a port in.
38 const EMPTY: usize = 0; // initial state: no data, no blocked receiver
39 const DATA: usize = 1; // data ready for receiver to take
40 const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
41 // Any other value represents a pointer to a SignalToken value. The
42 // protocol ensures that when the state moves *to* a pointer,
43 // ownership of the token is given to the packet, and when the state
44 // moves *from* a pointer, ownership of the token is transferred to
45 // whoever changed the state.
47 pub struct Packet<T> {
48 // Internal state of the chan/port pair (stores the blocked thread as well)
50 // One-shot data slot location
51 data: UnsafeCell<Option<T>>,
52 // when used for the second time, a oneshot channel must be upgraded, and
53 // this contains the slot for the upgrade
54 upgrade: UnsafeCell<MyUpgrade<T>>,
60 Upgraded(Receiver<T>),
63 pub enum UpgradeResult {
69 pub enum SelectionResult<T> {
71 SelUpgraded(SignalToken, Receiver<T>),
82 pub fn new() -> Packet<T> {
84 data: UnsafeCell::new(None),
85 upgrade: UnsafeCell::new(NothingSent),
86 state: AtomicUsize::new(EMPTY),
90 pub fn send(&self, t: T) -> Result<(), T> {
93 match *self.upgrade.get() {
95 _ => panic!("sending on a oneshot that's already sent on "),
97 assert!((*self.data.get()).is_none());
98 ptr::write(self.data.get(), Some(t));
99 ptr::write(self.upgrade.get(), SendUsed);
101 match self.state.swap(DATA, Ordering::SeqCst) {
102 // Sent the data, no one was waiting
105 // Couldn't send the data, the port hung up first. Return the data
106 // back up the stack.
108 self.state.swap(DISCONNECTED, Ordering::SeqCst);
109 ptr::write(self.upgrade.get(), NothingSent);
110 Err((&mut *self.data.get()).take().unwrap())
113 // Not possible, these are one-use channels
114 DATA => unreachable!(),
116 // There is a thread waiting on the other end. We leave the 'DATA'
117 // state inside so it'll pick it up on the other end.
119 SignalToken::cast_from_usize(ptr).signal();
126 // Just tests whether this channel has been sent on or not; this is only
127 // safe to use from the sender.
128 pub fn sent(&self) -> bool {
130 match *self.upgrade.get() {
131 NothingSent => false,
137 pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
138 // Attempt to not block the thread (it's a little expensive). If it looks
139 // like we're not empty, then immediately go through to `try_recv`.
140 if self.state.load(Ordering::SeqCst) == EMPTY {
141 let (wait_token, signal_token) = blocking::tokens();
142 let ptr = unsafe { signal_token.cast_to_usize() };
144 // race with senders to enter the blocking state
145 if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
146 if let Some(deadline) = deadline {
147 let timed_out = !wait_token.wait_max_until(deadline);
148 // Try to reset the state
150 self.abort_selection().map_err(Upgraded)?;
154 debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
157 // drop the signal token, since we never blocked
158 drop(unsafe { SignalToken::cast_from_usize(ptr) });
165 pub fn try_recv(&self) -> Result<T, Failure<T>> {
167 match self.state.load(Ordering::SeqCst) {
170 // We saw some data on the channel, but the channel can be used
171 // again to send us an upgrade. As a result, we need to re-insert
172 // into the channel that there's no data available (otherwise we'll
173 // just see DATA next time). This is done as a cmpxchg because if
174 // the state changes under our feet we'd rather just see that state
177 self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
178 match (&mut *self.data.get()).take() {
179 Some(data) => Ok(data),
180 None => unreachable!(),
184 // There's no guarantee that we receive before an upgrade happens,
185 // and an upgrade flags the channel as disconnected, so when we see
186 // this we first need to check if there's data available and *then*
187 // we go through and process the upgrade.
189 match (&mut *self.data.get()).take() {
190 Some(data) => Ok(data),
192 match ptr::replace(self.upgrade.get(), SendUsed) {
193 SendUsed | NothingSent => Err(Disconnected),
194 GoUp(upgrade) => Err(Upgraded(upgrade))
200 // We are the sole receiver; there cannot be a blocking
207 // Returns whether the upgrade was completed. If the upgrade wasn't
208 // completed, then the port couldn't get sent to the other half (it will
209 // never receive it).
210 pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
212 let prev = match *self.upgrade.get() {
213 NothingSent => NothingSent,
214 SendUsed => SendUsed,
215 _ => panic!("upgrading again"),
217 ptr::write(self.upgrade.get(), GoUp(up));
219 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
220 // If the channel is empty or has data on it, then we're good to go.
221 // The receiver will check the data before the upgrade (in case we
222 // plastered over the DATA state).
223 DATA | EMPTY => UpSuccess,
225 // If the other end is already disconnected, then we failed the
226 // upgrade. Be sure to trash the port we were given.
227 DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
229 // If someone's waiting, we gotta wake them up
230 ptr => UpWoke(SignalToken::cast_from_usize(ptr))
235 pub fn drop_chan(&self) {
236 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
237 DATA | DISCONNECTED | EMPTY => {}
239 // If someone's waiting, we gotta wake them up
241 SignalToken::cast_from_usize(ptr).signal();
246 pub fn drop_port(&self) {
247 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
248 // An empty channel has nothing to do, and a remotely disconnected
249 // channel also has nothing to do b/c we're about to run the drop
251 DISCONNECTED | EMPTY => {}
253 // There's data on the channel, so make sure we destroy it promptly.
254 // This is why not using an arc is a little difficult (need the box
255 // to stay valid while we take the data).
256 DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
258 // We're the only ones that can block on this port
263 ////////////////////////////////////////////////////////////////////////////
264 // select implementation
265 ////////////////////////////////////////////////////////////////////////////
267 // If Ok, the value is whether this port has data; if Err, then the upgraded
268 // port needs to be checked instead of this one.
269 pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
271 match self.state.load(Ordering::SeqCst) {
272 EMPTY => Ok(false), // Welp, we tried
273 DATA => Ok(true), // we have some un-acquired data
274 DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
276 match ptr::replace(self.upgrade.get(), SendUsed) {
277 // The other end sent us an upgrade, so we need to
278 // propagate upwards whether the upgrade can receive
280 GoUp(upgrade) => Err(upgrade),
282 // If the other end disconnected without sending an
283 // upgrade, then we have data to receive (the channel is
285 up => { ptr::write(self.upgrade.get(), up); Ok(true) }
288 _ => unreachable!(), // we're the "one blocker"
293 // Attempts to start selection on this port. This can either succeed, fail
294 // because there is data, or fail because there is an upgrade pending.
295 pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
297 let ptr = token.cast_to_usize();
298 match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
301 drop(SignalToken::cast_from_usize(ptr));
304 DISCONNECTED if (*self.data.get()).is_some() => {
305 drop(SignalToken::cast_from_usize(ptr));
309 match ptr::replace(self.upgrade.get(), SendUsed) {
310 // The other end sent us an upgrade, so we need to
311 // propagate upwards whether the upgrade can receive
314 SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
317 // If the other end disconnected without sending an
318 // upgrade, then we have data to receive (the channel is
321 ptr::write(self.upgrade.get(), up);
322 drop(SignalToken::cast_from_usize(ptr));
327 _ => unreachable!(), // we're the "one blocker"
332 // Remove a previous selecting thread from this port. This ensures that the
333 // blocked thread will no longer be visible to any other threads.
335 // The return value indicates whether there's data on this port.
336 pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
337 let state = match self.state.load(Ordering::SeqCst) {
338 // Each of these states means that no further activity will happen
339 // with regard to aborting selection
342 s @ DISCONNECTED => s,
344 // If we've got a blocked thread, then use an atomic to gain ownership
346 ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
349 // Now that we've got ownership of our state, figure out what to do
352 EMPTY => unreachable!(),
353 // our thread used for select was stolen
356 // If the other end has hung up, then we have complete ownership
357 // of the port. First, check if there was data waiting for us. This
358 // is possible if the other end sent something and then hung up.
360 // We then need to check to see if there was an upgrade requested,
361 // and if so, the upgraded port needs to have its selection aborted.
362 DISCONNECTED => unsafe {
363 if (*self.data.get()).is_some() {
366 match ptr::replace(self.upgrade.get(), SendUsed) {
367 GoUp(port) => Err(port),
373 // We woke ourselves up from select.
375 drop(SignalToken::cast_from_usize(ptr));
382 impl<T> Drop for Packet<T> {
384 assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);