]> git.lizzy.rs Git - rust.git/blob - src/libstd/select.rs
auto merge of #10519 : nikomatsakis/rust/issue-8624-borrowck-overly-permissive, r...
[rust.git] / src / libstd / select.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 #[allow(missing_doc)];
12
13 use cell::Cell;
14 use comm;
15 use container::Container;
16 use iter::{Iterator, DoubleEndedIterator};
17 use option::*;
18 // use either::{Either, Left, Right};
19 // use rt::kill::BlockedTask;
20 use rt::local::Local;
21 use rt::rtio::EventLoop;
22 use rt::sched::Scheduler;
23 use rt::shouldnt_be_public::{SelectInner, SelectPortInner};
24 use unstable::finally::Finally;
25 use vec::{OwnedVector, MutableVector};
26
27 /// Trait for message-passing primitives that can be select()ed on.
28 pub trait Select : SelectInner { }
29
30 /// Trait for message-passing primitives that can use the select2() convenience wrapper.
31 // (This is separate from the above trait to enable heterogeneous lists of ports
32 // that implement Select on different types to use select().)
33 pub trait SelectPort<T> : SelectPortInner<T> { }
34
35 /// Receive a message from any one of many ports at once. Returns the index of the
36 /// port whose data is ready. (If multiple are ready, returns the lowest index.)
37 pub fn select<A: Select>(ports: &mut [A]) -> uint {
38     if ports.is_empty() {
39         fail!("can't select on an empty list");
40     }
41
42     for (index, port) in ports.mut_iter().enumerate() {
43         if port.optimistic_check() {
44             return index;
45         }
46     }
47
48     // If one of the ports already contains data when we go to block on it, we
49     // don't bother enqueueing on the rest of them, so we shouldn't bother
50     // unblocking from it either. This is just for efficiency, not correctness.
51     // (If not, we need to unblock from all of them. Length is a placeholder.)
52     let mut ready_index = ports.len();
53
54     // XXX: We're using deschedule...and_then in an unsafe way here (see #8132),
55     // in that we need to continue mutating the ready_index in the environment
56     // after letting the task get woken up. The and_then closure needs to delay
57     // the task from resuming until all ports have become blocked_on.
58     let (p,c) = comm::oneshot();
59     let p = Cell::new(p);
60     let c = Cell::new(c);
61
62     (|| {
63         let c = Cell::new(c.take());
64         let sched: ~Scheduler = Local::take();
65         sched.deschedule_running_task_and_then(|sched, task| {
66             let task_handles = task.make_selectable(ports.len());
67
68             for (index, (port, task_handle)) in
69                     ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
70                 // If one of the ports has data by now, it will wake the handle.
71                 if port.block_on(sched, task_handle) {
72                     ready_index = index;
73                     break;
74                 }
75             }
76
77             let c = Cell::new(c.take());
78             do sched.event_loop.callback { c.take().send_deferred(()) }
79         })
80     }).finally(|| {
81         // Unkillable is necessary not because getting killed is dangerous here,
82         // but to force the recv not to use the same kill-flag that we used for
83         // selecting. Otherwise a user-sender could spuriously wakeup us here.
84         p.take().recv();
85     });
86
87     // Task resumes. Now unblock ourselves from all the ports we blocked on.
88     // If the success index wasn't reset, 'take' will just take all of them.
89     // Iterate in reverse so the 'earliest' index that's ready gets returned.
90     for (index, port) in ports.mut_slice(0, ready_index).mut_iter().enumerate().invert() {
91         if port.unblock_from() {
92             ready_index = index;
93         }
94     }
95
96     assert!(ready_index < ports.len());
97     return ready_index;
98 }
99
100 /* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
101
102 impl <'self> Select for &'self mut Select {
103     fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
104     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
105         self.block_on(sched, task)
106     }
107     fn unblock_from(&mut self) -> bool { self.unblock_from() }
108 }
109
110 pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
111         -> Either<(Option<TA>, B), (A, Option<TB>)> {
112     let result = {
113         let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
114         select(ports)
115     };
116     match result {
117         0 => Left ((a.recv_ready(), b)),
118         1 => Right((a, b.recv_ready())),
119         x => fail!("impossible case in select2: {:?}", x)
120     }
121 }
122
123 */
124
125 #[cfg(test)]
126 mod test {
127     use super::*;
128     use clone::Clone;
129     use num::Times;
130     use option::*;
131     use rt::comm::*;
132     use rt::test::*;
133     use vec::*;
134     use comm::GenericChan;
135     use task;
136     use cell::Cell;
137     use iter::{Iterator, range};
138
139     #[test] #[should_fail]
140     fn select_doesnt_get_trolled() {
141         select::<PortOne<()>>([]);
142     }
143
144     /* non-blocking select tests */
145
146     #[cfg(test)]
147     fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
148         // Unfortunately this does not actually test the block_on early-break
149         // codepath in select -- racing between the sender and the receiver in
150         // separate tasks is necessary to get around the optimistic check.
151         let (ports, chans) = unzip(range(0, num_ports).map(|_| oneshot::<()>()));
152         let mut dead_chans = ~[];
153         let mut ports = ports;
154         for (i, chan) in chans.move_iter().enumerate() {
155             if send_on_chans.contains(&i) {
156                 chan.send(());
157             } else {
158                 dead_chans.push(chan);
159             }
160         }
161         let ready_index = select(ports);
162         assert!(send_on_chans.contains(&ready_index));
163         assert!(ports.swap_remove(ready_index).recv_ready().is_some());
164         let _ = dead_chans;
165
166         // Same thing with streams instead.
167         // FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
168         let (ports, chans) = unzip(range(0, num_ports).map(|_| stream::<()>()));
169         let mut dead_chans = ~[];
170         let mut ports = ports;
171         for (i, chan) in chans.move_iter().enumerate() {
172             if send_on_chans.contains(&i) {
173                 chan.send(());
174             } else {
175                 dead_chans.push(chan);
176             }
177         }
178         let ready_index = select(ports);
179         assert!(send_on_chans.contains(&ready_index));
180         assert!(ports.swap_remove(ready_index).recv_ready().is_some());
181         let _ = dead_chans;
182     }
183
184     #[test]
185     fn select_one() {
186         do run_in_uv_task { select_helper(1, [0]) }
187     }
188
189     #[test]
190     fn select_two() {
191         // NB. I would like to have a test that tests the first one that is
192         // ready is the one that's returned, but that can't be reliably tested
193         // with the randomized behaviour of optimistic_check.
194         do run_in_uv_task { select_helper(2, [1]) }
195         do run_in_uv_task { select_helper(2, [0]) }
196         do run_in_uv_task { select_helper(2, [1,0]) }
197     }
198
199     #[test]
200     fn select_a_lot() {
201         do run_in_uv_task { select_helper(12, [7,8,9]) }
202     }
203
204     #[test]
205     fn select_stream() {
206         use util;
207         use comm::GenericChan;
208
209         // Sends 10 buffered packets, and uses select to retrieve them all.
210         // Puts the port in a different spot in the vector each time.
211         do run_in_uv_task {
212             let (ports, _) = unzip(range(0u, 10).map(|_| stream::<int>()));
213             let (port, chan) = stream();
214             10.times(|| { chan.send(31337); });
215             let mut ports = ports;
216             let mut port = Some(port);
217             let order = [5u,0,4,3,2,6,9,8,7,1];
218             for &index in order.iter() {
219                 // put the port in the vector at any index
220                 util::swap(port.get_mut_ref(), &mut ports[index]);
221                 assert!(select(ports) == index);
222                 // get it back out
223                 util::swap(port.get_mut_ref(), &mut ports[index]);
224                 // NB. Not recv(), because optimistic_check randomly fails.
225                 assert!(port.get_ref().recv_ready().unwrap() == 31337);
226             }
227         }
228     }
229
230     #[test]
231     fn select_simple() {
232         do run_in_uv_task {
233             select_helper(2, [1])
234         }
235     }
236
237     /* blocking select tests */
238
239     #[test]
240     fn select_blocking() {
241         do run_in_uv_task {
242             let (p1,_c) = oneshot();
243             let (p2,c2) = oneshot();
244             let mut ports = [p1,p2];
245
246             let (p3,c3) = oneshot();
247             let (p4,c4) = oneshot();
248
249             let x = Cell::new((c2, p3, c4));
250             do task::spawn {
251                 let (c2, p3, c4) = x.take();
252                 p3.recv();   // handshake parent
253                 c4.send(()); // normal receive
254                 task::deschedule();
255                 c2.send(()); // select receive
256             }
257
258             // Try to block before child sends on c2.
259             c3.send(());
260             p4.recv();
261             assert!(select(ports) == 1);
262         }
263     }
264
265     #[test]
266     fn select_racing_senders() {
267         static NUM_CHANS: uint = 10;
268
269         select_racing_senders_helper(~[0,1,2,3,4,5,6,7,8,9]);
270         select_racing_senders_helper(~[0,1,2]);
271         select_racing_senders_helper(~[3,4,5,6]);
272         select_racing_senders_helper(~[7,8,9]);
273
274         fn select_racing_senders_helper(send_on_chans: ~[uint]) {
275             use rt::test::spawntask_random;
276
277             do run_in_uv_task {
278                 // A bit of stress, since ordinarily this is just smoke and mirrors.
279                 4.times(|| {
280                     let send_on_chans = send_on_chans.clone();
281                     do task::spawn {
282                         let mut ports = ~[];
283                         for i in range(0u, NUM_CHANS) {
284                             let (p,c) = oneshot();
285                             ports.push(p);
286                             if send_on_chans.contains(&i) {
287                                 let c = Cell::new(c);
288                                 do spawntask_random {
289                                     task::deschedule();
290                                     c.take().send(());
291                                 }
292                             }
293                         }
294                         // nondeterministic result, but should succeed
295                         select(ports);
296                     }
297                 })
298             }
299         }
300     }
301 }