]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/select.rs
auto merge of #8356 : toddaaro/rust/ws, r=brson
[rust.git] / src / libstd / rt / select.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use option::*;
12 // use either::{Either, Left, Right};
13 use rt::kill::BlockedTask;
14 use rt::sched::Scheduler;
15 use rt::local::Local;
16
17 /// Trait for message-passing primitives that can be select()ed on.
18 pub trait Select {
19     // Returns true if data was available.
20     fn optimistic_check(&mut self) -> bool;
21     // Returns true if data was available. If so, shall also wake() the task.
22     fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool;
23     // Returns true if data was available.
24     fn unblock_from(&mut self) -> bool;
25 }
26
27 /// Trait for message-passing primitives that can use the select2() convenience wrapper.
28 // (This is separate from the above trait to enable heterogeneous lists of ports
29 // that implement Select on different types to use select().)
30 pub trait SelectPort<T> : Select {
31     fn recv_ready(self) -> Option<T>;
32 }
33
34 /// Receive a message from any one of many ports at once.
35 pub fn select<A: Select>(ports: &mut [A]) -> uint {
36     if ports.is_empty() {
37         fail!("can't select on an empty list");
38     }
39
40     for (index, port) in ports.mut_iter().enumerate() {
41         if port.optimistic_check() {
42             return index;
43         }
44     }
45
46     // If one of the ports already contains data when we go to block on it, we
47     // don't bother enqueueing on the rest of them, so we shouldn't bother
48     // unblocking from it either. This is just for efficiency, not correctness.
49     // (If not, we need to unblock from all of them. Length is a placeholder.)
50     let mut ready_index = ports.len();
51
52     let sched = Local::take::<Scheduler>();
53     do sched.deschedule_running_task_and_then |sched, task| {
54         let task_handles = task.make_selectable(ports.len());
55
56         for (index, (port, task_handle)) in
57                 ports.mut_iter().zip(task_handles.consume_iter()).enumerate() {
58             // If one of the ports has data by now, it will wake the handle.
59             if port.block_on(sched, task_handle) {
60                 ready_index = index;
61                 break;
62             }
63         }
64     }
65
66     // Task resumes. Now unblock ourselves from all the ports we blocked on.
67     // If the success index wasn't reset, 'take' will just take all of them.
68     // Iterate in reverse so the 'earliest' index that's ready gets returned.
69     for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() {
70         if port.unblock_from() {
71             ready_index = index;
72         }
73     }
74
75     assert!(ready_index < ports.len());
76     return ready_index;
77 }
78
79 /* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
80
81 impl <'self> Select for &'self mut Select {
82     fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
83     fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
84         self.block_on(sched, task)
85     }
86     fn unblock_from(&mut self) -> bool { self.unblock_from() }
87 }
88
89 pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
90         -> Either<(Option<TA>, B), (A, Option<TB>)> {
91     let result = {
92         let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
93         select(ports)
94     };
95     match result {
96         0 => Left ((a.recv_ready(), b)),
97         1 => Right((a, b.recv_ready())),
98         x => fail!("impossible case in select2: %?", x)
99     }
100 }
101
102 */
103
104 #[cfg(test)]
105 mod test {
106     use super::*;
107     use option::*;
108     use rt::comm::*;
109     use rt::test::*;
110     use vec::*;
111     use comm::GenericChan;
112     use task;
113     use cell::Cell;
114     use iterator::{Iterator, range};
115
116     #[test] #[ignore(cfg(windows))] #[should_fail]
117     fn select_doesnt_get_trolled() {
118         select::<PortOne<()>>([]);
119     }
120
121     /* non-blocking select tests */
122
123     #[cfg(test)]
124     fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
125         // Unfortunately this does not actually test the block_on early-break
126         // codepath in select -- racing between the sender and the receiver in
127         // separate tasks is necessary to get around the optimistic check.
128         let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>()));
129         let mut dead_chans = ~[];
130         let mut ports = ports;
131         for (i, chan) in chans.consume_iter().enumerate() {
132             if send_on_chans.contains(&i) {
133                 chan.send(());
134             } else {
135                 dead_chans.push(chan);
136             }
137         }
138         let ready_index = select(ports);
139         assert!(send_on_chans.contains(&ready_index));
140         assert!(ports.swap_remove(ready_index).recv_ready().is_some());
141         let _ = dead_chans;
142
143         // Same thing with streams instead.
144         // FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
145         let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>()));
146         let mut dead_chans = ~[];
147         let mut ports = ports;
148         for (i, chan) in chans.consume_iter().enumerate() {
149             if send_on_chans.contains(&i) {
150                 chan.send(());
151             } else {
152                 dead_chans.push(chan);
153             }
154         }
155         let ready_index = select(ports);
156         assert!(send_on_chans.contains(&ready_index));
157         assert!(ports.swap_remove(ready_index).recv_ready().is_some());
158         let _ = dead_chans;
159     }
160
161     #[test]
162     fn select_one() {
163         do run_in_newsched_task { select_helper(1, [0]) }
164     }
165
166     #[test]
167     fn select_two() {
168         // NB. I would like to have a test that tests the first one that is
169         // ready is the one that's returned, but that can't be reliably tested
170         // with the randomized behaviour of optimistic_check.
171         do run_in_newsched_task { select_helper(2, [1]) }
172         do run_in_newsched_task { select_helper(2, [0]) }
173         do run_in_newsched_task { select_helper(2, [1,0]) }
174     }
175
176     #[test]
177     fn select_a_lot() {
178         do run_in_newsched_task { select_helper(12, [7,8,9]) }
179     }
180
181     #[test]
182     fn select_stream() {
183         use util;
184         use comm::GenericChan;
185         use iter::Times;
186
187         // Sends 10 buffered packets, and uses select to retrieve them all.
188         // Puts the port in a different spot in the vector each time.
189         do run_in_newsched_task {
190             let (ports, _) = unzip(from_fn(10, |_| stream()));
191             let (port, chan) = stream();
192             do 10.times { chan.send(31337); }
193             let mut ports = ports;
194             let mut port = Some(port);
195             let order = [5u,0,4,3,2,6,9,8,7,1];
196             for &index in order.iter() {
197                 // put the port in the vector at any index
198                 util::swap(port.get_mut_ref(), &mut ports[index]);
199                 assert!(select(ports) == index);
200                 // get it back out
201                 util::swap(port.get_mut_ref(), &mut ports[index]);
202                 // NB. Not recv(), because optimistic_check randomly fails.
203                 assert!(port.get_ref().recv_ready().unwrap() == 31337);
204             }
205         }
206     }
207
208     #[test]
209     fn select_unkillable() {
210         do run_in_newsched_task {
211             do task::unkillable { select_helper(2, [1]) }
212         }
213     }
214
215     /* blocking select tests */
216
217     #[test]
218     fn select_blocking() {
219         select_blocking_helper(true);
220         select_blocking_helper(false);
221
222         fn select_blocking_helper(killable: bool) {
223             do run_in_newsched_task {
224                 let (p1,_c) = oneshot();
225                 let (p2,c2) = oneshot();
226                 let mut ports = [p1,p2];
227
228                 let (p3,c3) = oneshot();
229                 let (p4,c4) = oneshot();
230
231                 let x = Cell::new((c2, p3, c4));
232                 do task::spawn {
233                     let (c2, p3, c4) = x.take();
234                     p3.recv();   // handshake parent
235                     c4.send(()); // normal receive
236                     task::yield();
237                     c2.send(()); // select receive
238                 }
239
240                 // Try to block before child sends on c2.
241                 c3.send(());
242                 p4.recv();
243                 if killable {
244                     assert!(select(ports) == 1);
245                 } else {
246                     do task::unkillable { assert!(select(ports) == 1); }
247                 }
248             }
249         }
250     }
251
252     #[test]
253     fn select_racing_senders() {
254         static NUM_CHANS: uint = 10;
255
256         select_racing_senders_helper(true,  ~[0,1,2,3,4,5,6,7,8,9]);
257         select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
258         select_racing_senders_helper(true,  ~[0,1,2]);
259         select_racing_senders_helper(false, ~[0,1,2]);
260         select_racing_senders_helper(true,  ~[3,4,5,6]);
261         select_racing_senders_helper(false, ~[3,4,5,6]);
262         select_racing_senders_helper(true,  ~[7,8,9]);
263         select_racing_senders_helper(false, ~[7,8,9]);
264
265         fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
266             use rt::test::spawntask_random;
267             use iter::Times;
268
269             do run_in_newsched_task {
270                 // A bit of stress, since ordinarily this is just smoke and mirrors.
271                 do 4.times {
272                     let send_on_chans = send_on_chans.clone();
273                     do task::spawn {
274                         let mut ports = ~[];
275                         for i in range(0u, NUM_CHANS) {
276                             let (p,c) = oneshot();
277                             ports.push(p);
278                             if send_on_chans.contains(&i) {
279                                 let c = Cell::new(c);
280                                 do spawntask_random {
281                                     task::yield();
282                                     c.take().send(());
283                                 }
284                             }
285                         }
286                         // nondeterministic result, but should succeed
287                         if killable {
288                             select(ports);
289                         } else {
290                             do task::unkillable { select(ports); }
291                         }
292                     }
293                 }
294             }
295         }
296     }
297
298     #[test] #[ignore(cfg(windows))]
299     fn select_killed() {
300         do run_in_newsched_task {
301             let (success_p, success_c) = oneshot::<bool>();
302             let success_c = Cell::new(success_c);
303             do task::try {
304                 let success_c = Cell::new(success_c.take());
305                 do task::unkillable {
306                     let (p,c) = oneshot();
307                     let c = Cell::new(c);
308                     do task::spawn {
309                         let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>()));
310                         let mut ports = dead_ps;
311                         select(ports); // should get killed; nothing should leak
312                         c.take().send(()); // must not happen
313                         // Make sure dead_cs doesn't get closed until after select.
314                         let _ = dead_cs;
315                     }
316                     do task::spawn {
317                         fail!(); // should kill sibling awake
318                     }
319
320                     // wait for killed selector to close (NOT send on) its c.
321                     // hope to send 'true'.
322                     success_c.take().send(p.try_recv().is_none());
323                 }
324             };
325             assert!(success_p.recv());
326         }
327     }
328 }