1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
15 use std::rt::BlockedTask;
16 use std::rt::io::IoError;
17 use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr};
18 use std::rt::local::Local;
20 use std::rt::sched::{Scheduler, SchedHandle};
21 use std::rt::tube::Tube;
26 use stream::StreamWatcher;
27 use super::{Loop, Request, UvError, Buf, status_to_io_result,
28 uv_error_to_io_error, UvHandle, slice_to_uv_buf,
29 wait_until_woken_after};
34 ////////////////////////////////////////////////////////////////////////////////
35 /// Generic functions related to dealing with sockaddr things
36 ////////////////////////////////////////////////////////////////////////////////
38 #[fixed_stack_segment]
39 fn socket_addr_as_sockaddr<T>(addr: SocketAddr, f: &fn(*sockaddr) -> T) -> T {
40 let malloc = match addr.ip {
41 Ipv4Addr(*) => uvll::rust_malloc_ip4_addr,
42 Ipv6Addr(*) => uvll::rust_malloc_ip6_addr,
45 let ip = addr.ip.to_str();
46 let addr = ip.with_c_str(|p| unsafe { malloc(p, addr.port as c_int) });
50 unsafe { libc::free(addr) };
54 #[fixed_stack_segment]
55 pub fn sockaddr_to_socket_addr(addr: *sockaddr) -> SocketAddr {
57 let ip_size = if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
58 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/
59 } else if uvll::rust_is_ipv6_sockaddr(addr) == 1 {
60 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/
62 fail!("unknown address?");
65 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
66 let buf_ptr = vec::raw::to_ptr(buf);
67 if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
68 uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t);
70 uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t);
75 let port = if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
76 uvll::rust_ip4_port(addr)
78 uvll::rust_ip6_port(addr)
82 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
83 let ip_addr = FromStr::from_str(ip_str).unwrap();
85 SocketAddr { ip: ip_addr, port: ip_port }
91 fn test_ip4_conversion() {
93 let ip4 = rt::test::next_test_ip4();
94 do socket_addr_as_sockaddr(ip4) |addr| {
95 assert_eq!(ip4, sockaddr_to_socket_addr(addr));
101 fn test_ip6_conversion() {
103 let ip6 = rt::test::next_test_ip6();
104 do socket_addr_as_sockaddr(ip6) |addr| {
105 assert_eq!(ip6, sockaddr_to_socket_addr(addr));
109 enum SocketNameKind {
115 #[fixed_stack_segment]
116 fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
118 let getsockname = match sk {
119 TcpPeer => uvll::uv_tcp_getpeername,
120 Tcp => uvll::uv_tcp_getsockname,
121 Udp => uvll::uv_udp_getsockname,
124 // Allocate a sockaddr_storage
125 // since we don't know if it's ipv4 or ipv6
126 let size = uvll::rust_sockaddr_size();
127 let name = libc::malloc(size as size_t);
128 assert!(!name.is_null());
129 let mut namelen = size;
131 let ret = match getsockname(handle, name, &mut namelen) {
132 0 => Ok(sockaddr_to_socket_addr(name)),
133 n => Err(uv_error_to_io_error(UvError(n)))
140 ////////////////////////////////////////////////////////////////////////////////
141 /// TCP implementation
142 ////////////////////////////////////////////////////////////////////////////////
144 pub struct TcpWatcher {
145 handle: *uvll::uv_tcp_t,
146 stream: StreamWatcher,
150 pub struct TcpListener {
152 handle: *uvll::uv_pipe_t,
153 priv closing_task: Option<BlockedTask>,
154 priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
157 pub struct TcpAcceptor {
158 listener: ~TcpListener,
159 priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
162 // TCP watchers (clients/streams)
165 pub fn new(loop_: &Loop) -> TcpWatcher {
166 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
168 uvll::uv_tcp_init(loop_.handle, handle)
171 home: get_handle_to_current_scheduler!(),
173 stream: StreamWatcher::new(handle),
177 pub fn connect(loop_: &mut Loop, address: SocketAddr)
178 -> Result<TcpWatcher, UvError>
180 struct Ctx { status: c_int, task: Option<BlockedTask> }
182 return do task::unkillable {
183 let tcp = TcpWatcher::new(loop_);
184 let ret = do socket_addr_as_sockaddr(address) |addr| {
185 let mut req = Request::new(uvll::UV_CONNECT);
186 let result = unsafe {
187 uvll::uv_tcp_connect(req.handle, tcp.handle, addr,
192 req.defuse(); // uv callback now owns this request
193 let mut cx = Ctx { status: 0, task: None };
194 do wait_until_woken_after(&mut cx.task) {
199 n => Err(UvError(n)),
212 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
213 let req = Request::wrap(req);
214 assert!(status != uvll::ECANCELED);
215 let cx: &mut Ctx = unsafe { req.get_data() };
217 let scheduler: ~Scheduler = Local::take();
218 scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
223 impl HomingIO for TcpWatcher {
224 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
227 impl rtio::RtioSocket for TcpWatcher {
228 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
229 let _m = self.fire_homing_missile();
230 socket_name(Tcp, self.handle)
234 impl rtio::RtioTcpStream for TcpWatcher {
235 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
236 let _m = self.fire_homing_missile();
237 self.stream.read(buf).map_err(uv_error_to_io_error)
240 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
241 let _m = self.fire_homing_missile();
242 self.stream.write(buf).map_err(uv_error_to_io_error)
245 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
246 let _m = self.fire_homing_missile();
247 socket_name(TcpPeer, self.handle)
250 fn control_congestion(&mut self) -> Result<(), IoError> {
251 let _m = self.fire_homing_missile();
252 status_to_io_result(unsafe {
253 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
257 fn nodelay(&mut self) -> Result<(), IoError> {
258 let _m = self.fire_homing_missile();
259 status_to_io_result(unsafe {
260 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
264 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
265 let _m = self.fire_homing_missile();
266 status_to_io_result(unsafe {
267 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
268 delay_in_seconds as c_uint)
272 fn letdie(&mut self) -> Result<(), IoError> {
273 let _m = self.fire_homing_missile();
274 status_to_io_result(unsafe {
275 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
280 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
281 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
284 impl Drop for TcpWatcher {
286 let _m = self.fire_homing_missile();
291 // TCP listeners (unbound servers)
294 pub fn bind(loop_: &mut Loop, address: SocketAddr)
295 -> Result<~TcpListener, UvError>
297 do task::unkillable {
298 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
300 uvll::uv_tcp_init(loop_.handle, handle)
302 let l = ~TcpListener {
303 home: get_handle_to_current_scheduler!(),
306 outgoing: Tube::new(),
308 let res = socket_addr_as_sockaddr(address, |addr| unsafe {
309 uvll::uv_tcp_bind(l.handle, addr)
312 0 => Ok(l.install()),
319 impl HomingIO for TcpListener {
320 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
323 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
324 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
327 impl rtio::RtioSocket for TcpListener {
328 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
329 let _m = self.fire_homing_missile();
330 socket_name(Tcp, self.handle)
334 impl rtio::RtioTcpListener for TcpListener {
335 fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
336 // create the acceptor object from ourselves
337 let incoming = self.outgoing.clone();
338 let mut acceptor = ~TcpAcceptor {
343 let _m = acceptor.fire_homing_missile();
344 // XXX: the 128 backlog should be configurable
345 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
346 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
347 n => Err(uv_error_to_io_error(UvError(n))),
352 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
353 assert!(status != uvll::ECANCELED);
354 let msg = match status {
356 let loop_ = Loop::wrap(unsafe {
357 uvll::get_loop_for_uv_handle(server)
359 let client = TcpWatcher::new(&loop_);
360 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
361 Ok(~client as ~rtio::RtioTcpStream)
363 n => Err(uv_error_to_io_error(UvError(n)))
366 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
367 tcp.outgoing.send(msg);
370 impl Drop for TcpListener {
372 let _m = self.fire_homing_missile();
377 extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
378 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
379 unsafe { uvll::free_handle(handle) }
381 let sched: ~Scheduler = Local::take();
382 sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
385 // TCP acceptors (bound servers)
387 impl HomingIO for TcpAcceptor {
388 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
391 impl rtio::RtioSocket for TcpAcceptor {
392 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
393 let _m = self.fire_homing_missile();
394 socket_name(Tcp, self.listener.handle)
398 impl rtio::RtioTcpAcceptor for TcpAcceptor {
399 fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
400 let _m = self.fire_homing_missile();
404 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
405 let _m = self.fire_homing_missile();
406 status_to_io_result(unsafe {
407 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
411 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
412 let _m = self.fire_homing_missile();
413 status_to_io_result(unsafe {
414 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
419 ////////////////////////////////////////////////////////////////////////////////
420 /// UDP implementation
421 ////////////////////////////////////////////////////////////////////////////////
423 pub struct UdpWatcher {
424 handle: *uvll::uv_udp_t,
429 pub fn bind(loop_: &Loop, address: SocketAddr)
430 -> Result<UdpWatcher, UvError>
432 do task::unkillable {
433 let udp = UdpWatcher {
434 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
435 home: get_handle_to_current_scheduler!(),
438 uvll::uv_udp_init(loop_.handle, udp.handle)
440 let result = socket_addr_as_sockaddr(address, |addr| unsafe {
441 uvll::uv_udp_bind(udp.handle, addr, 0u32)
445 n => Err(UvError(n)),
451 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
452 fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
455 impl HomingIO for UdpWatcher {
456 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
459 impl rtio::RtioSocket for UdpWatcher {
460 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
461 let _m = self.fire_homing_missile();
462 socket_name(Udp, self.handle)
466 impl rtio::RtioUdpSocket for UdpWatcher {
467 fn recvfrom(&mut self, buf: &mut [u8])
468 -> Result<(uint, SocketAddr), IoError>
471 task: Option<BlockedTask>,
473 result: Option<(ssize_t, Option<SocketAddr>)>,
475 let _m = self.fire_homing_missile();
477 let a = match unsafe {
478 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
483 buf: Some(slice_to_uv_buf(buf)),
486 do wait_until_woken_after(&mut cx.task) {
487 unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
489 match cx.result.take_unwrap() {
491 Err(uv_error_to_io_error(UvError(n as c_int))),
492 (n, addr) => Ok((n as uint, addr.unwrap()))
495 n => Err(uv_error_to_io_error(UvError(n)))
499 extern fn alloc_cb(handle: *uvll::uv_udp_t,
500 _suggested_size: size_t,
504 cast::transmute(uvll::get_data_for_uv_handle(handle));
505 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
509 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
510 addr: *uvll::sockaddr, _flags: c_uint) {
511 assert!(nread != uvll::ECANCELED as ssize_t);
512 let cx: &mut Ctx = unsafe {
513 cast::transmute(uvll::get_data_for_uv_handle(handle))
516 // When there's no data to read the recv callback can be a no-op.
517 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
518 // this we just drop back to kqueue and wait for the next callback.
520 cx.buf = Some(unsafe { *buf });
525 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
528 let cx: &mut Ctx = unsafe {
529 cast::transmute(uvll::get_data_for_uv_handle(handle))
531 let addr = if addr == ptr::null() {
534 Some(sockaddr_to_socket_addr(addr))
536 cx.result = Some((nread, addr));
538 let sched: ~Scheduler = Local::take();
539 sched.resume_blocked_task_immediately(cx.task.take_unwrap());
543 fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
544 struct Ctx { task: Option<BlockedTask>, result: c_int }
546 let _m = self.fire_homing_missile();
548 let mut req = Request::new(uvll::UV_UDP_SEND);
549 let buf = slice_to_uv_buf(buf);
550 let result = socket_addr_as_sockaddr(dst, |dst| unsafe {
551 uvll::uv_udp_send(req.handle, self.handle, [buf], dst, send_cb)
554 return match result {
556 req.defuse(); // uv callback now owns this request
557 let mut cx = Ctx { task: None, result: 0 };
558 do wait_until_woken_after(&mut cx.task) {
563 n => Err(uv_error_to_io_error(UvError(n)))
566 n => Err(uv_error_to_io_error(UvError(n)))
569 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
570 let req = Request::wrap(req);
571 assert!(status != uvll::ECANCELED);
572 let cx: &mut Ctx = unsafe { req.get_data() };
575 let sched: ~Scheduler = Local::take();
576 sched.resume_blocked_task_immediately(cx.task.take_unwrap());
580 fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
581 let _m = self.fire_homing_missile();
582 status_to_io_result(unsafe {
583 do multi.to_str().with_c_str |m_addr| {
584 uvll::uv_udp_set_membership(self.handle,
591 fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
592 let _m = self.fire_homing_missile();
593 status_to_io_result(unsafe {
594 do multi.to_str().with_c_str |m_addr| {
595 uvll::uv_udp_set_membership(self.handle,
597 uvll::UV_LEAVE_GROUP)
602 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
603 let _m = self.fire_homing_missile();
604 status_to_io_result(unsafe {
605 uvll::uv_udp_set_multicast_loop(self.handle,
610 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
611 let _m = self.fire_homing_missile();
612 status_to_io_result(unsafe {
613 uvll::uv_udp_set_multicast_loop(self.handle,
618 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
619 let _m = self.fire_homing_missile();
620 status_to_io_result(unsafe {
621 uvll::uv_udp_set_multicast_ttl(self.handle,
626 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
627 let _m = self.fire_homing_missile();
628 status_to_io_result(unsafe {
629 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
633 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
634 let _m = self.fire_homing_missile();
635 status_to_io_result(unsafe {
636 uvll::uv_udp_set_broadcast(self.handle,
641 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
642 let _m = self.fire_homing_missile();
643 status_to_io_result(unsafe {
644 uvll::uv_udp_set_broadcast(self.handle,
650 impl Drop for UdpWatcher {
652 // Send ourselves home to close this handle (blocking while doing so).
653 let _m = self.fire_homing_missile();
658 ////////////////////////////////////////////////////////////////////////////////
659 /// UV request support
660 ////////////////////////////////////////////////////////////////////////////////
665 use std::comm::oneshot;
666 use std::rt::test::*;
667 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
672 use super::super::local_loop;
675 fn connect_close_ip4() {
676 match TcpWatcher::connect(local_loop(), next_test_ip4()) {
678 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
683 fn connect_close_ip6() {
684 match TcpWatcher::connect(local_loop(), next_test_ip6()) {
686 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
691 fn udp_bind_close_ip4() {
692 match UdpWatcher::bind(local_loop(), next_test_ip4()) {
699 fn udp_bind_close_ip6() {
700 match UdpWatcher::bind(local_loop(), next_test_ip6()) {
708 let (port, chan) = oneshot();
709 let chan = Cell::new(chan);
710 let addr = next_test_ip4();
713 let w = match TcpListener::bind(local_loop(), addr) {
714 Ok(w) => w, Err(e) => fail!("{:?}", e)
716 let mut w = match w.listen() {
717 Ok(w) => w, Err(e) => fail!("{:?}", e),
719 chan.take().send(());
722 let mut buf = [0u8, ..10];
723 match stream.read(buf) {
724 Ok(10) => {} e => fail!("{:?}", e),
726 for i in range(0, 10u8) {
727 assert_eq!(buf[i], i + 1);
730 Err(e) => fail!("{:?}", e)
735 let mut w = match TcpWatcher::connect(local_loop(), addr) {
736 Ok(w) => w, Err(e) => fail!("{:?}", e)
738 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
739 Ok(()) => {}, Err(e) => fail!("{:?}", e)
745 let (port, chan) = oneshot();
746 let chan = Cell::new(chan);
747 let addr = next_test_ip6();
750 let w = match TcpListener::bind(local_loop(), addr) {
751 Ok(w) => w, Err(e) => fail!("{:?}", e)
753 let mut w = match w.listen() {
754 Ok(w) => w, Err(e) => fail!("{:?}", e),
756 chan.take().send(());
759 let mut buf = [0u8, ..10];
760 match stream.read(buf) {
761 Ok(10) => {} e => fail!("{:?}", e),
763 for i in range(0, 10u8) {
764 assert_eq!(buf[i], i + 1);
767 Err(e) => fail!("{:?}", e)
772 let mut w = match TcpWatcher::connect(local_loop(), addr) {
773 Ok(w) => w, Err(e) => fail!("{:?}", e)
775 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
776 Ok(()) => {}, Err(e) => fail!("{:?}", e)
782 let (port, chan) = oneshot();
783 let chan = Cell::new(chan);
784 let client = next_test_ip4();
785 let server = next_test_ip4();
788 match UdpWatcher::bind(local_loop(), server) {
790 chan.take().send(());
791 let mut buf = [0u8, ..10];
792 match w.recvfrom(buf) {
793 Ok((10, addr)) => assert_eq!(addr, client),
794 e => fail!("{:?}", e),
796 for i in range(0, 10u8) {
797 assert_eq!(buf[i], i + 1);
800 Err(e) => fail!("{:?}", e)
805 let mut w = match UdpWatcher::bind(local_loop(), client) {
806 Ok(w) => w, Err(e) => fail!("{:?}", e)
808 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
809 Ok(()) => {}, Err(e) => fail!("{:?}", e)
815 let (port, chan) = oneshot();
816 let chan = Cell::new(chan);
817 let client = next_test_ip6();
818 let server = next_test_ip6();
821 match UdpWatcher::bind(local_loop(), server) {
823 chan.take().send(());
824 let mut buf = [0u8, ..10];
825 match w.recvfrom(buf) {
826 Ok((10, addr)) => assert_eq!(addr, client),
827 e => fail!("{:?}", e),
829 for i in range(0, 10u8) {
830 assert_eq!(buf[i], i + 1);
833 Err(e) => fail!("{:?}", e)
838 let mut w = match UdpWatcher::bind(local_loop(), client) {
839 Ok(w) => w, Err(e) => fail!("{:?}", e)
841 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
842 Ok(()) => {}, Err(e) => fail!("{:?}", e)
847 fn test_read_read_read() {
848 use std::rt::rtio::*;
849 let addr = next_test_ip4();
850 static MAX: uint = 5000;
851 let (port, chan) = oneshot();
852 let port = Cell::new(port);
853 let chan = Cell::new(chan);
856 let listener = TcpListener::bind(local_loop(), addr).unwrap();
857 let mut acceptor = listener.listen().unwrap();
858 chan.take().send(());
859 let mut stream = acceptor.accept().unwrap();
860 let buf = [1, .. 2048];
861 let mut total_bytes_written = 0;
862 while total_bytes_written < MAX {
863 assert!(stream.write(buf).is_ok());
864 uvdebug!("wrote bytes");
865 total_bytes_written += buf.len();
871 let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
872 let mut buf = [0, .. 2048];
873 let mut total_bytes_read = 0;
874 while total_bytes_read < MAX {
875 let nread = stream.read(buf).unwrap();
876 total_bytes_read += nread;
877 for i in range(0u, nread) {
878 assert_eq!(buf[i], 1);
881 uvdebug!("read {} bytes total", total_bytes_read);
886 #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
887 fn test_udp_twice() {
888 let server_addr = next_test_ip4();
889 let client_addr = next_test_ip4();
890 let (port, chan) = oneshot();
891 let port = Cell::new(port);
892 let chan = Cell::new(chan);
895 let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
897 assert!(client.sendto([1], server_addr).is_ok());
898 assert!(client.sendto([2], server_addr).is_ok());
901 let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
902 chan.take().send(());
905 let (nread1, src1) = server.recvfrom(buf1).unwrap();
906 let (nread2, src2) = server.recvfrom(buf2).unwrap();
907 assert_eq!(nread1, 1);
908 assert_eq!(nread2, 1);
909 assert_eq!(src1, client_addr);
910 assert_eq!(src2, client_addr);
911 assert_eq!(buf1[0], 1);
912 assert_eq!(buf2[0], 2);
916 fn test_udp_many_read() {
917 let server_out_addr = next_test_ip4();
918 let server_in_addr = next_test_ip4();
919 let client_out_addr = next_test_ip4();
920 let client_in_addr = next_test_ip4();
921 static MAX: uint = 500_000;
923 let (p1, c1) = oneshot();
924 let (p2, c2) = oneshot();
926 let first = Cell::new((p1, c2));
927 let second = Cell::new((p2, c1));
930 let l = local_loop();
931 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
932 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
933 let (port, chan) = first.take();
936 let msg = [1, .. 2048];
937 let mut total_bytes_sent = 0;
941 assert!(server_out.sendto(msg, client_in_addr).is_ok());
942 total_bytes_sent += msg.len();
943 // check if the client has received enough
944 let res = server_in.recvfrom(buf);
945 assert!(res.is_ok());
946 let (nread, src) = res.unwrap();
947 assert_eq!(nread, 1);
948 assert_eq!(src, client_out_addr);
950 assert!(total_bytes_sent >= MAX);
954 let l = local_loop();
955 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
956 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
957 let (port, chan) = second.take();
960 let mut total_bytes_recv = 0;
961 let mut buf = [0, .. 2048];
962 while total_bytes_recv < MAX {
964 assert!(client_out.sendto([1], server_in_addr).is_ok());
966 let res = client_in.recvfrom(buf);
967 assert!(res.is_ok());
968 let (nread, src) = res.unwrap();
969 assert_eq!(src, server_out_addr);
970 total_bytes_recv += nread;
971 for i in range(0u, nread) {
972 assert_eq!(buf[i], 1);
975 // tell the server we're done
976 assert!(client_out.sendto([0], server_in_addr).is_ok());
981 fn test_read_and_block() {
982 let addr = next_test_ip4();
983 let (port, chan) = oneshot();
984 let port = Cell::new(port);
985 let chan = Cell::new(chan);
988 let listener = TcpListener::bind(local_loop(), addr).unwrap();
989 let mut acceptor = listener.listen().unwrap();
990 let (port2, chan2) = stream();
991 chan.take().send(port2);
992 let mut stream = acceptor.accept().unwrap();
993 let mut buf = [0, .. 2048];
999 while current < expected {
1000 let nread = stream.read(buf).unwrap();
1001 for i in range(0u, nread) {
1002 let val = buf[i] as uint;
1003 assert_eq!(val, current % 8);
1011 // Make sure we had multiple reads
1016 let port2 = port.take().recv();
1017 let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
1018 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1019 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1021 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1022 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1028 fn test_simple_tcp_server_and_client_on_diff_threads() {
1029 let addr = next_test_ip4();
1031 do task::spawn_sched(task::SingleThreaded) {
1032 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1033 let mut acceptor = listener.listen().unwrap();
1034 let mut stream = acceptor.accept().unwrap();
1035 let mut buf = [0, .. 2048];
1036 let nread = stream.read(buf).unwrap();
1037 assert_eq!(nread, 8);
1038 for i in range(0u, nread) {
1039 assert_eq!(buf[i], i as u8);
1043 do task::spawn_sched(task::SingleThreaded) {
1044 let mut stream = TcpWatcher::connect(local_loop(), addr);
1045 while stream.is_err() {
1046 stream = TcpWatcher::connect(local_loop(), addr);
1048 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1052 // On one thread, create a udp socket. Then send that socket to another
1053 // thread and destroy the socket on the remote thread. This should make sure
1054 // that homing kicks in for the socket to go back home to the original
1055 // thread, close itself, and then come back to the last thread.
1057 fn test_homing_closes_correctly() {
1058 let (port, chan) = oneshot();
1059 let port = Cell::new(port);
1060 let chan = Cell::new(chan);
1062 do task::spawn_sched(task::SingleThreaded) {
1063 let chan = Cell::new(chan.take());
1064 let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
1065 chan.take().send(listener);
1068 do task::spawn_sched(task::SingleThreaded) {
1069 let port = Cell::new(port.take());
1074 // This is a bit of a crufty old test, but it has its uses.
1076 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1078 use std::rt::local::Local;
1079 use std::rt::rtio::{EventLoop, IoFactory};
1080 use std::rt::sched::Scheduler;
1081 use std::rt::sched::{Shutdown, TaskFromFriend};
1082 use std::rt::sleeper_list::SleeperList;
1083 use std::rt::task::Task;
1084 use std::rt::task::UnwindResult;
1085 use std::rt::thread::Thread;
1086 use std::rt::work_queue::WorkQueue;
1087 use std::unstable::run_in_bare_thread;
1088 use uvio::UvEventLoop;
1090 do run_in_bare_thread {
1091 let sleepers = SleeperList::new();
1092 let work_queue1 = WorkQueue::new();
1093 let work_queue2 = WorkQueue::new();
1094 let queues = ~[work_queue1.clone(), work_queue2.clone()];
1096 let loop1 = ~UvEventLoop::new() as ~EventLoop;
1097 let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
1099 let loop2 = ~UvEventLoop::new() as ~EventLoop;
1100 let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
1103 let handle1 = Cell::new(sched1.make_handle());
1104 let handle2 = Cell::new(sched2.make_handle());
1105 let tasksFriendHandle = Cell::new(sched2.make_handle());
1107 let on_exit: ~fn(UnwindResult) = |exit_status| {
1108 handle1.take().send(Shutdown);
1109 handle2.take().send(Shutdown);
1110 assert!(exit_status.is_success());
1113 unsafe fn local_io() -> &'static mut IoFactory {
1114 do Local::borrow |sched: &mut Scheduler| {
1116 sched.event_loop.io(|i| io = Some(i));
1117 cast::transmute(io.unwrap())
1121 let test_function: ~fn() = || {
1122 let io = unsafe { local_io() };
1123 let addr = next_test_ip4();
1124 let maybe_socket = io.udp_bind(addr);
1125 // this socket is bound to this event loop
1126 assert!(maybe_socket.is_ok());
1128 // block self on sched1
1129 do task::unkillable { // FIXME(#8674)
1130 let scheduler: ~Scheduler = Local::take();
1131 do scheduler.deschedule_running_task_and_then |_, task| {
1133 do task.wake().map |task| {
1134 // send self to sched2
1135 tasksFriendHandle.take().send(TaskFromFriend(task));
1137 // sched1 should now sleep since it has nothing else to do
1140 // sched2 will wake up and get the task as we do nothing else,
1141 // the function ends and the socket goes out of scope sched2
1142 // will start to run the destructor the destructor will first
1143 // block the task, set it's home as sched1, then enqueue it
1144 // sched2 will dequeue the task, see that it has a home, and
1145 // send it to sched1 sched1 will wake up, exec the close
1146 // function on the correct loop, and then we're done
1149 let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
1151 main_task.death.on_exit = Some(on_exit);
1152 let main_task = Cell::new(main_task);
1154 let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool,
1157 let sched1 = Cell::new(sched1);
1158 let sched2 = Cell::new(sched2);
1160 let thread1 = do Thread::start {
1161 sched1.take().bootstrap(main_task.take());
1163 let thread2 = do Thread::start {
1164 sched2.take().bootstrap(null_task.take());
1172 #[should_fail] #[test]
1173 fn tcp_listener_fail_cleanup() {
1174 let addr = next_test_ip4();
1175 let w = TcpListener::bind(local_loop(), addr).unwrap();
1176 let _w = w.listen().unwrap();
1180 #[should_fail] #[test]
1181 fn tcp_stream_fail_cleanup() {
1182 let (port, chan) = oneshot();
1183 let chan = Cell::new(chan);
1184 let addr = next_test_ip4();
1186 do task::spawn_unlinked { // please no linked failure
1187 let w = TcpListener::bind(local_loop(), addr).unwrap();
1188 let mut w = w.listen().unwrap();
1189 chan.take().send(());
1193 let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
1197 #[should_fail] #[test]
1198 fn udp_listener_fail_cleanup() {
1199 let addr = next_test_ip4();
1200 let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1204 #[should_fail] #[test]
1205 fn udp_fail_other_task() {
1206 let addr = next_test_ip4();
1207 let (port, chan) = oneshot();
1208 let chan = Cell::new(chan);
1210 // force the handle to be created on a different scheduler, failure in
1211 // the original task will force a homing operation back to this
1213 do task::spawn_sched(task::SingleThreaded) {
1214 let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1215 chan.take().send(w);
1218 let _w = port.recv();
1224 #[ignore(reason = "linked failure")]
1225 fn linked_failure1() {
1226 let (port, chan) = oneshot();
1227 let chan = Cell::new(chan);
1228 let addr = next_test_ip4();
1231 let w = TcpListener::bind(local_loop(), addr).unwrap();
1232 let mut w = w.listen().unwrap();
1233 chan.take().send(());
1243 #[ignore(reason = "linked failure")]
1244 fn linked_failure2() {
1245 let (port, chan) = oneshot();
1246 let chan = Cell::new(chan);
1247 let addr = next_test_ip4();
1250 let w = TcpListener::bind(local_loop(), addr).unwrap();
1251 let mut w = w.listen().unwrap();
1252 chan.take().send(());
1254 w.accept().unwrap().read(buf);
1258 let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
1265 #[ignore(reason = "linked failure")]
1266 fn linked_failure3() {
1267 let (port, chan) = stream();
1268 let chan = Cell::new(chan);
1269 let addr = next_test_ip4();
1272 let chan = chan.take();
1273 let w = TcpListener::bind(local_loop(), addr).unwrap();
1274 let mut w = w.listen().unwrap();
1276 let mut conn = w.accept().unwrap();
1278 let buf = [0, ..65536];
1283 let _w = TcpWatcher::connect(local_loop(), addr).unwrap();