1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
14 use std::rt::BlockedTask;
15 use std::rt::io::IoError;
16 use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr};
17 use std::rt::local::Local;
19 use std::rt::sched::{Scheduler, SchedHandle};
20 use std::rt::tube::Tube;
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26 uv_error_to_io_error, UvHandle, slice_to_uv_buf};
30 ////////////////////////////////////////////////////////////////////////////////
31 /// Generic functions related to dealing with sockaddr things
32 ////////////////////////////////////////////////////////////////////////////////
/// A libuv socket address tagged by address family: each variant carries a raw
/// pointer to the family-specific structure (`sockaddr_in` or `sockaddr_in6`).
34 pub enum UvSocketAddr {
35 UvIpv4SocketAddr(*uvll::sockaddr_in),
36 UvIpv6SocketAddr(*uvll::sockaddr_in6),
/// Classifies a generic `sockaddr` pointer as IPv4 or IPv6 and wraps it in the
/// matching `UvSocketAddr` variant. The pointer is cast, not copied.
39 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
// Exactly one family check must hold: the first assertion rejects "neither",
// the second rejects "both".
41 assert!((uvll::is_ip4_addr(addr) || uvll::is_ip6_addr(addr)));
42 assert!(!(uvll::is_ip4_addr(addr) && uvll::is_ip6_addr(addr)));
44 _ if uvll::is_ip4_addr(addr) =>
45 UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
46 _ if uvll::is_ip6_addr(addr) =>
47 UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
/// Builds a libuv socket address from `addr` for the duration of a call to `f`
/// and yields a `T`. The libuv address is allocated here and freed before
/// returning.
/// NOTE(review): presumably `f` is invoked on the wrapped address between the
/// allocation and the free — confirm against the full function body.
53 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
// Pick the allocator that matches the address family of `addr.ip`.
54 let malloc = match addr.ip {
55 Ipv4Addr(*) => uvll::malloc_ip4_addr,
56 Ipv6Addr(*) => uvll::malloc_ip6_addr,
// Pick the `UvSocketAddr` constructor for the same family.
58 let wrap = match addr.ip {
59 Ipv4Addr(*) => UvIpv4SocketAddr,
60 Ipv6Addr(*) => UvIpv6SocketAddr,
// Pick the matching deallocator.
62 let free = match addr.ip {
63 Ipv4Addr(*) => uvll::free_ip4_addr,
64 Ipv6Addr(*) => uvll::free_ip6_addr,
// The IP is handed over in its string form (`to_str()`) together with the port.
67 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
// Release the address obtained from `malloc` above.
71 unsafe { free(addr) };
/// Converts a libuv socket address back into a `SocketAddr`, passes it to `f`,
/// and returns whatever `f` returns.
///
/// The address is rendered to text by libuv, stripped of trailing NULs and
/// parsed; a text form that does not parse makes the `unwrap` below fail.
75 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
// Longest textual form expected per family, NOT counting the NUL terminator.
76 let ip_size = match addr {
77 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
78 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
// NOTE(review): 39 characters covers eight 4-digit hex groups only; a form with
// an embedded dotted quad would be longer — confirm libuv never emits one here.
// Zero-filled scratch buffer with one extra byte for the terminator.
81 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
83 let buf_ptr = vec::raw::to_ptr(buf);
// Tell libuv the real buffer size (text + NUL). Passing only `ip_size` makes a
// maximal-length address (e.g. "255.255.255.255") not fit with its terminator.
85 UvIpv4SocketAddr(addr) =>
86 uvll::uv_ip4_name(addr, buf_ptr as *c_char, (ip_size + 1) as size_t),
87 UvIpv6SocketAddr(addr) =>
88 uvll::uv_ip6_name(addr, buf_ptr as *c_char, (ip_size + 1) as size_t),
// Read the port through the accessor for the matching family.
93 let ip_port = unsafe {
94 let port = match addr {
95 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
96 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
// Drop the trailing NUL padding, then parse the textual IP.
100 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
101 let ip_addr = FromStr::from_str(ip_str).unwrap();
103 // finally run the closure
104 f(SocketAddr { ip: ip_addr, port: ip_port })
/// Returns the `SocketAddr` for a libuv address by running
/// `uv_socket_addr_as_socket_addr` with `util::id` as the closure.
107 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
109 uv_socket_addr_as_socket_addr(addr, util::id)
// Round-trips a test IPv4 address through the libuv representation and back,
// asserting that the result equals the input.
114 fn test_ip4_conversion() {
116 let ip4 = rt::test::next_test_ip4();
117 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
// Round-trips a test IPv6 address through the libuv representation and back,
// asserting that the result equals the input.
122 fn test_ip6_conversion() {
124 let ip6 = rt::test::next_test_ip6();
125 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
// Selects which libuv name query `socket_name` issues for a handle.
128 enum SocketNameKind {
// Queries libuv for an address associated with `handle` and converts it to a
// `SocketAddr`; `sk` chooses between the peer-name query (`TcpPeer`) and the
// socket-name queries (`Tcp`, `Udp`).
134 fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
135 let getsockname = match sk {
136 TcpPeer => uvll::tcp_getpeername,
137 Tcp => uvll::tcp_getsockname,
138 Udp => uvll::udp_getsockname,
141 // Allocate a sockaddr_storage
142 // since we don't know if it's ipv4 or ipv6
143 let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
146 getsockname(handle, r_addr as *uvll::sockaddr_storage)
// Failure path: the libuv status is surfaced as an `IoError`.
// NOTE(review): the only visible free of `r_addr` comes after this early
// return — confirm the storage is not leaked when the query fails.
150 return Err(uv_error_to_io_error(UvError(r)));
// Decide the family from the filled-in storage, then convert.
154 if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
155 uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
157 uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
161 unsafe { uvll::free_sockaddr_storage(r_addr); }
167 ////////////////////////////////////////////////////////////////////////////////
168 /// TCP implementation
169 ////////////////////////////////////////////////////////////////////////////////
/// A TCP stream: the raw libuv `uv_tcp_t` handle plus the `StreamWatcher`
/// through which reads and writes are performed.
171 pub struct TcpWatcher {
172 handle: *uvll::uv_tcp_t,
173 stream: StreamWatcher,
/// Server-side TCP socket produced by `bind`; `listen` turns it into a
/// `TcpAcceptor`.
177 pub struct TcpListener {
// A TCP handle, not a pipe: it is allocated with `UV_TCP`, initialised by
// `uv_tcp_init`, and returned as `*uvll::uv_tcp_t` from `UvHandle::uv_handle`.
179 handle: *uvll::uv_tcp_t,
// Task parked while the handle is being closed; the close callback resumes it.
180 priv closing_task: Option<BlockedTask>,
// Tube into which the listen callback sends accepted streams or errors.
181 priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
/// A listening TCP socket: owns the `TcpListener` it was created from together
/// with a tube of incoming connection results.
184 pub struct TcpAcceptor {
185 listener: ~TcpListener,
186 priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
189 // TCP watchers (clients/streams)
/// Allocates a `UV_TCP` handle, initialises it on `loop_`, and builds the
/// watcher with a `StreamWatcher` over that same handle.
192 pub fn new(loop_: &Loop) -> TcpWatcher {
193 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
195 uvll::uv_tcp_init(loop_.handle, handle)
// The watcher's home is the scheduler that is current at creation time.
198 home: get_handle_to_current_scheduler!(),
200 stream: StreamWatcher::new(handle),
/// Creates a TCP watcher on `loop_` and issues a connect request to `address`,
/// parking the calling task until `connect_cb` resumes it. A nonzero libuv
/// status is returned as `Err(UvError(..))`.
204 pub fn connect(loop_: &mut Loop, address: SocketAddr)
205 -> Result<TcpWatcher, UvError>
// Context for the connect callback: a status and the task to wake.
207 struct Ctx { status: c_int, task: Option<BlockedTask> }
209 let tcp = TcpWatcher::new(loop_);
210 let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
211 let req = Request::new(uvll::UV_CONNECT);
// Use the connect call that matches the address family.
212 let result = match addr {
213 UvIpv4SocketAddr(addr) => unsafe {
214 uvll::tcp_connect(req.handle, tcp.handle, addr,
217 UvIpv6SocketAddr(addr) => unsafe {
218 uvll::tcp_connect6(req.handle, tcp.handle, addr,
224 let mut cx = Ctx { status: 0, task: None };
// Deschedule the running task and stash it in the context.
227 let scheduler: ~Scheduler = Local::take();
228 do scheduler.deschedule_running_task_and_then |_, task| {
229 cx.task = Some(task);
233 n => Err(UvError(n)),
// libuv callback for the connect request.
245 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
246 let req = Request::wrap(req);
// A cancelled request returns without touching the context or waking a task.
247 if status == uvll::ECANCELED { return }
// The request's data slot is reinterpreted as the `Ctx` above.
248 let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
250 let scheduler: ~Scheduler = Local::take();
251 scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
// Exposes the scheduler handle stored in the watcher's `home` field.
256 impl HomingIO for TcpWatcher {
257 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Address lookup for a TCP stream, delegated to the shared `socket_name` helper.
260 impl rtio::RtioSocket for TcpWatcher {
261 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
// `_m` keeps the value returned by `fire_missiles` alive for the whole call.
262 let _m = self.fire_missiles();
263 socket_name(Tcp, self.handle)
// Stream operations for a TCP watcher. Every method first takes the
// `fire_missiles` guard, and libuv statuses/errors are converted to `IoError`.
267 impl rtio::RtioTcpStream for TcpWatcher {
// Reads into `buf` through the underlying `StreamWatcher`.
268 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
269 let _m = self.fire_missiles();
270 self.stream.read(buf).map_err(uv_error_to_io_error)
// Writes `buf` through the underlying `StreamWatcher`.
273 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
274 let _m = self.fire_missiles();
275 self.stream.write(buf).map_err(uv_error_to_io_error)
// Address of the remote end, via the `TcpPeer` query.
278 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
279 let _m = self.fire_missiles();
280 socket_name(TcpPeer, self.handle)
// Calls `uv_tcp_nodelay` with 0, i.e. switches the no-delay option off.
283 fn control_congestion(&mut self) -> Result<(), IoError> {
284 let _m = self.fire_missiles();
285 status_to_io_result(unsafe {
286 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
// Calls `uv_tcp_nodelay` with 1, i.e. switches the no-delay option on.
290 fn nodelay(&mut self) -> Result<(), IoError> {
291 let _m = self.fire_missiles();
292 status_to_io_result(unsafe {
293 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
// Enables keep-alive, passing `delay_in_seconds` as the delay argument.
297 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
298 let _m = self.fire_missiles();
299 status_to_io_result(unsafe {
300 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
301 delay_in_seconds as c_uint)
// Disables keep-alive (enable flag 0, delay 0).
305 fn letdie(&mut self) -> Result<(), IoError> {
306 let _m = self.fire_missiles();
307 status_to_io_result(unsafe {
308 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
// Destructor for `TcpWatcher`; it begins by taking the `fire_missiles` guard.
313 impl Drop for TcpWatcher {
315 let _m = self.fire_missiles();
320 // TCP listeners (unbound servers)
/// Allocates and initialises a TCP handle on `loop_`, binds it to `address`,
/// and on status 0 returns the listener after `install()` has been applied.
323 pub fn bind(loop_: &mut Loop, address: SocketAddr)
324 -> Result<~TcpListener, UvError>
326 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
328 uvll::uv_tcp_init(loop_.handle, handle)
330 let l = ~TcpListener {
331 home: get_handle_to_current_scheduler!(),
// Starts with a fresh tube; `listen_cb` sends connection results into it.
334 outgoing: Tube::new(),
// Bind with the call that matches the address family.
336 let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
338 UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
339 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
343 0 => Ok(l.install()),
// Exposes the scheduler handle stored in the listener's `home` field.
349 impl HomingIO for TcpListener {
350 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Gives generic handle code access to the listener's raw `uv_tcp_t` pointer.
353 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
354 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
// Address lookup for a listener, delegated to the shared `socket_name` helper.
357 impl rtio::RtioSocket for TcpListener {
358 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
359 let _m = self.fire_missiles();
360 socket_name(Tcp, self.handle)
// Turns a listener into an acceptor.
364 impl rtio::RtioTcpListener for TcpListener {
// Consumes the listener, builds a `TcpAcceptor` around it and starts listening
// with a backlog of 128. A nonzero libuv status is returned as an `IoError`.
365 fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
366 // create the acceptor object from ourselves
// The acceptor's receiving side is a clone of the listener's `outgoing` tube.
367 let incoming = self.outgoing.clone();
368 let mut acceptor = ~TcpAcceptor {
373 let _m = acceptor.fire_missiles();
374 // XXX: the 128 backlog should be configurable
375 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
376 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
377 n => Err(uv_error_to_io_error(UvError(n))),
// libuv connection callback for a listening `server` stream: builds a message
// (a new client stream or an error) and sends it through the listener's tube.
382 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
383 let msg = match status {
// Wrap the loop that `server` belongs to so the client handle is created on it.
385 let loop_ = Loop::wrap(unsafe {
386 uvll::get_loop_for_uv_handle(server)
388 let client = TcpWatcher::new(&loop_);
// Accepting onto the fresh client handle is asserted to succeed.
389 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
390 Ok(~client as ~rtio::RtioTcpStream)
// Cancellation sends no message at all.
392 uvll::ECANCELED => return,
393 n => Err(uv_error_to_io_error(UvError(n)))
// Map the raw handle back to its `TcpListener` to reach the tube.
396 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
397 tcp.outgoing.send(msg);
// Closes the libuv handle when the listener is dropped.
400 impl Drop for TcpListener {
402 let (_m, sched) = self.fire_missiles_sched();
// Park the running task in `closing_task`, then request the close;
// `listener_close_cb` resumes that task.
404 do sched.deschedule_running_task_and_then |_, task| {
405 self.closing_task = Some(task);
406 unsafe { uvll::uv_close(self.handle, listener_close_cb) }
// libuv close callback for a listener: frees the handle and resumes the task
// that was stored in `closing_task`.
411 extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
412 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
413 unsafe { uvll::free_handle(handle) }
415 let sched: ~Scheduler = Local::take();
416 sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
419 // TCP acceptors (bound servers)
// An acceptor shares its home with the listener it wraps.
421 impl HomingIO for TcpAcceptor {
422 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
// Address lookup for an acceptor, performed on the wrapped listener's handle.
425 impl rtio::RtioSocket for TcpAcceptor {
426 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
427 let _m = self.fire_missiles();
428 socket_name(Tcp, self.listener.handle)
// Acceptor operations. Every method first takes the `fire_missiles` guard.
432 impl rtio::RtioTcpAcceptor for TcpAcceptor {
// Yields the next connection result.
// NOTE(review): presumably received from `self.incoming` — confirm in the full body.
433 fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
434 let _m = self.fire_missiles();
// Calls `uv_tcp_simultaneous_accepts` with 1 on the listener's handle.
438 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
439 let _m = self.fire_missiles();
440 status_to_io_result(unsafe {
441 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
// Calls `uv_tcp_simultaneous_accepts` with 0 on the listener's handle.
445 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
446 let _m = self.fire_missiles();
447 status_to_io_result(unsafe {
448 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
453 ////////////////////////////////////////////////////////////////////////////////
454 /// UDP implementation
455 ////////////////////////////////////////////////////////////////////////////////
/// A UDP socket backed by a raw libuv `uv_udp_t` handle.
457 pub struct UdpWatcher {
458 handle: *uvll::uv_udp_t,
/// Allocates a `UV_UDP` handle, initialises it on `loop_` and binds it to
/// `address`. A nonzero libuv status is returned as `Err(UvError(..))`.
463 pub fn bind(loop_: &Loop, address: SocketAddr)
464 -> Result<UdpWatcher, UvError>
466 let udp = UdpWatcher {
467 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
468 home: get_handle_to_current_scheduler!(),
471 uvll::uv_udp_init(loop_.handle, udp.handle)
// Bind with the call that matches the address family; the flags argument is 0.
473 let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
475 UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
476 UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
481 n => Err(UvError(n)),
// Exposes the scheduler handle stored in the watcher's `home` field.
486 impl HomingIO for UdpWatcher {
487 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Address lookup for a UDP socket, via the `Udp` query of `socket_name`.
490 impl rtio::RtioSocket for UdpWatcher {
491 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
492 let _m = self.fire_missiles();
493 socket_name(Udp, self.handle)
// Datagram I/O and socket options for a UDP watcher. Every method first takes
// the `fire_missiles` guard; libuv statuses are converted to `IoError`.
497 impl rtio::RtioUdpSocket for UdpWatcher {
// Receives one datagram into `buf`, parking the calling task until `recv_cb`
// resumes it. Returns the byte count and the sender's address.
498 fn recvfrom(&mut self, buf: &mut [u8])
499 -> Result<(uint, SocketAddr), IoError>
// Context fields shared with the callbacks: the parked task and the result slot.
502 task: Option<BlockedTask>,
504 result: Option<(ssize_t, SocketAddr)>,
506 let _m = self.fire_missiles();
508 return match unsafe {
509 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
// The caller's slice is offered to libuv exactly once (see `alloc_cb`).
514 buf: Some(slice_to_uv_buf(buf)),
// Publish the context through the handle's data slot so the callbacks find it.
517 unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
518 let scheduler: ~Scheduler = Local::take();
519 do scheduler.deschedule_running_task_and_then |_, task| {
520 cx.task = Some(task);
// Back from the callback: unpack what `recv_cb` stored.
522 match cx.result.take_unwrap() {
524 Err(uv_error_to_io_error(UvError(n as c_int))),
525 (n, addr) => Ok((n as uint, addr))
// `uv_udp_recv_start` itself reported a nonzero status.
528 n => Err(uv_error_to_io_error(UvError(n)))
// Allocation callback: hands libuv the buffer stored in the context; it fails
// with the message below if asked a second time.
531 extern fn alloc_cb(handle: *uvll::uv_udp_t,
532 _suggested_size: size_t) -> Buf {
533 let cx: &mut Ctx = unsafe {
534 cast::transmute(uvll::get_data_for_uv_handle(handle))
536 cx.buf.take().expect("alloc_cb called more than once")
// Receive callback: records the byte count and sender address, then wakes the
// parked task.
539 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
540 addr: *uvll::sockaddr, _flags: c_uint) {
542 // When there's no data to read the recv callback can be a no-op.
543 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
544 // this we just drop back to kqueue and wait for the next callback.
545 if nread == 0 { return }
546 if nread == uvll::ECANCELED as ssize_t { return }
// Stop receiving; this is asserted to succeed.
549 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
552 let cx: &mut Ctx = unsafe {
553 cast::transmute(uvll::get_data_for_uv_handle(handle))
// Convert the raw sender address into a `SocketAddr`.
555 let addr = sockaddr_to_UvSocketAddr(addr);
556 let addr = uv_socket_addr_to_socket_addr(addr);
557 cx.result = Some((nread, addr));
559 let sched: ~Scheduler = Local::take();
560 sched.resume_blocked_task_immediately(cx.task.take_unwrap());
// Sends `buf` as one datagram to `dst`, parking the calling task until
// `send_cb` resumes it.
564 fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
// Context for the send callback: the parked task and a status.
565 struct Ctx { task: Option<BlockedTask>, result: c_int }
567 let _m = self.fire_missiles();
569 let req = Request::new(uvll::UV_UDP_SEND);
570 let buf = slice_to_uv_buf(buf);
// Issue the send with the call that matches the destination's address family.
571 let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
573 UvIpv4SocketAddr(dst) =>
574 uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
575 UvIpv6SocketAddr(dst) =>
576 uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
580 return match result {
582 let mut cx = Ctx { task: None, result: 0 };
586 let sched: ~Scheduler = Local::take();
587 do sched.deschedule_running_task_and_then |_, task| {
588 cx.task = Some(task);
593 n => Err(uv_error_to_io_error(UvError(n)))
596 n => Err(uv_error_to_io_error(UvError(n)))
// Send callback: recovers the context from the request and wakes the parked task.
599 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
600 let req = Request::wrap(req);
601 let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
604 let sched: ~Scheduler = Local::take();
605 sched.resume_blocked_task_immediately(cx.task.take_unwrap());
// Changes membership of the multicast group `multi`, passed as a C string.
// NOTE(review): presumably joins (by name, mirroring `leave_multicast`) — the
// membership constant is not visible in these lines.
609 fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
610 let _m = self.fire_missiles();
611 status_to_io_result(unsafe {
612 do multi.to_str().with_c_str |m_addr| {
613 uvll::uv_udp_set_membership(self.handle,
// Leaves the multicast group `multi` (`UV_LEAVE_GROUP`).
620 fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
621 let _m = self.fire_missiles();
622 status_to_io_result(unsafe {
623 do multi.to_str().with_c_str |m_addr| {
624 uvll::uv_udp_set_membership(self.handle,
626 uvll::UV_LEAVE_GROUP)
// Sets the multicast-loop option through `uv_udp_set_multicast_loop`.
631 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
632 let _m = self.fire_missiles();
633 status_to_io_result(unsafe {
634 uvll::uv_udp_set_multicast_loop(self.handle,
// Counterpart of `loop_multicast_locally`, using the same libuv call.
639 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
640 let _m = self.fire_missiles();
641 status_to_io_result(unsafe {
642 uvll::uv_udp_set_multicast_loop(self.handle,
// Sets the multicast TTL from `ttl`.
647 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
648 let _m = self.fire_missiles();
649 status_to_io_result(unsafe {
650 uvll::uv_udp_set_multicast_ttl(self.handle,
// Sets the socket TTL; `ttl` is cast to `c_int`.
655 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
656 let _m = self.fire_missiles();
657 status_to_io_result(unsafe {
658 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
// Sets the broadcast option through `uv_udp_set_broadcast`.
662 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
663 let _m = self.fire_missiles();
664 status_to_io_result(unsafe {
665 uvll::uv_udp_set_broadcast(self.handle,
// Counterpart of `hear_broadcasts`, using the same libuv call.
670 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
671 let _m = self.fire_missiles();
672 status_to_io_result(unsafe {
673 uvll::uv_udp_set_broadcast(self.handle,
// Closes the libuv handle when the watcher is dropped, parking the current
// task until `close_cb` wakes it.
679 impl Drop for UdpWatcher {
681 // Send ourselves home to close this handle (blocking while doing so).
682 let (_m, sched) = self.fire_missiles_sched();
// `slot` is published through the handle's data pointer so `close_cb` can
// find the task to resume.
685 uvll::set_data_for_uv_handle(self.handle, &slot);
686 uvll::uv_close(self.handle, close_cb);
688 do sched.deschedule_running_task_and_then |_, task| {
// Close callback: reinterprets the handle's data as the task slot and resumes
// the task stored in it.
692 extern fn close_cb(handle: *uvll::uv_handle_t) {
693 let slot: &mut Option<BlockedTask> = unsafe {
694 cast::transmute(uvll::get_data_for_uv_handle(handle))
696 let sched: ~Scheduler = Local::take();
697 sched.resume_blocked_task_immediately(slot.take_unwrap());
702 ////////////////////////////////////////////////////////////////////////////////
703 /// UV request support
704 ////////////////////////////////////////////////////////////////////////////////
709 use std::comm::oneshot;
710 use std::rt::test::*;
711 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
716 use super::super::{Loop, run_uv_loop};
// Attempts a TCP connect to a fresh IPv4 test address; if it fails, the error
// must be named ECONNREFUSED.
719 fn connect_close_ip4() {
721 match TcpWatcher::connect(l, next_test_ip4()) {
723 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
// Attempts a TCP connect to a fresh IPv6 test address; if it fails, the error
// must be named ECONNREFUSED.
729 fn connect_close_ip6() {
731 match TcpWatcher::connect(l, next_test_ip6()) {
733 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
// Binds a UDP watcher to a fresh IPv4 test address and inspects the result.
739 fn udp_bind_close_ip4() {
741 match UdpWatcher::bind(l, next_test_ip4()) {
// Binds a UDP watcher to a fresh IPv6 test address and inspects the result.
749 fn udp_bind_close_ip6() {
751 match UdpWatcher::bind(l, next_test_ip6()) {
761 let (port, chan) = oneshot();
762 let chan = Cell::new(chan);
763 let addr = next_test_ip4();
765 let handle = l.handle;
767 let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
768 Ok(w) => w, Err(e) => fail!("{:?}", e)
770 let mut w = match w.listen() {
771 Ok(w) => w, Err(e) => fail!("{:?}", e),
773 chan.take().send(());
776 let mut buf = [0u8, ..10];
777 match stream.read(buf) {
778 Ok(10) => {} e => fail!("{:?}", e),
780 for i in range(0, 10u8) {
781 assert_eq!(buf[i], i + 1);
784 Err(e) => fail!("{:?}", e)
789 let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
790 Ok(w) => w, Err(e) => fail!("{:?}", e)
792 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
793 Ok(()) => {}, Err(e) => fail!("{:?}", e)
801 let (port, chan) = oneshot();
802 let chan = Cell::new(chan);
803 let addr = next_test_ip6();
805 let handle = l.handle;
807 let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
808 Ok(w) => w, Err(e) => fail!("{:?}", e)
810 let mut w = match w.listen() {
811 Ok(w) => w, Err(e) => fail!("{:?}", e),
813 chan.take().send(());
816 let mut buf = [0u8, ..10];
817 match stream.read(buf) {
818 Ok(10) => {} e => fail!("{:?}", e),
820 for i in range(0, 10u8) {
821 assert_eq!(buf[i], i + 1);
824 Err(e) => fail!("{:?}", e)
829 let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
830 Ok(w) => w, Err(e) => fail!("{:?}", e)
832 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
833 Ok(()) => {}, Err(e) => fail!("{:?}", e)
841 let (port, chan) = oneshot();
842 let chan = Cell::new(chan);
843 let client = next_test_ip4();
844 let server = next_test_ip4();
846 let handle = l.handle;
848 match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
850 chan.take().send(());
851 let mut buf = [0u8, ..10];
852 match w.recvfrom(buf) {
853 Ok((10, addr)) => assert_eq!(addr, client),
854 e => fail!("{:?}", e),
856 for i in range(0, 10u8) {
857 assert_eq!(buf[i], i + 1);
860 Err(e) => fail!("{:?}", e)
865 let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
866 Ok(w) => w, Err(e) => fail!("{:?}", e)
868 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
869 Ok(()) => {}, Err(e) => fail!("{:?}", e)
877 let (port, chan) = oneshot();
878 let chan = Cell::new(chan);
879 let client = next_test_ip6();
880 let server = next_test_ip6();
882 let handle = l.handle;
884 match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
886 chan.take().send(());
887 let mut buf = [0u8, ..10];
888 match w.recvfrom(buf) {
889 Ok((10, addr)) => assert_eq!(addr, client),
890 e => fail!("{:?}", e),
892 for i in range(0, 10u8) {
893 assert_eq!(buf[i], i + 1);
896 Err(e) => fail!("{:?}", e)
901 let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
902 Ok(w) => w, Err(e) => fail!("{:?}", e)
904 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
905 Ok(()) => {}, Err(e) => fail!("{:?}", e)
// Streams at least MAX bytes of value 1 from a server to a client over TCP,
// with the client verifying every byte it reads.
911 fn test_read_read_read() {
913 let addr = next_test_ip4();
914 static MAX: uint = 500000;
// The oneshot signals the client once the server is listening.
915 let (port, chan) = oneshot();
916 let port = Cell::new(port);
917 let chan = Cell::new(chan);
919 let handle = l.handle;
// Server side: listen, signal readiness, accept one stream, then write.
921 let l = &mut Loop::wrap(handle);
922 let listener = TcpListener::bind(l, addr).unwrap();
923 let mut acceptor = listener.listen().unwrap();
924 chan.take().send(());
925 let mut stream = acceptor.accept().unwrap();
926 let buf = [1, .. 2048];
927 let mut total_bytes_written = 0;
928 while total_bytes_written < MAX {
930 total_bytes_written += buf.len();
// Client side: connect and read until MAX bytes have arrived.
935 let l = &mut Loop::wrap(handle);
937 let mut stream = TcpWatcher::connect(l, addr).unwrap();
938 let mut buf = [0, .. 2048];
939 let mut total_bytes_read = 0;
940 while total_bytes_read < MAX {
941 let nread = stream.read(buf).unwrap();
942 uvdebug!("read {} bytes", nread);
943 total_bytes_read += nread;
// Every byte received must be the value the server writes.
944 for i in range(0u, nread) {
945 assert_eq!(buf[i], 1);
948 uvdebug!("read {} bytes total", total_bytes_read);
// Sends two one-byte datagrams from a client to a server and checks that both
// arrive with the expected length, sender and payload.
954 #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send
955 fn test_udp_twice() {
957 let server_addr = next_test_ip4();
958 let client_addr = next_test_ip4();
959 let (port, chan) = oneshot();
960 let port = Cell::new(port);
961 let chan = Cell::new(chan);
963 let handle = l.handle;
// Client side: bind, then send the payloads [1] and [2].
965 let l = &mut Loop::wrap(handle);
966 let mut client = UdpWatcher::bind(l, client_addr).unwrap();
968 assert!(client.sendto([1], server_addr).is_ok());
969 assert!(client.sendto([2], server_addr).is_ok());
// Server side: bind, signal readiness, then receive twice.
973 let l = &mut Loop::wrap(handle);
974 let mut server = UdpWatcher::bind(l, server_addr).unwrap();
975 chan.take().send(());
978 let (nread1, src1) = server.recvfrom(buf1).unwrap();
979 let (nread2, src2) = server.recvfrom(buf2).unwrap();
980 assert_eq!(nread1, 1);
981 assert_eq!(nread2, 1);
982 assert_eq!(src1, client_addr);
983 assert_eq!(src2, client_addr);
984 assert_eq!(buf1[0], 1);
985 assert_eq!(buf2[0], 2);
// Exchanges UDP datagrams between a server pair and a client pair of sockets:
// the server sends 2048-byte messages of value 1, the client answers each with
// a one-byte datagram, and a final [0] datagram tells the server to stop.
991 fn test_udp_many_read() {
993 let server_out_addr = next_test_ip4();
994 let server_in_addr = next_test_ip4();
995 let client_out_addr = next_test_ip4();
996 let client_in_addr = next_test_ip4();
997 static MAX: uint = 500_000;
// Two oneshots, cross-paired so each side gets one port and one chan.
999 let (p1, c1) = oneshot();
1000 let (p2, c2) = oneshot();
1002 let first = Cell::new((p1, c2));
1003 let second = Cell::new((p2, c1));
1005 let handle = l.handle;
// Server side.
1007 let l = &mut Loop::wrap(handle);
1008 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1009 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1010 let (port, chan) = first.take();
1013 let msg = [1, .. 2048];
1014 let mut total_bytes_sent = 0;
1018 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1019 total_bytes_sent += msg.len();
1020 // check if the client has received enough
1021 let res = server_in.recvfrom(buf);
1022 assert!(res.is_ok());
1023 let (nread, src) = res.unwrap();
1024 assert_eq!(nread, 1);
1025 assert_eq!(src, client_out_addr);
1027 assert!(total_bytes_sent >= MAX);
// Client side.
1031 let l = &mut Loop::wrap(handle);
1032 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1033 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1034 let (port, chan) = second.take();
1037 let mut total_bytes_recv = 0;
1038 let mut buf = [0, .. 2048];
1039 while total_bytes_recv < MAX {
// Tell the server to send another message, then receive and verify it.
1041 assert!(client_out.sendto([1], server_in_addr).is_ok());
1043 let res = client_in.recvfrom(buf);
1044 assert!(res.is_ok());
1045 let (nread, src) = res.unwrap();
1046 assert_eq!(src, server_out_addr);
1047 total_bytes_recv += nread;
1048 for i in range(0u, nread) {
1049 assert_eq!(buf[i], 1);
1052 // tell the server we're done
1053 assert!(client_out.sendto([0], server_in_addr).is_ok());
// A client writes the byte pattern 0..7 several times; the server reads it
// back across multiple reads and checks each byte against the running
// position modulo 8.
1059 fn test_read_and_block() {
1060 do run_uv_loop |l| {
1061 let addr = next_test_ip4();
1062 let (port, chan) = oneshot();
1063 let port = Cell::new(port);
1064 let chan = Cell::new(chan);
1066 let handle = l.handle;
// Server side: listen, hand the client a second port (`port2`), then read.
1068 let l = &mut Loop::wrap(handle);
1069 let listener = TcpListener::bind(l, addr).unwrap();
1070 let mut acceptor = listener.listen().unwrap();
1071 let (port2, chan2) = stream();
1072 chan.take().send(port2);
1073 let mut stream = acceptor.accept().unwrap();
1074 let mut buf = [0, .. 2048];
1077 let mut current = 0;
1080 while current < expected {
1081 let nread = stream.read(buf).unwrap();
1082 for i in range(0u, nread) {
1083 let val = buf[i] as uint;
1084 assert_eq!(val, current % 8);
1092 // Make sure we had multiple reads
// Client side: obtain `port2`, connect, and write the pattern.
// NOTE(review): the results of these writes are not checked.
1097 let l = &mut Loop::wrap(handle);
1098 let port2 = port.take().recv();
1099 let mut stream = TcpWatcher::connect(l, addr).unwrap();
1100 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1101 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1103 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1104 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Runs a TCP server and a TCP client on separate single-threaded schedulers,
// each with its own uv loop; the server expects the eight bytes 0..7.
1111 fn test_simple_tcp_server_and_client_on_diff_threads() {
1112 let addr = next_test_ip4();
// Server: accept one stream and verify a single 8-byte read.
1114 do task::spawn_sched(task::SingleThreaded) {
1115 do run_uv_loop |l| {
1116 let listener = TcpListener::bind(l, addr).unwrap();
1117 let mut acceptor = listener.listen().unwrap();
1118 let mut stream = acceptor.accept().unwrap();
1119 let mut buf = [0, .. 2048];
1120 let nread = stream.read(buf).unwrap();
1121 assert_eq!(nread, 8);
1122 for i in range(0u, nread) {
1123 assert_eq!(buf[i], i as u8);
// Client: nothing synchronises it with the server, so it retries the connect
// until one succeeds.
1128 do task::spawn_sched(task::SingleThreaded) {
1129 do run_uv_loop |l| {
1130 let mut stream = TcpWatcher::connect(l, addr);
1131 while stream.is_err() {
1132 stream = TcpWatcher::connect(l, addr);
1134 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1139 // On one thread, create a udp socket. Then send that socket to another
1140 // thread and destroy the socket on the remote thread. This should make sure
1141 // that homing kicks in for the socket to go back home to the original
1142 // thread, close itself, and then come back to the last thread.
1144 fn test_homing_closes_correctly() {
1145 let (port, chan) = oneshot();
1146 let port = Cell::new(port);
1147 let chan = Cell::new(chan);
// First scheduler: bind the socket and send it over the channel.
1149 do task::spawn_sched(task::SingleThreaded) {
1150 let chan = Cell::new(chan.take());
1151 do run_uv_loop |l| {
1152 let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap();
1153 chan.take().send(listener);
// Second scheduler: holds the receiving end of the channel.
1157 do task::spawn_sched(task::SingleThreaded) {
1158 let port = Cell::new(port.take());
1159 do run_uv_loop |_l| {
1165 // This is a bit of a crufty old test, but it has its uses.
// Builds two schedulers by hand, binds a UDP socket on the first, moves the
// task to the second, and lets the socket's destructor run there.
1167 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1169 use std::rt::local::Local;
1170 use std::rt::rtio::{EventLoop, IoFactory};
1171 use std::rt::sched::Scheduler;
1172 use std::rt::sched::{Shutdown, TaskFromFriend};
1173 use std::rt::sleeper_list::SleeperList;
1174 use std::rt::task::Task;
1175 use std::rt::task::UnwindResult;
1176 use std::rt::thread::Thread;
1177 use std::rt::work_queue::WorkQueue;
1178 use std::unstable::run_in_bare_thread;
1179 use uvio::UvEventLoop;
1181 do run_in_bare_thread {
// Two work queues, one per scheduler; both schedulers see the full list.
1182 let sleepers = SleeperList::new();
1183 let work_queue1 = WorkQueue::new();
1184 let work_queue2 = WorkQueue::new();
1185 let queues = ~[work_queue1.clone(), work_queue2.clone()];
// Each scheduler gets its own uv event loop.
1187 let loop1 = ~UvEventLoop::new() as ~EventLoop;
1188 let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
1190 let loop2 = ~UvEventLoop::new() as ~EventLoop;
1191 let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
// Handles used to shut both schedulers down, plus one to hand the task to sched2.
1194 let handle1 = Cell::new(sched1.make_handle());
1195 let handle2 = Cell::new(sched2.make_handle());
1196 let tasksFriendHandle = Cell::new(sched2.make_handle());
// When the main task exits, stop both schedulers and require a clean exit.
1198 let on_exit: ~fn(UnwindResult) = |exit_status| {
1199 handle1.take().send(Shutdown);
1200 handle2.take().send(Shutdown);
1201 assert!(exit_status.is_success());
// Borrows the local scheduler's `IoFactory`, transmuting it to the declared
// `'static` return type.
1204 unsafe fn local_io() -> &'static mut IoFactory {
1205 do Local::borrow |sched: &mut Scheduler| {
1207 sched.event_loop.io(|i| io = Some(i));
1208 cast::transmute(io.unwrap())
1212 let test_function: ~fn() = || {
1213 let io = unsafe { local_io() };
1214 let addr = next_test_ip4();
1215 let maybe_socket = io.udp_bind(addr);
1216 // this socket is bound to this event loop
1217 assert!(maybe_socket.is_ok());
1219 // block self on sched1
1220 do task::unkillable { // FIXME(#8674)
1221 let scheduler: ~Scheduler = Local::take();
1222 do scheduler.deschedule_running_task_and_then |_, task| {
1224 do task.wake().map |task| {
1225 // send self to sched2
1226 tasksFriendHandle.take().send(TaskFromFriend(task));
1228 // sched1 should now sleep since it has nothing else to do
1231 // sched2 will wake up and get the task; as we do nothing else,
1232 // the function ends and the socket goes out of scope. sched2
1233 // will start to run the destructor. The destructor will first
1234 // block the task, set its home as sched1, then enqueue it.
1235 // sched2 will dequeue the task, see that it has a home, and
1236 // send it to sched1. sched1 will wake up, exec the close
1237 // function on the correct loop, and then we're done.
// The main task is created from sched1's stack pool and carries the exit hook.
1240 let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
1242 main_task.death.on_exit = Some(on_exit);
1243 let main_task = Cell::new(main_task);
// A second root task, from sched2's stack pool, used to bootstrap sched2.
1245 let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool,
1248 let sched1 = Cell::new(sched1);
1249 let sched2 = Cell::new(sched2);
// Each scheduler is bootstrapped on its own thread.
1251 let thread1 = do Thread::start {
1252 sched1.take().bootstrap(main_task.take());
1254 let thread2 = do Thread::start {
1255 sched2.take().bootstrap(null_task.take());