1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use libc::{c_int, c_uint, c_void};
23 use rt::io::net::ip::{SocketAddr, IpAddr};
24 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
27 use rt::sched::{Scheduler, SchedHandle};
29 use rt::task::SchedHome;
31 use rt::uv::idle::IdleWatcher;
32 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr, accum_sockaddrs};
33 use rt::uv::addrinfo::GetAddrInfoRequest;
34 use unstable::sync::Exclusive;
35 use super::super::io::support::PathLike;
36 use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
38 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
39 CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
42 #[cfg(test)] use container::Container;
43 #[cfg(test)] use unstable::run_in_bare_thread;
44 #[cfg(test)] use rt::test::{spawntask,
46 run_in_mt_newsched_task};
47 #[cfg(test)] use iter::{Iterator, range};
48 #[cfg(test)] use rt::comm::oneshot;
50 // XXX we should not be calling uvll functions in here.
54 fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
56 /* XXX This will move pinned tasks to do IO on the proper scheduler
57 * and then move them back to their home.
59 fn go_to_IO_home(&mut self) -> SchedHome {
60 use rt::sched::PinnedTask;
62 do task::unkillable { // FIXME(#8674)
66 let scheduler: ~Scheduler = Local::take();
67 do scheduler.deschedule_running_task_and_then |_, task| {
68 /* FIXME(#8674) if the task was already killed then wake
69 * will return None. In that case, the home pointer will never be set.
71 * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
73 do task.wake().map_move |mut task| {
74 *ptr = Some(task.take_unwrap_home());
75 self.home().send(PinnedTask(task));
79 old.expect("No old home because task had already been killed.")
83 // XXX dummy self param
84 fn restore_original_home(_dummy_self: Option<Self>, old: SchedHome) {
85 use rt::sched::TaskFromFriend;
87 let old = Cell::new(old);
88 do task::unkillable { // FIXME(#8674)
89 let scheduler: ~Scheduler = Local::take();
90 do scheduler.deschedule_running_task_and_then |scheduler, task| {
91 /* FIXME(#8674) if the task was already killed then wake
92 * will return None. In that case, the home pointer will never be restored.
94 * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
96 do task.wake().map_move |mut task| {
97 task.give_home(old.take());
98 scheduler.make_handle().send(TaskFromFriend(task));
104 fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
105 let home = self.go_to_IO_home();
106 let a = io(self); // do IO
107 HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
108 a // return the result of the IO
111 fn home_for_io_consume<A>(self, io: &fn(Self) -> A) -> A {
113 let home = this.go_to_IO_home();
114 let a = io(this); // do IO
115 HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
116 a // return the result of the IO
119 fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
120 let home = self.go_to_IO_home();
121 let a = do task::unkillable { // FIXME(#8674)
122 let scheduler: ~Scheduler = Local::take();
123 io_sched(self, scheduler) // do IO and scheduling action
125 HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
126 a // return result of IO
130 // get a handle for the current scheduler
131 macro_rules! get_handle_to_current_scheduler(
132 () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
135 enum SocketNameKind {
141 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
142 handle: U) -> Result<SocketAddr, IoError> {
143 let getsockname = match sk {
144 TcpPeer => uvll::tcp_getpeername,
145 Tcp => uvll::tcp_getsockname,
146 Udp => uvll::udp_getsockname,
149 // Allocate a sockaddr_storage
150 // since we don't know if it's ipv4 or ipv6
151 let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
154 getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
158 let status = status_to_maybe_uv_error(r);
159 return Err(uv_error_to_io_error(status.unwrap()));
163 if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
164 net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
166 net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
170 unsafe { uvll::free_sockaddr_storage(r_addr); }
176 // Obviously an Event Loop is always home.
177 pub struct UvEventLoop {
182 pub fn new() -> UvEventLoop {
184 uvio: UvIoFactory(Loop::new())
189 impl Drop for UvEventLoop {
191 // XXX: Need mutable finalizer
193 transmute::<&UvEventLoop, &mut UvEventLoop>(self)
195 this.uvio.uv_loop().close();
199 impl EventLoop for UvEventLoop {
201 self.uvio.uv_loop().run();
204 fn callback(&mut self, f: ~fn()) {
205 let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
206 do idle_watcher.start |mut idle_watcher, status| {
207 assert!(status.is_none());
209 idle_watcher.close(||());
214 fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
215 let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
216 return ~UvPausibleIdleCallback {
217 watcher: idle_watcher,
223 fn callback_ms(&mut self, ms: u64, f: ~fn()) {
224 let mut timer = TimerWatcher::new(self.uvio.uv_loop());
225 do timer.start(ms, 0) |timer, status| {
226 assert!(status.is_none());
232 fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
233 ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
236 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
241 pub struct UvPausibleIdleCallback {
242 watcher: IdleWatcher,
247 impl UvPausibleIdleCallback {
249 pub fn start(&mut self, f: ~fn()) {
250 do self.watcher.start |_idle_watcher, _status| {
253 self.idle_flag = true;
256 pub fn pause(&mut self) {
257 if self.idle_flag == true {
259 self.idle_flag = false;
263 pub fn resume(&mut self) {
264 if self.idle_flag == false {
265 self.watcher.restart();
266 self.idle_flag = true;
270 pub fn close(&mut self) {
274 self.watcher.close(||{});
280 fn test_callback_run_once() {
281 do run_in_bare_thread {
282 let mut event_loop = UvEventLoop::new();
284 let count_ptr: *mut int = &mut count;
285 do event_loop.callback {
286 unsafe { *count_ptr += 1 }
289 assert_eq!(count, 1);
293 // The entire point of async is to call into a loop from other threads so it does not need to home.
294 pub struct UvRemoteCallback {
295 // The uv async handle for triggering the callback
297 // A flag to tell the callback to exit, set from the dtor. This is
298 // almost never contested - only in rare races with the dtor.
299 exit_flag: Exclusive<bool>
302 impl UvRemoteCallback {
303 pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
304 let exit_flag = Exclusive::new(false);
305 let exit_flag_clone = exit_flag.clone();
306 let async = do AsyncWatcher::new(loop_) |watcher, status| {
307 assert!(status.is_none());
309 // The synchronization logic here is subtle. To review,
310 // the uv async handle type promises that, after it is
311 // triggered the remote callback is definitely called at
312 // least once. UvRemoteCallback needs to maintain those
313 // semantics while also shutting down cleanly from the
314 // dtor. In our case that means that, when the
315 // UvRemoteCallback dtor calls `async.send()`, here `f` is
316 // always called later.
318 // In the dtor both the exit flag is set and the async
319 // callback fired under a lock. Here, before calling `f`,
320 // we take the lock and check the flag. Because we are
321 // checking the flag before calling `f`, and the flag is
322 // set under the same lock as the send, then if the flag
323 // is set then we're guaranteed to call `f` after the
326 // If the check was done after `f()` then there would be a
327 // period between that call and the check where the dtor
328 // could be called in the other thread, missing the final
329 // callback while still destroying the handle.
331 let should_exit = unsafe {
332 exit_flag_clone.with_imm(|&should_exit| should_exit)
349 impl RemoteCallback for UvRemoteCallback {
350 fn fire(&mut self) { self.async.send() }
353 impl Drop for UvRemoteCallback {
356 let this: &mut UvRemoteCallback = cast::transmute_mut(self);
357 do this.exit_flag.with |should_exit| {
358 // NB: These two things need to happen atomically. Otherwise
359 // the event handler could wake up due to a *previous*
360 // signal and see the exit flag, destroying the handle
361 // before the final send.
373 use rt::thread::Thread;
375 use rt::rtio::EventLoop;
376 use rt::local::Local;
377 use rt::sched::Scheduler;
380 fn test_uv_remote() {
381 do run_in_mt_newsched_task {
382 let mut tube = Tube::new();
383 let tube_clone = tube.clone();
384 let remote_cell = Cell::new_empty();
385 do Local::borrow |sched: &mut Scheduler| {
386 let tube_clone = tube_clone.clone();
387 let tube_clone_cell = Cell::new(tube_clone);
388 let remote = do sched.event_loop.remote_callback {
389 // This could be called multiple times
390 if !tube_clone_cell.is_empty() {
391 tube_clone_cell.take().send(1);
394 remote_cell.put_back(remote);
396 let thread = do Thread::start {
397 remote_cell.take().fire();
400 assert!(tube.recv() == 1);
406 pub struct UvIoFactory(Loop);
409 pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
410 match self { &UvIoFactory(ref mut ptr) => ptr }
414 impl IoFactory for UvIoFactory {
415 // Connect to an address and return a new stream
416 // NB: This blocks the task waiting on the connection.
417 // It would probably be better to return a future
418 fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
419 // Create a cell in the task to hold the result. We will fill
420 // the cell before resuming the task.
421 let result_cell = Cell::new_empty();
422 let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
424 // Block this task and take ownership, switch to scheduler context
425 do task::unkillable { // FIXME(#8674)
426 let scheduler: ~Scheduler = Local::take();
427 do scheduler.deschedule_running_task_and_then |_, task| {
429 let mut tcp = TcpWatcher::new(self.uv_loop());
430 let task_cell = Cell::new(task);
432 // Wait for a connection
433 do tcp.connect(addr) |stream, status| {
436 let tcp = NativeHandle::from_native_handle(stream.native_handle());
437 let home = get_handle_to_current_scheduler!();
438 let res = Ok(~UvTcpStream { watcher: tcp, home: home });
440 // Store the stream in the task's stack
441 unsafe { (*result_cell_ptr).put_back(res); }
444 let scheduler: ~Scheduler = Local::take();
445 scheduler.resume_blocked_task_immediately(task_cell.take());
448 let task_cell = Cell::new(task_cell.take());
450 let res = Err(uv_error_to_io_error(status.unwrap()));
451 unsafe { (*result_cell_ptr).put_back(res); }
452 let scheduler: ~Scheduler = Local::take();
453 scheduler.resume_blocked_task_immediately(task_cell.take());
461 assert!(!result_cell.is_empty());
462 return result_cell.take();
465 fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
466 let mut watcher = TcpWatcher::new(self.uv_loop());
467 match watcher.bind(addr) {
469 let home = get_handle_to_current_scheduler!();
470 Ok(~UvTcpListener::new(watcher, home))
473 do task::unkillable { // FIXME(#8674)
474 let scheduler: ~Scheduler = Local::take();
475 do scheduler.deschedule_running_task_and_then |_, task| {
476 let task_cell = Cell::new(task);
477 do watcher.as_stream().close {
478 let scheduler: ~Scheduler = Local::take();
479 scheduler.resume_blocked_task_immediately(task_cell.take());
482 Err(uv_error_to_io_error(uverr))
488 fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
489 let mut watcher = UdpWatcher::new(self.uv_loop());
490 match watcher.bind(addr) {
492 let home = get_handle_to_current_scheduler!();
493 Ok(~UvUdpSocket { watcher: watcher, home: home })
496 do task::unkillable { // FIXME(#8674)
497 let scheduler: ~Scheduler = Local::take();
498 do scheduler.deschedule_running_task_and_then |_, task| {
499 let task_cell = Cell::new(task);
501 let scheduler: ~Scheduler = Local::take();
502 scheduler.resume_blocked_task_immediately(task_cell.take());
505 Err(uv_error_to_io_error(uverr))
511 fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
512 let watcher = TimerWatcher::new(self.uv_loop());
513 let home = get_handle_to_current_scheduler!();
514 Ok(~UvTimer::new(watcher, home))
517 fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
518 let loop_ = Loop {handle: self.uv_loop().native_handle()};
519 let fd = file::FileDescriptor(fd);
520 let home = get_handle_to_current_scheduler!();
521 ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
524 fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
525 -> Result<~RtioFileStream, IoError> {
526 let mut flags = match fm {
529 OpenOrCreate => O_CREAT,
532 CreateOrTruncate => O_TRUNC | O_CREAT
535 Read => flags | O_RDONLY,
536 Write => flags | O_WRONLY,
537 ReadWrite => flags | O_RDWR
539 let create_mode = match fm {
540 Create|OpenOrCreate|CreateOrTruncate =>
544 let result_cell = Cell::new_empty();
545 let result_cell_ptr: *Cell<Result<~RtioFileStream,
546 IoError>> = &result_cell;
547 let path_cell = Cell::new(path);
548 do task::unkillable { // FIXME(#8674)
549 let scheduler: ~Scheduler = Local::take();
550 do scheduler.deschedule_running_task_and_then |_, task| {
551 let task_cell = Cell::new(task);
552 let path = path_cell.take();
553 do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
556 let loop_ = Loop {handle: req.get_loop().native_handle()};
557 let home = get_handle_to_current_scheduler!();
558 let fd = file::FileDescriptor(req.get_result());
559 let fs = ~UvFileStream::new(
560 loop_, fd, true, home) as ~RtioFileStream;
562 unsafe { (*result_cell_ptr).put_back(res); }
563 let scheduler: ~Scheduler = Local::take();
564 scheduler.resume_blocked_task_immediately(task_cell.take());
566 let res = Err(uv_error_to_io_error(err.unwrap()));
567 unsafe { (*result_cell_ptr).put_back(res); }
568 let scheduler: ~Scheduler = Local::take();
569 scheduler.resume_blocked_task_immediately(task_cell.take());
574 assert!(!result_cell.is_empty());
575 return result_cell.take();
578 fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
579 let result_cell = Cell::new_empty();
580 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
581 let path_cell = Cell::new(path);
582 do task::unkillable { // FIXME(#8674)
583 let scheduler: ~Scheduler = Local::take();
584 do scheduler.deschedule_running_task_and_then |_, task| {
585 let task_cell = Cell::new(task);
586 let path = path_cell.take();
587 do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
588 let res = match err {
590 Some(err) => Err(uv_error_to_io_error(err))
592 unsafe { (*result_cell_ptr).put_back(res); }
593 let scheduler: ~Scheduler = Local::take();
594 scheduler.resume_blocked_task_immediately(task_cell.take());
598 assert!(!result_cell.is_empty());
599 return result_cell.take();
602 fn get_host_addresses(&mut self, host: &str) -> Result<~[IpAddr], IoError> {
603 let result_cell = Cell::new_empty();
604 let result_cell_ptr: *Cell<Result<~[IpAddr], IoError>> = &result_cell;
605 let host_ptr: *&str = &host;
606 let addrinfo_req = GetAddrInfoRequest::new();
607 let addrinfo_req_cell = Cell::new(addrinfo_req);
608 do task::unkillable { // FIXME(#8674)
609 let scheduler: ~Scheduler = Local::take();
610 do scheduler.deschedule_running_task_and_then |_, task| {
611 let task_cell = Cell::new(task);
612 let mut addrinfo_req = addrinfo_req_cell.take();
614 do addrinfo_req.getaddrinfo(self.uv_loop(),
616 None, None) |_, addrinfo, err| {
617 let res = match err {
618 None => Ok(accum_sockaddrs(addrinfo).map(|addr| addr.ip.clone())),
619 Some(err) => Err(uv_error_to_io_error(err))
621 (*result_cell_ptr).put_back(res);
622 let scheduler: ~Scheduler = Local::take();
623 scheduler.resume_blocked_task_immediately(task_cell.take());
628 addrinfo_req.delete();
629 assert!(!result_cell.is_empty());
630 return result_cell.take();
634 pub struct UvTcpListener {
635 watcher : TcpWatcher,
639 impl HomingIO for UvTcpListener {
640 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
644 fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
645 UvTcpListener { watcher: watcher, home: home }
649 impl Drop for UvTcpListener {
651 // XXX need mutable finalizer
652 let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
653 do self_.home_for_io_with_sched |self_, scheduler| {
654 do scheduler.deschedule_running_task_and_then |_, task| {
655 let task = Cell::new(task);
656 do self_.watcher.as_stream().close {
657 let scheduler: ~Scheduler = Local::take();
658 scheduler.resume_blocked_task_immediately(task.take());
665 impl RtioSocket for UvTcpListener {
666 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
667 do self.home_for_io |self_| {
668 socket_name(Tcp, self_.watcher)
673 impl RtioTcpListener for UvTcpListener {
674 fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
675 do self.home_for_io_consume |self_| {
676 let mut acceptor = ~UvTcpAcceptor::new(self_);
677 let incoming = Cell::new(acceptor.incoming.clone());
678 do acceptor.listener.watcher.listen |mut server, status| {
679 do incoming.with_mut_ref |incoming| {
680 let inc = match status {
681 Some(_) => Err(standard_error(OtherIoError)),
683 let inc = TcpWatcher::new(&server.event_loop());
684 // first accept call in the callback guarenteed to succeed
685 server.accept(inc.as_stream());
686 let home = get_handle_to_current_scheduler!();
687 Ok(~UvTcpStream { watcher: inc, home: home })
698 pub struct UvTcpAcceptor {
699 listener: UvTcpListener,
700 incoming: Tube<Result<~RtioTcpStreamObject, IoError>>,
703 impl HomingIO for UvTcpAcceptor {
704 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
708 fn new(listener: UvTcpListener) -> UvTcpAcceptor {
709 UvTcpAcceptor { listener: listener, incoming: Tube::new() }
713 impl RtioSocket for UvTcpAcceptor {
714 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
715 do self.home_for_io |self_| {
716 socket_name(Tcp, self_.listener.watcher)
721 impl RtioTcpAcceptor for UvTcpAcceptor {
722 fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
723 do self.home_for_io |self_| {
724 self_.incoming.recv()
728 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
729 do self.home_for_io |self_| {
731 uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
734 match status_to_maybe_uv_error(r) {
735 Some(err) => Err(uv_error_to_io_error(err)),
741 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
742 do self.home_for_io |self_| {
744 uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
747 match status_to_maybe_uv_error(r) {
748 Some(err) => Err(uv_error_to_io_error(err)),
755 pub struct UvTcpStream {
760 impl HomingIO for UvTcpStream {
761 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
764 impl Drop for UvTcpStream {
766 // XXX need mutable finalizer
767 let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
768 do this.home_for_io_with_sched |self_, scheduler| {
769 do scheduler.deschedule_running_task_and_then |_, task| {
770 let task_cell = Cell::new(task);
771 do self_.watcher.as_stream().close {
772 let scheduler: ~Scheduler = Local::take();
773 scheduler.resume_blocked_task_immediately(task_cell.take());
780 impl RtioSocket for UvTcpStream {
781 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
782 do self.home_for_io |self_| {
783 socket_name(Tcp, self_.watcher)
788 impl RtioTcpStream for UvTcpStream {
789 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
790 do self.home_for_io_with_sched |self_, scheduler| {
791 let result_cell = Cell::new_empty();
792 let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
794 let buf_ptr: *&mut [u8] = &buf;
795 do scheduler.deschedule_running_task_and_then |_sched, task| {
796 let task_cell = Cell::new(task);
797 // XXX: We shouldn't reallocate these callbacks every
799 let alloc: AllocCallback = |_| unsafe {
800 slice_to_uv_buf(*buf_ptr)
802 let mut watcher = self_.watcher.as_stream();
803 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
805 // Stop reading so that no read callbacks are
806 // triggered before the user calls `read` again.
807 // XXX: Is there a performance impact to calling
811 let result = if status.is_none() {
815 Err(uv_error_to_io_error(status.unwrap()))
818 unsafe { (*result_cell_ptr).put_back(result); }
820 let scheduler: ~Scheduler = Local::take();
821 scheduler.resume_blocked_task_immediately(task_cell.take());
825 assert!(!result_cell.is_empty());
830 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
831 do self.home_for_io_with_sched |self_, scheduler| {
832 let result_cell = Cell::new_empty();
833 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
834 let buf_ptr: *&[u8] = &buf;
835 do scheduler.deschedule_running_task_and_then |_, task| {
836 let task_cell = Cell::new(task);
837 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
838 let mut watcher = self_.watcher.as_stream();
839 do watcher.write(buf) |_watcher, status| {
840 let result = if status.is_none() {
843 Err(uv_error_to_io_error(status.unwrap()))
846 unsafe { (*result_cell_ptr).put_back(result); }
848 let scheduler: ~Scheduler = Local::take();
849 scheduler.resume_blocked_task_immediately(task_cell.take());
853 assert!(!result_cell.is_empty());
858 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
859 do self.home_for_io |self_| {
860 socket_name(TcpPeer, self_.watcher)
864 fn control_congestion(&mut self) -> Result<(), IoError> {
865 do self.home_for_io |self_| {
866 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
868 match status_to_maybe_uv_error(r) {
869 Some(err) => Err(uv_error_to_io_error(err)),
875 fn nodelay(&mut self) -> Result<(), IoError> {
876 do self.home_for_io |self_| {
877 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
879 match status_to_maybe_uv_error(r) {
880 Some(err) => Err(uv_error_to_io_error(err)),
886 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
887 do self.home_for_io |self_| {
889 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
890 delay_in_seconds as c_uint)
893 match status_to_maybe_uv_error(r) {
894 Some(err) => Err(uv_error_to_io_error(err)),
900 fn letdie(&mut self) -> Result<(), IoError> {
901 do self.home_for_io |self_| {
903 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
906 match status_to_maybe_uv_error(r) {
907 Some(err) => Err(uv_error_to_io_error(err)),
914 pub struct UvUdpSocket {
919 impl HomingIO for UvUdpSocket {
920 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
923 impl Drop for UvUdpSocket {
925 // XXX need mutable finalizer
926 let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
927 do this.home_for_io_with_sched |self_, scheduler| {
928 do scheduler.deschedule_running_task_and_then |_, task| {
929 let task_cell = Cell::new(task);
930 do self_.watcher.close {
931 let scheduler: ~Scheduler = Local::take();
932 scheduler.resume_blocked_task_immediately(task_cell.take());
939 impl RtioSocket for UvUdpSocket {
940 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
941 do self.home_for_io |self_| {
942 socket_name(Udp, self_.watcher)
947 impl RtioUdpSocket for UvUdpSocket {
948 fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
949 do self.home_for_io_with_sched |self_, scheduler| {
950 let result_cell = Cell::new_empty();
951 let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
953 let buf_ptr: *&mut [u8] = &buf;
954 do scheduler.deschedule_running_task_and_then |_, task| {
955 let task_cell = Cell::new(task);
956 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
957 do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
958 let _ = flags; // /XXX add handling for partials?
962 let result = match status {
965 Ok((nread as uint, addr))
967 Some(err) => Err(uv_error_to_io_error(err)),
970 unsafe { (*result_cell_ptr).put_back(result); }
972 let scheduler: ~Scheduler = Local::take();
973 scheduler.resume_blocked_task_immediately(task_cell.take());
977 assert!(!result_cell.is_empty());
982 fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
983 do self.home_for_io_with_sched |self_, scheduler| {
984 let result_cell = Cell::new_empty();
985 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
986 let buf_ptr: *&[u8] = &buf;
987 do scheduler.deschedule_running_task_and_then |_, task| {
988 let task_cell = Cell::new(task);
989 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
990 do self_.watcher.send(buf, dst) |_watcher, status| {
992 let result = match status {
994 Some(err) => Err(uv_error_to_io_error(err)),
997 unsafe { (*result_cell_ptr).put_back(result); }
999 let scheduler: ~Scheduler = Local::take();
1000 scheduler.resume_blocked_task_immediately(task_cell.take());
1004 assert!(!result_cell.is_empty());
1009 fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1010 do self.home_for_io |self_| {
1012 do multi.to_str().with_c_str |m_addr| {
1013 uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1014 ptr::null(), uvll::UV_JOIN_GROUP)
1018 match status_to_maybe_uv_error(r) {
1019 Some(err) => Err(uv_error_to_io_error(err)),
1025 fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1026 do self.home_for_io |self_| {
1028 do multi.to_str().with_c_str |m_addr| {
1029 uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1030 ptr::null(), uvll::UV_LEAVE_GROUP)
1034 match status_to_maybe_uv_error(r) {
1035 Some(err) => Err(uv_error_to_io_error(err)),
1041 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
1042 do self.home_for_io |self_| {
1045 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
1048 match status_to_maybe_uv_error(r) {
1049 Some(err) => Err(uv_error_to_io_error(err)),
1055 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
1056 do self.home_for_io |self_| {
1059 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
1062 match status_to_maybe_uv_error(r) {
1063 Some(err) => Err(uv_error_to_io_error(err)),
1069 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1070 do self.home_for_io |self_| {
1073 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
1076 match status_to_maybe_uv_error(r) {
1077 Some(err) => Err(uv_error_to_io_error(err)),
1083 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1084 do self.home_for_io |self_| {
1087 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
1090 match status_to_maybe_uv_error(r) {
1091 Some(err) => Err(uv_error_to_io_error(err)),
1097 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
1098 do self.home_for_io |self_| {
1101 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
1104 match status_to_maybe_uv_error(r) {
1105 Some(err) => Err(uv_error_to_io_error(err)),
1111 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
1112 do self.home_for_io |self_| {
1115 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
1118 match status_to_maybe_uv_error(r) {
1119 Some(err) => Err(uv_error_to_io_error(err)),
1126 pub struct UvTimer {
1127 watcher: timer::TimerWatcher,
1131 impl HomingIO for UvTimer {
1132 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1136 fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1137 UvTimer { watcher: w, home: home }
1141 impl Drop for UvTimer {
1143 let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
1144 do self_.home_for_io_with_sched |self_, scheduler| {
1145 rtdebug!("closing UvTimer");
1146 do scheduler.deschedule_running_task_and_then |_, task| {
1147 let task_cell = Cell::new(task);
1148 do self_.watcher.close {
1149 let scheduler: ~Scheduler = Local::take();
1150 scheduler.resume_blocked_task_immediately(task_cell.take());
1157 impl RtioTimer for UvTimer {
1158 fn sleep(&mut self, msecs: u64) {
1159 do self.home_for_io_with_sched |self_, scheduler| {
1160 do scheduler.deschedule_running_task_and_then |_sched, task| {
1161 rtdebug!("sleep: entered scheduler context");
1162 let task_cell = Cell::new(task);
1163 do self_.watcher.start(msecs, 0) |_, status| {
1164 assert!(status.is_none());
1165 let scheduler: ~Scheduler = Local::take();
1166 scheduler.resume_blocked_task_immediately(task_cell.take());
1169 self_.watcher.stop();
1174 pub struct UvFileStream {
1176 fd: file::FileDescriptor,
1177 close_on_drop: bool,
1181 impl HomingIO for UvFileStream {
1182 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1186 fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
1187 home: SchedHandle) -> UvFileStream {
1191 close_on_drop: close_on_drop,
1195 fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
1196 let result_cell = Cell::new_empty();
1197 let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
1198 let buf_ptr: *&mut [u8] = &buf;
1199 do self.home_for_io_with_sched |self_, scheduler| {
1200 do scheduler.deschedule_running_task_and_then |_, task| {
1201 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1202 let task_cell = Cell::new(task);
1203 do self_.fd.read(&self_.loop_, buf, offset) |req, uverr| {
1204 let res = match uverr {
1205 None => Ok(req.get_result() as int),
1206 Some(err) => Err(uv_error_to_io_error(err))
1208 unsafe { (*result_cell_ptr).put_back(res); }
1209 let scheduler: ~Scheduler = Local::take();
1210 scheduler.resume_blocked_task_immediately(task_cell.take());
1216 fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
1217 let result_cell = Cell::new_empty();
1218 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1219 let buf_ptr: *&[u8] = &buf;
1220 do self.home_for_io_with_sched |self_, scheduler| {
1221 do scheduler.deschedule_running_task_and_then |_, task| {
1222 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1223 let task_cell = Cell::new(task);
1224 do self_.fd.write(&self_.loop_, buf, offset) |_, uverr| {
1225 let res = match uverr {
1227 Some(err) => Err(uv_error_to_io_error(err))
1229 unsafe { (*result_cell_ptr).put_back(res); }
1230 let scheduler: ~Scheduler = Local::take();
1231 scheduler.resume_blocked_task_immediately(task_cell.take());
1237 fn seek_common(&mut self, pos: i64, whence: c_int) ->
1238 Result<u64, IoError>{
1239 #[fixed_stack_segment]; #[inline(never)];
1241 match lseek((*self.fd), pos as off_t, whence) {
1245 desc: "Failed to lseek.",
1255 impl Drop for UvFileStream {
1257 let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
1258 if self.close_on_drop {
1259 do self_.home_for_io_with_sched |self_, scheduler| {
1260 do scheduler.deschedule_running_task_and_then |_, task| {
1261 let task_cell = Cell::new(task);
1262 do self_.fd.close(&self.loop_) |_,_| {
1263 let scheduler: ~Scheduler = Local::take();
1264 scheduler.resume_blocked_task_immediately(task_cell.take());
1272 impl RtioFileStream for UvFileStream {
1273 fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
1274 self.base_read(buf, -1)
1276 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
1277 self.base_write(buf, -1)
1279 fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
1280 self.base_read(buf, offset as i64)
1282 fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
1283 self.base_write(buf, offset as i64)
1285 fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
1286 use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
1287 let whence = match whence {
1288 SeekSet => SEEK_SET,
1289 SeekCur => SEEK_CUR,
1292 self.seek_common(pos, whence)
1294 fn tell(&self) -> Result<u64, IoError> {
1296 // this is temporary
1297 let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1298 self_.seek_common(0, SEEK_CUR)
1300 fn flush(&mut self) -> Result<(), IoError> {
1306 fn test_simple_io_no_connect() {
1307 do run_in_mt_newsched_task {
1309 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1310 let addr = next_test_ip4();
1311 let maybe_chan = (*io).tcp_connect(addr);
1312 assert!(maybe_chan.is_err());
1318 fn test_simple_udp_io_bind_only() {
1319 do run_in_mt_newsched_task {
1321 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1322 let addr = next_test_ip4();
1323 let maybe_socket = (*io).udp_bind(addr);
1324 assert!(maybe_socket.is_ok());
1330 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1331 use rt::sleeper_list::SleeperList;
1332 use rt::work_queue::WorkQueue;
1333 use rt::thread::Thread;
1335 use rt::sched::{Shutdown, TaskFromFriend};
1336 do run_in_bare_thread {
1337 let sleepers = SleeperList::new();
1338 let work_queue1 = WorkQueue::new();
1339 let work_queue2 = WorkQueue::new();
1340 let queues = ~[work_queue1.clone(), work_queue2.clone()];
1342 let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1344 let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1347 let handle1 = Cell::new(sched1.make_handle());
1348 let handle2 = Cell::new(sched2.make_handle());
1349 let tasksFriendHandle = Cell::new(sched2.make_handle());
1351 let on_exit: ~fn(bool) = |exit_status| {
1352 handle1.take().send(Shutdown);
1353 handle2.take().send(Shutdown);
1354 rtassert!(exit_status);
1357 let test_function: ~fn() = || {
1358 let io: *mut IoFactoryObject = unsafe {
1359 Local::unsafe_borrow()
1361 let addr = next_test_ip4();
1362 let maybe_socket = unsafe { (*io).udp_bind(addr) };
1363 // this socket is bound to this event loop
1364 assert!(maybe_socket.is_ok());
1366 // block self on sched1
1367 do task::unkillable { // FIXME(#8674)
1368 let scheduler: ~Scheduler = Local::take();
1369 do scheduler.deschedule_running_task_and_then |_, task| {
1371 do task.wake().map_move |task| {
1372 // send self to sched2
1373 tasksFriendHandle.take().send(TaskFromFriend(task));
1375 // sched1 should now sleep since it has nothing else to do
1378 // sched2 will wake up and get the task
1379 // as we do nothing else, the function ends and the socket goes out of scope
1380 // sched2 will start to run the destructor
1381 // the destructor will first block the task, set it's home as sched1, then enqueue it
1382 // sched2 will dequeue the task, see that it has a home, and send it to sched1
1383 // sched1 will wake up, exec the close function on the correct loop, and then we're done
1386 let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
1387 main_task.death.on_exit = Some(on_exit);
1388 let main_task = Cell::new(main_task);
1390 let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
1392 let sched1 = Cell::new(sched1);
1393 let sched2 = Cell::new(sched2);
1395 let thread1 = do Thread::start {
1396 sched1.take().bootstrap(main_task.take());
1398 let thread2 = do Thread::start {
1399 sched2.take().bootstrap(null_task.take());
1408 fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
1409 use rt::sleeper_list::SleeperList;
1410 use rt::work_queue::WorkQueue;
1411 use rt::thread::Thread;
1413 use rt::comm::oneshot;
1414 use rt::sched::Shutdown;
1415 do run_in_bare_thread {
1416 let sleepers = SleeperList::new();
1417 let work_queue1 = WorkQueue::new();
1418 let work_queue2 = WorkQueue::new();
1419 let queues = ~[work_queue1.clone(), work_queue2.clone()];
1421 let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1423 let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1426 let handle1 = Cell::new(sched1.make_handle());
1427 let handle2 = Cell::new(sched2.make_handle());
1429 let (port, chan) = oneshot();
1430 let port = Cell::new(port);
1431 let chan = Cell::new(chan);
1433 let body1: ~fn() = || {
1434 let io: *mut IoFactoryObject = unsafe {
1435 Local::unsafe_borrow()
1437 let addr = next_test_ip4();
1438 let socket = unsafe { (*io).udp_bind(addr) };
1439 assert!(socket.is_ok());
1440 chan.take().send(socket);
1443 let body2: ~fn() = || {
1444 let socket = port.take().recv();
1445 assert!(socket.is_ok());
1446 /* The socket goes out of scope and the destructor is called.
1448 * - sends itself back to sched1
1449 * - frees the socket
1450 * - resets the home of the task to whatever it was previously
1454 let on_exit: ~fn(bool) = |exit| {
1455 handle1.take().send(Shutdown);
1456 handle2.take().send(Shutdown);
1460 let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
1462 let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
1463 task2.death.on_exit = Some(on_exit);
1464 let task2 = Cell::new(task2);
1466 let sched1 = Cell::new(sched1);
1467 let sched2 = Cell::new(sched2);
1469 let thread1 = do Thread::start {
1470 sched1.take().bootstrap(task1.take());
1472 let thread2 = do Thread::start {
1473 sched2.take().bootstrap(task2.take());
1482 fn test_simple_tcp_server_and_client() {
1483 do run_in_mt_newsched_task {
1484 let addr = next_test_ip4();
1485 let (port, chan) = oneshot();
1486 let port = Cell::new(port);
1487 let chan = Cell::new(chan);
1489 // Start the server first so it's listening when we connect
1492 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1493 let listener = (*io).tcp_bind(addr).unwrap();
1494 let mut acceptor = listener.listen().unwrap();
1495 chan.take().send(());
1496 let mut stream = acceptor.accept().unwrap();
1497 let mut buf = [0, .. 2048];
1498 let nread = stream.read(buf).unwrap();
1499 assert_eq!(nread, 8);
1500 for i in range(0u, nread) {
1501 rtdebug!("%u", buf[i] as uint);
1502 assert_eq!(buf[i], i as u8);
1510 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1511 let mut stream = (*io).tcp_connect(addr).unwrap();
1512 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1519 fn test_simple_tcp_server_and_client_on_diff_threads() {
1520 use rt::sleeper_list::SleeperList;
1521 use rt::work_queue::WorkQueue;
1522 use rt::thread::Thread;
1524 use rt::sched::{Shutdown};
1525 do run_in_bare_thread {
1526 let sleepers = SleeperList::new();
1528 let server_addr = next_test_ip4();
1529 let client_addr = server_addr.clone();
1531 let server_work_queue = WorkQueue::new();
1532 let client_work_queue = WorkQueue::new();
1533 let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
1535 let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
1536 queues.clone(), sleepers.clone());
1537 let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
1538 queues.clone(), sleepers.clone());
1540 let server_handle = Cell::new(server_sched.make_handle());
1541 let client_handle = Cell::new(client_sched.make_handle());
1543 let server_on_exit: ~fn(bool) = |exit_status| {
1544 server_handle.take().send(Shutdown);
1545 rtassert!(exit_status);
1548 let client_on_exit: ~fn(bool) = |exit_status| {
1549 client_handle.take().send(Shutdown);
1550 rtassert!(exit_status);
1553 let server_fn: ~fn() = || {
1554 let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1555 let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
1556 let mut acceptor = listener.listen().unwrap();
1557 let mut stream = acceptor.accept().unwrap();
1558 let mut buf = [0, .. 2048];
1559 let nread = stream.read(buf).unwrap();
1560 assert_eq!(nread, 8);
1561 for i in range(0u, nread) {
1562 assert_eq!(buf[i], i as u8);
1566 let client_fn: ~fn() = || {
1567 let io: *mut IoFactoryObject = unsafe {
1568 Local::unsafe_borrow()
1570 let mut stream = unsafe { (*io).tcp_connect(client_addr) };
1571 while stream.is_err() {
1572 stream = unsafe { (*io).tcp_connect(client_addr) };
1574 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1577 let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
1578 server_task.death.on_exit = Some(server_on_exit);
1579 let server_task = Cell::new(server_task);
1581 let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
1582 client_task.death.on_exit = Some(client_on_exit);
1583 let client_task = Cell::new(client_task);
1585 let server_sched = Cell::new(server_sched);
1586 let client_sched = Cell::new(client_sched);
1588 let server_thread = do Thread::start {
1589 server_sched.take().bootstrap(server_task.take());
1591 let client_thread = do Thread::start {
1592 client_sched.take().bootstrap(client_task.take());
1595 server_thread.join();
1596 client_thread.join();
1601 fn test_simple_udp_server_and_client() {
1602 do run_in_mt_newsched_task {
1603 let server_addr = next_test_ip4();
1604 let client_addr = next_test_ip4();
1605 let (port, chan) = oneshot();
1606 let port = Cell::new(port);
1607 let chan = Cell::new(chan);
1611 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1612 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
1613 chan.take().send(());
1614 let mut buf = [0, .. 2048];
1615 let (nread,src) = server_socket.recvfrom(buf).unwrap();
1616 assert_eq!(nread, 8);
1617 for i in range(0u, nread) {
1618 rtdebug!("%u", buf[i] as uint);
1619 assert_eq!(buf[i], i as u8);
1621 assert_eq!(src, client_addr);
1627 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1628 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
1630 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
1636 #[test] #[ignore(reason = "busted")]
1637 fn test_read_and_block() {
1638 do run_in_mt_newsched_task {
1639 let addr = next_test_ip4();
1640 let (port, chan) = oneshot();
1641 let port = Cell::new(port);
1642 let chan = Cell::new(chan);
1645 let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1646 let listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1647 let mut acceptor = listener.listen().unwrap();
1648 chan.take().send(());
1649 let mut stream = acceptor.accept().unwrap();
1650 let mut buf = [0, .. 2048];
1653 let mut current = 0;
1656 while current < expected {
1657 let nread = stream.read(buf).unwrap();
1658 for i in range(0u, nread) {
1659 let val = buf[i] as uint;
1660 assert_eq!(val, current % 8);
1665 do task::unkillable { // FIXME(#8674)
1666 let scheduler: ~Scheduler = Local::take();
1667 // Yield to the other task in hopes that it
1668 // will trigger a read callback while we are
1670 do scheduler.deschedule_running_task_and_then |sched, task| {
1671 let task = Cell::new(task);
1672 sched.enqueue_blocked_task(task.take());
1677 // Make sure we had multiple reads
1684 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1685 let mut stream = (*io).tcp_connect(addr).unwrap();
1686 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1687 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1688 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1689 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1697 fn test_read_read_read() {
1698 do run_in_mt_newsched_task {
1699 let addr = next_test_ip4();
1700 static MAX: uint = 500000;
1701 let (port, chan) = oneshot();
1702 let port = Cell::new(port);
1703 let chan = Cell::new(chan);
1707 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1708 let listener = (*io).tcp_bind(addr).unwrap();
1709 let mut acceptor = listener.listen().unwrap();
1710 chan.take().send(());
1711 let mut stream = acceptor.accept().unwrap();
1712 let buf = [1, .. 2048];
1713 let mut total_bytes_written = 0;
1714 while total_bytes_written < MAX {
1716 total_bytes_written += buf.len();
1724 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1725 let mut stream = (*io).tcp_connect(addr).unwrap();
1726 let mut buf = [0, .. 2048];
1727 let mut total_bytes_read = 0;
1728 while total_bytes_read < MAX {
1729 let nread = stream.read(buf).unwrap();
1730 rtdebug!("read %u bytes", nread as uint);
1731 total_bytes_read += nread;
1732 for i in range(0u, nread) {
1733 assert_eq!(buf[i], 1);
1736 rtdebug!("read %u bytes total", total_bytes_read as uint);
1743 fn test_udp_twice() {
1744 do run_in_mt_newsched_task {
1745 let server_addr = next_test_ip4();
1746 let client_addr = next_test_ip4();
1747 let (port, chan) = oneshot();
1748 let port = Cell::new(port);
1749 let chan = Cell::new(chan);
1753 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1754 let mut client = (*io).udp_bind(client_addr).unwrap();
1756 assert!(client.sendto([1], server_addr).is_ok());
1757 assert!(client.sendto([2], server_addr).is_ok());
1763 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1764 let mut server = (*io).udp_bind(server_addr).unwrap();
1765 chan.take().send(());
1768 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1769 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1770 assert_eq!(nread1, 1);
1771 assert_eq!(nread2, 1);
1772 assert_eq!(src1, client_addr);
1773 assert_eq!(src2, client_addr);
1774 assert_eq!(buf1[0], 1);
1775 assert_eq!(buf2[0], 2);
1782 fn test_udp_many_read() {
1783 do run_in_mt_newsched_task {
1784 let server_out_addr = next_test_ip4();
1785 let server_in_addr = next_test_ip4();
1786 let client_out_addr = next_test_ip4();
1787 let client_in_addr = next_test_ip4();
1788 static MAX: uint = 500_000;
1790 let (p1, c1) = oneshot();
1791 let (p2, c2) = oneshot();
1793 let first = Cell::new((p1, c2));
1794 let second = Cell::new((p2, c1));
1798 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1799 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1800 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1801 let (port, chan) = first.take();
1804 let msg = [1, .. 2048];
1805 let mut total_bytes_sent = 0;
1809 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1810 total_bytes_sent += msg.len();
1811 // check if the client has received enough
1812 let res = server_in.recvfrom(buf);
1813 assert!(res.is_ok());
1814 let (nread, src) = res.unwrap();
1815 assert_eq!(nread, 1);
1816 assert_eq!(src, client_out_addr);
1818 assert!(total_bytes_sent >= MAX);
1824 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1825 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1826 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1827 let (port, chan) = second.take();
1830 let mut total_bytes_recv = 0;
1831 let mut buf = [0, .. 2048];
1832 while total_bytes_recv < MAX {
1834 assert!(client_out.sendto([1], server_in_addr).is_ok());
1836 let res = client_in.recvfrom(buf);
1837 assert!(res.is_ok());
1838 let (nread, src) = res.unwrap();
1839 assert_eq!(src, server_out_addr);
1840 total_bytes_recv += nread;
1841 for i in range(0u, nread) {
1842 assert_eq!(buf[i], 1);
1845 // tell the server we're done
1846 assert!(client_out.sendto([0], server_in_addr).is_ok());
1853 fn test_timer_sleep_simple() {
1854 do run_in_mt_newsched_task {
1856 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1857 let timer = (*io).timer_init();
1858 do timer.map_move |mut t| { t.sleep(1) };
1863 fn file_test_uvio_full_simple_impl() {
1864 use str::StrSlice; // why does this have to be explicitly imported to work?
1865 // compiler was complaining about no trait for str that
1866 // does .as_bytes() ..
1868 use rt::io::{Open, Create, ReadWrite, Read};
1870 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1871 let write_val = "hello uvio!";
1872 let path = "./tmp/file_test_uvio_full.txt";
1874 let create_fm = Create;
1875 let create_fa = ReadWrite;
1876 let mut fd = (*io).fs_open(&Path(path), create_fm, create_fa).unwrap();
1877 let write_buf = write_val.as_bytes();
1878 fd.write(write_buf);
1883 let mut fd = (*io).fs_open(&Path(path), ro_fm, ro_fa).unwrap();
1884 let mut read_vec = [0, .. 1028];
1885 let nread = fd.read(read_vec).unwrap();
1886 let read_val = str::from_utf8(read_vec.slice(0, nread as uint));
1887 assert!(read_val == write_val.to_owned());
1889 (*io).fs_unlink(&Path(path));
1894 fn file_test_uvio_full_simple() {
1895 do run_in_mt_newsched_task {
1896 file_test_uvio_full_simple_impl();
1900 fn uvio_naive_print(input: &str) {
1903 use libc::{STDOUT_FILENO};
1904 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1906 let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
1907 let write_buf = input.as_bytes();
1908 fd.write(write_buf);
1914 fn file_test_uvio_write_to_stdout() {
1915 do run_in_mt_newsched_task {
1916 uvio_naive_print("jubilation\n");