1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
19 use rt::io::net::ip::IpAddr;
21 use rt::uv::idle::IdleWatcher;
23 use rt::sched::Scheduler;
24 use rt::io::{standard_error, OtherIoError};
27 use unstable::sync::{Exclusive, exclusive};
29 #[cfg(test)] use container::Container;
30 #[cfg(test)] use uint;
31 #[cfg(test)] use unstable::run_in_bare_thread;
32 #[cfg(test)] use rt::test::{spawntask_immediately,
34 run_in_newsched_task};
37 pub struct UvEventLoop {
42 pub fn new() -> UvEventLoop {
44 uvio: UvIoFactory(Loop::new())
49 impl Drop for UvEventLoop {
51 // XXX: Need mutable finalizer
53 transmute::<&UvEventLoop, &mut UvEventLoop>(self)
55 this.uvio.uv_loop().close();
59 impl EventLoop for UvEventLoop {
61 self.uvio.uv_loop().run();
64 fn callback(&mut self, f: ~fn()) {
65 let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
66 do idle_watcher.start |mut idle_watcher, status| {
67 assert!(status.is_none());
69 idle_watcher.close(||());
74 fn callback_ms(&mut self, ms: u64, f: ~fn()) {
75 let mut timer = TimerWatcher::new(self.uvio.uv_loop());
76 do timer.start(ms, 0) |timer, status| {
77 assert!(status.is_none());
83 fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
84 ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
87 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
93 fn test_callback_run_once() {
94 do run_in_bare_thread {
95 let mut event_loop = UvEventLoop::new();
97 let count_ptr: *mut int = &mut count;
98 do event_loop.callback {
99 unsafe { *count_ptr += 1 }
102 assert_eq!(count, 1);
106 pub struct UvRemoteCallback {
107 // The uv async handle for triggering the callback
109 // A flag to tell the callback to exit, set from the dtor. This is
110 // almost never contested - only in rare races with the dtor.
111 exit_flag: Exclusive<bool>
114 impl UvRemoteCallback {
115 pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
116 let exit_flag = exclusive(false);
117 let exit_flag_clone = exit_flag.clone();
118 let async = do AsyncWatcher::new(loop_) |watcher, status| {
119 assert!(status.is_none());
122 do exit_flag_clone.with_imm |&should_exit| {
136 impl RemoteCallback for UvRemoteCallback {
137 fn fire(&mut self) { self.async.send() }
140 impl Drop for UvRemoteCallback {
143 let this: &mut UvRemoteCallback = cast::transmute_mut(self);
144 do this.exit_flag.with |should_exit| {
145 // NB: These two things need to happen atomically. Otherwise
146 // the event handler could wake up due to a *previous*
147 // signal and see the exit flag, destroying the handle
148 // before the final send.
160 use rt::thread::Thread;
162 use rt::rtio::EventLoop;
163 use rt::local::Local;
164 use rt::sched::Scheduler;
167 fn test_uv_remote() {
168 do run_in_newsched_task {
169 let mut tube = Tube::new();
170 let tube_clone = tube.clone();
171 let remote_cell = Cell::new_empty();
172 do Local::borrow::<Scheduler, ()>() |sched| {
173 let tube_clone = tube_clone.clone();
174 let tube_clone_cell = Cell::new(tube_clone);
175 let remote = do sched.event_loop.remote_callback {
176 tube_clone_cell.take().send(1);
178 remote_cell.put_back(remote);
180 let _thread = do Thread::start {
181 remote_cell.take().fire();
184 assert!(tube.recv() == 1);
189 pub struct UvIoFactory(Loop);
192 pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
193 match self { &UvIoFactory(ref mut ptr) => ptr }
197 impl IoFactory for UvIoFactory {
198 // Connect to an address and return a new stream
199 // NB: This blocks the task waiting on the connection.
200 // It would probably be better to return a future
201 fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
202 // Create a cell in the task to hold the result. We will fill
203 // the cell before resuming the task.
204 let result_cell = Cell::new_empty();
205 let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
207 let scheduler = Local::take::<Scheduler>();
208 assert!(scheduler.in_task_context());
210 // Block this task and take ownership, switch to scheduler context
211 do scheduler.deschedule_running_task_and_then |sched, task| {
213 rtdebug!("connect: entered scheduler context");
214 assert!(!sched.in_task_context());
215 let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
216 let task_cell = Cell::new(task);
218 // Wait for a connection
219 do tcp_watcher.connect(addr) |stream_watcher, status| {
220 rtdebug!("connect: in connect callback");
221 if status.is_none() {
222 rtdebug!("status is none");
223 let res = Ok(~UvTcpStream(stream_watcher));
225 // Store the stream in the task's stack
226 unsafe { (*result_cell_ptr).put_back(res); }
229 let scheduler = Local::take::<Scheduler>();
230 scheduler.resume_blocked_task_immediately(task_cell.take());
232 rtdebug!("status is some");
233 let task_cell = Cell::new(task_cell.take());
234 do stream_watcher.close {
235 let res = Err(uv_error_to_io_error(status.get()));
236 unsafe { (*result_cell_ptr).put_back(res); }
237 let scheduler = Local::take::<Scheduler>();
238 scheduler.resume_blocked_task_immediately(task_cell.take());
244 assert!(!result_cell.is_empty());
245 return result_cell.take();
248 fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
249 let mut watcher = TcpWatcher::new(self.uv_loop());
250 match watcher.bind(addr) {
251 Ok(_) => Ok(~UvTcpListener::new(watcher)),
253 let scheduler = Local::take::<Scheduler>();
254 do scheduler.deschedule_running_task_and_then |_, task| {
255 let task_cell = Cell::new(task);
256 do watcher.as_stream().close {
257 let scheduler = Local::take::<Scheduler>();
258 scheduler.resume_blocked_task_immediately(task_cell.take());
261 Err(uv_error_to_io_error(uverr))
266 fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError> {
267 let mut watcher = UdpWatcher::new(self.uv_loop());
268 match watcher.bind(addr) {
269 Ok(_) => Ok(~UvUdpSocket(watcher)),
271 let scheduler = Local::take::<Scheduler>();
272 do scheduler.deschedule_running_task_and_then |_, task| {
273 let task_cell = Cell::new(task);
275 let scheduler = Local::take::<Scheduler>();
276 scheduler.resume_blocked_task_immediately(task_cell.take());
279 Err(uv_error_to_io_error(uverr))
284 fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
285 Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
289 // FIXME #6090: Prefer newtype structs but Drop doesn't work
290 pub struct UvTcpListener {
293 incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
297 fn new(watcher: TcpWatcher) -> UvTcpListener {
301 incoming_streams: Tube::new()
305 fn watcher(&self) -> TcpWatcher { self.watcher }
308 impl Drop for UvTcpListener {
310 let watcher = self.watcher();
311 let scheduler = Local::take::<Scheduler>();
312 do scheduler.deschedule_running_task_and_then |_, task| {
313 let task_cell = Cell::new(task);
314 do watcher.as_stream().close {
315 let scheduler = Local::take::<Scheduler>();
316 scheduler.resume_blocked_task_immediately(task_cell.take());
322 impl RtioSocket for UvTcpListener {
324 fn socket_name(&mut self) -> IpAddr { fail!(); }
327 impl RtioTcpListener for UvTcpListener {
329 fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
330 rtdebug!("entering listen");
333 return self.incoming_streams.recv();
336 self.listening = true;
338 let server_tcp_watcher = self.watcher();
339 let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
341 let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
342 let mut server_tcp_watcher = server_tcp_watcher;
343 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
344 let maybe_stream = if status.is_none() {
345 let mut loop_ = server_stream_watcher.event_loop();
346 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
347 let client_tcp_watcher = client_tcp_watcher.as_stream();
348 // XXX: Needs to be surfaced in interface
349 server_stream_watcher.accept(client_tcp_watcher);
350 Ok(~UvTcpStream(client_tcp_watcher))
352 Err(standard_error(OtherIoError))
355 let mut incoming_streams = incoming_streams_cell.take();
356 incoming_streams.send(maybe_stream);
357 incoming_streams_cell.put_back(incoming_streams);
360 return self.incoming_streams.recv();
364 fn accept_simultaneously(&mut self) { fail!(); }
365 fn dont_accept_simultaneously(&mut self) { fail!(); }
368 // FIXME #6090: Prefer newtype structs but Drop doesn't work
369 pub struct UvTcpStream(StreamWatcher);
371 impl Drop for UvTcpStream {
373 rtdebug!("closing tcp stream");
374 let scheduler = Local::take::<Scheduler>();
375 do scheduler.deschedule_running_task_and_then |_, task| {
376 let task_cell = Cell::new(task);
378 let scheduler = Local::take::<Scheduler>();
379 scheduler.resume_blocked_task_immediately(task_cell.take());
385 impl RtioSocket for UvTcpStream {
387 fn socket_name(&mut self) -> IpAddr { fail!(); }
390 impl RtioTcpStream for UvTcpStream {
391 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
392 let result_cell = Cell::new_empty();
393 let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
395 let scheduler = Local::take::<Scheduler>();
396 assert!(scheduler.in_task_context());
397 let buf_ptr: *&mut [u8] = &buf;
398 do scheduler.deschedule_running_task_and_then |sched, task| {
399 rtdebug!("read: entered scheduler context");
400 assert!(!sched.in_task_context());
401 let task_cell = Cell::new(task);
402 // XXX: We shouldn't reallocate these callbacks every
404 let alloc: AllocCallback = |_| unsafe {
405 slice_to_uv_buf(*buf_ptr)
407 let mut watcher = **self;
408 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
410 // Stop reading so that no read callbacks are
411 // triggered before the user calls `read` again.
412 // XXX: Is there a performance impact to calling
416 let result = if status.is_none() {
420 Err(uv_error_to_io_error(status.unwrap()))
423 unsafe { (*result_cell_ptr).put_back(result); }
425 let scheduler = Local::take::<Scheduler>();
426 scheduler.resume_blocked_task_immediately(task_cell.take());
430 assert!(!result_cell.is_empty());
431 return result_cell.take();
434 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
435 let result_cell = Cell::new_empty();
436 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
437 let scheduler = Local::take::<Scheduler>();
438 assert!(scheduler.in_task_context());
439 let buf_ptr: *&[u8] = &buf;
440 do scheduler.deschedule_running_task_and_then |_, task| {
441 let task_cell = Cell::new(task);
442 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
443 let mut watcher = **self;
444 do watcher.write(buf) |_watcher, status| {
445 let result = if status.is_none() {
448 Err(uv_error_to_io_error(status.unwrap()))
451 unsafe { (*result_cell_ptr).put_back(result); }
453 let scheduler = Local::take::<Scheduler>();
454 scheduler.resume_blocked_task_immediately(task_cell.take());
458 assert!(!result_cell.is_empty());
459 return result_cell.take();
463 fn peer_name(&mut self) -> IpAddr { fail!(); }
464 fn control_congestion(&mut self) { fail!(); }
465 fn nodelay(&mut self) { fail!(); }
466 fn keepalive(&mut self, _delay_in_seconds: uint) { fail!(); }
467 fn letdie(&mut self) { fail!(); }
470 pub struct UvUdpSocket(UdpWatcher);
472 impl Drop for UvUdpSocket {
474 rtdebug!("closing udp socket");
475 let scheduler = Local::take::<Scheduler>();
476 do scheduler.deschedule_running_task_and_then |_, task| {
477 let task_cell = Cell::new(task);
479 let scheduler = Local::take::<Scheduler>();
480 scheduler.resume_blocked_task_immediately(task_cell.take());
486 impl RtioSocket for UvUdpSocket {
488 fn socket_name(&mut self) -> IpAddr { fail!(); }
491 impl RtioUdpSocket for UvUdpSocket {
492 fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> {
493 let result_cell = Cell::new_empty();
494 let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;
496 let scheduler = Local::take::<Scheduler>();
497 assert!(scheduler.in_task_context());
498 let buf_ptr: *&mut [u8] = &buf;
499 do scheduler.deschedule_running_task_and_then |sched, task| {
500 rtdebug!("recvfrom: entered scheduler context");
501 assert!(!sched.in_task_context());
502 let task_cell = Cell::new(task);
503 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
504 do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
505 let _ = flags; // XXX add handling for partials?
509 let result = match status {
512 Ok((nread as uint, addr))
514 Some(err) => Err(uv_error_to_io_error(err))
517 unsafe { (*result_cell_ptr).put_back(result); }
519 let scheduler = Local::take::<Scheduler>();
520 scheduler.resume_blocked_task_immediately(task_cell.take());
524 assert!(!result_cell.is_empty());
525 return result_cell.take();
528 fn sendto(&mut self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> {
529 let result_cell = Cell::new_empty();
530 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
531 let scheduler = Local::take::<Scheduler>();
532 assert!(scheduler.in_task_context());
533 let buf_ptr: *&[u8] = &buf;
534 do scheduler.deschedule_running_task_and_then |_, task| {
535 let task_cell = Cell::new(task);
536 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
537 do self.send(buf, dst) |_watcher, status| {
539 let result = match status {
541 Some(err) => Err(uv_error_to_io_error(err)),
544 unsafe { (*result_cell_ptr).put_back(result); }
546 let scheduler = Local::take::<Scheduler>();
547 scheduler.resume_blocked_task_immediately(task_cell.take());
551 assert!(!result_cell.is_empty());
552 return result_cell.take();
556 fn join_multicast(&mut self, _multi: IpAddr) { fail!(); }
557 fn leave_multicast(&mut self, _multi: IpAddr) { fail!(); }
559 fn loop_multicast_locally(&mut self) { fail!(); }
560 fn dont_loop_multicast_locally(&mut self) { fail!(); }
562 fn multicast_time_to_live(&mut self, _ttl: int) { fail!(); }
563 fn time_to_live(&mut self, _ttl: int) { fail!(); }
565 fn hear_broadcasts(&mut self) { fail!(); }
566 fn ignore_broadcasts(&mut self) { fail!(); }
569 pub struct UvTimer(timer::TimerWatcher);
572 fn new(w: timer::TimerWatcher) -> UvTimer {
577 impl Drop for UvTimer {
579 rtdebug!("closing UvTimer");
580 let scheduler = Local::take::<Scheduler>();
581 do scheduler.deschedule_running_task_and_then |_, task| {
582 let task_cell = Cell::new(task);
584 let scheduler = Local::take::<Scheduler>();
585 scheduler.resume_task_immediately(task_cell.take());
591 impl RtioTimer for UvTimer {
592 fn sleep(&self, msecs: u64) {
593 let scheduler = Local::take::<Scheduler>();
594 assert!(scheduler.in_task_context());
595 do scheduler.deschedule_running_task_and_then |sched, task| {
596 rtdebug!("sleep: entered scheduler context");
597 assert!(!sched.in_task_context());
598 let task_cell = Cell::new(task);
599 let mut watcher = **self;
600 do watcher.start(msecs, 0) |_, status| {
601 assert!(status.is_none());
602 let scheduler = Local::take::<Scheduler>();
603 scheduler.resume_task_immediately(task_cell.take());
612 fn test_simple_io_no_connect() {
613 do run_in_newsched_task {
615 let io = Local::unsafe_borrow::<IoFactoryObject>();
616 let addr = next_test_ip4();
617 let maybe_chan = (*io).tcp_connect(addr);
618 assert!(maybe_chan.is_err());
624 fn test_simple_udp_io_bind_only() {
625 do run_in_newsched_task {
627 let io = Local::unsafe_borrow::<IoFactoryObject>();
628 let addr = next_test_ip4();
629 let maybe_socket = (*io).udp_bind(addr);
630 assert!(maybe_socket.is_ok());
636 fn test_simple_tcp_server_and_client() {
637 do run_in_newsched_task {
638 let addr = next_test_ip4();
640 // Start the server first so it's listening when we connect
641 do spawntask_immediately {
643 let io = Local::unsafe_borrow::<IoFactoryObject>();
644 let mut listener = (*io).tcp_bind(addr).unwrap();
645 let mut stream = listener.accept().unwrap();
646 let mut buf = [0, .. 2048];
647 let nread = stream.read(buf).unwrap();
648 assert_eq!(nread, 8);
649 for uint::range(0, nread) |i| {
650 rtdebug!("%u", buf[i] as uint);
651 assert_eq!(buf[i], i as u8);
656 do spawntask_immediately {
658 let io = Local::unsafe_borrow::<IoFactoryObject>();
659 let mut stream = (*io).tcp_connect(addr).unwrap();
660 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
667 fn test_simple_udp_server_and_client() {
668 do run_in_newsched_task {
669 let server_addr = next_test_ip4();
670 let client_addr = next_test_ip4();
672 do spawntask_immediately {
674 let io = Local::unsafe_borrow::<IoFactoryObject>();
675 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
676 let mut buf = [0, .. 2048];
677 let (nread,src) = server_socket.recvfrom(buf).unwrap();
678 assert_eq!(nread, 8);
679 for uint::range(0, nread) |i| {
680 rtdebug!("%u", buf[i] as uint);
681 assert_eq!(buf[i], i as u8);
683 assert_eq!(src, client_addr);
687 do spawntask_immediately {
689 let io = Local::unsafe_borrow::<IoFactoryObject>();
690 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
691 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
697 #[test] #[ignore(reason = "busted")]
698 fn test_read_and_block() {
699 do run_in_newsched_task {
700 let addr = next_test_ip4();
702 do spawntask_immediately {
703 let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
704 let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
705 let mut stream = listener.accept().unwrap();
706 let mut buf = [0, .. 2048];
712 while current < expected {
713 let nread = stream.read(buf).unwrap();
714 for uint::range(0, nread) |i| {
715 let val = buf[i] as uint;
716 assert_eq!(val, current % 8);
721 let scheduler = Local::take::<Scheduler>();
722 // Yield to the other task in hopes that it
723 // will trigger a read callback while we are
725 do scheduler.deschedule_running_task_and_then |sched, task| {
726 let task = Cell::new(task);
727 sched.enqueue_blocked_task(task.take());
731 // Make sure we had multiple reads
735 do spawntask_immediately {
737 let io = Local::unsafe_borrow::<IoFactoryObject>();
738 let mut stream = (*io).tcp_connect(addr).unwrap();
739 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
740 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
741 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
742 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
750 fn test_read_read_read() {
751 do run_in_newsched_task {
752 let addr = next_test_ip4();
753 static MAX: uint = 500000;
755 do spawntask_immediately {
757 let io = Local::unsafe_borrow::<IoFactoryObject>();
758 let mut listener = (*io).tcp_bind(addr).unwrap();
759 let mut stream = listener.accept().unwrap();
760 let buf = [1, .. 2048];
761 let mut total_bytes_written = 0;
762 while total_bytes_written < MAX {
764 total_bytes_written += buf.len();
769 do spawntask_immediately {
771 let io = Local::unsafe_borrow::<IoFactoryObject>();
772 let mut stream = (*io).tcp_connect(addr).unwrap();
773 let mut buf = [0, .. 2048];
774 let mut total_bytes_read = 0;
775 while total_bytes_read < MAX {
776 let nread = stream.read(buf).unwrap();
777 rtdebug!("read %u bytes", nread as uint);
778 total_bytes_read += nread;
779 for uint::range(0, nread) |i| {
780 assert_eq!(buf[i], 1);
783 rtdebug!("read %u bytes total", total_bytes_read as uint);
790 fn test_udp_twice() {
791 do run_in_newsched_task {
792 let server_addr = next_test_ip4();
793 let client_addr = next_test_ip4();
795 do spawntask_immediately {
797 let io = Local::unsafe_borrow::<IoFactoryObject>();
798 let mut client = (*io).udp_bind(client_addr).unwrap();
799 assert!(client.sendto([1], server_addr).is_ok());
800 assert!(client.sendto([2], server_addr).is_ok());
804 do spawntask_immediately {
806 let io = Local::unsafe_borrow::<IoFactoryObject>();
807 let mut server = (*io).udp_bind(server_addr).unwrap();
810 let (nread1, src1) = server.recvfrom(buf1).unwrap();
811 let (nread2, src2) = server.recvfrom(buf2).unwrap();
812 assert_eq!(nread1, 1);
813 assert_eq!(nread2, 1);
814 assert_eq!(src1, client_addr);
815 assert_eq!(src2, client_addr);
816 assert_eq!(buf1[0], 1);
817 assert_eq!(buf2[0], 2);
824 fn test_udp_many_read() {
825 do run_in_newsched_task {
826 let server_out_addr = next_test_ip4();
827 let server_in_addr = next_test_ip4();
828 let client_out_addr = next_test_ip4();
829 let client_in_addr = next_test_ip4();
830 static MAX: uint = 500_000;
832 do spawntask_immediately {
834 let io = Local::unsafe_borrow::<IoFactoryObject>();
835 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
836 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
837 let msg = [1, .. 2048];
838 let mut total_bytes_sent = 0;
842 assert!(server_out.sendto(msg, client_in_addr).is_ok());
843 total_bytes_sent += msg.len();
844 // check if the client has received enough
845 let res = server_in.recvfrom(buf);
846 assert!(res.is_ok());
847 let (nread, src) = res.unwrap();
848 assert_eq!(nread, 1);
849 assert_eq!(src, client_out_addr);
851 assert!(total_bytes_sent >= MAX);
855 do spawntask_immediately {
857 let io = Local::unsafe_borrow::<IoFactoryObject>();
858 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
859 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
860 let mut total_bytes_recv = 0;
861 let mut buf = [0, .. 2048];
862 while total_bytes_recv < MAX {
864 assert!(client_out.sendto([1], server_in_addr).is_ok());
866 let res = client_in.recvfrom(buf);
867 assert!(res.is_ok());
868 let (nread, src) = res.unwrap();
869 assert_eq!(src, server_out_addr);
870 total_bytes_recv += nread;
871 for uint::range(0, nread) |i| {
872 assert_eq!(buf[i], 1);
875 // tell the server we're done
876 assert!(client_out.sendto([0], server_in_addr).is_ok());
882 fn test_timer_sleep_simple_impl() {
884 let io = Local::unsafe_borrow::<IoFactoryObject>();
885 let timer = (*io).timer_init();
888 Err(_) => assert!(false)
893 fn test_timer_sleep_simple() {
894 do run_in_newsched_task {
895 test_timer_sleep_simple_impl();