]> git.lizzy.rs Git - rust.git/blob - src/librustuv/timeout.rs
1c191d476edb9efa27750192bb823d1ea654cc89
[rust.git] / src / librustuv / timeout.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc::c_int;
12 use std::mem;
13 use std::rt::task::BlockedTask;
14 use std::rt::rtio::IoResult;
15
16 use access;
17 use homing::{HomeHandle, HomingMissile, HomingIO};
18 use timer::TimerWatcher;
19 use uvll;
20 use uvio::UvIoFactory;
21 use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
22 use {UvHandle, wait_until_woken_after};
23
24 /// Management of a timeout when gaining access to a portion of a duplex stream.
25 pub struct AccessTimeout {
26     state: TimeoutState,
27     timer: Option<Box<TimerWatcher>>,
28     pub access: access::Access,
29 }
30
31 pub struct Guard<'a> {
32     state: &'a mut TimeoutState,
33     pub access: access::Guard<'a>,
34     pub can_timeout: bool,
35 }
36
37 #[deriving(PartialEq)]
38 enum TimeoutState {
39     NoTimeout,
40     TimeoutPending(ClientState),
41     TimedOut,
42 }
43
44 #[deriving(PartialEq)]
45 enum ClientState {
46     NoWaiter,
47     AccessPending,
48     RequestPending,
49 }
50
51 struct TimerContext {
52     timeout: *mut AccessTimeout,
53     callback: fn(uint) -> Option<BlockedTask>,
54     payload: uint,
55 }
56
57 impl AccessTimeout {
58     pub fn new() -> AccessTimeout {
59         AccessTimeout {
60             state: NoTimeout,
61             timer: None,
62             access: access::Access::new(),
63         }
64     }
65
66     /// Grants access to half of a duplex stream, timing out if necessary.
67     ///
68     /// On success, Ok(Guard) is returned and access has been granted to the
69     /// stream. If a timeout occurs, then Err is returned with an appropriate
70     /// error.
71     pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
72         // First, flag that we're attempting to acquire access. This will allow
73         // us to cancel the pending grant if we timeout out while waiting for a
74         // grant.
75         match self.state {
76             NoTimeout => {},
77             TimeoutPending(ref mut client) => *client = AccessPending,
78             TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
79         }
80         let access = self.access.grant(self as *mut _ as uint, m);
81
82         // After acquiring the grant, we need to flag ourselves as having a
83         // pending request so the timeout knows to cancel the request.
84         let can_timeout = match self.state {
85             NoTimeout => false,
86             TimeoutPending(ref mut client) => { *client = RequestPending; true }
87             TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
88         };
89
90         Ok(Guard {
91             access: access,
92             state: &mut self.state,
93             can_timeout: can_timeout
94         })
95     }
96
97     /// Sets the pending timeout to the value specified.
98     ///
99     /// The home/loop variables are used to construct a timer if one has not
100     /// been previously constructed.
101     ///
102     /// The callback will be invoked if the timeout elapses, and the data of
103     /// the time will be set to `data`.
104     pub fn set_timeout(&mut self, ms: Option<u64>,
105                        home: &HomeHandle,
106                        loop_: &Loop,
107                        cb: fn(uint) -> Option<BlockedTask>,
108                        data: uint) {
109         self.state = NoTimeout;
110         let ms = match ms {
111             Some(ms) => ms,
112             None => return match self.timer {
113                 Some(ref mut t) => t.stop(),
114                 None => {}
115             }
116         };
117
118         // If we have a timeout, lazily initialize the timer which will be used
119         // to fire when the timeout runs out.
120         if self.timer.is_none() {
121             let mut timer = box TimerWatcher::new_home(loop_, home.clone());
122             let cx = box TimerContext {
123                 timeout: self as *mut _,
124                 callback: cb,
125                 payload: data,
126             };
127             unsafe {
128                 timer.set_data(&*cx);
129                 mem::forget(cx);
130             }
131             self.timer = Some(timer);
132         }
133
134         let timer = self.timer.get_mut_ref();
135         unsafe {
136             let cx = uvll::get_data_for_uv_handle(timer.handle);
137             let cx = cx as *mut TimerContext;
138             (*cx).callback = cb;
139             (*cx).payload = data;
140         }
141         timer.stop();
142         timer.start(timer_cb, ms, 0);
143         self.state = TimeoutPending(NoWaiter);
144
145         extern fn timer_cb(timer: *uvll::uv_timer_t) {
146             let cx: &TimerContext = unsafe {
147                 &*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
148             };
149             let me = unsafe { &mut *cx.timeout };
150
151             match mem::replace(&mut me.state, TimedOut) {
152                 TimedOut | NoTimeout => unreachable!(),
153                 TimeoutPending(NoWaiter) => {}
154                 TimeoutPending(AccessPending) => {
155                     match unsafe { me.access.dequeue(me as *mut _ as uint) } {
156                         Some(task) => task.reawaken(),
157                         None => unreachable!(),
158                     }
159                 }
160                 TimeoutPending(RequestPending) => {
161                     match (cx.callback)(cx.payload) {
162                         Some(task) => task.reawaken(),
163                         None => unreachable!(),
164                     }
165                 }
166             }
167         }
168     }
169 }
170
171 impl Clone for AccessTimeout {
172     fn clone(&self) -> AccessTimeout {
173         AccessTimeout {
174             access: self.access.clone(),
175             state: NoTimeout,
176             timer: None,
177         }
178     }
179 }
180
181 #[unsafe_destructor]
182 impl<'a> Drop for Guard<'a> {
183     fn drop(&mut self) {
184         match *self.state {
185             TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
186                 unreachable!(),
187
188             NoTimeout | TimedOut => {}
189             TimeoutPending(RequestPending) => {
190                 *self.state = TimeoutPending(NoWaiter);
191             }
192         }
193     }
194 }
195
196 impl Drop for AccessTimeout {
197     fn drop(&mut self) {
198         match self.timer {
199             Some(ref timer) => unsafe {
200                 let data = uvll::get_data_for_uv_handle(timer.handle);
201                 let _data: Box<TimerContext> = mem::transmute(data);
202             },
203             None => {}
204         }
205     }
206 }
207
208 ////////////////////////////////////////////////////////////////////////////////
209 // Connect timeouts
210 ////////////////////////////////////////////////////////////////////////////////
211
212 pub struct ConnectCtx {
213     pub status: c_int,
214     pub task: Option<BlockedTask>,
215     pub timer: Option<Box<TimerWatcher>>,
216 }
217
218 pub struct AcceptTimeout {
219     timer: Option<TimerWatcher>,
220     timeout_tx: Option<Sender<()>>,
221     timeout_rx: Option<Receiver<()>>,
222 }
223
224 impl ConnectCtx {
225     pub fn connect<T>(
226         mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
227         f: |&Request, &T, uvll::uv_connect_cb| -> c_int
228     ) -> Result<T, UvError> {
229         let mut req = Request::new(uvll::UV_CONNECT);
230         let r = f(&req, &obj, connect_cb);
231         return match r {
232             0 => {
233                 req.defuse(); // uv callback now owns this request
234                 match timeout {
235                     Some(t) => {
236                         let mut timer = TimerWatcher::new(io);
237                         timer.start(timer_cb, t, 0);
238                         self.timer = Some(timer);
239                     }
240                     None => {}
241                 }
242                 wait_until_woken_after(&mut self.task, &io.loop_, || {
243                     let data = &self as *_;
244                     match self.timer {
245                         Some(ref mut timer) => unsafe { timer.set_data(data) },
246                         None => {}
247                     }
248                     req.set_data(data);
249                 });
250                 // Make sure an erroneously fired callback doesn't have access
251                 // to the context any more.
252                 req.set_data(0 as *int);
253
254                 // If we failed because of a timeout, drop the TcpWatcher as
255                 // soon as possible because it's data is now set to null and we
256                 // want to cancel the callback ASAP.
257                 match self.status {
258                     0 => Ok(obj),
259                     n => { drop(obj); Err(UvError(n)) }
260                 }
261             }
262             n => Err(UvError(n))
263         };
264
265         extern fn timer_cb(handle: *uvll::uv_timer_t) {
266             // Don't close the corresponding tcp request, just wake up the task
267             // and let RAII take care of the pending watcher.
268             let cx: &mut ConnectCtx = unsafe {
269                 &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
270             };
271             cx.status = uvll::ECANCELED;
272             wakeup(&mut cx.task);
273         }
274
275         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
276             // This callback can be invoked with ECANCELED if the watcher is
277             // closed by the timeout callback. In that case we just want to free
278             // the request and be along our merry way.
279             let req = Request::wrap(req);
280             if status == uvll::ECANCELED { return }
281
282             // Apparently on windows when the handle is closed this callback may
283             // not be invoked with ECANCELED but rather another error code.
284             // Either ways, if the data is null, then our timeout has expired
285             // and there's nothing we can do.
286             let data = unsafe { uvll::get_data_for_req(req.handle) };
287             if data.is_null() { return }
288
289             let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
290             cx.status = status;
291             match cx.timer {
292                 Some(ref mut t) => t.stop(),
293                 None => {}
294             }
295             // Note that the timer callback doesn't cancel the connect request
296             // (that's the job of uv_close()), so it's possible for this
297             // callback to get triggered after the timeout callback fires, but
298             // before the task wakes up. In that case, we did indeed
299             // successfully connect, but we don't need to wake someone up. We
300             // updated the status above (correctly so), and the task will pick
301             // up on this when it wakes up.
302             if cx.task.is_some() {
303                 wakeup(&mut cx.task);
304             }
305         }
306     }
307 }
308
309 impl AcceptTimeout {
310     pub fn new() -> AcceptTimeout {
311         AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
312     }
313
314     pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
315         match self.timeout_rx {
316             None => c.recv(),
317             Some(ref rx) => {
318                 use std::comm::Select;
319
320                 // Poll the incoming channel first (don't rely on the order of
321                 // select just yet). If someone's pending then we should return
322                 // them immediately.
323                 match c.try_recv() {
324                     Ok(data) => return data,
325                     Err(..) => {}
326                 }
327
328                 // Use select to figure out which channel gets ready first. We
329                 // do some custom handling of select to ensure that we never
330                 // actually drain the timeout channel (we'll keep seeing the
331                 // timeout message in the future).
332                 let s = Select::new();
333                 let mut timeout = s.handle(rx);
334                 let mut data = s.handle(c);
335                 unsafe {
336                     timeout.add();
337                     data.add();
338                 }
339                 if s.wait() == timeout.id() {
340                     Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
341                 } else {
342                     c.recv()
343                 }
344             }
345         }
346     }
347
348     pub fn clear(&mut self) {
349         match self.timeout_rx {
350             Some(ref t) => { let _ = t.try_recv(); }
351             None => {}
352         }
353         match self.timer {
354             Some(ref mut t) => t.stop(),
355             None => {}
356         }
357     }
358
359     pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
360         &mut self, ms: u64, t: &mut T
361     ) {
362         // If we have a timeout, lazily initialize the timer which will be used
363         // to fire when the timeout runs out.
364         if self.timer.is_none() {
365             let loop_ = Loop::wrap(unsafe {
366                 uvll::get_loop_for_uv_handle(t.uv_handle())
367             });
368             let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
369             unsafe {
370                 timer.set_data(self as *mut _ as *AcceptTimeout);
371             }
372             self.timer = Some(timer);
373         }
374
375         // Once we've got a timer, stop any previous timeout, reset it for the
376         // current one, and install some new channels to send/receive data on
377         let timer = self.timer.get_mut_ref();
378         timer.stop();
379         timer.start(timer_cb, ms, 0);
380         let (tx, rx) = channel();
381         self.timeout_tx = Some(tx);
382         self.timeout_rx = Some(rx);
383
384         extern fn timer_cb(timer: *uvll::uv_timer_t) {
385             let acceptor: &mut AcceptTimeout = unsafe {
386                 &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
387             };
388             // This send can never fail because if this timer is active then the
389             // receiving channel is guaranteed to be alive
390             acceptor.timeout_tx.get_ref().send(());
391         }
392     }
393 }