1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 // use either::{Either, Left, Right};
13 use rt::kill::BlockedTask;
14 use rt::sched::Scheduler;
17 /// Trait for message-passing primitives that can be select()ed on.
19 // Returns true if data was available.
20 fn optimistic_check(&mut self) -> bool;
21 // Returns true if data was available. If so, shall also wake() the task.
22 fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool;
23 // Returns true if data was available.
24 fn unblock_from(&mut self) -> bool;
27 /// Trait for message-passing primitives that can use the select2() convenience wrapper.
28 // (This is separate from the above trait to enable heterogeneous lists of ports
29 // that implement Select on different types to use select().)
30 pub trait SelectPort<T> : Select {
31 fn recv_ready(self) -> Option<T>;
34 /// Receive a message from any one of many ports at once.
35 pub fn select<A: Select>(ports: &mut [A]) -> uint {
37 fail!("can't select on an empty list");
40 for (index, port) in ports.mut_iter().enumerate() {
41 if port.optimistic_check() {
46 // If one of the ports already contains data when we go to block on it, we
47 // don't bother enqueueing on the rest of them, so we shouldn't bother
48 // unblocking from it either. This is just for efficiency, not correctness.
49 // (If not, we need to unblock from all of them. Length is a placeholder.)
50 let mut ready_index = ports.len();
52 let sched = Local::take::<Scheduler>();
53 do sched.deschedule_running_task_and_then |sched, task| {
54 let task_handles = task.make_selectable(ports.len());
56 for (index, (port, task_handle)) in
57 ports.mut_iter().zip(task_handles.consume_iter()).enumerate() {
58 // If one of the ports has data by now, it will wake the handle.
59 if port.block_on(sched, task_handle) {
66 // Task resumes. Now unblock ourselves from all the ports we blocked on.
67 // If the success index wasn't reset, 'take' will just take all of them.
68 // Iterate in reverse so the 'earliest' index that's ready gets returned.
69 for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() {
70 if port.unblock_from() {
75 assert!(ready_index < ports.len());
79 /* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
81 impl <'self> Select for &'self mut Select {
82 fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
83 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
84 self.block_on(sched, task)
86 fn unblock_from(&mut self) -> bool { self.unblock_from() }
89 pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
90 -> Either<(Option<TA>, B), (A, Option<TB>)> {
92 let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
96 0 => Left ((a.recv_ready(), b)),
97 1 => Right((a, b.recv_ready())),
98 x => fail!("impossible case in select2: %?", x)
111 use comm::GenericChan;
114 use iterator::{Iterator, range};
116 #[test] #[ignore(cfg(windows))] #[should_fail]
117 fn select_doesnt_get_trolled() {
118 select::<PortOne<()>>([]);
121 /* non-blocking select tests */
124 fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
125 // Unfortunately this does not actually test the block_on early-break
126 // codepath in select -- racing between the sender and the receiver in
127 // separate tasks is necessary to get around the optimistic check.
128 let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>()));
129 let mut dead_chans = ~[];
130 let mut ports = ports;
131 for (i, chan) in chans.consume_iter().enumerate() {
132 if send_on_chans.contains(&i) {
135 dead_chans.push(chan);
138 let ready_index = select(ports);
139 assert!(send_on_chans.contains(&ready_index));
140 assert!(ports.swap_remove(ready_index).recv_ready().is_some());
143 // Same thing with streams instead.
144 // FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
145 let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>()));
146 let mut dead_chans = ~[];
147 let mut ports = ports;
148 for (i, chan) in chans.consume_iter().enumerate() {
149 if send_on_chans.contains(&i) {
152 dead_chans.push(chan);
155 let ready_index = select(ports);
156 assert!(send_on_chans.contains(&ready_index));
157 assert!(ports.swap_remove(ready_index).recv_ready().is_some());
163 do run_in_newsched_task { select_helper(1, [0]) }
168 // NB. I would like to have a test that tests the first one that is
169 // ready is the one that's returned, but that can't be reliably tested
170 // with the randomized behaviour of optimistic_check.
171 do run_in_newsched_task { select_helper(2, [1]) }
172 do run_in_newsched_task { select_helper(2, [0]) }
173 do run_in_newsched_task { select_helper(2, [1,0]) }
178 do run_in_newsched_task { select_helper(12, [7,8,9]) }
184 use comm::GenericChan;
187 // Sends 10 buffered packets, and uses select to retrieve them all.
188 // Puts the port in a different spot in the vector each time.
189 do run_in_newsched_task {
190 let (ports, _) = unzip(from_fn(10, |_| stream()));
191 let (port, chan) = stream();
192 do 10.times { chan.send(31337); }
193 let mut ports = ports;
194 let mut port = Some(port);
195 let order = [5u,0,4,3,2,6,9,8,7,1];
196 for &index in order.iter() {
197 // put the port in the vector at any index
198 util::swap(port.get_mut_ref(), &mut ports[index]);
199 assert!(select(ports) == index);
201 util::swap(port.get_mut_ref(), &mut ports[index]);
202 // NB. Not recv(), because optimistic_check randomly fails.
203 assert!(port.get_ref().recv_ready().unwrap() == 31337);
209 fn select_unkillable() {
210 do run_in_newsched_task {
211 do task::unkillable { select_helper(2, [1]) }
215 /* blocking select tests */
218 fn select_blocking() {
219 select_blocking_helper(true);
220 select_blocking_helper(false);
222 fn select_blocking_helper(killable: bool) {
223 do run_in_newsched_task {
224 let (p1,_c) = oneshot();
225 let (p2,c2) = oneshot();
226 let mut ports = [p1,p2];
228 let (p3,c3) = oneshot();
229 let (p4,c4) = oneshot();
231 let x = Cell::new((c2, p3, c4));
233 let (c2, p3, c4) = x.take();
234 p3.recv(); // handshake parent
235 c4.send(()); // normal receive
237 c2.send(()); // select receive
240 // Try to block before child sends on c2.
244 assert!(select(ports) == 1);
246 do task::unkillable { assert!(select(ports) == 1); }
253 fn select_racing_senders() {
254 static NUM_CHANS: uint = 10;
256 select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
257 select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
258 select_racing_senders_helper(true, ~[0,1,2]);
259 select_racing_senders_helper(false, ~[0,1,2]);
260 select_racing_senders_helper(true, ~[3,4,5,6]);
261 select_racing_senders_helper(false, ~[3,4,5,6]);
262 select_racing_senders_helper(true, ~[7,8,9]);
263 select_racing_senders_helper(false, ~[7,8,9]);
265 fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
266 use rt::test::spawntask_random;
269 do run_in_newsched_task {
270 // A bit of stress, since ordinarily this is just smoke and mirrors.
272 let send_on_chans = send_on_chans.clone();
275 for i in range(0u, NUM_CHANS) {
276 let (p,c) = oneshot();
278 if send_on_chans.contains(&i) {
279 let c = Cell::new(c);
280 do spawntask_random {
286 // nondeterministic result, but should succeed
290 do task::unkillable { select(ports); }
298 #[test] #[ignore(cfg(windows))]
300 do run_in_newsched_task {
301 let (success_p, success_c) = oneshot::<bool>();
302 let success_c = Cell::new(success_c);
304 let success_c = Cell::new(success_c.take());
305 do task::unkillable {
306 let (p,c) = oneshot();
307 let c = Cell::new(c);
309 let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>()));
310 let mut ports = dead_ps;
311 select(ports); // should get killed; nothing should leak
312 c.take().send(()); // must not happen
313 // Make sure dead_cs doesn't get closed until after select.
317 fail!(); // should kill sibling awake
320 // wait for killed selector to close (NOT send on) its c.
321 // hope to send 'true'.
322 success_c.take().send(p.try_recv().is_none());
325 assert!(success_p.recv());