1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use libc::{c_int, c_uint, c_void};
23 use rt::io::net::ip::{SocketAddr, IpAddr};
24 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
27 use rt::sched::{Scheduler, SchedHandle};
30 use rt::uv::idle::IdleWatcher;
31 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
32 use unstable::sync::Exclusive;
33 use super::super::io::support::PathLike;
34 use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
36 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
37 CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
40 #[cfg(test)] use container::Container;
41 #[cfg(test)] use unstable::run_in_bare_thread;
42 #[cfg(test)] use rt::test::{spawntask,
44 run_in_newsched_task};
45 #[cfg(test)] use iterator::{Iterator, range};
47 // XXX we should not be calling uvll functions in here.
// Required method: mutably borrows the SchedHandle of the scheduler this
// IO object was created on (its "home"). home_for_io pins tasks to this
// handle before performing uv operations.
50 fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
51 /* XXX This will move pinned tasks to do IO on the proper scheduler
52 * and then move them back to their home.
// Runs `io` while the current task is pinned to this object's home
// scheduler, then hands the task back to another scheduler via a friend
// handle. NOTE(review): this listing elides several interior lines
// (including the actual invocation of `io`), so the control flow shown
// here is incomplete — verify against the full file.
54 fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
55 use rt::sched::{PinnedTask, TaskFromFriend};
// Stash the task's original home so it can be restored after the IO.
57 let old_home = Cell::new_empty();
58 let old_home_ptr = &old_home;
59 do task::unkillable { // FIXME(#8674)
60 let scheduler: ~Scheduler = Local::take();
61 do scheduler.deschedule_running_task_and_then |_, task| {
62 // get the old home first
63 do task.wake().map_move |mut task| {
64 old_home_ptr.put_back(task.take_unwrap_home());
// Pin the task to the home scheduler so the uv work runs there.
65 self.home().send(PinnedTask(task));
// After the IO (call elided in this listing): restore the saved home and
// let whichever scheduler picks the task up resume it.
74 do task::unkillable { // FIXME(#8674)
75 let scheduler: ~Scheduler = Local::take();
76 do scheduler.deschedule_running_task_and_then |scheduler, task| {
77 do task.wake().map_move |mut task| {
78 task.give_home(old_home.take());
79 scheduler.make_handle().send(TaskFromFriend(task));
84 // return the result of the IO
// Variant of home_for_io whose callback also receives ownership of the
// (home) Scheduler, for IO that needs to deschedule/resume tasks itself.
// NOTE(review): interior lines are elided in this listing; the flow is
// the same pin → run io_sched → restore-home pattern as home_for_io.
88 fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
89 use rt::sched::{PinnedTask, TaskFromFriend};
91 do task::unkillable { // FIXME(#8674)
// Save the task's current home before pinning it to this object's home.
93 let old_home = Cell::new_empty();
94 let old_home_ptr = &old_home;
95 let scheduler: ~Scheduler = Local::take();
96 do scheduler.deschedule_running_task_and_then |_, task| {
97 // get the old home first
98 do task.wake().map_move |mut task| {
99 old_home_ptr.put_back(task.take_unwrap_home());
100 self.home().send(PinnedTask(task));
// Now running on the home scheduler: hand both self and the scheduler
// to the caller's IO closure.
105 let scheduler: ~Scheduler = Local::take();
106 let a = io_sched(self, scheduler);
// Restore the original home and release the task back to the schedulers.
109 let scheduler: ~Scheduler = Local::take();
110 do scheduler.deschedule_running_task_and_then |scheduler, task| {
111 do task.wake().map_move |mut task| {
112 task.give_home(old_home.take());
113 scheduler.make_handle().send(TaskFromFriend(task));
117 // return the result of the IO
123 // get a handle for the current scheduler
// Expands to a fresh SchedHandle for the scheduler running on this
// thread; borrows the local scheduler rather than taking it, so it is
// safe to use inside uv callbacks.
124 macro_rules! get_handle_to_current_scheduler(
125 () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
// Selector for socket_name below. Variant lines are elided in this
// listing; socket_name matches on Tcp, TcpPeer and Udp.
128 enum SocketNameKind {
// Queries the local (Tcp/Udp) or peer (TcpPeer) address of a uv handle.
// Allocates a sockaddr_storage on the C heap, asks libuv to fill it,
// converts to a SocketAddr based on the address family, and frees the
// storage. NOTE(review): some lines (buffer length argument, success
// branch structure) are elided in this listing.
134 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
135 handle: U) -> Result<SocketAddr, IoError> {
// Pick the right uvll getter for the requested name kind.
136 let getsockname = match sk {
137 TcpPeer => uvll::tcp_getpeername,
138 Tcp => uvll::tcp_getsockname,
139 Udp => uvll::udp_getsockname,
142 // Allocate a sockaddr_storage
143 // since we don't know if it's ipv4 or ipv6
144 let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
147 getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
// A non-zero uv status is translated into an IoError.
151 let status = status_to_maybe_uv_error(handle, r);
152 return Err(uv_error_to_io_error(status.unwrap()));
// Branch on the actual family stored in the buffer.
156 if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
157 net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
159 net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
163 unsafe { uvll::free_sockaddr_storage(r_addr); }
169 // Obviously an Event Loop is always home.
// Wraps a UvIoFactory (and thus a libuv Loop); field lines are elided
// in this listing.
170 pub struct UvEventLoop {
// Constructs an event loop backed by a freshly created libuv Loop.
175 pub fn new() -> UvEventLoop {
177 uvio: UvIoFactory(Loop::new())
// Closes the underlying uv loop when the event loop is dropped. The
// transmute works around Drop taking `&self` in this era of the
// language (see the XXX below).
182 impl Drop for UvEventLoop {
184 // XXX: Need mutable finalizer
186 transmute::<&UvEventLoop, &mut UvEventLoop>(self)
188 this.uvio.uv_loop().close();
// EventLoop implementation that delegates everything to the wrapped uv
// Loop. NOTE(review): the bodies below have lines elided in this
// listing (e.g. the actual call of `f` inside the watcher callbacks).
192 impl EventLoop for UvEventLoop {
194 self.uvio.uv_loop().run();
// One-shot callback: run `f` on the next loop turn via an idle watcher,
// then close the watcher.
197 fn callback(&mut self, f: ~fn()) {
198 let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
199 do idle_watcher.start |mut idle_watcher, status| {
200 assert!(status.is_none());
202 idle_watcher.close(||());
// Hands out a pausible idle callback wrapping a fresh idle watcher.
207 fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
208 let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
209 return ~UvPausibleIdleCallback {
210 watcher: idle_watcher,
// One-shot timed callback after `ms` milliseconds (repeat = 0).
216 fn callback_ms(&mut self, ms: u64, f: ~fn()) {
217 let mut timer = TimerWatcher::new(self.uvio.uv_loop());
218 do timer.start(ms, 0) |timer, status| {
219 assert!(status.is_none());
// Cross-thread wakeup object bound to this loop.
225 fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
226 ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
229 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
// Idle callback that can be paused and resumed; remaining fields
// (e.g. the idle_flag used by the impl below) are elided in this listing.
234 pub struct UvPausibleIdleCallback {
235 watcher: IdleWatcher,
// idle_flag tracks whether the idle watcher is currently running, so
// pause/resume are no-ops when already in the requested state.
240 impl UvPausibleIdleCallback {
// Begin invoking `f` on every idle turn (call of `f` elided here).
242 pub fn start(&mut self, f: ~fn()) {
243 do self.watcher.start |_idle_watcher, _status| {
246 self.idle_flag = true;
// Stop the watcher without closing it (stop call elided in listing).
249 pub fn pause(&mut self) {
250 if self.idle_flag == true {
252 self.idle_flag = false;
// Restart a previously paused watcher.
256 pub fn resume(&mut self) {
257 if self.idle_flag == false {
258 self.watcher.restart();
259 self.idle_flag = true;
// Permanently close the underlying watcher.
263 pub fn close(&mut self) {
267 self.watcher.close(||{});
// Sanity test: EventLoop::callback fires exactly once per registration.
273 fn test_callback_run_once() {
274 do run_in_bare_thread {
275 let mut event_loop = UvEventLoop::new();
// Raw pointer lets the ~fn closure mutate the stack counter.
277 let count_ptr: *mut int = &mut count;
278 do event_loop.callback {
279 unsafe { *count_ptr += 1 }
282 assert_eq!(count, 1);
286 // The entire point of async is to call into a loop from other threads so it does not need to home.
// Cross-thread wakeup: fire() may be invoked from any thread to trigger
// the callback registered on the loop's thread (see impl below). The
// async-watcher field line is elided in this listing.
287 pub struct UvRemoteCallback {
288 // The uv async handle for triggering the callback
290 // A flag to tell the callback to exit, set from the dtor. This is
291 // almost never contested - only in rare races with the dtor.
292 exit_flag: Exclusive<bool>
295 impl UvRemoteCallback {
// Registers `f` as the async-watcher callback on `loop_`. The shared
// exit_flag lets the destructor request a final shutdown invocation;
// see the long comment below for the locking protocol. NOTE(review):
// the tail of this function (call of `f`, handle close on exit, struct
// construction) is elided in this listing.
296 pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
297 let exit_flag = Exclusive::new(false);
298 let exit_flag_clone = exit_flag.clone();
299 let async = do AsyncWatcher::new(loop_) |watcher, status| {
300 assert!(status.is_none());
302 // The synchronization logic here is subtle. To review,
303 // the uv async handle type promises that, after it is
304 // triggered the remote callback is definitely called at
305 // least once. UvRemoteCallback needs to maintain those
306 // semantics while also shutting down cleanly from the
307 // dtor. In our case that means that, when the
308 // UvRemoteCallback dtor calls `async.send()`, here `f` is
309 // always called later.
311 // In the dtor both the exit flag is set and the async
312 // callback fired under a lock. Here, before calling `f`,
313 // we take the lock and check the flag. Because we are
314 // checking the flag before calling `f`, and the flag is
315 // set under the same lock as the send, then if the flag
316 // is set then we're guaranteed to call `f` after the
319 // If the check was done after `f()` then there would be a
320 // period between that call and the check where the dtor
321 // could be called in the other thread, missing the final
322 // callback while still destroying the handle.
// Check the flag BEFORE running `f` — ordering is load-bearing, per the
// comment above.
324 let should_exit = unsafe {
325 exit_flag_clone.with_imm(|&should_exit| should_exit)
// Trigger the async handle; per the comments in `new`, uv guarantees
// the registered callback runs at least once after this send.
342 impl RemoteCallback for UvRemoteCallback {
343 fn fire(&mut self) { self.async.send() }
// Destructor: under the exit_flag lock, set the flag and fire one final
// send so the async callback observes the flag and closes the handle.
// NOTE(review): the flag-set and send statements themselves are elided
// in this listing.
346 impl Drop for UvRemoteCallback {
349 let this: &mut UvRemoteCallback = cast::transmute_mut(self);
350 do this.exit_flag.with |should_exit| {
351 // NB: These two things need to happen atomically. Otherwise
352 // the event handler could wake up due to a *previous*
353 // signal and see the exit flag, destroying the handle
354 // before the final send.
366 use rt::thread::Thread;
368 use rt::rtio::EventLoop;
369 use rt::local::Local;
370 use rt::sched::Scheduler;
// Test: a remote callback fired from a foreign Thread wakes the loop
// and delivers a value through a Tube back to the task.
373 fn test_uv_remote() {
374 do run_in_newsched_task {
375 let mut tube = Tube::new();
376 let tube_clone = tube.clone();
377 let remote_cell = Cell::new_empty();
378 do Local::borrow |sched: &mut Scheduler| {
379 let tube_clone = tube_clone.clone();
380 let tube_clone_cell = Cell::new(tube_clone);
381 let remote = do sched.event_loop.remote_callback {
382 // This could be called multiple times
383 if !tube_clone_cell.is_empty() {
384 tube_clone_cell.take().send(1);
387 remote_cell.put_back(remote);
// Fire from a separate OS thread to exercise cross-thread delivery.
389 let thread = do Thread::start {
390 remote_cell.take().fire();
393 assert!(tube.recv() == 1);
// Newtype over the libuv Loop; the IoFactory impl below creates all
// watchers against this loop.
399 pub struct UvIoFactory(Loop);
// Mutably borrow the wrapped Loop out of the newtype.
402 pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
403 match self { &UvIoFactory(ref mut ptr) => ptr }
407 impl IoFactory for UvIoFactory {
408 // Connect to an address and return a new stream
409 // NB: This blocks the task waiting on the connection.
410 // It would probably be better to return a future
// Pattern used throughout this impl: stash a result Cell on the task's
// stack, deschedule the task, do the uv call, and have the completion
// callback fill the cell and resume the task. NOTE(review): the
// success/error branch structure of the connect callback is partially
// elided in this listing.
411 fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
412 // Create a cell in the task to hold the result. We will fill
413 // the cell before resuming the task.
414 let result_cell = Cell::new_empty();
415 let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
417 // Block this task and take ownership, switch to scheduler context
418 do task::unkillable { // FIXME(#8674)
419 let scheduler: ~Scheduler = Local::take();
420 do scheduler.deschedule_running_task_and_then |_, task| {
422 let mut tcp = TcpWatcher::new(self.uv_loop());
423 let task_cell = Cell::new(task);
425 // Wait for a connection
426 do tcp.connect(addr) |stream, status| {
// Success path: wrap the connected stream and home it here.
429 let tcp = NativeHandle::from_native_handle(stream.native_handle());
430 let home = get_handle_to_current_scheduler!();
431 let res = Ok(~UvTcpStream { watcher: tcp, home: home });
433 // Store the stream in the task's stack
434 unsafe { (*result_cell_ptr).put_back(res); }
437 let scheduler: ~Scheduler = Local::take();
438 scheduler.resume_blocked_task_immediately(task_cell.take());
// Error path: record the uv error (stream close elided in listing).
441 let task_cell = Cell::new(task_cell.take());
443 let res = Err(uv_error_to_io_error(status.unwrap()));
444 unsafe { (*result_cell_ptr).put_back(res); }
445 let scheduler: ~Scheduler = Local::take();
446 scheduler.resume_blocked_task_immediately(task_cell.take());
454 assert!(!result_cell.is_empty());
455 return result_cell.take();
// Bind a TCP listener. On bind failure the watcher is closed
// synchronously (the task blocks until the close callback runs) before
// the error is returned.
458 fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
459 let mut watcher = TcpWatcher::new(self.uv_loop());
460 match watcher.bind(addr) {
// Success arm: home the listener on the current scheduler.
462 let home = get_handle_to_current_scheduler!();
463 Ok(~UvTcpListener::new(watcher, home))
// Error arm (match arm header elided in this listing).
466 do task::unkillable { // FIXME(#8674)
467 let scheduler: ~Scheduler = Local::take();
468 do scheduler.deschedule_running_task_and_then |_, task| {
469 let task_cell = Cell::new(task);
470 do watcher.as_stream().close {
471 let scheduler: ~Scheduler = Local::take();
472 scheduler.resume_blocked_task_immediately(task_cell.take());
475 Err(uv_error_to_io_error(uverr))
// Bind a UDP socket; same shape as tcp_bind — close the watcher
// synchronously on bind failure before returning the error. The close
// call itself is elided in this listing.
481 fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
482 let mut watcher = UdpWatcher::new(self.uv_loop());
483 match watcher.bind(addr) {
485 let home = get_handle_to_current_scheduler!();
486 Ok(~UvUdpSocket { watcher: watcher, home: home })
489 do task::unkillable { // FIXME(#8674)
490 let scheduler: ~Scheduler = Local::take();
491 do scheduler.deschedule_running_task_and_then |_, task| {
492 let task_cell = Cell::new(task);
494 let scheduler: ~Scheduler = Local::take();
495 scheduler.resume_blocked_task_immediately(task_cell.take());
498 Err(uv_error_to_io_error(uverr))
// Create a timer homed on the current scheduler; infallible in
// practice here, but the trait signature requires a Result.
504 fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
505 let watcher = TimerWatcher::new(self.uv_loop());
506 let home = get_handle_to_current_scheduler!();
507 Ok(~UvTimer::new(watcher, home))
// Wrap an already-open OS file descriptor in a UvFileStream;
// close_on_drop controls whether the fd is closed by the destructor.
510 fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
511 let loop_ = Loop {handle: self.uv_loop().native_handle()};
512 let fd = file::FileDescriptor(fd);
513 let home = get_handle_to_current_scheduler!();
514 ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
// Open a file asynchronously via uv_fs_open. Maps FileMode/FileAccess
// to POSIX open flags, then uses the deschedule/fill-cell/resume
// pattern. Streams opened here always close their fd on drop (the
// `true` passed to UvFileStream::new). NOTE(review): several match arms
// and the callback's branch structure are elided in this listing.
517 fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
518 -> Result<~RtioFileStream, IoError> {
// Creation/truncation flags from the file mode...
519 let mut flags = match fm {
522 OpenOrCreate => O_CREAT,
525 CreateOrTruncate => O_TRUNC | O_CREAT
// ...combined with the access flags (match header elided).
528 Read => flags | O_RDONLY,
529 Write => flags | O_WRONLY,
530 ReadWrite => flags | O_RDWR
// Permission bits only matter when a file may be created.
532 let create_mode = match fm {
533 Create|OpenOrCreate|CreateOrTruncate =>
537 let result_cell = Cell::new_empty();
538 let result_cell_ptr: *Cell<Result<~RtioFileStream,
539 IoError>> = &result_cell;
540 let path_cell = Cell::new(path);
541 do task::unkillable { // FIXME(#8674)
542 let scheduler: ~Scheduler = Local::take();
543 do scheduler.deschedule_running_task_and_then |_, task| {
544 let task_cell = Cell::new(task);
545 let path = path_cell.take();
546 do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
// Success: wrap the resulting fd; req.get_result() is the new fd.
549 let loop_ = Loop {handle: req.get_loop().native_handle()};
550 let home = get_handle_to_current_scheduler!();
551 let fd = file::FileDescriptor(req.get_result());
552 let fs = ~UvFileStream::new(
553 loop_, fd, true, home) as ~RtioFileStream;
555 unsafe { (*result_cell_ptr).put_back(res); }
556 let scheduler: ~Scheduler = Local::take();
557 scheduler.resume_blocked_task_immediately(task_cell.take());
// Failure: translate the uv error.
559 let res = Err(uv_error_to_io_error(err.unwrap()));
560 unsafe { (*result_cell_ptr).put_back(res); }
561 let scheduler: ~Scheduler = Local::take();
562 scheduler.resume_blocked_task_immediately(task_cell.take());
567 assert!(!result_cell.is_empty());
568 return result_cell.take();
571 fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
572 let result_cell = Cell::new_empty();
573 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
574 let path_cell = Cell::new(path);
575 do task::unkillable { // FIXME(#8674)
576 let scheduler: ~Scheduler = Local::take();
577 do scheduler.deschedule_running_task_and_then |_, task| {
578 let task_cell = Cell::new(task);
579 let path = path_cell.take();
580 do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
581 let res = match err {
583 Some(err) => Err(uv_error_to_io_error(err))
585 unsafe { (*result_cell_ptr).put_back(res); }
586 let scheduler: ~Scheduler = Local::take();
587 scheduler.resume_blocked_task_immediately(task_cell.take());
591 assert!(!result_cell.is_empty());
592 return result_cell.take();
// Listener state: the TCP watcher, its home SchedHandle, a `listening`
// flag and this Tube of accepted streams (other field lines are elided
// in this listing; see the impls below for the fields they use).
596 pub struct UvTcpListener {
599 incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
// A listener is homed on the scheduler that created it.
603 impl HomingIO for UvTcpListener {
604 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Construct a listener with an empty stream of incoming connections
// (remaining field initializers elided in this listing).
608 fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
612 incoming_streams: Tube::new(),
// Copy out the watcher handle (watchers are thin handle wrappers).
617 fn watcher(&self) -> TcpWatcher { self.watcher }
// Drop: hop to the home scheduler and synchronously close the stream
// handle, blocking the task until the uv close callback resumes it.
620 impl Drop for UvTcpListener {
622 // XXX need mutable finalizer
623 let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
624 do self_.home_for_io_with_sched |self_, scheduler| {
625 do scheduler.deschedule_running_task_and_then |_, task| {
626 let task_cell = Cell::new(task);
627 do self_.watcher().as_stream().close {
628 let scheduler: ~Scheduler = Local::take();
629 scheduler.resume_blocked_task_immediately(task_cell.take());
// Local address of the listening socket, queried on the home scheduler.
636 impl RtioSocket for UvTcpListener {
637 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
638 do self.home_for_io |self_| {
639 socket_name(Tcp, self_.watcher)
644 impl RtioTcpListener for UvTcpListener {
// accept: lazily installs the uv listen callback on first call; the
// callback pushes each accepted stream (or error) into the
// incoming_streams Tube, and accept blocks on Tube::recv.
646 fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
647 do self.home_for_io |self_| {
649 if !self_.listening {
650 self_.listening = true;
652 let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
654 do self_.watcher().listen |mut server, status| {
// A Some status from uv means the accept failed.
655 let stream = match status {
656 Some(_) => Err(standard_error(OtherIoError)),
658 let client = TcpWatcher::new(&server.event_loop());
659 // XXX: needs to be surfaced in interface
660 server.accept(client.as_stream());
661 let home = get_handle_to_current_scheduler!();
662 Ok(~UvTcpStream { watcher: client, home: home })
// Cell dance: take the Tube, send, put it back for the next callback.
666 let mut incoming_streams = incoming_streams_cell.take();
667 incoming_streams.send(stream);
668 incoming_streams_cell.put_back(incoming_streams);
672 self_.incoming_streams.recv()
// Toggle libuv's simultaneous-accept behavior on (1) ...
676 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
677 do self.home_for_io |self_| {
679 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
682 match status_to_maybe_uv_error(self_.watcher(), r) {
683 Some(err) => Err(uv_error_to_io_error(err)),
// ... and off (0).
689 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
690 do self.home_for_io |self_| {
692 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
695 match status_to_maybe_uv_error(self_.watcher(), r) {
696 Some(err) => Err(uv_error_to_io_error(err)),
// Connected TCP stream; fields (watcher, home) are elided in this
// listing but visible at the construction sites above.
703 pub struct UvTcpStream {
// A stream is homed on the scheduler that created/accepted it.
708 impl HomingIO for UvTcpStream {
709 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Drop: close the stream handle on the home scheduler, blocking until
// the uv close callback resumes the task.
712 impl Drop for UvTcpStream {
714 // XXX need mutable finalizer
715 let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
716 do this.home_for_io_with_sched |self_, scheduler| {
717 do scheduler.deschedule_running_task_and_then |_, task| {
718 let task_cell = Cell::new(task);
719 do self_.watcher.as_stream().close {
720 let scheduler: ~Scheduler = Local::take();
721 scheduler.resume_blocked_task_immediately(task_cell.take());
// Local address of the connected socket.
728 impl RtioSocket for UvTcpStream {
729 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
730 do self.home_for_io |self_| {
731 socket_name(Tcp, self_.watcher)
// Stream IO plus TCP socket-option setters. Reads/writes use the
// deschedule/fill-cell/resume pattern on the home scheduler; option
// setters are thin wrappers over uvll calls whose status is mapped to
// an IoError. NOTE(review): lines are elided throughout this impl
// (read_stop call, success values, Ok arms of the option matches).
736 impl RtioTcpStream for UvTcpStream {
// Blocking read into `buf`; returns bytes read.
737 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
738 do self.home_for_io_with_sched |self_, scheduler| {
739 let result_cell = Cell::new_empty();
740 let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
// Raw pointer so the uv alloc callback can hand uv the caller's buffer.
742 let buf_ptr: *&mut [u8] = &buf;
743 do scheduler.deschedule_running_task_and_then |_sched, task| {
744 let task_cell = Cell::new(task);
745 // XXX: We shouldn't reallocate these callbacks every
747 let alloc: AllocCallback = |_| unsafe {
748 slice_to_uv_buf(*buf_ptr)
750 let mut watcher = self_.watcher.as_stream();
751 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
753 // Stop reading so that no read callbacks are
754 // triggered before the user calls `read` again.
755 // XXX: Is there a performance impact to calling
759 let result = if status.is_none() {
763 Err(uv_error_to_io_error(status.unwrap()))
766 unsafe { (*result_cell_ptr).put_back(result); }
768 let scheduler: ~Scheduler = Local::take();
769 scheduler.resume_blocked_task_immediately(task_cell.take());
773 assert!(!result_cell.is_empty());
// Blocking write of the whole buffer.
778 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
779 do self.home_for_io_with_sched |self_, scheduler| {
780 let result_cell = Cell::new_empty();
781 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
782 let buf_ptr: *&[u8] = &buf;
783 do scheduler.deschedule_running_task_and_then |_, task| {
784 let task_cell = Cell::new(task);
785 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
786 let mut watcher = self_.watcher.as_stream();
787 do watcher.write(buf) |_watcher, status| {
788 let result = if status.is_none() {
791 Err(uv_error_to_io_error(status.unwrap()))
794 unsafe { (*result_cell_ptr).put_back(result); }
796 let scheduler: ~Scheduler = Local::take();
797 scheduler.resume_blocked_task_immediately(task_cell.take());
801 assert!(!result_cell.is_empty());
// Address of the remote peer.
806 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
807 do self.home_for_io |self_| {
808 socket_name(TcpPeer, self_.watcher)
// Nagle on: uv_tcp_nodelay(0) re-enables congestion control.
812 fn control_congestion(&mut self) -> Result<(), IoError> {
813 do self.home_for_io |self_| {
814 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
816 match status_to_maybe_uv_error(self_.watcher, r) {
817 Some(err) => Err(uv_error_to_io_error(err)),
// Nagle off: uv_tcp_nodelay(1).
823 fn nodelay(&mut self) -> Result<(), IoError> {
824 do self.home_for_io |self_| {
825 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
827 match status_to_maybe_uv_error(self_.watcher, r) {
828 Some(err) => Err(uv_error_to_io_error(err)),
// Enable TCP keepalive with the given probe delay.
834 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
835 do self.home_for_io |self_| {
837 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
838 delay_in_seconds as c_uint)
841 match status_to_maybe_uv_error(self_.watcher, r) {
842 Some(err) => Err(uv_error_to_io_error(err)),
// Disable TCP keepalive.
848 fn letdie(&mut self) -> Result<(), IoError> {
849 do self.home_for_io |self_| {
851 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
854 match status_to_maybe_uv_error(self_.watcher, r) {
855 Some(err) => Err(uv_error_to_io_error(err)),
// Bound UDP socket; fields (watcher, home) are elided in this listing
// but visible at the construction site in udp_bind.
862 pub struct UvUdpSocket {
// A UDP socket is homed on the scheduler that bound it.
867 impl HomingIO for UvUdpSocket {
868 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Drop: close the udp watcher on the home scheduler, blocking until
// the uv close callback resumes the task.
871 impl Drop for UvUdpSocket {
873 // XXX need mutable finalizer
874 let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
875 do this.home_for_io_with_sched |self_, scheduler| {
876 do scheduler.deschedule_running_task_and_then |_, task| {
877 let task_cell = Cell::new(task);
878 do self_.watcher.close {
879 let scheduler: ~Scheduler = Local::take();
880 scheduler.resume_blocked_task_immediately(task_cell.take());
// Local address of the bound UDP socket.
887 impl RtioSocket for UvUdpSocket {
888 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
889 do self.home_for_io |self_| {
890 socket_name(Udp, self_.watcher)
// Datagram IO plus UDP socket-option setters. recvfrom/sendto use the
// deschedule/fill-cell/resume pattern; the option setters wrap uvll
// calls and map a non-zero status to an IoError. NOTE(review): lines
// are elided throughout (recv_stop, Ok arms, `let r = unsafe {` lines).
895 impl RtioUdpSocket for UvUdpSocket {
// Blocking receive; returns (bytes read, sender address).
896 fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
897 do self.home_for_io_with_sched |self_, scheduler| {
898 let result_cell = Cell::new_empty();
899 let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
901 let buf_ptr: *&mut [u8] = &buf;
902 do scheduler.deschedule_running_task_and_then |_, task| {
903 let task_cell = Cell::new(task);
904 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
905 do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
906 let _ = flags; // /XXX add handling for partials?
910 let result = match status {
913 Ok((nread as uint, addr))
915 Some(err) => Err(uv_error_to_io_error(err)),
918 unsafe { (*result_cell_ptr).put_back(result); }
920 let scheduler: ~Scheduler = Local::take();
921 scheduler.resume_blocked_task_immediately(task_cell.take());
925 assert!(!result_cell.is_empty());
// Blocking send of the whole buffer to `dst`.
930 fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
931 do self.home_for_io_with_sched |self_, scheduler| {
932 let result_cell = Cell::new_empty();
933 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
934 let buf_ptr: *&[u8] = &buf;
935 do scheduler.deschedule_running_task_and_then |_, task| {
936 let task_cell = Cell::new(task);
937 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
938 do self_.watcher.send(buf, dst) |_watcher, status| {
940 let result = match status {
942 Some(err) => Err(uv_error_to_io_error(err)),
945 unsafe { (*result_cell_ptr).put_back(result); }
947 let scheduler: ~Scheduler = Local::take();
948 scheduler.resume_blocked_task_immediately(task_cell.take());
952 assert!(!result_cell.is_empty());
// Join a multicast group; the address is passed to uv as a C string.
957 fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
958 do self.home_for_io |self_| {
960 do multi.to_str().with_c_str |m_addr| {
961 uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
962 ptr::null(), uvll::UV_JOIN_GROUP)
966 match status_to_maybe_uv_error(self_.watcher, r) {
967 Some(err) => Err(uv_error_to_io_error(err)),
// Leave a multicast group.
973 fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
974 do self.home_for_io |self_| {
976 do multi.to_str().with_c_str |m_addr| {
977 uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
978 ptr::null(), uvll::UV_LEAVE_GROUP)
982 match status_to_maybe_uv_error(self_.watcher, r) {
983 Some(err) => Err(uv_error_to_io_error(err)),
// Multicast loopback on (1) / off (0).
989 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
990 do self.home_for_io |self_| {
993 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
996 match status_to_maybe_uv_error(self_.watcher, r) {
997 Some(err) => Err(uv_error_to_io_error(err)),
1003 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
1004 do self.home_for_io |self_| {
1007 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
1010 match status_to_maybe_uv_error(self_.watcher, r) {
1011 Some(err) => Err(uv_error_to_io_error(err)),
// TTL for multicast packets.
1017 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1018 do self.home_for_io |self_| {
1021 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
1024 match status_to_maybe_uv_error(self_.watcher, r) {
1025 Some(err) => Err(uv_error_to_io_error(err)),
// TTL for unicast packets.
1031 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1032 do self.home_for_io |self_| {
1035 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
1038 match status_to_maybe_uv_error(self_.watcher, r) {
1039 Some(err) => Err(uv_error_to_io_error(err)),
// SO_BROADCAST on (1) / off (0).
1045 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
1046 do self.home_for_io |self_| {
1049 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
1052 match status_to_maybe_uv_error(self_.watcher, r) {
1053 Some(err) => Err(uv_error_to_io_error(err)),
1059 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
1060 do self.home_for_io |self_| {
1063 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
1066 match status_to_maybe_uv_error(self_.watcher, r) {
1067 Some(err) => Err(uv_error_to_io_error(err)),
// Timer watcher plus its home SchedHandle (home field line elided in
// this listing; see UvTimer::new below).
1074 pub struct UvTimer {
1075 watcher: timer::TimerWatcher,
// A timer is homed on the scheduler that created it.
1079 impl HomingIO for UvTimer {
1080 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Plain constructor pairing a timer watcher with its home handle.
1084 fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1085 UvTimer { watcher: w, home: home }
// Drop: close the timer watcher on the home scheduler, blocking until
// the uv close callback resumes the task.
1089 impl Drop for UvTimer {
1091 let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
1092 do self_.home_for_io_with_sched |self_, scheduler| {
1093 rtdebug!("closing UvTimer");
1094 do scheduler.deschedule_running_task_and_then |_, task| {
1095 let task_cell = Cell::new(task);
1096 do self_.watcher.close {
1097 let scheduler: ~Scheduler = Local::take();
1098 scheduler.resume_blocked_task_immediately(task_cell.take());
// sleep: block the task on a one-shot uv timer (repeat = 0), then stop
// the watcher so it can be reused.
1105 impl RtioTimer for UvTimer {
1106 fn sleep(&mut self, msecs: u64) {
1107 do self.home_for_io_with_sched |self_, scheduler| {
1108 do scheduler.deschedule_running_task_and_then |_sched, task| {
1109 rtdebug!("sleep: entered scheduler context");
1110 let task_cell = Cell::new(task);
1111 do self_.watcher.start(msecs, 0) |_, status| {
1112 assert!(status.is_none());
1113 let scheduler: ~Scheduler = Local::take();
1114 scheduler.resume_blocked_task_immediately(task_cell.take());
1117 self_.watcher.stop();
// File stream: the loop, an OS fd, whether drop should close the fd,
// and (elided in this listing) the home SchedHandle.
1122 pub struct UvFileStream {
1124 fd: file::FileDescriptor,
1125 close_on_drop: bool,
// A file stream is homed on the scheduler that opened it.
1129 impl HomingIO for UvFileStream {
1130 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
// Constructor; remaining field initializers are elided in this listing.
1134 fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
1135 home: SchedHandle) -> UvFileStream {
1139 close_on_drop: close_on_drop,
// Shared read path for read/pread: uv_fs_read at `offset` (-1 means
// the current file position), using the deschedule/fill-cell/resume
// pattern. Returns the byte count from the fs request on success.
1143 fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
1144 let result_cell = Cell::new_empty();
1145 let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
1146 let buf_ptr: *&mut [u8] = &buf;
1147 do self.home_for_io_with_sched |self_, scheduler| {
1148 do scheduler.deschedule_running_task_and_then |_, task| {
1149 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1150 let task_cell = Cell::new(task);
1151 do self_.fd.read(&self_.loop_, buf, offset) |req, uverr| {
1152 let res = match uverr {
1153 None => Ok(req.get_result() as int),
1154 Some(err) => Err(uv_error_to_io_error(err))
1156 unsafe { (*result_cell_ptr).put_back(res); }
1157 let scheduler: ~Scheduler = Local::take();
1158 scheduler.resume_blocked_task_immediately(task_cell.take());
// Shared write path for write/pwrite: uv_fs_write at `offset` (-1 means
// the current file position); Ok(()) on success.
1164 fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
1165 let result_cell = Cell::new_empty();
1166 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1167 let buf_ptr: *&[u8] = &buf;
1168 do self.home_for_io_with_sched |self_, scheduler| {
1169 do scheduler.deschedule_running_task_and_then |_, task| {
1170 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1171 let task_cell = Cell::new(task);
1172 do self_.fd.write(&self_.loop_, buf, offset) |_, uverr| {
1173 let res = match uverr {
1175 Some(err) => Err(uv_error_to_io_error(err))
1177 unsafe { (*result_cell_ptr).put_back(res); }
1178 let scheduler: ~Scheduler = Local::take();
1179 scheduler.resume_blocked_task_immediately(task_cell.take());
// Synchronous seek via libc lseek (not uv); whence is a raw SEEK_*
// constant. A -1 return from lseek is mapped to an IoError (error
// branch partially elided in this listing).
1185 fn seek_common(&mut self, pos: i64, whence: c_int) ->
1186 Result<u64, IoError>{
1187 #[fixed_stack_segment]; #[inline(never)];
1189 match lseek((*self.fd), pos as off_t, whence) {
1193 desc: "Failed to lseek.",
// Drop: only when close_on_drop is set, close the fd on the home
// scheduler, blocking until the uv close callback resumes the task.
1203 impl Drop for UvFileStream {
1205 let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
1206 if self.close_on_drop {
1207 do self_.home_for_io_with_sched |self_, scheduler| {
1208 do scheduler.deschedule_running_task_and_then |_, task| {
1209 let task_cell = Cell::new(task);
1210 do self_.fd.close(&self.loop_) |_,_| {
1211 let scheduler: ~Scheduler = Local::take();
1212 scheduler.resume_blocked_task_immediately(task_cell.take());
// Public file API: read/write delegate to base_read/base_write with
// offset -1 (current position); pread/pwrite pass an explicit offset.
1220 impl RtioFileStream for UvFileStream {
1221 fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
1222 self.base_read(buf, -1)
1224 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
1225 self.base_write(buf, -1)
1227 fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
1228 self.base_read(buf, offset as i64)
1230 fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
1231 self.base_write(buf, offset as i64)
// Map the portable SeekStyle onto libc SEEK_* constants.
1233 fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
1234 use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
1235 let whence = match whence {
1236 SeekSet => SEEK_SET,
1237 SeekCur => SEEK_CUR,
1240 self.seek_common(pos, whence)
// tell = seek 0 bytes from the current position.
1242 fn tell(&self) -> Result<u64, IoError> {
1244 // this is temporary
1245 let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1246 self_.seek_common(0, SEEK_CUR)
1248 fn flush(&mut self) -> Result<(), IoError> {
// Test: connecting to a port nobody listens on yields Err.
1254 fn test_simple_io_no_connect() {
1255 do run_in_newsched_task {
1257 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1258 let addr = next_test_ip4();
1259 let maybe_chan = (*io).tcp_connect(addr);
1260 assert!(maybe_chan.is_err());
// Test: binding a UDP socket to a fresh test address succeeds.
1266 fn test_simple_udp_io_bind_only() {
1267 do run_in_newsched_task {
1269 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1270 let addr = next_test_ip4();
1271 let maybe_socket = (*io).udp_bind(addr);
1272 assert!(maybe_socket.is_ok());
// Homed-I/O regression test: a UDP socket is bound on sched1's event
// loop, the owning task migrates to sched2, and on drop the socket's
// destructor must still run its close on the home loop (sched1). The
// original inline comments (1323-1331 below) narrate the expected
// hand-off between the two schedulers.
1278 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1279 use rt::sleeper_list::SleeperList;
1280 use rt::work_queue::WorkQueue;
1281 use rt::thread::Thread;
1283 use rt::sched::{Shutdown, TaskFromFriend};
1284 do run_in_bare_thread {
// Two schedulers share the sleeper list and see each other's work
// queues so a task can migrate between them.
1285 let sleepers = SleeperList::new();
1286 let work_queue1 = WorkQueue::new();
1287 let work_queue2 = WorkQueue::new();
1288 let queues = ~[work_queue1.clone(), work_queue2.clone()];
1290 let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1292 let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1295 let handle1 = Cell::new(sched1.make_handle());
1296 let handle2 = Cell::new(sched2.make_handle());
1297 let tasksFriendHandle = Cell::new(sched2.make_handle());
// When the main task finishes, shut both schedulers down so their host
// threads can exit; the task must have exited successfully.
1299 let on_exit: ~fn(bool) = |exit_status| {
1300 handle1.take().send(Shutdown);
1301 handle2.take().send(Shutdown);
1302 rtassert!(exit_status);
1305 let test_function: ~fn() = || {
1306 let io: *mut IoFactoryObject = unsafe {
1307 Local::unsafe_borrow()
1309 let addr = next_test_ip4();
1310 let maybe_socket = unsafe { (*io).udp_bind(addr) };
1311 // this socket is bound to this event loop
1312 assert!(maybe_socket.is_ok());
1314 // block self on sched1
1315 do task::unkillable { // FIXME(#8674)
1316 let scheduler: ~Scheduler = Local::take();
1317 do scheduler.deschedule_running_task_and_then |_, task| {
// Immediately hand the descheduled task to sched2 via its friend
// handle, forcing the migration this test is about.
1319 do task.wake().map_move |task| {
1320 // send self to sched2
1321 tasksFriendHandle.take().send(TaskFromFriend(task));
1323 // sched1 should now sleep since it has nothing else to do
1326 // sched2 will wake up and get the task
1327 // as we do nothing else, the function ends and the socket goes out of scope
1328 // sched2 will start to run the destructor
1329 // the destructor will first block the task, set it's home as sched1, then enqueue it
1330 // sched2 will dequeue the task, see that it has a home, and send it to sched1
1331 // sched1 will wake up, exec the close function on the correct loop, and then we're done
// The test body runs as sched1's main task; sched2 bootstraps with a
// do-nothing task and simply services whatever gets sent to it.
1334 let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
1335 main_task.death.on_exit = Some(on_exit);
1336 let main_task = Cell::new(main_task);
1338 let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
1340 let sched1 = Cell::new(sched1);
1341 let sched2 = Cell::new(sched2);
// One OS thread per scheduler.
1343 let thread1 = do Thread::start {
1344 sched1.take().bootstrap(main_task.take());
1346 let thread2 = do Thread::start {
1347 sched2.take().bootstrap(null_task.take());
// Variant of the homed-I/O test above: here the socket *handle* (not
// the task) moves -- it is sent through a oneshot channel to a task on
// sched2 -- and dropping it there must still close on its home loop.
1356 fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
1357 use rt::sleeper_list::SleeperList;
1358 use rt::work_queue::WorkQueue;
1359 use rt::thread::Thread;
1361 use rt::comm::oneshot;
1362 use rt::sched::Shutdown;
1363 do run_in_bare_thread {
1364 let sleepers = SleeperList::new();
1365 let work_queue1 = WorkQueue::new();
1366 let work_queue2 = WorkQueue::new();
1367 let queues = ~[work_queue1.clone(), work_queue2.clone()];
1369 let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1371 let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1374 let handle1 = Cell::new(sched1.make_handle());
1375 let handle2 = Cell::new(sched2.make_handle());
// Oneshot channel used to hand the bound socket from body1 (running on
// sched1) to body2 (running on sched2).
1377 let (port, chan) = oneshot();
1378 let port = Cell::new(port);
1379 let chan = Cell::new(chan);
// body1: bind the UDP socket on sched1's loop and ship it to body2.
1381 let body1: ~fn() = || {
1382 let io: *mut IoFactoryObject = unsafe {
1383 Local::unsafe_borrow()
1385 let addr = next_test_ip4();
1386 let socket = unsafe { (*io).udp_bind(addr) };
1387 assert!(socket.is_ok());
1388 chan.take().send(socket);
// body2: receive the socket on sched2 and let it drop there.
1391 let body2: ~fn() = || {
1392 let socket = port.take().recv();
1393 assert!(socket.is_ok());
1394 /* The socket goes out of scope and the destructor is called.
1396 * - sends itself back to sched1
1397 * - frees the socket
1398 * - resets the home of the task to whatever it was previously
1402 let on_exit: ~fn(bool) = |exit| {
1403 handle1.take().send(Shutdown);
1404 handle2.take().send(Shutdown);
// task2 carries the on_exit hook: the test is over once body2 (and the
// socket's cross-scheduler drop) has completed.
1408 let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
1410 let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
1411 task2.death.on_exit = Some(on_exit);
1412 let task2 = Cell::new(task2);
1414 let sched1 = Cell::new(sched1);
1415 let sched2 = Cell::new(sched2);
1417 let thread1 = do Thread::start {
1418 sched1.take().bootstrap(task1.take());
1420 let thread2 = do Thread::start {
1421 sched2.take().bootstrap(task2.take());
// End-to-end TCP smoke test in one scheduler: a server task accepts one
// connection and checks it receives exactly the bytes 0..7; a client
// task connects to the same address and writes them.
1430 fn test_simple_tcp_server_and_client() {
1431 do run_in_newsched_task {
1432 let addr = next_test_ip4();
1434 // Start the server first so it's listening when we connect
1437 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1438 let mut listener = (*io).tcp_bind(addr).unwrap();
1439 let mut stream = listener.accept().unwrap();
1440 let mut buf = [0, .. 2048];
1441 let nread = stream.read(buf).unwrap();
// The client writes one 8-byte pattern, so a single read must see it.
1442 assert_eq!(nread, 8);
1443 for i in range(0u, nread) {
1444 rtdebug!("%u", buf[i] as uint);
1445 assert_eq!(buf[i], i as u8);
// Client side: connect and send the 0..7 test pattern.
1452 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1453 let mut stream = (*io).tcp_connect(addr).unwrap();
1454 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Same TCP server/client exchange as the test above, but with the
// server and client on separate schedulers running on separate OS
// threads, each with its own event loop.
1461 fn test_simple_tcp_server_and_client_on_diff_threads() {
1462 use rt::sleeper_list::SleeperList;
1463 use rt::work_queue::WorkQueue;
1464 use rt::thread::Thread;
1466 use rt::sched::{Shutdown};
1467 do run_in_bare_thread {
1468 let sleepers = SleeperList::new();
// Both sides talk to the same address; client_addr is just a copy.
1470 let server_addr = next_test_ip4();
1471 let client_addr = server_addr.clone();
1473 let server_work_queue = WorkQueue::new();
1474 let client_work_queue = WorkQueue::new();
1475 let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
1477 let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
1478 queues.clone(), sleepers.clone());
1479 let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
1480 queues.clone(), sleepers.clone());
1482 let server_handle = Cell::new(server_sched.make_handle());
1483 let client_handle = Cell::new(client_sched.make_handle());
// Each side shuts down its own scheduler when its task exits cleanly.
1485 let server_on_exit: ~fn(bool) = |exit_status| {
1486 server_handle.take().send(Shutdown);
1487 rtassert!(exit_status);
1490 let client_on_exit: ~fn(bool) = |exit_status| {
1491 client_handle.take().send(Shutdown);
1492 rtassert!(exit_status);
1495 let server_fn: ~fn() = || {
1496 let io: *mut IoFactoryObject = unsafe {
1497 Local::unsafe_borrow()
1499 let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
1500 let mut stream = listener.accept().unwrap();
1501 let mut buf = [0, .. 2048];
1502 let nread = stream.read(buf).unwrap();
1503 assert_eq!(nread, 8);
1504 for i in range(0u, nread) {
1505 assert_eq!(buf[i], i as u8);
1509 let client_fn: ~fn() = || {
1510 let io: *mut IoFactoryObject = unsafe {
1511 Local::unsafe_borrow()
// The client thread may start before the server is listening, so retry
// the connect until it succeeds rather than asserting the first try.
1513 let mut stream = unsafe { (*io).tcp_connect(client_addr) };
1514 while stream.is_err() {
1515 stream = unsafe { (*io).tcp_connect(client_addr) };
1517 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1520 let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
1521 server_task.death.on_exit = Some(server_on_exit);
1522 let server_task = Cell::new(server_task);
1524 let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
1525 client_task.death.on_exit = Some(client_on_exit);
1526 let client_task = Cell::new(client_task);
1528 let server_sched = Cell::new(server_sched);
1529 let client_sched = Cell::new(client_sched);
1531 let server_thread = do Thread::start {
1532 server_sched.take().bootstrap(server_task.take());
1534 let client_thread = do Thread::start {
1535 client_sched.take().bootstrap(client_task.take());
// Join both threads so the bare thread does not exit while the
// schedulers are still running.
1538 server_thread.join();
1539 client_thread.join();
// UDP smoke test: a server socket receives one 8-byte datagram and
// verifies both the payload (bytes 0..7) and the sender's address.
1544 fn test_simple_udp_server_and_client() {
1545 do run_in_newsched_task {
1546 let server_addr = next_test_ip4();
1547 let client_addr = next_test_ip4();
1551 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1552 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
1553 let mut buf = [0, .. 2048];
1554 let (nread,src) = server_socket.recvfrom(buf).unwrap();
1555 assert_eq!(nread, 8);
1556 for i in range(0u, nread) {
1557 rtdebug!("%u", buf[i] as uint);
1558 assert_eq!(buf[i], i as u8);
// recvfrom reports the peer's address; it must be the client socket's.
1560 assert_eq!(src, client_addr);
// Client side: bind its own socket and send the pattern to the server.
1566 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1567 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
1568 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
1574 #[test] #[ignore(reason = "busted")]
// Exercises reads interleaved with deliberate yields: the reader
// deschedules itself between reads, hoping a read callback fires while
// it is blocked so the data arrives across multiple reads. Currently
// #[ignore]d as "busted".
1575 fn test_read_and_block() {
1576 do run_in_newsched_task {
1577 let addr = next_test_ip4();
1580 let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1581 let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1582 let mut stream = listener.accept().unwrap();
1583 let mut buf = [0, .. 2048];
1586 let mut current = 0;
// Keep reading until all expected bytes (from the four 8-byte writes
// below) have arrived; the payload repeats 0..7, hence the % 8 check.
// NOTE(review): `expected` is bound on an elided line above -- confirm
// it equals 32 (4 writes x 8 bytes).
1589 while current < expected {
1590 let nread = stream.read(buf).unwrap();
1591 for i in range(0u, nread) {
1592 let val = buf[i] as uint;
1593 assert_eq!(val, current % 8);
1598 do task::unkillable { // FIXME(#8674)
1599 let scheduler: ~Scheduler = Local::take();
1600 // Yield to the other task in hopes that it
1601 // will trigger a read callback while we are
1603 do scheduler.deschedule_running_task_and_then |sched, task| {
// Re-enqueue ourselves immediately: this is a pure yield, not a wait.
1604 let task = Cell::new(task);
1605 sched.enqueue_blocked_task(task.take());
1610 // Make sure we had multiple reads
// Writer side: four separate 8-byte writes of the 0..7 pattern.
1616 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1617 let mut stream = (*io).tcp_connect(addr).unwrap();
1618 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1619 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1620 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1621 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Bulk-transfer stress test: the writer pushes at least MAX (500000)
// bytes of 1s in 2048-byte chunks; the reader keeps reading until it
// has accumulated that many bytes, verifying every byte is 1.
1629 fn test_read_read_read() {
1630 do run_in_newsched_task {
1631 let addr = next_test_ip4();
1632 static MAX: uint = 500000;
1636 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1637 let mut listener = (*io).tcp_bind(addr).unwrap();
1638 let mut stream = listener.accept().unwrap();
1639 let buf = [1, .. 2048];
1640 let mut total_bytes_written = 0;
1641 while total_bytes_written < MAX {
1643 total_bytes_written += buf.len();
// Reader side: accumulate until at least MAX bytes have been observed.
1650 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1651 let mut stream = (*io).tcp_connect(addr).unwrap();
1652 let mut buf = [0, .. 2048];
1653 let mut total_bytes_read = 0;
1654 while total_bytes_read < MAX {
1655 let nread = stream.read(buf).unwrap();
1656 rtdebug!("read %u bytes", nread as uint);
1657 total_bytes_read += nread;
1658 for i in range(0u, nread) {
1659 assert_eq!(buf[i], 1);
1662 rtdebug!("read %u bytes total", total_bytes_read as uint);
// Two consecutive one-byte datagrams: both must arrive, in order, each
// carrying the right payload ([1] then [2]) and the client's address.
1669 fn test_udp_twice() {
1670 do run_in_newsched_task {
1671 let server_addr = next_test_ip4();
1672 let client_addr = next_test_ip4();
1676 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1677 let mut client = (*io).udp_bind(client_addr).unwrap();
1678 assert!(client.sendto([1], server_addr).is_ok());
1679 assert!(client.sendto([2], server_addr).is_ok());
// Server side: two recvfroms, one per datagram.
1685 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1686 let mut server = (*io).udp_bind(server_addr).unwrap();
1689 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1690 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1691 assert_eq!(nread1, 1);
1692 assert_eq!(nread2, 1);
1693 assert_eq!(src1, client_addr);
1694 assert_eq!(src2, client_addr);
1695 assert_eq!(buf1[0], 1);
1696 assert_eq!(buf2[0], 2);
// Lock-step UDP flood over four sockets (one per direction per side):
// the server streams 2048-byte messages of 1s from server_out to
// client_in, blocking after each send on a one-byte message arriving at
// server_in; the client sends that one-byte message from client_out
// around each chunk it receives and, once it has seen MAX bytes, sends
// a final [0] so the visible flow can terminate.
1703 fn test_udp_many_read() {
1704 do run_in_newsched_task {
1705 let server_out_addr = next_test_ip4();
1706 let server_in_addr = next_test_ip4();
1707 let client_out_addr = next_test_ip4();
1708 let client_in_addr = next_test_ip4();
1709 static MAX: uint = 500_000;
1713 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1714 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1715 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1716 let msg = [1, .. 2048];
1717 let mut total_bytes_sent = 0;
1721 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1722 total_bytes_sent += msg.len();
1723 // check if the client has received enough
// Block until the client's one-byte message arrives before continuing.
1724 let res = server_in.recvfrom(buf);
1725 assert!(res.is_ok());
1726 let (nread, src) = res.unwrap();
1727 assert_eq!(nread, 1);
1728 assert_eq!(src, client_out_addr);
1730 assert!(total_bytes_sent >= MAX);
// Client side: signal, receive a chunk, and tally the bytes (all 1s).
1736 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1737 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1738 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1739 let mut total_bytes_recv = 0;
1740 let mut buf = [0, .. 2048];
1741 while total_bytes_recv < MAX {
1743 assert!(client_out.sendto([1], server_in_addr).is_ok());
1745 let res = client_in.recvfrom(buf);
1746 assert!(res.is_ok());
1747 let (nread, src) = res.unwrap();
1748 assert_eq!(src, server_out_addr);
1749 total_bytes_recv += nread;
1750 for i in range(0u, nread) {
1751 assert_eq!(buf[i], 1);
1754 // tell the server we're done
1755 assert!(client_out.sendto([0], server_in_addr).is_ok());
// Timer smoke test: create a uv timer and sleep for 1 ms.
1762 fn test_timer_sleep_simple() {
1763 do run_in_newsched_task {
1765 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1766 let timer = (*io).timer_init();
// map_move consumes the result: the sleep only runs if init succeeded.
1767 do timer.map_move |mut t| { t.sleep(1) };
// Full file-I/O round trip through uvio: create a temp file, write a
// string, reopen it, read the contents back, compare, then unlink.
1772 fn file_test_uvio_full_simple_impl() {
1773 use str::StrSlice; // why does this have to be explicitly imported to work?
1774 // compiler was complaining about no trait for str that
1775 // does .as_bytes() ..
1777 use rt::io::{Open, Create, ReadWrite, Read};
1779 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1780 let write_val = "hello uvio!";
1781 let path = "./tmp/file_test_uvio_full.txt";
// First open: Create + ReadWrite, then write the payload bytes.
1783 let create_fm = Create;
1784 let create_fa = ReadWrite;
1785 let mut fd = (*io).fs_open(&Path(path), create_fm, create_fa).unwrap();
1786 let write_buf = write_val.as_bytes();
1787 fd.write(write_buf);
// Reopen for reading. NOTE(review): ro_fm/ro_fa are bound on elided
// lines just above -- presumably Open/Read given the use list; confirm.
1792 let mut fd = (*io).fs_open(&Path(path), ro_fm, ro_fa).unwrap();
1793 let mut read_vec = [0, .. 1028];
// Only the first nread bytes of the buffer are valid file content.
1794 let nread = fd.read(read_vec).unwrap();
1795 let read_val = str::from_bytes(read_vec.slice(0, nread as uint));
1796 assert!(read_val == write_val.to_owned());
// Clean up the temp file.
1798 (*io).fs_unlink(&Path(path));
// Test entry point: run the file round-trip impl inside a fresh
// scheduler task.
1803 fn file_test_uvio_full_simple() {
1804 do run_in_newsched_task {
1805 file_test_uvio_full_simple_impl();
// Write a string to stdout by wrapping the raw STDOUT file descriptor
// in a uvio file stream; "naive" because the write's result is not
// checked.
1809 fn uvio_naive_print(input: &str) {
1812 use libc::{STDOUT_FILENO};
1813 let io: *mut IoFactoryObject = Local::unsafe_borrow();
// The `false` flag presumably tells fs_from_raw_fd not to close the fd
// on drop (stdout must stay open) -- TODO confirm against its impl.
1815 let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
1816 let write_buf = input.as_bytes();
1817 fd.write(write_buf);
1823 fn file_test_uvio_write_to_stdout() {
1824 do run_in_newsched_task {
1825 uvio_naive_print("jubilation\n");