]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/stream.rs
Remove the unstable and deprecated mpsc_select
[rust.git] / src / libstd / sync / mpsc / stream.rs
1 /// Stream channels
2 ///
3 /// This is the flavor of channels which are optimized for one sender and one
4 /// receiver. The sender will be upgraded to a shared channel if the channel is
5 /// cloned.
6 ///
7 /// High level implementation details can be found in the comment of the parent
8 /// module.
9
10 pub use self::Failure::*;
11 pub use self::UpgradeResult::*;
12 use self::Message::*;
13
14 use core::cmp;
15 use core::isize;
16
17 use crate::cell::UnsafeCell;
18 use crate::ptr;
19 use crate::thread;
20 use crate::time::Instant;
21
22 use crate::sync::atomic::{AtomicIsize, AtomicUsize, Ordering, AtomicBool};
23 use crate::sync::mpsc::Receiver;
24 use crate::sync::mpsc::blocking::{self, SignalToken};
25 use crate::sync::mpsc::spsc_queue as spsc;
26
27 const DISCONNECTED: isize = isize::MIN;
28 #[cfg(test)]
29 const MAX_STEALS: isize = 5;
30 #[cfg(not(test))]
31 const MAX_STEALS: isize = 1 << 20;
32
33 pub struct Packet<T> {
34     // internal queue for all messages
35     queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
36 }
37
38 struct ProducerAddition {
39     cnt: AtomicIsize, // How many items are on this channel
40     to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
41
42     port_dropped: AtomicBool, // flag if the channel has been destroyed.
43 }
44
45 struct ConsumerAddition {
46     steals: UnsafeCell<isize>,  // How many times has a port received without blocking?
47 }
48
49
50 pub enum Failure<T> {
51     Empty,
52     Disconnected,
53     Upgraded(Receiver<T>),
54 }
55
56 pub enum UpgradeResult {
57     UpSuccess,
58     UpDisconnected,
59     UpWoke(SignalToken),
60 }
61
62 // Any message could contain an "upgrade request" to a new shared port, so the
63 // internal queue it's a queue of T, but rather Message<T>
64 enum Message<T> {
65     Data(T),
66     GoUp(Receiver<T>),
67 }
68
69 impl<T> Packet<T> {
70     pub fn new() -> Packet<T> {
71         Packet {
72             queue: unsafe { spsc::Queue::with_additions(
73                 128,
74                 ProducerAddition {
75                     cnt: AtomicIsize::new(0),
76                     to_wake: AtomicUsize::new(0),
77
78                     port_dropped: AtomicBool::new(false),
79                 },
80                 ConsumerAddition {
81                     steals: UnsafeCell::new(0),
82                 }
83             )},
84         }
85     }
86
87     pub fn send(&self, t: T) -> Result<(), T> {
88         // If the other port has deterministically gone away, then definitely
89         // must return the data back up the stack. Otherwise, the data is
90         // considered as being sent.
91         if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
92
93         match self.do_send(Data(t)) {
94             UpSuccess | UpDisconnected => {},
95             UpWoke(token) => { token.signal(); }
96         }
97         Ok(())
98     }
99
100     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
101         // If the port has gone away, then there's no need to proceed any
102         // further.
103         if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
104             return UpDisconnected
105         }
106
107         self.do_send(GoUp(up))
108     }
109
110     fn do_send(&self, t: Message<T>) -> UpgradeResult {
111         self.queue.push(t);
112         match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
113             // As described in the mod's doc comment, -1 == wakeup
114             -1 => UpWoke(self.take_to_wake()),
115             // As as described before, SPSC queues must be >= -2
116             -2 => UpSuccess,
117
118             // Be sure to preserve the disconnected state, and the return value
119             // in this case is going to be whether our data was received or not.
120             // This manifests itself on whether we have an empty queue or not.
121             //
122             // Primarily, are required to drain the queue here because the port
123             // will never remove this data. We can only have at most one item to
124             // drain (the port drains the rest).
125             DISCONNECTED => {
126                 self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
127                 let first = self.queue.pop();
128                 let second = self.queue.pop();
129                 assert!(second.is_none());
130
131                 match first {
132                     Some(..) => UpSuccess,  // we failed to send the data
133                     None => UpDisconnected, // we successfully sent data
134                 }
135             }
136
137             // Otherwise we just sent some data on a non-waiting queue, so just
138             // make sure the world is sane and carry on!
139             n => { assert!(n >= 0); UpSuccess }
140         }
141     }
142
143     // Consumes ownership of the 'to_wake' field.
144     fn take_to_wake(&self) -> SignalToken {
145         let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
146         self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
147         assert!(ptr != 0);
148         unsafe { SignalToken::cast_from_usize(ptr) }
149     }
150
151     // Decrements the count on the channel for a sleeper, returning the sleeper
152     // back if it shouldn't sleep. Note that this is the location where we take
153     // steals into account.
154     fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
155         assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
156         let ptr = unsafe { token.cast_to_usize() };
157         self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
158
159         let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
160
161         match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
162             DISCONNECTED => {
163                 self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
164             }
165             // If we factor in our steals and notice that the channel has no
166             // data, we successfully sleep
167             n => {
168                 assert!(n >= 0);
169                 if n - steals <= 0 { return Ok(()) }
170             }
171         }
172
173         self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
174         Err(unsafe { SignalToken::cast_from_usize(ptr) })
175     }
176
177     pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
178         // Optimistic preflight check (scheduling is expensive).
179         match self.try_recv() {
180             Err(Empty) => {}
181             data => return data,
182         }
183
184         // Welp, our channel has no data. Deschedule the current thread and
185         // initiate the blocking protocol.
186         let (wait_token, signal_token) = blocking::tokens();
187         if self.decrement(signal_token).is_ok() {
188             if let Some(deadline) = deadline {
189                 let timed_out = !wait_token.wait_max_until(deadline);
190                 if timed_out {
191                     self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
192                 }
193             } else {
194                 wait_token.wait();
195             }
196         }
197
198         match self.try_recv() {
199             // Messages which actually popped from the queue shouldn't count as
200             // a steal, so offset the decrement here (we already have our
201             // "steal" factored into the channel count above).
202             data @ Ok(..) |
203             data @ Err(Upgraded(..)) => unsafe {
204                 *self.queue.consumer_addition().steals.get() -= 1;
205                 data
206             },
207
208             data => data,
209         }
210     }
211
212     pub fn try_recv(&self) -> Result<T, Failure<T>> {
213         match self.queue.pop() {
214             // If we stole some data, record to that effect (this will be
215             // factored into cnt later on).
216             //
217             // Note that we don't allow steals to grow without bound in order to
218             // prevent eventual overflow of either steals or cnt as an overflow
219             // would have catastrophic results. Sometimes, steals > cnt, but
220             // other times cnt > steals, so we don't know the relation between
221             // steals and cnt. This code path is executed only rarely, so we do
222             // a pretty slow operation, of swapping 0 into cnt, taking steals
223             // down as much as possible (without going negative), and then
224             // adding back in whatever we couldn't factor into steals.
225             Some(data) => unsafe {
226                 if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
227                     match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
228                         DISCONNECTED => {
229                             self.queue.producer_addition().cnt.store(
230                                 DISCONNECTED, Ordering::SeqCst);
231                         }
232                         n => {
233                             let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
234                             *self.queue.consumer_addition().steals.get() -= m;
235                             self.bump(n - m);
236                         }
237                     }
238                     assert!(*self.queue.consumer_addition().steals.get() >= 0);
239                 }
240                 *self.queue.consumer_addition().steals.get() += 1;
241                 match data {
242                     Data(t) => Ok(t),
243                     GoUp(up) => Err(Upgraded(up)),
244                 }
245             },
246
247             None => {
248                 match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
249                     n if n != DISCONNECTED => Err(Empty),
250
251                     // This is a little bit of a tricky case. We failed to pop
252                     // data above, and then we have viewed that the channel is
253                     // disconnected. In this window more data could have been
254                     // sent on the channel. It doesn't really make sense to
255                     // return that the channel is disconnected when there's
256                     // actually data on it, so be extra sure there's no data by
257                     // popping one more time.
258                     //
259                     // We can ignore steals because the other end is
260                     // disconnected and we'll never need to really factor in our
261                     // steals again.
262                     _ => {
263                         match self.queue.pop() {
264                             Some(Data(t)) => Ok(t),
265                             Some(GoUp(up)) => Err(Upgraded(up)),
266                             None => Err(Disconnected),
267                         }
268                     }
269                 }
270             }
271         }
272     }
273
274     pub fn drop_chan(&self) {
275         // Dropping a channel is pretty simple, we just flag it as disconnected
276         // and then wakeup a blocker if there is one.
277         match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
278             -1 => { self.take_to_wake().signal(); }
279             DISCONNECTED => {}
280             n => { assert!(n >= 0); }
281         }
282     }
283
284     pub fn drop_port(&self) {
285         // Dropping a port seems like a fairly trivial thing. In theory all we
286         // need to do is flag that we're disconnected and then everything else
287         // can take over (we don't have anyone to wake up).
288         //
289         // The catch for Ports is that we want to drop the entire contents of
290         // the queue. There are multiple reasons for having this property, the
291         // largest of which is that if another chan is waiting in this channel
292         // (but not received yet), then waiting on that port will cause a
293         // deadlock.
294         //
295         // So if we accept that we must now destroy the entire contents of the
296         // queue, this code may make a bit more sense. The tricky part is that
297         // we can't let any in-flight sends go un-dropped, we have to make sure
298         // *everything* is dropped and nothing new will come onto the channel.
299
300         // The first thing we do is set a flag saying that we're done for. All
301         // sends are gated on this flag, so we're immediately guaranteed that
302         // there are a bounded number of active sends that we'll have to deal
303         // with.
304         self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
305
306         // Now that we're guaranteed to deal with a bounded number of senders,
307         // we need to drain the queue. This draining process happens atomically
308         // with respect to the "count" of the channel. If the count is nonzero
309         // (with steals taken into account), then there must be data on the
310         // channel. In this case we drain everything and then try again. We will
311         // continue to fail while active senders send data while we're dropping
312         // data, but eventually we're guaranteed to break out of this loop
313         // (because there is a bounded number of senders).
314         let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
315         while {
316             let cnt = self.queue.producer_addition().cnt.compare_and_swap(
317                             steals, DISCONNECTED, Ordering::SeqCst);
318             cnt != DISCONNECTED && cnt != steals
319         } {
320             while let Some(_) = self.queue.pop() { steals += 1; }
321         }
322
323         // At this point in time, we have gated all future senders from sending,
324         // and we have flagged the channel as being disconnected. The senders
325         // still have some responsibility, however, because some sends may not
326         // complete until after we flag the disconnection. There are more
327         // details in the sending methods that see DISCONNECTED
328     }
329
330     ////////////////////////////////////////////////////////////////////////////
331     // select implementation
332     ////////////////////////////////////////////////////////////////////////////
333
334     // increment the count on the channel (used for selection)
335     fn bump(&self, amt: isize) -> isize {
336         match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
337             DISCONNECTED => {
338                 self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
339                 DISCONNECTED
340             }
341             n => n
342         }
343     }
344
345     // Removes a previous thread from being blocked in this port
346     pub fn abort_selection(&self,
347                            was_upgrade: bool) -> Result<bool, Receiver<T>> {
348         // If we're aborting selection after upgrading from a oneshot, then
349         // we're guarantee that no one is waiting. The only way that we could
350         // have seen the upgrade is if data was actually sent on the channel
351         // half again. For us, this means that there is guaranteed to be data on
352         // this channel. Furthermore, we're guaranteed that there was no
353         // start_selection previously, so there's no need to modify `self.cnt`
354         // at all.
355         //
356         // Hence, because of these invariants, we immediately return `Ok(true)`.
357         // Note that the data may not actually be sent on the channel just yet.
358         // The other end could have flagged the upgrade but not sent data to
359         // this end. This is fine because we know it's a small bounded windows
360         // of time until the data is actually sent.
361         if was_upgrade {
362             assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
363             assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
364             return Ok(true)
365         }
366
367         // We want to make sure that the count on the channel goes non-negative,
368         // and in the stream case we can have at most one steal, so just assume
369         // that we had one steal.
370         let steals = 1;
371         let prev = self.bump(steals + 1);
372
373         // If we were previously disconnected, then we know for sure that there
374         // is no thread in to_wake, so just keep going
375         let has_data = if prev == DISCONNECTED {
376             assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
377             true // there is data, that data is that we're disconnected
378         } else {
379             let cur = prev + steals + 1;
380             assert!(cur >= 0);
381
382             // If the previous count was negative, then we just made things go
383             // positive, hence we passed the -1 boundary and we're responsible
384             // for removing the to_wake() field and trashing it.
385             //
386             // If the previous count was positive then we're in a tougher
387             // situation. A possible race is that a sender just incremented
388             // through -1 (meaning it's going to try to wake a thread up), but it
389             // hasn't yet read the to_wake. In order to prevent a future recv()
390             // from waking up too early (this sender picking up the plastered
391             // over to_wake), we spin loop here waiting for to_wake to be 0.
392             // Note that this entire select() implementation needs an overhaul,
393             // and this is *not* the worst part of it, so this is not done as a
394             // final solution but rather out of necessity for now to get
395             // something working.
396             if prev < 0 {
397                 drop(self.take_to_wake());
398             } else {
399                 while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
400                     thread::yield_now();
401                 }
402             }
403             unsafe {
404                 assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
405                 *self.queue.consumer_addition().steals.get() = steals;
406             }
407
408             // if we were previously positive, then there's surely data to
409             // receive
410             prev >= 0
411         };
412
413         // Now that we've determined that this queue "has data", we peek at the
414         // queue to see if the data is an upgrade or not. If it's an upgrade,
415         // then we need to destroy this port and abort selection on the
416         // upgraded port.
417         if has_data {
418             match self.queue.peek() {
419                 Some(&mut GoUp(..)) => {
420                     match self.queue.pop() {
421                         Some(GoUp(port)) => Err(port),
422                         _ => unreachable!(),
423                     }
424                 }
425                 _ => Ok(true),
426             }
427         } else {
428             Ok(false)
429         }
430     }
431 }
432
433 impl<T> Drop for Packet<T> {
434     fn drop(&mut self) {
435         // Note that this load is not only an assert for correctness about
436         // disconnection, but also a proper fence before the read of
437         // `to_wake`, so this assert cannot be removed with also removing
438         // the `to_wake` assert.
439         assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
440         assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
441     }
442 }