1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
19 use rt::io::net::ip::IpAddr;
21 use rt::uv::idle::IdleWatcher;
23 use rt::sched::Scheduler;
24 use rt::io::{standard_error, OtherIoError};
27 use unstable::sync::{Exclusive, exclusive};
29 #[cfg(test)] use container::Container;
30 #[cfg(test)] use uint;
31 #[cfg(test)] use unstable::run_in_bare_thread;
32 #[cfg(test)] use rt::test::{spawntask_immediately,
34 run_in_newsched_task};
// Event loop whose I/O factory (`uvio`) wraps a uv `Loop`.
37 pub struct UvEventLoop {
// Builds an event loop around a freshly created `Loop`.
42 pub fn new() -> UvEventLoop {
44 uvio: UvIoFactory(Loop::new())
// Closes the wrapped uv loop when the event loop is dropped.
49 impl Drop for UvEventLoop {
51 // XXX: Need mutable finalizer
// `uv_loop()` needs `&mut self`, so the shared reference is transmuted
// into a mutable one before the loop is closed.
53 transmute::<&UvEventLoop, &mut UvEventLoop>(self)
55 this.uvio.uv_loop().close();
// `EventLoop` implementation that delegates to the uv loop held by `uvio`.
59 impl EventLoop for UvEventLoop {
// Runs the wrapped uv loop.
61 self.uvio.uv_loop().run();
// Registers `f` through an idle watcher on the loop. The watcher callback
// asserts that no error status was reported and then closes the watcher.
// NOTE(review): `f` is presumably invoked inside that callback before the
// watcher is closed -- confirm.
64 fn callback(&mut self, f: ~fn()) {
65 let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
66 do idle_watcher.start |mut idle_watcher, status| {
67 assert!(status.is_none());
// The close callback is a no-op closure.
69 idle_watcher.close(||());
// Registers `f` through a timer watcher started with `ms` and a second
// argument of 0 (presumably the repeat interval -- confirm). The timer
// callback asserts that no error status was reported.
74 fn callback_ms(&mut self, ms: u64, f: ~fn()) {
75 let mut timer = TimerWatcher::new(self.uvio.uv_loop());
76 do timer.start(ms, 0) |timer, status| {
77 assert!(status.is_none());
// Returns a boxed `UvRemoteCallback` created on this loop with `f`.
83 fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
84 ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
// Returns an optional mutable borrow of an `IoFactoryObject` whose lifetime
// is tied to `self`.
87 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
// Registers one callback on a fresh `UvEventLoop` in a bare thread and
// asserts that the counter it increments ends up at exactly 1.
93 fn test_callback_run_once() {
94 do run_in_bare_thread {
95 let mut event_loop = UvEventLoop::new();
// The callback reaches the counter through a raw pointer.
97 let count_ptr: *mut int = &mut count;
98 do event_loop.callback {
99 unsafe { *count_ptr += 1 }
102 assert_eq!(count, 1);
// Callback that can be fired via `fire`, which sends on the uv async
// handle. The exit flag is shared with the async watcher's closure via a
// clone.
106 pub struct UvRemoteCallback {
107 // The uv async handle for triggering the callback
109 // A flag to tell the callback to exit, set from the dtor. This is
110 // almost never contested - only in rare races with the dtor.
111 exit_flag: Exclusive<bool>
114 impl UvRemoteCallback {
// Creates the remote callback on `loop_` for the closure `f`.
115 pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
// The flag starts out false; the clone is the copy read by the async
// watcher's closure.
116 let exit_flag = exclusive(false);
117 let exit_flag_clone = exit_flag.clone();
118 let async = do AsyncWatcher::new(loop_) |watcher, status| {
119 assert!(status.is_none());
// Read the exit flag under the lock (immutable access).
122 do exit_flag_clone.with_imm |&should_exit| {
136 impl RemoteCallback for UvRemoteCallback {
// Firing the callback sends on the uv async handle.
137 fn fire(&mut self) { self.async.send() }
// Destructor: works on a mutable view of `self` and does its shutdown work
// while holding the exit-flag lock.
140 impl Drop for UvRemoteCallback {
// `cast::transmute_mut` is used to obtain `&mut` access to `self`.
143 let this: &mut UvRemoteCallback = cast::transmute_mut(self);
144 do this.exit_flag.with |should_exit| {
145 // NB: These two things need to happen atomically. Otherwise
146 // the event handler could wake up due to a *previous*
147 // signal and see the exit flag, destroying the handle
148 // before the final send.
160 use rt::thread::Thread;
162 use rt::rtio::EventLoop;
163 use rt::local::Local;
164 use rt::sched::Scheduler;
// Creates a remote callback on the current scheduler's event loop that
// sends 1 down a tube, fires it from a separate thread, and asserts that
// this task receives 1 from the tube.
167 fn test_uv_remote() {
168 do run_in_newsched_task {
169 let mut tube = Tube::new();
170 let tube_clone = tube.clone();
// Empty cell that will carry the remote callback out of the borrow below.
171 let remote_cell = Cell::new_empty();
172 do Local::borrow::<Scheduler, ()>() |sched| {
173 let tube_clone = tube_clone.clone();
// The tube clone is placed in a cell so the callback can `take` it.
174 let tube_clone_cell = Cell::new(tube_clone);
175 let remote = do sched.event_loop.remote_callback {
176 tube_clone_cell.take().send(1);
178 remote_cell.put_back(remote);
// Fire the callback from another thread.
180 let _thread = do Thread::start {
181 remote_cell.take().fire();
184 assert!(tube.recv() == 1);
// Newtype wrapper around a uv `Loop`.
189 pub struct UvIoFactory(Loop);
// Borrows the wrapped `Loop` mutably by destructuring the newtype.
192 pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
193 match self { &UvIoFactory(ref mut ptr) => ptr }
197 impl IoFactory for UvIoFactory {
198 // Connect to an address and return a new stream
199 // NB: This blocks the task waiting on the connection.
200 // It would probably be better to return a future
201 fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
202 // Create a cell in the task to hold the result. We will fill
203 // the cell before resuming the task.
204 let result_cell = Cell::new_empty();
// Raw pointer to the cell; the callbacks below write the result through it.
205 let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
207 let scheduler = Local::take::<Scheduler>();
208 assert!(scheduler.in_task_context());
210 // Block this task and take ownership, switch to scheduler context
211 do scheduler.deschedule_running_task_and_then |sched, task| {
213 rtdebug!("connect: entered scheduler context");
214 assert!(!sched.in_task_context());
215 let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
// The blocked task is kept in a cell so a callback can take it out later.
216 let task_cell = Cell::new(task);
218 // Wait for a connection
219 do tcp_watcher.connect(addr) |stream_watcher, status| {
220 rtdebug!("connect: in connect callback");
221 if status.is_none() {
222 rtdebug!("status is none");
223 let res = Ok(~UvTcpStream(stream_watcher));
225 // Store the stream in the task's stack
226 unsafe { (*result_cell_ptr).put_back(res); }
// Success: resume the blocked task immediately.
229 let scheduler = Local::take::<Scheduler>();
230 scheduler.resume_blocked_task_immediately(task_cell.take());
// Failure: close the watcher first; the error is stored and the task is
// resumed only from inside the close callback.
232 rtdebug!("status is some");
233 let task_cell = Cell::new(task_cell.take());
234 do stream_watcher.close {
235 let res = Err(uv_error_to_io_error(status.get()));
236 unsafe { (*result_cell_ptr).put_back(res); }
237 let scheduler = Local::take::<Scheduler>();
238 scheduler.resume_blocked_task_immediately(task_cell.take());
// Back in the task: one of the callbacks must have filled the cell.
244 assert!(!result_cell.is_empty());
245 return result_cell.take();
// Binds a new TCP watcher to `addr`. On success the watcher is wrapped in a
// boxed `UvTcpListener`. On failure the watcher is closed and the uv error
// is returned converted to an `IoError`.
248 fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
249 let mut watcher = TcpWatcher::new(self.uv_loop());
250 match watcher.bind(addr) {
251 Ok(_) => Ok(~UvTcpListener::new(watcher)),
// Error path: block this task; it is resumed from the close callback.
253 let scheduler = Local::take::<Scheduler>();
254 do scheduler.deschedule_running_task_and_then |_, task| {
255 let task_cell = Cell::new(task);
256 do watcher.as_stream().close {
257 let scheduler = Local::take::<Scheduler>();
258 scheduler.resume_blocked_task_immediately(task_cell.take());
261 Err(uv_error_to_io_error(uverr))
// Binds a new UDP watcher to `addr`. On success the watcher is wrapped in a
// boxed `UvUdpSocket`; on failure the uv error is returned converted to an
// `IoError`.
266 fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError> {
267 let mut watcher = UdpWatcher::new(self.uv_loop());
268 match watcher.bind(addr) {
269 Ok(_) => Ok(~UvUdpSocket(watcher)),
// Error path: block this task; the inner closure resumes it.
// NOTE(review): presumably that closure is the watcher's close callback,
// mirroring `tcp_bind` -- confirm.
271 let scheduler = Local::take::<Scheduler>();
272 do scheduler.deschedule_running_task_and_then |_, task| {
273 let task_cell = Cell::new(task);
275 let scheduler = Local::take::<Scheduler>();
276 scheduler.resume_blocked_task_immediately(task_cell.take());
279 Err(uv_error_to_io_error(uverr))
// TCP listener built on a `TcpWatcher`.
285 // FIXME #6090: Prefer newtype structs but Drop doesn't work
286 pub struct UvTcpListener {
// Tube through which `accept` receives each accepted stream or error.
289 incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
// Wraps `watcher` in a listener with a fresh tube for incoming streams.
293 fn new(watcher: TcpWatcher) -> UvTcpListener {
297 incoming_streams: Tube::new()
// Returns the listener's `TcpWatcher` by value.
301 fn watcher(&self) -> TcpWatcher { self.watcher }
// Destructor: closes the listener's watcher, blocking the current task
// until the close callback runs and resumes it.
304 impl Drop for UvTcpListener {
306 let watcher = self.watcher();
307 let scheduler = Local::take::<Scheduler>();
308 do scheduler.deschedule_running_task_and_then |_, task| {
309 let task_cell = Cell::new(task);
310 do watcher.as_stream().close {
311 let scheduler = Local::take::<Scheduler>();
312 scheduler.resume_blocked_task_immediately(task_cell.take());
318 impl RtioSocket for UvTcpListener {
// Not implemented: fails unconditionally.
320 fn socket_name(&mut self) -> IpAddr { fail!(); }
323 impl RtioTcpListener for UvTcpListener {
// Returns the next accepted stream (or an error) by receiving from the
// `incoming_streams` tube. The first call also starts listening on the
// watcher.
325 fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
326 rtdebug!("entering listen");
// NOTE(review): this early return is presumably guarded by a check of
// `self.listening` -- confirm.
329 return self.incoming_streams.recv();
332 self.listening = true;
334 let server_tcp_watcher = self.watcher();
// A clone of the tube is moved into the listen callback via a cell.
335 let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
337 let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
338 let mut server_tcp_watcher = server_tcp_watcher;
339 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
// No error status: accept the connection on a new client watcher created
// on the same loop and wrap it in a `UvTcpStream`.
340 let maybe_stream = if status.is_none() {
341 let mut loop_ = server_stream_watcher.event_loop();
342 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
343 let client_tcp_watcher = client_tcp_watcher.as_stream();
344 // XXX: Needs to be surfaced in the interface
345 server_stream_watcher.accept(client_tcp_watcher);
346 Ok(~UvTcpStream(client_tcp_watcher))
// Otherwise report a generic `OtherIoError`.
348 Err(standard_error(OtherIoError))
// Take the tube out of the cell, send the result, and put the tube back
// so it is available the next time the callback runs.
351 let mut incoming_streams = incoming_streams_cell.take();
352 incoming_streams.send(maybe_stream);
353 incoming_streams_cell.put_back(incoming_streams);
356 return self.incoming_streams.recv();
// Not implemented: both fail unconditionally.
360 fn accept_simultaneously(&mut self) { fail!(); }
361 fn dont_accept_simultaneously(&mut self) { fail!(); }
// TCP stream: a newtype around a uv `StreamWatcher`.
364 // FIXME #6090: Prefer newtype structs but Drop doesn't work
365 pub struct UvTcpStream(StreamWatcher);
// Destructor: logs, then blocks the current task; the inner closure retakes
// the scheduler and resumes the task.
// NOTE(review): that closure is presumably the stream watcher's close
// callback -- confirm.
367 impl Drop for UvTcpStream {
369 rtdebug!("closing tcp stream");
370 let scheduler = Local::take::<Scheduler>();
371 do scheduler.deschedule_running_task_and_then |_, task| {
372 let task_cell = Cell::new(task);
374 let scheduler = Local::take::<Scheduler>();
375 scheduler.resume_blocked_task_immediately(task_cell.take());
381 impl RtioSocket for UvTcpStream {
// Not implemented: fails unconditionally.
383 fn socket_name(&mut self) -> IpAddr { fail!(); }
386 impl RtioTcpStream for UvTcpStream {
// Reads into `buf`, blocking the current task until the read callback has
// stored a result. Returns the value left in the result cell: `Ok` with a
// `uint` when no error status was reported, otherwise the uv error
// converted to an `IoError`.
387 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
388 let result_cell = Cell::new_empty();
// Raw pointer to the cell; the read callback writes the result through it.
389 let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
391 let scheduler = Local::take::<Scheduler>();
392 assert!(scheduler.in_task_context());
// Raw pointer to the caller's slice, dereferenced by the alloc callback.
393 let buf_ptr: *&mut [u8] = &buf;
394 do scheduler.deschedule_running_task_and_then |sched, task| {
395 rtdebug!("read: entered scheduler context");
396 assert!(!sched.in_task_context());
397 let task_cell = Cell::new(task);
398 // XXX: We shouldn't reallocate these callbacks every
// The alloc callback hands uv the caller's buffer, whatever size was asked.
400 let alloc: AllocCallback = |_| unsafe {
401 slice_to_uv_buf(*buf_ptr)
403 let mut watcher = **self;
404 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
406 // Stop reading so that no read callbacks are
407 // triggered before the user calls `read` again.
408 // XXX: Is there a performance impact to calling
412 let result = if status.is_none() {
416 Err(uv_error_to_io_error(status.unwrap()))
419 unsafe { (*result_cell_ptr).put_back(result); }
// Wake the task that is blocked in `read`.
421 let scheduler = Local::take::<Scheduler>();
422 scheduler.resume_blocked_task_immediately(task_cell.take());
// Back in the task: the callback must have filled the cell.
426 assert!(!result_cell.is_empty());
427 return result_cell.take();
// Writes `buf` to the stream, blocking the current task until the write
// callback has stored a result. A reported uv error is converted to an
// `IoError`.
430 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
431 let result_cell = Cell::new_empty();
// Raw pointer to the cell; the write callback writes the result through it.
432 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
433 let scheduler = Local::take::<Scheduler>();
434 assert!(scheduler.in_task_context());
// Raw pointer to the caller's slice, used to build the uv buffer below.
435 let buf_ptr: *&[u8] = &buf;
436 do scheduler.deschedule_running_task_and_then |_, task| {
437 let task_cell = Cell::new(task);
438 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
439 let mut watcher = **self;
440 do watcher.write(buf) |_watcher, status| {
441 let result = if status.is_none() {
444 Err(uv_error_to_io_error(status.unwrap()))
447 unsafe { (*result_cell_ptr).put_back(result); }
// Wake the task that is blocked in `write`.
449 let scheduler = Local::take::<Scheduler>();
450 scheduler.resume_blocked_task_immediately(task_cell.take());
// Back in the task: the callback must have filled the cell.
454 assert!(!result_cell.is_empty());
455 return result_cell.take();
// Not implemented: each of the following fails unconditionally.
459 fn peer_name(&mut self) -> IpAddr { fail!(); }
460 fn control_congestion(&mut self) { fail!(); }
461 fn nodelay(&mut self) { fail!(); }
462 fn keepalive(&mut self, _delay_in_seconds: uint) { fail!(); }
463 fn letdie(&mut self) { fail!(); }
// UDP socket: a newtype around a uv `UdpWatcher`.
466 pub struct UvUdpSocket(UdpWatcher);
// Destructor: logs, then blocks the current task; the inner closure retakes
// the scheduler and resumes the task.
// NOTE(review): that closure is presumably the watcher's close callback --
// confirm.
468 impl Drop for UvUdpSocket {
470 rtdebug!("closing udp socket");
471 let scheduler = Local::take::<Scheduler>();
472 do scheduler.deschedule_running_task_and_then |_, task| {
473 let task_cell = Cell::new(task);
475 let scheduler = Local::take::<Scheduler>();
476 scheduler.resume_blocked_task_immediately(task_cell.take());
482 impl RtioSocket for UvUdpSocket {
// Not implemented: fails unconditionally.
484 fn socket_name(&mut self) -> IpAddr { fail!(); }
487 impl RtioUdpSocket for UvUdpSocket {
// Receives into `buf`, blocking the current task until the receive callback
// has stored a result. On success the result is the byte count (`nread` as
// `uint`) paired with the sender address; a reported uv error is converted
// to an `IoError`.
488 fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> {
489 let result_cell = Cell::new_empty();
// Raw pointer to the cell; the callback writes the result through it.
490 let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;
492 let scheduler = Local::take::<Scheduler>();
493 assert!(scheduler.in_task_context());
// Raw pointer to the caller's slice, dereferenced by the alloc callback.
494 let buf_ptr: *&mut [u8] = &buf;
495 do scheduler.deschedule_running_task_and_then |sched, task| {
496 rtdebug!("recvfrom: entered scheduler context");
497 assert!(!sched.in_task_context());
498 let task_cell = Cell::new(task);
// The alloc callback hands uv the caller's buffer.
499 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
500 do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
501 let _ = flags; // XXX add handling for partials?
505 let result = match status {
508 Ok((nread as uint, addr))
510 Some(err) => Err(uv_error_to_io_error(err))
513 unsafe { (*result_cell_ptr).put_back(result); }
// Wake the task that is blocked in `recvfrom`.
515 let scheduler = Local::take::<Scheduler>();
516 scheduler.resume_blocked_task_immediately(task_cell.take());
// Back in the task: the callback must have filled the cell.
520 assert!(!result_cell.is_empty());
521 return result_cell.take();
// Sends `buf` to `dst`, blocking the current task until the send callback
// has stored a result. A reported uv error is converted to an `IoError`.
524 fn sendto(&mut self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> {
525 let result_cell = Cell::new_empty();
// Raw pointer to the cell; the callback writes the result through it.
526 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
527 let scheduler = Local::take::<Scheduler>();
528 assert!(scheduler.in_task_context());
// Raw pointer to the caller's slice, used to build the uv buffer below.
529 let buf_ptr: *&[u8] = &buf;
530 do scheduler.deschedule_running_task_and_then |_, task| {
531 let task_cell = Cell::new(task);
532 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
533 do self.send(buf, dst) |_watcher, status| {
535 let result = match status {
537 Some(err) => Err(uv_error_to_io_error(err)),
540 unsafe { (*result_cell_ptr).put_back(result); }
// Wake the task that is blocked in `sendto`.
542 let scheduler = Local::take::<Scheduler>();
543 scheduler.resume_blocked_task_immediately(task_cell.take());
// Back in the task: the callback must have filled the cell.
547 assert!(!result_cell.is_empty());
548 return result_cell.take();
// Not implemented: each of the following fails unconditionally.
552 fn join_multicast(&mut self, _multi: IpAddr) { fail!(); }
553 fn leave_multicast(&mut self, _multi: IpAddr) { fail!(); }
555 fn loop_multicast_locally(&mut self) { fail!(); }
556 fn dont_loop_multicast_locally(&mut self) { fail!(); }
558 fn multicast_time_to_live(&mut self, _ttl: int) { fail!(); }
559 fn time_to_live(&mut self, _ttl: int) { fail!(); }
561 fn hear_broadcasts(&mut self) { fail!(); }
562 fn ignore_broadcasts(&mut self) { fail!(); }
// Connecting to a fresh test address must yield an error.
// NOTE(review): this relies on nothing listening at that address -- confirm.
566 fn test_simple_io_no_connect() {
567 do run_in_newsched_task {
569 let io = Local::unsafe_borrow::<IoFactoryObject>();
570 let addr = next_test_ip4();
571 let maybe_chan = (*io).tcp_connect(addr);
572 assert!(maybe_chan.is_err());
// Binding a UDP socket to a fresh test address must succeed.
578 fn test_simple_udp_io_bind_only() {
579 do run_in_newsched_task {
581 let io = Local::unsafe_borrow::<IoFactoryObject>();
582 let addr = next_test_ip4();
583 let maybe_socket = (*io).udp_bind(addr);
584 assert!(maybe_socket.is_ok());
// A server task accepts one connection and reads from it; a client task
// connects and writes the bytes 0..7. The server asserts that it read
// 8 bytes and that byte i has the value i.
590 fn test_simple_tcp_server_and_client() {
591 do run_in_newsched_task {
592 let addr = next_test_ip4();
594 // Start the server first so it's listening when we connect
595 do spawntask_immediately {
597 let io = Local::unsafe_borrow::<IoFactoryObject>();
598 let mut listener = (*io).tcp_bind(addr).unwrap();
599 let mut stream = listener.accept().unwrap();
600 let mut buf = [0, .. 2048];
601 let nread = stream.read(buf).unwrap();
602 assert_eq!(nread, 8);
603 for uint::range(0, nread) |i| {
604 rtdebug!("%u", buf[i] as uint);
605 assert_eq!(buf[i], i as u8);
// Client side.
610 do spawntask_immediately {
612 let io = Local::unsafe_borrow::<IoFactoryObject>();
613 let mut stream = (*io).tcp_connect(addr).unwrap();
614 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// A server task receives one datagram; a client task bound to `client_addr`
// sends the bytes 0..7 to `server_addr`. The server asserts the length, the
// payload, and that the sender address is `client_addr`.
621 fn test_simple_udp_server_and_client() {
622 do run_in_newsched_task {
623 let server_addr = next_test_ip4();
624 let client_addr = next_test_ip4();
// Server side.
626 do spawntask_immediately {
628 let io = Local::unsafe_borrow::<IoFactoryObject>();
629 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
630 let mut buf = [0, .. 2048];
631 let (nread,src) = server_socket.recvfrom(buf).unwrap();
632 assert_eq!(nread, 8);
633 for uint::range(0, nread) |i| {
634 rtdebug!("%u", buf[i] as uint);
635 assert_eq!(buf[i], i as u8);
637 assert_eq!(src, client_addr);
// Client side.
641 do spawntask_immediately {
643 let io = Local::unsafe_borrow::<IoFactoryObject>();
644 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
645 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
// Ignored test (reason: "busted"). The server reads repeatedly, checking
// each byte against `current % 8`, and yields to the scheduler between
// reads; the client writes four 8-byte messages containing 0..7.
651 #[test] #[ignore(reason = "busted")]
652 fn test_read_and_block() {
653 do run_in_newsched_task {
654 let addr = next_test_ip4();
// Server side.
656 do spawntask_immediately {
657 let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
658 let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
659 let mut stream = listener.accept().unwrap();
660 let mut buf = [0, .. 2048];
666 while current < expected {
667 let nread = stream.read(buf).unwrap();
668 for uint::range(0, nread) |i| {
669 let val = buf[i] as uint;
670 assert_eq!(val, current % 8);
675 let scheduler = Local::take::<Scheduler>();
676 // Yield to the other task in hopes that it
677 // will trigger a read callback while we are
// The yield: deschedule this task and immediately re-enqueue it.
679 do scheduler.deschedule_running_task_and_then |sched, task| {
680 let task = Cell::new(task);
681 sched.enqueue_blocked_task(task.take());
685 // Make sure we had multiple reads
// Client side.
689 do spawntask_immediately {
691 let io = Local::unsafe_borrow::<IoFactoryObject>();
692 let mut stream = (*io).tcp_connect(addr).unwrap();
693 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
694 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
695 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
696 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// The client reads until it has received at least MAX bytes, asserting that
// every byte is 1. The server accepts the connection and loops until its
// byte counter reaches MAX, adding the 2048-byte buffer length each time.
// NOTE(review): the server presumably writes `buf` to the stream in each
// iteration -- confirm.
704 fn test_read_read_read() {
705 do run_in_newsched_task {
706 let addr = next_test_ip4();
707 static MAX: uint = 500000;
// Server side.
709 do spawntask_immediately {
711 let io = Local::unsafe_borrow::<IoFactoryObject>();
712 let mut listener = (*io).tcp_bind(addr).unwrap();
713 let mut stream = listener.accept().unwrap();
714 let buf = [1, .. 2048];
715 let mut total_bytes_written = 0;
716 while total_bytes_written < MAX {
718 total_bytes_written += buf.len();
// Client side.
723 do spawntask_immediately {
725 let io = Local::unsafe_borrow::<IoFactoryObject>();
726 let mut stream = (*io).tcp_connect(addr).unwrap();
727 let mut buf = [0, .. 2048];
728 let mut total_bytes_read = 0;
729 while total_bytes_read < MAX {
730 let nread = stream.read(buf).unwrap();
731 rtdebug!("read %u bytes", nread as uint);
732 total_bytes_read += nread;
733 for uint::range(0, nread) |i| {
734 assert_eq!(buf[i], 1);
737 rtdebug!("read %u bytes total", total_bytes_read as uint);
// The client sends two one-byte datagrams ([1], then [2]) to the server.
// The server receives twice and asserts that each datagram is one byte
// long, came from `client_addr`, and that the payloads are 1 and 2 in that
// order.
744 fn test_udp_twice() {
745 do run_in_newsched_task {
746 let server_addr = next_test_ip4();
747 let client_addr = next_test_ip4();
// Client side.
749 do spawntask_immediately {
751 let io = Local::unsafe_borrow::<IoFactoryObject>();
752 let mut client = (*io).udp_bind(client_addr).unwrap();
753 assert!(client.sendto([1], server_addr).is_ok());
754 assert!(client.sendto([2], server_addr).is_ok());
// Server side.
758 do spawntask_immediately {
760 let io = Local::unsafe_borrow::<IoFactoryObject>();
761 let mut server = (*io).udp_bind(server_addr).unwrap();
764 let (nread1, src1) = server.recvfrom(buf1).unwrap();
765 let (nread2, src2) = server.recvfrom(buf2).unwrap();
766 assert_eq!(nread1, 1);
767 assert_eq!(nread2, 1);
768 assert_eq!(src1, client_addr);
769 assert_eq!(src2, client_addr);
770 assert_eq!(buf1[0], 1);
771 assert_eq!(buf2[0], 2);
// Ping-pong test over four UDP sockets. The server sends 2048-byte messages
// of 1s to `client_in_addr` and, after each one, receives a one-byte
// datagram on `server_in` that must come from `client_out_addr`. The client
// sends [1] to `server_in_addr` before each receive, checks the sender and
// payload of what it receives, and sends [0] once it has at least MAX bytes.
778 fn test_udp_many_read() {
779 do run_in_newsched_task {
780 let server_out_addr = next_test_ip4();
781 let server_in_addr = next_test_ip4();
782 let client_out_addr = next_test_ip4();
783 let client_in_addr = next_test_ip4();
784 static MAX: uint = 500_000;
// Server side.
786 do spawntask_immediately {
788 let io = Local::unsafe_borrow::<IoFactoryObject>();
789 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
790 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
791 let msg = [1, .. 2048];
792 let mut total_bytes_sent = 0;
// NOTE(review): the statements below presumably run inside a loop that
// ends on the client's "done" datagram -- confirm.
796 assert!(server_out.sendto(msg, client_in_addr).is_ok());
797 total_bytes_sent += msg.len();
798 // check if the client has received enough
799 let res = server_in.recvfrom(buf);
800 assert!(res.is_ok());
801 let (nread, src) = res.unwrap();
802 assert_eq!(nread, 1);
803 assert_eq!(src, client_out_addr);
805 assert!(total_bytes_sent >= MAX);
// Client side.
809 do spawntask_immediately {
811 let io = Local::unsafe_borrow::<IoFactoryObject>();
812 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
813 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
814 let mut total_bytes_recv = 0;
815 let mut buf = [0, .. 2048];
816 while total_bytes_recv < MAX {
// Send a one-byte [1] to the server before each receive.
818 assert!(client_out.sendto([1], server_in_addr).is_ok());
820 let res = client_in.recvfrom(buf);
821 assert!(res.is_ok());
822 let (nread, src) = res.unwrap();
823 assert_eq!(src, server_out_addr);
824 total_bytes_recv += nread;
825 for uint::range(0, nread) |i| {
826 assert_eq!(buf[i], 1);
829 // tell the server we're done
830 assert!(client_out.sendto([0], server_in_addr).is_ok());