]> git.lizzy.rs Git - rust.git/blobdiff - src/librustuv/net.rs
Replace all ~"" with "".to_owned()
[rust.git] / src / librustuv / net.rs
index 551e2c9faf74f47ef980cc2fa8e8cd780d93aacf..73454aaf13f7d56ada018ad97c06cb9b02d0443c 100644 (file)
@@ -11,8 +11,8 @@
 use std::cast;
 use std::io::IoError;
 use std::io::net::ip;
-use std::libc::{size_t, ssize_t, c_int, c_void, c_uint};
-use std::libc;
+use libc::{size_t, ssize_t, c_int, c_void, c_uint};
+use libc;
 use std::mem;
 use std::ptr;
 use std::rt::rtio;
@@ -32,8 +32,8 @@
 /// Generic functions related to dealing with sockaddr things
 ////////////////////////////////////////////////////////////////////////////////
 
-pub fn htons(u: u16) -> u16 { mem::to_be16(u as i16) as u16 }
-pub fn ntohs(u: u16) -> u16 { mem::from_be16(u as i16) as u16 }
+pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
+pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
 
 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
                         len: uint) -> ip::SocketAddr {
@@ -153,22 +153,22 @@ pub struct TcpWatcher {
     handle: *uvll::uv_tcp_t,
     stream: StreamWatcher,
     home: HomeHandle,
-    priv refcount: Refcount,
+    refcount: Refcount,
 
     // libuv can't support concurrent reads and concurrent writes of the same
     // stream object, so we use these access guards in order to arbitrate among
     // multiple concurrent reads and writes. Note that libuv *can* read and
     // write simultaneously, it just can't read and read simultaneously.
-    priv read_access: Access,
-    priv write_access: Access,
+    read_access: Access,
+    write_access: Access,
 }
 
 pub struct TcpListener {
     home: HomeHandle,
     handle: *uvll::uv_pipe_t,
-    priv closing_task: Option<BlockedTask>,
-    priv outgoing: Chan<Result<~rtio::RtioTcpStream, IoError>>,
-    priv incoming: Port<Result<~rtio::RtioTcpStream, IoError>>,
+    closing_task: Option<BlockedTask>,
+    outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
+    incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
 }
 
 pub struct TcpAcceptor {
@@ -216,7 +216,7 @@ struct Ctx { status: c_int, task: Option<BlockedTask> }
             0 => {
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { status: 0, task: None };
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &io.loop_, || {
                     req.set_data(&cx);
                 });
                 match cx.status {
@@ -295,7 +295,7 @@ fn letdie(&mut self) -> Result<(), IoError> {
         })
     }
 
-    fn clone(&self) -> ~rtio::RtioTcpStream {
+    fn clone(&self) -> ~rtio::RtioTcpStream:Send {
         ~TcpWatcher {
             handle: self.handle,
             stream: StreamWatcher::new(self.handle),
@@ -303,7 +303,39 @@ fn clone(&self) -> ~rtio::RtioTcpStream {
             refcount: self.refcount.clone(),
             write_access: self.write_access.clone(),
             read_access: self.read_access.clone(),
-        } as ~rtio::RtioTcpStream
+        } as ~rtio::RtioTcpStream:Send
+    }
+
+    fn close_write(&mut self) -> Result<(), IoError> {
+        struct Ctx {
+            slot: Option<BlockedTask>,
+            status: c_int,
+        }
+        let mut req = Request::new(uvll::UV_SHUTDOWN);
+
+        return match unsafe {
+            uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
+        } {
+            0 => {
+                req.defuse(); // uv callback now owns this request
+                let mut cx = Ctx { slot: None, status: 0 };
+
+                wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
+                    req.set_data(&cx);
+                });
+
+                status_to_io_result(cx.status)
+            }
+            n => Err(uv_error_to_io_error(UvError(n)))
+        };
+
+        extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
+            let req = Request::wrap(req);
+            assert!(status != uvll::ECANCELED);
+            let cx: &mut Ctx = unsafe { req.get_data() };
+            cx.status = status;
+            wakeup(&mut cx.slot);
+        }
     }
 }
 
@@ -329,13 +361,13 @@ pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
         assert_eq!(unsafe {
             uvll::uv_tcp_init(io.uv_loop(), handle)
         }, 0);
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
         let l = ~TcpListener {
             home: io.make_handle(),
             handle: handle,
             closing_task: None,
-            outgoing: chan,
-            incoming: port,
+            outgoing: tx,
+            incoming: rx,
         };
         let (addr, _len) = addr_to_sockaddr(address);
         let res = unsafe {
@@ -365,14 +397,14 @@ fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
 }
 
 impl rtio::RtioTcpListener for TcpListener {
-    fn listen(~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
+    fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
         // create the acceptor object from ourselves
         let mut acceptor = ~TcpAcceptor { listener: self };
 
         let _m = acceptor.fire_homing_missile();
         // FIXME: the 128 backlog should be configurable
         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
-            0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
+            0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
             n => Err(uv_error_to_io_error(UvError(n))),
         }
     }
@@ -388,7 +420,7 @@ fn listen(~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
             });
             let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
-            Ok(~client as ~rtio::RtioTcpStream)
+            Ok(~client as ~rtio::RtioTcpStream:Send)
         }
         n => Err(uv_error_to_io_error(UvError(n)))
     };
@@ -416,7 +448,7 @@ fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
 }
 
 impl rtio::RtioTcpAcceptor for TcpAcceptor {
-    fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
+    fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
         self.listener.incoming.recv()
     }
 
@@ -444,9 +476,9 @@ pub struct UdpWatcher {
     home: HomeHandle,
 
     // See above for what these fields are
-    priv refcount: Refcount,
-    priv read_access: Access,
-    priv write_access: Access,
+    refcount: Refcount,
+    read_access: Access,
+    write_access: Access,
 }
 
 impl UdpWatcher {
@@ -498,6 +530,7 @@ struct Ctx {
             buf: Option<Buf>,
             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
         }
+        let loop_ = self.uv_loop();
         let m = self.fire_homing_missile();
         let _g = self.read_access.grant(m);
 
@@ -511,7 +544,7 @@ struct Ctx {
                     result: None,
                 };
                 let handle = self.handle;
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &loop_, || {
                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
                 });
                 match cx.result.take_unwrap() {
@@ -571,6 +604,7 @@ fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
         struct Ctx { task: Option<BlockedTask>, result: c_int }
 
         let m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
         let _g = self.write_access.grant(m);
 
         let mut req = Request::new(uvll::UV_UDP_SEND);
@@ -586,7 +620,7 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
             0 => {
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { task: None, result: 0 };
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &loop_, || {
                     req.set_data(&cx);
                 });
                 match cx.result {
@@ -675,14 +709,14 @@ fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
         })
     }
 
-    fn clone(&self) -> ~rtio::RtioUdpSocket {
+    fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
         ~UdpWatcher {
             handle: self.handle,
             home: self.home.clone(),
             refcount: self.refcount.clone(),
             write_access: self.write_access.clone(),
             read_access: self.read_access.clone(),
-        } as ~rtio::RtioUdpSocket
+        } as ~rtio::RtioUdpSocket:Send
     }
 }
 
@@ -709,7 +743,7 @@ mod test {
     fn connect_close_ip4() {
         match TcpWatcher::connect(local_loop(), next_test_ip4()) {
             Ok(..) => fail!(),
-            Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
+            Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
         }
     }
 
@@ -717,7 +751,7 @@ fn connect_close_ip4() {
     fn connect_close_ip6() {
         match TcpWatcher::connect(local_loop(), next_test_ip6()) {
             Ok(..) => fail!(),
-            Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
+            Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
         }
     }
 
@@ -739,7 +773,7 @@ fn udp_bind_close_ip6() {
 
     #[test]
     fn listen_ip4() {
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
         let addr = next_test_ip4();
 
         spawn(proc() {
@@ -749,7 +783,7 @@ fn listen_ip4() {
             let mut w = match w.listen() {
                 Ok(w) => w, Err(e) => fail!("{:?}", e),
             };
-            chan.send(());
+            tx.send(());
             match w.accept() {
                 Ok(mut stream) => {
                     let mut buf = [0u8, ..10];
@@ -757,14 +791,14 @@ fn listen_ip4() {
                         Ok(10) => {} e => fail!("{:?}", e),
                     }
                     for i in range(0, 10u8) {
-                        assert_eq!(buf[i], i + 1);
+                        assert_eq!(buf[i as uint], i + 1);
                     }
                 }
                 Err(e) => fail!("{:?}", e)
             }
         });
 
-        port.recv();
+        rx.recv();
         let mut w = match TcpWatcher::connect(local_loop(), addr) {
             Ok(w) => w, Err(e) => fail!("{:?}", e)
         };
@@ -775,7 +809,7 @@ fn listen_ip4() {
 
     #[test]
     fn listen_ip6() {
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
         let addr = next_test_ip6();
 
         spawn(proc() {
@@ -785,7 +819,7 @@ fn listen_ip6() {
             let mut w = match w.listen() {
                 Ok(w) => w, Err(e) => fail!("{:?}", e),
             };
-            chan.send(());
+            tx.send(());
             match w.accept() {
                 Ok(mut stream) => {
                     let mut buf = [0u8, ..10];
@@ -793,14 +827,14 @@ fn listen_ip6() {
                         Ok(10) => {} e => fail!("{:?}", e),
                     }
                     for i in range(0, 10u8) {
-                        assert_eq!(buf[i], i + 1);
+                        assert_eq!(buf[i as uint], i + 1);
                     }
                 }
                 Err(e) => fail!("{:?}", e)
             }
         });
 
-        port.recv();
+        rx.recv();
         let mut w = match TcpWatcher::connect(local_loop(), addr) {
             Ok(w) => w, Err(e) => fail!("{:?}", e)
         };
@@ -811,28 +845,28 @@ fn listen_ip6() {
 
     #[test]
     fn udp_recv_ip4() {
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
         let client = next_test_ip4();
         let server = next_test_ip4();
 
         spawn(proc() {
             match UdpWatcher::bind(local_loop(), server) {
                 Ok(mut w) => {
-                    chan.send(());
+                    tx.send(());
                     let mut buf = [0u8, ..10];
                     match w.recvfrom(buf) {
                         Ok((10, addr)) => assert_eq!(addr, client),
                         e => fail!("{:?}", e),
                     }
                     for i in range(0, 10u8) {
-                        assert_eq!(buf[i], i + 1);
+                        assert_eq!(buf[i as uint], i + 1);
                     }
                 }
                 Err(e) => fail!("{:?}", e)
             }
         });
 
-        port.recv();
+        rx.recv();
         let mut w = match UdpWatcher::bind(local_loop(), client) {
             Ok(w) => w, Err(e) => fail!("{:?}", e)
         };
@@ -843,28 +877,28 @@ fn udp_recv_ip4() {
 
     #[test]
     fn udp_recv_ip6() {
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
         let client = next_test_ip6();
         let server = next_test_ip6();
 
         spawn(proc() {
             match UdpWatcher::bind(local_loop(), server) {
                 Ok(mut w) => {
-                    chan.send(());
+                    tx.send(());
                     let mut buf = [0u8, ..10];
                     match w.recvfrom(buf) {
                         Ok((10, addr)) => assert_eq!(addr, client),
                         e => fail!("{:?}", e),
                     }
                     for i in range(0, 10u8) {
-                        assert_eq!(buf[i], i + 1);
+                        assert_eq!(buf[i as uint], i + 1);
                     }
                 }
                 Err(e) => fail!("{:?}", e)
             }
         });
 
-        port.recv();
+        rx.recv();
         let mut w = match UdpWatcher::bind(local_loop(), client) {
             Ok(w) => w, Err(e) => fail!("{:?}", e)
         };
@@ -877,12 +911,12 @@ fn udp_recv_ip6() {
     fn test_read_read_read() {
         let addr = next_test_ip4();
         static MAX: uint = 5000;
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
 
         spawn(proc() {
             let listener = TcpListener::bind(local_loop(), addr).unwrap();
             let mut acceptor = listener.listen().unwrap();
-            chan.send(());
+            tx.send(());
             let mut stream = acceptor.accept().unwrap();
             let buf = [1, .. 2048];
             let mut total_bytes_written = 0;
@@ -893,7 +927,7 @@ fn test_read_read_read() {
             }
         });
 
-        port.recv();
+        rx.recv();
         let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
         let mut buf = [0, .. 2048];
         let mut total_bytes_read = 0;
@@ -912,17 +946,17 @@ fn test_read_read_read() {
     fn test_udp_twice() {
         let server_addr = next_test_ip4();
         let client_addr = next_test_ip4();
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
 
         spawn(proc() {
             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
-            port.recv();
+            rx.recv();
             assert!(client.sendto([1], server_addr).is_ok());
             assert!(client.sendto([2], server_addr).is_ok());
         });
 
         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
-        chan.send(());
+        tx.send(());
         let mut buf1 = [0];
         let mut buf2 = [0];
         let (nread1, src1) = server.recvfrom(buf1).unwrap();
@@ -943,16 +977,16 @@ fn test_udp_many_read() {
         let client_in_addr = next_test_ip4();
         static MAX: uint = 500_000;
 
-        let (p1, c1) = Chan::new();
-        let (p2, c2) = Chan::new();
+        let (tx1, rx1) = channel::<()>();
+        let (tx2, rx2) = channel::<()>();
 
         spawn(proc() {
             let l = local_loop();
             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
-            let (port, chan) = (p1, c2);
-            chan.send(());
-            port.recv();
+            let (tx, rx) = (tx2, rx1);
+            tx.send(());
+            rx.recv();
             let msg = [1, .. 2048];
             let mut total_bytes_sent = 0;
             let mut buf = [1];
@@ -973,9 +1007,9 @@ fn test_udp_many_read() {
         let l = local_loop();
         let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
         let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
-        let (port, chan) = (p2, c1);
-        port.recv();
-        chan.send(());
+        let (tx, rx) = (tx1, rx2);
+        rx.recv();
+        tx.send(());
         let mut total_bytes_recv = 0;
         let mut buf = [0, .. 2048];
         while total_bytes_recv < MAX {
@@ -998,23 +1032,23 @@ fn test_udp_many_read() {
     #[test]
     fn test_read_and_block() {
         let addr = next_test_ip4();
-        let (port, chan) = Chan::<Port<()>>::new();
+        let (tx, rx) = channel::<Receiver<()>>();
 
         spawn(proc() {
-            let port2 = port.recv();
+            let rx = rx.recv();
             let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
-            port2.recv();
+            rx.recv();
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
-            port2.recv();
+            rx.recv();
         });
 
         let listener = TcpListener::bind(local_loop(), addr).unwrap();
         let mut acceptor = listener.listen().unwrap();
-        let (port2, chan2) = Chan::new();
-        chan.send(port2);
+        let (tx2, rx2) = channel();
+        tx.send(rx2);
         let mut stream = acceptor.accept().unwrap();
         let mut buf = [0, .. 2048];
 
@@ -1031,7 +1065,7 @@ fn test_read_and_block() {
             }
             reads += 1;
 
-            chan2.try_send(());
+            let _ = tx2.send_opt(());
         }
 
         // Make sure we had multiple reads
@@ -1071,16 +1105,16 @@ fn tcp_listener_fail_cleanup() {
 
     #[should_fail] #[test]
     fn tcp_stream_fail_cleanup() {
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
         let addr = next_test_ip4();
 
         spawn(proc() {
             let w = TcpListener::bind(local_loop(), addr).unwrap();
             let mut w = w.listen().unwrap();
-            chan.send(());
+            tx.send(());
             drop(w.accept().unwrap());
         });
-        port.recv();
+        rx.recv();
         let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
         fail!();
     }
@@ -1095,17 +1129,17 @@ fn udp_listener_fail_cleanup() {
     #[should_fail] #[test]
     fn udp_fail_other_task() {
         let addr = next_test_ip4();
-        let (port, chan) = Chan::new();
+        let (tx, rx) = channel();
 
         // force the handle to be created on a different scheduler, failure in
         // the original task will force a homing operation back to this
         // scheduler.
         spawn(proc() {
             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
-            chan.send(w);
+            tx.send(w);
         });
 
-        let _w = port.recv();
+        let _w = rx.recv();
         fail!();
     }
 }