]> git.lizzy.rs Git - rust.git/blob - src/librustuv/async.rs
Register new snapshots
[rust.git] / src / librustuv / async.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use alloc::arc::Arc;
12 use std::mem;
13 use std::rt::exclusive::Exclusive;
14 use std::rt::rtio::{Callback, RemoteCallback};
15
16 use uvll;
17 use super::{Loop, UvHandle};
18
19 // The entire point of async is to call into a loop from other threads so it
20 // does not need to home.
21 pub struct AsyncWatcher {
22     handle: *uvll::uv_async_t,
23
24     // A flag to tell the callback to exit, set from the dtor. This is
25     // almost never contested - only in rare races with the dtor.
26     exit_flag: Arc<Exclusive<bool>>,
27 }
28
29 struct Payload {
30     callback: Box<Callback + Send>,
31     exit_flag: Arc<Exclusive<bool>>,
32 }
33
34 impl AsyncWatcher {
35     pub fn new(loop_: &mut Loop, cb: Box<Callback + Send>) -> AsyncWatcher {
36         let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
37         assert_eq!(unsafe {
38             uvll::uv_async_init(loop_.handle, handle, async_cb)
39         }, 0);
40         let flag = Arc::new(Exclusive::new(false));
41         let payload = box Payload { callback: cb, exit_flag: flag.clone() };
42         unsafe {
43             let payload: *u8 = mem::transmute(payload);
44             uvll::set_data_for_uv_handle(handle, payload);
45         }
46         return AsyncWatcher { handle: handle, exit_flag: flag, };
47     }
48 }
49
50 impl UvHandle<uvll::uv_async_t> for AsyncWatcher {
51     fn uv_handle(&self) -> *uvll::uv_async_t { self.handle }
52     unsafe fn from_uv_handle<'a>(_: &'a *uvll::uv_async_t) -> &'a mut AsyncWatcher {
53         fail!("async watchers can't be built from their handles");
54     }
55 }
56
57 extern fn async_cb(handle: *uvll::uv_async_t) {
58     let payload: &mut Payload = unsafe {
59         mem::transmute(uvll::get_data_for_uv_handle(handle))
60     };
61
62     // The synchronization logic here is subtle. To review,
63     // the uv async handle type promises that, after it is
64     // triggered the remote callback is definitely called at
65     // least once. UvRemoteCallback needs to maintain those
66     // semantics while also shutting down cleanly from the
67     // dtor. In our case that means that, when the
68     // UvRemoteCallback dtor calls `async.send()`, here `f` is
69     // always called later.
70
71     // In the dtor both the exit flag is set and the async
72     // callback fired under a lock.  Here, before calling `f`,
73     // we take the lock and check the flag. Because we are
74     // checking the flag before calling `f`, and the flag is
75     // set under the same lock as the send, then if the flag
76     // is set then we're guaranteed to call `f` after the
77     // final send.
78
79     // If the check was done after `f()` then there would be a
80     // period between that call and the check where the dtor
81     // could be called in the other thread, missing the final
82     // callback while still destroying the handle.
83
84     let should_exit = unsafe { *payload.exit_flag.lock() };
85
86     payload.callback.call();
87
88     if should_exit {
89         unsafe { uvll::uv_close(handle, close_cb) }
90     }
91 }
92
93 extern fn close_cb(handle: *uvll::uv_handle_t) {
94     // drop the payload
95     let _payload: Box<Payload> = unsafe {
96         mem::transmute(uvll::get_data_for_uv_handle(handle))
97     };
98     // and then free the handle
99     unsafe { uvll::free_handle(handle) }
100 }
101
102 impl RemoteCallback for AsyncWatcher {
103     fn fire(&mut self) {
104         unsafe { uvll::uv_async_send(self.handle) }
105     }
106 }
107
108 impl Drop for AsyncWatcher {
109     fn drop(&mut self) {
110         let mut should_exit = unsafe { self.exit_flag.lock() };
111         // NB: These two things need to happen atomically. Otherwise
112         // the event handler could wake up due to a *previous*
113         // signal and see the exit flag, destroying the handle
114         // before the final send.
115         *should_exit = true;
116         unsafe { uvll::uv_async_send(self.handle) }
117     }
118 }
119
120 #[cfg(test)]
121 mod test_remote {
122     use std::rt::rtio::{Callback, RemoteCallback};
123     use std::rt::thread::Thread;
124
125     use super::AsyncWatcher;
126     use super::super::local_loop;
127
128     // Make sure that we can fire watchers in remote threads and that they
129     // actually trigger what they say they will.
130     #[test]
131     fn smoke_test() {
132         struct MyCallback(Option<Sender<int>>);
133         impl Callback for MyCallback {
134             fn call(&mut self) {
135                 // this can get called more than once, but we only want to send
136                 // once
137                 let MyCallback(ref mut s) = *self;
138                 if s.is_some() {
139                     s.take_unwrap().send(1);
140                 }
141             }
142         }
143
144         let (tx, rx) = channel();
145         let cb = box MyCallback(Some(tx));
146         let watcher = AsyncWatcher::new(&mut local_loop().loop_, cb);
147
148         let thread = Thread::start(proc() {
149             let mut watcher = watcher;
150             watcher.fire();
151         });
152
153         assert_eq!(rx.recv(), 1);
154         thread.join();
155     }
156 }