]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/stream.rs
Fall out of the std::sync rewrite
[rust.git] / src / libstd / comm / stream.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Stream channels
12 ///
13 /// This is the flavor of channels which are optimized for one sender and one
14 /// receiver. The sender will be upgraded to a shared channel if the channel is
15 /// cloned.
16 ///
17 /// High level implementation details can be found in the comment of the parent
18 /// module.
19
20 pub use self::Failure::*;
21 pub use self::UpgradeResult::*;
22 pub use self::SelectionResult::*;
23 use self::Message::*;
24
25 use core::prelude::*;
26
27 use alloc::boxed::Box;
28 use core::cmp;
29 use core::int;
30 use rustrt::local::Local;
31 use rustrt::task::{Task, BlockedTask};
32 use rustrt::thread::Thread;
33
34 use sync::atomic;
35 use comm::spsc_queue as spsc;
36 use comm::Receiver;
37
38 const DISCONNECTED: int = int::MIN;
39 #[cfg(test)]
40 const MAX_STEALS: int = 5;
41 #[cfg(not(test))]
42 const MAX_STEALS: int = 1 << 20;
43
44 pub struct Packet<T> {
45     queue: spsc::Queue<Message<T>>, // internal queue for all message
46
47     cnt: atomic::AtomicInt, // How many items are on this channel
48     steals: int, // How many times has a port received without blocking?
49     to_wake: atomic::AtomicUint, // Task to wake up
50
51     port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed.
52 }
53
54 pub enum Failure<T> {
55     Empty,
56     Disconnected,
57     Upgraded(Receiver<T>),
58 }
59
60 pub enum UpgradeResult {
61     UpSuccess,
62     UpDisconnected,
63     UpWoke(BlockedTask),
64 }
65
66 pub enum SelectionResult<T> {
67     SelSuccess,
68     SelCanceled(BlockedTask),
69     SelUpgraded(BlockedTask, Receiver<T>),
70 }
71
72 // Any message could contain an "upgrade request" to a new shared port, so the
73 // internal queue it's a queue of T, but rather Message<T>
74 enum Message<T> {
75     Data(T),
76     GoUp(Receiver<T>),
77 }
78
79 impl<T: Send> Packet<T> {
80     pub fn new() -> Packet<T> {
81         Packet {
82             queue: unsafe { spsc::Queue::new(128) },
83
84             cnt: atomic::AtomicInt::new(0),
85             steals: 0,
86             to_wake: atomic::AtomicUint::new(0),
87
88             port_dropped: atomic::AtomicBool::new(false),
89         }
90     }
91
92
93     pub fn send(&mut self, t: T) -> Result<(), T> {
94         // If the other port has deterministically gone away, then definitely
95         // must return the data back up the stack. Otherwise, the data is
96         // considered as being sent.
97         if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
98
99         match self.do_send(Data(t)) {
100             UpSuccess | UpDisconnected => {},
101             UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
102         }
103         Ok(())
104     }
105     pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
106         // If the port has gone away, then there's no need to proceed any
107         // further.
108         if self.port_dropped.load(atomic::SeqCst) { return UpDisconnected }
109
110         self.do_send(GoUp(up))
111     }
112
113     fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
114         self.queue.push(t);
115         match self.cnt.fetch_add(1, atomic::SeqCst) {
116             // As described in the mod's doc comment, -1 == wakeup
117             -1 => UpWoke(self.take_to_wake()),
118             // As as described before, SPSC queues must be >= -2
119             -2 => UpSuccess,
120
121             // Be sure to preserve the disconnected state, and the return value
122             // in this case is going to be whether our data was received or not.
123             // This manifests itself on whether we have an empty queue or not.
124             //
125             // Primarily, are required to drain the queue here because the port
126             // will never remove this data. We can only have at most one item to
127             // drain (the port drains the rest).
128             DISCONNECTED => {
129                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
130                 let first = self.queue.pop();
131                 let second = self.queue.pop();
132                 assert!(second.is_none());
133
134                 match first {
135                     Some(..) => UpSuccess,  // we failed to send the data
136                     None => UpDisconnected, // we successfully sent data
137                 }
138             }
139
140             // Otherwise we just sent some data on a non-waiting queue, so just
141             // make sure the world is sane and carry on!
142             n => { assert!(n >= 0); UpSuccess }
143         }
144     }
145
146     // Consumes ownership of the 'to_wake' field.
147     fn take_to_wake(&mut self) -> BlockedTask {
148         let task = self.to_wake.load(atomic::SeqCst);
149         self.to_wake.store(0, atomic::SeqCst);
150         assert!(task != 0);
151         unsafe { BlockedTask::cast_from_uint(task) }
152     }
153
154     // Decrements the count on the channel for a sleeper, returning the sleeper
155     // back if it shouldn't sleep. Note that this is the location where we take
156     // steals into account.
157     fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
158         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
159         let n = unsafe { task.cast_to_uint() };
160         self.to_wake.store(n, atomic::SeqCst);
161
162         let steals = self.steals;
163         self.steals = 0;
164
165         match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
166             DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
167             // If we factor in our steals and notice that the channel has no
168             // data, we successfully sleep
169             n => {
170                 assert!(n >= 0);
171                 if n - steals <= 0 { return Ok(()) }
172             }
173         }
174
175         self.to_wake.store(0, atomic::SeqCst);
176         Err(unsafe { BlockedTask::cast_from_uint(n) })
177     }
178
179     pub fn recv(&mut self) -> Result<T, Failure<T>> {
180         // Optimistic preflight check (scheduling is expensive).
181         match self.try_recv() {
182             Err(Empty) => {}
183             data => return data,
184         }
185
186         // Welp, our channel has no data. Deschedule the current task and
187         // initiate the blocking protocol.
188         let task: Box<Task> = Local::take();
189         task.deschedule(1, |task| {
190             self.decrement(task)
191         });
192
193         match self.try_recv() {
194             // Messages which actually popped from the queue shouldn't count as
195             // a steal, so offset the decrement here (we already have our
196             // "steal" factored into the channel count above).
197             data @ Ok(..) |
198             data @ Err(Upgraded(..)) => {
199                 self.steals -= 1;
200                 data
201             }
202
203             data => data,
204         }
205     }
206
207     pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
208         match self.queue.pop() {
209             // If we stole some data, record to that effect (this will be
210             // factored into cnt later on).
211             //
212             // Note that we don't allow steals to grow without bound in order to
213             // prevent eventual overflow of either steals or cnt as an overflow
214             // would have catastrophic results. Sometimes, steals > cnt, but
215             // other times cnt > steals, so we don't know the relation between
216             // steals and cnt. This code path is executed only rarely, so we do
217             // a pretty slow operation, of swapping 0 into cnt, taking steals
218             // down as much as possible (without going negative), and then
219             // adding back in whatever we couldn't factor into steals.
220             Some(data) => {
221                 if self.steals > MAX_STEALS {
222                     match self.cnt.swap(0, atomic::SeqCst) {
223                         DISCONNECTED => {
224                             self.cnt.store(DISCONNECTED, atomic::SeqCst);
225                         }
226                         n => {
227                             let m = cmp::min(n, self.steals);
228                             self.steals -= m;
229                             self.bump(n - m);
230                         }
231                     }
232                     assert!(self.steals >= 0);
233                 }
234                 self.steals += 1;
235                 match data {
236                     Data(t) => Ok(t),
237                     GoUp(up) => Err(Upgraded(up)),
238                 }
239             }
240
241             None => {
242                 match self.cnt.load(atomic::SeqCst) {
243                     n if n != DISCONNECTED => Err(Empty),
244
245                     // This is a little bit of a tricky case. We failed to pop
246                     // data above, and then we have viewed that the channel is
247                     // disconnected. In this window more data could have been
248                     // sent on the channel. It doesn't really make sense to
249                     // return that the channel is disconnected when there's
250                     // actually data on it, so be extra sure there's no data by
251                     // popping one more time.
252                     //
253                     // We can ignore steals because the other end is
254                     // disconnected and we'll never need to really factor in our
255                     // steals again.
256                     _ => {
257                         match self.queue.pop() {
258                             Some(Data(t)) => Ok(t),
259                             Some(GoUp(up)) => Err(Upgraded(up)),
260                             None => Err(Disconnected),
261                         }
262                     }
263                 }
264             }
265         }
266     }
267
268     pub fn drop_chan(&mut self) {
269         // Dropping a channel is pretty simple, we just flag it as disconnected
270         // and then wakeup a blocker if there is one.
271         match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
272             -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
273             DISCONNECTED => {}
274             n => { assert!(n >= 0); }
275         }
276     }
277
278     pub fn drop_port(&mut self) {
279         // Dropping a port seems like a fairly trivial thing. In theory all we
280         // need to do is flag that we're disconnected and then everything else
281         // can take over (we don't have anyone to wake up).
282         //
283         // The catch for Ports is that we want to drop the entire contents of
284         // the queue. There are multiple reasons for having this property, the
285         // largest of which is that if another chan is waiting in this channel
286         // (but not received yet), then waiting on that port will cause a
287         // deadlock.
288         //
289         // So if we accept that we must now destroy the entire contents of the
290         // queue, this code may make a bit more sense. The tricky part is that
291         // we can't let any in-flight sends go un-dropped, we have to make sure
292         // *everything* is dropped and nothing new will come onto the channel.
293
294         // The first thing we do is set a flag saying that we're done for. All
295         // sends are gated on this flag, so we're immediately guaranteed that
296         // there are a bounded number of active sends that we'll have to deal
297         // with.
298         self.port_dropped.store(true, atomic::SeqCst);
299
300         // Now that we're guaranteed to deal with a bounded number of senders,
301         // we need to drain the queue. This draining process happens atomically
302         // with respect to the "count" of the channel. If the count is nonzero
303         // (with steals taken into account), then there must be data on the
304         // channel. In this case we drain everything and then try again. We will
305         // continue to fail while active senders send data while we're dropping
306         // data, but eventually we're guaranteed to break out of this loop
307         // (because there is a bounded number of senders).
308         let mut steals = self.steals;
309         while {
310             let cnt = self.cnt.compare_and_swap(
311                             steals, DISCONNECTED, atomic::SeqCst);
312             cnt != DISCONNECTED && cnt != steals
313         } {
314             loop {
315                 match self.queue.pop() {
316                     Some(..) => { steals += 1; }
317                     None => break
318                 }
319             }
320         }
321
322         // At this point in time, we have gated all future senders from sending,
323         // and we have flagged the channel as being disconnected. The senders
324         // still have some responsibility, however, because some sends may not
325         // complete until after we flag the disconnection. There are more
326         // details in the sending methods that see DISCONNECTED
327     }
328
329     ////////////////////////////////////////////////////////////////////////////
330     // select implementation
331     ////////////////////////////////////////////////////////////////////////////
332
333     // Tests to see whether this port can receive without blocking. If Ok is
334     // returned, then that's the answer. If Err is returned, then the returned
335     // port needs to be queried instead (an upgrade happened)
336     pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
337         // We peek at the queue to see if there's anything on it, and we use
338         // this return value to determine if we should pop from the queue and
339         // upgrade this channel immediately. If it looks like we've got an
340         // upgrade pending, then go through the whole recv rigamarole to update
341         // the internal state.
342         match self.queue.peek() {
343             Some(&GoUp(..)) => {
344                 match self.recv() {
345                     Err(Upgraded(port)) => Err(port),
346                     _ => unreachable!(),
347                 }
348             }
349             Some(..) => Ok(true),
350             None => Ok(false)
351         }
352     }
353
354     // increment the count on the channel (used for selection)
355     fn bump(&mut self, amt: int) -> int {
356         match self.cnt.fetch_add(amt, atomic::SeqCst) {
357             DISCONNECTED => {
358                 self.cnt.store(DISCONNECTED, atomic::SeqCst);
359                 DISCONNECTED
360             }
361             n => n
362         }
363     }
364
365     // Attempts to start selecting on this port. Like a oneshot, this can fail
366     // immediately because of an upgrade.
367     pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
368         match self.decrement(task) {
369             Ok(()) => SelSuccess,
370             Err(task) => {
371                 let ret = match self.queue.peek() {
372                     Some(&GoUp(..)) => {
373                         match self.queue.pop() {
374                             Some(GoUp(port)) => SelUpgraded(task, port),
375                             _ => unreachable!(),
376                         }
377                     }
378                     Some(..) => SelCanceled(task),
379                     None => SelCanceled(task),
380                 };
381                 // Undo our decrement above, and we should be guaranteed that the
382                 // previous value is positive because we're not going to sleep
383                 let prev = self.bump(1);
384                 assert!(prev == DISCONNECTED || prev >= 0);
385                 return ret;
386             }
387         }
388     }
389
390     // Removes a previous task from being blocked in this port
391     pub fn abort_selection(&mut self,
392                            was_upgrade: bool) -> Result<bool, Receiver<T>> {
393         // If we're aborting selection after upgrading from a oneshot, then
394         // we're guarantee that no one is waiting. The only way that we could
395         // have seen the upgrade is if data was actually sent on the channel
396         // half again. For us, this means that there is guaranteed to be data on
397         // this channel. Furthermore, we're guaranteed that there was no
398         // start_selection previously, so there's no need to modify `self.cnt`
399         // at all.
400         //
401         // Hence, because of these invariants, we immediately return `Ok(true)`.
402         // Note that the data may not actually be sent on the channel just yet.
403         // The other end could have flagged the upgrade but not sent data to
404         // this end. This is fine because we know it's a small bounded windows
405         // of time until the data is actually sent.
406         if was_upgrade {
407             assert_eq!(self.steals, 0);
408             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
409             return Ok(true)
410         }
411
412         // We want to make sure that the count on the channel goes non-negative,
413         // and in the stream case we can have at most one steal, so just assume
414         // that we had one steal.
415         let steals = 1;
416         let prev = self.bump(steals + 1);
417
418         // If we were previously disconnected, then we know for sure that there
419         // is no task in to_wake, so just keep going
420         let has_data = if prev == DISCONNECTED {
421             assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
422             true // there is data, that data is that we're disconnected
423         } else {
424             let cur = prev + steals + 1;
425             assert!(cur >= 0);
426
427             // If the previous count was negative, then we just made things go
428             // positive, hence we passed the -1 boundary and we're responsible
429             // for removing the to_wake() field and trashing it.
430             //
431             // If the previous count was positive then we're in a tougher
432             // situation. A possible race is that a sender just incremented
433             // through -1 (meaning it's going to try to wake a task up), but it
434             // hasn't yet read the to_wake. In order to prevent a future recv()
435             // from waking up too early (this sender picking up the plastered
436             // over to_wake), we spin loop here waiting for to_wake to be 0.
437             // Note that this entire select() implementation needs an overhaul,
438             // and this is *not* the worst part of it, so this is not done as a
439             // final solution but rather out of necessity for now to get
440             // something working.
441             if prev < 0 {
442                 self.take_to_wake().trash();
443             } else {
444                 while self.to_wake.load(atomic::SeqCst) != 0 {
445                     Thread::yield_now();
446                 }
447             }
448             assert_eq!(self.steals, 0);
449             self.steals = steals;
450
451             // if we were previously positive, then there's surely data to
452             // receive
453             prev >= 0
454         };
455
456         // Now that we've determined that this queue "has data", we peek at the
457         // queue to see if the data is an upgrade or not. If it's an upgrade,
458         // then we need to destroy this port and abort selection on the
459         // upgraded port.
460         if has_data {
461             match self.queue.peek() {
462                 Some(&GoUp(..)) => {
463                     match self.queue.pop() {
464                         Some(GoUp(port)) => Err(port),
465                         _ => unreachable!(),
466                     }
467                 }
468                 _ => Ok(true),
469             }
470         } else {
471             Ok(false)
472         }
473     }
474 }
475
476 #[unsafe_destructor]
477 impl<T: Send> Drop for Packet<T> {
478     fn drop(&mut self) {
479         // Note that this load is not only an assert for correctness about
480         // disconnection, but also a proper fence before the read of
481         // `to_wake`, so this assert cannot be removed with also removing
482         // the `to_wake` assert.
483         assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
484         assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
485     }
486 }