]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/stream.rs
auto merge of #13967 : richo/rust/features/ICE-fails, r=alexcrichton
[rust.git] / src / libstd / comm / stream.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Stream channels
12 ///
13 /// This is the flavor of channels which are optimized for one sender and one
14 /// receiver. The sender will be upgraded to a shared channel if the channel is
15 /// cloned.
16 ///
17 /// High level implementation details can be found in the comment of the parent
18 /// module.
19
20 use cmp;
21 use comm::Receiver;
22 use int;
23 use iter::Iterator;
24 use kinds::Send;
25 use ops::Drop;
26 use option::{Some, None};
27 use owned::Box;
28 use result::{Ok, Err, Result};
29 use rt::local::Local;
30 use rt::task::{Task, BlockedTask};
31 use rt::thread::Thread;
32 use spsc = sync::spsc_queue;
33 use sync::atomics;
34
35 static DISCONNECTED: int = int::MIN;
36 #[cfg(test)]
37 static MAX_STEALS: int = 5;
38 #[cfg(not(test))]
39 static MAX_STEALS: int = 1 << 20;
40
41 pub struct Packet<T> {
42     queue: spsc::Queue<Message<T>>, // internal queue for all message
43
44     cnt: atomics::AtomicInt, // How many items are on this channel
45     steals: int, // How many times has a port received without blocking?
46     to_wake: atomics::AtomicUint, // Task to wake up
47
48     port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed.
49 }
50
51 pub enum Failure<T> {
52     Empty,
53     Disconnected,
54     Upgraded(Receiver<T>),
55 }
56
57 pub enum UpgradeResult {
58     UpSuccess,
59     UpDisconnected,
60     UpWoke(BlockedTask),
61 }
62
63 pub enum SelectionResult<T> {
64     SelSuccess,
65     SelCanceled(BlockedTask),
66     SelUpgraded(BlockedTask, Receiver<T>),
67 }
68
69 // Any message could contain an "upgrade request" to a new shared port, so the
70 // internal queue it's a queue of T, but rather Message<T>
71 enum Message<T> {
72     Data(T),
73     GoUp(Receiver<T>),
74 }
75
76 impl<T: Send> Packet<T> {
77     pub fn new() -> Packet<T> {
78         Packet {
79             queue: spsc::Queue::new(128),
80
81             cnt: atomics::AtomicInt::new(0),
82             steals: 0,
83             to_wake: atomics::AtomicUint::new(0),
84
85             port_dropped: atomics::AtomicBool::new(false),
86         }
87     }
88
89
90     pub fn send(&mut self, t: T) -> Result<(), T> {
91         // If the other port has deterministically gone away, then definitely
92         // must return the data back up the stack. Otherwise, the data is
93         // considered as being sent.
94         if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
95
96         match self.do_send(Data(t)) {
97             UpSuccess | UpDisconnected => {},
98             UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
99         }
100         Ok(())
101     }
102     pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
103         // If the port has gone away, then there's no need to proceed any
104         // further.
105         if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
106
107         self.do_send(GoUp(up))
108     }
109
110     fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
111         self.queue.push(t);
112         match self.cnt.fetch_add(1, atomics::SeqCst) {
113             // As described in the mod's doc comment, -1 == wakeup
114             -1 => UpWoke(self.take_to_wake()),
115             // As as described before, SPSC queues must be >= -2
116             -2 => UpSuccess,
117
118             // Be sure to preserve the disconnected state, and the return value
119             // in this case is going to be whether our data was received or not.
120             // This manifests itself on whether we have an empty queue or not.
121             //
122             // Primarily, are required to drain the queue here because the port
123             // will never remove this data. We can only have at most one item to
124             // drain (the port drains the rest).
125             DISCONNECTED => {
126                 self.cnt.store(DISCONNECTED, atomics::SeqCst);
127                 let first = self.queue.pop();
128                 let second = self.queue.pop();
129                 assert!(second.is_none());
130
131                 match first {
132                     Some(..) => UpSuccess,  // we failed to send the data
133                     None => UpDisconnected, // we successfully sent data
134                 }
135             }
136
137             // Otherwise we just sent some data on a non-waiting queue, so just
138             // make sure the world is sane and carry on!
139             n => { assert!(n >= 0); UpSuccess }
140         }
141     }
142
143     // Consumes ownership of the 'to_wake' field.
144     fn take_to_wake(&mut self) -> BlockedTask {
145         let task = self.to_wake.load(atomics::SeqCst);
146         self.to_wake.store(0, atomics::SeqCst);
147         assert!(task != 0);
148         unsafe { BlockedTask::cast_from_uint(task) }
149     }
150
151     // Decrements the count on the channel for a sleeper, returning the sleeper
152     // back if it shouldn't sleep. Note that this is the location where we take
153     // steals into account.
154     fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
155         assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
156         let n = unsafe { task.cast_to_uint() };
157         self.to_wake.store(n, atomics::SeqCst);
158
159         let steals = self.steals;
160         self.steals = 0;
161
162         match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
163             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
164             // If we factor in our steals and notice that the channel has no
165             // data, we successfully sleep
166             n => {
167                 assert!(n >= 0);
168                 if n - steals <= 0 { return Ok(()) }
169             }
170         }
171
172         self.to_wake.store(0, atomics::SeqCst);
173         Err(unsafe { BlockedTask::cast_from_uint(n) })
174     }
175
176     pub fn recv(&mut self) -> Result<T, Failure<T>> {
177         // Optimistic preflight check (scheduling is expensive).
178         match self.try_recv() {
179             Err(Empty) => {}
180             data => return data,
181         }
182
183         // Welp, our channel has no data. Deschedule the current task and
184         // initiate the blocking protocol.
185         let task: Box<Task> = Local::take();
186         task.deschedule(1, |task| {
187             self.decrement(task)
188         });
189
190         match self.try_recv() {
191             // Messages which actually popped from the queue shouldn't count as
192             // a steal, so offset the decrement here (we already have our
193             // "steal" factored into the channel count above).
194             data @ Ok(..) |
195             data @ Err(Upgraded(..)) => {
196                 self.steals -= 1;
197                 data
198             }
199
200             data => data,
201         }
202     }
203
204     pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
205         match self.queue.pop() {
206             // If we stole some data, record to that effect (this will be
207             // factored into cnt later on).
208             //
209             // Note that we don't allow steals to grow without bound in order to
210             // prevent eventual overflow of either steals or cnt as an overflow
211             // would have catastrophic results. Sometimes, steals > cnt, but
212             // other times cnt > steals, so we don't know the relation between
213             // steals and cnt. This code path is executed only rarely, so we do
214             // a pretty slow operation, of swapping 0 into cnt, taking steals
215             // down as much as possible (without going negative), and then
216             // adding back in whatever we couldn't factor into steals.
217             Some(data) => {
218                 if self.steals > MAX_STEALS {
219                     match self.cnt.swap(0, atomics::SeqCst) {
220                         DISCONNECTED => {
221                             self.cnt.store(DISCONNECTED, atomics::SeqCst);
222                         }
223                         n => {
224                             let m = cmp::min(n, self.steals);
225                             self.steals -= m;
226                             self.bump(n - m);
227                         }
228                     }
229                     assert!(self.steals >= 0);
230                 }
231                 self.steals += 1;
232                 match data {
233                     Data(t) => Ok(t),
234                     GoUp(up) => Err(Upgraded(up)),
235                 }
236             }
237
238             None => {
239                 match self.cnt.load(atomics::SeqCst) {
240                     n if n != DISCONNECTED => Err(Empty),
241
242                     // This is a little bit of a tricky case. We failed to pop
243                     // data above, and then we have viewed that the channel is
244                     // disconnected. In this window more data could have been
245                     // sent on the channel. It doesn't really make sense to
246                     // return that the channel is disconnected when there's
247                     // actually data on it, so be extra sure there's no data by
248                     // popping one more time.
249                     //
250                     // We can ignore steals because the other end is
251                     // disconnected and we'll never need to really factor in our
252                     // steals again.
253                     _ => {
254                         match self.queue.pop() {
255                             Some(Data(t)) => Ok(t),
256                             Some(GoUp(up)) => Err(Upgraded(up)),
257                             None => Err(Disconnected),
258                         }
259                     }
260                 }
261             }
262         }
263     }
264
265     pub fn drop_chan(&mut self) {
266         // Dropping a channel is pretty simple, we just flag it as disconnected
267         // and then wakeup a blocker if there is one.
268         match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
269             -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
270             DISCONNECTED => {}
271             n => { assert!(n >= 0); }
272         }
273     }
274
275     pub fn drop_port(&mut self) {
276         // Dropping a port seems like a fairly trivial thing. In theory all we
277         // need to do is flag that we're disconnected and then everything else
278         // can take over (we don't have anyone to wake up).
279         //
280         // The catch for Ports is that we want to drop the entire contents of
281         // the queue. There are multiple reasons for having this property, the
282         // largest of which is that if another chan is waiting in this channel
283         // (but not received yet), then waiting on that port will cause a
284         // deadlock.
285         //
286         // So if we accept that we must now destroy the entire contents of the
287         // queue, this code may make a bit more sense. The tricky part is that
288         // we can't let any in-flight sends go un-dropped, we have to make sure
289         // *everything* is dropped and nothing new will come onto the channel.
290
291         // The first thing we do is set a flag saying that we're done for. All
292         // sends are gated on this flag, so we're immediately guaranteed that
293         // there are a bounded number of active sends that we'll have to deal
294         // with.
295         self.port_dropped.store(true, atomics::SeqCst);
296
297         // Now that we're guaranteed to deal with a bounded number of senders,
298         // we need to drain the queue. This draining process happens atomically
299         // with respect to the "count" of the channel. If the count is nonzero
300         // (with steals taken into account), then there must be data on the
301         // channel. In this case we drain everything and then try again. We will
302         // continue to fail while active senders send data while we're dropping
303         // data, but eventually we're guaranteed to break out of this loop
304         // (because there is a bounded number of senders).
305         let mut steals = self.steals;
306         while {
307             let cnt = self.cnt.compare_and_swap(
308                             steals, DISCONNECTED, atomics::SeqCst);
309             cnt != DISCONNECTED && cnt != steals
310         } {
311             loop {
312                 match self.queue.pop() {
313                     Some(..) => { steals += 1; }
314                     None => break
315                 }
316             }
317         }
318
319         // At this point in time, we have gated all future senders from sending,
320         // and we have flagged the channel as being disconnected. The senders
321         // still have some responsibility, however, because some sends may not
322         // complete until after we flag the disconnection. There are more
323         // details in the sending methods that see DISCONNECTED
324     }
325
326     ////////////////////////////////////////////////////////////////////////////
327     // select implementation
328     ////////////////////////////////////////////////////////////////////////////
329
330     // Tests to see whether this port can receive without blocking. If Ok is
331     // returned, then that's the answer. If Err is returned, then the returned
332     // port needs to be queried instead (an upgrade happened)
333     pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
334         // We peek at the queue to see if there's anything on it, and we use
335         // this return value to determine if we should pop from the queue and
336         // upgrade this channel immediately. If it looks like we've got an
337         // upgrade pending, then go through the whole recv rigamarole to update
338         // the internal state.
339         match self.queue.peek() {
340             Some(&GoUp(..)) => {
341                 match self.recv() {
342                     Err(Upgraded(port)) => Err(port),
343                     _ => unreachable!(),
344                 }
345             }
346             Some(..) => Ok(true),
347             None => Ok(false)
348         }
349     }
350
351     // increment the count on the channel (used for selection)
352     fn bump(&mut self, amt: int) -> int {
353         match self.cnt.fetch_add(amt, atomics::SeqCst) {
354             DISCONNECTED => {
355                 self.cnt.store(DISCONNECTED, atomics::SeqCst);
356                 DISCONNECTED
357             }
358             n => n
359         }
360     }
361
362     // Attempts to start selecting on this port. Like a oneshot, this can fail
363     // immediately because of an upgrade.
364     pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
365         match self.decrement(task) {
366             Ok(()) => SelSuccess,
367             Err(task) => {
368                 let ret = match self.queue.peek() {
369                     Some(&GoUp(..)) => {
370                         match self.queue.pop() {
371                             Some(GoUp(port)) => SelUpgraded(task, port),
372                             _ => unreachable!(),
373                         }
374                     }
375                     Some(..) => SelCanceled(task),
376                     None => SelCanceled(task),
377                 };
378                 // Undo our decrement above, and we should be guaranteed that the
379                 // previous value is positive because we're not going to sleep
380                 let prev = self.bump(1);
381                 assert!(prev == DISCONNECTED || prev >= 0);
382                 return ret;
383             }
384         }
385     }
386
387     // Removes a previous task from being blocked in this port
388     pub fn abort_selection(&mut self,
389                            was_upgrade: bool) -> Result<bool, Receiver<T>> {
390         // If we're aborting selection after upgrading from a oneshot, then
391         // we're guarantee that no one is waiting. The only way that we could
392         // have seen the upgrade is if data was actually sent on the channel
393         // half again. For us, this means that there is guaranteed to be data on
394         // this channel. Furthermore, we're guaranteed that there was no
395         // start_selection previously, so there's no need to modify `self.cnt`
396         // at all.
397         //
398         // Hence, because of these invariants, we immediately return `Ok(true)`.
399         // Note that the data may not actually be sent on the channel just yet.
400         // The other end could have flagged the upgrade but not sent data to
401         // this end. This is fine because we know it's a small bounded windows
402         // of time until the data is actually sent.
403         if was_upgrade {
404             assert_eq!(self.steals, 0);
405             assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
406             return Ok(true)
407         }
408
409         // We want to make sure that the count on the channel goes non-negative,
410         // and in the stream case we can have at most one steal, so just assume
411         // that we had one steal.
412         let steals = 1;
413         let prev = self.bump(steals + 1);
414
415         // If we were previously disconnected, then we know for sure that there
416         // is no task in to_wake, so just keep going
417         let has_data = if prev == DISCONNECTED {
418             assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
419             true // there is data, that data is that we're disconnected
420         } else {
421             let cur = prev + steals + 1;
422             assert!(cur >= 0);
423
424             // If the previous count was negative, then we just made things go
425             // positive, hence we passed the -1 boundary and we're responsible
426             // for removing the to_wake() field and trashing it.
427             //
428             // If the previous count was positive then we're in a tougher
429             // situation. A possible race is that a sender just incremented
430             // through -1 (meaning it's going to try to wake a task up), but it
431             // hasn't yet read the to_wake. In order to prevent a future recv()
432             // from waking up too early (this sender picking up the plastered
433             // over to_wake), we spin loop here waiting for to_wake to be 0.
434             // Note that this entire select() implementation needs an overhaul,
435             // and this is *not* the worst part of it, so this is not done as a
436             // final solution but rather out of necessity for now to get
437             // something working.
438             if prev < 0 {
439                 self.take_to_wake().trash();
440             } else {
441                 while self.to_wake.load(atomics::SeqCst) != 0 {
442                     Thread::yield_now();
443                 }
444             }
445             assert_eq!(self.steals, 0);
446             self.steals = steals;
447
448             // if we were previously positive, then there's surely data to
449             // receive
450             prev >= 0
451         };
452
453         // Now that we've determined that this queue "has data", we peek at the
454         // queue to see if the data is an upgrade or not. If it's an upgrade,
455         // then we need to destroy this port and abort selection on the
456         // upgraded port.
457         if has_data {
458             match self.queue.peek() {
459                 Some(&GoUp(..)) => {
460                     match self.queue.pop() {
461                         Some(GoUp(port)) => Err(port),
462                         _ => unreachable!(),
463                     }
464                 }
465                 _ => Ok(true),
466             }
467         } else {
468             Ok(false)
469         }
470     }
471 }
472
473 #[unsafe_destructor]
474 impl<T: Send> Drop for Packet<T> {
475     fn drop(&mut self) {
476         // Note that this load is not only an assert for correctness about
477         // disconnection, but also a proper fence before the read of
478         // `to_wake`, so this assert cannot be removed with also removing
479         // the `to_wake` assert.
480         assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
481         assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
482     }
483 }