1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
17 /// High level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementations of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
21 pub use self::Failure::*;
28 use sync::{atomic, Mutex, MutexGuard};
29 use sync::mpsc::mpsc_queue as mpsc;
30 use sync::mpsc::blocking::{mod, SignalToken};
31 use sync::mpsc::select::StartResult;
32 use sync::mpsc::select::StartResult::*;
// Sentinel stored into `cnt` once the channel is disconnected (see drop_chan
// and drop_port).
35 const DISCONNECTED: int = int::MIN;
// Slack added to DISCONNECTED in the range checks in send(), so that a count
// which other senders bumped slightly above DISCONNECTED still reads as
// disconnected.
36 const FUDGE: int = 1024;
// Threshold on `steals`; try_recv() runs its steal-count adjustment against
// `cnt` once `steals` exceeds it.
// NOTE(review): MAX_STEALS is defined twice below; presumably each definition
// is gated by a cfg attribute on a line not visible here -- confirm.
38 const MAX_STEALS: int = 5;
40 const MAX_STEALS: int = 1 << 20;
// State backing one shared (multi-sender) channel.
42 pub struct Packet<T> {
43 queue: mpsc::Queue<T>,
44 cnt: atomic::AtomicInt, // How many items are on this channel
45 steals: int, // How many times has a port received without blocking?
46 to_wake: atomic::AtomicUint, // SignalToken for wake up
48 // The number of channels which are currently using this packet.
49 channels: atomic::AtomicInt,
51 // See the discussion in Port::drop and the channel send methods for what
// NOTE(review): the sentence above is cut off in this view. From the code:
// `port_dropped` is set in drop_port() and checked at the top of send();
// `sender_drain` is incremented/decremented by senders in send()'s
// disconnected path.
53 port_dropped: atomic::AtomicBool,
54 sender_drain: atomic::AtomicInt,
56 // this lock protects various portions of this implementation during
// NOTE(review): the sentence above is cut off in this view. The lock is
// acquired in postinit_lock() and abort_selection().
58 select_lock: Mutex<()>,
66 impl<T: Send> Packet<T> {
67 // Creation of a packet *must* be followed by a call to postinit_lock
68 // and later by inherit_blocker
// Builds an empty packet: zero item count, no wake token installed, port not
// dropped, no senders draining.
69 pub fn new() -> Packet<T> {
71 queue: mpsc::Queue::new(),
72 cnt: atomic::AtomicInt::new(0),
// 0 is the "no token installed" value; other methods assert on it and
// take_to_wake() resets the field to it.
74 to_wake: atomic::AtomicUint::new(0),
// NOTE(review): the channel count starts at 2, not 1; presumably a shared
// packet is only created once a second sender exists -- confirm with caller.
75 channels: atomic::AtomicInt::new(2),
76 port_dropped: atomic::AtomicBool::new(false),
77 sender_drain: atomic::AtomicInt::new(0),
78 select_lock: Mutex::new(()),
83 // This function should be called after a newly created Packet
84 // has been wrapped in an Arc.
85 // Otherwise the mutex data would be duplicated while cloning,
86 // which could cause problems on platforms where the mutex is
87 // represented by an opaque data structure.
// Acquires `select_lock` and returns its guard; panics (via `unwrap`) if
// `lock()` returns an error.
88 pub fn postinit_lock(&self) -> MutexGuard<()> {
89 self.select_lock.lock().unwrap()
92 // This function is used at the creation of a shared packet to inherit a
93 // previously blocked task. This is done to prevent spurious wakeups of
96 // This can only be called at channel-creation time
// `guard` is the lock guard obtained from postinit_lock(); see the note at
// the end of this function about when it is released.
97 pub fn inherit_blocker(&mut self,
98 token: Option<SignalToken>,
99 guard: MutexGuard<()>) {
// The packet must be untouched: no items counted and no token installed.
101 assert_eq!(self.cnt.load(atomic::SeqCst), 0);
102 assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
103 self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
// A count of -1 is the value drop_chan() reacts to by signalling the stored
// wake token.
104 self.cnt.store(-1, atomic::SeqCst);
106 // This store is a little sketchy. What's happening here is that
107 // we're transferring a blocker from a oneshot or stream channel to
108 // this shared channel. In doing so, we never spuriously wake them
109 // up and rather only wake them up at the appropriate time. This
110 // implementation of shared channels assumes that any blocking
111 // recv() will undo the increment of steals performed in try_recv()
112 // once the recv is complete. This thread that we're inheriting,
113 // however, is not in the middle of recv. Hence, the first time we
114 // wake them up, they're going to wake up from their old port, move
115 // on to the upgraded port, and then call the blocking recv() function.
117 // When calling this function, they'll find there's data immediately
118 // available, counting it as a steal. This in fact wasn't a steal
119 // because we appropriately blocked them waiting for data.
121 // To offset this bad increment, we initially set the steal count to
122 // -1. You'll find some special code in abort_selection() as well to
123 // ensure that this -1 steal count doesn't escape too far.
127 // When the shared packet is constructed, we grabbed this lock. The
128 // purpose of this lock is to ensure that abort_selection() doesn't
129 // interfere with this method. After we unlock this lock, we're
130 // signifying that we're done modifying self.cnt and self.to_wake and
131 // the port is ready for the world to continue using it.
// Sender-side entry point. Returns `Err(t)`, handing the value back to the
// caller, when the receiving port has already been dropped.
135 pub fn send(&mut self, t: T) -> Result<(), T> {
136 // See Port::drop for what's going on
137 if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
139 // Note that the multiple sender case is a little trickier
140 // semantically than the single sender case. The logic for
141 // incrementing is "add and if disconnected store disconnected".
142 // This could end up leading some senders to believe that there
143 // wasn't a disconnect if in fact there was a disconnect. This means
144 // that while one thread is attempting to re-store the disconnected
145 // states, other threads could walk through merrily incrementing
146 // this very-negative disconnected count. To prevent senders from
147 // spuriously attempting to send when the channel is actually
148 // disconnected, the count has a ranged check here.
150 // This is also done for another reason. Remember that the return
151 // value of this function is:
// NOTE(review): the `true`/`false` wording below does not match the
// `Result<(), T>` return type in the signature; presumably it corresponds
// to Ok/Err -- confirm.
153 // `true` == the data *may* be received, this essentially has no
155 // `false` == the data will *never* be received, this has a lot of
158 // In the SPSC case, we have a check of 'queue.is_empty()' to see
159 // whether the data was actually received, but this same condition
160 // means nothing in a multi-producer context. As a result, this
161 // preflight check serves as the definitive "this will never be
162 // received". Once we get beyond this check, we have permanently
163 // entered the realm of "this may be received"
164 if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
169 match self.cnt.fetch_add(1, atomic::SeqCst) {
171 self.take_to_wake().signal();
174 // In this case, we have possibly failed to send our data, and
175 // we need to consider re-popping the data in order to fully
176 // destroy it. We must arbitrate among the multiple senders,
177 // however, because the queues that we're using are
178 // single-consumer queues. In order to do this, all exiting
179 // pushers will use an atomic count in order to count those
180 // flowing through. Pushers who see 0 are required to drain as
181 // much as possible, and then can only exit when they are the
182 // only pusher (otherwise they must try again).
183 n if n < DISCONNECTED + FUDGE => {
184 // see the comment in 'try' for a shared channel for why this
185 // window of "not disconnected" is ok.
// Undo our increment by writing the sentinel back.
186 self.cnt.store(DISCONNECTED, atomic::SeqCst);
// Only the sender that moves `sender_drain` from 0 performs the drain.
188 if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
190 // drain the queue, for info on the thread yield see the
191 // discussion in try_recv
193 match self.queue.pop() {
195 mpsc::Empty => break,
196 mpsc::Inconsistent => Thread::yield_now(),
199 // maybe we're done, if we're not the last ones
200 // here, then we need to go try again.
201 if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
206 // At this point, there may still be data on the queue,
207 // but only if the count hasn't been incremented and
208 // some other sender hasn't finished pushing data just
209 // yet. That sender in question will drain its own data.
213 // Can't make any assumptions about this case like in the SPSC case.
// Blocking receive. Starts with a non-blocking try_recv(); the blocking path
// creates a wait/signal token pair and registers the signal token through
// decrement().
220 pub fn recv(&mut self) -> Result<T, Failure> {
221 // This code is essentially the exact same as that found in the stream
222 // case (see stream.rs)
223 match self.try_recv() {
228 let (wait_token, signal_token) = blocking::tokens();
229 if self.decrement(signal_token) == Installed {
233 match self.try_recv() {
// Data obtained after blocking is not a steal: undo the steal increment that
// try_recv() performed (see the discussion in inherit_blocker).
234 data @ Ok(..) => { self.steals -= 1; data }
239 // Essentially the exact same thing as the stream decrement function.
240 // Returns `Installed` if blocking should proceed.
// Publishes `token` in `to_wake`, then subtracts 1 plus the accumulated
// steals from `cnt`. When blocking should not proceed, `to_wake` is reset to
// 0 and the token is reconstructed and dropped.
241 fn decrement(&mut self, token: SignalToken) -> StartResult {
// No other token may be installed at this point.
242 assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
243 let ptr = unsafe { token.cast_to_uint() };
244 self.to_wake.store(ptr, atomic::SeqCst);
246 let steals = self.steals;
249 match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
// Already disconnected: restore the sentinel our subtraction disturbed.
250 DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
251 // If we factor in our steals and notice that the channel has no
252 // data, we successfully sleep
255 if n - steals <= 0 { return Installed }
// Not blocking after all: uninstall the token and release it.
259 self.to_wake.store(0, atomic::SeqCst);
260 drop(unsafe { SignalToken::cast_from_uint(ptr) });
// Non-blocking receive. Pops from the queue, retrying while the queue reports
// Inconsistent. When no data is obtained, the channel count decides between
// `Empty` and a final pop that yields either data or `Disconnected`.
264 pub fn try_recv(&mut self) -> Result<T, Failure> {
265 let ret = match self.queue.pop() {
266 mpsc::Data(t) => Some(t),
269 // This is a bit of an interesting case. The channel is reported as
270 // having data available, but our pop() has failed due to the queue
271 // being in an inconsistent state. This means that there is some
272 // pusher somewhere which has yet to complete, but we are guaranteed
273 // that a pop will eventually succeed. In this case, we spin in a
274 // yield loop because the remote sender should finish their enqueue
275 // operation "very quickly".
277 // Avoiding this yield loop would require a different queue
278 // abstraction which provides the guarantee that after M pushes have
279 // succeeded, at least M pops will succeed. The current queues
280 // guarantee that if there are N active pushes, you can pop N times
281 // once all N have finished.
282 mpsc::Inconsistent => {
286 match self.queue.pop() {
287 mpsc::Data(t) => { data = t; break }
// An inconsistent queue has a push in flight, so it cannot turn out empty.
288 mpsc::Empty => panic!("inconsistent => empty"),
289 mpsc::Inconsistent => {}
296 // See the discussion in the stream implementation for why we
297 // might decrement steals.
299 if self.steals > MAX_STEALS {
300 match self.cnt.swap(0, atomic::SeqCst) {
302 self.cnt.store(DISCONNECTED, atomic::SeqCst);
305 let m = cmp::min(n, self.steals);
310 assert!(self.steals >= 0);
316 // See the discussion in the stream implementation for why we try
319 match self.cnt.load(atomic::SeqCst) {
// Still connected: the channel is merely empty right now.
320 n if n != DISCONNECTED => Err(Empty),
// Disconnected: one last pop decides between leftover data and Disconnected.
322 match self.queue.pop() {
323 mpsc::Data(t) => Ok(t),
324 mpsc::Empty => Err(Disconnected),
325 // with no senders, an inconsistency is impossible.
326 mpsc::Inconsistent => unreachable!(),
334 // Prepares this shared packet for a channel clone, essentially just bumping
// NOTE(review): the sentence above is cut off in this view. The code
// increments `channels`, the number of channels using this packet.
336 pub fn clone_chan(&mut self) {
337 self.channels.fetch_add(1, atomic::SeqCst);
340 // Decrement the reference count on a channel. This is called whenever a
341 // Chan is dropped and may end up waking up a receiver. It's the receiver's
342 // responsibility on the other end to figure out that we've disconnected.
343 pub fn drop_chan(&mut self) {
// fetch_sub returns the count *before* the decrement.
344 match self.channels.fetch_sub(1, atomic::SeqCst) {
// Other channels are still alive; nothing more to do.
346 n if n > 1 => return,
347 n => panic!("bad number of channels left {}", n),
// Mark the channel disconnected and inspect the count that was there.
350 match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
// Previous count of -1: signal the installed wake token.
351 -1 => { self.take_to_wake().signal(); }
353 n => { assert!(n >= 0); }
357 // See the long discussion inside of stream.rs for why the queue is drained,
358 // and why it is done in this fashion.
359 pub fn drop_port(&mut self) {
// Flag checked at the top of send() so senders stop early.
360 self.port_dropped.store(true, atomic::SeqCst);
361 let mut steals = self.steals;
// Try to move `cnt` from the expected `steals` value to DISCONNECTED; the
// expression below is true when the swap did not take effect and the channel
// is not already disconnected.
363 let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
364 cnt != DISCONNECTED && cnt != steals
366 // See the discussion in 'try_recv' for why we yield
367 // control of this thread.
369 match self.queue.pop() {
// Every drained item raises the value the next compare_and_swap expects.
370 mpsc::Data(..) => { steals += 1; }
371 mpsc::Empty | mpsc::Inconsistent => break,
377 // Consumes ownership of the 'to_wake' field.
// Reads the stored value, resets the field to 0 (the "no token" value), and
// rebuilds the SignalToken from the raw uint.
378 fn take_to_wake(&mut self) -> SignalToken {
379 let ptr = self.to_wake.load(atomic::SeqCst);
380 self.to_wake.store(0, atomic::SeqCst);
382 unsafe { SignalToken::cast_from_uint(ptr) }
385 ////////////////////////////////////////////////////////////////////////////
386 // select implementation
387 ////////////////////////////////////////////////////////////////////////////
389 // Helper function for select, tests whether this port can receive without
390 // blocking (obviously not an atomic decision).
392 // This is different than the stream version because there's no need to peek
393 // at the queue, we can just look at the local count.
394 pub fn can_recv(&mut self) -> bool {
395 let cnt = self.cnt.load(atomic::SeqCst);
// Receivable when disconnected, or when items remain after discounting the
// steals already taken.
396 cnt == DISCONNECTED || cnt - self.steals > 0
399 // increment the count on the channel (used for selection)
// Adds `amt` to `cnt` with fetch_add. One match arm (its pattern is not
// visible in this view) writes DISCONNECTED back into `cnt`.
400 fn bump(&mut self, amt: int) -> int {
401 match self.cnt.fetch_add(amt, atomic::SeqCst) {
403 self.cnt.store(DISCONNECTED, atomic::SeqCst);
410 // Inserts the signal token for selection on this port, returning
411 // `Installed` if blocking should proceed.
413 // The code here is the same as in stream.rs, except that it doesn't need to
414 // peek at the channel to see if an upgrade is pending.
415 pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
416 match self.decrement(token) {
417 Installed => Installed,
// Not installed: add back the 1 that decrement() subtracted.
419 let prev = self.bump(1);
420 assert!(prev == DISCONNECTED || prev >= 0);
426 // Cancels a previous task waiting on this port, returning whether there's
// NOTE(review): the sentence above is cut off in this view; the function
// returns a `bool`.
429 // This is similar to the stream implementation (hence fewer comments), but
430 // uses a different value for the "steals" variable.
431 pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
432 // Before we do anything else, we bounce on this lock. The reason for
433 // doing this is to ensure that any upgrade-in-progress is gone and
434 // done with. Without this bounce, we can race with inherit_blocker
435 // about looking at and dealing with to_wake. Once we have acquired the
436 // lock, we are guaranteed that inherit_blocker is done.
438 let _guard = self.select_lock.lock().unwrap();
441 // Like the stream implementation, we want to make sure that the count
442 // on the channel goes non-negative. We don't know how negative the
443 // stream currently is, so instead of using a steal value of 1, we load
444 // the channel count and figure out what we should do to make it
447 let cnt = self.cnt.load(atomic::SeqCst);
// A negative, non-disconnected count is negated so that the bump below
// brings `cnt` back above zero.
448 if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
450 let prev = self.bump(steals + 1);
452 if prev == DISCONNECTED {
// Disconnected: no wake token may still be installed.
453 assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
// Value of `cnt` after the bump.
456 let cur = prev + steals + 1;
459 drop(self.take_to_wake());
// Loops for as long as a token is still installed (loop body not visible
// in this view).
461 while self.to_wake.load(atomic::SeqCst) != 0 {
465 // if the number of steals is -1, it was the pre-emptive -1 steal
466 // count from when we inherited a blocker. This is fine because
467 // we're just going to overwrite it with a real value.
468 assert!(self.steals == 0 || self.steals == -1);
469 self.steals = steals;
// Destructor checks: by the time the packet is freed it must be disconnected,
// have no wake token installed, and have no channels left.
476 impl<T: Send> Drop for Packet<T> {
478 // Note that this load is not only an assert for correctness about
479 // disconnection, but also a proper fence before the read of
480 // `to_wake`, so this assert cannot be removed without also removing
481 // the `to_wake` assert.
482 assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
483 assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
484 assert_eq!(self.channels.load(atomic::SeqCst), 0);