]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/udp.rs
e1f9cb3889f8ce3d378819549d742a02860b7e33
[rust.git] / src / libstd / io / net / udp.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! UDP (User Datagram Protocol) network connections.
12 //!
13 //! This module contains the ability to open a UDP stream to a socket address.
14 //! The destination and binding addresses can either be an IPv4 or IPv6
15 //! address. There is no corresponding notion of a server because UDP is a
16 //! datagram protocol.
17
18 use clone::Clone;
19 use io::net::ip::{SocketAddr, IpAddr};
20 use io::{Reader, Writer, IoResult, IoError};
21 use kinds::Send;
22 use owned::Box;
23 use option::Option;
24 use result::{Ok, Err};
25 use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
26 use rt::rtio;
27
28 /// A User Datagram Protocol socket.
29 ///
30 /// This is an implementation of a bound UDP socket. This supports both IPv4 and
31 /// IPv6 addresses, and there is no corresponding notion of a server because UDP
32 /// is a datagram protocol.
33 ///
34 /// # Example
35 ///
36 /// ```rust,no_run
37 /// # #![allow(unused_must_use)]
38 /// use std::io::net::udp::UdpSocket;
39 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
40 ///
41 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
42 /// let mut socket = match UdpSocket::bind(addr) {
43 ///     Ok(s) => s,
44 ///     Err(e) => fail!("couldn't bind socket: {}", e),
45 /// };
46 ///
47 /// let mut buf = [0, ..10];
48 /// match socket.recvfrom(buf) {
49 ///     Ok((amt, src)) => {
50 ///         // Send a reply to the socket we received data from
51 ///         let buf = buf.mut_slice_to(amt);
52 ///         buf.reverse();
53 ///         socket.sendto(buf, src);
54 ///     }
55 ///     Err(e) => println!("couldn't receive a datagram: {}", e)
56 /// }
57 /// drop(socket); // close the socket
58 /// ```
59 pub struct UdpSocket {
60     obj: Box<RtioUdpSocket + Send>,
61 }
62
63 impl UdpSocket {
64     /// Creates a UDP socket from the given socket address.
65     pub fn bind(addr: SocketAddr) -> IoResult<UdpSocket> {
66         let SocketAddr { ip, port } = addr;
67         LocalIo::maybe_raise(|io| {
68             let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
69             io.udp_bind(addr).map(|s| UdpSocket { obj: s })
70         }).map_err(IoError::from_rtio_error)
71     }
72
73     /// Receives data from the socket. On success, returns the number of bytes
74     /// read and the address from whence the data came.
75     pub fn recvfrom(&mut self, buf: &mut [u8])
76                     -> IoResult<(uint, SocketAddr)> {
77         match self.obj.recvfrom(buf) {
78             Ok((amt, rtio::SocketAddr { ip, port })) => {
79                 Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port }))
80             }
81             Err(e) => Err(IoError::from_rtio_error(e)),
82         }
83     }
84
85     /// Sends data on the socket to the given address. Returns nothing on
86     /// success.
87     pub fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()> {
88         self.obj.sendto(buf, rtio::SocketAddr {
89             ip: super::to_rtio(dst.ip),
90             port: dst.port,
91         }).map_err(IoError::from_rtio_error)
92     }
93
94     /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer`
95     /// traits to receive and send data from the same address. This transfers
96     /// ownership of the socket to the stream.
97     ///
98     /// Note that this call does not perform any actual network communication,
99     /// because UDP is a datagram protocol.
100     pub fn connect(self, other: SocketAddr) -> UdpStream {
101         UdpStream {
102             socket: self,
103             connected_to: other,
104         }
105     }
106
107     /// Returns the socket address that this socket was created from.
108     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
109         match self.obj.socket_name() {
110             Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }),
111             Err(e) => Err(IoError::from_rtio_error(e))
112         }
113     }
114
115     /// Joins a multicast IP address (becomes a member of it)
116     #[experimental]
117     pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
118         let e = self.obj.join_multicast(super::to_rtio(multi));
119         e.map_err(IoError::from_rtio_error)
120     }
121
122     /// Leaves a multicast IP address (drops membership from it)
123     #[experimental]
124     pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
125         let e = self.obj.leave_multicast(super::to_rtio(multi));
126         e.map_err(IoError::from_rtio_error)
127     }
128
129     /// Set the multicast loop flag to the specified value
130     ///
131     /// This lets multicast packets loop back to local sockets (if enabled)
132     #[experimental]
133     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
134         if on {
135             self.obj.loop_multicast_locally()
136         } else {
137             self.obj.dont_loop_multicast_locally()
138         }.map_err(IoError::from_rtio_error)
139     }
140
141     /// Sets the multicast TTL
142     #[experimental]
143     pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> {
144         self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error)
145     }
146
147     /// Sets this socket's TTL
148     #[experimental]
149     pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> {
150         self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error)
151     }
152
153     /// Sets the broadcast flag on or off
154     #[experimental]
155     pub fn set_broadast(&mut self, broadcast: bool) -> IoResult<()> {
156         if broadcast {
157             self.obj.hear_broadcasts()
158         } else {
159             self.obj.ignore_broadcasts()
160         }.map_err(IoError::from_rtio_error)
161     }
162
163     /// Sets the read/write timeout for this socket.
164     ///
165     /// For more information, see `TcpStream::set_timeout`
166     #[experimental = "the timeout argument may change in type and value"]
167     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
168         self.obj.set_timeout(timeout_ms)
169     }
170
171     /// Sets the read timeout for this socket.
172     ///
173     /// For more information, see `TcpStream::set_timeout`
174     #[experimental = "the timeout argument may change in type and value"]
175     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
176         self.obj.set_read_timeout(timeout_ms)
177     }
178
179     /// Sets the write timeout for this socket.
180     ///
181     /// For more information, see `TcpStream::set_timeout`
182     #[experimental = "the timeout argument may change in type and value"]
183     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
184         self.obj.set_write_timeout(timeout_ms)
185     }
186 }
187
188 impl Clone for UdpSocket {
189     /// Creates a new handle to this UDP socket, allowing for simultaneous
190     /// reads and writes of the socket.
191     ///
192     /// The underlying UDP socket will not be closed until all handles to the
193     /// socket have been deallocated. Two concurrent reads will not receive
194     /// the same data. Instead, the first read will receive the first packet
195     /// received, and the second read will receive the second packet.
196     fn clone(&self) -> UdpSocket {
197         UdpSocket {
198             obj: self.obj.clone(),
199         }
200     }
201 }
202
203 /// A type that allows convenient usage of a UDP stream connected to one
204 /// address via the `Reader` and `Writer` traits.
205 pub struct UdpStream {
206     socket: UdpSocket,
207     connected_to: SocketAddr
208 }
209
210 impl UdpStream {
211     /// Allows access to the underlying UDP socket owned by this stream. This
212     /// is useful to, for example, use the socket to send data to hosts other
213     /// than the one that this stream is connected to.
214     pub fn as_socket<T>(&mut self, f: |&mut UdpSocket| -> T) -> T {
215         f(&mut self.socket)
216     }
217
218     /// Consumes this UDP stream and returns out the underlying socket.
219     pub fn disconnect(self) -> UdpSocket {
220         self.socket
221     }
222 }
223
224 impl Reader for UdpStream {
225     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
226         let peer = self.connected_to;
227         self.as_socket(|sock| {
228             match sock.recvfrom(buf) {
229                 Ok((_nread, src)) if src != peer => Ok(0),
230                 Ok((nread, _src)) => Ok(nread),
231                 Err(e) => Err(e),
232             }
233         })
234     }
235 }
236
237 impl Writer for UdpStream {
238     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
239         let connected_to = self.connected_to;
240         self.as_socket(|sock| sock.sendto(buf, connected_to))
241     }
242 }
243
244 #[cfg(test)]
245 #[allow(experimental)]
246 mod test {
247     use super::*;
248     use io::net::ip::{SocketAddr};
249
250     // FIXME #11530 this fails on android because tests are run as root
251     iotest!(fn bind_error() {
252         let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
253         match UdpSocket::bind(addr) {
254             Ok(..) => fail!(),
255             Err(e) => assert_eq!(e.kind, PermissionDenied),
256         }
257     } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
258
259     iotest!(fn socket_smoke_test_ip4() {
260         let server_ip = next_test_ip4();
261         let client_ip = next_test_ip4();
262         let (tx1, rx1) = channel();
263         let (tx2, rx2) = channel();
264
265         spawn(proc() {
266             match UdpSocket::bind(client_ip) {
267                 Ok(ref mut client) => {
268                     rx1.recv();
269                     client.sendto([99], server_ip).unwrap()
270                 }
271                 Err(..) => fail!()
272             }
273             tx2.send(());
274         });
275
276         match UdpSocket::bind(server_ip) {
277             Ok(ref mut server) => {
278                 tx1.send(());
279                 let mut buf = [0];
280                 match server.recvfrom(buf) {
281                     Ok((nread, src)) => {
282                         assert_eq!(nread, 1);
283                         assert_eq!(buf[0], 99);
284                         assert_eq!(src, client_ip);
285                     }
286                     Err(..) => fail!()
287                 }
288             }
289             Err(..) => fail!()
290         }
291         rx2.recv();
292     })
293
294     iotest!(fn socket_smoke_test_ip6() {
295         let server_ip = next_test_ip6();
296         let client_ip = next_test_ip6();
297         let (tx, rx) = channel::<()>();
298
299         spawn(proc() {
300             match UdpSocket::bind(client_ip) {
301                 Ok(ref mut client) => {
302                     rx.recv();
303                     client.sendto([99], server_ip).unwrap()
304                 }
305                 Err(..) => fail!()
306             }
307         });
308
309         match UdpSocket::bind(server_ip) {
310             Ok(ref mut server) => {
311                 tx.send(());
312                 let mut buf = [0];
313                 match server.recvfrom(buf) {
314                     Ok((nread, src)) => {
315                         assert_eq!(nread, 1);
316                         assert_eq!(buf[0], 99);
317                         assert_eq!(src, client_ip);
318                     }
319                     Err(..) => fail!()
320                 }
321             }
322             Err(..) => fail!()
323         }
324     })
325
326     iotest!(fn stream_smoke_test_ip4() {
327         let server_ip = next_test_ip4();
328         let client_ip = next_test_ip4();
329         let (tx1, rx1) = channel();
330         let (tx2, rx2) = channel();
331
332         spawn(proc() {
333             match UdpSocket::bind(client_ip) {
334                 Ok(client) => {
335                     let client = box client;
336                     let mut stream = client.connect(server_ip);
337                     rx1.recv();
338                     stream.write([99]).unwrap();
339                 }
340                 Err(..) => fail!()
341             }
342             tx2.send(());
343         });
344
345         match UdpSocket::bind(server_ip) {
346             Ok(server) => {
347                 let server = box server;
348                 let mut stream = server.connect(client_ip);
349                 tx1.send(());
350                 let mut buf = [0];
351                 match stream.read(buf) {
352                     Ok(nread) => {
353                         assert_eq!(nread, 1);
354                         assert_eq!(buf[0], 99);
355                     }
356                     Err(..) => fail!()
357                 }
358             }
359             Err(..) => fail!()
360         }
361         rx2.recv();
362     })
363
364     iotest!(fn stream_smoke_test_ip6() {
365         let server_ip = next_test_ip6();
366         let client_ip = next_test_ip6();
367         let (tx1, rx1) = channel();
368         let (tx2, rx2) = channel();
369
370         spawn(proc() {
371             match UdpSocket::bind(client_ip) {
372                 Ok(client) => {
373                     let client = box client;
374                     let mut stream = client.connect(server_ip);
375                     rx1.recv();
376                     stream.write([99]).unwrap();
377                 }
378                 Err(..) => fail!()
379             }
380             tx2.send(());
381         });
382
383         match UdpSocket::bind(server_ip) {
384             Ok(server) => {
385                 let server = box server;
386                 let mut stream = server.connect(client_ip);
387                 tx1.send(());
388                 let mut buf = [0];
389                 match stream.read(buf) {
390                     Ok(nread) => {
391                         assert_eq!(nread, 1);
392                         assert_eq!(buf[0], 99);
393                     }
394                     Err(..) => fail!()
395                 }
396             }
397             Err(..) => fail!()
398         }
399         rx2.recv();
400     })
401
402     pub fn socket_name(addr: SocketAddr) {
403         let server = UdpSocket::bind(addr);
404
405         assert!(server.is_ok());
406         let mut server = server.unwrap();
407
408         // Make sure socket_name gives
409         // us the socket we binded to.
410         let so_name = server.socket_name();
411         assert!(so_name.is_ok());
412         assert_eq!(addr, so_name.unwrap());
413     }
414
415     iotest!(fn socket_name_ip4() {
416         socket_name(next_test_ip4());
417     })
418
419     iotest!(fn socket_name_ip6() {
420         socket_name(next_test_ip6());
421     })
422
423     iotest!(fn udp_clone_smoke() {
424         let addr1 = next_test_ip4();
425         let addr2 = next_test_ip4();
426         let mut sock1 = UdpSocket::bind(addr1).unwrap();
427         let sock2 = UdpSocket::bind(addr2).unwrap();
428
429         spawn(proc() {
430             let mut sock2 = sock2;
431             let mut buf = [0, 0];
432             assert_eq!(sock2.recvfrom(buf), Ok((1, addr1)));
433             assert_eq!(buf[0], 1);
434             sock2.sendto([2], addr1).unwrap();
435         });
436
437         let sock3 = sock1.clone();
438
439         let (tx1, rx1) = channel();
440         let (tx2, rx2) = channel();
441         spawn(proc() {
442             let mut sock3 = sock3;
443             rx1.recv();
444             sock3.sendto([1], addr2).unwrap();
445             tx2.send(());
446         });
447         tx1.send(());
448         let mut buf = [0, 0];
449         assert_eq!(sock1.recvfrom(buf), Ok((1, addr2)));
450         rx2.recv();
451     })
452
453     iotest!(fn udp_clone_two_read() {
454         let addr1 = next_test_ip4();
455         let addr2 = next_test_ip4();
456         let mut sock1 = UdpSocket::bind(addr1).unwrap();
457         let sock2 = UdpSocket::bind(addr2).unwrap();
458         let (tx1, rx) = channel();
459         let tx2 = tx1.clone();
460
461         spawn(proc() {
462             let mut sock2 = sock2;
463             sock2.sendto([1], addr1).unwrap();
464             rx.recv();
465             sock2.sendto([2], addr1).unwrap();
466             rx.recv();
467         });
468
469         let sock3 = sock1.clone();
470
471         let (done, rx) = channel();
472         spawn(proc() {
473             let mut sock3 = sock3;
474             let mut buf = [0, 0];
475             sock3.recvfrom(buf).unwrap();
476             tx2.send(());
477             done.send(());
478         });
479         let mut buf = [0, 0];
480         sock1.recvfrom(buf).unwrap();
481         tx1.send(());
482
483         rx.recv();
484     })
485
486     iotest!(fn udp_clone_two_write() {
487         let addr1 = next_test_ip4();
488         let addr2 = next_test_ip4();
489         let mut sock1 = UdpSocket::bind(addr1).unwrap();
490         let sock2 = UdpSocket::bind(addr2).unwrap();
491
492         let (tx, rx) = channel();
493         let (serv_tx, serv_rx) = channel();
494
495         spawn(proc() {
496             let mut sock2 = sock2;
497             let mut buf = [0, 1];
498
499             rx.recv();
500             match sock2.recvfrom(buf) {
501                 Ok(..) => {}
502                 Err(e) => fail!("failed receive: {}", e),
503             }
504             serv_tx.send(());
505         });
506
507         let sock3 = sock1.clone();
508
509         let (done, rx) = channel();
510         let tx2 = tx.clone();
511         spawn(proc() {
512             let mut sock3 = sock3;
513             match sock3.sendto([1], addr2) {
514                 Ok(..) => { let _ = tx2.send_opt(()); }
515                 Err(..) => {}
516             }
517             done.send(());
518         });
519         match sock1.sendto([2], addr2) {
520             Ok(..) => { let _ = tx.send_opt(()); }
521             Err(..) => {}
522         }
523         drop(tx);
524
525         rx.recv();
526         serv_rx.recv();
527     })
528
529     iotest!(fn recvfrom_timeout() {
530         let addr1 = next_test_ip4();
531         let addr2 = next_test_ip4();
532         let mut a = UdpSocket::bind(addr1).unwrap();
533
534         let (tx, rx) = channel();
535         let (tx2, rx2) = channel();
536         spawn(proc() {
537             let mut a = UdpSocket::bind(addr2).unwrap();
538             assert_eq!(a.recvfrom([0]), Ok((1, addr1)));
539             assert_eq!(a.sendto([0], addr1), Ok(()));
540             rx.recv();
541             assert_eq!(a.sendto([0], addr1), Ok(()));
542
543             tx2.send(());
544         });
545
546         // Make sure that reads time out, but writes can continue
547         a.set_read_timeout(Some(20));
548         assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
549         assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
550         assert_eq!(a.sendto([0], addr2), Ok(()));
551
552         // Cloned handles should be able to block
553         let mut a2 = a.clone();
554         assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
555
556         // Clearing the timeout should allow for receiving
557         a.set_timeout(None);
558         tx.send(());
559         assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
560
561         // Make sure the child didn't die
562         rx2.recv();
563     })
564
565     iotest!(fn sendto_timeout() {
566         let addr1 = next_test_ip4();
567         let addr2 = next_test_ip4();
568         let mut a = UdpSocket::bind(addr1).unwrap();
569         let _b = UdpSocket::bind(addr2).unwrap();
570
571         a.set_write_timeout(Some(1000));
572         for _ in range(0, 100) {
573             match a.sendto([0, ..4*1024], addr2) {
574                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
575                 Err(IoError { kind: TimedOut, .. }) => break,
576                 Err(e) => fail!("other error: {}", e),
577             }
578         }
579     })
580 }