]> git.lizzy.rs Git - rust.git/blob - src/librustuv/queue.rs
a3694bfe9c2beed46199f5a7f825f48fa636b994
[rust.git] / src / librustuv / queue.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! A concurrent queue used to signal remote event loops
12 //!
13 //! This queue implementation is used to send tasks among event loops. This is
14 //! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
15 //! handles (to wake up a remote event loop).
16 //!
17 //! The uv_async_t is stored next to the event loop, so in order to not keep the
18 //! event loop alive we use uv_ref and uv_unref in order to control when the
19 //! async handle is active or not.
20
21 #![allow(dead_code)]
22
23 use alloc::arc::Arc;
24 use libc::c_void;
25 use std::mem;
26 use std::rt::mutex::NativeMutex;
27 use std::rt::task::BlockedTask;
28 use mpsc = std::sync::mpsc_queue;
29
30 use async::AsyncWatcher;
31 use super::{Loop, UvHandle};
32 use uvll;
33
34 enum Message {
35     Task(BlockedTask),
36     Increment,
37     Decrement,
38 }
39
40 struct State {
41     handle: *uvll::uv_async_t,
42     lock: NativeMutex, // see comments in async_cb for why this is needed
43     queue: mpsc::Queue<Message>,
44 }
45
46 /// This structure is intended to be stored next to the event loop, and it is
47 /// used to create new `Queue` structures.
48 pub struct QueuePool {
49     queue: Arc<State>,
50     refcnt: uint,
51 }
52
53 /// This type is used to send messages back to the original event loop.
54 pub struct Queue {
55     queue: Arc<State>,
56 }
57
58 extern fn async_cb(handle: *uvll::uv_async_t) {
59     let pool: &mut QueuePool = unsafe {
60         mem::transmute(uvll::get_data_for_uv_handle(handle))
61     };
62     let state: &State = &*pool.queue;
63
64     // Remember that there is no guarantee about how many times an async
65     // callback is called with relation to the number of sends, so process the
66     // entire queue in a loop.
67     loop {
68         match state.queue.pop() {
69             mpsc::Data(Task(task)) => {
70                 let _ = task.wake().map(|t| t.reawaken());
71             }
72             mpsc::Data(Increment) => unsafe {
73                 if pool.refcnt == 0 {
74                     uvll::uv_ref(state.handle);
75                 }
76                 pool.refcnt += 1;
77             },
78             mpsc::Data(Decrement) => unsafe {
79                 pool.refcnt -= 1;
80                 if pool.refcnt == 0 {
81                     uvll::uv_unref(state.handle);
82                 }
83             },
84             mpsc::Empty | mpsc::Inconsistent => break
85         };
86     }
87
88     // If the refcount is now zero after processing the queue, then there is no
89     // longer a reference on the async handle and it is possible that this event
90     // loop can exit. What we're not guaranteed, however, is that a producer in
91     // the middle of dropping itself is yet done with the handle. It could be
92     // possible that we saw their Decrement message but they have yet to signal
93     // on the async handle. If we were to return immediately, the entire uv loop
94     // could be destroyed meaning the call to uv_async_send would abort()
95     //
96     // In order to fix this, an OS mutex is used to wait for the other end to
97     // finish before we continue. The drop block on a handle will acquire a
98     // mutex and then drop it after both the push and send have been completed.
99     // If we acquire the mutex here, then we are guaranteed that there are no
100     // longer any senders which are holding on to their handles, so we can
101     // safely allow the event loop to exit.
102     if pool.refcnt == 0 {
103         unsafe {
104             let _l = state.lock.lock();
105         }
106     }
107 }
108
109 impl QueuePool {
110     pub fn new(loop_: &mut Loop) -> Box<QueuePool> {
111         let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
112         let state = Arc::new(State {
113             handle: handle,
114             lock: unsafe {NativeMutex::new()},
115             queue: mpsc::Queue::new(),
116         });
117         let q = box QueuePool {
118             refcnt: 0,
119             queue: state,
120         };
121
122         unsafe {
123             assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
124             uvll::uv_unref(handle);
125             let data = &*q as *QueuePool as *c_void;
126             uvll::set_data_for_uv_handle(handle, data);
127         }
128
129         return q;
130     }
131
132     pub fn queue(&mut self) -> Queue {
133         unsafe {
134             if self.refcnt == 0 {
135                 uvll::uv_ref(self.queue.handle);
136             }
137             self.refcnt += 1;
138         }
139         Queue { queue: self.queue.clone() }
140     }
141
142     pub fn handle(&self) -> *uvll::uv_async_t { self.queue.handle }
143 }
144
145 impl Queue {
146     pub fn push(&mut self, task: BlockedTask) {
147         self.queue.queue.push(Task(task));
148         unsafe { uvll::uv_async_send(self.queue.handle); }
149     }
150 }
151
152 impl Clone for Queue {
153     fn clone(&self) -> Queue {
154         // Push a request to increment on the queue, but there's no need to
155         // signal the event loop to process it at this time. We're guaranteed
156         // that the count is at least one (because we have a queue right here),
157         // and if the queue is dropped later on it'll see the increment for the
158         // decrement anyway.
159         self.queue.queue.push(Increment);
160         Queue { queue: self.queue.clone() }
161     }
162 }
163
164 impl Drop for Queue {
165     fn drop(&mut self) {
166         // See the comments in the async_cb function for why there is a lock
167         // that is acquired only on a drop.
168         unsafe {
169             let _l = self.queue.lock.lock();
170             self.queue.queue.push(Decrement);
171             uvll::uv_async_send(self.queue.handle);
172         }
173     }
174 }
175
176 impl Drop for State {
177     fn drop(&mut self) {
178         unsafe {
179             uvll::uv_close(self.handle, mem::transmute(0));
180             // Note that this does *not* free the handle, that is the
181             // responsibility of the caller because the uv loop must be closed
182             // before we deallocate this uv handle.
183         }
184     }
185 }