1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 /// This is the flavor of channels which are optimized for one sender and one
14 /// receiver. The sender will be upgraded to a shared channel if the channel is
17 /// High level implementation details can be found in the comment of the parent
22 use alloc::boxed::Box;
25 use rustrt::local::Local;
26 use rustrt::task::{Task, BlockedTask};
27 use rustrt::thread::Thread;
31 use spsc = spsc_queue;
// Sentinel value stored in `cnt` once either end has hung up; int::MIN is far
// below any legitimate message count, so it cannot be confused with one.
33 static DISCONNECTED: int = int::MIN;
// Cap on `steals` so that neither `steals` nor `cnt` can overflow (see the
// long comment in try_recv). Two definitions appear here; presumably the
// small value is #[cfg(test)]-gated and the large one is the production
// value — TODO confirm against the full (un-elided) file.
35 static MAX_STEALS: int = 5;
37 static MAX_STEALS: int = 1 << 20;
// Shared internal state for one single-producer/single-consumer ("stream")
// channel. One Sender and one Receiver each hold a reference to this packet.
39 pub struct Packet<T> {
40 queue: spsc::Queue<Message<T>>, // internal queue for all message
42 cnt: atomics::AtomicInt, // How many items are on this channel
43 steals: int, // How many times has a port received without blocking?
44 to_wake: atomics::AtomicUint, // Task to wake up
46 port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed.
// A queued message carrying an upgrade request: the receiver should switch
// over to the new (shared) Receiver. The Data(T) variant is elided from
// this view.
52 Upgraded(Receiver<T>),
// Result of a send/upgrade attempt; variant bodies partially elided
// (UpSuccess / UpDisconnected / UpWoke(BlockedTask) are used below).
55 pub enum UpgradeResult {
// Outcome of start_selection: the task either went to sleep (SelSuccess,
// elided here), is handed back (SelCanceled), or must move to an upgraded
// port (SelUpgraded).
61 pub enum SelectionResult<T> {
63 SelCanceled(BlockedTask),
64 SelUpgraded(BlockedTask, Receiver<T>),
67 // Any message could contain an "upgrade request" to a new shared port, so the
68 // internal queue isn't a queue of T, but rather Message<T>
74 impl<T: Send> Packet<T> {
// Construct a fresh packet: empty queue, zero message count, no blocked
// task recorded, and the port still alive. (Some field initializers,
// e.g. `steals`, are elided from this view.)
75 pub fn new() -> Packet<T> {
// 128 is presumably the queue's internal node-cache bound — confirm
// against the spsc_queue module.
77 queue: spsc::Queue::new(128),
79 cnt: atomics::AtomicInt::new(0),
81 to_wake: atomics::AtomicUint::new(0),
83 port_dropped: atomics::AtomicBool::new(false),
// Send `t` down the channel. Returns Err(t), handing the data back to the
// caller, only if the receiving port is already known to be dropped;
// otherwise the send is considered complete.
88 pub fn send(&mut self, t: T) -> Result<(), T> {
89 // If the other port has deterministically gone away, then definitely
90 // must return the data back up the stack. Otherwise, the data is
91 // considered as being sent.
92 if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
94 match self.do_send(Data(t)) {
95 UpSuccess | UpDisconnected => {},
// A receiver was parked waiting for data: wake it back up.
96 UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
// Ask the receiving end to upgrade to the new shared channel `up`.
// Returns UpDisconnected immediately if the port is already gone.
100 pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
101 // If the port has gone away, then there's no need to proceed any
103 if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
105 self.do_send(GoUp(up))
// Common send path for both Data and GoUp messages. The message push onto
// `self.queue` happens on a line elided from this view, before the count
// is bumped below. The previous value of `cnt` then tells us what state
// the receiver was in: -1 = a receiver is parked and must be woken,
// DISCONNECTED = the port hung up, n >= 0 = nothing special.
108 fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
110 match self.cnt.fetch_add(1, atomics::SeqCst) {
111 // As described in the mod's doc comment, -1 == wakeup
112 -1 => UpWoke(self.take_to_wake()),
113 // As described before, SPSC queues must be >= -2
116 // Be sure to preserve the disconnected state, and the return value
117 // in this case is going to be whether our data was received or not.
118 // This manifests itself on whether we have an empty queue or not.
120 // Primarily, we are required to drain the queue here because the port
121 // will never remove this data. We can only have at most one item to
122 // drain (the port drains the rest).
124 self.cnt.store(DISCONNECTED, atomics::SeqCst);
125 let first = self.queue.pop();
126 let second = self.queue.pop();
127 assert!(second.is_none());
// NOTE(review): the match header (`match first {`) is elided here.
// The Some/None comments below look inverted relative to the variant
// names — presumably intentional upstream semantics for the upgrade
// protocol; confirm against the full file before changing.
130 Some(..) => UpSuccess, // we failed to send the data
131 None => UpDisconnected, // we successfully sent data
135 // Otherwise we just sent some data on a non-waiting queue, so just
136 // make sure the world is sane and carry on!
137 n => { assert!(n >= 0); UpSuccess }
141 // Consumes ownership of the 'to_wake' field.
142 fn take_to_wake(&mut self) -> BlockedTask {
143 let task = self.to_wake.load(atomics::SeqCst);
144 self.to_wake.store(0, atomics::SeqCst);
// `task` was produced by cast_to_uint in decrement(); presumably an
// elided `assert!(task != 0)` guards this cast — TODO confirm.
146 unsafe { BlockedTask::cast_from_uint(task) }
149 // Decrements the count on the channel for a sleeper, returning the sleeper
150 // back if it shouldn't sleep. Note that this is the location where we take
151 // steals into account.
152 fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
153 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
// Publish the parked task (as a nonzero uint) where a sender's
// take_to_wake() can find it.
154 let n = unsafe { task.cast_to_uint() };
155 self.to_wake.store(n, atomics::SeqCst);
157 let steals = self.steals;
// Fold our pending steals plus our one "sleeping receiver" token into
// `cnt` in a single atomic op; the previous value decides whether we
// may actually go to sleep.
160 match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
161 DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
162 // If we factor in our steals and notice that the channel has no
163 // data, we successfully sleep
166 if n - steals <= 0 { return Ok(()) }
// NOTE(review): the `n` above is presumably a shadowing binding from
// an elided match arm (`n => { ... }`), NOT the task uint from line
// 154 — confirm against the full file.
// Sleeping failed: un-publish the task and hand it back to the caller.
170 self.to_wake.store(0, atomics::SeqCst);
171 Err(unsafe { BlockedTask::cast_from_uint(n) })
// Blocking receive: first try a cheap non-blocking recv, and only if the
// channel is empty deschedule the current task via the blocking protocol,
// then retry once woken. (Several match arms are elided from this view.)
174 pub fn recv(&mut self) -> Result<T, Failure<T>> {
175 // Optimistic preflight check (scheduling is expensive).
176 match self.try_recv() {
// (elided arms: any non-Empty result is returned immediately)
181 // Welp, our channel has no data. Deschedule the current task and
182 // initiate the blocking protocol.
183 let task: Box<Task> = Local::take();
// The closure body is elided; presumably it hands `task` to
// decrement() so a sender can wake us — TODO confirm.
184 task.deschedule(1, |task| {
// Woken up (or sleep refused): retry the receive for the real answer.
188 match self.try_recv() {
189 // Messages which actually popped from the queue shouldn't count as
190 // a steal, so offset the decrement here (we already have our
191 // "steal" factored into the channel count above).
193 data @ Err(Upgraded(..)) => {
// Non-blocking receive. Pops from the queue if possible, tracking each
// successful pop as a "steal"; on an empty queue, distinguishes Empty from
// Disconnected via `cnt`. (Several match-arm headers are elided here.)
202 pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
203 match self.queue.pop() {
204 // If we stole some data, record to that effect (this will be
205 // factored into cnt later on).
207 // Note that we don't allow steals to grow without bound in order to
208 // prevent eventual overflow of either steals or cnt as an overflow
209 // would have catastrophic results. Sometimes, steals > cnt, but
210 // other times cnt > steals, so we don't know the relation between
211 // steals and cnt. This code path is executed only rarely, so we do
212 // a pretty slow operation, of swapping 0 into cnt, taking steals
213 // down as much as possible (without going negative), and then
214 // adding back in whatever we couldn't factor into steals.
216 if self.steals > MAX_STEALS {
217 match self.cnt.swap(0, atomics::SeqCst) {
// (elided arm header: DISCONNECTED — keep the sentinel pinned)
219 self.cnt.store(DISCONNECTED, atomics::SeqCst);
// (elided arm: n >= 0 — fold as much of n into steals as possible,
// then add the remainder back into cnt)
222 let m = cmp::min(n, self.steals);
227 assert!(self.steals >= 0);
// (elided: the popped message is matched; Data(t) returns Ok(t))
232 GoUp(up) => Err(Upgraded(up)),
// Queue was empty: decide between Empty and Disconnected.
237 match self.cnt.load(atomics::SeqCst) {
238 n if n != DISCONNECTED => Err(Empty),
240 // This is a little bit of a tricky case. We failed to pop
241 // data above, and then we have viewed that the channel is
242 // disconnected. In this window more data could have been
243 // sent on the channel. It doesn't really make sense to
244 // return that the channel is disconnected when there's
245 // actually data on it, so be extra sure there's no data by
246 // popping one more time.
248 // We can ignore steals because the other end is
249 // disconnected and we'll never need to really factor in our
252 match self.queue.pop() {
253 Some(Data(t)) => Ok(t),
254 Some(GoUp(up)) => Err(Upgraded(up)),
255 None => Err(Disconnected),
// Sender-side teardown: mark the channel disconnected and, if a receiver
// was parked (previous count of -1), wake it so it can observe the hangup.
// (A DISCONNECTED match arm is presumably elided from this view.)
263 pub fn drop_chan(&mut self) {
264 // Dropping a channel is pretty simple, we just flag it as disconnected
265 // and then wakeup a blocker if there is one.
266 match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
267 -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
269 n => { assert!(n >= 0); }
// Receiver-side teardown: gate all future sends via `port_dropped`, then
// atomically transition `cnt` to DISCONNECTED while draining every message
// still in flight. (The loop construct wrapping the compare_and_swap and
// the pop below is elided from this view.)
273 pub fn drop_port(&mut self) {
274 // Dropping a port seems like a fairly trivial thing. In theory all we
275 // need to do is flag that we're disconnected and then everything else
276 // can take over (we don't have anyone to wake up).
278 // The catch for Ports is that we want to drop the entire contents of
279 // the queue. There are multiple reasons for having this property, the
280 // largest of which is that if another chan is waiting in this channel
281 // (but not received yet), then waiting on that port will cause a
284 // So if we accept that we must now destroy the entire contents of the
285 // queue, this code may make a bit more sense. The tricky part is that
286 // we can't let any in-flight sends go un-dropped, we have to make sure
287 // *everything* is dropped and nothing new will come onto the channel.
289 // The first thing we do is set a flag saying that we're done for. All
290 // sends are gated on this flag, so we're immediately guaranteed that
291 // there are a bounded number of active sends that we'll have to deal
293 self.port_dropped.store(true, atomics::SeqCst);
295 // Now that we're guaranteed to deal with a bounded number of senders,
296 // we need to drain the queue. This draining process happens atomically
297 // with respect to the "count" of the channel. If the count is nonzero
298 // (with steals taken into account), then there must be data on the
299 // channel. In this case we drain everything and then try again. We will
300 // continue to fail while active senders send data while we're dropping
301 // data, but eventually we're guaranteed to break out of this loop
302 // (because there is a bounded number of senders).
303 let mut steals = self.steals;
// Only when `cnt` exactly equals our steal count (i.e. no undrained
// data) can we swing it to DISCONNECTED; otherwise drain and retry.
305 let cnt = self.cnt.compare_and_swap(
306 steals, DISCONNECTED, atomics::SeqCst);
307 cnt != DISCONNECTED && cnt != steals
310 match self.queue.pop() {
311 Some(..) => { steals += 1; }
317 // At this point in time, we have gated all future senders from sending,
318 // and we have flagged the channel as being disconnected. The senders
319 // still have some responsibility, however, because some sends may not
320 // complete until after we flag the disconnection. There are more
321 // details in the sending methods that see DISCONNECTED
324 ////////////////////////////////////////////////////////////////////////////
325 // select implementation
326 ////////////////////////////////////////////////////////////////////////////
328 // Tests to see whether this port can receive without blocking. If Ok is
329 // returned, then that's the answer. If Err is returned, then the returned
330 // port needs to be queried instead (an upgrade happened)
331 pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
332 // We peek at the queue to see if there's anything on it, and we use
333 // this return value to determine if we should pop from the queue and
334 // upgrade this channel immediately. If it looks like we've got an
335 // upgrade pending, then go through the whole recv rigamarole to update
336 // the internal state.
337 match self.queue.peek() {
// (elided arm: a peeked GoUp presumably triggers a recv() whose result
// is matched; the Err(Upgraded) case surfaces the new port below)
340 Err(Upgraded(port)) => Err(port),
344 Some(..) => Ok(true),
349 // increment the count on the channel (used for selection)
350 fn bump(&mut self, amt: int) -> int {
351 match self.cnt.fetch_add(amt, atomics::SeqCst) {
// (elided arm header: DISCONNECTED) — re-pin the sentinel so the add
// above cannot leak a bogus near-int::MIN count.
353 self.cnt.store(DISCONNECTED, atomics::SeqCst);
360 // Attempts to start selecting on this port. Like a oneshot, this can fail
361 // immediately because of an upgrade.
362 pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
363 match self.decrement(task) {
364 Ok(()) => SelSuccess,
// (elided arm header: Err(task) — sleeping was refused because data is
// pending; classify what kind of data it is)
366 let ret = match self.queue.peek() {
// Peeked an upgrade request: pop it and hand back the new port.
368 match self.queue.pop() {
369 Some(GoUp(port)) => SelUpgraded(task, port),
373 Some(..) => SelCanceled(task),
374 None => SelCanceled(task),
376 // Undo our decrement above, and we should be guaranteed that the
377 // previous value is positive because we're not going to sleep
378 let prev = self.bump(1);
379 assert!(prev == DISCONNECTED || prev >= 0);
385 // Removes a previous task from being blocked in this port
386 pub fn abort_selection(&mut self,
387 was_upgrade: bool) -> Result<bool, Receiver<T>> {
388 // If we're aborting selection after upgrading from a oneshot, then
389 // we're guarantee that no one is waiting. The only way that we could
390 // have seen the upgrade is if data was actually sent on the channel
391 // half again. For us, this means that there is guaranteed to be data on
392 // this channel. Furthermore, we're guaranteed that there was no
393 // start_selection previously, so there's no need to modify `self.cnt`
396 // Hence, because of these invariants, we immediately return `Ok(true)`.
397 // Note that the data may not actually be sent on the channel just yet.
398 // The other end could have flagged the upgrade but not sent data to
399 // this end. This is fine because we know it's a small bounded windows
400 // of time until the data is actually sent.
// (elided: the `if was_upgrade { ... }` early-return presumably wraps
// the two asserts below — TODO confirm against the full file)
402 assert_eq!(self.steals, 0);
403 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
407 // We want to make sure that the count on the channel goes non-negative,
408 // and in the stream case we can have at most one steal, so just assume
409 // that we had one steal.
// (elided: `let steals = 1;` or similar binding for `steals` below)
411 let prev = self.bump(steals + 1);
413 // If we were previously disconnected, then we know for sure that there
414 // is no task in to_wake, so just keep going
415 let has_data = if prev == DISCONNECTED {
416 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
417 true // there is data, that data is that we're disconnected
// (elided: else-branch header)
419 let cur = prev + steals + 1;
422 // If the previous count was negative, then we just made things go
423 // positive, hence we passed the -1 boundary and we're responsible
424 // for removing the to_wake() field and trashing it.
426 // If the previous count was positive then we're in a tougher
427 // situation. A possible race is that a sender just incremented
428 // through -1 (meaning it's going to try to wake a task up), but it
429 // hasn't yet read the to_wake. In order to prevent a future recv()
430 // from waking up too early (this sender picking up the plastered
431 // over to_wake), we spin loop here waiting for to_wake to be 0.
432 // Note that this entire select() implementation needs an overhaul,
433 // and this is *not* the worst part of it, so this is not done as a
434 // final solution but rather out of necessity for now to get
435 // something working.
437 self.take_to_wake().trash();
// Spin until the racing sender finishes reading (and zeroing) to_wake.
439 while self.to_wake.load(atomics::SeqCst) != 0 {
443 assert_eq!(self.steals, 0);
444 self.steals = steals;
446 // if we were previously positive, then there's surely data to
451 // Now that we've determined that this queue "has data", we peek at the
452 // queue to see if the data is an upgrade or not. If it's an upgrade,
453 // then we need to destroy this port and abort selection on the
456 match self.queue.peek() {
458 match self.queue.pop() {
459 Some(GoUp(port)) => Err(port),
// Destructor sanity checks (the `fn drop` header itself is elided from
// this view): by the time the packet is destroyed, both halves must have
// hung up and no task may still be parked in `to_wake`.
472 impl<T: Send> Drop for Packet<T> {
474 // Note that this load is not only an assert for correctness about
475 // disconnection, but also a proper fence before the read of
476 // `to_wake`, so this assert cannot be removed without also removing
477 // the `to_wake` assert.
478 assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
479 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);