1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use libc::{c_int, c_uint, c_void, pid_t};
23 use rt::io::net::ip::{SocketAddr, IpAddr};
24 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
25 use rt::kill::BlockedTask;
28 use rt::sched::{Scheduler, SchedHandle};
31 use rt::uv::idle::IdleWatcher;
32 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
33 use unstable::sync::Exclusive;
34 use super::super::io::support::PathLike;
35 use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
37 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
38 CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
41 #[cfg(test)] use container::Container;
42 #[cfg(test)] use unstable::run_in_bare_thread;
43 #[cfg(test)] use rt::test::{spawntask,
45 run_in_newsched_task};
46 #[cfg(test)] use iterator::{Iterator, range};
48 // XXX we should not be calling uvll functions in here.
// NOTE(review): fragment of the `HomingIO` trait (pre-1.0 Rust runtime). The
// embedded original line numbers jump (51, 52, 55, ...), so interior lines —
// including the trait header, closing braces, and the actual `io(self)` call
// sites — are elided from this view. Code below is kept byte-identical.
// Returns a handle to the scheduler that "owns" this I/O object; all uv
// operations must run on that scheduler's event loop.
51 fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
52 /* XXX This will move pinned tasks to do IO on the proper scheduler
53 * and then move them back to their home.
// Migrate the running task to this object's home scheduler, run `io` there,
// then send the task back to its original home via TaskFromFriend.
55 fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
56 use rt::sched::{PinnedTask, TaskFromFriend};
// `old_home` stashes the task's original home across the deschedule;
// the raw-ish `&old_home` alias is needed because the closure moves.
58 let old_home = Cell::new_empty();
59 let old_home_ptr = &old_home;
60 do task::unkillable { // FIXME(#8674)
61 let scheduler: ~Scheduler = Local::take();
62 do scheduler.deschedule_running_task_and_then |_, task| {
63 // get the old home first
64 do task.wake().map_move |mut task| {
65 old_home_ptr.put_back(task.take_unwrap_home());
// Pin the task onto the I/O object's home scheduler.
66 self.home().send(PinnedTask(task));
// NOTE(review): the `io(self)` invocation between the two deschedules is in
// the elided lines (~67-74) — TODO confirm against the full file.
// Second deschedule: hand the task back its original home and requeue it.
75 do task::unkillable { // FIXME(#8674)
76 let scheduler: ~Scheduler = Local::take();
77 do scheduler.deschedule_running_task_and_then |scheduler, task| {
78 do task.wake().map_move |mut task| {
79 task.give_home(old_home.take());
80 scheduler.make_handle().send(TaskFromFriend(task));
85 // return the result of the IO
// Same migration dance as `home_for_io`, but the callback also receives the
// (taken) home ~Scheduler so it can deschedule/resume tasks itself.
89 fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
90 use rt::sched::{PinnedTask, TaskFromFriend};
92 do task::unkillable { // FIXME(#8674)
94 let old_home = Cell::new_empty();
95 let old_home_ptr = &old_home;
96 let scheduler: ~Scheduler = Local::take();
97 do scheduler.deschedule_running_task_and_then |_, task| {
98 // get the old home first
99 do task.wake().map_move |mut task| {
100 old_home_ptr.put_back(task.take_unwrap_home());
101 self.home().send(PinnedTask(task));
// Now running pinned on the home scheduler: take it and run the I/O closure.
106 let scheduler: ~Scheduler = Local::take();
107 let a = io_sched(self, scheduler);
// Restore the original home and send the task back.
110 let scheduler: ~Scheduler = Local::take();
111 do scheduler.deschedule_running_task_and_then |scheduler, task| {
112 do task.wake().map_move |mut task| {
113 task.give_home(old_home.take());
114 scheduler.make_handle().send(TaskFromFriend(task));
118 // return the result of the IO
124 // get a handle for the current scheduler
125 macro_rules! get_handle_to_current_scheduler(
126 () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
// Which uv name-query to perform (variants partially elided in this view;
// TcpPeer / Tcp / Udp are used below).
129 enum SocketNameKind {
// Queries the local or peer address of a uv socket handle and converts the
// raw sockaddr into a runtime SocketAddr. NOTE(review): interior lines are
// elided (error-check branch, `else`, final `r_addr` use ordering) — kept
// byte-identical; do not infer exact control flow from this fragment alone.
135 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
136 handle: U) -> Result<SocketAddr, IoError> {
// Dispatch to the matching uvll getter: peer name for TcpPeer, otherwise
// the local socket name for TCP/UDP.
137 let getsockname = match sk {
138 TcpPeer => uvll::tcp_getpeername,
139 Tcp => uvll::tcp_getsockname,
140 Udp => uvll::udp_getsockname,
143 // Allocate a sockaddr_storage
144 // since we don't know if it's ipv4 or ipv6
145 let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
148 getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
// Non-zero uv status becomes an IoError.
152 let status = status_to_maybe_uv_error(r);
153 return Err(uv_error_to_io_error(status.unwrap()));
// Decode as IPv6 or IPv4 depending on the address family in the storage.
157 if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
158 net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
160 net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
// Free the C-side allocation from line 145.
164 unsafe { uvll::free_sockaddr_storage(r_addr); }
170 // Obviously an Event Loop is always home.
// Owns the libuv loop (wrapped in a UvIoFactory). NOTE(review): lines are
// elided throughout this region (field list, fn bodies, closing braces).
171 pub struct UvEventLoop {
176 pub fn new() -> UvEventLoop {
178 uvio: UvIoFactory(Loop::new())
183 impl Drop for UvEventLoop {
185 // XXX: Need mutable finalizer
// SAFETY(pre-1.0): Drop took &self; transmute to &mut to call close().
187 transmute::<&UvEventLoop, &mut UvEventLoop>(self)
189 this.uvio.uv_loop().close();
193 impl EventLoop for UvEventLoop {
// run(): drive the uv loop until no active handles remain.
195 self.uvio.uv_loop().run();
// One-shot callback: an idle watcher that asserts success, (presumably)
// runs `f`, and closes itself — `f`'s call site is in elided lines.
198 fn callback(&mut self, f: ~fn()) {
199 let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
200 do idle_watcher.start |mut idle_watcher, status| {
201 assert!(status.is_none());
203 idle_watcher.close(||());
208 fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
209 let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
210 return ~UvPausibleIdleCallback {
211 watcher: idle_watcher,
// Timer-based one-shot callback after `ms` milliseconds (0 repeat).
217 fn callback_ms(&mut self, ms: u64, f: ~fn()) {
218 let mut timer = TimerWatcher::new(self.uvio.uv_loop());
219 do timer.start(ms, 0) |timer, status| {
220 assert!(status.is_none());
// Cross-thread wakeup object backed by a uv async handle.
226 fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
227 ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
230 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
// An idle watcher that can be paused/resumed; `idle_flag` mirrors whether
// the watcher is currently active so pause/resume are idempotent.
// NOTE(review): interior lines elided (e.g. `closed` flag handling in
// close(), the stop() call in pause()) — code kept byte-identical.
235 pub struct UvPausibleIdleCallback {
236 watcher: IdleWatcher,
241 impl UvPausibleIdleCallback {
243 pub fn start(&mut self, f: ~fn()) {
244 do self.watcher.start |_idle_watcher, _status| {
247 self.idle_flag = true;
// pause(): only acts when currently running (flag true).
250 pub fn pause(&mut self) {
251 if self.idle_flag == true {
253 self.idle_flag = false;
// resume(): restart the watcher only if currently paused.
257 pub fn resume(&mut self) {
258 if self.idle_flag == false {
259 self.watcher.restart();
260 self.idle_flag = true;
264 pub fn close(&mut self) {
268 self.watcher.close(||{});
// Smoke test: a one-shot event-loop callback must fire exactly once.
// The raw pointer lets the ~fn closure mutate the stack-local counter;
// the loop's run() (in elided lines) drives the callback before the assert.
274 fn test_callback_run_once() {
275 do run_in_bare_thread {
276 let mut event_loop = UvEventLoop::new();
278 let count_ptr: *mut int = &mut count;
279 do event_loop.callback {
280 unsafe { *count_ptr += 1 }
283 assert_eq!(count, 1);
287 // The entire point of async is to call into a loop from other threads so it does not need to home.
288 pub struct UvRemoteCallback {
289 // The uv async handle for triggering the callback
291 // A flag to tell the callback to exit, set from the dtor. This is
292 // almost never contested - only in rare races with the dtor.
293 exit_flag: Exclusive<bool>
296 impl UvRemoteCallback {
// Builds the async watcher whose callback runs `f` on the loop thread and
// self-destructs once the dtor has set `exit_flag` (uv guarantees at least
// one callback after each send, so the final send is never lost).
297 pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
298 let exit_flag = Exclusive::new(false);
299 let exit_flag_clone = exit_flag.clone();
300 let async = do AsyncWatcher::new(loop_) |watcher, status| {
301 assert!(status.is_none());
303 // The synchronization logic here is subtle. To review,
304 // the uv async handle type promises that, after it is
305 // triggered the remote callback is definitely called at
306 // least once. UvRemoteCallback needs to maintain those
307 // semantics while also shutting down cleanly from the
308 // dtor. In our case that means that, when the
309 // UvRemoteCallback dtor calls `async.send()`, here `f` is
310 // always called later.
312 // In the dtor both the exit flag is set and the async
313 // callback fired under a lock. Here, before calling `f`,
314 // we take the lock and check the flag. Because we are
315 // checking the flag before calling `f`, and the flag is
316 // set under the same lock as the send, then if the flag
317 // is set then we're guaranteed to call `f` after the
320 // If the check was done after `f()` then there would be a
321 // period between that call and the check where the dtor
322 // could be called in the other thread, missing the final
323 // callback while still destroying the handle.
// Read the flag under the lock before invoking `f` (see comment above).
325 let should_exit = unsafe {
326 exit_flag_clone.with_imm(|&should_exit| should_exit)
343 impl RemoteCallback for UvRemoteCallback {
// fire() is the only cross-thread entry point: uv_async_send is
// documented thread-safe.
344 fn fire(&mut self) { self.async.send() }
347 impl Drop for UvRemoteCallback {
350 let this: &mut UvRemoteCallback = cast::transmute_mut(self);
// Set exit flag AND do the final send while holding the same lock —
// the pairing is what makes shutdown race-free (see new() comment).
351 do this.exit_flag.with |should_exit| {
352 // NB: These two things need to happen atomically. Otherwise
353 // the event handler could wake up due to a *previous*
354 // signal and see the exit flag, destroying the handle
355 // before the final send.
367 use rt::thread::Thread;
369 use rt::rtio::EventLoop;
370 use rt::local::Local;
371 use rt::sched::Scheduler;
// Exercises UvRemoteCallback across threads: a foreign Thread fires the
// async handle; the scheduler-side callback sends 1 down a Tube exactly
// once (the is_empty guard tolerates uv coalescing multiple callbacks).
374 fn test_uv_remote() {
375 do run_in_newsched_task {
376 let mut tube = Tube::new();
377 let tube_clone = tube.clone();
378 let remote_cell = Cell::new_empty();
379 do Local::borrow |sched: &mut Scheduler| {
380 let tube_clone = tube_clone.clone();
381 let tube_clone_cell = Cell::new(tube_clone);
382 let remote = do sched.event_loop.remote_callback {
383 // This could be called multiple times
384 if !tube_clone_cell.is_empty() {
385 tube_clone_cell.take().send(1);
388 remote_cell.put_back(remote);
// Fire from a separate OS thread — the whole point of the async handle.
390 let thread = do Thread::start {
391 remote_cell.take().fire();
394 assert!(tube.recv() == 1);
// Newtype over the libuv Loop; the IoFactory impl below creates all
// uv-backed I/O objects from it.
400 pub struct UvIoFactory(Loop);
// Accessor for the wrapped Loop (destructuring borrow of the newtype).
403 pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
404 match self { &UvIoFactory(ref mut ptr) => ptr }
// NOTE(review): large fragment — the embedded line numbers skip throughout
// (e.g. 531→533, 615→618), so success arms, else branches, and closing
// braces are elided. All methods share one pattern: stash a Cell on the
// caller's stack, deschedule the task, issue the uv request, fill the cell
// in the uv callback, and resume the task. Code kept byte-identical.
408 impl IoFactory for UvIoFactory {
409 // Connect to an address and return a new stream
410 // NB: This blocks the task waiting on the connection.
411 // It would probably be better to return a future
412 fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
413 // Create a cell in the task to hold the result. We will fill
414 // the cell before resuming the task.
415 let result_cell = Cell::new_empty();
416 let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
418 // Block this task and take ownership, switch to scheduler context
419 do task::unkillable { // FIXME(#8674)
420 let scheduler: ~Scheduler = Local::take();
421 do scheduler.deschedule_running_task_and_then |_, task| {
423 let mut tcp = TcpWatcher::new(self.uv_loop());
424 let task_cell = Cell::new(task);
426 // Wait for a connection
427 do tcp.connect(addr) |stream, status| {
// Success path: wrap the connected handle as a UvTcpStream homed here.
430 let tcp = NativeHandle::from_native_handle(stream.native_handle());
431 let home = get_handle_to_current_scheduler!();
432 let res = Ok(~UvTcpStream { watcher: tcp, home: home });
434 // Store the stream in the task's stack
435 unsafe { (*result_cell_ptr).put_back(res); }
438 let scheduler: ~Scheduler = Local::take();
439 scheduler.resume_blocked_task_immediately(task_cell.take());
// Error path (status was Some): translate the uv error and resume.
442 let task_cell = Cell::new(task_cell.take());
444 let res = Err(uv_error_to_io_error(status.unwrap()));
445 unsafe { (*result_cell_ptr).put_back(res); }
446 let scheduler: ~Scheduler = Local::take();
447 scheduler.resume_blocked_task_immediately(task_cell.take());
455 assert!(!result_cell.is_empty());
456 return result_cell.take();
// Bind synchronously; on failure, the half-created watcher must still be
// closed on the loop before the error is returned (the deschedule below).
459 fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
460 let mut watcher = TcpWatcher::new(self.uv_loop());
461 match watcher.bind(addr) {
463 let home = get_handle_to_current_scheduler!();
464 Ok(~UvTcpListener::new(watcher, home))
467 do task::unkillable { // FIXME(#8674)
468 let scheduler: ~Scheduler = Local::take();
469 do scheduler.deschedule_running_task_and_then |_, task| {
470 let task_cell = Cell::new(task);
471 do watcher.as_stream().close {
472 let scheduler: ~Scheduler = Local::take();
473 scheduler.resume_blocked_task_immediately(task_cell.take());
476 Err(uv_error_to_io_error(uverr))
// Same bind-or-close-and-fail shape as tcp_bind, for UDP.
482 fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
483 let mut watcher = UdpWatcher::new(self.uv_loop());
484 match watcher.bind(addr) {
486 let home = get_handle_to_current_scheduler!();
487 Ok(~UvUdpSocket { watcher: watcher, home: home })
490 do task::unkillable { // FIXME(#8674)
491 let scheduler: ~Scheduler = Local::take();
492 do scheduler.deschedule_running_task_and_then |_, task| {
493 let task_cell = Cell::new(task);
495 let scheduler: ~Scheduler = Local::take();
496 scheduler.resume_blocked_task_immediately(task_cell.take());
499 Err(uv_error_to_io_error(uverr))
// Timer creation never blocks; just wrap watcher + home handle.
505 fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
506 let watcher = TimerWatcher::new(self.uv_loop());
507 let home = get_handle_to_current_scheduler!();
508 Ok(~UvTimer::new(watcher, home))
// Adopt an existing OS fd (e.g. stdio); close_on_drop controls whether
// UvFileStream's dtor closes it.
511 fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
512 let loop_ = Loop {handle: self.uv_loop().native_handle()};
513 let fd = file::FileDescriptor(fd);
514 let home = get_handle_to_current_scheduler!();
515 ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
// Translate FileMode/FileAccess into POSIX open flags, then do the open
// through uv_fs_open via the usual deschedule/callback/resume pattern.
// NOTE(review): several match arms (Open, Create, Append/Truncate, the
// `fa` match header, create_mode values) are in elided lines.
518 fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
519 -> Result<~RtioFileStream, IoError> {
520 let mut flags = match fm {
523 OpenOrCreate => O_CREAT,
526 CreateOrTruncate => O_TRUNC | O_CREAT
529 Read => flags | O_RDONLY,
530 Write => flags | O_WRONLY,
531 ReadWrite => flags | O_RDWR
// Only creating modes need a permissions argument for open(2).
533 let create_mode = match fm {
534 Create|OpenOrCreate|CreateOrTruncate =>
538 let result_cell = Cell::new_empty();
539 let result_cell_ptr: *Cell<Result<~RtioFileStream,
540 IoError>> = &result_cell;
541 let path_cell = Cell::new(path);
542 do task::unkillable { // FIXME(#8674)
543 let scheduler: ~Scheduler = Local::take();
544 do scheduler.deschedule_running_task_and_then |_, task| {
545 let task_cell = Cell::new(task);
546 let path = path_cell.take();
547 do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
// Success: the request's result is the new fd; wrap it (close_on_drop=true).
550 let loop_ = Loop {handle: req.get_loop().native_handle()};
551 let home = get_handle_to_current_scheduler!();
552 let fd = file::FileDescriptor(req.get_result());
553 let fs = ~UvFileStream::new(
554 loop_, fd, true, home) as ~RtioFileStream;
556 unsafe { (*result_cell_ptr).put_back(res); }
557 let scheduler: ~Scheduler = Local::take();
558 scheduler.resume_blocked_task_immediately(task_cell.take());
560 let res = Err(uv_error_to_io_error(err.unwrap()));
561 unsafe { (*result_cell_ptr).put_back(res); }
562 let scheduler: ~Scheduler = Local::take();
563 scheduler.resume_blocked_task_immediately(task_cell.take());
568 assert!(!result_cell.is_empty());
569 return result_cell.take();
// uv_fs_unlink via the same blocking-cell pattern.
572 fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
573 let result_cell = Cell::new_empty();
574 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
575 let path_cell = Cell::new(path);
576 do task::unkillable { // FIXME(#8674)
577 let scheduler: ~Scheduler = Local::take();
578 do scheduler.deschedule_running_task_and_then |_, task| {
579 let task_cell = Cell::new(task);
580 let path = path_cell.take();
581 do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
582 let res = match err {
584 Some(err) => Err(uv_error_to_io_error(err))
586 unsafe { (*result_cell_ptr).put_back(res); }
587 let scheduler: ~Scheduler = Local::take();
588 scheduler.resume_blocked_task_immediately(task_cell.take());
592 assert!(!result_cell.is_empty());
593 return result_cell.take();
// Pipe creation never blocks; `ipc` selects uv's IPC pipe mode.
596 fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> {
597 let home = get_handle_to_current_scheduler!();
598 Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
// spawn(): signature line with the `fn spawn(...)` header partially elided.
602 config: &process::Config) -> Result<~RtioProcessObject, IoError> {
603 // Sadly, we must create the UvProcess before we actually call uv_spawn
604 // so that the exit_cb can close over it and notify it when the process
606 let mut ret = ~UvProcess {
607 process: Process::new(),
// SAFETY(pre-1.0): raw pointer to the boxed UvProcess so the exit_cb can
// write back into it; the box stays alive as long as the handle.
614 let ret_ptr = unsafe {
615 *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
618 // The purpose of this exit callback is to record the data about the
619 // exit and then wake up the task which may be waiting for the process
620 // to exit. This is all performed in the current io-loop, and the
621 // implementation of UvProcess ensures that reading these fields always
622 // occurs on the current io-loop.
623 let exit_cb: ExitCallback = |_, exit_status, term_signal, error| {
625 assert!((*ret_ptr).exit_status.is_none());
626 (*ret_ptr).exit_status = Some(exit_status);
627 (*ret_ptr).term_signal = Some(term_signal);
628 (*ret_ptr).exit_error = error;
// If a task is parked waiting on the exit, resume it now.
629 match (*ret_ptr).descheduled.take() {
631 let scheduler: ~Scheduler = Local::take();
632 scheduler.resume_blocked_task_immediately(task);
639 match ret.process.spawn(self.uv_loop(), config, exit_cb) {
641 // Only now do we actually get a handle to this scheduler.
642 ret.home = Some(get_handle_to_current_scheduler!());
646 // We still need to close the process handle we created, but
647 // that's taken care for us in the destructor of UvProcess
648 Err(uv_error_to_io_error(uverr))
// Listener wraps the bound TcpWatcher plus a Tube through which the uv
// listen callback delivers accepted streams to `accept()`.
// NOTE(review): fields (watcher, home, listening), struct/impl closers, and
// several branches are in elided lines — code kept byte-identical.
654 pub struct UvTcpListener {
657 incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
661 impl HomingIO for UvTcpListener {
662 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
666 fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
670 incoming_streams: Tube::new(),
// TcpWatcher is a handle wrapper cheap to copy here.
675 fn watcher(&self) -> TcpWatcher { self.watcher }
678 impl Drop for UvTcpListener {
680 // XXX need mutable finalizer
681 let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
// Close must run on the home loop; block the task until uv confirms.
682 do self_.home_for_io_with_sched |self_, scheduler| {
683 do scheduler.deschedule_running_task_and_then |_, task| {
684 let task_cell = Cell::new(task);
685 do self_.watcher().as_stream().close {
686 let scheduler: ~Scheduler = Local::take();
687 scheduler.resume_blocked_task_immediately(task_cell.take());
694 impl RtioSocket for UvTcpListener {
695 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
696 do self.home_for_io |self_| {
697 socket_name(Tcp, self_.watcher)
702 impl RtioTcpListener for UvTcpListener {
// First call installs the uv listen callback (guarded by `listening`);
// every call then blocks on the Tube until a connection arrives.
704 fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
705 do self.home_for_io |self_| {
707 if !self_.listening {
708 self_.listening = true;
710 let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
712 do self_.watcher().listen |mut server, status| {
713 let stream = match status {
714 Some(_) => Err(standard_error(OtherIoError)),
// None status: accept the pending connection into a fresh watcher.
716 let client = TcpWatcher::new(&server.event_loop());
717 // XXX: needs to be surfaced in interface
718 server.accept(client.as_stream());
719 let home = get_handle_to_current_scheduler!();
720 Ok(~UvTcpStream { watcher: client, home: home })
// Tube dance: take/send/put_back because the closure only has a Cell.
724 let mut incoming_streams = incoming_streams_cell.take();
725 incoming_streams.send(stream);
726 incoming_streams_cell.put_back(incoming_streams);
730 self_.incoming_streams.recv()
// Toggle uv's simultaneous-accept optimization on (1) ...
734 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
735 do self.home_for_io |self_| {
737 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
740 match status_to_maybe_uv_error(r) {
741 Some(err) => Err(uv_error_to_io_error(err)),
// ... and off (0).
747 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
748 do self.home_for_io |self_| {
750 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
753 match status_to_maybe_uv_error(r) {
754 Some(err) => Err(uv_error_to_io_error(err)),
// Anything that can expose itself as a uv StreamWatcher; the macro below
// stamps out a shared RtioStream (read/write) impl for each such type.
761 trait UvStream: HomingIO {
762 fn as_stream(&mut self) -> StreamWatcher;
765 // FIXME(#3429) I would rather this be `impl<T: UvStream> RtioStream for T` but
766 // that has conflicts with other traits that also have methods
767 // called `read` and `write`
// NOTE(review): interior lines elided (read_stop call, success arm of the
// status checks, closing delimiters) — code kept byte-identical.
768 macro_rules! rtiostream(($t:ident) => {
769 impl RtioStream for $t {
// Blocking read: park the task, start a uv read directly into the
// caller's buffer (via slice_to_uv_buf), resume on first callback.
770 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
771 do self.home_for_io_with_sched |self_, scheduler| {
772 let result_cell = Cell::new_empty();
773 let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
775 let buf_ptr: *&mut [u8] = &buf;
776 do scheduler.deschedule_running_task_and_then |_sched, task| {
777 let task_cell = Cell::new(task);
778 // XXX: We shouldn't reallocate these callbacks every
780 let alloc: AllocCallback = |_| unsafe {
781 slice_to_uv_buf(*buf_ptr)
783 let mut watcher = self_.as_stream();
784 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
786 // Stop reading so that no read callbacks are
787 // triggered before the user calls `read` again.
788 // XXX: Is there a performance impact to calling
792 let result = if status.is_none() {
796 Err(uv_error_to_io_error(status.unwrap()))
799 unsafe { (*result_cell_ptr).put_back(result); }
801 let scheduler: ~Scheduler = Local::take();
802 scheduler.resume_blocked_task_immediately(task_cell.take());
806 assert!(!result_cell.is_empty());
// Blocking write: same pattern, single uv write of the whole slice.
811 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
812 do self.home_for_io_with_sched |self_, scheduler| {
813 let result_cell = Cell::new_empty();
814 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
815 let buf_ptr: *&[u8] = &buf;
816 do scheduler.deschedule_running_task_and_then |_, task| {
817 let task_cell = Cell::new(task);
818 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
819 let mut watcher = self_.as_stream();
820 do watcher.write(buf) |_watcher, status| {
821 let result = if status.is_none() {
824 Err(uv_error_to_io_error(status.unwrap()))
827 unsafe { (*result_cell_ptr).put_back(result); }
829 let scheduler: ~Scheduler = Local::take();
830 scheduler.resume_blocked_task_immediately(task_cell.take());
834 assert!(!result_cell.is_empty());
// Instantiate the shared read/write impl for both stream types.
841 rtiostream!(UvPipeStream)
842 rtiostream!(UvTcpStream)
// Pipe-backed stream; read/write come from the rtiostream! macro above.
// NOTE(review): fields (pipe, home) and impl closers are partly elided.
844 pub struct UvPipeStream {
849 impl UvStream for UvPipeStream {
850 fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() }
853 impl HomingIO for UvPipeStream {
854 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
857 impl Drop for UvPipeStream {
859 // FIXME(#4330): should not need a transmute
860 let this = unsafe { cast::transmute_mut(self) };
// Close on the home loop; block until uv's close callback fires.
861 do this.home_for_io |self_| {
862 let scheduler: ~Scheduler = Local::take();
863 do scheduler.deschedule_running_task_and_then |_, task| {
864 let task_cell = Cell::new(task);
865 do self_.pipe.close {
866 let scheduler: ~Scheduler = Local::take();
867 scheduler.resume_blocked_task_immediately(task_cell.take());
// Expose the raw Pipe (handle wrapper, copied by value).
875 pub fn uv_pipe(&self) -> Pipe { self.pipe }
// Connected TCP stream; read/write supplied by rtiostream!(UvTcpStream).
// NOTE(review): field list, Ok arms of the status matches, and closing
// braces are in elided lines — code kept byte-identical.
878 pub struct UvTcpStream {
883 impl HomingIO for UvTcpStream {
884 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
887 impl Drop for UvTcpStream {
889 // FIXME(#4330): should not need a transmute
890 let this = unsafe { cast::transmute_mut(self) };
// Blocking close on the home loop, same shape as UvPipeStream's dtor.
891 do this.home_for_io |self_| {
892 let scheduler: ~Scheduler = Local::take();
893 do scheduler.deschedule_running_task_and_then |_, task| {
894 let task_cell = Cell::new(task);
895 do self_.watcher.as_stream().close {
896 let scheduler: ~Scheduler = Local::take();
897 scheduler.resume_blocked_task_immediately(task_cell.take());
904 impl UvStream for UvTcpStream {
905 fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() }
908 impl RtioSocket for UvTcpStream {
909 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
910 do self.home_for_io |self_| {
911 socket_name(Tcp, self_.watcher)
916 impl RtioTcpStream for UvTcpStream {
917 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
918 do self.home_for_io |self_| {
919 socket_name(TcpPeer, self_.watcher)
// "control congestion" == re-enable Nagle: uv_tcp_nodelay(handle, 0).
923 fn control_congestion(&mut self) -> Result<(), IoError> {
924 do self.home_for_io |self_| {
925 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
927 match status_to_maybe_uv_error(r) {
928 Some(err) => Err(uv_error_to_io_error(err)),
// Disable Nagle: uv_tcp_nodelay(handle, 1).
934 fn nodelay(&mut self) -> Result<(), IoError> {
935 do self.home_for_io |self_| {
936 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
938 match status_to_maybe_uv_error(r) {
939 Some(err) => Err(uv_error_to_io_error(err)),
945 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
946 do self.home_for_io |self_| {
948 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
949 delay_in_seconds as c_uint)
952 match status_to_maybe_uv_error(r) {
953 Some(err) => Err(uv_error_to_io_error(err)),
// Disable keepalive (enable=0; the delay argument is ignored by uv then).
959 fn letdie(&mut self) -> Result<(), IoError> {
960 do self.home_for_io |self_| {
962 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
965 match status_to_maybe_uv_error(r) {
966 Some(err) => Err(uv_error_to_io_error(err)),
// Bound UDP socket. NOTE(review): fields, None/Ok arms of matches, and
// closing braces sit in elided lines — code kept byte-identical.
973 pub struct UvUdpSocket {
978 impl HomingIO for UvUdpSocket {
979 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
982 impl Drop for UvUdpSocket {
984 // XXX need mutable finalizer
985 let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
// Blocking close on the home loop.
986 do this.home_for_io_with_sched |self_, scheduler| {
987 do scheduler.deschedule_running_task_and_then |_, task| {
988 let task_cell = Cell::new(task);
989 do self_.watcher.close {
990 let scheduler: ~Scheduler = Local::take();
991 scheduler.resume_blocked_task_immediately(task_cell.take());
998 impl RtioSocket for UvUdpSocket {
999 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
1000 do self.home_for_io |self_| {
1001 socket_name(Udp, self_.watcher)
1006 impl RtioUdpSocket for UvUdpSocket {
// Blocking recv: start a uv recv into the caller's buffer, stop after the
// first datagram, return (bytes, sender address).
1007 fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
1008 do self.home_for_io_with_sched |self_, scheduler| {
1009 let result_cell = Cell::new_empty();
1010 let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
1012 let buf_ptr: *&mut [u8] = &buf;
1013 do scheduler.deschedule_running_task_and_then |_, task| {
1014 let task_cell = Cell::new(task);
1015 let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
1016 do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
1017 let _ = flags; // /XXX add handling for partials?
1019 watcher.recv_stop();
1021 let result = match status {
1023 assert!(nread >= 0);
1024 Ok((nread as uint, addr))
1026 Some(err) => Err(uv_error_to_io_error(err)),
1029 unsafe { (*result_cell_ptr).put_back(result); }
1031 let scheduler: ~Scheduler = Local::take();
1032 scheduler.resume_blocked_task_immediately(task_cell.take());
1036 assert!(!result_cell.is_empty());
// Blocking send of one datagram to `dst`.
1041 fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
1042 do self.home_for_io_with_sched |self_, scheduler| {
1043 let result_cell = Cell::new_empty();
1044 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1045 let buf_ptr: *&[u8] = &buf;
1046 do scheduler.deschedule_running_task_and_then |_, task| {
1047 let task_cell = Cell::new(task);
1048 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1049 do self_.watcher.send(buf, dst) |_watcher, status| {
1051 let result = match status {
1053 Some(err) => Err(uv_error_to_io_error(err)),
1056 unsafe { (*result_cell_ptr).put_back(result); }
1058 let scheduler: ~Scheduler = Local::take();
1059 scheduler.resume_blocked_task_immediately(task_cell.take());
1063 assert!(!result_cell.is_empty());
// Multicast group membership via uv_udp_set_membership (group address as
// a C string, no interface address).
1068 fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1069 do self.home_for_io |self_| {
1071 do multi.to_str().with_c_str |m_addr| {
1072 uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1073 ptr::null(), uvll::UV_JOIN_GROUP)
1077 match status_to_maybe_uv_error(r) {
1078 Some(err) => Err(uv_error_to_io_error(err)),
1084 fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
1085 do self.home_for_io |self_| {
1087 do multi.to_str().with_c_str |m_addr| {
1088 uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
1089 ptr::null(), uvll::UV_LEAVE_GROUP)
1093 match status_to_maybe_uv_error(r) {
1094 Some(err) => Err(uv_error_to_io_error(err)),
// The remaining methods are thin on/off wrappers over uv socket options;
// each converts a non-zero uv status into an IoError.
1100 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
1101 do self.home_for_io |self_| {
1104 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
1107 match status_to_maybe_uv_error(r) {
1108 Some(err) => Err(uv_error_to_io_error(err)),
1114 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
1115 do self.home_for_io |self_| {
1118 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
1121 match status_to_maybe_uv_error(r) {
1122 Some(err) => Err(uv_error_to_io_error(err)),
1128 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1129 do self.home_for_io |self_| {
1132 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
1135 match status_to_maybe_uv_error(r) {
1136 Some(err) => Err(uv_error_to_io_error(err)),
1142 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
1143 do self.home_for_io |self_| {
1146 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
1149 match status_to_maybe_uv_error(r) {
1150 Some(err) => Err(uv_error_to_io_error(err)),
1156 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
1157 do self.home_for_io |self_| {
1160 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
1163 match status_to_maybe_uv_error(r) {
1164 Some(err) => Err(uv_error_to_io_error(err)),
1170 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
1171 do self.home_for_io |self_| {
1174 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
1177 match status_to_maybe_uv_error(r) {
1178 Some(err) => Err(uv_error_to_io_error(err)),
// One-shot timer used to implement task sleep. NOTE(review): struct/impl
// closers and the `home` field line are elided — code kept byte-identical.
1185 pub struct UvTimer {
1186 watcher: timer::TimerWatcher,
1190 impl HomingIO for UvTimer {
1191 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1195 fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1196 UvTimer { watcher: w, home: home }
1200 impl Drop for UvTimer {
1202 let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
// Blocking close on the home loop, like the other uv handle dtors.
1203 do self_.home_for_io_with_sched |self_, scheduler| {
1204 rtdebug!("closing UvTimer");
1205 do scheduler.deschedule_running_task_and_then |_, task| {
1206 let task_cell = Cell::new(task);
1207 do self_.watcher.close {
1208 let scheduler: ~Scheduler = Local::take();
1209 scheduler.resume_blocked_task_immediately(task_cell.take());
1216 impl RtioTimer for UvTimer {
// sleep(): park the task, arm a one-shot (repeat=0) uv timer for `msecs`,
// resume from the timer callback, then stop the watcher.
1217 fn sleep(&mut self, msecs: u64) {
1218 do self.home_for_io_with_sched |self_, scheduler| {
1219 do scheduler.deschedule_running_task_and_then |_sched, task| {
1220 rtdebug!("sleep: entered scheduler context");
1221 let task_cell = Cell::new(task);
1222 do self_.watcher.start(msecs, 0) |_, status| {
1223 assert!(status.is_none());
1224 let scheduler: ~Scheduler = Local::take();
1225 scheduler.resume_blocked_task_immediately(task_cell.take());
1228 self_.watcher.stop();
1233 pub struct UvFileStream {
1235 fd: file::FileDescriptor,
1236 close_on_drop: bool,
1240 impl HomingIO for UvFileStream {
1241 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1245 fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
1246 home: SchedHandle) -> UvFileStream {
1250 close_on_drop: close_on_drop,
1254 fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
1255 let result_cell = Cell::new_empty();
1256 let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
1257 let buf_ptr: *&mut [u8] = &buf;
1258 do self.home_for_io_with_sched |self_, scheduler| {
1259 do scheduler.deschedule_running_task_and_then |_, task| {
1260 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1261 let task_cell = Cell::new(task);
1262 do self_.fd.read(&self_.loop_, buf, offset) |req, uverr| {
1263 let res = match uverr {
1264 None => Ok(req.get_result() as int),
1265 Some(err) => Err(uv_error_to_io_error(err))
1267 unsafe { (*result_cell_ptr).put_back(res); }
1268 let scheduler: ~Scheduler = Local::take();
1269 scheduler.resume_blocked_task_immediately(task_cell.take());
// Common implementation of write()/pwrite(): async uv write at `offset`
// (-1 = current position), blocking the calling task until the callback
// completes. Mirrors base_read's raw-pointer-into-stack pattern.
1275 fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
1276 let result_cell = Cell::new_empty();
// Sound for the same reason as in base_read: the frame outlives the request.
1277 let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
1278 let buf_ptr: *&[u8] = &buf;
1279 do self.home_for_io_with_sched |self_, scheduler| {
1280 do scheduler.deschedule_running_task_and_then |_, task| {
1281 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
1282 let task_cell = Cell::new(task);
1283 do self_.fd.write(&self_.loop_, buf, offset) |_, uverr| {
1284 let res = match uverr {
1286 Some(err) => Err(uv_error_to_io_error(err))
1288 unsafe { (*result_cell_ptr).put_back(res); }
// Write finished: resume the task that initiated it.
1289 let scheduler: ~Scheduler = Local::take();
1290 scheduler.resume_blocked_task_immediately(task_cell.take());
// Shared by seek()/tell(): a direct, blocking lseek(2) on the raw fd
// (seeking is cheap enough that no uv round-trip is used).
1296 fn seek_common(&mut self, pos: i64, whence: c_int) ->
1297 Result<u64, IoError>{
// lseek is a C call; fixed_stack_segment avoids the split-stack check.
1298 #[fixed_stack_segment]; #[inline(never)];
1300 match lseek((*self.fd), pos as off_t, whence) {
// NOTE(review): the match arms around this line are elided in this
// extract; the visible desc belongs to the failure (-1) branch.
1304 desc: "Failed to lseek.",
// If close_on_drop was requested, close the fd on its home event loop and
// block until uv acknowledges the close.
1314 impl Drop for UvFileStream {
// Era workaround: drop takes &self, so transmute to &mut to do homed I/O.
1316 let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
1317 if self.close_on_drop {
1318 do self_.home_for_io_with_sched |self_, scheduler| {
1319 do scheduler.deschedule_running_task_and_then |_, task| {
1320 let task_cell = Cell::new(task);
1321 do self_.fd.close(&self.loop_) |_,_| {
// Close confirmed by uv: resume the dropping task.
1322 let scheduler: ~Scheduler = Local::take();
1323 scheduler.resume_blocked_task_immediately(task_cell.take());
1331 impl RtioFileStream for UvFileStream {
// read/write at the current file position (uv offset -1 means "current").
1332 fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
1333 self.base_read(buf, -1)
1335 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
1336 self.base_write(buf, -1)
// Positional variants: read/write at an explicit absolute offset.
1338 fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
1339 self.base_read(buf, offset as i64)
1341 fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
1342 self.base_write(buf, offset as i64)
1344 fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
1345 use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
// Translate the platform-neutral SeekStyle into the libc whence constant.
1346 let whence = match whence {
1347 SeekSet => SEEK_SET,
1348 SeekCur => SEEK_CUR,
1351 self.seek_common(pos, whence)
1353 fn tell(&self) -> Result<u64, IoError> {
1355 // this is temporary
// tell() is a zero-byte relative seek; transmute works around &self
// since seek_common needs &mut self.
1356 let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1357 self_.seek_common(0, SEEK_CUR)
// NOTE(review): flush body elided in this extract — presumably a no-op
// Ok(()) since writes go straight to the fd; confirm against full file.
1359 fn flush(&mut self) -> Result<(), IoError> {
// A child process managed through libuv's process handle.
1364 pub struct UvProcess {
// The underlying uv process handle.
1365 process: process::Process,
1367 // Sadly, this structure must be created before we return it, so in that
1368 // brief interim the `home` is None.
1369 home: Option<SchedHandle>,
1371 // All None until the process exits (exit_error may stay None)
1372 priv exit_status: Option<int>,
1373 priv term_signal: Option<int>,
1374 priv exit_error: Option<UvError>,
1376 // Used to store which task to wake up from the exit_cb
1377 priv descheduled: Option<BlockedTask>,
1380 impl HomingIO for UvProcess {
// Only valid after a successful spawn has populated `home` (get_mut_ref
// fails on None — see the struct comment about the brief interim).
1381 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
// Closes the uv process handle on its home scheduler, blocking until uv
// acknowledges the close. Skips the event-loop hop if spawn never succeeded.
1384 impl Drop for UvProcess {
1386 // FIXME(#4330): should not need a transmute
1387 let this = unsafe { cast::transmute_mut(self) };
// The close routine, run on the home scheduler by home_for_io below.
1389 let close = |self_: &mut UvProcess| {
1390 let scheduler: ~Scheduler = Local::take();
1391 do scheduler.deschedule_running_task_and_then |_, task| {
1392 let task = Cell::new(task);
1393 do self_.process.close {
// Close confirmed: resume the dropping task.
1394 let scheduler: ~Scheduler = Local::take();
1395 scheduler.resume_blocked_task_immediately(task.take());
1400 // If home is none, then this process never actually successfully
1401 // spawned, so there's no need to switch event loops
1402 if this.home.is_none() {
1405 this.home_for_io(close)
1410 impl RtioProcess for UvProcess {
// The OS pid of the child.
1411 fn id(&self) -> pid_t {
// Deliver `signal` to the child via uv, from the process's home scheduler.
1415 fn kill(&mut self, signal: int) -> Result<(), IoError> {
1416 do self.home_for_io |self_| {
1417 match self_.process.kill(signal) {
1419 Err(uverr) => Err(uv_error_to_io_error(uverr))
// Block the calling task until the child exits; returns its exit status.
1424 fn wait(&mut self) -> int {
1425 // Make sure (on the home scheduler) that we have an exit status listed
1426 do self.home_for_io |self_| {
1427 match self_.exit_status {
1430 // If there's no exit code previously listed, then the
1431 // process's exit callback has yet to be invoked. We just
1432 // need to deschedule ourselves and wait to be reawoken.
1433 let scheduler: ~Scheduler = Local::take();
1434 do scheduler.deschedule_running_task_and_then |_, task| {
1435 assert!(self_.descheduled.is_none());
// The exit callback finds this parked task and wakes it.
1436 self_.descheduled = Some(task);
1438 assert!(self_.exit_status.is_some());
1443 self.exit_status.unwrap()
// Connecting to a port nobody listens on must return Err, not hang.
1448 fn test_simple_io_no_connect() {
1449 do run_in_newsched_task {
1451 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1452 let addr = next_test_ip4();
1453 let maybe_chan = (*io).tcp_connect(addr);
1454 assert!(maybe_chan.is_err());
// Binding a UDP socket to a fresh test address must succeed.
1460 fn test_simple_udp_io_bind_only() {
1461 do run_in_newsched_task {
1463 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1464 let addr = next_test_ip4();
1465 let maybe_socket = (*io).udp_bind(addr);
1466 assert!(maybe_socket.is_ok());
// Homed-I/O test: a task binds a UDP socket on sched1, migrates itself to
// sched2, and then the socket's destructor must hop back to sched1 (the
// socket's home) to close — the inline comments below trace the sequence.
1472 fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1473 use rt::sleeper_list::SleeperList;
1474 use rt::work_queue::WorkQueue;
1475 use rt::thread::Thread;
1477 use rt::sched::{Shutdown, TaskFromFriend};
1478 do run_in_bare_thread {
// Two schedulers sharing one sleeper list and each other's work queues.
1479 let sleepers = SleeperList::new();
1480 let work_queue1 = WorkQueue::new();
1481 let work_queue2 = WorkQueue::new();
1482 let queues = ~[work_queue1.clone(), work_queue2.clone()];
1484 let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1486 let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1489 let handle1 = Cell::new(sched1.make_handle());
1490 let handle2 = Cell::new(sched2.make_handle());
1491 let tasksFriendHandle = Cell::new(sched2.make_handle());
// Shut both schedulers down once the test body finishes.
1493 let on_exit: ~fn(bool) = |exit_status| {
1494 handle1.take().send(Shutdown);
1495 handle2.take().send(Shutdown);
1496 rtassert!(exit_status);
1499 let test_function: ~fn() = || {
1500 let io: *mut IoFactoryObject = unsafe {
1501 Local::unsafe_borrow()
1503 let addr = next_test_ip4();
1504 let maybe_socket = unsafe { (*io).udp_bind(addr) };
1505 // this socket is bound to this event loop
1506 assert!(maybe_socket.is_ok());
1508 // block self on sched1
1509 do task::unkillable { // FIXME(#8674)
1510 let scheduler: ~Scheduler = Local::take();
1511 do scheduler.deschedule_running_task_and_then |_, task| {
1513 do task.wake().map_move |task| {
1514 // send self to sched2
1515 tasksFriendHandle.take().send(TaskFromFriend(task));
1517 // sched1 should now sleep since it has nothing else to do
1520 // sched2 will wake up and get the task
1521 // as we do nothing else, the function ends and the socket goes out of scope
1522 // sched2 will start to run the destructor
1523 // the destructor will first block the task, set it's home as sched1, then enqueue it
1524 // sched2 will dequeue the task, see that it has a home, and send it to sched1
1525 // sched1 will wake up, exec the close function on the correct loop, and then we're done
1528 let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
1529 main_task.death.on_exit = Some(on_exit);
1530 let main_task = Cell::new(main_task);
// sched2 needs at least one (empty) task so it participates in scheduling.
1532 let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
1534 let sched1 = Cell::new(sched1);
1535 let sched2 = Cell::new(sched2);
1537 let thread1 = do Thread::start {
1538 sched1.take().bootstrap(main_task.take());
1540 let thread2 = do Thread::start {
1541 sched2.take().bootstrap(null_task.take());
// Homed-I/O test: the socket itself (not the task) is sent over a oneshot
// channel to a task on sched2; dropping it there must still close on sched1.
1550 fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
1551 use rt::sleeper_list::SleeperList;
1552 use rt::work_queue::WorkQueue;
1553 use rt::thread::Thread;
1555 use rt::comm::oneshot;
1556 use rt::sched::Shutdown;
1557 do run_in_bare_thread {
1558 let sleepers = SleeperList::new();
1559 let work_queue1 = WorkQueue::new();
1560 let work_queue2 = WorkQueue::new();
1561 let queues = ~[work_queue1.clone(), work_queue2.clone()];
1563 let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
1565 let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
1568 let handle1 = Cell::new(sched1.make_handle());
1569 let handle2 = Cell::new(sched2.make_handle());
// Channel used to move the bound socket from sched1's task to sched2's.
1571 let (port, chan) = oneshot();
1572 let port = Cell::new(port);
1573 let chan = Cell::new(chan);
// Runs on sched1: bind the socket and ship it to the other task.
1575 let body1: ~fn() = || {
1576 let io: *mut IoFactoryObject = unsafe {
1577 Local::unsafe_borrow()
1579 let addr = next_test_ip4();
1580 let socket = unsafe { (*io).udp_bind(addr) };
1581 assert!(socket.is_ok());
1582 chan.take().send(socket);
// Runs on sched2: receive the socket and let it drop here.
1585 let body2: ~fn() = || {
1586 let socket = port.take().recv();
1587 assert!(socket.is_ok());
1588 /* The socket goes out of scope and the destructor is called.
1590 * - sends itself back to sched1
1591 * - frees the socket
1592 * - resets the home of the task to whatever it was previously
1596 let on_exit: ~fn(bool) = |exit| {
1597 handle1.take().send(Shutdown);
1598 handle2.take().send(Shutdown);
1602 let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
1604 let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
1605 task2.death.on_exit = Some(on_exit);
1606 let task2 = Cell::new(task2);
1608 let sched1 = Cell::new(sched1);
1609 let sched2 = Cell::new(sched2);
1611 let thread1 = do Thread::start {
1612 sched1.take().bootstrap(task1.take());
1614 let thread2 = do Thread::start {
1615 sched2.take().bootstrap(task2.take());
// End-to-end TCP smoke test: server accepts one connection and verifies the
// 8 bytes 0..7 that the client writes.
1624 fn test_simple_tcp_server_and_client() {
1625 do run_in_newsched_task {
1626 let addr = next_test_ip4();
1628 // Start the server first so it's listening when we connect
1631 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1632 let mut listener = (*io).tcp_bind(addr).unwrap();
1633 let mut stream = listener.accept().unwrap();
1634 let mut buf = [0, .. 2048];
1635 let nread = stream.read(buf).unwrap();
1636 assert_eq!(nread, 8);
1637 for i in range(0u, nread) {
1638 rtdebug!("%u", buf[i] as uint);
1639 assert_eq!(buf[i], i as u8);
// Client side: connect and send the known payload.
1646 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1647 let mut stream = (*io).tcp_connect(addr).unwrap();
1648 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Same TCP exchange as above, but server and client run on separate
// schedulers on separate OS threads; the client retries connect until the
// server is listening.
1655 fn test_simple_tcp_server_and_client_on_diff_threads() {
1656 use rt::sleeper_list::SleeperList;
1657 use rt::work_queue::WorkQueue;
1658 use rt::thread::Thread;
1660 use rt::sched::{Shutdown};
1661 do run_in_bare_thread {
1662 let sleepers = SleeperList::new();
1664 let server_addr = next_test_ip4();
1665 let client_addr = server_addr.clone();
1667 let server_work_queue = WorkQueue::new();
1668 let client_work_queue = WorkQueue::new();
1669 let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
1671 let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
1672 queues.clone(), sleepers.clone());
1673 let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
1674 queues.clone(), sleepers.clone());
1676 let server_handle = Cell::new(server_sched.make_handle());
1677 let client_handle = Cell::new(client_sched.make_handle());
// Each side shuts down its own scheduler when its task exits cleanly.
1679 let server_on_exit: ~fn(bool) = |exit_status| {
1680 server_handle.take().send(Shutdown);
1681 rtassert!(exit_status);
1684 let client_on_exit: ~fn(bool) = |exit_status| {
1685 client_handle.take().send(Shutdown);
1686 rtassert!(exit_status);
1689 let server_fn: ~fn() = || {
1690 let io: *mut IoFactoryObject = unsafe {
1691 Local::unsafe_borrow()
1693 let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
1694 let mut stream = listener.accept().unwrap();
1695 let mut buf = [0, .. 2048];
1696 let nread = stream.read(buf).unwrap();
1697 assert_eq!(nread, 8);
1698 for i in range(0u, nread) {
1699 assert_eq!(buf[i], i as u8);
1703 let client_fn: ~fn() = || {
1704 let io: *mut IoFactoryObject = unsafe {
1705 Local::unsafe_borrow()
// Retry until the server's listener is actually bound and accepting.
1707 let mut stream = unsafe { (*io).tcp_connect(client_addr) };
1708 while stream.is_err() {
1709 stream = unsafe { (*io).tcp_connect(client_addr) };
1711 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1714 let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
1715 server_task.death.on_exit = Some(server_on_exit);
1716 let server_task = Cell::new(server_task);
1718 let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
1719 client_task.death.on_exit = Some(client_on_exit);
1720 let client_task = Cell::new(client_task);
1722 let server_sched = Cell::new(server_sched);
1723 let client_sched = Cell::new(client_sched);
1725 let server_thread = do Thread::start {
1726 server_sched.take().bootstrap(server_task.take());
1728 let client_thread = do Thread::start {
1729 client_sched.take().bootstrap(client_task.take());
1732 server_thread.join();
1733 client_thread.join();
// UDP smoke test: server recvfrom's one 8-byte datagram from the client and
// checks payload and source address.
1738 fn test_simple_udp_server_and_client() {
1739 do run_in_newsched_task {
1740 let server_addr = next_test_ip4();
1741 let client_addr = next_test_ip4();
1745 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1746 let mut server_socket = (*io).udp_bind(server_addr).unwrap();
1747 let mut buf = [0, .. 2048];
1748 let (nread,src) = server_socket.recvfrom(buf).unwrap();
1749 assert_eq!(nread, 8);
1750 for i in range(0u, nread) {
1751 rtdebug!("%u", buf[i] as uint);
1752 assert_eq!(buf[i], i as u8);
1754 assert_eq!(src, client_addr);
// Client side: bind its own address and send the known payload.
1760 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1761 let mut client_socket = (*io).udp_bind(client_addr).unwrap();
1762 client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
// (Currently ignored.) Verifies that a read can block mid-stream: the reader
// yields between reads so the writer's four 8-byte writes arrive across
// multiple read callbacks.
1768 #[test] #[ignore(reason = "busted")]
1769 fn test_read_and_block() {
1770 do run_in_newsched_task {
1771 let addr = next_test_ip4();
1774 let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
1775 let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
1776 let mut stream = listener.accept().unwrap();
1777 let mut buf = [0, .. 2048];
1780 let mut current = 0;
// Expect 4 writes * 8 bytes of the repeating pattern 0..7.
1783 while current < expected {
1784 let nread = stream.read(buf).unwrap();
1785 for i in range(0u, nread) {
1786 let val = buf[i] as uint;
1787 assert_eq!(val, current % 8);
1792 do task::unkillable { // FIXME(#8674)
1793 let scheduler: ~Scheduler = Local::take();
1794 // Yield to the other task in hopes that it
1795 // will trigger a read callback while we are
1797 do scheduler.deschedule_running_task_and_then |sched, task| {
1798 let task = Cell::new(task);
1799 sched.enqueue_blocked_task(task.take());
1804 // Make sure we had multiple reads
// Writer side: four separate 8-byte writes.
1810 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1811 let mut stream = (*io).tcp_connect(addr).unwrap();
1812 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1813 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1814 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1815 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
// Bulk-transfer test: writer streams >= 500000 bytes of 1s in 2048-byte
// chunks; reader loops until it has read them all, checking every byte.
1823 fn test_read_read_read() {
1824 do run_in_newsched_task {
1825 let addr = next_test_ip4();
1826 static MAX: uint = 500000;
1830 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1831 let mut listener = (*io).tcp_bind(addr).unwrap();
1832 let mut stream = listener.accept().unwrap();
1833 let buf = [1, .. 2048];
1834 let mut total_bytes_written = 0;
1835 while total_bytes_written < MAX {
1837 total_bytes_written += buf.len();
// Reader side: accumulate until MAX bytes have arrived.
1844 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1845 let mut stream = (*io).tcp_connect(addr).unwrap();
1846 let mut buf = [0, .. 2048];
1847 let mut total_bytes_read = 0;
1848 while total_bytes_read < MAX {
1849 let nread = stream.read(buf).unwrap();
1850 rtdebug!("read %u bytes", nread as uint);
1851 total_bytes_read += nread;
1852 for i in range(0u, nread) {
1853 assert_eq!(buf[i], 1);
1856 rtdebug!("read %u bytes total", total_bytes_read as uint);
// Two consecutive UDP datagrams ([1] then [2]) must arrive separately and
// in order, each attributed to the client's address.
1863 fn test_udp_twice() {
1864 do run_in_newsched_task {
1865 let server_addr = next_test_ip4();
1866 let client_addr = next_test_ip4();
1870 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1871 let mut client = (*io).udp_bind(client_addr).unwrap();
1872 assert!(client.sendto([1], server_addr).is_ok());
1873 assert!(client.sendto([2], server_addr).is_ok());
// Server side: receive both datagrams and check payload + source.
1879 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1880 let mut server = (*io).udp_bind(server_addr).unwrap();
1883 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1884 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1885 assert_eq!(nread1, 1);
1886 assert_eq!(nread2, 1);
1887 assert_eq!(src1, client_addr);
1888 assert_eq!(src2, client_addr);
1889 assert_eq!(buf1[0], 1);
1890 assert_eq!(buf2[0], 2);
// Flow-controlled UDP exchange over four sockets: the server pushes 2048-byte
// datagrams of 1s and waits for a 1-byte ack from the client after each; the
// client acks each receipt and finally sends [0] once it has MAX bytes.
1897 fn test_udp_many_read() {
1898 do run_in_newsched_task {
1899 let server_out_addr = next_test_ip4();
1900 let server_in_addr = next_test_ip4();
1901 let client_out_addr = next_test_ip4();
1902 let client_in_addr = next_test_ip4();
1903 static MAX: uint = 500_000;
1907 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1908 let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
1909 let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
1910 let msg = [1, .. 2048];
1911 let mut total_bytes_sent = 0;
1915 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1916 total_bytes_sent += msg.len();
1917 // check if the client has received enough
1918 let res = server_in.recvfrom(buf);
1919 assert!(res.is_ok());
1920 let (nread, src) = res.unwrap();
1921 assert_eq!(nread, 1);
1922 assert_eq!(src, client_out_addr);
1924 assert!(total_bytes_sent >= MAX);
// Client side: receive data, ack each datagram, tally bytes until MAX.
1930 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1931 let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
1932 let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
1933 let mut total_bytes_recv = 0;
1934 let mut buf = [0, .. 2048];
1935 while total_bytes_recv < MAX {
1937 assert!(client_out.sendto([1], server_in_addr).is_ok());
1939 let res = client_in.recvfrom(buf);
1940 assert!(res.is_ok());
1941 let (nread, src) = res.unwrap();
1942 assert_eq!(src, server_out_addr);
1943 total_bytes_recv += nread;
1944 for i in range(0u, nread) {
1945 assert_eq!(buf[i], 1);
1948 // tell the server we're done
1949 assert!(client_out.sendto([0], server_in_addr).is_ok());
// A uv timer created through the IO factory must be able to sleep briefly
// without error.
1956 fn test_timer_sleep_simple() {
1957 do run_in_newsched_task {
1959 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1960 let timer = (*io).timer_init();
1961 do timer.map_move |mut t| { t.sleep(1) };
// Full file round-trip through uvio: create a file, write a string, reopen
// read-only, read it back and compare, then unlink the file.
1966 fn file_test_uvio_full_simple_impl() {
1967 use str::StrSlice; // why does this have to be explicitly imported to work?
1968 // compiler was complaining about no trait for str that
1969 // does .as_bytes() ..
1971 use rt::io::{Open, Create, ReadWrite, Read};
1973 let io: *mut IoFactoryObject = Local::unsafe_borrow();
1974 let write_val = "hello uvio!";
1975 let path = "./tmp/file_test_uvio_full.txt";
// Phase 1: create the file and write the payload.
1977 let create_fm = Create;
1978 let create_fa = ReadWrite;
1979 let mut fd = (*io).fs_open(&Path(path), create_fm, create_fa).unwrap();
1980 let write_buf = write_val.as_bytes();
1981 fd.write(write_buf);
// Phase 2: reopen read-only and verify the contents round-tripped.
1986 let mut fd = (*io).fs_open(&Path(path), ro_fm, ro_fa).unwrap();
1987 let mut read_vec = [0, .. 1028];
1988 let nread = fd.read(read_vec).unwrap();
1989 let read_val = str::from_bytes(read_vec.slice(0, nread as uint));
1990 assert!(read_val == write_val.to_owned());
// Phase 3: clean up the temp file.
1992 (*io).fs_unlink(&Path(path));
// Runs the file round-trip above inside a fresh scheduler task.
1997 fn file_test_uvio_full_simple() {
1998 do run_in_newsched_task {
1999 file_test_uvio_full_simple_impl();
// Test helper: write `input` to stdout through a uvio file stream wrapped
// around the raw STDOUT fd (close_on_drop = false so stdout stays open).
2003 fn uvio_naive_print(input: &str) {
2006 use libc::{STDOUT_FILENO};
2007 let io: *mut IoFactoryObject = Local::unsafe_borrow();
2009 let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
2010 let write_buf = input.as_bytes();
2011 fd.write(write_buf);
2017 fn file_test_uvio_write_to_stdout() {
2018 do run_in_newsched_task {
2019 uvio_naive_print("jubilation\n");