1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use libc::{c_int, c_uint, c_void};
22 use rt::io::net::ip::{SocketAddr, IpAddr};
23 use rt::io::{standard_error, OtherIoError};
26 use rt::sched::Scheduler;
29 use rt::uv::idle::IdleWatcher;
30 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
31 use unstable::sync::Exclusive;
33 #[cfg(test)] use container::Container;
34 #[cfg(test)] use unstable::run_in_bare_thread;
35 #[cfg(test)] use rt::test::{spawntask,
37 run_in_newsched_task};
38 #[cfg(test)] use iterator::{Iterator, range};
// Resolve the local (or, for TcpPeer, the remote) SocketAddr of a uv handle.
// Dispatches on SocketNameKind to the matching libuv getter, fills a
// heap-allocated sockaddr_storage, and converts it to a SocketAddr based on
// the detected address family.
// NOTE(review): several interior lines (match close, error-branch glue,
// closing braces) are elided in this excerpt; comments describe only the
// visible flow — confirm against the full file.
46 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
47 handle: U) -> Result<SocketAddr, IoError> {
// Pick the raw libuv accessor matching the requested name kind.
49 let getsockname = match sk {
50 TcpPeer => uvll::rust_uv_tcp_getpeername,
51 Tcp => uvll::rust_uv_tcp_getsockname,
52 Udp => uvll::rust_uv_udp_getsockname
55 // Allocate a sockaddr_storage
56 // since we don't know if it's ipv4 or ipv6
57 let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
// Invoke the selected getter with the raw native handle and out-buffer.
60 getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
// Non-zero uv status: translate it into an IoError and bail out.
64 let status = status_to_maybe_uv_error(handle, r);
65 return Err(uv_error_to_io_error(status.unwrap()));
// Inspect the filled storage to choose the v4 or v6 conversion path.
69 if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
70 net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
72 net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
// Release the temporary storage allocated above.
76 unsafe { uvll::free_sockaddr_storage(r_addr); }
// Event loop backed by libuv; owns the I/O factory that wraps the uv Loop.
// NOTE(review): the struct's field list and the enclosing impl header for
// `new` are elided in this excerpt.
82 pub struct UvEventLoop {
// Construct an event loop wrapping a freshly created libuv Loop.
87 pub fn new() -> UvEventLoop {
89 uvio: UvIoFactory(Loop::new())
// Close the underlying libuv loop when the event loop is dropped.
94 impl Drop for UvEventLoop {
96 // XXX: Need mutable finalizer
// Drop takes an immutable receiver in this Rust dialect, so the code
// transmutes to &mut in order to call the mutating close().
98 transmute::<&UvEventLoop, &mut UvEventLoop>(self)
100 this.uvio.uv_loop().close();
// EventLoop trait implementation: run the loop, schedule one-shot and timed
// callbacks, and hand out remote callbacks / the I/O factory.
// NOTE(review): method bodies are partially elided here (e.g. where `f` is
// invoked inside the idle/timer callbacks); comments cover visible lines only.
104 impl EventLoop for UvEventLoop {
// Drive the libuv loop until it has no more active handles.
106 self.uvio.uv_loop().run();
// Run `f` once on the loop via an idle watcher, then close the watcher.
109 fn callback(&mut self, f: ~fn()) {
110 let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
111 do idle_watcher.start |mut idle_watcher, status| {
112 assert!(status.is_none());
114 idle_watcher.close(||());
// Run `f` after `ms` milliseconds using a one-shot (repeat = 0) timer.
119 fn callback_ms(&mut self, ms: u64, f: ~fn()) {
120 let mut timer = TimerWatcher::new(self.uvio.uv_loop());
121 do timer.start(ms, 0) |timer, status| {
122 assert!(status.is_none());
// Build a callback that other threads can fire into this loop.
128 fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
129 ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
// Expose the I/O factory to the runtime.
132 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
// Sanity test: a callback scheduled on a fresh event loop runs exactly once.
// The raw pointer lets the closure mutate the stack counter across the
// callback boundary; the final assert checks it incremented to 1.
138 fn test_callback_run_once() {
139 do run_in_bare_thread {
140 let mut event_loop = UvEventLoop::new();
142 let count_ptr: *mut int = &mut count;
143 do event_loop.callback {
144 unsafe { *count_ptr += 1 }
147 assert_eq!(count, 1);
// A callback that can be fired from another thread, delivered through a uv
// async handle. The exit flag coordinates handle teardown with the dtor.
151 pub struct UvRemoteCallback {
152 // The uv async handle for triggering the callback
154 // A flag to tell the callback to exit, set from the dtor. This is
155 // almost never contested - only in rare races with the dtor.
156 exit_flag: Exclusive<bool>
// Create the async watcher whose callback runs `f` each time it is woken,
// checking the shared exit flag to know when the dtor wants it closed.
// NOTE(review): the body that invokes `f`, handles `should_exit`, and builds
// the returned struct is elided in this excerpt.
159 impl UvRemoteCallback {
160 pub fn new(loop_: &mut Loop, f: ~fn(), f: ~fn()) -> UvRemoteCallback {
161 let exit_flag = Exclusive::new(false);
162 let exit_flag_clone = exit_flag.clone();
163 let async = do AsyncWatcher::new(loop_) |watcher, status| {
164 assert!(status.is_none());
167 do exit_flag_clone.with_imm |&should_exit| {
// Firing a remote callback just signals the uv async handle; the loop thread
// runs the stored closure on its next turn.
181 impl RemoteCallback for UvRemoteCallback {
182 fn fire(&mut self) { self.async.send() }
// Dtor: set the exit flag and wake the async handle one last time so the
// loop-side callback can observe the flag and close the handle.
// NOTE(review): the send that pairs with this flag-set is elided here.
185 impl Drop for UvRemoteCallback {
// Drop has an immutable receiver in this dialect; transmute to mutate.
188 let this: &mut UvRemoteCallback = cast::transmute_mut(self);
189 do this.exit_flag.with |should_exit| {
190 // NB: These two things need to happen atomically. Otherwise
191 // the event handler could wake up due to a *previous*
192 // signal and see the exit flag, destroying the handle
193 // before the final send.
205 use rt::thread::Thread;
207 use rt::rtio::EventLoop;
208 use rt::local::Local;
209 use rt::sched::Scheduler;
// Test: a remote callback fired from a separate OS thread is delivered on
// the scheduler's event loop; delivery is observed via a Tube message.
212 fn test_uv_remote() {
213 do run_in_newsched_task {
214 let mut tube = Tube::new();
215 let tube_clone = tube.clone();
216 let remote_cell = Cell::new_empty();
// Register the remote callback against the current scheduler's loop.
217 do Local::borrow::<Scheduler, ()>() |sched| {
218 let tube_clone = tube_clone.clone();
219 let tube_clone_cell = Cell::new(tube_clone);
220 let remote = do sched.event_loop.remote_callback {
221 tube_clone_cell.take().send(1);
223 remote_cell.put_back(remote);
// Fire it from a bare thread; the loop should run the closure above.
225 let thread = do Thread::start {
226 remote_cell.take().fire();
229 assert!(tube.recv() == 1);
// Newtype over the libuv Loop; the factory methods below create watchers
// bound to this loop.
235 pub struct UvIoFactory(Loop);
// Accessor for the wrapped Loop (destructures the newtype mutably).
238 pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
239 match self { &UvIoFactory(ref mut ptr) => ptr }
// IoFactory implementation: each operation descheds the current task, runs
// the uv operation on the scheduler, and resumes the task with the result
// smuggled back through a stack-allocated Cell via raw pointer.
// NOTE(review): interior lines (else-branches, closing braces) are elided in
// this excerpt; comments describe only what is visible.
243 impl IoFactory for UvIoFactory {
244 // Connect to an address and return a new stream
245 // NB: This blocks the task waiting on the connection.
246 // It would probably be better to return a future
247 fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
248 // Create a cell in the task to hold the result. We will fill
249 // the cell before resuming the task.
250 let result_cell = Cell::new_empty();
251 let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
253 let scheduler = Local::take::<Scheduler>();
255 // Block this task and take ownership, switch to scheduler context
256 do scheduler.deschedule_running_task_and_then |_, task| {
258 rtdebug!("connect: entered scheduler context");
259 let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
260 let task_cell = Cell::new(task);
262 // Wait for a connection
263 do tcp_watcher.connect(addr) |stream_watcher, status| {
264 rtdebug!("connect: in connect callback");
265 if status.is_none() {
266 rtdebug!("status is none");
268 NativeHandle::from_native_handle(stream_watcher.native_handle());
269 let res = Ok(~UvTcpStream(tcp_watcher));
271 // Store the stream in the task's stack
272 unsafe { (*result_cell_ptr).put_back(res); }
// Success path: reschedule the blocked task immediately.
275 let scheduler = Local::take::<Scheduler>();
276 scheduler.resume_blocked_task_immediately(task_cell.take());
// Error path: close the stream watcher first, then store the error
// and resume the task from inside the close callback.
278 rtdebug!("status is some");
279 let task_cell = Cell::new(task_cell.take());
280 do stream_watcher.close {
281 let res = Err(uv_error_to_io_error(status.unwrap()));
282 unsafe { (*result_cell_ptr).put_back(res); }
283 let scheduler = Local::take::<Scheduler>();
284 scheduler.resume_blocked_task_immediately(task_cell.take());
// By the time the task resumes, the callback must have filled the cell.
290 assert!(!result_cell.is_empty());
291 return result_cell.take();
// Bind a TCP listener; on bind failure the watcher is closed (blocking the
// task until the close completes) before the error is returned.
294 fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
295 let mut watcher = TcpWatcher::new(self.uv_loop());
296 match watcher.bind(addr) {
297 Ok(_) => Ok(~UvTcpListener::new(watcher)),
299 let scheduler = Local::take::<Scheduler>();
300 do scheduler.deschedule_running_task_and_then |_, task| {
301 let task_cell = Cell::new(task);
302 do watcher.as_stream().close {
303 let scheduler = Local::take::<Scheduler>();
304 scheduler.resume_blocked_task_immediately(task_cell.take());
307 Err(uv_error_to_io_error(uverr))
// Bind a UDP socket; same close-then-report-error shape as tcp_bind.
312 fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
313 let mut watcher = UdpWatcher::new(self.uv_loop());
314 match watcher.bind(addr) {
315 Ok(_) => Ok(~UvUdpSocket(watcher)),
317 let scheduler = Local::take::<Scheduler>();
318 do scheduler.deschedule_running_task_and_then |_, task| {
319 let task_cell = Cell::new(task);
321 let scheduler = Local::take::<Scheduler>();
322 scheduler.resume_blocked_task_immediately(task_cell.take());
325 Err(uv_error_to_io_error(uverr))
// Create a timer watcher bound to this loop; creation is infallible here.
330 fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
331 Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
// Listening TCP socket; accepted connections are queued through the
// incoming_streams Tube until accept() drains them.
// NOTE(review): remaining fields and the impl header for `new`/`watcher`
// are elided in this excerpt.
335 pub struct UvTcpListener {
338 incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
342 fn new(watcher: TcpWatcher) -> UvTcpListener {
346 incoming_streams: Tube::new()
// Watchers are Copy-able wrappers over the raw handle in this dialect.
350 fn watcher(&self) -> TcpWatcher { self.watcher }
// Dtor: close the listener's stream handle, blocking the current task until
// libuv runs the close callback.
353 impl Drop for UvTcpListener {
355 let watcher = self.watcher();
356 let scheduler = Local::take::<Scheduler>();
357 do scheduler.deschedule_running_task_and_then |_, task| {
358 let task_cell = Cell::new(task);
359 do watcher.as_stream().close {
360 let scheduler = Local::take::<Scheduler>();
361 scheduler.resume_blocked_task_immediately(task_cell.take());
// Local address of the listening socket, via the shared socket_name helper.
367 impl RtioSocket for UvTcpListener {
368 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
369 socket_name(Tcp, self.watcher)
// Accept loop and simultaneous-accept toggles.
// NOTE(review): the early-return guard around line 379 and several closing
// braces are elided in this excerpt; comments cover visible lines only.
373 impl RtioTcpListener for UvTcpListener {
// Block until a queued incoming stream is available. The first call also
// installs the libuv listen callback that feeds incoming_streams.
375 fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
376 rtdebug!("entering listen");
// Already listening: just pull the next queued result.
379 return self.incoming_streams.recv();
382 self.listening = true;
384 let server_tcp_watcher = self.watcher();
385 let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
387 let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
388 let mut server_tcp_watcher = server_tcp_watcher;
// libuv invokes this for each incoming connection (or error).
389 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
390 let maybe_stream = if status.is_none() {
391 let mut loop_ = server_stream_watcher.event_loop();
392 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
393 // XXX: Need's to be surfaced in interface
394 server_stream_watcher.accept(client_tcp_watcher.as_stream());
395 Ok(~UvTcpStream(client_tcp_watcher))
397 Err(standard_error(OtherIoError))
// Forward the result to the task blocked in accept(); the cell dance
// moves the Tube in and out of the closure environment.
400 let mut incoming_streams = incoming_streams_cell.take();
401 incoming_streams.send(maybe_stream);
402 incoming_streams_cell.put_back(incoming_streams);
405 return self.incoming_streams.recv();
// Enable libuv's simultaneous-accepts behavior (1 = on).
408 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
410 uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int)
413 match status_to_maybe_uv_error(self.watcher, r) {
414 Some(err) => Err(uv_error_to_io_error(err)),
// Disable simultaneous accepts (0 = off).
419 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
421 uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int)
424 match status_to_maybe_uv_error(self.watcher, r) {
425 Some(err) => Err(uv_error_to_io_error(err)),
// Connected TCP stream: a newtype over the underlying TcpWatcher.
431 pub struct UvTcpStream(TcpWatcher);
// Dtor: close the stream handle, blocking the task until the close callback
// fires (same pattern as the listener dtor).
433 impl Drop for UvTcpStream {
435 rtdebug!("closing tcp stream");
436 let scheduler = Local::take::<Scheduler>();
437 do scheduler.deschedule_running_task_and_then |_, task| {
438 let task_cell = Cell::new(task);
439 do self.as_stream().close {
440 let scheduler = Local::take::<Scheduler>();
441 scheduler.resume_blocked_task_immediately(task_cell.take());
// Local address of the connected stream (**self derefs the newtype).
447 impl RtioSocket for UvTcpStream {
448 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
449 socket_name(Tcp, **self)
// Stream operations. read/write follow the same desched-and-resume protocol
// as tcp_connect: the blocked task's stack holds a Cell that the uv callback
// fills through a raw pointer before resuming the task.
// NOTE(review): interior lines (read_stop call, success arms near lines
// 478-480 and 506-507, closing braces) are elided in this excerpt.
453 impl RtioTcpStream for UvTcpStream {
// Read into the caller's buffer; returns bytes read on success.
454 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
455 let result_cell = Cell::new_empty();
456 let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
458 let scheduler = Local::take::<Scheduler>();
// Raw pointer so the uv alloc callback can hand libuv the caller's buffer.
459 let buf_ptr: *&mut [u8] = &buf;
460 do scheduler.deschedule_running_task_and_then |_sched, task| {
461 rtdebug!("read: entered scheduler context");
462 let task_cell = Cell::new(task);
463 // XXX: We shouldn't reallocate these callbacks every
465 let alloc: AllocCallback = |_| unsafe {
466 slice_to_uv_buf(*buf_ptr)
468 let mut watcher = self.as_stream();
469 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
471 // Stop reading so that no read callbacks are
472 // triggered before the user calls `read` again.
473 // XXX: Is there a performance impact to calling
477 let result = if status.is_none() {
481 Err(uv_error_to_io_error(status.unwrap()))
484 unsafe { (*result_cell_ptr).put_back(result); }
486 let scheduler = Local::take::<Scheduler>();
487 scheduler.resume_blocked_task_immediately(task_cell.take());
491 assert!(!result_cell.is_empty());
492 return result_cell.take();
// Write the whole buffer; Ok(()) on success.
495 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
496 let result_cell = Cell::new_empty();
497 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
498 let scheduler = Local::take::<Scheduler>();
499 let buf_ptr: *&[u8] = &buf;
500 do scheduler.deschedule_running_task_and_then |_, task| {
501 let task_cell = Cell::new(task);
502 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
503 let mut watcher = self.as_stream();
504 do watcher.write(buf) |_watcher, status| {
505 let result = if status.is_none() {
508 Err(uv_error_to_io_error(status.unwrap()))
511 unsafe { (*result_cell_ptr).put_back(result); }
513 let scheduler = Local::take::<Scheduler>();
514 scheduler.resume_blocked_task_immediately(task_cell.take());
518 assert!(!result_cell.is_empty());
519 return result_cell.take();
// Remote peer's address, via the shared helper with the TcpPeer kind.
522 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
523 socket_name(TcpPeer, **self)
// Re-enable Nagle's algorithm (nodelay = 0).
526 fn control_congestion(&mut self) -> Result<(), IoError> {
528 uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int)
531 match status_to_maybe_uv_error(**self, r) {
532 Some(err) => Err(uv_error_to_io_error(err)),
// Disable Nagle's algorithm (nodelay = 1).
537 fn nodelay(&mut self) -> Result<(), IoError> {
539 uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int)
542 match status_to_maybe_uv_error(**self, r) {
543 Some(err) => Err(uv_error_to_io_error(err)),
// Enable TCP keepalive probes with the given delay.
548 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
550 uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int,
551 delay_in_seconds as c_uint)
554 match status_to_maybe_uv_error(**self, r) {
555 Some(err) => Err(uv_error_to_io_error(err)),
// Disable keepalive probes.
560 fn letdie(&mut self) -> Result<(), IoError> {
562 uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint)
565 match status_to_maybe_uv_error(**self, r) {
566 Some(err) => Err(uv_error_to_io_error(err)),
// Bound UDP socket: a newtype over the underlying UdpWatcher.
572 pub struct UvUdpSocket(UdpWatcher);
// Dtor: close the UDP handle, blocking the task until the close completes.
// NOTE(review): the close() call itself is elided in this excerpt.
574 impl Drop for UvUdpSocket {
576 rtdebug!("closing udp socket");
577 let scheduler = Local::take::<Scheduler>();
578 do scheduler.deschedule_running_task_and_then |_, task| {
579 let task_cell = Cell::new(task);
581 let scheduler = Local::take::<Scheduler>();
582 scheduler.resume_blocked_task_immediately(task_cell.take());
// Local address of the bound UDP socket.
588 impl RtioSocket for UvUdpSocket {
589 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
590 socket_name(Udp, **self)
// UDP operations: recvfrom/sendto use the desched-and-resume protocol; the
// remaining methods are thin wrappers over libuv socket-option setters that
// map a non-zero uv status to IoError.
// NOTE(review): interior lines (recv_stop, Ok arms of several matches,
// closing braces) are elided in this excerpt.
594 impl RtioUdpSocket for UvUdpSocket {
// Receive one datagram into the caller's buffer; returns (nread, sender).
595 fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
596 let result_cell = Cell::new_empty();
597 let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
599 let scheduler = Local::take::<Scheduler>();
600 let buf_ptr: *&mut [u8] = &buf;
601 do scheduler.deschedule_running_task_and_then |_sched, task| {
602 rtdebug!("recvfrom: entered scheduler context");
603 let task_cell = Cell::new(task);
604 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
605 do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
606 let _ = flags; // XXX add handling for partials?
610 let result = match status {
613 Ok((nread as uint, addr))
615 Some(err) => Err(uv_error_to_io_error(err))
618 unsafe { (*result_cell_ptr).put_back(result); }
620 let scheduler = Local::take::<Scheduler>();
621 scheduler.resume_blocked_task_immediately(task_cell.take());
625 assert!(!result_cell.is_empty());
626 return result_cell.take();
// Send the buffer to `dst`; Ok(()) when the uv send callback reports success.
629 fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
630 let result_cell = Cell::new_empty();
631 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
632 let scheduler = Local::take::<Scheduler>();
633 let buf_ptr: *&[u8] = &buf;
634 do scheduler.deschedule_running_task_and_then |_, task| {
635 let task_cell = Cell::new(task);
636 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
637 do self.send(buf, dst) |_watcher, status| {
639 let result = match status {
641 Some(err) => Err(uv_error_to_io_error(err)),
644 unsafe { (*result_cell_ptr).put_back(result); }
646 let scheduler = Local::take::<Scheduler>();
647 scheduler.resume_blocked_task_immediately(task_cell.take());
651 assert!(!result_cell.is_empty());
652 return result_cell.take();
// Join a multicast group (address passed to libuv as a C string).
655 fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
657 do multi.to_str().to_c_str().with_ref |m_addr| {
658 uvll::udp_set_membership(self.native_handle(), m_addr,
659 ptr::null(), uvll::UV_JOIN_GROUP)
663 match status_to_maybe_uv_error(**self, r) {
664 Some(err) => Err(uv_error_to_io_error(err)),
// Leave a multicast group.
669 fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
671 do multi.to_str().to_c_str().with_ref |m_addr| {
672 uvll::udp_set_membership(self.native_handle(), m_addr,
673 ptr::null(), uvll::UV_LEAVE_GROUP)
677 match status_to_maybe_uv_error(**self, r) {
678 Some(err) => Err(uv_error_to_io_error(err)),
// Enable multicast loopback (1 = on).
683 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
685 uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int)
688 match status_to_maybe_uv_error(**self, r) {
689 Some(err) => Err(uv_error_to_io_error(err)),
// Disable multicast loopback (0 = off).
694 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
696 uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int)
699 match status_to_maybe_uv_error(**self, r) {
700 Some(err) => Err(uv_error_to_io_error(err)),
// Set the multicast TTL.
705 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
707 uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int)
710 match status_to_maybe_uv_error(**self, r) {
711 Some(err) => Err(uv_error_to_io_error(err)),
// Set the unicast TTL.
716 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
718 uvll::udp_set_ttl(self.native_handle(), ttl as c_int)
721 match status_to_maybe_uv_error(**self, r) {
722 Some(err) => Err(uv_error_to_io_error(err)),
// Enable receipt of broadcast datagrams (1 = on).
727 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
729 uvll::udp_set_broadcast(self.native_handle(), 1 as c_int)
732 match status_to_maybe_uv_error(**self, r) {
733 Some(err) => Err(uv_error_to_io_error(err)),
// Disable receipt of broadcast datagrams (0 = off).
738 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
740 uvll::udp_set_broadcast(self.native_handle(), 0 as c_int)
743 match status_to_maybe_uv_error(**self, r) {
744 Some(err) => Err(uv_error_to_io_error(err)),
// Timer: a newtype over the libuv TimerWatcher.
// NOTE(review): the impl header and body of `new` are elided in this excerpt.
750 pub struct UvTimer(timer::TimerWatcher);
753 fn new(w: timer::TimerWatcher) -> UvTimer {
// Dtor: close the timer handle, blocking the task until the close completes.
// NOTE(review): the close() call itself is elided in this excerpt.
758 impl Drop for UvTimer {
760 rtdebug!("closing UvTimer");
761 let scheduler = Local::take::<Scheduler>();
762 do scheduler.deschedule_running_task_and_then |_, task| {
763 let task_cell = Cell::new(task);
765 let scheduler = Local::take::<Scheduler>();
766 scheduler.resume_blocked_task_immediately(task_cell.take());
// Sleep: park the current task and start a one-shot (repeat = 0) uv timer
// whose callback resumes the task after `msecs` milliseconds.
772 impl RtioTimer for UvTimer {
773 fn sleep(&self, msecs: u64) {
774 let scheduler = Local::take::<Scheduler>();
775 do scheduler.deschedule_running_task_and_then |_sched, task| {
776 rtdebug!("sleep: entered scheduler context");
777 let task_cell = Cell::new(task);
// Watchers are copyable handle wrappers; copy out of &self to get &mut.
778 let mut watcher = **self;
779 do watcher.start(msecs, 0) |_, status| {
780 assert!(status.is_none());
781 let scheduler = Local::take::<Scheduler>();
782 scheduler.resume_blocked_task_immediately(task_cell.take());
// Test: connecting to a port with no listener yields Err.
791 fn test_simple_io_no_connect() {
792 do run_in_newsched_task {
794 let io = Local::unsafe_borrow::<IoFactoryObject>();
795 let addr = next_test_ip4();
796 let maybe_chan = (*io).tcp_connect(addr);
797 assert!(maybe_chan.is_err());
// Test: binding a UDP socket to a fresh test address succeeds.
803 fn test_simple_udp_io_bind_only() {
804 do run_in_newsched_task {
806 let io = Local::unsafe_borrow::<IoFactoryObject>();
807 let addr = next_test_ip4();
808 let maybe_socket = (*io).udp_bind(addr);
809 assert!(maybe_socket.is_ok());
// Test: server accepts one connection and reads back the 8 bytes 0..7 that
// the client task writes.
// NOTE(review): the task-spawn wrappers around the two halves are elided.
815 fn test_simple_tcp_server_and_client() {
816 do run_in_newsched_task {
817 let addr = next_test_ip4();
819 // Start the server first so it's listening when we connect
822 let io = Local::unsafe_borrow::<IoFactoryObject>();
823 let mut listener = (*io).tcp_bind(addr).unwrap();
824 let mut stream = listener.accept().unwrap();
825 let mut buf = [0, .. 2048];
826 let nread = stream.read(buf).unwrap();
827 assert_eq!(nread, 8);
828 for i in range(0u, nread) {
829 rtdebug!("%u", buf[i] as uint);
830 assert_eq!(buf[i], i as u8);
// Client half: connect and write the expected byte sequence.
837 let io = Local::unsafe_borrow::<IoFactoryObject>();
838 let mut stream = (*io).tcp_connect(addr).unwrap();
839 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Test: UDP server receives the client's 8-byte datagram and verifies both
// the payload and the sender's address.
846 fn test_simple_udp_server_and_client() {
847 do run_in_newsched_task {
848 let server_addr = next_test_ip4();
849 let client_addr = next_test_ip4();
853 let io = Local::unsafe_borrow::<IoFactoryObject>();
854 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
855 let mut buf = [0, .. 2048];
856 let (nread,src) = server_socket.recvfrom(buf).unwrap();
857 assert_eq!(nread, 8);
858 for i in range(0u, nread) {
859 rtdebug!("%u", buf[i] as uint);
860 assert_eq!(buf[i], i as u8);
862 assert_eq!(src, client_addr);
// Client half: bind its own socket and send the datagram.
868 let io = Local::unsafe_borrow::<IoFactoryObject>();
869 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
870 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
// Ignored test: interleaves reads with deliberate yields to force multiple
// read callbacks, checking that data survives the task being descheduled.
876 #[test] #[ignore(reason = "busted")]
877 fn test_read_and_block() {
878 do run_in_newsched_task {
879 let addr = next_test_ip4();
882 let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
883 let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
884 let mut stream = listener.accept().unwrap();
885 let mut buf = [0, .. 2048];
// Read until the expected total arrives; each byte must follow the
// repeating 0..7 pattern the client writes.
891 while current < expected {
892 let nread = stream.read(buf).unwrap();
893 for i in range(0u, nread) {
894 let val = buf[i] as uint;
895 assert_eq!(val, current % 8);
900 let scheduler = Local::take::<Scheduler>();
901 // Yield to the other task in hopes that it
902 // will trigger a read callback while we are
904 do scheduler.deschedule_running_task_and_then |sched, task| {
905 let task = Cell::new(task);
906 sched.enqueue_blocked_task(task.take());
910 // Make sure we had multiple reads
// Client half: write the pattern four times.
916 let io = Local::unsafe_borrow::<IoFactoryObject>();
917 let mut stream = (*io).tcp_connect(addr).unwrap();
918 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
919 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
920 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
921 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Throughput test: server streams ~500 KB of 1-bytes in 2 KB writes; client
// reads until it has received the full amount, verifying every byte.
929 fn test_read_read_read() {
930 do run_in_newsched_task {
931 let addr = next_test_ip4();
932 static MAX: uint = 500000;
936 let io = Local::unsafe_borrow::<IoFactoryObject>();
937 let mut listener = (*io).tcp_bind(addr).unwrap();
938 let mut stream = listener.accept().unwrap();
939 let buf = [1, .. 2048];
940 let mut total_bytes_written = 0;
941 while total_bytes_written < MAX {
943 total_bytes_written += buf.len();
// Client half: accumulate reads until MAX bytes seen.
950 let io = Local::unsafe_borrow::<IoFactoryObject>();
951 let mut stream = (*io).tcp_connect(addr).unwrap();
952 let mut buf = [0, .. 2048];
953 let mut total_bytes_read = 0;
954 while total_bytes_read < MAX {
955 let nread = stream.read(buf).unwrap();
956 rtdebug!("read %u bytes", nread as uint);
957 total_bytes_read += nread;
958 for i in range(0u, nread) {
959 assert_eq!(buf[i], 1);
962 rtdebug!("read %u bytes total", total_bytes_read as uint);
// Test: two datagrams sent back-to-back are both received, in order, each
// with the correct source address and 1-byte payload.
969 fn test_udp_twice() {
970 do run_in_newsched_task {
971 let server_addr = next_test_ip4();
972 let client_addr = next_test_ip4();
976 let io = Local::unsafe_borrow::<IoFactoryObject>();
977 let mut client = (*io).udp_bind(client_addr).unwrap();
978 assert!(client.sendto([1], server_addr).is_ok());
979 assert!(client.sendto([2], server_addr).is_ok());
// Server half: receive both datagrams and check payloads/sources.
985 let io = Local::unsafe_borrow::<IoFactoryObject>();
986 let mut server = (*io).udp_bind(server_addr).unwrap();
989 let (nread1, src1) = server.recvfrom(buf1).unwrap();
990 let (nread2, src2) = server.recvfrom(buf2).unwrap();
991 assert_eq!(nread1, 1);
992 assert_eq!(nread2, 1);
993 assert_eq!(src1, client_addr);
994 assert_eq!(src2, client_addr);
995 assert_eq!(buf1[0], 1);
996 assert_eq!(buf2[0], 2);
// Stress test: two tasks ping-pong over four UDP sockets. The server sends
// 2 KB bursts to the client's inbox and waits for a 1-byte ack on its own
// inbox; the client acks each burst and tallies bytes until MAX is reached,
// then sends a final 0-byte-payload "done" message.
1003 fn test_udp_many_read() {
1004 do run_in_newsched_task {
1005 let server_out_addr = next_test_ip4();
1006 let server_in_addr = next_test_ip4();
1007 let client_out_addr = next_test_ip4();
1008 let client_in_addr = next_test_ip4();
1009 static MAX: uint = 500_000;
1013 let io = Local::unsafe_borrow::<IoFactoryObject>();
1014 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1015 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1016 let msg = [1, .. 2048];
1017 let mut total_bytes_sent = 0;
1021 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1022 total_bytes_sent += msg.len();
1023 // check if the client has received enough
1024 let res = server_in.recvfrom(buf);
1025 assert!(res.is_ok());
1026 let (nread, src) = res.unwrap();
1027 assert_eq!(nread, 1);
1028 assert_eq!(src, client_out_addr);
1030 assert!(total_bytes_sent >= MAX);
// Client half: ack each burst, count received bytes, verify payload.
1036 let io = Local::unsafe_borrow::<IoFactoryObject>();
1037 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1038 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1039 let mut total_bytes_recv = 0;
1040 let mut buf = [0, .. 2048];
1041 while total_bytes_recv < MAX {
1043 assert!(client_out.sendto([1], server_in_addr).is_ok());
1045 let res = client_in.recvfrom(buf);
1046 assert!(res.is_ok());
1047 let (nread, src) = res.unwrap();
1048 assert_eq!(src, server_out_addr);
1049 total_bytes_recv += nread;
1050 for i in range(0u, nread) {
1051 assert_eq!(buf[i], 1);
1054 // tell the server we're done
1055 assert!(client_out.sendto([0], server_in_addr).is_ok());
// Helper for the timer test: create a timer and sleep 1 ms; a creation
// failure fails the test.
1061 fn test_timer_sleep_simple_impl() {
1063 let io = Local::unsafe_borrow::<IoFactoryObject>();
1064 let timer = (*io).timer_init();
1066 Ok(t) => t.sleep(1),
1067 Err(_) => assert!(false)
1072 fn test_timer_sleep_simple() {
1073 do run_in_newsched_task {
1074 test_timer_sleep_simple_impl();