1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use std::rt::task::BlockedTask;
14 use std::rt::rtio::IoResult;
17 use homing::{HomeHandle, HomingMissile, HomingIO};
18 use timer::TimerWatcher;
20 use uvio::UvIoFactory;
21 use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
22 use {UvHandle, wait_until_woken_after};
24 /// Management of a timeout when gaining access to a portion of a duplex stream.
25 pub struct AccessTimeout {
27 timer: Option<Box<TimerWatcher>>,
28 pub access: access::Access,
31 pub struct Guard<'a> {
32 state: &'a mut TimeoutState,
33 pub access: access::Guard<'a>,
34 pub can_timeout: bool,
37 #[deriving(PartialEq)]
40 TimeoutPending(ClientState),
44 #[deriving(PartialEq)]
52 timeout: *mut AccessTimeout,
53 callback: fn(uint) -> Option<BlockedTask>,
58 pub fn new() -> AccessTimeout {
62 access: access::Access::new(),
66 /// Grants access to half of a duplex stream, timing out if necessary.
68 /// On success, Ok(Guard) is returned and access has been granted to the
69 /// stream. If a timeout occurs, then Err is returned with an appropriate
71 pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
72 // First, flag that we're attempting to acquire access. This will allow
73 // us to cancel the pending grant if we timeout out while waiting for a
77 TimeoutPending(ref mut client) => *client = AccessPending,
78 TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
80 let access = self.access.grant(self as *mut _ as uint, m);
82 // After acquiring the grant, we need to flag ourselves as having a
83 // pending request so the timeout knows to cancel the request.
84 let can_timeout = match self.state {
86 TimeoutPending(ref mut client) => { *client = RequestPending; true }
87 TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
92 state: &mut self.state,
93 can_timeout: can_timeout
97 /// Sets the pending timeout to the value specified.
99 /// The home/loop variables are used to construct a timer if one has not
100 /// been previously constructed.
102 /// The callback will be invoked if the timeout elapses, and the data of
103 /// the time will be set to `data`.
104 pub fn set_timeout(&mut self, ms: Option<u64>,
107 cb: fn(uint) -> Option<BlockedTask>,
109 self.state = NoTimeout;
112 None => return match self.timer {
113 Some(ref mut t) => t.stop(),
118 // If we have a timeout, lazily initialize the timer which will be used
119 // to fire when the timeout runs out.
120 if self.timer.is_none() {
121 let mut timer = box TimerWatcher::new_home(loop_, home.clone());
122 let cx = box TimerContext {
123 timeout: self as *mut _,
128 timer.set_data(&*cx);
131 self.timer = Some(timer);
134 let timer = self.timer.get_mut_ref();
136 let cx = uvll::get_data_for_uv_handle(timer.handle);
137 let cx = cx as *mut TimerContext;
139 (*cx).payload = data;
142 timer.start(timer_cb, ms, 0);
143 self.state = TimeoutPending(NoWaiter);
145 extern fn timer_cb(timer: *uvll::uv_timer_t) {
146 let cx: &TimerContext = unsafe {
147 &*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
149 let me = unsafe { &mut *cx.timeout };
151 match mem::replace(&mut me.state, TimedOut) {
152 TimedOut | NoTimeout => unreachable!(),
153 TimeoutPending(NoWaiter) => {}
154 TimeoutPending(AccessPending) => {
155 match unsafe { me.access.dequeue(me as *mut _ as uint) } {
156 Some(task) => task.reawaken(),
157 None => unreachable!(),
160 TimeoutPending(RequestPending) => {
161 match (cx.callback)(cx.payload) {
162 Some(task) => task.reawaken(),
163 None => unreachable!(),
171 impl Clone for AccessTimeout {
172 fn clone(&self) -> AccessTimeout {
174 access: self.access.clone(),
182 impl<'a> Drop for Guard<'a> {
185 TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
188 NoTimeout | TimedOut => {}
189 TimeoutPending(RequestPending) => {
190 *self.state = TimeoutPending(NoWaiter);
196 impl Drop for AccessTimeout {
199 Some(ref timer) => unsafe {
200 let data = uvll::get_data_for_uv_handle(timer.handle);
201 let _data: Box<TimerContext> = mem::transmute(data);
208 ////////////////////////////////////////////////////////////////////////////////
210 ////////////////////////////////////////////////////////////////////////////////
212 pub struct ConnectCtx {
214 pub task: Option<BlockedTask>,
215 pub timer: Option<Box<TimerWatcher>>,
218 pub struct AcceptTimeout {
219 timer: Option<TimerWatcher>,
220 timeout_tx: Option<Sender<()>>,
221 timeout_rx: Option<Receiver<()>>,
226 mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
227 f: |&Request, &T, uvll::uv_connect_cb| -> c_int
228 ) -> Result<T, UvError> {
229 let mut req = Request::new(uvll::UV_CONNECT);
230 let r = f(&req, &obj, connect_cb);
233 req.defuse(); // uv callback now owns this request
236 let mut timer = TimerWatcher::new(io);
237 timer.start(timer_cb, t, 0);
238 self.timer = Some(timer);
242 wait_until_woken_after(&mut self.task, &io.loop_, || {
243 let data = &self as *_;
245 Some(ref mut timer) => unsafe { timer.set_data(data) },
250 // Make sure an erroneously fired callback doesn't have access
251 // to the context any more.
252 req.set_data(0 as *int);
254 // If we failed because of a timeout, drop the TcpWatcher as
255 // soon as possible because it's data is now set to null and we
256 // want to cancel the callback ASAP.
259 n => { drop(obj); Err(UvError(n)) }
265 extern fn timer_cb(handle: *uvll::uv_timer_t) {
266 // Don't close the corresponding tcp request, just wake up the task
267 // and let RAII take care of the pending watcher.
268 let cx: &mut ConnectCtx = unsafe {
269 &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
271 cx.status = uvll::ECANCELED;
272 wakeup(&mut cx.task);
275 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
276 // This callback can be invoked with ECANCELED if the watcher is
277 // closed by the timeout callback. In that case we just want to free
278 // the request and be along our merry way.
279 let req = Request::wrap(req);
280 if status == uvll::ECANCELED { return }
282 // Apparently on windows when the handle is closed this callback may
283 // not be invoked with ECANCELED but rather another error code.
284 // Either ways, if the data is null, then our timeout has expired
285 // and there's nothing we can do.
286 let data = unsafe { uvll::get_data_for_req(req.handle) };
287 if data.is_null() { return }
289 let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
292 Some(ref mut t) => t.stop(),
295 // Note that the timer callback doesn't cancel the connect request
296 // (that's the job of uv_close()), so it's possible for this
297 // callback to get triggered after the timeout callback fires, but
298 // before the task wakes up. In that case, we did indeed
299 // successfully connect, but we don't need to wake someone up. We
300 // updated the status above (correctly so), and the task will pick
301 // up on this when it wakes up.
302 if cx.task.is_some() {
303 wakeup(&mut cx.task);
310 pub fn new() -> AcceptTimeout {
311 AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
314 pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
315 match self.timeout_rx {
318 use std::comm::Select;
320 // Poll the incoming channel first (don't rely on the order of
321 // select just yet). If someone's pending then we should return
324 Ok(data) => return data,
328 // Use select to figure out which channel gets ready first. We
329 // do some custom handling of select to ensure that we never
330 // actually drain the timeout channel (we'll keep seeing the
331 // timeout message in the future).
332 let s = Select::new();
333 let mut timeout = s.handle(rx);
334 let mut data = s.handle(c);
339 if s.wait() == timeout.id() {
340 Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
348 pub fn clear(&mut self) {
349 match self.timeout_rx {
350 Some(ref t) => { let _ = t.try_recv(); }
354 Some(ref mut t) => t.stop(),
359 pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
360 &mut self, ms: u64, t: &mut T
362 // If we have a timeout, lazily initialize the timer which will be used
363 // to fire when the timeout runs out.
364 if self.timer.is_none() {
365 let loop_ = Loop::wrap(unsafe {
366 uvll::get_loop_for_uv_handle(t.uv_handle())
368 let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
370 timer.set_data(self as *mut _ as *AcceptTimeout);
372 self.timer = Some(timer);
375 // Once we've got a timer, stop any previous timeout, reset it for the
376 // current one, and install some new channels to send/receive data on
377 let timer = self.timer.get_mut_ref();
379 timer.start(timer_cb, ms, 0);
380 let (tx, rx) = channel();
381 self.timeout_tx = Some(tx);
382 self.timeout_rx = Some(rx);
384 extern fn timer_cb(timer: *uvll::uv_timer_t) {
385 let acceptor: &mut AcceptTimeout = unsafe {
386 &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
388 // This send can never fail because if this timer is active then the
389 // receiving channel is guaranteed to be alive
390 acceptor.timeout_tx.get_ref().send(());