]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/shared.rs
6396edbdbd148eff3912c404475105662d650754
[rust.git] / src / libstd / comm / shared.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Shared channels
12 ///
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
16 ///
17 /// High level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementation of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
20
21 pub use self::Failure::*;
22
23 use core::prelude::*;
24
25 use alloc::boxed::Box;
26 use core::cmp;
27 use core::int;
28 use rustrt::local::Local;
29 use rustrt::mutex::NativeMutex;
30 use rustrt::task::{Task, BlockedTask};
31 use rustrt::thread::Thread;
32
33 use sync::atomic;
34 use sync::mpsc_queue as mpsc;
35
36 const DISCONNECTED: int = int::MIN;
37 const FUDGE: int = 1024;
38 #[cfg(test)]
39 const MAX_STEALS: int = 5;
40 #[cfg(not(test))]
41 const MAX_STEALS: int = 1 << 20;
42
43 pub struct Packet<T> {
44     queue: mpsc::Queue<T>,
45     cnt: atomic::AtomicInt, // How many items are on this channel
46     steals: int, // How many times has a port received without blocking?
47     to_wake: atomic::AtomicUint, // Task to wake up
48
49     // The number of channels which are currently using this packet.
50     channels: atomic::AtomicInt,
51
52     // See the discussion in Port::drop and the channel send methods for what
53     // these are used for
54     port_dropped: atomic::AtomicBool,
55     sender_drain: atomic::AtomicInt,
56
57     // this lock protects various portions of this implementation during
58     // select()
59     select_lock: NativeMutex,
60 }
61
62 pub enum Failure {
63     Empty,
64     Disconnected,
65 }
66
67 impl<T: Send> Packet<T> {
68     // Creation of a packet *must* be followed by a call to postinit_lock
69     // and later by inherit_blocker
70     pub fn new() -> Packet<T> {
71         let p = Packet {
72             queue: mpsc::Queue::new(),
73             cnt: atomic::AtomicInt::new(0),
74             steals: 0,
75             to_wake: atomic::AtomicUint::new(0),
76             channels: atomic::AtomicInt::new(2),
77             port_dropped: atomic::AtomicBool::new(false),
78             sender_drain: atomic::AtomicInt::new(0),
79             select_lock: unsafe { NativeMutex::new() },
80         };
81         return p;
82     }
83
84     // This function should be used after newly created Packet
85     // was wrapped with an Arc
86     // In other case mutex data will be duplicated while cloning
87     // and that could cause problems on platforms where it is
88     // represented by opaque data structure
89     pub fn postinit_lock(&mut self) {
90         unsafe { self.select_lock.lock_noguard() }
91     }
92
93     // This function is used at the creation of a shared packet to inherit a
94     // previously blocked task. This is done to prevent spurious wakeups of
95     // tasks in select().
96     //
97     // This can only be called at channel-creation time
98     pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
99         match task {
100             Some(task) => {
101                 assert_eq!(self.cnt.load(atomic::SeqCst), 0);
102                 assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
103                 self.to_wake.store(unsafe { task.cast_to_uint() },
104                                    atomic::SeqCst);
105                 self.cnt.store(-1, atomic::SeqCst);
106
107                 // This store is a little sketchy. What's happening here is
108                 // that we're transferring a blocker from a oneshot or stream
109                 // channel to this shared channel. In doing so, we never
110                 // spuriously wake them up and rather only wake them up at the
111                 // appropriate time. This implementation of shared channels
112                 // assumes that any blocking recv() will undo the increment of
113                 // steals performed in try_recv() once the recv is complete.
114                 // This thread that we're inheriting, however, is not in the
115                 // middle of recv. Hence, the first time we wake them up,
116                 // they're going to wake up from their old port, move on to the
117                 // upgraded port, and then call the block recv() function.
118                 //
119                 // When calling this function, they'll find there's data
120                 // immediately available, counting it as a steal. This in fact
121                 // wasn't a steal because we appropriately blocked them waiting
122                 // for data.
123                 //
124                 // To offset this bad increment, we initially set the steal
125                 // count to -1. You'll find some special code in
126                 // abort_selection() as well to ensure that this -1 steal count
127                 // doesn't escape too far.
128                 self.steals = -1;
129             }
130             None => {}
131         }
132
133         // When the shared packet is constructed, we grabbed this lock. The
134         // purpose of this lock is to ensure that abort_selection() doesn't
135         // interfere with this method. After we unlock this lock, we're
136         // signifying that we're done modifying self.cnt and self.to_wake and
137         // the port is ready for the world to continue using it.
138         unsafe { self.select_lock.unlock_noguard() }
139     }
140
141     pub fn send(&mut self, t: T) -> Result<(), T> {
142         // See Port::drop for what's going on
143         if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
144
145         // Note that the multiple sender case is a little trickier
146         // semantically than the single sender case. The logic for
147         // incrementing is "add and if disconnected store disconnected".
148         // This could end up leading some senders to believe that there
149         // wasn't a disconnect if in fact there was a disconnect. This means
150         // that while one thread is attempting to re-store the disconnected
151         // states, other threads could walk through merrily incrementing
152         // this very-negative disconnected count. To prevent senders from
153         // spuriously attempting to send when the channels is actually
154         // disconnected, the count has a ranged check here.
155         //
156         // This is also done for another reason. Remember that the return
157         // value of this function is:
158         //
159         //  `true` == the data *may* be received, this essentially has no
160         //            meaning
161         //  `false` == the data will *never* be received, this has a lot of
162         //             meaning
163         //
164         // In the SPSC case, we have a check of 'queue.is_empty()' to see
165         // whether the data was actually received, but this same condition
166         // means nothing in a multi-producer context. As a result, this
167         // preflight check serves as the definitive "this will never be
168         // received". Once we get beyond this check, we have permanently
169         // entered the realm of "this may be received"
170         if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
171             return Err(t)
172         }
173
174         self.queue.push(t);
175         match self.cnt.fetch_add(1, atomic::SeqCst) {
176             -1 => {
177                 self.take_to_wake().wake().map(|t| t.reawaken());
178             }
179
180             // In this case, we have possibly failed to send our data, and
181             // we need to consider re-popping the data in order to fully
182             // destroy it. We must arbitrate among the multiple senders,
183             // however, because the queues that we're using are
184             // single-consumer queues. In order to do this, all exiting
185             // pushers will use an atomic count in order to count those
186             // flowing through. Pushers who see 0 are required to drain as
187             // much as possible, and then can only exit when they are the
188             // only pusher (otherwise they must try again).
189             n if n < DISCONNECTED + FUDGE => {
190                 // see the comment in 'try' for a shared channel for why this
191                 // window of "not disconnected" is ok.
192                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
193
194                 if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
195                     loop {
196                         // drain the queue, for info on the thread yield see the
197                         // discussion in try_recv
198                         loop {
199                             match self.queue.pop() {
200                                 mpsc::Data(..) => {}
201                                 mpsc::Empty => break,
202                                 mpsc::Inconsistent => Thread::yield_now(),
203                             }
204                         }
205                         // maybe we're done, if we're not the last ones
206                         // here, then we need to go try again.
207                         if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
208                             break
209                         }
210                     }
211
212                     // At this point, there may still be data on the queue,
213                     // but only if the count hasn't been incremented and
214                     // some other sender hasn't finished pushing data just
215                     // yet. That sender in question will drain its own data.
216                 }
217             }
218
219             // Can't make any assumptions about this case like in the SPSC case.
220             _ => {}
221         }
222
223         Ok(())
224     }
225
226     pub fn recv(&mut self) -> Result<T, Failure> {
227         // This code is essentially the exact same as that found in the stream
228         // case (see stream.rs)
229         match self.try_recv() {
230             Err(Empty) => {}
231             data => return data,
232         }
233
234         let task: Box<Task> = Local::take();
235         task.deschedule(1, |task| {
236             self.decrement(task)
237         });
238
239         match self.try_recv() {
240             data @ Ok(..) => { self.steals -= 1; data }
241             data => data,
242         }
243     }
244
245     // Essentially the exact same thing as the stream decrement function.
246     fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
247         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
248         let n = unsafe { task.cast_to_uint() };
249         self.to_wake.store(n, atomic::SeqCst);
250
251         let steals = self.steals;
252         self.steals = 0;
253
254         match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
255             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
256             // If we factor in our steals and notice that the channel has no
257             // data, we successfully sleep
258             n => {
259                 assert!(n >= 0);
260                 if n - steals <= 0 { return Ok(()) }
261             }
262         }
263
264         self.to_wake.store(0, atomic::SeqCst);
265         Err(unsafe { BlockedTask::cast_from_uint(n) })
266     }
267
268     pub fn try_recv(&mut self) -> Result<T, Failure> {
269         let ret = match self.queue.pop() {
270             mpsc::Data(t) => Some(t),
271             mpsc::Empty => None,
272
273             // This is a bit of an interesting case. The channel is
274             // reported as having data available, but our pop() has
275             // failed due to the queue being in an inconsistent state.
276             // This means that there is some pusher somewhere which has
277             // yet to complete, but we are guaranteed that a pop will
278             // eventually succeed. In this case, we spin in a yield loop
279             // because the remote sender should finish their enqueue
280             // operation "very quickly".
281             //
282             // Avoiding this yield loop would require a different queue
283             // abstraction which provides the guarantee that after M
284             // pushes have succeeded, at least M pops will succeed. The
285             // current queues guarantee that if there are N active
286             // pushes, you can pop N times once all N have finished.
287             mpsc::Inconsistent => {
288                 let data;
289                 loop {
290                     Thread::yield_now();
291                     match self.queue.pop() {
292                         mpsc::Data(t) => { data = t; break }
293                         mpsc::Empty => panic!("inconsistent => empty"),
294                         mpsc::Inconsistent => {}
295                     }
296                 }
297                 Some(data)
298             }
299         };
300         match ret {
301             // See the discussion in the stream implementation for why we
302             // might decrement steals.
303             Some(data) => {
304                 if self.steals > MAX_STEALS {
305                     match self.cnt.swap(0, atomic::SeqCst) {
306                         DISCONNECTED => {
307                             self.cnt.store(DISCONNECTED, atomic::SeqCst);
308                         }
309                         n => {
310                             let m = cmp::min(n, self.steals);
311                             self.steals -= m;
312                             self.bump(n - m);
313                         }
314                     }
315                     assert!(self.steals >= 0);
316                 }
317                 self.steals += 1;
318                 Ok(data)
319             }
320
321             // See the discussion in the stream implementation for why we try
322             // again.
323             None => {
324                 match self.cnt.load(atomic::SeqCst) {
325                     n if n != DISCONNECTED => Err(Empty),
326                     _ => {
327                         match self.queue.pop() {
328                             mpsc::Data(t) => Ok(t),
329                             mpsc::Empty => Err(Disconnected),
330                             // with no senders, an inconsistency is impossible.
331                             mpsc::Inconsistent => unreachable!(),
332                         }
333                     }
334                 }
335             }
336         }
337     }
338
339     // Prepares this shared packet for a channel clone, essentially just bumping
340     // a refcount.
341     pub fn clone_chan(&mut self) {
342         self.channels.fetch_add(1, atomic::SeqCst);
343     }
344
345     // Decrement the reference count on a channel. This is called whenever a
346     // Chan is dropped and may end up waking up a receiver. It's the receiver's
347     // responsibility on the other end to figure out that we've disconnected.
348     pub fn drop_chan(&mut self) {
349         match self.channels.fetch_sub(1, atomic::SeqCst) {
350             1 => {}
351             n if n > 1 => return,
352             n => panic!("bad number of channels left {}", n),
353         }
354
355         match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
356             -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
357             DISCONNECTED => {}
358             n => { assert!(n >= 0); }
359         }
360     }
361
362     // See the long discussion inside of stream.rs for why the queue is drained,
363     // and why it is done in this fashion.
364     pub fn drop_port(&mut self) {
365         self.port_dropped.store(true, atomic::SeqCst);
366         let mut steals = self.steals;
367         while {
368             let cnt = self.cnt.compare_and_swap(
369                             steals, DISCONNECTED, atomic::SeqCst);
370             cnt != DISCONNECTED && cnt != steals
371         } {
372             // See the discussion in 'try_recv' for why we yield
373             // control of this thread.
374             loop {
375                 match self.queue.pop() {
376                     mpsc::Data(..) => { steals += 1; }
377                     mpsc::Empty | mpsc::Inconsistent => break,
378                 }
379             }
380         }
381     }
382
383     // Consumes ownership of the 'to_wake' field.
384     fn take_to_wake(&mut self) -> BlockedTask {
385         let task = self.to_wake.load(atomic::SeqCst);
386         self.to_wake.store(0, atomic::SeqCst);
387         assert!(task != 0);
388         unsafe { BlockedTask::cast_from_uint(task) }
389     }
390
391     ////////////////////////////////////////////////////////////////////////////
392     // select implementation
393     ////////////////////////////////////////////////////////////////////////////
394
395     // Helper function for select, tests whether this port can receive without
396     // blocking (obviously not an atomic decision).
397     //
398     // This is different than the stream version because there's no need to peek
399     // at the queue, we can just look at the local count.
400     pub fn can_recv(&mut self) -> bool {
401         let cnt = self.cnt.load(atomic::SeqCst);
402         cnt == DISCONNECTED || cnt - self.steals > 0
403     }
404
405     // increment the count on the channel (used for selection)
406     fn bump(&mut self, amt: int) -> int {
407         match self.cnt.fetch_add(amt, atomic::SeqCst) {
408             DISCONNECTED => {
409                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
410                 DISCONNECTED
411             }
412             n => n
413         }
414     }
415
416     // Inserts the blocked task for selection on this port, returning it back if
417     // the port already has data on it.
418     //
419     // The code here is the same as in stream.rs, except that it doesn't need to
420     // peek at the channel to see if an upgrade is pending.
421     pub fn start_selection(&mut self,
422                            task: BlockedTask) -> Result<(), BlockedTask> {
423         match self.decrement(task) {
424             Ok(()) => Ok(()),
425             Err(task) => {
426                 let prev = self.bump(1);
427                 assert!(prev == DISCONNECTED || prev >= 0);
428                 return Err(task);
429             }
430         }
431     }
432
433     // Cancels a previous task waiting on this port, returning whether there's
434     // data on the port.
435     //
436     // This is similar to the stream implementation (hence fewer comments), but
437     // uses a different value for the "steals" variable.
438     pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
439         // Before we do anything else, we bounce on this lock. The reason for
440         // doing this is to ensure that any upgrade-in-progress is gone and
441         // done with. Without this bounce, we can race with inherit_blocker
442         // about looking at and dealing with to_wake. Once we have acquired the
443         // lock, we are guaranteed that inherit_blocker is done.
444         unsafe {
445             let _guard = self.select_lock.lock();
446         }
447
448         // Like the stream implementation, we want to make sure that the count
449         // on the channel goes non-negative. We don't know how negative the
450         // stream currently is, so instead of using a steal value of 1, we load
451         // the channel count and figure out what we should do to make it
452         // positive.
453         let steals = {
454             let cnt = self.cnt.load(atomic::SeqCst);
455             if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
456         };
457         let prev = self.bump(steals + 1);
458
459         if prev == DISCONNECTED {
460             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
461             true
462         } else {
463             let cur = prev + steals + 1;
464             assert!(cur >= 0);
465             if prev < 0 {
466                 self.take_to_wake().trash();
467             } else {
468                 while self.to_wake.load(atomic::SeqCst) != 0 {
469                     Thread::yield_now();
470                 }
471             }
472             // if the number of steals is -1, it was the pre-emptive -1 steal
473             // count from when we inherited a blocker. This is fine because
474             // we're just going to overwrite it with a real value.
475             assert!(self.steals == 0 || self.steals == -1);
476             self.steals = steals;
477             prev >= 0
478         }
479     }
480 }
481
482 #[unsafe_destructor]
483 impl<T: Send> Drop for Packet<T> {
484     fn drop(&mut self) {
485         // Note that this load is not only an assert for correctness about
486         // disconnection, but also a proper fence before the read of
487         // `to_wake`, so this assert cannot be removed with also removing
488         // the `to_wake` assert.
489         assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
490         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
491         assert_eq!(self.channels.load(atomic::SeqCst), 0);
492     }
493 }