]> git.lizzy.rs Git - rust.git/blob - src/librustuv/stream.rs
Doc says to avoid mixing allocator instead of forbiding it
[rust.git] / src / librustuv / stream.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc::{c_int, size_t, ssize_t};
12 use std::mem;
13 use std::ptr;
14 use std::rt::task::BlockedTask;
15
16 use Loop;
17 use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
18             ForbidUnwind, wakeup};
19 use uvll;
20
21 // This is a helper structure which is intended to get embedded into other
22 // Watcher structures. This structure will retain a handle to the underlying
23 // uv_stream_t instance, and all I/O operations assume that it's already located
24 // on the appropriate scheduler.
25 pub struct StreamWatcher {
26     pub handle: *mut uvll::uv_stream_t,
27
28     // Cache the last used uv_write_t so we don't have to allocate a new one on
29     // every call to uv_write(). Ideally this would be a stack-allocated
30     // structure, but currently we don't have mappings for all the structures
31     // defined in libuv, so we're forced to malloc this.
32     last_write_req: Option<Request>,
33
34     blocked_writer: Option<BlockedTask>,
35 }
36
37 struct ReadContext {
38     buf: Option<Buf>,
39     result: ssize_t,
40     task: Option<BlockedTask>,
41 }
42
43 struct WriteContext {
44     result: c_int,
45     stream: *mut StreamWatcher,
46     data: Option<Vec<u8>>,
47 }
48
49 impl StreamWatcher {
50     // Creates a new helper structure which should be then embedded into another
51     // watcher. This provides the generic read/write methods on streams.
52     //
53     // This structure will *not* close the stream when it is dropped. It is up
54     // to the enclosure structure to be sure to call the close method (which
55     // will block the task). Note that this is also required to prevent memory
56     // leaks.
57     //
58     // It should also be noted that the `data` field of the underlying uv handle
59     // will be manipulated on each of the methods called on this watcher.
60     // Wrappers should ensure to always reset the field to an appropriate value
61     // if they rely on the field to perform an action.
62     pub fn new(stream: *mut uvll::uv_stream_t,
63                init: bool) -> StreamWatcher {
64         if init {
65             unsafe { uvll::set_data_for_uv_handle(stream, 0 as *mut int) }
66         }
67         StreamWatcher {
68             handle: stream,
69             last_write_req: None,
70             blocked_writer: None,
71         }
72     }
73
74     pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
75         // This read operation needs to get canceled on an unwind via libuv's
76         // uv_read_stop function
77         let _f = ForbidUnwind::new("stream read");
78
79         let mut rcx = ReadContext {
80             buf: Some(slice_to_uv_buf(buf)),
81             // if the read is canceled, we'll see eof, otherwise this will get
82             // overwritten
83             result: 0,
84             task: None,
85         };
86         // When reading a TTY stream on windows, libuv will invoke alloc_cb
87         // immediately as part of the call to alloc_cb. What this means is that
88         // we must be ready for this to happen (by setting the data in the uv
89         // handle). In theory this otherwise doesn't need to happen until after
90         // the read is successfully started.
91         unsafe { uvll::set_data_for_uv_handle(self.handle, &mut rcx) }
92
93         // Send off the read request, but don't block until we're sure that the
94         // read request is queued.
95         let ret = match unsafe {
96             uvll::uv_read_start(self.handle, alloc_cb, read_cb)
97         } {
98             0 => {
99                 let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
100                 wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
101                 match rcx.result {
102                     n if n < 0 => Err(UvError(n as c_int)),
103                     n => Ok(n as uint),
104                 }
105             }
106             n => Err(UvError(n))
107         };
108         // Make sure a read cancellation sees that there's no pending read
109         unsafe { uvll::set_data_for_uv_handle(self.handle, 0 as *mut int) }
110         return ret;
111     }
112
113     pub fn cancel_read(&mut self, reason: ssize_t) -> Option<BlockedTask> {
114         // When we invoke uv_read_stop, it cancels the read and alloc
115         // callbacks. We need to manually wake up a pending task (if one was
116         // present).
117         assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
118         let data = unsafe {
119             let data = uvll::get_data_for_uv_handle(self.handle);
120             if data.is_null() { return None }
121             uvll::set_data_for_uv_handle(self.handle, 0 as *mut int);
122             &mut *(data as *mut ReadContext)
123         };
124         data.result = reason;
125         data.task.take()
126     }
127
128     pub fn write(&mut self, buf: &[u8], may_timeout: bool) -> Result<(), UvError> {
129         // The ownership of the write request is dubious if this function
130         // unwinds. I believe that if the write_cb fails to re-schedule the task
131         // then the write request will be leaked.
132         let _f = ForbidUnwind::new("stream write");
133
134         // Prepare the write request, either using a cached one or allocating a
135         // new one
136         let mut req = match self.last_write_req.take() {
137             Some(req) => req, None => Request::new(uvll::UV_WRITE),
138         };
139         req.set_data(ptr::mut_null::<()>());
140
141         // And here's where timeouts get a little interesting. Currently, libuv
142         // does not support canceling an in-flight write request. Consequently,
143         // when a write timeout expires, there's not much we can do other than
144         // detach the sleeping task from the write request itself. Semantically,
145         // this means that the write request will complete asynchronously, but
146         // the calling task will return error (because the write timed out).
147         //
148         // There is special wording in the documentation of set_write_timeout()
149         // indicating that this is a plausible failure scenario, and this
150         // function is why that wording exists.
151         //
152         // Implementation-wise, we must be careful when passing a buffer down to
153         // libuv. Most of this implementation avoids allocations because of the
154         // blocking guarantee (all stack local variables are valid for the
155         // entire read/write request). If our write request can be timed out,
156         // however, we must heap allocate the data and pass that to the libuv
157         // functions instead. The reason for this is that if we time out and
158         // return, there's no guarantee that `buf` is a valid buffer any more.
159         //
160         // To do this, the write context has an optionally owned vector of
161         // bytes.
162         let data = if may_timeout {Some(Vec::from_slice(buf))} else {None};
163         let uv_buf = if may_timeout {
164             slice_to_uv_buf(data.as_ref().unwrap().as_slice())
165         } else {
166             slice_to_uv_buf(buf)
167         };
168
169         // Send off the request, but be careful to not block until we're sure
170         // that the write request is queued. If the request couldn't be queued,
171         // then we should return immediately with an error.
172         match unsafe {
173             uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb)
174         } {
175             0 => {
176                 let mut wcx = WriteContext {
177                     result: uvll::ECANCELED,
178                     stream: self as *mut _,
179                     data: data,
180                 };
181                 req.defuse(); // uv callback now owns this request
182
183                 let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
184                 wait_until_woken_after(&mut self.blocked_writer,
185                                        &Loop::wrap(loop_), || {
186                     req.set_data(&mut wcx);
187                 });
188
189                 if wcx.result != uvll::ECANCELED {
190                     self.last_write_req = Some(Request::wrap(req.handle));
191                     return match wcx.result {
192                         0 => Ok(()),
193                         n => Err(UvError(n)),
194                     }
195                 }
196
197                 // This is the second case where canceling an in-flight write
198                 // gets interesting. If we've been canceled (no one reset our
199                 // result), then someone still needs to free the request, and
200                 // someone still needs to free the allocate buffer.
201                 //
202                 // To take care of this, we swap out the stack-allocated write
203                 // context for a heap-allocated context, transferring ownership
204                 // of everything to the write_cb. Libuv guarantees that this
205                 // callback will be invoked at some point, and the callback will
206                 // be responsible for deallocating these resources.
207                 //
208                 // Note that we don't cache this write request back in the
209                 // stream watcher because we no longer have ownership of it, and
210                 // we never will.
211                 let mut new_wcx = box WriteContext {
212                     result: 0,
213                     stream: 0 as *mut StreamWatcher,
214                     data: wcx.data.take(),
215                 };
216                 unsafe {
217                     req.set_data(&mut *new_wcx);
218                     mem::forget(new_wcx);
219                 }
220                 Err(UvError(wcx.result))
221             }
222             n => Err(UvError(n)),
223         }
224     }
225
226     pub fn cancel_write(&mut self) -> Option<BlockedTask> {
227         self.blocked_writer.take()
228     }
229 }
230
231 // This allocation callback expects to be invoked once and only once. It will
232 // unwrap the buffer in the ReadContext stored in the stream and return it. This
233 // will fail if it is called more than once.
234 extern fn alloc_cb(stream: *mut uvll::uv_stream_t, _hint: size_t, buf: *mut Buf) {
235     uvdebug!("alloc_cb");
236     unsafe {
237         let rcx: &mut ReadContext =
238             mem::transmute(uvll::get_data_for_uv_handle(stream));
239         *buf = rcx.buf.take().expect("stream alloc_cb called more than once");
240     }
241 }
242
243 // When a stream has read some data, we will always forcibly stop reading and
244 // return all the data read (even if it didn't fill the whole buffer).
245 extern fn read_cb(handle: *mut uvll::uv_stream_t, nread: ssize_t,
246                   _buf: *const Buf) {
247     uvdebug!("read_cb {}", nread);
248     assert!(nread != uvll::ECANCELED as ssize_t);
249     let rcx: &mut ReadContext = unsafe {
250         mem::transmute(uvll::get_data_for_uv_handle(handle))
251     };
252     // Stop reading so that no read callbacks are
253     // triggered before the user calls `read` again.
254     // FIXME: Is there a performance impact to calling
255     // stop here?
256     unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
257     rcx.result = nread;
258
259     wakeup(&mut rcx.task);
260 }
261
262 // Unlike reading, the WriteContext is stored in the uv_write_t request. Like
263 // reading, however, all this does is wake up the blocked task after squirreling
264 // away the error code as a result.
265 extern fn write_cb(req: *mut uvll::uv_write_t, status: c_int) {
266     let mut req = Request::wrap(req);
267     // Remember to not free the request because it is re-used between writes on
268     // the same stream.
269     let wcx: &mut WriteContext = unsafe { req.get_data() };
270     wcx.result = status;
271
272     // If the stream is present, we haven't timed out, otherwise we acquire
273     // ownership of everything and then deallocate it all at once.
274     if wcx.stream as uint != 0 {
275         req.defuse();
276         let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream };
277         wakeup(&mut stream.blocked_writer);
278     } else {
279         let _wcx: Box<WriteContext> = unsafe { mem::transmute(wcx) };
280     }
281 }