]> git.lizzy.rs Git - rust.git/blob - src/libnative/io/helper_thread.rs
443c82c6a547c53918a3ebd586783ce604268805
[rust.git] / src / libnative / io / helper_thread.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Implementation of the helper thread for the timer module
12 //!
13 //! This module contains the management necessary for the timer worker thread.
14 //! This thread is responsible for performing the send()s on channels for timers
15 //! that are using channels instead of a blocking call.
16 //!
17 //! The timer thread is lazily initialized, and it's shut down via the
18 //! `shutdown` function provided. It must be maintained as an invariant that
19 //! `shutdown` is only called when the entire program is finished. No new timers
20 //! can be created in the future and there must be no active timers at that
21 //! time.
22
23 #![macro_escape]
24
25 use std::mem;
26 use std::rt::bookkeeping;
27 use std::rt::mutex::StaticNativeMutex;
28 use std::rt;
29 use std::ty::Unsafe;
30
31 use task;
32
33 /// A structure for management of a helper thread.
34 ///
35 /// This is generally a static structure which tracks the lifetime of a helper
36 /// thread.
37 ///
38 /// The fields of this helper are all public, but they should not be used, this
39 /// is for static initialization.
40 pub struct Helper<M> {
41     /// Internal lock which protects the remaining fields
42     pub lock: StaticNativeMutex,
43
44     // You'll notice that the remaining fields are Unsafe<T>, and this is
45     // because all helper thread operations are done through &self, but we need
46     // these to be mutable (once `lock` is held).
47
48     /// Lazily allocated channel to send messages to the helper thread.
49     pub chan: Unsafe<*mut Sender<M>>,
50
51     /// OS handle used to wake up a blocked helper thread
52     pub signal: Unsafe<uint>,
53
54     /// Flag if this helper thread has booted and been initialized yet.
55     pub initialized: Unsafe<bool>,
56 }
57
58 macro_rules! helper_init( (static mut $name:ident: Helper<$m:ty>) => (
59     static mut $name: Helper<$m> = Helper {
60         lock: ::std::rt::mutex::NATIVE_MUTEX_INIT,
61         chan: ::std::ty::Unsafe {
62             value: 0 as *mut Sender<$m>,
63             marker1: ::std::kinds::marker::InvariantType,
64         },
65         signal: ::std::ty::Unsafe {
66             value: 0,
67             marker1: ::std::kinds::marker::InvariantType,
68         },
69         initialized: ::std::ty::Unsafe {
70             value: false,
71             marker1: ::std::kinds::marker::InvariantType,
72         },
73     };
74 ) )
75
76 impl<M: Send> Helper<M> {
77     /// Lazily boots a helper thread, becoming a no-op if the helper has already
78     /// been spawned.
79     ///
80     /// This function will check to see if the thread has been initialized, and
81     /// if it has it returns quickly. If initialization has not happened yet,
82     /// the closure `f` will be run (inside of the initialization lock) and
83     /// passed to the helper thread in a separate task.
84     ///
85     /// This function is safe to be called many times.
86     pub fn boot<T: Send>(&'static self,
87                          f: || -> T,
88                          helper: fn(imp::signal, Receiver<M>, T)) {
89         unsafe {
90             let _guard = self.lock.lock();
91             if !*self.initialized.get() {
92                 let (tx, rx) = channel();
93                 *self.chan.get() = mem::transmute(box tx);
94                 let (receive, send) = imp::new();
95                 *self.signal.get() = send as uint;
96
97                 let t = f();
98                 task::spawn(proc() {
99                     bookkeeping::decrement();
100                     helper(receive, rx, t);
101                     self.lock.lock().signal()
102                 });
103
104                 rt::at_exit(proc() { self.shutdown() });
105                 *self.initialized.get() = true;
106             }
107         }
108     }
109
110     /// Sends a message to a spawned worker thread.
111     ///
112     /// This is only valid if the worker thread has previously booted
113     pub fn send(&'static self, msg: M) {
114         unsafe {
115             let _guard = self.lock.lock();
116
117             // Must send and *then* signal to ensure that the child receives the
118             // message. Otherwise it could wake up and go to sleep before we
119             // send the message.
120             assert!(!self.chan.get().is_null());
121             (**self.chan.get()).send(msg);
122             imp::signal(*self.signal.get() as imp::signal);
123         }
124     }
125
126     fn shutdown(&'static self) {
127         unsafe {
128             // Shut down, but make sure this is done inside our lock to ensure
129             // that we'll always receive the exit signal when the thread
130             // returns.
131             let guard = self.lock.lock();
132
133             // Close the channel by destroying it
134             let chan: Box<Sender<M>> = mem::transmute(*self.chan.get());
135             *self.chan.get() = 0 as *mut Sender<M>;
136             drop(chan);
137             imp::signal(*self.signal.get() as imp::signal);
138
139             // Wait for the child to exit
140             guard.wait();
141             drop(guard);
142
143             // Clean up after ourselves
144             self.lock.destroy();
145             imp::close(*self.signal.get() as imp::signal);
146             *self.signal.get() = 0;
147         }
148     }
149 }
150
151 #[cfg(unix)]
152 mod imp {
153     use libc;
154     use std::os;
155
156     use io::file::FileDesc;
157
158     pub type signal = libc::c_int;
159
160     pub fn new() -> (signal, signal) {
161         let pipe = os::pipe();
162         (pipe.input, pipe.out)
163     }
164
165     pub fn signal(fd: libc::c_int) {
166         FileDesc::new(fd, false).inner_write([0]).ok().unwrap();
167     }
168
169     pub fn close(fd: libc::c_int) {
170         let _fd = FileDesc::new(fd, true);
171     }
172 }
173
174 #[cfg(windows)]
175 mod imp {
176     use libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
177     use std::ptr;
178     use libc;
179
180     pub type signal = HANDLE;
181
182     pub fn new() -> (HANDLE, HANDLE) {
183         unsafe {
184             let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
185                                       ptr::null());
186             (handle, handle)
187         }
188     }
189
190     pub fn signal(handle: HANDLE) {
191         assert!(unsafe { SetEvent(handle) != 0 });
192     }
193
194     pub fn close(handle: HANDLE) {
195         assert!(unsafe { CloseHandle(handle) != 0 });
196     }
197
198     extern "system" {
199         fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
200                         bManualReset: BOOL,
201                         bInitialState: BOOL,
202                         lpName: LPCSTR) -> HANDLE;
203         fn SetEvent(hEvent: HANDLE) -> BOOL;
204     }
205 }