]> git.lizzy.rs Git - rust.git/blob - src/librustuv/homing.rs
d7be06724a09a768b3b12664f15f47e2b967d28e
[rust.git] / src / librustuv / homing.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Homing I/O implementation
12 //!
13 //! In libuv, whenever a handle is created on an I/O loop it is illegal to use
14 //! that handle outside of that I/O loop. We use libuv I/O with our green
15 //! scheduler, and each green scheduler corresponds to a different I/O loop on a
16 //! different OS thread. Green tasks are also free to roam among schedulers,
17 //! which implies that it is possible to create an I/O handle on one event loop
18 //! and then attempt to use it on another.
19 //!
20 //! In order to solve this problem, this module implements the notion of a
21 //! "homing operation" which will transplant a task from its currently running
22 //! scheduler back onto the original I/O loop. This is accomplished entirely at
23 //! the librustuv layer with very little cooperation from the scheduler (which
24 //! we don't even know exists technically).
25 //!
26 //! These homing operations are completed by first realizing that we're on the
27 //! wrong I/O loop, then descheduling ourselves, sending ourselves to the
28 //! correct I/O loop, and then waking up the I/O loop in order to process its
29 //! local queue of tasks which need to run.
30 //!
31 //! This enqueueing is done with a concurrent queue from libstd, and the
32 //! signalling is achieved with an async handle.
33
34 #[allow(dead_code)];
35
36 use std::rt::local::Local;
37 use std::rt::rtio::LocalIo;
38 use std::rt::task::{Task, BlockedTask};
39
40 use ForbidUnwind;
41 use queue::{Queue, QueuePool};
42
43 /// A handle to a remote libuv event loop. This handle will keep the event loop
44 /// alive while active in order to ensure that a homing operation can always be
45 /// completed.
46 ///
47 /// Handles are clone-able in order to derive new handles from existing handles
48 /// (very useful for when accepting a socket from a server).
49 pub struct HomeHandle {
50     priv queue: Queue,
51     priv id: uint,
52 }
53
54 impl HomeHandle {
55     pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
56         HomeHandle { queue: pool.queue(), id: id }
57     }
58
59     fn send(&mut self, task: BlockedTask) {
60         self.queue.push(task);
61     }
62 }
63
64 impl Clone for HomeHandle {
65     fn clone(&self) -> HomeHandle {
66         HomeHandle {
67             queue: self.queue.clone(),
68             id: self.id,
69         }
70     }
71 }
72
73 pub trait HomingIO {
74     fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
75
76     /// This function will move tasks to run on their home I/O scheduler. Note
77     /// that this function does *not* pin the task to the I/O scheduler, but
78     /// rather it simply moves it to running on the I/O scheduler.
79     fn go_to_IO_home(&mut self) -> uint {
80         let _f = ForbidUnwind::new("going home");
81
82         let mut cur_task: ~Task = Local::take();
83         let cur_loop_id = {
84             let mut io = cur_task.local_io().expect("libuv must have I/O");
85             io.get().id()
86         };
87
88         // Try at all costs to avoid the homing operation because it is quite
89         // expensive. Hence, we only deschedule/send if we're not on the correct
90         // event loop. If we're already on the home event loop, then we're good
91         // to go (remember we have no preemption, so we're guaranteed to stay on
92         // this event loop as long as we avoid the scheduler).
93         if cur_loop_id != self.home().id {
94             cur_task.deschedule(1, |task| {
95                 self.home().send(task);
96                 Ok(())
97             });
98
99             // Once we wake up, assert that we're in the right location
100             let cur_loop_id = {
101                 let mut io = LocalIo::borrow().expect("libuv must have I/O");
102                 io.get().id()
103             };
104             assert_eq!(cur_loop_id, self.home().id);
105
106             cur_loop_id
107         } else {
108             Local::put(cur_task);
109             cur_loop_id
110         }
111     }
112
113     /// Fires a single homing missile, returning another missile targeted back
114     /// at the original home of this task. In other words, this function will
115     /// move the local task to its I/O scheduler and then return an RAII wrapper
116     /// which will return the task home.
117     fn fire_homing_missile(&mut self) -> HomingMissile {
118         HomingMissile { io_home: self.go_to_IO_home() }
119     }
120 }
121
122 /// After a homing operation has been completed, this will return the current
123 /// task back to its appropriate home (if applicable). The field is used to
124 /// assert that we are where we think we are.
125 struct HomingMissile {
126     priv io_home: uint,
127 }
128
129 impl HomingMissile {
130     /// Check at runtime that the task has *not* transplanted itself to a
131     /// different I/O loop while executing.
132     pub fn check(&self, msg: &'static str) {
133         let mut io = LocalIo::borrow().expect("libuv must have I/O");
134         assert!(io.get().id() == self.io_home, "{}", msg);
135     }
136 }
137
138 impl Drop for HomingMissile {
139     fn drop(&mut self) {
140         let _f = ForbidUnwind::new("leaving home");
141
142         // It would truly be a sad day if we had moved off the home I/O
143         // scheduler while we were doing I/O.
144         self.check("task moved away from the home scheduler");
145     }
146 }
147
148 #[cfg(test)]
149 mod test {
150     use green::sched;
151     use green::{SchedPool, PoolConfig};
152     use std::rt::rtio::RtioUdpSocket;
153     use std::io::test::next_test_ip4;
154     use std::task::TaskOpts;
155
156     use net::UdpWatcher;
157     use super::super::local_loop;
158
159     // On one thread, create a udp socket. Then send that socket to another
160     // thread and destroy the socket on the remote thread. This should make sure
161     // that homing kicks in for the socket to go back home to the original
162     // thread, close itself, and then come back to the last thread.
163     #[test]
164     fn test_homing_closes_correctly() {
165         let (port, chan) = Chan::new();
166         let mut pool = SchedPool::new(PoolConfig {
167             threads: 1,
168             event_loop_factory: None,
169         });
170
171         do pool.spawn(TaskOpts::new()) {
172             let listener = UdpWatcher::bind(local_loop(), next_test_ip4());
173             chan.send(listener.unwrap());
174         }
175
176         let task = do pool.task(TaskOpts::new()) {
177             port.recv();
178         };
179         pool.spawn_sched().send(sched::TaskFromFriend(task));
180
181         pool.shutdown();
182     }
183
184     #[test]
185     fn test_homing_read() {
186         let (port, chan) = Chan::new();
187         let mut pool = SchedPool::new(PoolConfig {
188             threads: 1,
189             event_loop_factory: None,
190         });
191
192         do pool.spawn(TaskOpts::new()) {
193             let addr1 = next_test_ip4();
194             let addr2 = next_test_ip4();
195             let listener = UdpWatcher::bind(local_loop(), addr2);
196             chan.send((listener.unwrap(), addr1));
197             let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap();
198             listener.sendto([1, 2, 3, 4], addr2);
199         }
200
201         let task = do pool.task(TaskOpts::new()) {
202             let (mut watcher, addr) = port.recv();
203             let mut buf = [0, ..10];
204             assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr));
205         };
206         pool.spawn_sched().send(sched::TaskFromFriend(task));
207
208         pool.shutdown();
209     }
210 }