]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/stream.rs
6f337f1730950d3cfb9288d32e76c028fe4c1fa4
[rust.git] / src / libsync / comm / stream.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Stream channels
12 ///
13 /// This is the flavor of channels which are optimized for one sender and one
14 /// receiver. The sender will be upgraded to a shared channel if the channel is
15 /// cloned.
16 ///
17 /// High level implementation details can be found in the comment of the parent
18 /// module.
19
20 use core::prelude::*;
21
22 use alloc::owned::Box;
23 use core::cmp;
24 use core::int;
25 use rustrt::local::Local;
26 use rustrt::task::{Task, BlockedTask};
27 use rustrt::thread::Thread;
28
29 use atomics;
30 use comm::Receiver;
31 use spsc = spsc_queue;
32
33 static DISCONNECTED: int = int::MIN;
34 #[cfg(test)]
35 static MAX_STEALS: int = 5;
36 #[cfg(not(test))]
37 static MAX_STEALS: int = 1 << 20;
38
39 pub struct Packet<T> {
40     queue: spsc::Queue<Message<T>>, // internal queue for all message
41
42     cnt: atomics::AtomicInt, // How many items are on this channel
43     steals: int, // How many times has a port received without blocking?
44     to_wake: atomics::AtomicUint, // Task to wake up
45
46     port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed.
47 }
48
49 pub enum Failure<T> {
50     Empty,
51     Disconnected,
52     Upgraded(Receiver<T>),
53 }
54
55 pub enum UpgradeResult {
56     UpSuccess,
57     UpDisconnected,
58     UpWoke(BlockedTask),
59 }
60
61 pub enum SelectionResult<T> {
62     SelSuccess,
63     SelCanceled(BlockedTask),
64     SelUpgraded(BlockedTask, Receiver<T>),
65 }
66
67 // Any message could contain an "upgrade request" to a new shared port, so the
68 // internal queue it's a queue of T, but rather Message<T>
69 enum Message<T> {
70     Data(T),
71     GoUp(Receiver<T>),
72 }
73
74 impl<T: Send> Packet<T> {
75     pub fn new() -> Packet<T> {
76         Packet {
77             queue: spsc::Queue::new(128),
78
79             cnt: atomics::AtomicInt::new(0),
80             steals: 0,
81             to_wake: atomics::AtomicUint::new(0),
82
83             port_dropped: atomics::AtomicBool::new(false),
84         }
85     }
86
87
88     pub fn send(&mut self, t: T) -> Result<(), T> {
89         // If the other port has deterministically gone away, then definitely
90         // must return the data back up the stack. Otherwise, the data is
91         // considered as being sent.
92         if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
93
94         match self.do_send(Data(t)) {
95             UpSuccess | UpDisconnected => {},
96             UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
97         }
98         Ok(())
99     }
100     pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
101         // If the port has gone away, then there's no need to proceed any
102         // further.
103         if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
104
105         self.do_send(GoUp(up))
106     }
107
108     fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
109         self.queue.push(t);
110         match self.cnt.fetch_add(1, atomics::SeqCst) {
111             // As described in the mod's doc comment, -1 == wakeup
112             -1 => UpWoke(self.take_to_wake()),
113             // As as described before, SPSC queues must be >= -2
114             -2 => UpSuccess,
115
116             // Be sure to preserve the disconnected state, and the return value
117             // in this case is going to be whether our data was received or not.
118             // This manifests itself on whether we have an empty queue or not.
119             //
120             // Primarily, are required to drain the queue here because the port
121             // will never remove this data. We can only have at most one item to
122             // drain (the port drains the rest).
123             DISCONNECTED => {
124                 self.cnt.store(DISCONNECTED, atomics::SeqCst);
125                 let first = self.queue.pop();
126                 let second = self.queue.pop();
127                 assert!(second.is_none());
128
129                 match first {
130                     Some(..) => UpSuccess,  // we failed to send the data
131                     None => UpDisconnected, // we successfully sent data
132                 }
133             }
134
135             // Otherwise we just sent some data on a non-waiting queue, so just
136             // make sure the world is sane and carry on!
137             n => { assert!(n >= 0); UpSuccess }
138         }
139     }
140
141     // Consumes ownership of the 'to_wake' field.
142     fn take_to_wake(&mut self) -> BlockedTask {
143         let task = self.to_wake.load(atomics::SeqCst);
144         self.to_wake.store(0, atomics::SeqCst);
145         assert!(task != 0);
146         unsafe { BlockedTask::cast_from_uint(task) }
147     }
148
149     // Decrements the count on the channel for a sleeper, returning the sleeper
150     // back if it shouldn't sleep. Note that this is the location where we take
151     // steals into account.
152     fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
153         assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
154         let n = unsafe { task.cast_to_uint() };
155         self.to_wake.store(n, atomics::SeqCst);
156
157         let steals = self.steals;
158         self.steals = 0;
159
160         match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
161             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
162             // If we factor in our steals and notice that the channel has no
163             // data, we successfully sleep
164             n => {
165                 assert!(n >= 0);
166                 if n - steals <= 0 { return Ok(()) }
167             }
168         }
169
170         self.to_wake.store(0, atomics::SeqCst);
171         Err(unsafe { BlockedTask::cast_from_uint(n) })
172     }
173
174     pub fn recv(&mut self) -> Result<T, Failure<T>> {
175         // Optimistic preflight check (scheduling is expensive).
176         match self.try_recv() {
177             Err(Empty) => {}
178             data => return data,
179         }
180
181         // Welp, our channel has no data. Deschedule the current task and
182         // initiate the blocking protocol.
183         let task: Box<Task> = Local::take();
184         task.deschedule(1, |task| {
185             self.decrement(task)
186         });
187
188         match self.try_recv() {
189             // Messages which actually popped from the queue shouldn't count as
190             // a steal, so offset the decrement here (we already have our
191             // "steal" factored into the channel count above).
192             data @ Ok(..) |
193             data @ Err(Upgraded(..)) => {
194                 self.steals -= 1;
195                 data
196             }
197
198             data => data,
199         }
200     }
201
202     pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
203         match self.queue.pop() {
204             // If we stole some data, record to that effect (this will be
205             // factored into cnt later on).
206             //
207             // Note that we don't allow steals to grow without bound in order to
208             // prevent eventual overflow of either steals or cnt as an overflow
209             // would have catastrophic results. Sometimes, steals > cnt, but
210             // other times cnt > steals, so we don't know the relation between
211             // steals and cnt. This code path is executed only rarely, so we do
212             // a pretty slow operation, of swapping 0 into cnt, taking steals
213             // down as much as possible (without going negative), and then
214             // adding back in whatever we couldn't factor into steals.
215             Some(data) => {
216                 if self.steals > MAX_STEALS {
217                     match self.cnt.swap(0, atomics::SeqCst) {
218                         DISCONNECTED => {
219                             self.cnt.store(DISCONNECTED, atomics::SeqCst);
220                         }
221                         n => {
222                             let m = cmp::min(n, self.steals);
223                             self.steals -= m;
224                             self.bump(n - m);
225                         }
226                     }
227                     assert!(self.steals >= 0);
228                 }
229                 self.steals += 1;
230                 match data {
231                     Data(t) => Ok(t),
232                     GoUp(up) => Err(Upgraded(up)),
233                 }
234             }
235
236             None => {
237                 match self.cnt.load(atomics::SeqCst) {
238                     n if n != DISCONNECTED => Err(Empty),
239
240                     // This is a little bit of a tricky case. We failed to pop
241                     // data above, and then we have viewed that the channel is
242                     // disconnected. In this window more data could have been
243                     // sent on the channel. It doesn't really make sense to
244                     // return that the channel is disconnected when there's
245                     // actually data on it, so be extra sure there's no data by
246                     // popping one more time.
247                     //
248                     // We can ignore steals because the other end is
249                     // disconnected and we'll never need to really factor in our
250                     // steals again.
251                     _ => {
252                         match self.queue.pop() {
253                             Some(Data(t)) => Ok(t),
254                             Some(GoUp(up)) => Err(Upgraded(up)),
255                             None => Err(Disconnected),
256                         }
257                     }
258                 }
259             }
260         }
261     }
262
263     pub fn drop_chan(&mut self) {
264         // Dropping a channel is pretty simple, we just flag it as disconnected
265         // and then wakeup a blocker if there is one.
266         match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
267             -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
268             DISCONNECTED => {}
269             n => { assert!(n >= 0); }
270         }
271     }
272
273     pub fn drop_port(&mut self) {
274         // Dropping a port seems like a fairly trivial thing. In theory all we
275         // need to do is flag that we're disconnected and then everything else
276         // can take over (we don't have anyone to wake up).
277         //
278         // The catch for Ports is that we want to drop the entire contents of
279         // the queue. There are multiple reasons for having this property, the
280         // largest of which is that if another chan is waiting in this channel
281         // (but not received yet), then waiting on that port will cause a
282         // deadlock.
283         //
284         // So if we accept that we must now destroy the entire contents of the
285         // queue, this code may make a bit more sense. The tricky part is that
286         // we can't let any in-flight sends go un-dropped, we have to make sure
287         // *everything* is dropped and nothing new will come onto the channel.
288
289         // The first thing we do is set a flag saying that we're done for. All
290         // sends are gated on this flag, so we're immediately guaranteed that
291         // there are a bounded number of active sends that we'll have to deal
292         // with.
293         self.port_dropped.store(true, atomics::SeqCst);
294
295         // Now that we're guaranteed to deal with a bounded number of senders,
296         // we need to drain the queue. This draining process happens atomically
297         // with respect to the "count" of the channel. If the count is nonzero
298         // (with steals taken into account), then there must be data on the
299         // channel. In this case we drain everything and then try again. We will
300         // continue to fail while active senders send data while we're dropping
301         // data, but eventually we're guaranteed to break out of this loop
302         // (because there is a bounded number of senders).
303         let mut steals = self.steals;
304         while {
305             let cnt = self.cnt.compare_and_swap(
306                             steals, DISCONNECTED, atomics::SeqCst);
307             cnt != DISCONNECTED && cnt != steals
308         } {
309             loop {
310                 match self.queue.pop() {
311                     Some(..) => { steals += 1; }
312                     None => break
313                 }
314             }
315         }
316
317         // At this point in time, we have gated all future senders from sending,
318         // and we have flagged the channel as being disconnected. The senders
319         // still have some responsibility, however, because some sends may not
320         // complete until after we flag the disconnection. There are more
321         // details in the sending methods that see DISCONNECTED
322     }
323
324     ////////////////////////////////////////////////////////////////////////////
325     // select implementation
326     ////////////////////////////////////////////////////////////////////////////
327
328     // Tests to see whether this port can receive without blocking. If Ok is
329     // returned, then that's the answer. If Err is returned, then the returned
330     // port needs to be queried instead (an upgrade happened)
331     pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
332         // We peek at the queue to see if there's anything on it, and we use
333         // this return value to determine if we should pop from the queue and
334         // upgrade this channel immediately. If it looks like we've got an
335         // upgrade pending, then go through the whole recv rigamarole to update
336         // the internal state.
337         match self.queue.peek() {
338             Some(&GoUp(..)) => {
339                 match self.recv() {
340                     Err(Upgraded(port)) => Err(port),
341                     _ => unreachable!(),
342                 }
343             }
344             Some(..) => Ok(true),
345             None => Ok(false)
346         }
347     }
348
349     // increment the count on the channel (used for selection)
350     fn bump(&mut self, amt: int) -> int {
351         match self.cnt.fetch_add(amt, atomics::SeqCst) {
352             DISCONNECTED => {
353                 self.cnt.store(DISCONNECTED, atomics::SeqCst);
354                 DISCONNECTED
355             }
356             n => n
357         }
358     }
359
360     // Attempts to start selecting on this port. Like a oneshot, this can fail
361     // immediately because of an upgrade.
362     pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
363         match self.decrement(task) {
364             Ok(()) => SelSuccess,
365             Err(task) => {
366                 let ret = match self.queue.peek() {
367                     Some(&GoUp(..)) => {
368                         match self.queue.pop() {
369                             Some(GoUp(port)) => SelUpgraded(task, port),
370                             _ => unreachable!(),
371                         }
372                     }
373                     Some(..) => SelCanceled(task),
374                     None => SelCanceled(task),
375                 };
376                 // Undo our decrement above, and we should be guaranteed that the
377                 // previous value is positive because we're not going to sleep
378                 let prev = self.bump(1);
379                 assert!(prev == DISCONNECTED || prev >= 0);
380                 return ret;
381             }
382         }
383     }
384
385     // Removes a previous task from being blocked in this port
386     pub fn abort_selection(&mut self,
387                            was_upgrade: bool) -> Result<bool, Receiver<T>> {
388         // If we're aborting selection after upgrading from a oneshot, then
389         // we're guarantee that no one is waiting. The only way that we could
390         // have seen the upgrade is if data was actually sent on the channel
391         // half again. For us, this means that there is guaranteed to be data on
392         // this channel. Furthermore, we're guaranteed that there was no
393         // start_selection previously, so there's no need to modify `self.cnt`
394         // at all.
395         //
396         // Hence, because of these invariants, we immediately return `Ok(true)`.
397         // Note that the data may not actually be sent on the channel just yet.
398         // The other end could have flagged the upgrade but not sent data to
399         // this end. This is fine because we know it's a small bounded windows
400         // of time until the data is actually sent.
401         if was_upgrade {
402             assert_eq!(self.steals, 0);
403             assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
404             return Ok(true)
405         }
406
407         // We want to make sure that the count on the channel goes non-negative,
408         // and in the stream case we can have at most one steal, so just assume
409         // that we had one steal.
410         let steals = 1;
411         let prev = self.bump(steals + 1);
412
413         // If we were previously disconnected, then we know for sure that there
414         // is no task in to_wake, so just keep going
415         let has_data = if prev == DISCONNECTED {
416             assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
417             true // there is data, that data is that we're disconnected
418         } else {
419             let cur = prev + steals + 1;
420             assert!(cur >= 0);
421
422             // If the previous count was negative, then we just made things go
423             // positive, hence we passed the -1 boundary and we're responsible
424             // for removing the to_wake() field and trashing it.
425             //
426             // If the previous count was positive then we're in a tougher
427             // situation. A possible race is that a sender just incremented
428             // through -1 (meaning it's going to try to wake a task up), but it
429             // hasn't yet read the to_wake. In order to prevent a future recv()
430             // from waking up too early (this sender picking up the plastered
431             // over to_wake), we spin loop here waiting for to_wake to be 0.
432             // Note that this entire select() implementation needs an overhaul,
433             // and this is *not* the worst part of it, so this is not done as a
434             // final solution but rather out of necessity for now to get
435             // something working.
436             if prev < 0 {
437                 self.take_to_wake().trash();
438             } else {
439                 while self.to_wake.load(atomics::SeqCst) != 0 {
440                     Thread::yield_now();
441                 }
442             }
443             assert_eq!(self.steals, 0);
444             self.steals = steals;
445
446             // if we were previously positive, then there's surely data to
447             // receive
448             prev >= 0
449         };
450
451         // Now that we've determined that this queue "has data", we peek at the
452         // queue to see if the data is an upgrade or not. If it's an upgrade,
453         // then we need to destroy this port and abort selection on the
454         // upgraded port.
455         if has_data {
456             match self.queue.peek() {
457                 Some(&GoUp(..)) => {
458                     match self.queue.pop() {
459                         Some(GoUp(port)) => Err(port),
460                         _ => unreachable!(),
461                     }
462                 }
463                 _ => Ok(true),
464             }
465         } else {
466             Ok(false)
467         }
468     }
469 }
470
471 #[unsafe_destructor]
472 impl<T: Send> Drop for Packet<T> {
473     fn drop(&mut self) {
474         // Note that this load is not only an assert for correctness about
475         // disconnection, but also a proper fence before the read of
476         // `to_wake`, so this assert cannot be removed with also removing
477         // the `to_wake` assert.
478         assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
479         assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
480     }
481 }