1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use libc::{c_int, c_uint, c_void};
22 use rt::io::net::ip::{SocketAddr, IpAddr};
23 use rt::io::{standard_error, OtherIoError};
26 use rt::sched::{Scheduler, SchedHandle};
29 use rt::uv::idle::IdleWatcher;
30 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
31 use unstable::sync::Exclusive;
33 #[cfg(test)] use container::Container;
34 #[cfg(test)] use unstable::run_in_bare_thread;
35 #[cfg(test)] use rt::test::{spawntask,
37 run_in_newsched_task};
38 #[cfg(test)] use iterator::{Iterator, range};
// XXX we should not be calling uvll functions in here.

// NOTE(review): this extract is elided — the enclosing `trait HomingIO`
// header and several interior lines (including the actual call to `io`
// and closing braces) are missing from this view.

// Hand back the `SchedHandle` of the scheduler that owns this I/O
// object's uv handle (its "home" scheduler).
fn home<'r>(&'r mut self) -> &'r mut SchedHandle;

/* XXX This will move pinned tasks to do IO on the proper scheduler
 * and then move them back to their home.
fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
    use rt::sched::{PinnedTask, TaskFromFriend};

    // Stash the task's previous home so it can be restored afterwards.
    let old_home = Cell::new_empty();
    let old_home_ptr = &old_home;

    // Deschedule the running task and pin it to the home scheduler.
    let scheduler = Local::take::<Scheduler>();
    do scheduler.deschedule_running_task_and_then |_, task| {
        // get the old home first
        do task.wake().map_move |mut task| {
            old_home_ptr.put_back(task.take_unwrap_home());
            self.home().send(PinnedTask(task));

    // NOTE(review): lines elided here — the `io(self)` call runs between
    // the two deschedule blocks in the original source.

    // I/O done: give the task its old home back and hand it to a
    // friendly scheduler for rescheduling.
    let scheduler = Local::take::<Scheduler>();
    do scheduler.deschedule_running_task_and_then |scheduler, task| {
        do task.wake().map_move |mut task| {
            task.give_home(old_home.take());
            scheduler.make_handle().send(TaskFromFriend(task));

    // return the result of the IO
// get a handle for the current scheduler
// NOTE(review): the macro's closing delimiter is elided from this view.
macro_rules! get_handle_to_current_scheduler(
    // Borrow the task-local scheduler just long enough to mint a SchedHandle.
    () => (do Local::borrow::<Scheduler, SchedHandle> |sched| { sched.make_handle() })
// Look up the local (or, for `TcpPeer`, the remote) address of a uv socket
// handle and convert it into a Rust `SocketAddr`.
// NOTE(review): several lines (error-branch braces, if/else wiring,
// binding of `r`) are elided from this view.
fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
                                                 handle: U) -> Result<SocketAddr, IoError> {
    // Pick the uv accessor that matches the requested name kind.
    let getsockname = match sk {
        TcpPeer => uvll::tcp_getpeername,
        Tcp => uvll::tcp_getsockname,
        Udp => uvll::udp_getsockname,

    // Allocate a sockaddr_storage
    // since we don't know if it's ipv4 or ipv6
    let r_addr = unsafe { uvll::malloc_sockaddr_storage() };

    getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)

    // Non-zero uv status becomes an IoError.
    let status = status_to_maybe_uv_error(handle, r);
    return Err(uv_error_to_io_error(status.unwrap()));

    // Decode as IPv6 or IPv4 depending on the stored address family.
    if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
        net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
    net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))

    // Release the heap-allocated storage regardless of outcome.
    unsafe { uvll::free_sockaddr_storage(r_addr); }
// Obviously an Event Loop is always home.
// Wrapper that owns the uv `Loop` (via a UvIoFactory) and adapts it to
// the runtime's EventLoop interface.
// NOTE(review): struct fields, the `impl UvEventLoop` header, and the
// `drop` signature are elided from this view.
pub struct UvEventLoop {

pub fn new() -> UvEventLoop {
    uvio: UvIoFactory(Loop::new())

impl Drop for UvEventLoop {
    // XXX: Need mutable finalizer
    transmute::<&UvEventLoop, &mut UvEventLoop>(self)
    // Shut the uv loop down when the event loop is dropped.
    this.uvio.uv_loop().close();
impl EventLoop for UvEventLoop {
    // NOTE(review): the `fn run` header is elided; this line is its body.
    self.uvio.uv_loop().run();

    // Schedule `f` to run once on the loop via a one-shot idle watcher.
    fn callback(&mut self, f: ~fn()) {
        let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
        do idle_watcher.start |mut idle_watcher, status| {
            assert!(status.is_none());
            // One-shot: tear the watcher down after the first firing.
            idle_watcher.close(||());

    // Build a pausible idle callback backed by a fresh idle watcher.
    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
        let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
        return ~UvPausibleIdleCallback {
            watcher: idle_watcher,

    // Run `f` once after `ms` milliseconds (0 repeat) on a timer watcher.
    fn callback_ms(&mut self, ms: u64, f: ~fn()) {
        let mut timer = TimerWatcher::new(self.uvio.uv_loop());
        do timer.start(ms, 0) |timer, status| {
            assert!(status.is_none());

    // Callback that may be fired from other threads (uv async handle).
    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
        ~UvRemoteCallback::new(self.uvio.uv_loop(), f)

    fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
// Idle callback that can be paused and resumed; `idle_flag` (field elided
// from this view) tracks whether the watcher is currently running.
pub struct UvPausibleIdleCallback {
    watcher: IdleWatcher,

impl UvPausibleIdleCallback {
    // Begin invoking `f` on every idle turn of the loop.
    pub fn start(&mut self, f: ~fn()) {
        do self.watcher.start |_idle_watcher, _status| {
        self.idle_flag = true;

    // Stop the watcher if it is running; no-op otherwise.
    pub fn pause(&mut self) {
        if self.idle_flag == true {
            self.idle_flag = false;

    // Restart the watcher if it was paused; no-op otherwise.
    pub fn resume(&mut self) {
        if self.idle_flag == false {
            self.watcher.restart();
            self.idle_flag = true;

    // Permanently close the underlying watcher.
    pub fn close(&mut self) {
        self.watcher.close(||{});
// Verifies that `EventLoop::callback` fires its closure exactly once.
// NOTE(review): `let mut count = 0;`, the `run()` call, and closing braces
// are elided from this view.
fn test_callback_run_once() {
    do run_in_bare_thread {
        let mut event_loop = UvEventLoop::new();
        let count_ptr: *mut int = &mut count;
        do event_loop.callback {
            unsafe { *count_ptr += 1 }
        assert_eq!(count, 1);
// The entire point of async is to call into a loop from other threads so it does not need to home.
pub struct UvRemoteCallback {
    // The uv async handle for triggering the callback
    // NOTE(review): the `async` field declaration is elided from this view.
    // A flag to tell the callback to exit, set from the dtor. This is
    // almost never contested - only in rare races with the dtor.
    exit_flag: Exclusive<bool>

impl UvRemoteCallback {
    // Wraps `f` in an AsyncWatcher on `loop_`; the watcher shares an exit
    // flag with the returned value's dtor so shutdown can be coordinated.
    pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
        let exit_flag = Exclusive::new(false);
        let exit_flag_clone = exit_flag.clone();
        let async = do AsyncWatcher::new(loop_) |watcher, status| {
            assert!(status.is_none());

            // The synchronization logic here is subtle. To review,
            // the uv async handle type promises that, after it is
            // triggered the remote callback is definitely called at
            // least once. UvRemoteCallback needs to maintain those
            // semantics while also shutting down cleanly from the
            // dtor. In our case that means that, when the
            // UvRemoteCallback dtor calls `async.send()`, here `f` is
            // always called later.

            // In the dtor both the exit flag is set and the async
            // callback fired under a lock. Here, before calling `f`,
            // we take the lock and check the flag. Because we are
            // checking the flag before calling `f`, and the flag is
            // set under the same lock as the send, then if the flag
            // is set then we're guaranteed to call `f` after the

            // If the check was done after `f()` then there would be a
            // period between that call and the check where the dtor
            // could be called in the other thread, missing the final
            // callback while still destroying the handle.

            let should_exit = unsafe {
                exit_flag_clone.with_imm(|&should_exit| should_exit)

impl RemoteCallback for UvRemoteCallback {
298 fn fire(&mut self) { self.async.send() }
impl Drop for UvRemoteCallback {
    // NOTE(review): the `drop` signature and the flag-set/send/close body
    // lines are elided from this view.
    let this: &mut UvRemoteCallback = cast::transmute_mut(self);
    do this.exit_flag.with |should_exit| {
        // NB: These two things need to happen atomically. Otherwise
        // the event handler could wake up due to a *previous*
        // signal and see the exit flag, destroying the handle
        // before the final send.
use rt::thread::Thread;
use rt::rtio::EventLoop;
use rt::local::Local;
use rt::sched::Scheduler;

// Fires a remote callback from a foreign OS thread and checks the value
// arrives through the tube on the scheduler side.
// NOTE(review): `thread.join()` / closing braces are elided from this view.
fn test_uv_remote() {
    do run_in_newsched_task {
        let mut tube = Tube::new();
        let tube_clone = tube.clone();
        let remote_cell = Cell::new_empty();
        do Local::borrow::<Scheduler, ()>() |sched| {
            let tube_clone = tube_clone.clone();
            let tube_clone_cell = Cell::new(tube_clone);
            let remote = do sched.event_loop.remote_callback {
                // This could be called multiple times
                if !tube_clone_cell.is_empty() {
                    tube_clone_cell.take().send(1);
            remote_cell.put_back(remote);
        let thread = do Thread::start {
            remote_cell.take().fire();
        assert!(tube.recv() == 1);
// Newtype over the uv `Loop`; all I/O objects are created through it.
pub struct UvIoFactory(Loop);

// NOTE(review): the `impl UvIoFactory` header and the closing brace of
// `uv_loop` are elided from this view.
// Borrow the wrapped uv loop.
pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
    match self { &UvIoFactory(ref mut ptr) => ptr }
// NOTE(review): this impl is elided throughout — error arms, `Ok`/`Err`
// match patterns, and many closing braces are missing from this view.
impl IoFactory for UvIoFactory {
    // Connect to an address and return a new stream
    // NB: This blocks the task waiting on the connection.
    // It would probably be better to return a future
    fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
        // Create a cell in the task to hold the result. We will fill
        // the cell before resuming the task.
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;

        // Block this task and take ownership, switch to scheduler context
        let scheduler = Local::take::<Scheduler>();
        do scheduler.deschedule_running_task_and_then |_, task| {
            let mut tcp = TcpWatcher::new(self.uv_loop());
            let task_cell = Cell::new(task);

            // Wait for a connection
            do tcp.connect(addr) |stream, status| {
                // Success path: wrap the native handle in a UvTcpStream.
                let tcp = NativeHandle::from_native_handle(stream.native_handle());
                let home = get_handle_to_current_scheduler!();
                let res = Ok(~UvTcpStream { watcher: tcp, home: home });

                // Store the stream in the task's stack
                unsafe { (*result_cell_ptr).put_back(res); }
                let scheduler = Local::take::<Scheduler>();
                scheduler.resume_blocked_task_immediately(task_cell.take());

                // Error path: deliver the uv error instead.
                let task_cell = Cell::new(task_cell.take());
                let res = Err(uv_error_to_io_error(status.unwrap()));
                unsafe { (*result_cell_ptr).put_back(res); }
                let scheduler = Local::take::<Scheduler>();
                scheduler.resume_blocked_task_immediately(task_cell.take());

        // By the time we're resumed the callback has filled the cell.
        assert!(!result_cell.is_empty());
        return result_cell.take();

    // Bind a TCP listener; on bind failure the half-made watcher is
    // closed (blocking the task until uv confirms) before reporting.
    fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
        let mut watcher = TcpWatcher::new(self.uv_loop());
        match watcher.bind(addr) {
            let home = get_handle_to_current_scheduler!();
            Ok(~UvTcpListener::new(watcher, home))
            let scheduler = Local::take::<Scheduler>();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                do watcher.as_stream().close {
                    let scheduler = Local::take::<Scheduler>();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
            Err(uv_error_to_io_error(uverr))

    // Bind a UDP socket; same shape as tcp_bind above.
    fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
        let mut watcher = UdpWatcher::new(self.uv_loop());
        match watcher.bind(addr) {
            let home = get_handle_to_current_scheduler!();
            Ok(~UvUdpSocket { watcher: watcher, home: home })
            let scheduler = Local::take::<Scheduler>();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let scheduler = Local::take::<Scheduler>();
                scheduler.resume_blocked_task_immediately(task_cell.take());
            Err(uv_error_to_io_error(uverr))

    // Create a timer homed on the current scheduler.
    fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
        let watcher = TimerWatcher::new(self.uv_loop());
        let home = get_handle_to_current_scheduler!();
        Ok(~UvTimer::new(watcher, home))
// TCP listener; accepted connections are queued through `incoming_streams`.
// NOTE(review): the `watcher`, `home`, and `listening` fields are elided
// from this view.
pub struct UvTcpListener {
    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,

impl HomingIO for UvTcpListener {
468 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// NOTE(review): the enclosing `impl UvTcpListener` header and the other
// struct-literal fields of the constructor are elided from this view.
fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
    incoming_streams: Tube::new(),
481 fn watcher(&self) -> TcpWatcher { self.watcher }
impl Drop for UvTcpListener {
    // Closes the uv stream on the home scheduler, blocking the task until
    // uv confirms the close. NOTE(review): `drop` signature and closing
    // braces are elided from this view.
    // XXX need mutable finalizer
    let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
    do self_.home_for_io |self_| {
        let scheduler = Local::take::<Scheduler>();
        do scheduler.deschedule_running_task_and_then |_, task| {
            let task_cell = Cell::new(task);
            do self_.watcher().as_stream().close {
                let scheduler = Local::take::<Scheduler>();
                scheduler.resume_blocked_task_immediately(task_cell.take());
impl RtioSocket for UvTcpListener {
    // Local address of the listening socket, resolved on the home scheduler.
    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
        do self.home_for_io |self_| {
            socket_name(Tcp, self_.watcher)
// NOTE(review): match arms and closing braces are elided throughout this
// impl in the current view.
impl RtioTcpListener for UvTcpListener {
    // On first call, installs the uv `listen` callback which pushes each
    // accepted (or failed) connection into `incoming_streams`; every call
    // then blocks receiving the next queued result.
    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
        do self.home_for_io |self_| {
            if !self_.listening {
                self_.listening = true;

                let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());

                do self_.watcher().listen |mut server, status| {
                    let stream = match status {
                        Some(_) => Err(standard_error(OtherIoError)),
                        // Success: accept the client into a fresh watcher.
                        let client = TcpWatcher::new(&server.event_loop());
                        // XXX: needs to be surfaced in interface
                        server.accept(client.as_stream());
                        let home = get_handle_to_current_scheduler!();
                        Ok(~UvTcpStream { watcher: client, home: home })

                    let mut incoming_streams = incoming_streams_cell.take();
                    incoming_streams.send(stream);
                    incoming_streams_cell.put_back(incoming_streams);

            self_.incoming_streams.recv()

    // Enable simultaneous accepts (uv flag = 1).
    fn accept_simultaneously(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)

            match status_to_maybe_uv_error(self_.watcher(), r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Disable simultaneous accepts (uv flag = 0).
    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)

            match status_to_maybe_uv_error(self_.watcher(), r) {
                Some(err) => Err(uv_error_to_io_error(err)),
// Established TCP connection (watcher + home scheduler handle).
// NOTE(review): the struct's fields are elided from this view.
pub struct UvTcpStream {

impl HomingIO for UvTcpStream {
574 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
impl Drop for UvTcpStream {
    // Closes the stream on its home scheduler, blocking until uv confirms.
    // NOTE(review): `drop` signature and closing braces are elided.
    // XXX need mutable finalizer
    let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
    do this.home_for_io |self_| {
        let scheduler = Local::take::<Scheduler>();
        do scheduler.deschedule_running_task_and_then |_, task| {
            let task_cell = Cell::new(task);
            do self_.watcher.as_stream().close {
                let scheduler = Local::take::<Scheduler>();
                scheduler.resume_blocked_task_immediately(task_cell.take());
impl RtioSocket for UvTcpStream {
    // Local address of the connected socket, resolved on the home scheduler.
    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
        do self.home_for_io |self_| {
            socket_name(Tcp, self_.watcher)
// NOTE(review): this impl is elided throughout — success arms, `r`
// bindings for the unsafe uv calls, and closing braces are missing from
// this view.
impl RtioTcpStream for UvTcpStream {
    // Blocking read: starts uv reads into `buf`, parks the task, and is
    // woken by the read callback with the byte count or error.
    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
        do self.home_for_io |self_| {
            let result_cell = Cell::new_empty();
            let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;

            let scheduler = Local::take::<Scheduler>();
            let buf_ptr: *&mut [u8] = &buf;
            do scheduler.deschedule_running_task_and_then |_sched, task| {
                let task_cell = Cell::new(task);
                // XXX: We shouldn't reallocate these callbacks every
                let alloc: AllocCallback = |_| unsafe {
                    slice_to_uv_buf(*buf_ptr)
                let mut watcher = self_.watcher.as_stream();
                do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
                    // Stop reading so that no read callbacks are
                    // triggered before the user calls `read` again.
                    // XXX: Is there a performance impact to calling

                    let result = if status.is_none() {
                        Err(uv_error_to_io_error(status.unwrap()))

                    unsafe { (*result_cell_ptr).put_back(result); }

                    let scheduler = Local::take::<Scheduler>();
                    scheduler.resume_blocked_task_immediately(task_cell.take());

            assert!(!result_cell.is_empty());

    // Blocking write: queues `buf` on the stream and parks until the
    // write callback reports success or a uv error.
    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let result_cell = Cell::new_empty();
            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
            let scheduler = Local::take::<Scheduler>();
            let buf_ptr: *&[u8] = &buf;
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
                let mut watcher = self_.watcher.as_stream();
                do watcher.write(buf) |_watcher, status| {
                    let result = if status.is_none() {
                        Err(uv_error_to_io_error(status.unwrap()))

                    unsafe { (*result_cell_ptr).put_back(result); }

                    let scheduler = Local::take::<Scheduler>();
                    scheduler.resume_blocked_task_immediately(task_cell.take());

            assert!(!result_cell.is_empty());

    // Remote peer's address.
    fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
        do self.home_for_io |self_| {
            socket_name(TcpPeer, self_.watcher)

    // Re-enable Nagle's algorithm (TCP_NODELAY off).
    fn control_congestion(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Disable Nagle's algorithm (TCP_NODELAY on).
    fn nodelay(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Turn on TCP keepalive with the given probe delay.
    fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
                                delay_in_seconds as c_uint)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Turn off TCP keepalive.
    fn letdie(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),
// Bound UDP socket (watcher + home scheduler handle).
// NOTE(review): the struct's fields are elided from this view.
pub struct UvUdpSocket {

impl HomingIO for UvUdpSocket {
736 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
impl Drop for UvUdpSocket {
    // Closes the uv handle on the home scheduler, blocking until confirmed.
    // NOTE(review): `drop` signature and closing braces are elided.
    // XXX need mutable finalizer
    let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
    do this.home_for_io |_| {
        let scheduler = Local::take::<Scheduler>();
        do scheduler.deschedule_running_task_and_then |_, task| {
            let task_cell = Cell::new(task);
            do this.watcher.close {
                let scheduler = Local::take::<Scheduler>();
                scheduler.resume_blocked_task_immediately(task_cell.take());
impl RtioSocket for UvUdpSocket {
    // Local address of the bound socket, resolved on the home scheduler.
    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
        do self.home_for_io |self_| {
            socket_name(Udp, self_.watcher)
// NOTE(review): this impl is elided throughout — success arms, `r`
// bindings for the unsafe uv calls, and closing braces are missing from
// this view.
impl RtioUdpSocket for UvUdpSocket {
    // Blocking receive: starts uv recv into `buf`, parks the task, and is
    // woken with (bytes read, sender address) or a uv error.
    fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
        do self.home_for_io |self_| {
            let result_cell = Cell::new_empty();
            let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;

            let scheduler = Local::take::<Scheduler>();
            let buf_ptr: *&mut [u8] = &buf;
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
                do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
                    let _ = flags; // /XXX add handling for partials?

                    let result = match status {
                        Ok((nread as uint, addr))
                        Some(err) => Err(uv_error_to_io_error(err)),

                    unsafe { (*result_cell_ptr).put_back(result); }

                    let scheduler = Local::take::<Scheduler>();
                    scheduler.resume_blocked_task_immediately(task_cell.take());

            assert!(!result_cell.is_empty());

    // Blocking send of `buf` to `dst`; parks until the send callback runs.
    fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let result_cell = Cell::new_empty();
            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
            let scheduler = Local::take::<Scheduler>();
            let buf_ptr: *&[u8] = &buf;
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
                do self_.watcher.send(buf, dst) |_watcher, status| {
                    let result = match status {
                        Some(err) => Err(uv_error_to_io_error(err)),

                    unsafe { (*result_cell_ptr).put_back(result); }

                    let scheduler = Local::take::<Scheduler>();
                    scheduler.resume_blocked_task_immediately(task_cell.take());

            assert!(!result_cell.is_empty());

    // Join the given multicast group.
    fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            do multi.to_str().with_c_str |m_addr| {
                uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
                                         ptr::null(), uvll::UV_JOIN_GROUP)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Leave the given multicast group.
    fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            do multi.to_str().with_c_str |m_addr| {
                uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
                                         ptr::null(), uvll::UV_LEAVE_GROUP)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Enable multicast loopback (flag = 1).
    fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Disable multicast loopback (flag = 0).
    fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Set the multicast TTL.
    fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Set the unicast TTL.
    fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Enable receipt of broadcast datagrams (flag = 1).
    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),

    // Disable receipt of broadcast datagrams (flag = 0).
    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)

            match status_to_maybe_uv_error(self_.watcher, r) {
                Some(err) => Err(uv_error_to_io_error(err)),
// NOTE(review): the `pub struct UvTimer {` header and `home` field are
// elided from this view; this is the timer's watcher field.
watcher: timer::TimerWatcher,

impl HomingIO for UvTimer {
951 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// NOTE(review): the enclosing `impl UvTimer` header and closing braces
// are elided from this view.
// Pair a timer watcher with its home scheduler handle.
fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
    UvTimer { watcher: w, home: home }
impl Drop for UvTimer {
    // Closes the timer watcher on its home scheduler, blocking until uv
    // confirms. NOTE(review): `drop` signature and closing braces elided.
    let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
    do self_.home_for_io |self_| {
        rtdebug!("closing UvTimer");
        let scheduler = Local::take::<Scheduler>();
        do scheduler.deschedule_running_task_and_then |_, task| {
            let task_cell = Cell::new(task);
            do self_.watcher.close {
                let scheduler = Local::take::<Scheduler>();
                scheduler.resume_blocked_task_immediately(task_cell.take());
impl RtioTimer for UvTimer {
    // Park the task for `msecs` milliseconds using a one-shot uv timer,
    // then stop the watcher. NOTE(review): closing braces elided.
    fn sleep(&mut self, msecs: u64) {
        do self.home_for_io |self_| {
            let scheduler = Local::take::<Scheduler>();
            do scheduler.deschedule_running_task_and_then |_sched, task| {
                rtdebug!("sleep: entered scheduler context");
                let task_cell = Cell::new(task);
                do self_.watcher.start(msecs, 0) |_, status| {
                    assert!(status.is_none());
                    let scheduler = Local::take::<Scheduler>();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
            self_.watcher.stop();
// Connecting to an address nobody is listening on must yield an error.
// NOTE(review): `#[test]` attributes, `unsafe` block wrappers, and
// closing braces are elided from this and the following test.
fn test_simple_io_no_connect() {
    do run_in_newsched_task {
        let io = Local::unsafe_borrow::<IoFactoryObject>();
        let addr = next_test_ip4();
        let maybe_chan = (*io).tcp_connect(addr);
        assert!(maybe_chan.is_err());

// Binding a UDP socket to a fresh test address must succeed.
fn test_simple_udp_io_bind_only() {
    do run_in_newsched_task {
        let io = Local::unsafe_borrow::<IoFactoryObject>();
        let addr = next_test_ip4();
        let maybe_socket = (*io).udp_bind(addr);
        assert!(maybe_socket.is_ok());
// Binds a UDP socket on sched1, migrates the owning task to sched2, and
// checks that the socket's destructor still runs its close on sched1
// (the home scheduler). NOTE(review): scheduler constructor argument
// tails, `thread.join()` calls, and closing braces are elided.
fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
    use rt::sleeper_list::SleeperList;
    use rt::work_queue::WorkQueue;
    use rt::thread::Thread;
    use rt::sched::{Shutdown, TaskFromFriend};
    do run_in_bare_thread {
        let sleepers = SleeperList::new();
        let work_queue1 = WorkQueue::new();
        let work_queue2 = WorkQueue::new();
        let queues = ~[work_queue1.clone(), work_queue2.clone()];

        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),

        let handle1 = Cell::new(sched1.make_handle());
        let handle2 = Cell::new(sched2.make_handle());
        let tasksFriendHandle = Cell::new(sched2.make_handle());

        let on_exit: ~fn(bool) = |exit_status| {
            handle1.take().send(Shutdown);
            handle2.take().send(Shutdown);
            rtassert!(exit_status);

        let test_function: ~fn() = || {
            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
            let addr = next_test_ip4();
            let maybe_socket = unsafe { (*io).udp_bind(addr) };
            // this socket is bound to this event loop
            assert!(maybe_socket.is_ok());

            // block self on sched1
            let scheduler = Local::take::<Scheduler>();
            do scheduler.deschedule_running_task_and_then |_, task| {
                do task.wake().map_move |task| {
                    // send self to sched2
                    tasksFriendHandle.take().send(TaskFromFriend(task));
                // sched1 should now sleep since it has nothing else to do
            // sched2 will wake up and get the task
            // as we do nothing else, the function ends and the socket goes out of scope
            // sched2 will start to run the destructor
            // the destructor will first block the task, set it's home as sched1, then enqueue it
            // sched2 will dequeue the task, see that it has a home, and send it to sched1
            // sched1 will wake up, exec the close function on the correct loop, and then we're done

        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
        main_task.death.on_exit = Some(on_exit);
        let main_task = Cell::new(main_task);

        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});

        let sched1 = Cell::new(sched1);
        let sched2 = Cell::new(sched2);

        let thread1 = do Thread::start {
            sched1.take().bootstrap(main_task.take());
        let thread2 = do Thread::start {
            sched2.take().bootstrap(null_task.take());
// Binds a UDP socket on sched1, sends the socket VALUE (not the task)
// over a oneshot to a task on sched2, and checks the destructor still
// homes back to sched1 to close. NOTE(review): scheduler constructor
// tails, `thread.join()` calls, and closing braces are elided.
fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
    use rt::sleeper_list::SleeperList;
    use rt::work_queue::WorkQueue;
    use rt::thread::Thread;
    use rt::comm::oneshot;
    use rt::sched::Shutdown;
    do run_in_bare_thread {
        let sleepers = SleeperList::new();
        let work_queue1 = WorkQueue::new();
        let work_queue2 = WorkQueue::new();
        let queues = ~[work_queue1.clone(), work_queue2.clone()];

        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),

        let handle1 = Cell::new(sched1.make_handle());
        let handle2 = Cell::new(sched2.make_handle());

        let (port, chan) = oneshot();
        let port = Cell::new(port);
        let chan = Cell::new(chan);

        // Runs on sched1: bind and ship the socket across the channel.
        let body1: ~fn() = || {
            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
            let addr = next_test_ip4();
            let socket = unsafe { (*io).udp_bind(addr) };
            assert!(socket.is_ok());
            chan.take().send(socket);

        // Runs on sched2: receive the socket and let it drop here.
        let body2: ~fn() = || {
            let socket = port.take().recv();
            assert!(socket.is_ok());
            /* The socket goes out of scope and the destructor is called.
             * - sends itself back to sched1
             * - frees the socket
             * - resets the home of the task to whatever it was previously

        let on_exit: ~fn(bool) = |exit| {
            handle1.take().send(Shutdown);
            handle2.take().send(Shutdown);

        let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));

        let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
        task2.death.on_exit = Some(on_exit);
        let task2 = Cell::new(task2);

        let sched1 = Cell::new(sched1);
        let sched2 = Cell::new(sched2);

        let thread1 = do Thread::start {
            sched1.take().bootstrap(task1.take());
        let thread2 = do Thread::start {
            sched2.take().bootstrap(task2.take());
// Server task accepts one connection and checks it receives bytes 0..7;
// client task connects and writes them. NOTE(review): the `spawntask`
// wrappers, `unsafe` blocks, and closing braces are elided from view.
fn test_simple_tcp_server_and_client() {
    do run_in_newsched_task {
        let addr = next_test_ip4();

        // Start the server first so it's listening when we connect
        let io = Local::unsafe_borrow::<IoFactoryObject>();
        let mut listener = (*io).tcp_bind(addr).unwrap();
        let mut stream = listener.accept().unwrap();
        let mut buf = [0, .. 2048];
        let nread = stream.read(buf).unwrap();
        assert_eq!(nread, 8);
        for i in range(0u, nread) {
            rtdebug!("%u", buf[i] as uint);
            assert_eq!(buf[i], i as u8);

        // Client side: connect and send the eight expected bytes.
        let io = Local::unsafe_borrow::<IoFactoryObject>();
        let mut stream = (*io).tcp_connect(addr).unwrap();
        stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Same TCP round-trip as above, but with server and client on separate
// schedulers running on separate OS threads; the client retries connect
// until the server's bind wins the race. NOTE(review): `#[test]`
// attribute and closing braces are elided from this view.
fn test_simple_tcp_server_and_client_on_diff_threads() {
    use rt::sleeper_list::SleeperList;
    use rt::work_queue::WorkQueue;
    use rt::thread::Thread;
    use rt::sched::{Shutdown};
    do run_in_bare_thread {
        let sleepers = SleeperList::new();

        let server_addr = next_test_ip4();
        let client_addr = server_addr.clone();

        let server_work_queue = WorkQueue::new();
        let client_work_queue = WorkQueue::new();
        let queues = ~[server_work_queue.clone(), client_work_queue.clone()];

        let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
                                               queues.clone(), sleepers.clone());
        let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
                                               queues.clone(), sleepers.clone());

        let server_handle = Cell::new(server_sched.make_handle());
        let client_handle = Cell::new(client_sched.make_handle());

        let server_on_exit: ~fn(bool) = |exit_status| {
            server_handle.take().send(Shutdown);
            rtassert!(exit_status);

        let client_on_exit: ~fn(bool) = |exit_status| {
            client_handle.take().send(Shutdown);
            rtassert!(exit_status);

        let server_fn: ~fn() = || {
            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
            let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
            let mut stream = listener.accept().unwrap();
            let mut buf = [0, .. 2048];
            let nread = stream.read(buf).unwrap();
            assert_eq!(nread, 8);
            for i in range(0u, nread) {
                assert_eq!(buf[i], i as u8);

        let client_fn: ~fn() = || {
            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
            // Spin until the server has bound and is accepting.
            let mut stream = unsafe { (*io).tcp_connect(client_addr) };
            while stream.is_err() {
                stream = unsafe { (*io).tcp_connect(client_addr) };
            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);

        let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
        server_task.death.on_exit = Some(server_on_exit);
        let server_task = Cell::new(server_task);

        let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
        client_task.death.on_exit = Some(client_on_exit);
        let client_task = Cell::new(client_task);

        let server_sched = Cell::new(server_sched);
        let client_sched = Cell::new(client_sched);

        let server_thread = do Thread::start {
            server_sched.take().bootstrap(server_task.take());
        let client_thread = do Thread::start {
            client_sched.take().bootstrap(client_task.take());

        server_thread.join();
        client_thread.join();
// UDP round-trip: server binds and recvfroms eight bytes, asserting the
// payload and the sender address; client binds a second socket and sends.
// NOTE(review): `spawntask` wrappers and closing braces are elided.
fn test_simple_udp_server_and_client() {
    do run_in_newsched_task {
        let server_addr = next_test_ip4();
        let client_addr = next_test_ip4();

        let io = Local::unsafe_borrow::<IoFactoryObject>();
        let mut server_socket = (*io).udp_bind(server_addr).unwrap();
        let mut buf = [0, .. 2048];
        let (nread,src) = server_socket.recvfrom(buf).unwrap();
        assert_eq!(nread, 8);
        for i in range(0u, nread) {
            rtdebug!("%u", buf[i] as uint);
            assert_eq!(buf[i], i as u8);
        assert_eq!(src, client_addr);

        // Client side: bind its own address and send to the server.
        let io = Local::unsafe_borrow::<IoFactoryObject>();
        let mut client_socket = (*io).udp_bind(client_addr).unwrap();
        client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
// Ignored ("busted"): reads a TCP stream in a loop while deliberately
// descheduling between reads, hoping a uv read callback fires while the
// reader task is off the scheduler, so the data arrives split across
// multiple read() calls.
// NOTE(review): several original lines are elided from this excerpt
// (closing braces, `current` increment, and the `expected` definition).
1306 #[test] #[ignore(reason = "busted")]
1307 fn test_read_and_block() {
1308 do run_in_newsched_task {
1309 let addr = next_test_ip4();
// Reader side: accept one connection and consume bytes until `expected`.
1312 let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
1313 let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1314 let mut stream = listener.accept().unwrap();
1315 let mut buf = [0, .. 2048];
1318 let mut current = 0;
// NOTE(review): `expected` is declared on an elided line — presumably
// 32 (four writes of 8 bytes below); confirm against the original file.
1321 while current < expected {
1322 let nread = stream.read(buf).unwrap();
1323 for i in range(0u, nread) {
1324 let val = buf[i] as uint;
// The writer repeats the pattern 0..7, so each byte's value equals its
// overall position modulo 8.
1325 assert_eq!(val, current % 8);
// Deschedule this task and immediately re-enqueue it, yielding to the
// writer task between reads.
1330 let scheduler = Local::take::<Scheduler>();
1331 // Yield to the other task in hopes that it
1332 // will trigger a read callback while we are
1334 do scheduler.deschedule_running_task_and_then |sched, task| {
1335 let task = Cell::new(task);
1336 sched.enqueue_blocked_task(task.take());
1340 // Make sure we had multiple reads
// Writer side: connect and send the 8-byte pattern four times.
1346 let io = Local::unsafe_borrow::<IoFactoryObject>();
1347 let mut stream = (*io).tcp_connect(addr).unwrap();
1348 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1349 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1350 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1351 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// TCP throughput test: the writer streams at least MAX (~500 KB) bytes,
// all with value 1, in 2 KB chunks; the reader loops on read() until it
// has observed MAX bytes, verifying every byte.
// NOTE(review): some original lines (task-spawn wrappers, closing
// braces, and the writer's `stream.write(buf)` call) are elided here.
1359 fn test_read_read_read() {
1360 do run_in_newsched_task {
1361 let addr = next_test_ip4();
1362 static MAX: uint = 500000;
// Writer: accept one connection, then stream 2 KB buffers of 1s.
1366 let io = Local::unsafe_borrow::<IoFactoryObject>();
1367 let mut listener = (*io).tcp_bind(addr).unwrap();
1368 let mut stream = listener.accept().unwrap();
1369 let buf = [1, .. 2048];
1370 let mut total_bytes_written = 0;
1371 while total_bytes_written < MAX {
// NOTE(review): the write call for `buf` is on an elided line just
// above this accumulation.
1373 total_bytes_written += buf.len();
// Reader: connect and drain the stream until MAX bytes are seen.
1380 let io = Local::unsafe_borrow::<IoFactoryObject>();
1381 let mut stream = (*io).tcp_connect(addr).unwrap();
1382 let mut buf = [0, .. 2048];
1383 let mut total_bytes_read = 0;
1384 while total_bytes_read < MAX {
1385 let nread = stream.read(buf).unwrap();
1386 rtdebug!("read %u bytes", nread as uint);
1387 total_bytes_read += nread;
1388 for i in range(0u, nread) {
// Every byte on the wire was written with value 1.
1389 assert_eq!(buf[i], 1);
1392 rtdebug!("read %u bytes total", total_bytes_read as uint);
// Two sequential one-byte UDP datagrams ([1] then [2]) from a single
// client socket are received with the correct sizes, payloads, and
// source address. The ordering assertion relies on loopback delivery
// order — UDP gives no general ordering guarantee.
// NOTE(review): this excerpt elides some original lines, including the
// `buf1`/`buf2` declarations and the spawn wrappers/closing braces.
1399 fn test_udp_twice() {
1400 do run_in_newsched_task {
1401 let server_addr = next_test_ip4();
1402 let client_addr = next_test_ip4();
// Client: send two one-byte datagrams, [1] then [2].
1406 let io = Local::unsafe_borrow::<IoFactoryObject>();
1407 let mut client = (*io).udp_bind(client_addr).unwrap();
1408 assert!(client.sendto([1], server_addr).is_ok());
1409 assert!(client.sendto([2], server_addr).is_ok());
// Server: receive both and check size, order, and sender address.
1415 let io = Local::unsafe_borrow::<IoFactoryObject>();
1416 let mut server = (*io).udp_bind(server_addr).unwrap();
1419 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1420 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1421 assert_eq!(nread1, 1);
1422 assert_eq!(nread2, 1);
1423 assert_eq!(src1, client_addr);
1424 assert_eq!(src2, client_addr);
// First datagram carried [1], second carried [2].
1425 assert_eq!(buf1[0], 1);
1426 assert_eq!(buf2[0], 2);
// Flow-controlled UDP flood: the client requests data by sending a
// one-byte control datagram to server_in; the server answers each
// request with a 2 KB datagram of 1s sent to client_in. The client
// keeps requesting until it has received MAX bytes, then sends [0] to
// tell the server to stop.
// NOTE(review): this excerpt elides some original lines, notably the
// server's loop header (with its termination condition on the ack
// byte) and the spawn wrappers/closing braces.
1433 fn test_udp_many_read() {
1434 do run_in_newsched_task {
// Four distinct sockets: each side has a dedicated send and receive
// address, so control and data traffic never share a socket.
1435 let server_out_addr = next_test_ip4();
1436 let server_in_addr = next_test_ip4();
1437 let client_out_addr = next_test_ip4();
1438 let client_in_addr = next_test_ip4();
1439 static MAX: uint = 500_000;
// Server task: send a chunk per client request, then wait for the ack.
1443 let io = Local::unsafe_borrow::<IoFactoryObject>();
1444 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1445 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1446 let msg = [1, .. 2048];
1447 let mut total_bytes_sent = 0;
1451 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1452 total_bytes_sent += msg.len();
1453 // check if the client has received enough
1454 let res = server_in.recvfrom(buf);
1455 assert!(res.is_ok());
1456 let (nread, src) = res.unwrap();
// Every control message from the client is exactly one byte.
1457 assert_eq!(nread, 1);
1458 assert_eq!(src, client_out_addr);
// The client only signals "done" after seeing MAX bytes, so the server
// must have sent at least that much by the time its loop exits.
1460 assert!(total_bytes_sent >= MAX);
// Client task: request, receive, verify, repeat until MAX bytes.
1466 let io = Local::unsafe_borrow::<IoFactoryObject>();
1467 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1468 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1469 let mut total_bytes_recv = 0;
1470 let mut buf = [0, .. 2048];
1471 while total_bytes_recv < MAX {
// Ask the server for another chunk.
1473 assert!(client_out.sendto([1], server_in_addr).is_ok());
1475 let res = client_in.recvfrom(buf);
1476 assert!(res.is_ok());
1477 let (nread, src) = res.unwrap();
1478 assert_eq!(src, server_out_addr);
1479 total_bytes_recv += nread;
1480 for i in range(0u, nread) {
// Data payload is all 1s.
1481 assert_eq!(buf[i], 1);
1484 // tell the server we're done
1485 assert!(client_out.sendto([0], server_in_addr).is_ok());
1492 fn test_timer_sleep_simple() {
1493 do run_in_newsched_task {
1495 let io = Local::unsafe_borrow::<IoFactoryObject>();
1496 let timer = (*io).timer_init();
1497 do timer.map_move |mut t| { t.sleep(1) };