]> git.lizzy.rs Git - rust.git/blob - src/libstd/unstable/weak_task.rs
extra: Add url module
[rust.git] / src / libstd / unstable / weak_task.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12 Weak tasks
13
14 Weak tasks are a runtime feature for building global services that
15 do not keep the runtime alive. Normally the runtime exits when all
16 tasks exits, but if a task is weak then the runtime may exit while
17 it is running, sending a notification to the task that the runtime
18 is trying to shut down.
19 */
20
21 use cell::Cell;
22 use comm::{GenericSmartChan, stream};
23 use comm::{Port, Chan, SharedChan, GenericChan, GenericPort};
24 use hashmap::HashMap;
25 use option::{Some, None};
26 use unstable::at_exit::at_exit;
27 use unstable::finally::Finally;
28 use unstable::global::global_data_clone_create;
29 use task::rt::{task_id, get_task_id};
30 use task::task;
31
32 #[cfg(test)] use task::spawn;
33
34 type ShutdownMsg = ();
35
36 // FIXME #4729: This could be a PortOne but I've experienced bugginess
37 // with oneshot pipes and try_send
38 pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
39     let service = global_data_clone_create(global_data_key,
40                                            create_global_service);
41     let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
42     let shutdown_port = Cell::new(shutdown_port);
43     let task = get_task_id();
44     // Expect the weak task service to be alive
45     assert!(service.try_send(RegisterWeakTask(task, shutdown_chan)));
46     rust_dec_kernel_live_count();
47     do (|| {
48         f(shutdown_port.take())
49     }).finally || {
50         rust_inc_kernel_live_count();
51         // Service my have already exited
52         service.send(UnregisterWeakTask(task));
53     }
54 }
55
56 type WeakTaskService = SharedChan<ServiceMsg>;
57 type TaskHandle = task_id;
58
59 fn global_data_key(_v: WeakTaskService) { }
60
61 enum ServiceMsg {
62     RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
63     UnregisterWeakTask(TaskHandle),
64     Shutdown
65 }
66
67 fn create_global_service() -> ~WeakTaskService {
68
69     debug!("creating global weak task service");
70     let (port, chan) = stream::<ServiceMsg>();
71     let port = Cell::new(port);
72     let chan = SharedChan::new(chan);
73     let chan_clone = chan.clone();
74
75     let mut task = task();
76     task.unlinked();
77     do task.spawn {
78         debug!("running global weak task service");
79         let port = Cell::new(port.take());
80         do (|| {
81             let port = port.take();
82             // The weak task service is itself a weak task
83             debug!("weakening the weak service task");
84             unsafe { rust_dec_kernel_live_count(); }
85             run_weak_task_service(port);
86         }).finally {
87             debug!("unweakening the weak service task");
88             unsafe { rust_inc_kernel_live_count(); }
89         }
90     }
91
92     do at_exit {
93         debug!("shutting down weak task service");
94         chan.send(Shutdown);
95     }
96
97     return ~chan_clone;
98 }
99
100 fn run_weak_task_service(port: Port<ServiceMsg>) {
101
102     let mut shutdown_map = HashMap::new();
103
104     loop {
105         match port.recv() {
106             RegisterWeakTask(task, shutdown_chan) => {
107                 let previously_unregistered =
108                     shutdown_map.insert(task, shutdown_chan);
109                 assert!(previously_unregistered);
110             }
111             UnregisterWeakTask(task) => {
112                 match shutdown_map.pop(&task) {
113                     Some(shutdown_chan) => {
114                         // Oneshot pipes must send, even though
115                         // nobody will receive this
116                         shutdown_chan.send(());
117                     }
118                     None => fail!()
119                 }
120             }
121             Shutdown => break
122         }
123     }
124
125     for shutdown_map.consume().advance |(_, shutdown_chan)| {
126         // Weak task may have already exited
127         shutdown_chan.send(());
128     }
129 }
130
131 extern {
132     unsafe fn rust_inc_kernel_live_count();
133     unsafe fn rust_dec_kernel_live_count();
134 }
135
136 #[test]
137 fn test_simple() {
138     let (port, chan) = stream();
139     do spawn {
140         unsafe {
141             do weaken_task |_signal| {
142             }
143         }
144         chan.send(());
145     }
146     port.recv();
147 }
148
149 #[test]
150 fn test_weak_weak() {
151     let (port, chan) = stream();
152     do spawn {
153         unsafe {
154             do weaken_task |_signal| {
155             }
156             do weaken_task |_signal| {
157             }
158         }
159         chan.send(());
160     }
161     port.recv();
162 }
163
164 #[test]
165 fn test_wait_for_signal() {
166     do spawn {
167         unsafe {
168             do weaken_task |signal| {
169                 signal.recv();
170             }
171         }
172     }
173 }
174
175 #[test]
176 fn test_wait_for_signal_many() {
177     use uint;
178     for uint::range(0, 100) |_| {
179         do spawn {
180             unsafe {
181                 do weaken_task |signal| {
182                     signal.recv();
183                 }
184             }
185         }
186     }
187 }
188
189 #[test]
190 fn test_select_stream_and_oneshot() {
191     use comm::select2i;
192     use either::{Left, Right};
193
194     let (port, chan) = stream();
195     let port = Cell::new(port);
196     let (waitport, waitchan) = stream();
197     do spawn {
198         unsafe {
199             do weaken_task |mut signal| {
200                 let mut port = port.take();
201                 match select2i(&mut port, &mut signal) {
202                     Left(*) => (),
203                     Right(*) => fail!()
204                 }
205             }
206         }
207         waitchan.send(());
208     }
209     chan.send(());
210     waitport.recv();
211 }