]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/shared.rs
Fall out of the std::sync rewrite
[rust.git] / src / libstd / comm / shared.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Shared channels
12 ///
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
16 ///
17 /// High level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementation of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
20
21 pub use self::Failure::*;
22
23 use core::prelude::*;
24
25 use alloc::boxed::Box;
26 use core::cmp;
27 use core::int;
28 use rustrt::local::Local;
29 use rustrt::task::{Task, BlockedTask};
30 use rustrt::thread::Thread;
31
32 use sync::{atomic, Mutex, MutexGuard};
33 use comm::mpsc_queue as mpsc;
34
35 const DISCONNECTED: int = int::MIN;
36 const FUDGE: int = 1024;
37 #[cfg(test)]
38 const MAX_STEALS: int = 5;
39 #[cfg(not(test))]
40 const MAX_STEALS: int = 1 << 20;
41
42 pub struct Packet<T> {
43     queue: mpsc::Queue<T>,
44     cnt: atomic::AtomicInt, // How many items are on this channel
45     steals: int, // How many times has a port received without blocking?
46     to_wake: atomic::AtomicUint, // Task to wake up
47
48     // The number of channels which are currently using this packet.
49     channels: atomic::AtomicInt,
50
51     // See the discussion in Port::drop and the channel send methods for what
52     // these are used for
53     port_dropped: atomic::AtomicBool,
54     sender_drain: atomic::AtomicInt,
55
56     // this lock protects various portions of this implementation during
57     // select()
58     select_lock: Mutex<()>,
59 }
60
61 pub enum Failure {
62     Empty,
63     Disconnected,
64 }
65
66 impl<T: Send> Packet<T> {
67     // Creation of a packet *must* be followed by a call to postinit_lock
68     // and later by inherit_blocker
69     pub fn new() -> Packet<T> {
70         let p = Packet {
71             queue: mpsc::Queue::new(),
72             cnt: atomic::AtomicInt::new(0),
73             steals: 0,
74             to_wake: atomic::AtomicUint::new(0),
75             channels: atomic::AtomicInt::new(2),
76             port_dropped: atomic::AtomicBool::new(false),
77             sender_drain: atomic::AtomicInt::new(0),
78             select_lock: Mutex::new(()),
79         };
80         return p;
81     }
82
83     // This function should be used after newly created Packet
84     // was wrapped with an Arc
85     // In other case mutex data will be duplicated while cloning
86     // and that could cause problems on platforms where it is
87     // represented by opaque data structure
88     pub fn postinit_lock(&self) -> MutexGuard<()> {
89         self.select_lock.lock()
90     }
91
92     // This function is used at the creation of a shared packet to inherit a
93     // previously blocked task. This is done to prevent spurious wakeups of
94     // tasks in select().
95     //
96     // This can only be called at channel-creation time
97     pub fn inherit_blocker(&mut self,
98                            task: Option<BlockedTask>,
99                            guard: MutexGuard<()>) {
100         match task {
101             Some(task) => {
102                 assert_eq!(self.cnt.load(atomic::SeqCst), 0);
103                 assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
104                 self.to_wake.store(unsafe { task.cast_to_uint() },
105                                    atomic::SeqCst);
106                 self.cnt.store(-1, atomic::SeqCst);
107
108                 // This store is a little sketchy. What's happening here is
109                 // that we're transferring a blocker from a oneshot or stream
110                 // channel to this shared channel. In doing so, we never
111                 // spuriously wake them up and rather only wake them up at the
112                 // appropriate time. This implementation of shared channels
113                 // assumes that any blocking recv() will undo the increment of
114                 // steals performed in try_recv() once the recv is complete.
115                 // This thread that we're inheriting, however, is not in the
116                 // middle of recv. Hence, the first time we wake them up,
117                 // they're going to wake up from their old port, move on to the
118                 // upgraded port, and then call the block recv() function.
119                 //
120                 // When calling this function, they'll find there's data
121                 // immediately available, counting it as a steal. This in fact
122                 // wasn't a steal because we appropriately blocked them waiting
123                 // for data.
124                 //
125                 // To offset this bad increment, we initially set the steal
126                 // count to -1. You'll find some special code in
127                 // abort_selection() as well to ensure that this -1 steal count
128                 // doesn't escape too far.
129                 self.steals = -1;
130             }
131             None => {}
132         }
133
134         // When the shared packet is constructed, we grabbed this lock. The
135         // purpose of this lock is to ensure that abort_selection() doesn't
136         // interfere with this method. After we unlock this lock, we're
137         // signifying that we're done modifying self.cnt and self.to_wake and
138         // the port is ready for the world to continue using it.
139         drop(guard);
140     }
141
142     pub fn send(&mut self, t: T) -> Result<(), T> {
143         // See Port::drop for what's going on
144         if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
145
146         // Note that the multiple sender case is a little trickier
147         // semantically than the single sender case. The logic for
148         // incrementing is "add and if disconnected store disconnected".
149         // This could end up leading some senders to believe that there
150         // wasn't a disconnect if in fact there was a disconnect. This means
151         // that while one thread is attempting to re-store the disconnected
152         // states, other threads could walk through merrily incrementing
153         // this very-negative disconnected count. To prevent senders from
154         // spuriously attempting to send when the channels is actually
155         // disconnected, the count has a ranged check here.
156         //
157         // This is also done for another reason. Remember that the return
158         // value of this function is:
159         //
160         //  `true` == the data *may* be received, this essentially has no
161         //            meaning
162         //  `false` == the data will *never* be received, this has a lot of
163         //             meaning
164         //
165         // In the SPSC case, we have a check of 'queue.is_empty()' to see
166         // whether the data was actually received, but this same condition
167         // means nothing in a multi-producer context. As a result, this
168         // preflight check serves as the definitive "this will never be
169         // received". Once we get beyond this check, we have permanently
170         // entered the realm of "this may be received"
171         if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
172             return Err(t)
173         }
174
175         self.queue.push(t);
176         match self.cnt.fetch_add(1, atomic::SeqCst) {
177             -1 => {
178                 self.take_to_wake().wake().map(|t| t.reawaken());
179             }
180
181             // In this case, we have possibly failed to send our data, and
182             // we need to consider re-popping the data in order to fully
183             // destroy it. We must arbitrate among the multiple senders,
184             // however, because the queues that we're using are
185             // single-consumer queues. In order to do this, all exiting
186             // pushers will use an atomic count in order to count those
187             // flowing through. Pushers who see 0 are required to drain as
188             // much as possible, and then can only exit when they are the
189             // only pusher (otherwise they must try again).
190             n if n < DISCONNECTED + FUDGE => {
191                 // see the comment in 'try' for a shared channel for why this
192                 // window of "not disconnected" is ok.
193                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
194
195                 if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
196                     loop {
197                         // drain the queue, for info on the thread yield see the
198                         // discussion in try_recv
199                         loop {
200                             match self.queue.pop() {
201                                 mpsc::Data(..) => {}
202                                 mpsc::Empty => break,
203                                 mpsc::Inconsistent => Thread::yield_now(),
204                             }
205                         }
206                         // maybe we're done, if we're not the last ones
207                         // here, then we need to go try again.
208                         if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
209                             break
210                         }
211                     }
212
213                     // At this point, there may still be data on the queue,
214                     // but only if the count hasn't been incremented and
215                     // some other sender hasn't finished pushing data just
216                     // yet. That sender in question will drain its own data.
217                 }
218             }
219
220             // Can't make any assumptions about this case like in the SPSC case.
221             _ => {}
222         }
223
224         Ok(())
225     }
226
227     pub fn recv(&mut self) -> Result<T, Failure> {
228         // This code is essentially the exact same as that found in the stream
229         // case (see stream.rs)
230         match self.try_recv() {
231             Err(Empty) => {}
232             data => return data,
233         }
234
235         let task: Box<Task> = Local::take();
236         task.deschedule(1, |task| {
237             self.decrement(task)
238         });
239
240         match self.try_recv() {
241             data @ Ok(..) => { self.steals -= 1; data }
242             data => data,
243         }
244     }
245
246     // Essentially the exact same thing as the stream decrement function.
247     fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
248         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
249         let n = unsafe { task.cast_to_uint() };
250         self.to_wake.store(n, atomic::SeqCst);
251
252         let steals = self.steals;
253         self.steals = 0;
254
255         match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
256             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
257             // If we factor in our steals and notice that the channel has no
258             // data, we successfully sleep
259             n => {
260                 assert!(n >= 0);
261                 if n - steals <= 0 { return Ok(()) }
262             }
263         }
264
265         self.to_wake.store(0, atomic::SeqCst);
266         Err(unsafe { BlockedTask::cast_from_uint(n) })
267     }
268
269     pub fn try_recv(&mut self) -> Result<T, Failure> {
270         let ret = match self.queue.pop() {
271             mpsc::Data(t) => Some(t),
272             mpsc::Empty => None,
273
274             // This is a bit of an interesting case. The channel is
275             // reported as having data available, but our pop() has
276             // failed due to the queue being in an inconsistent state.
277             // This means that there is some pusher somewhere which has
278             // yet to complete, but we are guaranteed that a pop will
279             // eventually succeed. In this case, we spin in a yield loop
280             // because the remote sender should finish their enqueue
281             // operation "very quickly".
282             //
283             // Avoiding this yield loop would require a different queue
284             // abstraction which provides the guarantee that after M
285             // pushes have succeeded, at least M pops will succeed. The
286             // current queues guarantee that if there are N active
287             // pushes, you can pop N times once all N have finished.
288             mpsc::Inconsistent => {
289                 let data;
290                 loop {
291                     Thread::yield_now();
292                     match self.queue.pop() {
293                         mpsc::Data(t) => { data = t; break }
294                         mpsc::Empty => panic!("inconsistent => empty"),
295                         mpsc::Inconsistent => {}
296                     }
297                 }
298                 Some(data)
299             }
300         };
301         match ret {
302             // See the discussion in the stream implementation for why we
303             // might decrement steals.
304             Some(data) => {
305                 if self.steals > MAX_STEALS {
306                     match self.cnt.swap(0, atomic::SeqCst) {
307                         DISCONNECTED => {
308                             self.cnt.store(DISCONNECTED, atomic::SeqCst);
309                         }
310                         n => {
311                             let m = cmp::min(n, self.steals);
312                             self.steals -= m;
313                             self.bump(n - m);
314                         }
315                     }
316                     assert!(self.steals >= 0);
317                 }
318                 self.steals += 1;
319                 Ok(data)
320             }
321
322             // See the discussion in the stream implementation for why we try
323             // again.
324             None => {
325                 match self.cnt.load(atomic::SeqCst) {
326                     n if n != DISCONNECTED => Err(Empty),
327                     _ => {
328                         match self.queue.pop() {
329                             mpsc::Data(t) => Ok(t),
330                             mpsc::Empty => Err(Disconnected),
331                             // with no senders, an inconsistency is impossible.
332                             mpsc::Inconsistent => unreachable!(),
333                         }
334                     }
335                 }
336             }
337         }
338     }
339
340     // Prepares this shared packet for a channel clone, essentially just bumping
341     // a refcount.
342     pub fn clone_chan(&mut self) {
343         self.channels.fetch_add(1, atomic::SeqCst);
344     }
345
346     // Decrement the reference count on a channel. This is called whenever a
347     // Chan is dropped and may end up waking up a receiver. It's the receiver's
348     // responsibility on the other end to figure out that we've disconnected.
349     pub fn drop_chan(&mut self) {
350         match self.channels.fetch_sub(1, atomic::SeqCst) {
351             1 => {}
352             n if n > 1 => return,
353             n => panic!("bad number of channels left {}", n),
354         }
355
356         match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
357             -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
358             DISCONNECTED => {}
359             n => { assert!(n >= 0); }
360         }
361     }
362
363     // See the long discussion inside of stream.rs for why the queue is drained,
364     // and why it is done in this fashion.
365     pub fn drop_port(&mut self) {
366         self.port_dropped.store(true, atomic::SeqCst);
367         let mut steals = self.steals;
368         while {
369             let cnt = self.cnt.compare_and_swap(
370                             steals, DISCONNECTED, atomic::SeqCst);
371             cnt != DISCONNECTED && cnt != steals
372         } {
373             // See the discussion in 'try_recv' for why we yield
374             // control of this thread.
375             loop {
376                 match self.queue.pop() {
377                     mpsc::Data(..) => { steals += 1; }
378                     mpsc::Empty | mpsc::Inconsistent => break,
379                 }
380             }
381         }
382     }
383
384     // Consumes ownership of the 'to_wake' field.
385     fn take_to_wake(&mut self) -> BlockedTask {
386         let task = self.to_wake.load(atomic::SeqCst);
387         self.to_wake.store(0, atomic::SeqCst);
388         assert!(task != 0);
389         unsafe { BlockedTask::cast_from_uint(task) }
390     }
391
392     ////////////////////////////////////////////////////////////////////////////
393     // select implementation
394     ////////////////////////////////////////////////////////////////////////////
395
396     // Helper function for select, tests whether this port can receive without
397     // blocking (obviously not an atomic decision).
398     //
399     // This is different than the stream version because there's no need to peek
400     // at the queue, we can just look at the local count.
401     pub fn can_recv(&mut self) -> bool {
402         let cnt = self.cnt.load(atomic::SeqCst);
403         cnt == DISCONNECTED || cnt - self.steals > 0
404     }
405
406     // increment the count on the channel (used for selection)
407     fn bump(&mut self, amt: int) -> int {
408         match self.cnt.fetch_add(amt, atomic::SeqCst) {
409             DISCONNECTED => {
410                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
411                 DISCONNECTED
412             }
413             n => n
414         }
415     }
416
417     // Inserts the blocked task for selection on this port, returning it back if
418     // the port already has data on it.
419     //
420     // The code here is the same as in stream.rs, except that it doesn't need to
421     // peek at the channel to see if an upgrade is pending.
422     pub fn start_selection(&mut self,
423                            task: BlockedTask) -> Result<(), BlockedTask> {
424         match self.decrement(task) {
425             Ok(()) => Ok(()),
426             Err(task) => {
427                 let prev = self.bump(1);
428                 assert!(prev == DISCONNECTED || prev >= 0);
429                 return Err(task);
430             }
431         }
432     }
433
434     // Cancels a previous task waiting on this port, returning whether there's
435     // data on the port.
436     //
437     // This is similar to the stream implementation (hence fewer comments), but
438     // uses a different value for the "steals" variable.
439     pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
440         // Before we do anything else, we bounce on this lock. The reason for
441         // doing this is to ensure that any upgrade-in-progress is gone and
442         // done with. Without this bounce, we can race with inherit_blocker
443         // about looking at and dealing with to_wake. Once we have acquired the
444         // lock, we are guaranteed that inherit_blocker is done.
445         {
446             let _guard = self.select_lock.lock();
447         }
448
449         // Like the stream implementation, we want to make sure that the count
450         // on the channel goes non-negative. We don't know how negative the
451         // stream currently is, so instead of using a steal value of 1, we load
452         // the channel count and figure out what we should do to make it
453         // positive.
454         let steals = {
455             let cnt = self.cnt.load(atomic::SeqCst);
456             if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
457         };
458         let prev = self.bump(steals + 1);
459
460         if prev == DISCONNECTED {
461             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
462             true
463         } else {
464             let cur = prev + steals + 1;
465             assert!(cur >= 0);
466             if prev < 0 {
467                 self.take_to_wake().trash();
468             } else {
469                 while self.to_wake.load(atomic::SeqCst) != 0 {
470                     Thread::yield_now();
471                 }
472             }
473             // if the number of steals is -1, it was the pre-emptive -1 steal
474             // count from when we inherited a blocker. This is fine because
475             // we're just going to overwrite it with a real value.
476             assert!(self.steals == 0 || self.steals == -1);
477             self.steals = steals;
478             prev >= 0
479         }
480     }
481 }
482
483 #[unsafe_destructor]
484 impl<T: Send> Drop for Packet<T> {
485     fn drop(&mut self) {
486         // Note that this load is not only an assert for correctness about
487         // disconnection, but also a proper fence before the read of
488         // `to_wake`, so this assert cannot be removed with also removing
489         // the `to_wake` assert.
490         assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
491         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
492         assert_eq!(self.channels.load(atomic::SeqCst), 0);
493     }
494 }