1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
17 /// High level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementations of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
23 use alloc::boxed::Box;
26 use rustrt::local::Local;
27 use rustrt::mutex::NativeMutex;
28 use rustrt::task::{Task, BlockedTask};
29 use rustrt::thread::Thread;
32 use mpsc = mpsc_queue;
34 static DISCONNECTED: int = int::MIN;
35 static FUDGE: int = 1024;
37 static MAX_STEALS: int = 5;
39 static MAX_STEALS: int = 1 << 20;
// State shared by every clone of a shared-channel sender and its single
// receiver. NOTE(review): this is a numbered listing with some original lines
// elided (the closing brace among them); only visible fields are documented.
41 pub struct Packet<T> {
42 queue: mpsc::Queue<T>,
43 cnt: atomics::AtomicInt, // How many items are on this channel
44 steals: int, // How many times has a port received without blocking?
45 to_wake: atomics::AtomicUint, // Task to wake up
47 // The number of channels which are currently using this packet.
48 channels: atomics::AtomicInt,
50 // See the discussion in Port::drop and the channel send methods for what
52 port_dropped: atomics::AtomicBool,
// sender_drain arbitrates which sender drains the queue after a disconnect
// (see the drain loop in send()).
53 sender_drain: atomics::AtomicInt,
55 // this lock protects various portions of this implementation during
57 select_lock: NativeMutex,
65 impl<T: Send> Packet<T> {
66 // Creation of a packet *must* be followed by a call to postinit_lock
67 // and later by inherit_blocker
68 pub fn new() -> Packet<T> {
// Builds the shared state: empty queue, zero count, no blocked task.
// NOTE(review): the `Packet {` constructor line, the `steals` field
// initializer, and the closing braces are elided from this listing.
70 queue: mpsc::Queue::new(),
71 cnt: atomics::AtomicInt::new(0),
73 to_wake: atomics::AtomicUint::new(0),
// channels starts at 2 — presumably the initial sender/receiver pair
// created alongside this packet; confirm against the construction site.
74 channels: atomics::AtomicInt::new(2),
75 port_dropped: atomics::AtomicBool::new(false),
76 sender_drain: atomics::AtomicInt::new(0),
77 select_lock: unsafe { NativeMutex::new() },
82 // This function should be used after newly created Packet
83 // was wrapped with an Arc
84 // In other case mutex data will be duplicated while cloning
85 // and that could cause problems on platforms where it is
86 // represented by opaque data structure
87 pub fn postinit_lock(&mut self) {
// Acquires select_lock with no RAII guard; inherit_blocker() later
// releases it via unlock_noguard(), so lock and unlock deliberately
// happen in different method calls.
88 unsafe { self.select_lock.lock_noguard() }
91 // This function is used at the creation of a shared packet to inherit a
92 // previously blocked task. This is done to prevent spurious wakeups of
95 // This can only be called at channel-creation time
96 pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
// Install the inherited blocker: record it in to_wake and set cnt to -1
// so the wakeup protocol sees one blocked receiver. NOTE(review): the
// listing elides the Option match scaffolding, the second argument of the
// to_wake store, and the `self.steals = -1;` offset that the long comment
// below describes.
99 assert_eq!(self.cnt.load(atomics::SeqCst), 0);
100 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
101 self.to_wake.store(unsafe { task.cast_to_uint() },
103 self.cnt.store(-1, atomics::SeqCst);
105 // This store is a little sketchy. What's happening here is
106 // that we're transferring a blocker from a oneshot or stream
107 // channel to this shared channel. In doing so, we never
108 // spuriously wake them up and rather only wake them up at the
109 // appropriate time. This implementation of shared channels
110 // assumes that any blocking recv() will undo the increment of
111 // steals performed in try_recv() once the recv is complete.
112 // This thread that we're inheriting, however, is not in the
113 // middle of recv. Hence, the first time we wake them up,
114 // they're going to wake up from their old port, move on to the
115 // upgraded port, and then call the block recv() function.
117 // When calling this function, they'll find there's data
118 // immediately available, counting it as a steal. This in fact
119 // wasn't a steal because we appropriately blocked them waiting
122 // To offset this bad increment, we initially set the steal
123 // count to -1. You'll find some special code in
124 // abort_selection() as well to ensure that this -1 steal count
125 // doesn't escape too far.
131 // When the shared packet is constructed, we grabbed this lock. The
132 // purpose of this lock is to ensure that abort_selection() doesn't
133 // interfere with this method. After we unlock this lock, we're
134 // signifying that we're done modifying self.cnt and self.to_wake and
135 // the port is ready for the world to continue using it.
136 unsafe { self.select_lock.unlock_noguard() }
139 pub fn send(&mut self, t: T) -> Result<(), T> {
// Pushes `t` onto the shared queue. Err(t) returns ownership of the value
// when it can never be received (port dropped or channel disconnected).
// NOTE(review): several lines are elided in this listing — the body of the
// preflight check, the `queue.push(t)` itself, the `-1 =>` match-arm head
// above the wakeup call, and the loop/brace scaffolding of the drain.
140 // See Port::drop for what's going on
141 if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
143 // Note that the multiple sender case is a little trickier
144 // semantically than the single sender case. The logic for
145 // incrementing is "add and if disconnected store disconnected".
146 // This could end up leading some senders to believe that there
147 // wasn't a disconnect if in fact there was a disconnect. This means
148 // that while one thread is attempting to re-store the disconnected
149 // states, other threads could walk through merrily incrementing
150 // this very-negative disconnected count. To prevent senders from
151 // spuriously attempting to send when the channels is actually
152 // disconnected, the count has a ranged check here.
154 // This is also done for another reason. Remember that the return
155 // value of this function is:
157 // `true` == the data *may* be received, this essentially has no
159 // `false` == the data will *never* be received, this has a lot of
162 // In the SPSC case, we have a check of 'queue.is_empty()' to see
163 // whether the data was actually received, but this same condition
164 // means nothing in a multi-producer context. As a result, this
165 // preflight check serves as the definitive "this will never be
166 // received". Once we get beyond this check, we have permanently
167 // entered the realm of "this may be received"
168 if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
173 match self.cnt.fetch_add(1, atomics::SeqCst) {
// (elided arm head) a previous count of -1 indicates a blocked
// receiver: take ownership of it and wake it up.
175 self.take_to_wake().wake().map(|t| t.reawaken());
178 // In this case, we have possibly failed to send our data, and
179 // we need to consider re-popping the data in order to fully
180 // destroy it. We must arbitrate among the multiple senders,
181 // however, because the queues that we're using are
182 // single-consumer queues. In order to do this, all exiting
183 // pushers will use an atomic count in order to count those
184 // flowing through. Pushers who see 0 are required to drain as
185 // much as possible, and then can only exit when they are the
186 // only pusher (otherwise they must try again).
187 n if n < DISCONNECTED + FUDGE => {
188 // see the comment in 'try' for a shared channel for why this
189 // window of "not disconnected" is ok.
190 self.cnt.store(DISCONNECTED, atomics::SeqCst);
192 if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 {
194 // drain the queue, for info on the thread yield see the
195 // discussion in try_recv
197 match self.queue.pop() {
199 mpsc::Empty => break,
200 mpsc::Inconsistent => Thread::yield_now(),
203 // maybe we're done, if we're not the last ones
204 // here, then we need to go try again.
205 if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 {
210 // At this point, there may still be data on the queue,
211 // but only if the count hasn't been incremented and
212 // some other sender hasn't finished pushing data just
213 // yet. That sender in question will drain its own data.
217 // Can't make any assumptions about this case like in the SPSC case.
224 pub fn recv(&mut self) -> Result<T, Failure> {
// Blocking receive: first attempt try_recv(); on Empty, deschedule this
// task (decrement() registers it in to_wake), and retry after wakeup.
// NOTE(review): the match arms dispatching Empty vs. data, and the closure
// body calling decrement(), are elided from this listing.
225 // This code is essentially the exact same as that found in the stream
226 // case (see stream.rs)
227 match self.try_recv() {
232 let task: Box<Task> = Local::take();
233 task.deschedule(1, |task| {
237 match self.try_recv() {
// The post-wakeup try_recv counted a steal that wasn't one (we were
// blocked, not stealing) — undo it, per the protocol documented in
// inherit_blocker.
238 data @ Ok(..) => { self.steals -= 1; data }
243 // Essentially the exact same thing as the stream decrement function.
// Registers `task` as the blocked receiver and decrements cnt by our
// accumulated steals plus one. Ok(()) == we successfully went to sleep;
// Err(task) == data was available, hand the task back to run immediately.
244 fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
245 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
246 let n = unsafe { task.cast_to_uint() };
247 self.to_wake.store(n, atomics::SeqCst);
249 let steals = self.steals;
252 match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
253 DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
254 // If we factor in our steals and notice that the channel has no
255 // data, we successfully sleep
// NOTE(review): the `n` on the next line is presumably a catch-all
// match-arm binding (its head is elided from this listing), shadowing
// the task pointer bound on original line 246 — confirm in the full file.
258 if n - steals <= 0 { return Ok(()) }
262 self.to_wake.store(0, atomics::SeqCst);
263 Err(unsafe { BlockedTask::cast_from_uint(n) })
266 pub fn try_recv(&mut self) -> Result<T, Failure> {
// Non-blocking receive. Pops from the queue (spinning through transient
// Inconsistent states), then periodically folds the local `steals` count
// back into `cnt`, and on Empty distinguishes Empty from Disconnected.
// NOTE(review): this listing elides the Empty arm of the first pop, the
// yield-loop scaffolding around the inner pop, the steals bookkeeping that
// uses `m`, and the `ret` dispatch at the end.
267 let ret = match self.queue.pop() {
268 mpsc::Data(t) => Some(t),
271 // This is a bit of an interesting case. The channel is
272 // reported as having data available, but our pop() has
273 // failed due to the queue being in an inconsistent state.
274 // This means that there is some pusher somewhere which has
275 // yet to complete, but we are guaranteed that a pop will
276 // eventually succeed. In this case, we spin in a yield loop
277 // because the remote sender should finish their enqueue
278 // operation "very quickly".
280 // Note that this yield loop does *not* attempt to do a green
281 // yield (regardless of the context), but *always* performs an
282 // OS-thread yield. The reasoning for this is that the pusher in
283 // question which is causing the inconsistent state is
284 // guaranteed to *not* be a blocked task (green tasks can't get
285 // pre-empted), so it must be on a different OS thread. Also,
286 // `try_recv` is normally a "guaranteed no rescheduling" context
287 // in a green-thread situation. By yielding control of the
288 // thread, we will hopefully allow time for the remote task on
289 // the other OS thread to make progress.
291 // Avoiding this yield loop would require a different queue
292 // abstraction which provides the guarantee that after M
293 // pushes have succeeded, at least M pops will succeed. The
294 // current queues guarantee that if there are N active
295 // pushes, you can pop N times once all N have finished.
296 mpsc::Inconsistent => {
300 match self.queue.pop() {
301 mpsc::Data(t) => { data = t; break }
// A sender's push is in flight, so the queue cannot be empty here.
302 mpsc::Empty => fail!("inconsistent => empty"),
303 mpsc::Inconsistent => {}
310 // See the discussion in the stream implementation for why we
311 // might decrement steals.
313 if self.steals > MAX_STEALS {
314 match self.cnt.swap(0, atomics::SeqCst) {
316 self.cnt.store(DISCONNECTED, atomics::SeqCst);
319 let m = cmp::min(n, self.steals);
324 assert!(self.steals >= 0);
330 // See the discussion in the stream implementation for why we try
333 match self.cnt.load(atomics::SeqCst) {
334 n if n != DISCONNECTED => Err(Empty),
// cnt == DISCONNECTED: a final pop distinguishes leftover data from a
// genuinely closed, empty channel.
336 match self.queue.pop() {
337 mpsc::Data(t) => Ok(t),
338 mpsc::Empty => Err(Disconnected),
339 // with no senders, an inconsistency is impossible.
340 mpsc::Inconsistent => unreachable!(),
348 // Prepares this shared packet for a channel clone, essentially just bumping
// ... the `channels` reference count; drop_chan() is its counterpart.
350 pub fn clone_chan(&mut self) {
351 self.channels.fetch_add(1, atomics::SeqCst);
354 // Decrement the reference count on a channel. This is called whenever a
355 // Chan is dropped and may end up waking up a receiver. It's the receiver's
356 // responsibility on the other end to figure out that we've disconnected.
357 pub fn drop_chan(&mut self) {
// fetch_sub returns the *previous* count: > 1 means other senders remain.
// NOTE(review): the arm for the last-sender case (previous count == 1,
// which falls through to the disconnect below) is elided in this listing.
358 match self.channels.fetch_sub(1, atomics::SeqCst) {
360 n if n > 1 => return,
361 n => fail!("bad number of channels left {}", n),
// Last sender gone: mark disconnected, and if a receiver was blocked
// (cnt was -1), wake it so it can observe the disconnect.
364 match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
365 -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
367 n => { assert!(n >= 0); }
371 // See the long discussion inside of stream.rs for why the queue is drained,
372 // and why it is done in this fashion.
373 pub fn drop_port(&mut self) {
// Receiver is going away: flag it for senders, then CAS cnt from our
// expected value (the local steal count) to DISCONNECTED, draining any
// queued data we are responsible for. NOTE(review): the `while { ... }`
// CAS-retry and drain-loop scaffolding is elided from this listing.
374 self.port_dropped.store(true, atomics::SeqCst);
375 let mut steals = self.steals;
377 let cnt = self.cnt.compare_and_swap(
378 steals, DISCONNECTED, atomics::SeqCst);
// Retry condition: neither already disconnected nor a successful swap.
379 cnt != DISCONNECTED && cnt != steals
381 // See the discussion in 'try_recv' for why we yield
382 // control of this thread.
384 match self.queue.pop() {
385 mpsc::Data(..) => { steals += 1; }
386 mpsc::Empty | mpsc::Inconsistent => break,
392 // Consumes ownership of the 'to_wake' field.
// Loads the stored task pointer, clears the slot, and reconstitutes the
// BlockedTask from its uint encoding (the inverse of cast_to_uint).
393 fn take_to_wake(&mut self) -> BlockedTask {
394 let task = self.to_wake.load(atomics::SeqCst);
395 self.to_wake.store(0, atomics::SeqCst);
397 unsafe { BlockedTask::cast_from_uint(task) }
400 ////////////////////////////////////////////////////////////////////////////
401 // select implementation
402 ////////////////////////////////////////////////////////////////////////////
404 // Helper function for select, tests whether this port can receive without
405 // blocking (obviously not an atomic decision).
407 // This is different than the stream version because there's no need to peek
408 // at the queue, we can just look at the local count.
409 pub fn can_recv(&mut self) -> bool {
410 let cnt = self.cnt.load(atomics::SeqCst);
// Disconnected always "receives" (it yields Err immediately); otherwise
// data is available only once the count exceeds our local steals.
411 cnt == DISCONNECTED || cnt - self.steals > 0
414 // increment the count on the channel (used for selection)
// Adds `amt` to cnt, re-pinning DISCONNECTED if the channel was closed.
// NOTE(review): the DISCONNECTED arm head and the return of the previous
// count are elided from this listing.
415 fn bump(&mut self, amt: int) -> int {
416 match self.cnt.fetch_add(amt, atomics::SeqCst) {
418 self.cnt.store(DISCONNECTED, atomics::SeqCst);
425 // Inserts the blocked task for selection on this port, returning it back if
426 // the port already has data on it.
428 // The code here is the same as in stream.rs, except that it doesn't need to
429 // peek at the channel to see if an upgrade is pending.
430 pub fn start_selection(&mut self,
431 task: BlockedTask) -> Result<(), BlockedTask> {
// Reuse decrement(): Ok means we blocked; on failure, undo the decrement
// with bump(1) and hand the task back. NOTE(review): the match-arm heads
// around the bump are elided from this listing.
432 match self.decrement(task) {
435 let prev = self.bump(1);
436 assert!(prev == DISCONNECTED || prev >= 0);
442 // Cancels a previous task waiting on this port, returning whether there's
445 // This is similar to the stream implementation (hence fewer comments), but
446 // uses a different value for the "steals" variable.
447 pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
// NOTE(review): this listing elides the `let steals = { ... }` binding
// around the count computation, the match on `prev`, and the returns —
// only the skeleton of the logic is visible below.
448 // Before we do anything else, we bounce on this lock. The reason for
449 // doing this is to ensure that any upgrade-in-progress is gone and
450 // done with. Without this bounce, we can race with inherit_blocker
451 // about looking at and dealing with to_wake. Once we have acquired the
452 // lock, we are guaranteed that inherit_blocker is done.
454 let _guard = self.select_lock.lock();
457 // Like the stream implementation, we want to make sure that the count
458 // on the channel goes non-negative. We don't know how negative the
459 // stream currently is, so instead of using a steal value of 1, we load
460 // the channel count and figure out what we should do to make it
463 let cnt = self.cnt.load(atomics::SeqCst);
// A negative, non-DISCONNECTED count means we owe that many steals.
464 if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
466 let prev = self.bump(steals + 1);
468 if prev == DISCONNECTED {
469 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
472 let cur = prev + steals + 1;
475 self.take_to_wake().trash();
// Spin until a concurrent waker finishes publishing/clearing to_wake.
477 while self.to_wake.load(atomics::SeqCst) != 0 {
481 // if the number of steals is -1, it was the pre-emptive -1 steal
482 // count from when we inherited a blocker. This is fine because
483 // we're just going to overwrite it with a real value.
484 assert!(self.steals == 0 || self.steals == -1);
485 self.steals = steals;
// Final teardown sanity checks: by the time the packet is destroyed, both
// sides must have disconnected and no task may still be registered.
// NOTE(review): the `fn drop(&mut self) {` line and closing braces are
// elided from this listing.
492 impl<T: Send> Drop for Packet<T> {
494 // Note that this load is not only an assert for correctness about
495 // disconnection, but also a proper fence before the read of
496 // `to_wake`, so this assert cannot be removed with also removing
497 // the `to_wake` assert.
498 assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED)
499 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
500 assert_eq!(self.channels.load(atomics::SeqCst), 0);