]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/shared.rs
Stabilization for `owned` (now `boxed`) and `cell`
[rust.git] / src / libsync / comm / shared.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Shared channels
12 ///
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
16 ///
17 /// High level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementation of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
20
21 use core::prelude::*;
22
23 use alloc::boxed::Box;
24 use core::cmp;
25 use core::int;
26 use rustrt::local::Local;
27 use rustrt::mutex::NativeMutex;
28 use rustrt::task::{Task, BlockedTask};
29 use rustrt::thread::Thread;
30
31 use atomics;
32 use mpsc = mpsc_queue;
33
34 static DISCONNECTED: int = int::MIN;
35 static FUDGE: int = 1024;
36 #[cfg(test)]
37 static MAX_STEALS: int = 5;
38 #[cfg(not(test))]
39 static MAX_STEALS: int = 1 << 20;
40
41 pub struct Packet<T> {
42     queue: mpsc::Queue<T>,
43     cnt: atomics::AtomicInt, // How many items are on this channel
44     steals: int, // How many times has a port received without blocking?
45     to_wake: atomics::AtomicUint, // Task to wake up
46
47     // The number of channels which are currently using this packet.
48     channels: atomics::AtomicInt,
49
50     // See the discussion in Port::drop and the channel send methods for what
51     // these are used for
52     port_dropped: atomics::AtomicBool,
53     sender_drain: atomics::AtomicInt,
54
55     // this lock protects various portions of this implementation during
56     // select()
57     select_lock: NativeMutex,
58 }
59
60 pub enum Failure {
61     Empty,
62     Disconnected,
63 }
64
65 impl<T: Send> Packet<T> {
66     // Creation of a packet *must* be followed by a call to postinit_lock
67     // and later by inherit_blocker
68     pub fn new() -> Packet<T> {
69         let p = Packet {
70             queue: mpsc::Queue::new(),
71             cnt: atomics::AtomicInt::new(0),
72             steals: 0,
73             to_wake: atomics::AtomicUint::new(0),
74             channels: atomics::AtomicInt::new(2),
75             port_dropped: atomics::AtomicBool::new(false),
76             sender_drain: atomics::AtomicInt::new(0),
77             select_lock: unsafe { NativeMutex::new() },
78         };
79         return p;
80     }
81
82     // This function should be used after newly created Packet
83     // was wrapped with an Arc
84     // In other case mutex data will be duplicated while cloning
85     // and that could cause problems on platforms where it is
86     // represented by opaque data structure
87     pub fn postinit_lock(&mut self) {
88         unsafe { self.select_lock.lock_noguard() }
89     }
90
91     // This function is used at the creation of a shared packet to inherit a
92     // previously blocked task. This is done to prevent spurious wakeups of
93     // tasks in select().
94     //
95     // This can only be called at channel-creation time
96     pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
97         match task {
98             Some(task) => {
99                 assert_eq!(self.cnt.load(atomics::SeqCst), 0);
100                 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
101                 self.to_wake.store(unsafe { task.cast_to_uint() },
102                                    atomics::SeqCst);
103                 self.cnt.store(-1, atomics::SeqCst);
104
105                 // This store is a little sketchy. What's happening here is
106                 // that we're transferring a blocker from a oneshot or stream
107                 // channel to this shared channel. In doing so, we never
108                 // spuriously wake them up and rather only wake them up at the
109                 // appropriate time. This implementation of shared channels
110                 // assumes that any blocking recv() will undo the increment of
111                 // steals performed in try_recv() once the recv is complete.
112                 // This thread that we're inheriting, however, is not in the
113                 // middle of recv. Hence, the first time we wake them up,
114                 // they're going to wake up from their old port, move on to the
115                 // upgraded port, and then call the block recv() function.
116                 //
117                 // When calling this function, they'll find there's data
118                 // immediately available, counting it as a steal. This in fact
119                 // wasn't a steal because we appropriately blocked them waiting
120                 // for data.
121                 //
122                 // To offset this bad increment, we initially set the steal
123                 // count to -1. You'll find some special code in
124                 // abort_selection() as well to ensure that this -1 steal count
125                 // doesn't escape too far.
126                 self.steals = -1;
127             }
128             None => {}
129         }
130
131         // When the shared packet is constructed, we grabbed this lock. The
132         // purpose of this lock is to ensure that abort_selection() doesn't
133         // interfere with this method. After we unlock this lock, we're
134         // signifying that we're done modifying self.cnt and self.to_wake and
135         // the port is ready for the world to continue using it.
136         unsafe { self.select_lock.unlock_noguard() }
137     }
138
139     pub fn send(&mut self, t: T) -> Result<(), T> {
140         // See Port::drop for what's going on
141         if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
142
143         // Note that the multiple sender case is a little trickier
144         // semantically than the single sender case. The logic for
145         // incrementing is "add and if disconnected store disconnected".
146         // This could end up leading some senders to believe that there
147         // wasn't a disconnect if in fact there was a disconnect. This means
148         // that while one thread is attempting to re-store the disconnected
149         // states, other threads could walk through merrily incrementing
150         // this very-negative disconnected count. To prevent senders from
151         // spuriously attempting to send when the channels is actually
152         // disconnected, the count has a ranged check here.
153         //
154         // This is also done for another reason. Remember that the return
155         // value of this function is:
156         //
157         //  `true` == the data *may* be received, this essentially has no
158         //            meaning
159         //  `false` == the data will *never* be received, this has a lot of
160         //             meaning
161         //
162         // In the SPSC case, we have a check of 'queue.is_empty()' to see
163         // whether the data was actually received, but this same condition
164         // means nothing in a multi-producer context. As a result, this
165         // preflight check serves as the definitive "this will never be
166         // received". Once we get beyond this check, we have permanently
167         // entered the realm of "this may be received"
168         if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
169             return Err(t)
170         }
171
172         self.queue.push(t);
173         match self.cnt.fetch_add(1, atomics::SeqCst) {
174             -1 => {
175                 self.take_to_wake().wake().map(|t| t.reawaken());
176             }
177
178             // In this case, we have possibly failed to send our data, and
179             // we need to consider re-popping the data in order to fully
180             // destroy it. We must arbitrate among the multiple senders,
181             // however, because the queues that we're using are
182             // single-consumer queues. In order to do this, all exiting
183             // pushers will use an atomic count in order to count those
184             // flowing through. Pushers who see 0 are required to drain as
185             // much as possible, and then can only exit when they are the
186             // only pusher (otherwise they must try again).
187             n if n < DISCONNECTED + FUDGE => {
188                 // see the comment in 'try' for a shared channel for why this
189                 // window of "not disconnected" is ok.
190                 self.cnt.store(DISCONNECTED, atomics::SeqCst);
191
192                 if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 {
193                     loop {
194                         // drain the queue, for info on the thread yield see the
195                         // discussion in try_recv
196                         loop {
197                             match self.queue.pop() {
198                                 mpsc::Data(..) => {}
199                                 mpsc::Empty => break,
200                                 mpsc::Inconsistent => Thread::yield_now(),
201                             }
202                         }
203                         // maybe we're done, if we're not the last ones
204                         // here, then we need to go try again.
205                         if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 {
206                             break
207                         }
208                     }
209
210                     // At this point, there may still be data on the queue,
211                     // but only if the count hasn't been incremented and
212                     // some other sender hasn't finished pushing data just
213                     // yet. That sender in question will drain its own data.
214                 }
215             }
216
217             // Can't make any assumptions about this case like in the SPSC case.
218             _ => {}
219         }
220
221         Ok(())
222     }
223
224     pub fn recv(&mut self) -> Result<T, Failure> {
225         // This code is essentially the exact same as that found in the stream
226         // case (see stream.rs)
227         match self.try_recv() {
228             Err(Empty) => {}
229             data => return data,
230         }
231
232         let task: Box<Task> = Local::take();
233         task.deschedule(1, |task| {
234             self.decrement(task)
235         });
236
237         match self.try_recv() {
238             data @ Ok(..) => { self.steals -= 1; data }
239             data => data,
240         }
241     }
242
243     // Essentially the exact same thing as the stream decrement function.
244     fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
245         assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
246         let n = unsafe { task.cast_to_uint() };
247         self.to_wake.store(n, atomics::SeqCst);
248
249         let steals = self.steals;
250         self.steals = 0;
251
252         match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
253             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
254             // If we factor in our steals and notice that the channel has no
255             // data, we successfully sleep
256             n => {
257                 assert!(n >= 0);
258                 if n - steals <= 0 { return Ok(()) }
259             }
260         }
261
262         self.to_wake.store(0, atomics::SeqCst);
263         Err(unsafe { BlockedTask::cast_from_uint(n) })
264     }
265
266     pub fn try_recv(&mut self) -> Result<T, Failure> {
267         let ret = match self.queue.pop() {
268             mpsc::Data(t) => Some(t),
269             mpsc::Empty => None,
270
271             // This is a bit of an interesting case. The channel is
272             // reported as having data available, but our pop() has
273             // failed due to the queue being in an inconsistent state.
274             // This means that there is some pusher somewhere which has
275             // yet to complete, but we are guaranteed that a pop will
276             // eventually succeed. In this case, we spin in a yield loop
277             // because the remote sender should finish their enqueue
278             // operation "very quickly".
279             //
280             // Note that this yield loop does *not* attempt to do a green
281             // yield (regardless of the context), but *always* performs an
282             // OS-thread yield. The reasoning for this is that the pusher in
283             // question which is causing the inconsistent state is
284             // guaranteed to *not* be a blocked task (green tasks can't get
285             // pre-empted), so it must be on a different OS thread. Also,
286             // `try_recv` is normally a "guaranteed no rescheduling" context
287             // in a green-thread situation. By yielding control of the
288             // thread, we will hopefully allow time for the remote task on
289             // the other OS thread to make progress.
290             //
291             // Avoiding this yield loop would require a different queue
292             // abstraction which provides the guarantee that after M
293             // pushes have succeeded, at least M pops will succeed. The
294             // current queues guarantee that if there are N active
295             // pushes, you can pop N times once all N have finished.
296             mpsc::Inconsistent => {
297                 let data;
298                 loop {
299                     Thread::yield_now();
300                     match self.queue.pop() {
301                         mpsc::Data(t) => { data = t; break }
302                         mpsc::Empty => fail!("inconsistent => empty"),
303                         mpsc::Inconsistent => {}
304                     }
305                 }
306                 Some(data)
307             }
308         };
309         match ret {
310             // See the discussion in the stream implementation for why we
311             // might decrement steals.
312             Some(data) => {
313                 if self.steals > MAX_STEALS {
314                     match self.cnt.swap(0, atomics::SeqCst) {
315                         DISCONNECTED => {
316                             self.cnt.store(DISCONNECTED, atomics::SeqCst);
317                         }
318                         n => {
319                             let m = cmp::min(n, self.steals);
320                             self.steals -= m;
321                             self.bump(n - m);
322                         }
323                     }
324                     assert!(self.steals >= 0);
325                 }
326                 self.steals += 1;
327                 Ok(data)
328             }
329
330             // See the discussion in the stream implementation for why we try
331             // again.
332             None => {
333                 match self.cnt.load(atomics::SeqCst) {
334                     n if n != DISCONNECTED => Err(Empty),
335                     _ => {
336                         match self.queue.pop() {
337                             mpsc::Data(t) => Ok(t),
338                             mpsc::Empty => Err(Disconnected),
339                             // with no senders, an inconsistency is impossible.
340                             mpsc::Inconsistent => unreachable!(),
341                         }
342                     }
343                 }
344             }
345         }
346     }
347
348     // Prepares this shared packet for a channel clone, essentially just bumping
349     // a refcount.
350     pub fn clone_chan(&mut self) {
351         self.channels.fetch_add(1, atomics::SeqCst);
352     }
353
354     // Decrement the reference count on a channel. This is called whenever a
355     // Chan is dropped and may end up waking up a receiver. It's the receiver's
356     // responsibility on the other end to figure out that we've disconnected.
357     pub fn drop_chan(&mut self) {
358         match self.channels.fetch_sub(1, atomics::SeqCst) {
359             1 => {}
360             n if n > 1 => return,
361             n => fail!("bad number of channels left {}", n),
362         }
363
364         match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
365             -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
366             DISCONNECTED => {}
367             n => { assert!(n >= 0); }
368         }
369     }
370
371     // See the long discussion inside of stream.rs for why the queue is drained,
372     // and why it is done in this fashion.
373     pub fn drop_port(&mut self) {
374         self.port_dropped.store(true, atomics::SeqCst);
375         let mut steals = self.steals;
376         while {
377             let cnt = self.cnt.compare_and_swap(
378                             steals, DISCONNECTED, atomics::SeqCst);
379             cnt != DISCONNECTED && cnt != steals
380         } {
381             // See the discussion in 'try_recv' for why we yield
382             // control of this thread.
383             loop {
384                 match self.queue.pop() {
385                     mpsc::Data(..) => { steals += 1; }
386                     mpsc::Empty | mpsc::Inconsistent => break,
387                 }
388             }
389         }
390     }
391
392     // Consumes ownership of the 'to_wake' field.
393     fn take_to_wake(&mut self) -> BlockedTask {
394         let task = self.to_wake.load(atomics::SeqCst);
395         self.to_wake.store(0, atomics::SeqCst);
396         assert!(task != 0);
397         unsafe { BlockedTask::cast_from_uint(task) }
398     }
399
400     ////////////////////////////////////////////////////////////////////////////
401     // select implementation
402     ////////////////////////////////////////////////////////////////////////////
403
404     // Helper function for select, tests whether this port can receive without
405     // blocking (obviously not an atomic decision).
406     //
407     // This is different than the stream version because there's no need to peek
408     // at the queue, we can just look at the local count.
409     pub fn can_recv(&mut self) -> bool {
410         let cnt = self.cnt.load(atomics::SeqCst);
411         cnt == DISCONNECTED || cnt - self.steals > 0
412     }
413
414     // increment the count on the channel (used for selection)
415     fn bump(&mut self, amt: int) -> int {
416         match self.cnt.fetch_add(amt, atomics::SeqCst) {
417             DISCONNECTED => {
418                 self.cnt.store(DISCONNECTED, atomics::SeqCst);
419                 DISCONNECTED
420             }
421             n => n
422         }
423     }
424
425     // Inserts the blocked task for selection on this port, returning it back if
426     // the port already has data on it.
427     //
428     // The code here is the same as in stream.rs, except that it doesn't need to
429     // peek at the channel to see if an upgrade is pending.
430     pub fn start_selection(&mut self,
431                            task: BlockedTask) -> Result<(), BlockedTask> {
432         match self.decrement(task) {
433             Ok(()) => Ok(()),
434             Err(task) => {
435                 let prev = self.bump(1);
436                 assert!(prev == DISCONNECTED || prev >= 0);
437                 return Err(task);
438             }
439         }
440     }
441
442     // Cancels a previous task waiting on this port, returning whether there's
443     // data on the port.
444     //
445     // This is similar to the stream implementation (hence fewer comments), but
446     // uses a different value for the "steals" variable.
447     pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
448         // Before we do anything else, we bounce on this lock. The reason for
449         // doing this is to ensure that any upgrade-in-progress is gone and
450         // done with. Without this bounce, we can race with inherit_blocker
451         // about looking at and dealing with to_wake. Once we have acquired the
452         // lock, we are guaranteed that inherit_blocker is done.
453         unsafe {
454             let _guard = self.select_lock.lock();
455         }
456
457         // Like the stream implementation, we want to make sure that the count
458         // on the channel goes non-negative. We don't know how negative the
459         // stream currently is, so instead of using a steal value of 1, we load
460         // the channel count and figure out what we should do to make it
461         // positive.
462         let steals = {
463             let cnt = self.cnt.load(atomics::SeqCst);
464             if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
465         };
466         let prev = self.bump(steals + 1);
467
468         if prev == DISCONNECTED {
469             assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
470             true
471         } else {
472             let cur = prev + steals + 1;
473             assert!(cur >= 0);
474             if prev < 0 {
475                 self.take_to_wake().trash();
476             } else {
477                 while self.to_wake.load(atomics::SeqCst) != 0 {
478                     Thread::yield_now();
479                 }
480             }
481             // if the number of steals is -1, it was the pre-emptive -1 steal
482             // count from when we inherited a blocker. This is fine because
483             // we're just going to overwrite it with a real value.
484             assert!(self.steals == 0 || self.steals == -1);
485             self.steals = steals;
486             prev >= 0
487         }
488     }
489 }
490
491 #[unsafe_destructor]
492 impl<T: Send> Drop for Packet<T> {
493     fn drop(&mut self) {
494         // Note that this load is not only an assert for correctness about
495         // disconnection, but also a proper fence before the read of
496         // `to_wake`, so this assert cannot be removed with also removing
497         // the `to_wake` assert.
498         assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
499         assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
500         assert_eq!(self.channels.load(atomics::SeqCst), 0);
501     }
502 }