]> git.lizzy.rs Git - rust.git/blob - src/librustuv/async.rs
auto merge of #13967 : richo/rust/features/ICE-fails, r=alexcrichton
[rust.git] / src / librustuv / async.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::rt::rtio::{Callback, RemoteCallback};
13 use std::unstable::sync::Exclusive;
14
15 use uvll;
16 use super::{Loop, UvHandle};
17
18 // The entire point of async is to call into a loop from other threads so it
19 // does not need to home.
20 pub struct AsyncWatcher {
21     handle: *uvll::uv_async_t,
22
23     // A flag to tell the callback to exit, set from the dtor. This is
24     // almost never contested - only in rare races with the dtor.
25     exit_flag: Exclusive<bool>
26 }
27
28 struct Payload {
29     callback: Box<Callback:Send>,
30     exit_flag: Exclusive<bool>,
31 }
32
33 impl AsyncWatcher {
34     pub fn new(loop_: &mut Loop, cb: Box<Callback:Send>) -> AsyncWatcher {
35         let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
36         assert_eq!(unsafe {
37             uvll::uv_async_init(loop_.handle, handle, async_cb)
38         }, 0);
39         let flag = Exclusive::new(false);
40         let payload = box Payload { callback: cb, exit_flag: flag.clone() };
41         unsafe {
42             let payload: *u8 = cast::transmute(payload);
43             uvll::set_data_for_uv_handle(handle, payload);
44         }
45         return AsyncWatcher { handle: handle, exit_flag: flag, };
46     }
47 }
48
49 impl UvHandle<uvll::uv_async_t> for AsyncWatcher {
50     fn uv_handle(&self) -> *uvll::uv_async_t { self.handle }
51     unsafe fn from_uv_handle<'a>(_: &'a *uvll::uv_async_t) -> &'a mut AsyncWatcher {
52         fail!("async watchers can't be built from their handles");
53     }
54 }
55
56 extern fn async_cb(handle: *uvll::uv_async_t) {
57     let payload: &mut Payload = unsafe {
58         cast::transmute(uvll::get_data_for_uv_handle(handle))
59     };
60
61     // The synchronization logic here is subtle. To review,
62     // the uv async handle type promises that, after it is
63     // triggered the remote callback is definitely called at
64     // least once. UvRemoteCallback needs to maintain those
65     // semantics while also shutting down cleanly from the
66     // dtor. In our case that means that, when the
67     // UvRemoteCallback dtor calls `async.send()`, here `f` is
68     // always called later.
69
70     // In the dtor both the exit flag is set and the async
71     // callback fired under a lock.  Here, before calling `f`,
72     // we take the lock and check the flag. Because we are
73     // checking the flag before calling `f`, and the flag is
74     // set under the same lock as the send, then if the flag
75     // is set then we're guaranteed to call `f` after the
76     // final send.
77
78     // If the check was done after `f()` then there would be a
79     // period between that call and the check where the dtor
80     // could be called in the other thread, missing the final
81     // callback while still destroying the handle.
82
83     let should_exit = unsafe {
84         payload.exit_flag.with_imm(|&should_exit| should_exit)
85     };
86
87     payload.callback.call();
88
89     if should_exit {
90         unsafe { uvll::uv_close(handle, close_cb) }
91     }
92 }
93
94 extern fn close_cb(handle: *uvll::uv_handle_t) {
95     // drop the payload
96     let _payload: Box<Payload> = unsafe {
97         cast::transmute(uvll::get_data_for_uv_handle(handle))
98     };
99     // and then free the handle
100     unsafe { uvll::free_handle(handle) }
101 }
102
103 impl RemoteCallback for AsyncWatcher {
104     fn fire(&mut self) {
105         unsafe { uvll::uv_async_send(self.handle) }
106     }
107 }
108
109 impl Drop for AsyncWatcher {
110     fn drop(&mut self) {
111         unsafe {
112             self.exit_flag.with(|should_exit| {
113                 // NB: These two things need to happen atomically. Otherwise
114                 // the event handler could wake up due to a *previous*
115                 // signal and see the exit flag, destroying the handle
116                 // before the final send.
117                 *should_exit = true;
118                 uvll::uv_async_send(self.handle)
119             })
120         }
121     }
122 }
123
124 #[cfg(test)]
125 mod test_remote {
126     use std::rt::rtio::{Callback, RemoteCallback};
127     use std::rt::thread::Thread;
128
129     use super::AsyncWatcher;
130     use super::super::local_loop;
131
132     // Make sure that we can fire watchers in remote threads and that they
133     // actually trigger what they say they will.
134     #[test]
135     fn smoke_test() {
136         struct MyCallback(Option<Sender<int>>);
137         impl Callback for MyCallback {
138             fn call(&mut self) {
139                 // this can get called more than once, but we only want to send
140                 // once
141                 let MyCallback(ref mut s) = *self;
142                 if s.is_some() {
143                     s.take_unwrap().send(1);
144                 }
145             }
146         }
147
148         let (tx, rx) = channel();
149         let cb = box MyCallback(Some(tx));
150         let watcher = AsyncWatcher::new(&mut local_loop().loop_, cb);
151
152         let thread = Thread::start(proc() {
153             let mut watcher = watcher;
154             watcher.fire();
155         });
156
157         assert_eq!(rx.recv(), 1);
158         thread.join();
159     }
160 }