]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/uvio.rs
b930ea2437ea86aa4eeb5186d19c056e6a560d5a
[rust.git] / src / libstd / rt / uv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use c_str::ToCStr;
12 use cast::transmute;
13 use cast;
14 use cell::Cell;
15 use clone::Clone;
16 use libc::{c_int, c_uint, c_void};
17 use ops::Drop;
18 use option::*;
19 use ptr;
20 use str;
21 use result::*;
22 use rt::io::IoError;
23 use rt::io::net::ip::{SocketAddr, IpAddr};
24 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
25 use rt::local::Local;
26 use rt::rtio::*;
27 use rt::sched::{Scheduler, SchedHandle};
28 use rt::tube::Tube;
29 use rt::task::SchedHome;
30 use rt::uv::*;
31 use rt::uv::idle::IdleWatcher;
32 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr, accum_sockaddrs};
33 use rt::uv::addrinfo::GetAddrInfoRequest;
34 use unstable::sync::Exclusive;
35 use super::super::io::support::PathLike;
36 use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
37           S_IRUSR, S_IWUSR};
38 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
39             CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
40 use task;
41
42 #[cfg(test)] use container::Container;
43 #[cfg(test)] use unstable::run_in_bare_thread;
44 #[cfg(test)] use rt::test::{spawntask,
45                             next_test_ip4,
46                             run_in_mt_newsched_task};
47 #[cfg(test)] use iter::{Iterator, range};
48 #[cfg(test)] use rt::comm::oneshot;
49
50 // XXX we should not be calling uvll functions in here.
51
52 trait HomingIO {
53
54     fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
55
56     /* XXX This will move pinned tasks to do IO on the proper scheduler
57      * and then move them back to their home.
58      */
59     fn go_to_IO_home(&mut self) -> SchedHome {
60         use rt::sched::PinnedTask;
61
62         do task::unkillable { // FIXME(#8674)
63             let mut old = None;
64             {
65                 let ptr = &mut old;
66                 let scheduler: ~Scheduler = Local::take();
67                 do scheduler.deschedule_running_task_and_then |_, task| {
68                     /* FIXME(#8674) if the task was already killed then wake
69                      * will return None. In that case, the home pointer will never be set.
70                      *
71                      * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
72                      */
73                     do task.wake().map_move |mut task| {
74                         *ptr = Some(task.take_unwrap_home());
75                         self.home().send(PinnedTask(task));
76                     };
77                 }
78             }
79             old.expect("No old home because task had already been killed.")
80         }
81     }
82
83     // XXX dummy self param
84     fn restore_original_home(_dummy_self: Option<Self>, old: SchedHome) {
85         use rt::sched::TaskFromFriend;
86
87         let old = Cell::new(old);
88         do task::unkillable { // FIXME(#8674)
89             let scheduler: ~Scheduler = Local::take();
90             do scheduler.deschedule_running_task_and_then |scheduler, task| {
91                 /* FIXME(#8674) if the task was already killed then wake
92                  * will return None. In that case, the home pointer will never be restored.
93                  *
94                  * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
95                  */
96                 do task.wake().map_move |mut task| {
97                     task.give_home(old.take());
98                     scheduler.make_handle().send(TaskFromFriend(task));
99                 };
100             }
101         }
102     }
103
104     fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
105         let home = self.go_to_IO_home();
106         let a = io(self); // do IO
107         HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
108         a // return the result of the IO
109     }
110
111     fn home_for_io_consume<A>(self, io: &fn(Self) -> A) -> A {
112         let mut this = self;
113         let home = this.go_to_IO_home();
114         let a = io(this); // do IO
115         HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
116         a // return the result of the IO
117     }
118
119     fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
120         let home = self.go_to_IO_home();
121         let a = do task::unkillable { // FIXME(#8674)
122             let scheduler: ~Scheduler = Local::take();
123             io_sched(self, scheduler) // do IO and scheduling action
124         };
125         HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
126         a // return result of IO
127     }
128 }
129
130 // get a handle for the current scheduler
131 macro_rules! get_handle_to_current_scheduler(
132     () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
133 )
134
135 enum SocketNameKind {
136     TcpPeer,
137     Tcp,
138     Udp
139 }
140
141 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
142                                                  handle: U) -> Result<SocketAddr, IoError> {
143     let getsockname = match sk {
144         TcpPeer => uvll::tcp_getpeername,
145         Tcp     => uvll::tcp_getsockname,
146         Udp     => uvll::udp_getsockname,
147     };
148
149     // Allocate a sockaddr_storage
150     // since we don't know if it's ipv4 or ipv6
151     let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
152
153     let r = unsafe {
154         getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
155     };
156
157     if r != 0 {
158         let status = status_to_maybe_uv_error(r);
159         return Err(uv_error_to_io_error(status.unwrap()));
160     }
161
162     let addr = unsafe {
163         if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
164             net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
165         } else {
166             net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
167         }
168     };
169
170     unsafe { uvll::free_sockaddr_storage(r_addr); }
171
172     Ok(addr)
173
174 }
175
176 // Obviously an Event Loop is always home.
177 pub struct UvEventLoop {
178     uvio: UvIoFactory
179 }
180
181 impl UvEventLoop {
182     pub fn new() -> UvEventLoop {
183         UvEventLoop {
184             uvio: UvIoFactory(Loop::new())
185         }
186     }
187 }
188
189 impl Drop for UvEventLoop {
190     fn drop(&self) {
191         // XXX: Need mutable finalizer
192         let this = unsafe {
193             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
194         };
195         this.uvio.uv_loop().close();
196     }
197 }
198
199 impl EventLoop for UvEventLoop {
200     fn run(&mut self) {
201         self.uvio.uv_loop().run();
202     }
203
204     fn callback(&mut self, f: ~fn()) {
205         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
206         do idle_watcher.start |mut idle_watcher, status| {
207             assert!(status.is_none());
208             idle_watcher.stop();
209             idle_watcher.close(||());
210             f();
211         }
212     }
213
214     fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
215         let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
216         return ~UvPausibleIdleCallback {
217             watcher: idle_watcher,
218             idle_flag: false,
219             closed: false
220         };
221     }
222
223     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
224         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
225         do timer.start(ms, 0) |timer, status| {
226             assert!(status.is_none());
227             timer.close(||());
228             f();
229         }
230     }
231
232     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
233         ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
234     }
235
236     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
237         Some(&mut self.uvio)
238     }
239 }
240
241 pub struct UvPausibleIdleCallback {
242     watcher: IdleWatcher,
243     idle_flag: bool,
244     closed: bool
245 }
246
247 impl UvPausibleIdleCallback {
248     #[inline]
249     pub fn start(&mut self, f: ~fn()) {
250         do self.watcher.start |_idle_watcher, _status| {
251             f();
252         };
253         self.idle_flag = true;
254     }
255     #[inline]
256     pub fn pause(&mut self) {
257         if self.idle_flag == true {
258             self.watcher.stop();
259             self.idle_flag = false;
260         }
261     }
262     #[inline]
263     pub fn resume(&mut self) {
264         if self.idle_flag == false {
265             self.watcher.restart();
266             self.idle_flag = true;
267         }
268     }
269     #[inline]
270     pub fn close(&mut self) {
271         self.pause();
272         if !self.closed {
273             self.closed = true;
274             self.watcher.close(||{});
275         }
276     }
277 }
278
279 #[test]
280 fn test_callback_run_once() {
281     do run_in_bare_thread {
282         let mut event_loop = UvEventLoop::new();
283         let mut count = 0;
284         let count_ptr: *mut int = &mut count;
285         do event_loop.callback {
286             unsafe { *count_ptr += 1 }
287         }
288         event_loop.run();
289         assert_eq!(count, 1);
290     }
291 }
292
293 // The entire point of async is to call into a loop from other threads so it does not need to home.
294 pub struct UvRemoteCallback {
295     // The uv async handle for triggering the callback
296     async: AsyncWatcher,
297     // A flag to tell the callback to exit, set from the dtor. This is
298     // almost never contested - only in rare races with the dtor.
299     exit_flag: Exclusive<bool>
300 }
301
302 impl UvRemoteCallback {
303     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
304         let exit_flag = Exclusive::new(false);
305         let exit_flag_clone = exit_flag.clone();
306         let async = do AsyncWatcher::new(loop_) |watcher, status| {
307             assert!(status.is_none());
308
309             // The synchronization logic here is subtle. To review,
310             // the uv async handle type promises that, after it is
311             // triggered the remote callback is definitely called at
312             // least once. UvRemoteCallback needs to maintain those
313             // semantics while also shutting down cleanly from the
314             // dtor. In our case that means that, when the
315             // UvRemoteCallback dtor calls `async.send()`, here `f` is
316             // always called later.
317
318             // In the dtor both the exit flag is set and the async
319             // callback fired under a lock.  Here, before calling `f`,
320             // we take the lock and check the flag. Because we are
321             // checking the flag before calling `f`, and the flag is
322             // set under the same lock as the send, then if the flag
323             // is set then we're guaranteed to call `f` after the
324             // final send.
325
326             // If the check was done after `f()` then there would be a
327             // period between that call and the check where the dtor
328             // could be called in the other thread, missing the final
329             // callback while still destroying the handle.
330
331             let should_exit = unsafe {
332                 exit_flag_clone.with_imm(|&should_exit| should_exit)
333             };
334
335             f();
336
337             if should_exit {
338                 watcher.close(||());
339             }
340
341         };
342         UvRemoteCallback {
343             async: async,
344             exit_flag: exit_flag
345         }
346     }
347 }
348
349 impl RemoteCallback for UvRemoteCallback {
350     fn fire(&mut self) { self.async.send() }
351 }
352
353 impl Drop for UvRemoteCallback {
354     fn drop(&self) {
355         unsafe {
356             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
357             do this.exit_flag.with |should_exit| {
358                 // NB: These two things need to happen atomically. Otherwise
359                 // the event handler could wake up due to a *previous*
360                 // signal and see the exit flag, destroying the handle
361                 // before the final send.
362                 *should_exit = true;
363                 this.async.send();
364             }
365         }
366     }
367 }
368
369 #[cfg(test)]
370 mod test_remote {
371     use cell::Cell;
372     use rt::test::*;
373     use rt::thread::Thread;
374     use rt::tube::Tube;
375     use rt::rtio::EventLoop;
376     use rt::local::Local;
377     use rt::sched::Scheduler;
378
379     #[test]
380     fn test_uv_remote() {
381         do run_in_mt_newsched_task {
382             let mut tube = Tube::new();
383             let tube_clone = tube.clone();
384             let remote_cell = Cell::new_empty();
385             do Local::borrow |sched: &mut Scheduler| {
386                 let tube_clone = tube_clone.clone();
387                 let tube_clone_cell = Cell::new(tube_clone);
388                 let remote = do sched.event_loop.remote_callback {
389                     // This could be called multiple times
390                     if !tube_clone_cell.is_empty() {
391                         tube_clone_cell.take().send(1);
392                     }
393                 };
394                 remote_cell.put_back(remote);
395             }
396             let thread = do Thread::start {
397                 remote_cell.take().fire();
398             };
399
400             assert!(tube.recv() == 1);
401             thread.join();
402         }
403     }
404 }
405
406 pub struct UvIoFactory(Loop);
407
408 impl UvIoFactory {
409     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
410         match self { &UvIoFactory(ref mut ptr) => ptr }
411     }
412 }
413
414 impl IoFactory for UvIoFactory {
415     // Connect to an address and return a new stream
416     // NB: This blocks the task waiting on the connection.
417     // It would probably be better to return a future
418     fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
419         // Create a cell in the task to hold the result. We will fill
420         // the cell before resuming the task.
421         let result_cell = Cell::new_empty();
422         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
423
424         // Block this task and take ownership, switch to scheduler context
425         do task::unkillable { // FIXME(#8674)
426             let scheduler: ~Scheduler = Local::take();
427             do scheduler.deschedule_running_task_and_then |_, task| {
428
429                 let mut tcp = TcpWatcher::new(self.uv_loop());
430                 let task_cell = Cell::new(task);
431
432                 // Wait for a connection
433                 do tcp.connect(addr) |stream, status| {
434                     match status {
435                         None => {
436                             let tcp = NativeHandle::from_native_handle(stream.native_handle());
437                             let home = get_handle_to_current_scheduler!();
438                             let res = Ok(~UvTcpStream { watcher: tcp, home: home });
439
440                             // Store the stream in the task's stack
441                             unsafe { (*result_cell_ptr).put_back(res); }
442
443                             // Context switch
444                             let scheduler: ~Scheduler = Local::take();
445                             scheduler.resume_blocked_task_immediately(task_cell.take());
446                         }
447                         Some(_) => {
448                             let task_cell = Cell::new(task_cell.take());
449                             do stream.close {
450                                 let res = Err(uv_error_to_io_error(status.unwrap()));
451                                 unsafe { (*result_cell_ptr).put_back(res); }
452                                 let scheduler: ~Scheduler = Local::take();
453                                 scheduler.resume_blocked_task_immediately(task_cell.take());
454                             }
455                         }
456                     }
457                 }
458             }
459         }
460
461         assert!(!result_cell.is_empty());
462         return result_cell.take();
463     }
464
465     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
466         let mut watcher = TcpWatcher::new(self.uv_loop());
467         match watcher.bind(addr) {
468             Ok(_) => {
469                 let home = get_handle_to_current_scheduler!();
470                 Ok(~UvTcpListener::new(watcher, home))
471             }
472             Err(uverr) => {
473                 do task::unkillable { // FIXME(#8674)
474                     let scheduler: ~Scheduler = Local::take();
475                     do scheduler.deschedule_running_task_and_then |_, task| {
476                         let task_cell = Cell::new(task);
477                         do watcher.as_stream().close {
478                             let scheduler: ~Scheduler = Local::take();
479                             scheduler.resume_blocked_task_immediately(task_cell.take());
480                         }
481                     }
482                     Err(uv_error_to_io_error(uverr))
483                 }
484             }
485         }
486     }
487
488     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
489         let mut watcher = UdpWatcher::new(self.uv_loop());
490         match watcher.bind(addr) {
491             Ok(_) => {
492                 let home = get_handle_to_current_scheduler!();
493                 Ok(~UvUdpSocket { watcher: watcher, home: home })
494             }
495             Err(uverr) => {
496                 do task::unkillable { // FIXME(#8674)
497                     let scheduler: ~Scheduler = Local::take();
498                     do scheduler.deschedule_running_task_and_then |_, task| {
499                         let task_cell = Cell::new(task);
500                         do watcher.close {
501                             let scheduler: ~Scheduler = Local::take();
502                             scheduler.resume_blocked_task_immediately(task_cell.take());
503                         }
504                     }
505                     Err(uv_error_to_io_error(uverr))
506                 }
507             }
508         }
509     }
510
511     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
512         let watcher = TimerWatcher::new(self.uv_loop());
513         let home = get_handle_to_current_scheduler!();
514         Ok(~UvTimer::new(watcher, home))
515     }
516
517     fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
518         let loop_ = Loop {handle: self.uv_loop().native_handle()};
519         let fd = file::FileDescriptor(fd);
520         let home = get_handle_to_current_scheduler!();
521         ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
522     }
523
524     fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
525         -> Result<~RtioFileStream, IoError> {
526         let mut flags = match fm {
527             Open => 0,
528             Create => O_CREAT,
529             OpenOrCreate => O_CREAT,
530             Append => O_APPEND,
531             Truncate => O_TRUNC,
532             CreateOrTruncate => O_TRUNC | O_CREAT
533         };
534         flags = match fa {
535             Read => flags | O_RDONLY,
536             Write => flags | O_WRONLY,
537             ReadWrite => flags | O_RDWR
538         };
539         let create_mode = match fm {
540             Create|OpenOrCreate|CreateOrTruncate =>
541                 S_IRUSR | S_IWUSR,
542             _ => 0
543         };
544         let result_cell = Cell::new_empty();
545         let result_cell_ptr: *Cell<Result<~RtioFileStream,
546                                            IoError>> = &result_cell;
547         let path_cell = Cell::new(path);
548         do task::unkillable { // FIXME(#8674)
549             let scheduler: ~Scheduler = Local::take();
550             do scheduler.deschedule_running_task_and_then |_, task| {
551                 let task_cell = Cell::new(task);
552                 let path = path_cell.take();
553                 do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
554                       |req,err| {
555                     if err.is_none() {
556                         let loop_ = Loop {handle: req.get_loop().native_handle()};
557                         let home = get_handle_to_current_scheduler!();
558                         let fd = file::FileDescriptor(req.get_result());
559                         let fs = ~UvFileStream::new(
560                             loop_, fd, true, home) as ~RtioFileStream;
561                         let res = Ok(fs);
562                         unsafe { (*result_cell_ptr).put_back(res); }
563                         let scheduler: ~Scheduler = Local::take();
564                         scheduler.resume_blocked_task_immediately(task_cell.take());
565                     } else {
566                         let res = Err(uv_error_to_io_error(err.unwrap()));
567                         unsafe { (*result_cell_ptr).put_back(res); }
568                         let scheduler: ~Scheduler = Local::take();
569                         scheduler.resume_blocked_task_immediately(task_cell.take());
570                     }
571                 };
572             };
573         }
574         assert!(!result_cell.is_empty());
575         return result_cell.take();
576     }
577
578     fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
579         let result_cell = Cell::new_empty();
580         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
581         let path_cell = Cell::new(path);
582         do task::unkillable { // FIXME(#8674)
583             let scheduler: ~Scheduler = Local::take();
584             do scheduler.deschedule_running_task_and_then |_, task| {
585                 let task_cell = Cell::new(task);
586                 let path = path_cell.take();
587                 do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
588                     let res = match err {
589                         None => Ok(()),
590                         Some(err) => Err(uv_error_to_io_error(err))
591                     };
592                     unsafe { (*result_cell_ptr).put_back(res); }
593                     let scheduler: ~Scheduler = Local::take();
594                     scheduler.resume_blocked_task_immediately(task_cell.take());
595                 };
596             };
597         }
598         assert!(!result_cell.is_empty());
599         return result_cell.take();
600     }
601
602     fn get_host_addresses(&mut self, host: &str) -> Result<~[IpAddr], IoError> {
603         let result_cell = Cell::new_empty();
604         let result_cell_ptr: *Cell<Result<~[IpAddr], IoError>> = &result_cell;
605         let host_ptr: *&str = &host;
606         let addrinfo_req = GetAddrInfoRequest::new();
607         let addrinfo_req_cell = Cell::new(addrinfo_req);
608         do task::unkillable { // FIXME(#8674)
609             let scheduler: ~Scheduler = Local::take();
610             do scheduler.deschedule_running_task_and_then |_, task| {
611                 let task_cell = Cell::new(task);
612                 let mut addrinfo_req = addrinfo_req_cell.take();
613                 unsafe {
614                     do addrinfo_req.getaddrinfo(self.uv_loop(),
615                                                 Some(*host_ptr),
616                                                 None, None) |_, addrinfo, err| {
617                         let res = match err {
618                             None => Ok(accum_sockaddrs(addrinfo).map(|addr| addr.ip.clone())),
619                             Some(err) => Err(uv_error_to_io_error(err))
620                         };
621                         (*result_cell_ptr).put_back(res);
622                         let scheduler: ~Scheduler = Local::take();
623                         scheduler.resume_blocked_task_immediately(task_cell.take());
624                     }
625                 }
626             }
627         }
628         addrinfo_req.delete();
629         assert!(!result_cell.is_empty());
630         return result_cell.take();
631     }
632 }
633
634 pub struct UvTcpListener {
635     watcher : TcpWatcher,
636     home: SchedHandle,
637 }
638
639 impl HomingIO for UvTcpListener {
640     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
641 }
642
643 impl UvTcpListener {
644     fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
645         UvTcpListener { watcher: watcher, home: home }
646     }
647 }
648
649 impl Drop for UvTcpListener {
650     fn drop(&self) {
651         // XXX need mutable finalizer
652         let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
653         do self_.home_for_io_with_sched |self_, scheduler| {
654             do scheduler.deschedule_running_task_and_then |_, task| {
655                 let task = Cell::new(task);
656                 do self_.watcher.as_stream().close {
657                     let scheduler: ~Scheduler = Local::take();
658                     scheduler.resume_blocked_task_immediately(task.take());
659                 }
660             }
661         }
662     }
663 }
664
665 impl RtioSocket for UvTcpListener {
666     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
667         do self.home_for_io |self_| {
668             socket_name(Tcp, self_.watcher)
669         }
670     }
671 }
672
673 impl RtioTcpListener for UvTcpListener {
674     fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
675         do self.home_for_io_consume |self_| {
676             let mut acceptor = ~UvTcpAcceptor::new(self_);
677             let incoming = Cell::new(acceptor.incoming.clone());
678             do acceptor.listener.watcher.listen |mut server, status| {
679                 do incoming.with_mut_ref |incoming| {
680                     let inc = match status {
681                         Some(_) => Err(standard_error(OtherIoError)),
682                         None => {
683                             let inc = TcpWatcher::new(&server.event_loop());
684                             // first accept call in the callback guarenteed to succeed
685                             server.accept(inc.as_stream());
686                             let home = get_handle_to_current_scheduler!();
687                             Ok(~UvTcpStream { watcher: inc, home: home })
688                         }
689                     };
690                     incoming.send(inc);
691                 }
692             };
693             Ok(acceptor)
694         }
695     }
696 }
697
698 pub struct UvTcpAcceptor {
699     listener: UvTcpListener,
700     incoming: Tube<Result<~RtioTcpStreamObject, IoError>>,
701 }
702
703 impl HomingIO for UvTcpAcceptor {
704     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
705 }
706
707 impl UvTcpAcceptor {
708     fn new(listener: UvTcpListener) -> UvTcpAcceptor {
709         UvTcpAcceptor { listener: listener, incoming: Tube::new() }
710     }
711 }
712
713 impl RtioSocket for UvTcpAcceptor {
714     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
715         do self.home_for_io |self_| {
716             socket_name(Tcp, self_.listener.watcher)
717         }
718     }
719 }
720
721 impl RtioTcpAcceptor for UvTcpAcceptor {
722     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
723         do self.home_for_io |self_| {
724             self_.incoming.recv()
725         }
726     }
727
728     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
729         do self.home_for_io |self_| {
730             let r = unsafe {
731                 uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
732             };
733
734             match status_to_maybe_uv_error(r) {
735                 Some(err) => Err(uv_error_to_io_error(err)),
736                 None => Ok(())
737             }
738         }
739     }
740
741     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
742         do self.home_for_io |self_| {
743             let r = unsafe {
744                 uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
745             };
746
747             match status_to_maybe_uv_error(r) {
748                 Some(err) => Err(uv_error_to_io_error(err)),
749                 None => Ok(())
750             }
751         }
752     }
753 }
754
755 pub struct UvTcpStream {
756     watcher: TcpWatcher,
757     home: SchedHandle,
758 }
759
760 impl HomingIO for UvTcpStream {
761     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
762 }
763
764 impl Drop for UvTcpStream {
765     fn drop(&self) {
766         // XXX need mutable finalizer
767         let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
768         do this.home_for_io_with_sched |self_, scheduler| {
769             do scheduler.deschedule_running_task_and_then |_, task| {
770                 let task_cell = Cell::new(task);
771                 do self_.watcher.as_stream().close {
772                     let scheduler: ~Scheduler = Local::take();
773                     scheduler.resume_blocked_task_immediately(task_cell.take());
774                 }
775             }
776         }
777     }
778 }
779
780 impl RtioSocket for UvTcpStream {
781     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
782         do self.home_for_io |self_| {
783             socket_name(Tcp, self_.watcher)
784         }
785     }
786 }
787
788 impl RtioTcpStream for UvTcpStream {
789     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
790         do self.home_for_io_with_sched |self_, scheduler| {
791             let result_cell = Cell::new_empty();
792             let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
793
794             let buf_ptr: *&mut [u8] = &buf;
795             do scheduler.deschedule_running_task_and_then |_sched, task| {
796                 let task_cell = Cell::new(task);
797                 // XXX: We shouldn't reallocate these callbacks every
798                 // call to read
799                 let alloc: AllocCallback = |_| unsafe {
800                     slice_to_uv_buf(*buf_ptr)
801                 };
802                 let mut watcher = self_.watcher.as_stream();
803                 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
804
805                     // Stop reading so that no read callbacks are
806                     // triggered before the user calls `read` again.
807                     // XXX: Is there a performance impact to calling
808                     // stop here?
809                     watcher.read_stop();
810
811                     let result = if status.is_none() {
812                         assert!(nread >= 0);
813                         Ok(nread as uint)
814                     } else {
815                         Err(uv_error_to_io_error(status.unwrap()))
816                     };
817
818                     unsafe { (*result_cell_ptr).put_back(result); }
819
820                     let scheduler: ~Scheduler = Local::take();
821                     scheduler.resume_blocked_task_immediately(task_cell.take());
822                 }
823             }
824
825             assert!(!result_cell.is_empty());
826             result_cell.take()
827         }
828     }
829
830     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
831         do self.home_for_io_with_sched |self_, scheduler| {
832             let result_cell = Cell::new_empty();
833             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
834             let buf_ptr: *&[u8] = &buf;
835             do scheduler.deschedule_running_task_and_then |_, task| {
836                 let task_cell = Cell::new(task);
837                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
838                 let mut watcher = self_.watcher.as_stream();
839                 do watcher.write(buf) |_watcher, status| {
840                     let result = if status.is_none() {
841                         Ok(())
842                     } else {
843                         Err(uv_error_to_io_error(status.unwrap()))
844                     };
845
846                     unsafe { (*result_cell_ptr).put_back(result); }
847
848                     let scheduler: ~Scheduler = Local::take();
849                     scheduler.resume_blocked_task_immediately(task_cell.take());
850                 }
851             }
852
853             assert!(!result_cell.is_empty());
854             result_cell.take()
855         }
856     }
857
858     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
859         do self.home_for_io |self_| {
860             socket_name(TcpPeer, self_.watcher)
861         }
862     }
863
864     fn control_congestion(&mut self) -> Result<(), IoError> {
865         do self.home_for_io |self_| {
866             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
867
868             match status_to_maybe_uv_error(r) {
869                 Some(err) => Err(uv_error_to_io_error(err)),
870                 None => Ok(())
871             }
872         }
873     }
874
875     fn nodelay(&mut self) -> Result<(), IoError> {
876         do self.home_for_io |self_| {
877             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
878
879             match status_to_maybe_uv_error(r) {
880                 Some(err) => Err(uv_error_to_io_error(err)),
881                 None => Ok(())
882             }
883         }
884     }
885
886     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
887         do self.home_for_io |self_| {
888             let r = unsafe {
889                 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
890                                     delay_in_seconds as c_uint)
891             };
892
893             match status_to_maybe_uv_error(r) {
894                 Some(err) => Err(uv_error_to_io_error(err)),
895                 None => Ok(())
896             }
897         }
898     }
899
900     fn letdie(&mut self) -> Result<(), IoError> {
901         do self.home_for_io |self_| {
902             let r = unsafe {
903                 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
904             };
905
906             match status_to_maybe_uv_error(r) {
907                 Some(err) => Err(uv_error_to_io_error(err)),
908                 None => Ok(())
909             }
910         }
911     }
912 }
913
914 pub struct UvUdpSocket {
915     watcher: UdpWatcher,
916     home: SchedHandle,
917 }
918
919 impl HomingIO for UvUdpSocket {
920     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
921 }
922
923 impl Drop for UvUdpSocket {
924     fn drop(&self) {
925         // XXX need mutable finalizer
926         let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
927         do this.home_for_io_with_sched |self_, scheduler| {
928             do scheduler.deschedule_running_task_and_then |_, task| {
929                 let task_cell = Cell::new(task);
930                 do self_.watcher.close {
931                     let scheduler: ~Scheduler = Local::take();
932                     scheduler.resume_blocked_task_immediately(task_cell.take());
933                 }
934             }
935         }
936     }
937 }
938
939 impl RtioSocket for UvUdpSocket {
940     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
941         do self.home_for_io |self_| {
942             socket_name(Udp, self_.watcher)
943         }
944     }
945 }
946
947 impl RtioUdpSocket for UvUdpSocket {
948     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
949         do self.home_for_io_with_sched |self_, scheduler| {
950             let result_cell = Cell::new_empty();
951             let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
952
953             let buf_ptr: *&mut [u8] = &buf;
954             do scheduler.deschedule_running_task_and_then |_, task| {
955                 let task_cell = Cell::new(task);
956                 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
957                 do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
958                     let _ = flags; // /XXX add handling for partials?
959
960                     watcher.recv_stop();
961
962                     let result = match status {
963                         None => {
964                             assert!(nread >= 0);
965                             Ok((nread as uint, addr))
966                         }
967                         Some(err) => Err(uv_error_to_io_error(err)),
968                     };
969
970                     unsafe { (*result_cell_ptr).put_back(result); }
971
972                     let scheduler: ~Scheduler = Local::take();
973                     scheduler.resume_blocked_task_immediately(task_cell.take());
974                 }
975             }
976
977             assert!(!result_cell.is_empty());
978             result_cell.take()
979         }
980     }
981
982     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
983         do self.home_for_io_with_sched |self_, scheduler| {
984             let result_cell = Cell::new_empty();
985             let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
986             let buf_ptr: *&[u8] = &buf;
987             do scheduler.deschedule_running_task_and_then |_, task| {
988                 let task_cell = Cell::new(task);
989                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
990                 do self_.watcher.send(buf, dst) |_watcher, status| {
991
992                     let result = match status {
993                         None => Ok(()),
994                         Some(err) => Err(uv_error_to_io_error(err)),
995                     };
996
997                     unsafe { (*result_cell_ptr).put_back(result); }
998
999                     let scheduler: ~Scheduler = Local::take();
1000                     scheduler.resume_blocked_task_immediately(task_cell.take());
1001                 }
1002             }
1003
1004             assert!(!result_cell.is_empty());
1005             result_cell.take()
1006         }
1007     }
1008
1009     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1010         do self.home_for_io |self_| {
1011             let r = unsafe {
1012                 do multi.to_str().with_c_str |m_addr| {
1013                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1014                                              ptr::null(), uvll::UV_JOIN_GROUP)
1015                 }
1016             };
1017
1018             match status_to_maybe_uv_error(r) {
1019                 Some(err) => Err(uv_error_to_io_error(err)),
1020                 None => Ok(())
1021             }
1022         }
1023     }
1024
1025     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1026         do self.home_for_io |self_| {
1027             let r = unsafe {
1028                 do multi.to_str().with_c_str |m_addr| {
1029                     uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1030                                              ptr::null(), uvll::UV_LEAVE_GROUP)
1031                 }
1032             };
1033
1034             match status_to_maybe_uv_error(r) {
1035                 Some(err) => Err(uv_error_to_io_error(err)),
1036                 None => Ok(())
1037             }
1038         }
1039     }
1040
1041     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
1042         do self.home_for_io |self_| {
1043
1044             let r = unsafe {
1045                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
1046             };
1047
1048             match status_to_maybe_uv_error(r) {
1049                 Some(err) => Err(uv_error_to_io_error(err)),
1050                 None => Ok(())
1051             }
1052         }
1053     }
1054
1055     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
1056         do self.home_for_io |self_| {
1057
1058             let r = unsafe {
1059                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
1060             };
1061
1062             match status_to_maybe_uv_error(r) {
1063                 Some(err) => Err(uv_error_to_io_error(err)),
1064                 None => Ok(())
1065             }
1066         }
1067     }
1068
1069     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1070         do self.home_for_io |self_| {
1071
1072             let r = unsafe {
1073                 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
1074             };
1075
1076             match status_to_maybe_uv_error(r) {
1077                 Some(err) => Err(uv_error_to_io_error(err)),
1078                 None => Ok(())
1079             }
1080         }
1081     }
1082
1083     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1084         do self.home_for_io |self_| {
1085
1086             let r = unsafe {
1087                 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
1088             };
1089
1090             match status_to_maybe_uv_error(r) {
1091                 Some(err) => Err(uv_error_to_io_error(err)),
1092                 None => Ok(())
1093             }
1094         }
1095     }
1096
1097     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
1098         do self.home_for_io |self_| {
1099
1100             let r = unsafe {
1101                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
1102             };
1103
1104             match status_to_maybe_uv_error(r) {
1105                 Some(err) => Err(uv_error_to_io_error(err)),
1106                 None => Ok(())
1107             }
1108         }
1109     }
1110
1111     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
1112         do self.home_for_io |self_| {
1113
1114             let r = unsafe {
1115                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
1116             };
1117
1118             match status_to_maybe_uv_error(r) {
1119                 Some(err) => Err(uv_error_to_io_error(err)),
1120                 None => Ok(())
1121             }
1122         }
1123     }
1124 }
1125
1126 pub struct UvTimer {
1127     watcher: timer::TimerWatcher,
1128     home: SchedHandle,
1129 }
1130
1131 impl HomingIO for UvTimer {
1132     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1133 }
1134
1135 impl UvTimer {
1136     fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1137         UvTimer { watcher: w, home: home }
1138     }
1139 }
1140
1141 impl Drop for UvTimer {
1142     fn drop(&self) {
1143         let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
1144         do self_.home_for_io_with_sched |self_, scheduler| {
1145             rtdebug!("closing UvTimer");
1146             do scheduler.deschedule_running_task_and_then |_, task| {
1147                 let task_cell = Cell::new(task);
1148                 do self_.watcher.close {
1149                     let scheduler: ~Scheduler = Local::take();
1150                     scheduler.resume_blocked_task_immediately(task_cell.take());
1151                 }
1152             }
1153         }
1154     }
1155 }
1156
1157 impl RtioTimer for UvTimer {
1158     fn sleep(&mut self, msecs: u64) {
1159         do self.home_for_io_with_sched |self_, scheduler| {
1160             do scheduler.deschedule_running_task_and_then |_sched, task| {
1161                 rtdebug!("sleep: entered scheduler context");
1162                 let task_cell = Cell::new(task);
1163                 do self_.watcher.start(msecs, 0) |_, status| {
1164                     assert!(status.is_none());
1165                     let scheduler: ~Scheduler = Local::take();
1166                     scheduler.resume_blocked_task_immediately(task_cell.take());
1167                 }
1168             }
1169             self_.watcher.stop();
1170         }
1171     }
1172 }
1173
1174 pub struct UvFileStream {
1175     loop_: Loop,
1176     fd: file::FileDescriptor,
1177     close_on_drop: bool,
1178     home: SchedHandle
1179 }
1180
1181 impl HomingIO for UvFileStream {
1182     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1183 }
1184
1185 impl UvFileStream {
1186     fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
1187            home: SchedHandle) -> UvFileStream {
1188         UvFileStream {
1189             loop_: loop_,
1190             fd: fd,
1191             close_on_drop: close_on_drop,
1192             home: home
1193         }
1194     }
1195     fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
1196         let result_cell = Cell::new_empty();
1197         let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
1198         let buf_ptr: *&mut [u8] = &buf;
1199         do self.home_for_io_with_sched |self_, scheduler| {
1200             do scheduler.deschedule_running_task_and_then |_, task| {
1201                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1202                 let task_cell = Cell::new(task);
1203                 do self_.fd.read(&self_.loop_, buf, offset) |req, uverr| {
1204                     let res = match uverr  {
1205                         None => Ok(req.get_result() as int),
1206                         Some(err) => Err(uv_error_to_io_error(err))
1207                     };
1208                     unsafe { (*result_cell_ptr).put_back(res); }
1209                     let scheduler: ~Scheduler = Local::take();
1210                     scheduler.resume_blocked_task_immediately(task_cell.take());
1211                 };
1212             };
1213         };
1214         result_cell.take()
1215     }
1216     fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
1217         let result_cell = Cell::new_empty();
1218         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1219         let buf_ptr: *&[u8] = &buf;
1220         do self.home_for_io_with_sched |self_, scheduler| {
1221             do scheduler.deschedule_running_task_and_then |_, task| {
1222                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1223                 let task_cell = Cell::new(task);
1224                 do self_.fd.write(&self_.loop_, buf, offset) |_, uverr| {
1225                     let res = match uverr  {
1226                         None => Ok(()),
1227                         Some(err) => Err(uv_error_to_io_error(err))
1228                     };
1229                     unsafe { (*result_cell_ptr).put_back(res); }
1230                     let scheduler: ~Scheduler = Local::take();
1231                     scheduler.resume_blocked_task_immediately(task_cell.take());
1232                 };
1233             };
1234         };
1235         result_cell.take()
1236     }
1237     fn seek_common(&mut self, pos: i64, whence: c_int) ->
1238         Result<u64, IoError>{
1239         #[fixed_stack_segment]; #[inline(never)];
1240         unsafe {
1241             match lseek((*self.fd), pos as off_t, whence) {
1242                 -1 => {
1243                     Err(IoError {
1244                         kind: OtherIoError,
1245                         desc: "Failed to lseek.",
1246                         detail: None
1247                     })
1248                 },
1249                 n => Ok(n as u64)
1250             }
1251         }
1252     }
1253 }
1254
1255 impl Drop for UvFileStream {
1256     fn drop(&self) {
1257         let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
1258         if self.close_on_drop {
1259             do self_.home_for_io_with_sched |self_, scheduler| {
1260                 do scheduler.deschedule_running_task_and_then |_, task| {
1261                     let task_cell = Cell::new(task);
1262                     do self_.fd.close(&self.loop_) |_,_| {
1263                         let scheduler: ~Scheduler = Local::take();
1264                         scheduler.resume_blocked_task_immediately(task_cell.take());
1265                     };
1266                 };
1267             }
1268         }
1269     }
1270 }
1271
1272 impl RtioFileStream for UvFileStream {
1273     fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
1274         self.base_read(buf, -1)
1275     }
1276     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
1277         self.base_write(buf, -1)
1278     }
1279     fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
1280         self.base_read(buf, offset as i64)
1281     }
1282     fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
1283         self.base_write(buf, offset as i64)
1284     }
1285     fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
1286         use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
1287         let whence = match whence {
1288             SeekSet => SEEK_SET,
1289             SeekCur => SEEK_CUR,
1290             SeekEnd => SEEK_END
1291         };
1292         self.seek_common(pos, whence)
1293     }
1294     fn tell(&self) -> Result<u64, IoError> {
1295         use libc::SEEK_CUR;
1296         // this is temporary
1297         let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1298         self_.seek_common(0, SEEK_CUR)
1299     }
1300     fn flush(&mut self) -> Result<(), IoError> {
1301         Ok(())
1302     }
1303 }
1304
1305 #[test]
1306 fn test_simple_io_no_connect() {
1307     do run_in_mt_newsched_task {
1308         unsafe {
1309             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1310             let addr = next_test_ip4();
1311             let maybe_chan = (*io).tcp_connect(addr);
1312             assert!(maybe_chan.is_err());
1313         }
1314     }
1315 }
1316
1317 #[test]
1318 fn test_simple_udp_io_bind_only() {
1319     do run_in_mt_newsched_task {
1320         unsafe {
1321             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1322             let addr = next_test_ip4();
1323             let maybe_socket = (*io).udp_bind(addr);
1324             assert!(maybe_socket.is_ok());
1325         }
1326     }
1327 }
1328
1329 #[test]
1330 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1331     use rt::sleeper_list::SleeperList;
1332     use rt::work_queue::WorkQueue;
1333     use rt::thread::Thread;
1334     use rt::task::Task;
1335     use rt::sched::{Shutdown, TaskFromFriend};
1336     do run_in_bare_thread {
1337         let sleepers = SleeperList::new();
1338         let work_queue1 = WorkQueue::new();
1339         let work_queue2 = WorkQueue::new();
1340         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1341
1342         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1343                                          sleepers.clone());
1344         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1345                                          sleepers.clone());
1346
1347         let handle1 = Cell::new(sched1.make_handle());
1348         let handle2 = Cell::new(sched2.make_handle());
1349         let tasksFriendHandle = Cell::new(sched2.make_handle());
1350
1351         let on_exit: ~fn(bool) = |exit_status| {
1352             handle1.take().send(Shutdown);
1353             handle2.take().send(Shutdown);
1354             rtassert!(exit_status);
1355         };
1356
1357         let test_function: ~fn() = || {
1358             let io: *mut IoFactoryObject = unsafe {
1359                 Local::unsafe_borrow()
1360             };
1361             let addr = next_test_ip4();
1362             let maybe_socket = unsafe { (*io).udp_bind(addr) };
1363             // this socket is bound to this event loop
1364             assert!(maybe_socket.is_ok());
1365
1366             // block self on sched1
1367             do task::unkillable { // FIXME(#8674)
1368                 let scheduler: ~Scheduler = Local::take();
1369                 do scheduler.deschedule_running_task_and_then |_, task| {
1370                     // unblock task
1371                     do task.wake().map_move |task| {
1372                       // send self to sched2
1373                       tasksFriendHandle.take().send(TaskFromFriend(task));
1374                     };
1375                     // sched1 should now sleep since it has nothing else to do
1376                 }
1377             }
1378             // sched2 will wake up and get the task
1379             // as we do nothing else, the function ends and the socket goes out of scope
1380             // sched2 will start to run the destructor
1381             // the destructor will first block the task, set it's home as sched1, then enqueue it
1382             // sched2 will dequeue the task, see that it has a home, and send it to sched1
1383             // sched1 will wake up, exec the close function on the correct loop, and then we're done
1384         };
1385
1386         let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
1387         main_task.death.on_exit = Some(on_exit);
1388         let main_task = Cell::new(main_task);
1389
1390         let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
1391
1392         let sched1 = Cell::new(sched1);
1393         let sched2 = Cell::new(sched2);
1394
1395         let thread1 = do Thread::start {
1396             sched1.take().bootstrap(main_task.take());
1397         };
1398         let thread2 = do Thread::start {
1399             sched2.take().bootstrap(null_task.take());
1400         };
1401
1402         thread1.join();
1403         thread2.join();
1404     }
1405 }
1406
1407 #[test]
1408 fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
1409     use rt::sleeper_list::SleeperList;
1410     use rt::work_queue::WorkQueue;
1411     use rt::thread::Thread;
1412     use rt::task::Task;
1413     use rt::comm::oneshot;
1414     use rt::sched::Shutdown;
1415     do run_in_bare_thread {
1416         let sleepers = SleeperList::new();
1417         let work_queue1 = WorkQueue::new();
1418         let work_queue2 = WorkQueue::new();
1419         let queues = ~[work_queue1.clone(), work_queue2.clone()];
1420
1421         let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1422                                          sleepers.clone());
1423         let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1424                                          sleepers.clone());
1425
1426         let handle1 = Cell::new(sched1.make_handle());
1427         let handle2 = Cell::new(sched2.make_handle());
1428
1429         let (port, chan) = oneshot();
1430         let port = Cell::new(port);
1431         let chan = Cell::new(chan);
1432
1433         let body1: ~fn() = || {
1434             let io: *mut IoFactoryObject = unsafe {
1435                 Local::unsafe_borrow()
1436             };
1437             let addr = next_test_ip4();
1438             let socket = unsafe { (*io).udp_bind(addr) };
1439             assert!(socket.is_ok());
1440             chan.take().send(socket);
1441         };
1442
1443         let body2: ~fn() = || {
1444             let socket = port.take().recv();
1445             assert!(socket.is_ok());
1446             /* The socket goes out of scope and the destructor is called.
1447              * The destructor:
1448              *  - sends itself back to sched1
1449              *  - frees the socket
1450              *  - resets the home of the task to whatever it was previously
1451              */
1452         };
1453
1454         let on_exit: ~fn(bool) = |exit| {
1455             handle1.take().send(Shutdown);
1456             handle2.take().send(Shutdown);
1457             rtassert!(exit);
1458         };
1459
1460         let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
1461
1462         let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
1463         task2.death.on_exit = Some(on_exit);
1464         let task2 = Cell::new(task2);
1465
1466         let sched1 = Cell::new(sched1);
1467         let sched2 = Cell::new(sched2);
1468
1469         let thread1 = do Thread::start {
1470             sched1.take().bootstrap(task1.take());
1471         };
1472         let thread2 = do Thread::start {
1473             sched2.take().bootstrap(task2.take());
1474         };
1475
1476         thread1.join();
1477         thread2.join();
1478     }
1479 }
1480
1481 #[test]
1482 fn test_simple_tcp_server_and_client() {
1483     do run_in_mt_newsched_task {
1484         let addr = next_test_ip4();
1485         let (port, chan) = oneshot();
1486         let port = Cell::new(port);
1487         let chan = Cell::new(chan);
1488
1489         // Start the server first so it's listening when we connect
1490         do spawntask {
1491             unsafe {
1492                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1493                 let listener = (*io).tcp_bind(addr).unwrap();
1494                 let mut acceptor = listener.listen().unwrap();
1495                 chan.take().send(());
1496                 let mut stream = acceptor.accept().unwrap();
1497                 let mut buf = [0, .. 2048];
1498                 let nread = stream.read(buf).unwrap();
1499                 assert_eq!(nread, 8);
1500                 for i in range(0u, nread) {
1501                     rtdebug!("%u", buf[i] as uint);
1502                     assert_eq!(buf[i], i as u8);
1503                 }
1504             }
1505         }
1506
1507         do spawntask {
1508             unsafe {
1509                 port.take().recv();
1510                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1511                 let mut stream = (*io).tcp_connect(addr).unwrap();
1512                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1513             }
1514         }
1515     }
1516 }
1517
1518 #[test]
1519 fn test_simple_tcp_server_and_client_on_diff_threads() {
1520     use rt::sleeper_list::SleeperList;
1521     use rt::work_queue::WorkQueue;
1522     use rt::thread::Thread;
1523     use rt::task::Task;
1524     use rt::sched::{Shutdown};
1525     do run_in_bare_thread {
1526         let sleepers = SleeperList::new();
1527
1528         let server_addr = next_test_ip4();
1529         let client_addr = server_addr.clone();
1530
1531         let server_work_queue = WorkQueue::new();
1532         let client_work_queue = WorkQueue::new();
1533         let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
1534
1535         let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
1536                                                queues.clone(), sleepers.clone());
1537         let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
1538                                                queues.clone(), sleepers.clone());
1539
1540         let server_handle = Cell::new(server_sched.make_handle());
1541         let client_handle = Cell::new(client_sched.make_handle());
1542
1543         let server_on_exit: ~fn(bool) = |exit_status| {
1544             server_handle.take().send(Shutdown);
1545             rtassert!(exit_status);
1546         };
1547
1548         let client_on_exit: ~fn(bool) = |exit_status| {
1549             client_handle.take().send(Shutdown);
1550             rtassert!(exit_status);
1551         };
1552
1553         let server_fn: ~fn() = || {
1554             let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1555             let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
1556             let mut acceptor = listener.listen().unwrap();
1557             let mut stream = acceptor.accept().unwrap();
1558             let mut buf = [0, .. 2048];
1559             let nread = stream.read(buf).unwrap();
1560             assert_eq!(nread, 8);
1561             for i in range(0u, nread) {
1562                 assert_eq!(buf[i], i as u8);
1563             }
1564         };
1565
1566         let client_fn: ~fn() = || {
1567             let io: *mut IoFactoryObject = unsafe {
1568                 Local::unsafe_borrow()
1569             };
1570             let mut stream = unsafe { (*io).tcp_connect(client_addr) };
1571             while stream.is_err() {
1572                 stream = unsafe { (*io).tcp_connect(client_addr) };
1573             }
1574             stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1575         };
1576
1577         let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
1578         server_task.death.on_exit = Some(server_on_exit);
1579         let server_task = Cell::new(server_task);
1580
1581         let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
1582         client_task.death.on_exit = Some(client_on_exit);
1583         let client_task = Cell::new(client_task);
1584
1585         let server_sched = Cell::new(server_sched);
1586         let client_sched = Cell::new(client_sched);
1587
1588         let server_thread = do Thread::start {
1589             server_sched.take().bootstrap(server_task.take());
1590         };
1591         let client_thread = do Thread::start {
1592             client_sched.take().bootstrap(client_task.take());
1593         };
1594
1595         server_thread.join();
1596         client_thread.join();
1597     }
1598 }
1599
1600 #[test]
1601 fn test_simple_udp_server_and_client() {
1602     do run_in_mt_newsched_task {
1603         let server_addr = next_test_ip4();
1604         let client_addr = next_test_ip4();
1605         let (port, chan) = oneshot();
1606         let port = Cell::new(port);
1607         let chan = Cell::new(chan);
1608
1609         do spawntask {
1610             unsafe {
1611                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1612                 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
1613                 chan.take().send(());
1614                 let mut buf = [0, .. 2048];
1615                 let (nread,src) = server_socket.recvfrom(buf).unwrap();
1616                 assert_eq!(nread, 8);
1617                 for i in range(0u, nread) {
1618                     rtdebug!("%u", buf[i] as uint);
1619                     assert_eq!(buf[i], i as u8);
1620                 }
1621                 assert_eq!(src, client_addr);
1622             }
1623         }
1624
1625         do spawntask {
1626             unsafe {
1627                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1628                 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
1629                 port.take().recv();
1630                 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
1631             }
1632         }
1633     }
1634 }
1635
1636 #[test] #[ignore(reason = "busted")]
1637 fn test_read_and_block() {
1638     do run_in_mt_newsched_task {
1639         let addr = next_test_ip4();
1640         let (port, chan) = oneshot();
1641         let port = Cell::new(port);
1642         let chan = Cell::new(chan);
1643
1644         do spawntask {
1645             let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1646             let listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1647             let mut acceptor = listener.listen().unwrap();
1648             chan.take().send(());
1649             let mut stream = acceptor.accept().unwrap();
1650             let mut buf = [0, .. 2048];
1651
1652             let expected = 32;
1653             let mut current = 0;
1654             let mut reads = 0;
1655
1656             while current < expected {
1657                 let nread = stream.read(buf).unwrap();
1658                 for i in range(0u, nread) {
1659                     let val = buf[i] as uint;
1660                     assert_eq!(val, current % 8);
1661                     current += 1;
1662                 }
1663                 reads += 1;
1664
1665                 do task::unkillable { // FIXME(#8674)
1666                     let scheduler: ~Scheduler = Local::take();
1667                     // Yield to the other task in hopes that it
1668                     // will trigger a read callback while we are
1669                     // not ready for it
1670                     do scheduler.deschedule_running_task_and_then |sched, task| {
1671                         let task = Cell::new(task);
1672                         sched.enqueue_blocked_task(task.take());
1673                     }
1674                 }
1675             }
1676
1677             // Make sure we had multiple reads
1678             assert!(reads > 1);
1679         }
1680
1681         do spawntask {
1682             unsafe {
1683                 port.take().recv();
1684                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1685                 let mut stream = (*io).tcp_connect(addr).unwrap();
1686                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1687                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1688                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1689                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1690             }
1691         }
1692
1693     }
1694 }
1695
1696 #[test]
1697 fn test_read_read_read() {
1698     do run_in_mt_newsched_task {
1699         let addr = next_test_ip4();
1700         static MAX: uint = 500000;
1701         let (port, chan) = oneshot();
1702         let port = Cell::new(port);
1703         let chan = Cell::new(chan);
1704
1705         do spawntask {
1706             unsafe {
1707                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1708                 let listener = (*io).tcp_bind(addr).unwrap();
1709                 let mut acceptor = listener.listen().unwrap();
1710                 chan.take().send(());
1711                 let mut stream = acceptor.accept().unwrap();
1712                 let buf = [1, .. 2048];
1713                 let mut total_bytes_written = 0;
1714                 while total_bytes_written < MAX {
1715                     stream.write(buf);
1716                     total_bytes_written += buf.len();
1717                 }
1718             }
1719         }
1720
1721         do spawntask {
1722             unsafe {
1723                 port.take().recv();
1724                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1725                 let mut stream = (*io).tcp_connect(addr).unwrap();
1726                 let mut buf = [0, .. 2048];
1727                 let mut total_bytes_read = 0;
1728                 while total_bytes_read < MAX {
1729                     let nread = stream.read(buf).unwrap();
1730                     rtdebug!("read %u bytes", nread as uint);
1731                     total_bytes_read += nread;
1732                     for i in range(0u, nread) {
1733                         assert_eq!(buf[i], 1);
1734                     }
1735                 }
1736                 rtdebug!("read %u bytes total", total_bytes_read as uint);
1737             }
1738         }
1739     }
1740 }
1741
1742 #[test]
1743 fn test_udp_twice() {
1744     do run_in_mt_newsched_task {
1745         let server_addr = next_test_ip4();
1746         let client_addr = next_test_ip4();
1747         let (port, chan) = oneshot();
1748         let port = Cell::new(port);
1749         let chan = Cell::new(chan);
1750
1751         do spawntask {
1752             unsafe {
1753                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1754                 let mut client = (*io).udp_bind(client_addr).unwrap();
1755                 port.take().recv();
1756                 assert!(client.sendto([1], server_addr).is_ok());
1757                 assert!(client.sendto([2], server_addr).is_ok());
1758             }
1759         }
1760
1761         do spawntask {
1762             unsafe {
1763                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1764                 let mut server = (*io).udp_bind(server_addr).unwrap();
1765                 chan.take().send(());
1766                 let mut buf1 = [0];
1767                 let mut buf2 = [0];
1768                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1769                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1770                 assert_eq!(nread1, 1);
1771                 assert_eq!(nread2, 1);
1772                 assert_eq!(src1, client_addr);
1773                 assert_eq!(src2, client_addr);
1774                 assert_eq!(buf1[0], 1);
1775                 assert_eq!(buf2[0], 2);
1776             }
1777         }
1778     }
1779 }
1780
1781 #[test]
1782 fn test_udp_many_read() {
1783     do run_in_mt_newsched_task {
1784         let server_out_addr = next_test_ip4();
1785         let server_in_addr = next_test_ip4();
1786         let client_out_addr = next_test_ip4();
1787         let client_in_addr = next_test_ip4();
1788         static MAX: uint = 500_000;
1789
1790         let (p1, c1) = oneshot();
1791         let (p2, c2) = oneshot();
1792
1793         let first = Cell::new((p1, c2));
1794         let second = Cell::new((p2, c1));
1795
1796         do spawntask {
1797             unsafe {
1798                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1799                 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1800                 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1801                 let (port, chan) = first.take();
1802                 chan.send(());
1803                 port.recv();
1804                 let msg = [1, .. 2048];
1805                 let mut total_bytes_sent = 0;
1806                 let mut buf = [1];
1807                 while buf[0] == 1 {
1808                     // send more data
1809                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
1810                     total_bytes_sent += msg.len();
1811                     // check if the client has received enough
1812                     let res = server_in.recvfrom(buf);
1813                     assert!(res.is_ok());
1814                     let (nread, src) = res.unwrap();
1815                     assert_eq!(nread, 1);
1816                     assert_eq!(src, client_out_addr);
1817                 }
1818                 assert!(total_bytes_sent >= MAX);
1819             }
1820         }
1821
1822         do spawntask {
1823             unsafe {
1824                 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1825                 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1826                 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1827                 let (port, chan) = second.take();
1828                 port.recv();
1829                 chan.send(());
1830                 let mut total_bytes_recv = 0;
1831                 let mut buf = [0, .. 2048];
1832                 while total_bytes_recv < MAX {
1833                     // ask for more
1834                     assert!(client_out.sendto([1], server_in_addr).is_ok());
1835                     // wait for data
1836                     let res = client_in.recvfrom(buf);
1837                     assert!(res.is_ok());
1838                     let (nread, src) = res.unwrap();
1839                     assert_eq!(src, server_out_addr);
1840                     total_bytes_recv += nread;
1841                     for i in range(0u, nread) {
1842                         assert_eq!(buf[i], 1);
1843                     }
1844                 }
1845                 // tell the server we're done
1846                 assert!(client_out.sendto([0], server_in_addr).is_ok());
1847             }
1848         }
1849     }
1850 }
1851
1852 #[test]
1853 fn test_timer_sleep_simple() {
1854     do run_in_mt_newsched_task {
1855         unsafe {
1856             let io: *mut IoFactoryObject = Local::unsafe_borrow();
1857             let timer = (*io).timer_init();
1858             do timer.map_move |mut t| { t.sleep(1) };
1859         }
1860     }
1861 }
1862
1863 fn file_test_uvio_full_simple_impl() {
1864     use str::StrSlice; // why does this have to be explicitly imported to work?
1865                        // compiler was complaining about no trait for str that
1866                        // does .as_bytes() ..
1867     use path::Path;
1868     use rt::io::{Open, Create, ReadWrite, Read};
1869     unsafe {
1870         let io: *mut IoFactoryObject = Local::unsafe_borrow();
1871         let write_val = "hello uvio!";
1872         let path = "./tmp/file_test_uvio_full.txt";
1873         {
1874             let create_fm = Create;
1875             let create_fa = ReadWrite;
1876             let mut fd = (*io).fs_open(&Path(path), create_fm, create_fa).unwrap();
1877             let write_buf = write_val.as_bytes();
1878             fd.write(write_buf);
1879         }
1880         {
1881             let ro_fm = Open;
1882             let ro_fa = Read;
1883             let mut fd = (*io).fs_open(&Path(path), ro_fm, ro_fa).unwrap();
1884             let mut read_vec = [0, .. 1028];
1885             let nread = fd.read(read_vec).unwrap();
1886             let read_val = str::from_utf8(read_vec.slice(0, nread as uint));
1887             assert!(read_val == write_val.to_owned());
1888         }
1889         (*io).fs_unlink(&Path(path));
1890     }
1891 }
1892
1893 #[test]
1894 fn file_test_uvio_full_simple() {
1895     do run_in_mt_newsched_task {
1896         file_test_uvio_full_simple_impl();
1897     }
1898 }
1899
1900 fn uvio_naive_print(input: &str) {
1901     use str::StrSlice;
1902     unsafe {
1903         use libc::{STDOUT_FILENO};
1904         let io: *mut IoFactoryObject = Local::unsafe_borrow();
1905         {
1906             let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
1907             let write_buf = input.as_bytes();
1908             fd.write(write_buf);
1909         }
1910     }
1911 }
1912
1913 #[test]
1914 fn file_test_uvio_write_to_stdout() {
1915     do run_in_mt_newsched_task {
1916         uvio_naive_print("jubilation\n");
1917     }
1918 }