]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/shared.rs
doc: remove incomplete sentence
[rust.git] / src / libstd / sync / mpsc / shared.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Shared channels
12 ///
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
16 ///
17 /// High level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementation of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
20
21 pub use self::Failure::*;
22
23 use core::prelude::*;
24
25 use core::cmp;
26 use core::int;
27
28 use sync::{atomic, Mutex, MutexGuard};
29 use sync::mpsc::mpsc_queue as mpsc;
30 use sync::mpsc::blocking::{mod, SignalToken};
31 use sync::mpsc::select::StartResult;
32 use sync::mpsc::select::StartResult::*;
33 use thread::Thread;
34
35 const DISCONNECTED: int = int::MIN;
36 const FUDGE: int = 1024;
37 #[cfg(test)]
38 const MAX_STEALS: int = 5;
39 #[cfg(not(test))]
40 const MAX_STEALS: int = 1 << 20;
41
42 pub struct Packet<T> {
43     queue: mpsc::Queue<T>,
44     cnt: atomic::AtomicInt, // How many items are on this channel
45     steals: int, // How many times has a port received without blocking?
46     to_wake: atomic::AtomicUint, // SignalToken for wake up
47
48     // The number of channels which are currently using this packet.
49     channels: atomic::AtomicInt,
50
51     // See the discussion in Port::drop and the channel send methods for what
52     // these are used for
53     port_dropped: atomic::AtomicBool,
54     sender_drain: atomic::AtomicInt,
55
56     // this lock protects various portions of this implementation during
57     // select()
58     select_lock: Mutex<()>,
59 }
60
61 pub enum Failure {
62     Empty,
63     Disconnected,
64 }
65
66 impl<T: Send> Packet<T> {
67     // Creation of a packet *must* be followed by a call to postinit_lock
68     // and later by inherit_blocker
69     pub fn new() -> Packet<T> {
70         let p = Packet {
71             queue: mpsc::Queue::new(),
72             cnt: atomic::AtomicInt::new(0),
73             steals: 0,
74             to_wake: atomic::AtomicUint::new(0),
75             channels: atomic::AtomicInt::new(2),
76             port_dropped: atomic::AtomicBool::new(false),
77             sender_drain: atomic::AtomicInt::new(0),
78             select_lock: Mutex::new(()),
79         };
80         return p;
81     }
82
83     // This function should be used after newly created Packet
84     // was wrapped with an Arc
85     // In other case mutex data will be duplicated while cloning
86     // and that could cause problems on platforms where it is
87     // represented by opaque data structure
88     pub fn postinit_lock(&self) -> MutexGuard<()> {
89         self.select_lock.lock().unwrap()
90     }
91
92     // This function is used at the creation of a shared packet to inherit a
93     // previously blocked task. This is done to prevent spurious wakeups of
94     // tasks in select().
95     //
96     // This can only be called at channel-creation time
97     pub fn inherit_blocker(&mut self,
98                            token: Option<SignalToken>,
99                            guard: MutexGuard<()>) {
100         token.map(|token| {
101             assert_eq!(self.cnt.load(atomic::SeqCst), 0);
102             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
103             self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
104             self.cnt.store(-1, atomic::SeqCst);
105
106             // This store is a little sketchy. What's happening here is that
107             // we're transferring a blocker from a oneshot or stream channel to
108             // this shared channel. In doing so, we never spuriously wake them
109             // up and rather only wake them up at the appropriate time. This
110             // implementation of shared channels assumes that any blocking
111             // recv() will undo the increment of steals performed in try_recv()
112             // once the recv is complete.  This thread that we're inheriting,
113             // however, is not in the middle of recv. Hence, the first time we
114             // wake them up, they're going to wake up from their old port, move
115             // on to the upgraded port, and then call the block recv() function.
116             //
117             // When calling this function, they'll find there's data immediately
118             // available, counting it as a steal. This in fact wasn't a steal
119             // because we appropriately blocked them waiting for data.
120             //
121             // To offset this bad increment, we initially set the steal count to
122             // -1. You'll find some special code in abort_selection() as well to
123             // ensure that this -1 steal count doesn't escape too far.
124             self.steals = -1;
125         });
126
127         // When the shared packet is constructed, we grabbed this lock. The
128         // purpose of this lock is to ensure that abort_selection() doesn't
129         // interfere with this method. After we unlock this lock, we're
130         // signifying that we're done modifying self.cnt and self.to_wake and
131         // the port is ready for the world to continue using it.
132         drop(guard);
133     }
134
135     pub fn send(&mut self, t: T) -> Result<(), T> {
136         // See Port::drop for what's going on
137         if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
138
139         // Note that the multiple sender case is a little trickier
140         // semantically than the single sender case. The logic for
141         // incrementing is "add and if disconnected store disconnected".
142         // This could end up leading some senders to believe that there
143         // wasn't a disconnect if in fact there was a disconnect. This means
144         // that while one thread is attempting to re-store the disconnected
145         // states, other threads could walk through merrily incrementing
146         // this very-negative disconnected count. To prevent senders from
147         // spuriously attempting to send when the channels is actually
148         // disconnected, the count has a ranged check here.
149         //
150         // This is also done for another reason. Remember that the return
151         // value of this function is:
152         //
153         //  `true` == the data *may* be received, this essentially has no
154         //            meaning
155         //  `false` == the data will *never* be received, this has a lot of
156         //             meaning
157         //
158         // In the SPSC case, we have a check of 'queue.is_empty()' to see
159         // whether the data was actually received, but this same condition
160         // means nothing in a multi-producer context. As a result, this
161         // preflight check serves as the definitive "this will never be
162         // received". Once we get beyond this check, we have permanently
163         // entered the realm of "this may be received"
164         if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
165             return Err(t)
166         }
167
168         self.queue.push(t);
169         match self.cnt.fetch_add(1, atomic::SeqCst) {
170             -1 => {
171                 self.take_to_wake().signal();
172             }
173
174             // In this case, we have possibly failed to send our data, and
175             // we need to consider re-popping the data in order to fully
176             // destroy it. We must arbitrate among the multiple senders,
177             // however, because the queues that we're using are
178             // single-consumer queues. In order to do this, all exiting
179             // pushers will use an atomic count in order to count those
180             // flowing through. Pushers who see 0 are required to drain as
181             // much as possible, and then can only exit when they are the
182             // only pusher (otherwise they must try again).
183             n if n < DISCONNECTED + FUDGE => {
184                 // see the comment in 'try' for a shared channel for why this
185                 // window of "not disconnected" is ok.
186                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
187
188                 if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
189                     loop {
190                         // drain the queue, for info on the thread yield see the
191                         // discussion in try_recv
192                         loop {
193                             match self.queue.pop() {
194                                 mpsc::Data(..) => {}
195                                 mpsc::Empty => break,
196                                 mpsc::Inconsistent => Thread::yield_now(),
197                             }
198                         }
199                         // maybe we're done, if we're not the last ones
200                         // here, then we need to go try again.
201                         if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
202                             break
203                         }
204                     }
205
206                     // At this point, there may still be data on the queue,
207                     // but only if the count hasn't been incremented and
208                     // some other sender hasn't finished pushing data just
209                     // yet. That sender in question will drain its own data.
210                 }
211             }
212
213             // Can't make any assumptions about this case like in the SPSC case.
214             _ => {}
215         }
216
217         Ok(())
218     }
219
220     pub fn recv(&mut self) -> Result<T, Failure> {
221         // This code is essentially the exact same as that found in the stream
222         // case (see stream.rs)
223         match self.try_recv() {
224             Err(Empty) => {}
225             data => return data,
226         }
227
228         let (wait_token, signal_token) = blocking::tokens();
229         if self.decrement(signal_token) == Installed {
230             wait_token.wait()
231         }
232
233         match self.try_recv() {
234             data @ Ok(..) => { self.steals -= 1; data }
235             data => data,
236         }
237     }
238
239     // Essentially the exact same thing as the stream decrement function.
240     // Returns true if blocking should proceed.
241     fn decrement(&mut self, token: SignalToken) -> StartResult {
242         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
243         let ptr = unsafe { token.cast_to_uint() };
244         self.to_wake.store(ptr, atomic::SeqCst);
245
246         let steals = self.steals;
247         self.steals = 0;
248
249         match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
250             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
251             // If we factor in our steals and notice that the channel has no
252             // data, we successfully sleep
253             n => {
254                 assert!(n >= 0);
255                 if n - steals <= 0 { return Installed }
256             }
257         }
258
259         self.to_wake.store(0, atomic::SeqCst);
260         drop(unsafe { SignalToken::cast_from_uint(ptr) });
261         Abort
262     }
263
264     pub fn try_recv(&mut self) -> Result<T, Failure> {
265         let ret = match self.queue.pop() {
266             mpsc::Data(t) => Some(t),
267             mpsc::Empty => None,
268
269             // This is a bit of an interesting case. The channel is reported as
270             // having data available, but our pop() has failed due to the queue
271             // being in an inconsistent state.  This means that there is some
272             // pusher somewhere which has yet to complete, but we are guaranteed
273             // that a pop will eventually succeed. In this case, we spin in a
274             // yield loop because the remote sender should finish their enqueue
275             // operation "very quickly".
276             //
277             // Avoiding this yield loop would require a different queue
278             // abstraction which provides the guarantee that after M pushes have
279             // succeeded, at least M pops will succeed. The current queues
280             // guarantee that if there are N active pushes, you can pop N times
281             // once all N have finished.
282             mpsc::Inconsistent => {
283                 let data;
284                 loop {
285                     Thread::yield_now();
286                     match self.queue.pop() {
287                         mpsc::Data(t) => { data = t; break }
288                         mpsc::Empty => panic!("inconsistent => empty"),
289                         mpsc::Inconsistent => {}
290                     }
291                 }
292                 Some(data)
293             }
294         };
295         match ret {
296             // See the discussion in the stream implementation for why we
297             // might decrement steals.
298             Some(data) => {
299                 if self.steals > MAX_STEALS {
300                     match self.cnt.swap(0, atomic::SeqCst) {
301                         DISCONNECTED => {
302                             self.cnt.store(DISCONNECTED, atomic::SeqCst);
303                         }
304                         n => {
305                             let m = cmp::min(n, self.steals);
306                             self.steals -= m;
307                             self.bump(n - m);
308                         }
309                     }
310                     assert!(self.steals >= 0);
311                 }
312                 self.steals += 1;
313                 Ok(data)
314             }
315
316             // See the discussion in the stream implementation for why we try
317             // again.
318             None => {
319                 match self.cnt.load(atomic::SeqCst) {
320                     n if n != DISCONNECTED => Err(Empty),
321                     _ => {
322                         match self.queue.pop() {
323                             mpsc::Data(t) => Ok(t),
324                             mpsc::Empty => Err(Disconnected),
325                             // with no senders, an inconsistency is impossible.
326                             mpsc::Inconsistent => unreachable!(),
327                         }
328                     }
329                 }
330             }
331         }
332     }
333
334     // Prepares this shared packet for a channel clone, essentially just bumping
335     // a refcount.
336     pub fn clone_chan(&mut self) {
337         self.channels.fetch_add(1, atomic::SeqCst);
338     }
339
340     // Decrement the reference count on a channel. This is called whenever a
341     // Chan is dropped and may end up waking up a receiver. It's the receiver's
342     // responsibility on the other end to figure out that we've disconnected.
343     pub fn drop_chan(&mut self) {
344         match self.channels.fetch_sub(1, atomic::SeqCst) {
345             1 => {}
346             n if n > 1 => return,
347             n => panic!("bad number of channels left {}", n),
348         }
349
350         match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
351             -1 => { self.take_to_wake().signal(); }
352             DISCONNECTED => {}
353             n => { assert!(n >= 0); }
354         }
355     }
356
357     // See the long discussion inside of stream.rs for why the queue is drained,
358     // and why it is done in this fashion.
359     pub fn drop_port(&mut self) {
360         self.port_dropped.store(true, atomic::SeqCst);
361         let mut steals = self.steals;
362         while {
363             let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
364             cnt != DISCONNECTED && cnt != steals
365         } {
366             // See the discussion in 'try_recv' for why we yield
367             // control of this thread.
368             loop {
369                 match self.queue.pop() {
370                     mpsc::Data(..) => { steals += 1; }
371                     mpsc::Empty | mpsc::Inconsistent => break,
372                 }
373             }
374         }
375     }
376
377     // Consumes ownership of the 'to_wake' field.
378     fn take_to_wake(&mut self) -> SignalToken {
379         let ptr = self.to_wake.load(atomic::SeqCst);
380         self.to_wake.store(0, atomic::SeqCst);
381         assert!(ptr != 0);
382         unsafe { SignalToken::cast_from_uint(ptr) }
383     }
384
385     ////////////////////////////////////////////////////////////////////////////
386     // select implementation
387     ////////////////////////////////////////////////////////////////////////////
388
389     // Helper function for select, tests whether this port can receive without
390     // blocking (obviously not an atomic decision).
391     //
392     // This is different than the stream version because there's no need to peek
393     // at the queue, we can just look at the local count.
394     pub fn can_recv(&mut self) -> bool {
395         let cnt = self.cnt.load(atomic::SeqCst);
396         cnt == DISCONNECTED || cnt - self.steals > 0
397     }
398
399     // increment the count on the channel (used for selection)
400     fn bump(&mut self, amt: int) -> int {
401         match self.cnt.fetch_add(amt, atomic::SeqCst) {
402             DISCONNECTED => {
403                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
404                 DISCONNECTED
405             }
406             n => n
407         }
408     }
409
410     // Inserts the signal token for selection on this port, returning true if
411     // blocking should proceed.
412     //
413     // The code here is the same as in stream.rs, except that it doesn't need to
414     // peek at the channel to see if an upgrade is pending.
415     pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
416         match self.decrement(token) {
417             Installed => Installed,
418             Abort => {
419                 let prev = self.bump(1);
420                 assert!(prev == DISCONNECTED || prev >= 0);
421                 Abort
422             }
423         }
424     }
425
426     // Cancels a previous task waiting on this port, returning whether there's
427     // data on the port.
428     //
429     // This is similar to the stream implementation (hence fewer comments), but
430     // uses a different value for the "steals" variable.
431     pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
432         // Before we do anything else, we bounce on this lock. The reason for
433         // doing this is to ensure that any upgrade-in-progress is gone and
434         // done with. Without this bounce, we can race with inherit_blocker
435         // about looking at and dealing with to_wake. Once we have acquired the
436         // lock, we are guaranteed that inherit_blocker is done.
437         {
438             let _guard = self.select_lock.lock().unwrap();
439         }
440
441         // Like the stream implementation, we want to make sure that the count
442         // on the channel goes non-negative. We don't know how negative the
443         // stream currently is, so instead of using a steal value of 1, we load
444         // the channel count and figure out what we should do to make it
445         // positive.
446         let steals = {
447             let cnt = self.cnt.load(atomic::SeqCst);
448             if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
449         };
450         let prev = self.bump(steals + 1);
451
452         if prev == DISCONNECTED {
453             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
454             true
455         } else {
456             let cur = prev + steals + 1;
457             assert!(cur >= 0);
458             if prev < 0 {
459                 drop(self.take_to_wake());
460             } else {
461                 while self.to_wake.load(atomic::SeqCst) != 0 {
462                     Thread::yield_now();
463                 }
464             }
465             // if the number of steals is -1, it was the pre-emptive -1 steal
466             // count from when we inherited a blocker. This is fine because
467             // we're just going to overwrite it with a real value.
468             assert!(self.steals == 0 || self.steals == -1);
469             self.steals = steals;
470             prev >= 0
471         }
472     }
473 }
474
475 #[unsafe_destructor]
476 impl<T: Send> Drop for Packet<T> {
477     fn drop(&mut self) {
478         // Note that this load is not only an assert for correctness about
479         // disconnection, but also a proper fence before the read of
480         // `to_wake`, so this assert cannot be removed with also removing
481         // the `to_wake` assert.
482         assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
483         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
484         assert_eq!(self.channels.load(atomic::SeqCst), 0);
485     }
486 }