1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! A concurrent queue used to signal remote event loops
13 //! This queue implementation is used to send tasks among event loops. This is
14 //! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
15 //! handles (to wake up a remote event loop).
17 //! The uv_async_t is stored next to the event loop, so to avoid keeping the
18 //! event loop alive we use uv_ref and uv_unref to control whether the
19 //! async handle is active.
26 use std::rt::mutex::NativeMutex;
27 use std::rt::task::BlockedTask;
28 use mpsc = std::sync::mpsc_queue;
30 use async::AsyncWatcher;
31 use super::{Loop, UvHandle};
// Raw uv async handle: async_cb refs/unrefs it, producers signal it with
// uv_async_send, and the `Drop` impl for `State` closes it.
41 handle: *mut uvll::uv_async_t,
42 lock: NativeMutex, // see comments in async_cb for why this is needed
// Message queue that producers push onto and async_cb drains with `pop`.
43 queue: mpsc::Queue<Message>,
46 /// This structure is intended to be stored next to the event loop, and it is
47 /// used to create new `Queue` structures.
///
/// `QueuePool::new` returns the pool boxed and stores the box's address as the
/// data pointer of the uv async handle; `async_cb` reads that pointer back to
/// reach the pool when the handle fires.
48 pub struct QueuePool {
53 /// This type is used to send messages back to the original event loop.
/// Callback passed to `uv_async_init` for the pool's async handle.
///
/// Recovers the `QueuePool` from the handle's data pointer and pops messages
/// from the shared queue: a `Task` is woken and reawakened, the `Increment`
/// and `Decrement` arms call `uv_ref` and `uv_unref` on the handle
/// respectively, and an `Empty` or `Inconsistent` result stops processing.
/// Afterwards, if the pool's refcount is zero, the state lock is acquired so
/// that any producer still in the middle of dropping has finished with the
/// handle.
58 extern fn async_cb(handle: *mut uvll::uv_async_t) {
// The data pointer was set to the address of the boxed `QueuePool` in
// `QueuePool::new`.
59 let pool: &mut QueuePool = unsafe {
60 mem::transmute(uvll::get_data_for_uv_handle(handle))
62 let state: &State = &*pool.queue;
64 // Remember that there is no guarantee about how many times an async
65 // callback is called with relation to the number of sends, so process the
66 // entire queue in a loop.
68 match state.queue.pop() {
69 mpsc::Data(Task(task)) => {
// The result of waking/reawakening the task is deliberately discarded.
70 let _ = task.wake().map(|t| t.reawaken());
72 mpsc::Data(Increment) => unsafe {
74 uvll::uv_ref(state.handle);
78 mpsc::Data(Decrement) => unsafe {
81 uvll::uv_unref(state.handle);
// Nothing more can be popped right now, so stop draining the queue.
84 mpsc::Empty | mpsc::Inconsistent => break
88 // If the refcount is now zero after processing the queue, then there is no
89 // longer a reference on the async handle and it is possible that this event
90 // loop can exit. What we're not guaranteed, however, is that a producer in
91 // the middle of dropping itself is yet done with the handle. It could be
92 // possible that we saw their Decrement message but they have yet to signal
93 // on the async handle. If we were to return immediately, the entire uv loop
94 // could be destroyed meaning the call to uv_async_send would abort().
96 // In order to fix this, an OS mutex is used to wait for the other end to
97 // finish before we continue. The drop block on a handle will acquire a
98 // mutex and then drop it after both the push and send have been completed.
99 // If we acquire the mutex here, then we are guaranteed that there are no
100 // longer any senders which are holding on to their handles, so we can
101 // safely allow the event loop to exit.
102 if pool.refcnt == 0 {
104 let _l = state.lock.lock();
/// Creates a boxed `QueuePool` attached to the event loop `loop_`.
///
/// Allocates a `uv_async_t`, builds the shared `State` around it, and
/// initializes the handle on `loop_` with `async_cb` as its callback,
/// asserting that `uv_async_init` returns 0.
110 pub fn new(loop_: &mut Loop) -> Box<QueuePool> {
111 let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
112 let state = Arc::new(State {
114 lock: unsafe {NativeMutex::new()},
115 queue: mpsc::Queue::new(),
117 let mut q = box QueuePool {
123 assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
// Unref the freshly initialized handle so that, on its own, it does not keep
// the event loop alive.
124 uvll::uv_unref(handle);
// Store the boxed pool's address on the handle so async_cb can recover the
// pool from the handle it is given.
125 let data = &mut *q as *mut QueuePool as *mut c_void;
126 uvll::set_data_for_uv_handle(handle, data);
/// Creates a new `Queue` that shares this pool's state.
///
/// When the pool's refcount is zero, the async handle is ref'd with `uv_ref`
/// before the `Queue` is handed out.
132 pub fn queue(&mut self) -> Queue {
134 if self.refcnt == 0 {
135 uvll::uv_ref(self.queue.handle);
// The returned `Queue` holds a clone of the pool's shared state pointer.
139 Queue { queue: self.queue.clone() }
/// Returns the raw `uv_async_t` pointer stored in the shared state.
142 pub fn handle(&self) -> *mut uvll::uv_async_t { self.queue.handle }
/// Enqueues `task` as a `Task` message and then signals the async handle with
/// `uv_async_send` so that the event loop owning the handle is woken up to
/// process the queue.
146 pub fn push(&mut self, task: BlockedTask) {
147 self.queue.queue.push(Task(task));
148 unsafe { uvll::uv_async_send(self.queue.handle); }
152 impl Clone for Queue {
/// Returns a new `Queue` sharing the same state, after pushing an
/// `Increment` message for the event loop to process.
153 fn clone(&self) -> Queue {
154 // Push a request to increment on the queue, but there's no need to
155 // signal the event loop to process it at this time. We're guaranteed
156 // that the count is at least one (because we have a queue right here),
157 // and if the queue is dropped later on, its drop signals the event loop,
// which then processes this increment along with the decrement.
159 self.queue.queue.push(Increment);
160 Queue { queue: self.queue.clone() }
// Dropping a `Queue` pushes a `Decrement` message and signals the async
// handle, all while holding the shared state's lock.
164 impl Drop for Queue {
166 // See the comments in the async_cb function for why there is a lock
167 // that is acquired only on a drop.
// The guard is bound to `_l` so the lock stays held across both the push
// and the uv_async_send below.
169 let _l = self.queue.lock.lock();
170 self.queue.queue.push(Decrement);
171 uvll::uv_async_send(self.queue.handle);
// Dropping the shared `State` closes its uv async handle.
176 impl Drop for State {
// The second argument (the close callback slot) is a transmuted 0.
// NOTE(review): presumably this means "no close callback" -- confirm
// against the uvll binding.
179 uvll::uv_close(self.handle, mem::transmute(0));
180 // Note that this does *not* free the handle, that is the
181 // responsibility of the caller because the uv loop must be closed
182 // before we deallocate this uv handle.