]> git.lizzy.rs Git - rust.git/commitdiff
Merge remote-tracking branch 'brson/io-upstream' into incoming
authorBrian Anderson <banderson@mozilla.com>
Tue, 14 May 2013 22:30:01 +0000 (15:30 -0700)
committerBrian Anderson <banderson@mozilla.com>
Tue, 14 May 2013 22:30:01 +0000 (15:30 -0700)
Conflicts:
src/libcore/logging.rs
src/libcore/rt/local_services.rs
src/libcore/rt/uv/mod.rs
src/libcore/rt/uv/net.rs
src/libcore/rt/uv/uvio.rs
src/libcore/unstable.rs

13 files changed:
1  2 
src/libcore/core.rc
src/libcore/logging.rs
src/libcore/os.rs
src/libcore/rt/global_heap.rs
src/libcore/rt/local_services.rs
src/libcore/rt/mod.rs
src/libcore/rt/sched.rs
src/libcore/rt/stack.rs
src/libcore/rt/tube.rs
src/libcore/rt/uv/mod.rs
src/libcore/rt/uv/net.rs
src/libcore/rt/uv/uvio.rs
src/libcore/unstable/mod.rs

Simple merge
Simple merge
Simple merge
index 0000000000000000000000000000000000000000,57ed579e88ddab785234de6f82dc3785c5b14a3f..3b35c2fb8047fa3b0c476a484c92773d4edcf23c
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,83 +1,83 @@@
 -        fail!(~"Failure in malloc_raw: result ptr is null");
+ // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+ use sys::{TypeDesc, size_of};
+ use libc::{c_void, size_t};
+ use c_malloc = libc::malloc;
+ use c_free = libc::free;
+ use managed::raw::{BoxHeaderRepr, BoxRepr};
+ use cast::transmute;
+ use unstable::intrinsics::{atomic_xadd,atomic_xsub};
+ use ptr::null;
+ use intrinsic::TyDesc;
+ pub unsafe fn malloc(td: *TypeDesc, size: uint) -> *c_void {
+     assert!(td.is_not_null());
+     let total_size = get_box_size(size, (*td).align);
+     let p = c_malloc(total_size as size_t);
+     assert!(p.is_not_null());
+     // FIXME #3475: Converting between our two different tydesc types
+     let td: *TyDesc = transmute(td);
+     let box: &mut BoxRepr = transmute(p);
+     box.header.ref_count = -1; // Exchange values not ref counted
+     box.header.type_desc = td;
+     box.header.prev = null();
+     box.header.next = null();
+     let exchange_count = &mut *rust_get_exchange_count_ptr();
+     atomic_xadd(exchange_count, 1);
+     return transmute(box);
+ }
+ /**
+ Thin wrapper around libc::malloc, none of the box header
+ stuff in exchange_alloc::malloc
+ */
+ pub unsafe fn malloc_raw(size: uint) -> *c_void {
+     let p = c_malloc(size as size_t);
+     if p.is_null() {
++        fail!("Failure in malloc_raw: result ptr is null");
+     }
+     p
+ }
+ pub unsafe fn free(ptr: *c_void) {
+     let exchange_count = &mut *rust_get_exchange_count_ptr();
+     atomic_xsub(exchange_count, 1);
+     assert!(ptr.is_not_null());
+     c_free(ptr);
+ }
+ ///Thin wrapper around libc::free, as with exchange_alloc::malloc_raw
+ pub unsafe fn free_raw(ptr: *c_void) {
+     c_free(ptr);
+ }
+ fn get_box_size(body_size: uint, body_align: uint) -> uint {
+     let header_size = size_of::<BoxHeaderRepr>();
+     // FIXME (#2699): This alignment calculation is suspicious. Is it right?
+     let total_size = align_to(header_size, body_align) + body_size;
+     return total_size;
+ }
+ // Rounds |size| to the nearest |alignment|. Invariant: |alignment| is a power
+ // of two.
+ fn align_to(size: uint, align: uint) -> uint {
+     assert!(align != 0);
+     (size + align - 1) & !(align - 1)
+ }
+ extern {
+     #[rust_stack]
+     fn rust_get_exchange_count_ptr() -> *mut int;
+ }
Simple merge
index fbbc82743407c28293743fc015dab80519d231d9,5a6a6e4c7d88d205dc482b622d8f4a5ef985bc06..f04c38f79e80004fb348b349ec2def71172f2188
@@@ -13,27 -63,55 +63,56 @@@ Several modules in `core` are clients o
  #[doc(hidden)];
  
  use libc::c_char;
 +use ptr::Ptr;
  
- #[path = "sched/mod.rs"]
+ /// The global (exchange) heap.
+ pub mod global_heap;
+ /// The Scheduler and Task types.
  mod sched;
+ /// Thread-local access to the current Scheduler.
+ pub mod local_sched;
+ /// Synchronous I/O.
+ #[path = "io/mod.rs"]
+ pub mod io;
+ /// Thread-local implementations of language-critical runtime features like @.
+ pub mod local_services;
+ /// The EventLoop and internal synchronous I/O interface.
  mod rtio;
- pub mod uvll;
- mod uvio;
+ /// libuv and default rtio implementation.
  #[path = "uv/mod.rs"]
- mod uv;
- #[path = "io/mod.rs"]
- mod io;
+ pub mod uv;
  // FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
+ /// Bindings to pthread/windows thread-local storage.
  pub mod thread_local_storage;
+ /// A parallel work-stealing dequeue.
  mod work_queue;
+ /// Stack segments and caching.
  mod stack;
+ /// CPU context swapping.
  mod context;
+ /// Bindings to system threading libraries.
  mod thread;
+ /// The runtime configuration, read from environment variables
  pub mod env;
- pub mod local_services;
+ /// The local, managed heap
  mod local_heap;
  
+ /// The Logger trait and implementations
+ pub mod logging;
  /// Tools for testing the runtime
  #[cfg(test)]
  pub mod test;
index 0000000000000000000000000000000000000000,546272474edb671822214314d741bc37f246a1a0..395f9099571a003577381c7ea07949bad4f0eacb
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,502 +1,502 @@@
 -            None => fail!(fmt!("all context switches should have a cleanup job"))
+ // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+ use option::*;
+ use sys;
+ use cast::transmute;
+ use super::work_queue::WorkQueue;
+ use super::stack::{StackPool, StackSegment};
+ use super::rtio::{EventLoop, EventLoopObject};
+ use super::context::Context;
+ use super::local_services::LocalServices;
+ use cell::Cell;
+ #[cfg(test)] use rt::uv::uvio::UvEventLoop;
+ #[cfg(test)] use unstable::run_in_bare_thread;
+ #[cfg(test)] use int;
+ // A more convenient name for external callers, e.g. `local_sched::take()`
+ pub mod local_sched;
+ /// The Scheduler is responsible for coordinating execution of Tasks
+ /// on a single thread. When the scheduler is running it is owned by
+ /// thread local storage and the running task is owned by the
+ /// scheduler.
+ pub struct Scheduler {
+     task_queue: WorkQueue<~Task>,
+     stack_pool: StackPool,
+     /// The event loop used to drive the scheduler and perform I/O
+     event_loop: ~EventLoopObject,
+     /// The scheduler's saved context.
+     /// Always valid when a task is executing, otherwise not
+     priv saved_context: Context,
+     /// The currently executing task
+     current_task: Option<~Task>,
+     /// An action performed after a context switch on behalf of the
+     /// code running before the context switch
+     priv cleanup_job: Option<CleanupJob>
+ }
+ // XXX: Some hacks to put a &fn in Scheduler without borrowck
+ // complaining
+ type UnsafeTaskReceiver = sys::Closure;
+ trait ClosureConverter {
+     fn from_fn(&fn(~Task)) -> Self;
+     fn to_fn(self) -> &fn(~Task);
+ }
+ impl ClosureConverter for UnsafeTaskReceiver {
+     fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+     fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } }
+ }
+ enum CleanupJob {
+     DoNothing,
+     GiveTask(~Task, UnsafeTaskReceiver)
+ }
+ pub impl Scheduler {
+     fn in_task_context(&self) -> bool { self.current_task.is_some() }
+     fn new(event_loop: ~EventLoopObject) -> Scheduler {
+         // Lazily initialize the global state, currently the scheduler TLS key
+         unsafe { rust_initialize_global_state(); }
+         extern {
+             fn rust_initialize_global_state();
+         }
+         Scheduler {
+             event_loop: event_loop,
+             task_queue: WorkQueue::new(),
+             stack_pool: StackPool::new(),
+             saved_context: Context::empty(),
+             current_task: None,
+             cleanup_job: None
+         }
+     }
+     // XXX: This may eventually need to be refactored so that
+     // the scheduler itself doesn't have to call event_loop.run.
+     // That will be important for embedding the runtime into external
+     // event loops.
+     fn run(~self) -> ~Scheduler {
+         assert!(!self.in_task_context());
+         // Give ownership of the scheduler (self) to the thread
+         local_sched::put(self);
+         unsafe {
+             let scheduler = local_sched::unsafe_borrow();
+             fn run_scheduler_once() {
+                 let scheduler = local_sched::take();
+                 if scheduler.resume_task_from_queue() {
+                     // Ok, a task ran. Nice! We'll do it again later
+                     do local_sched::borrow |scheduler| {
+                         scheduler.event_loop.callback(run_scheduler_once);
+                     }
+                 }
+             }
+             let scheduler = &mut *scheduler;
+             scheduler.event_loop.callback(run_scheduler_once);
+             scheduler.event_loop.run();
+         }
+         return local_sched::take();
+     }
+     // * Scheduler-context operations
+     fn resume_task_from_queue(~self) -> bool {
+         assert!(!self.in_task_context());
+         let mut this = self;
+         match this.task_queue.pop_front() {
+             Some(task) => {
+                 this.resume_task_immediately(task);
+                 return true;
+             }
+             None => {
+                 rtdebug!("no tasks in queue");
+                 local_sched::put(this);
+                 return false;
+             }
+         }
+     }
+     // * Task-context operations
+     /// Called by a running task to end execution, after which it will
+     /// be recycled by the scheduler for reuse in a new task.
+     fn terminate_current_task(~self) {
+         assert!(self.in_task_context());
+         rtdebug!("ending running task");
+         do self.deschedule_running_task_and_then |dead_task| {
+             let dead_task = Cell(dead_task);
+             do local_sched::borrow |sched| {
+                 dead_task.take().recycle(&mut sched.stack_pool);
+             }
+         }
+         abort!("control reached end of task");
+     }
+     fn schedule_new_task(~self, task: ~Task) {
+         assert!(self.in_task_context());
+         do self.switch_running_tasks_and_then(task) |last_task| {
+             let last_task = Cell(last_task);
+             do local_sched::borrow |sched| {
+                 sched.task_queue.push_front(last_task.take());
+             }
+         }
+     }
+     // Core scheduling ops
+     fn resume_task_immediately(~self, task: ~Task) {
+         let mut this = self;
+         assert!(!this.in_task_context());
+         rtdebug!("scheduling a task");
+         // Store the task in the scheduler so it can be grabbed later
+         this.current_task = Some(task);
+         this.enqueue_cleanup_job(DoNothing);
+         local_sched::put(this);
+         // Take pointers to both the task and scheduler's saved registers.
+         unsafe {
+             let sched = local_sched::unsafe_borrow();
+             let (sched_context, _, next_task_context) = (*sched).get_contexts();
+             let next_task_context = next_task_context.unwrap();
+             // Context switch to the task, restoring it's registers
+             // and saving the scheduler's
+             Context::swap(sched_context, next_task_context);
+             let sched = local_sched::unsafe_borrow();
+             // The running task should have passed ownership elsewhere
+             assert!((*sched).current_task.is_none());
+             // Running tasks may have asked us to do some cleanup
+             (*sched).run_cleanup_job();
+         }
+     }
+     /// Block a running task, context switch to the scheduler, then pass the
+     /// blocked task to a closure.
+     ///
+     /// # Safety note
+     ///
+     /// The closure here is a *stack* closure that lives in the
+     /// running task.  It gets transmuted to the scheduler's lifetime
+     /// and called while the task is blocked.
+     fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
+         let mut this = self;
+         assert!(this.in_task_context());
+         rtdebug!("blocking task");
+         unsafe {
+             let blocked_task = this.current_task.swap_unwrap();
+             let f_fake_region = transmute::<&fn(~Task), &fn(~Task)>(f);
+             let f_opaque = ClosureConverter::from_fn(f_fake_region);
+             this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
+         }
+         local_sched::put(this);
+         unsafe {
+             let sched = local_sched::unsafe_borrow();
+             let (sched_context, last_task_context, _) = (*sched).get_contexts();
+             let last_task_context = last_task_context.unwrap();
+             Context::swap(last_task_context, sched_context);
+             // We could be executing in a different thread now
+             let sched = local_sched::unsafe_borrow();
+             (*sched).run_cleanup_job();
+         }
+     }
+     /// Switch directly to another task, without going through the scheduler.
+     /// You would want to think hard about doing this, e.g. if there are
+     /// pending I/O events it would be a bad idea.
+     fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(~Task)) {
+         let mut this = self;
+         assert!(this.in_task_context());
+         rtdebug!("switching tasks");
+         let old_running_task = this.current_task.swap_unwrap();
+         let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
+         let f_opaque = ClosureConverter::from_fn(f_fake_region);
+         this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
+         this.current_task = Some(next_task);
+         local_sched::put(this);
+         unsafe {
+             let sched = local_sched::unsafe_borrow();
+             let (_, last_task_context, next_task_context) = (*sched).get_contexts();
+             let last_task_context = last_task_context.unwrap();
+             let next_task_context = next_task_context.unwrap();
+             Context::swap(last_task_context, next_task_context);
+             // We could be executing in a different thread now
+             let sched = local_sched::unsafe_borrow();
+             (*sched).run_cleanup_job();
+         }
+     }
+     // * Other stuff
+     fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
+         assert!(self.cleanup_job.is_none());
+         self.cleanup_job = Some(job);
+     }
+     fn run_cleanup_job(&mut self) {
+         rtdebug!("running cleanup job");
+         assert!(self.cleanup_job.is_some());
+         let cleanup_job = self.cleanup_job.swap_unwrap();
+         match cleanup_job {
+             DoNothing => { }
+             GiveTask(task, f) => (f.to_fn())(task)
+         }
+     }
+     /// Get mutable references to all the contexts that may be involved in a
+     /// context switch.
+     ///
+     /// Returns (the scheduler context, the optional context of the
+     /// task in the cleanup list, the optional context of the task in
+     /// the current task slot).  When context switching to a task,
+     /// callers should first arrange for that task to be located in the
+     /// Scheduler's current_task slot and set up the
+     /// post-context-switch cleanup job.
+     fn get_contexts<'a>(&'a mut self) -> (&'a mut Context,
+                                           Option<&'a mut Context>,
+                                           Option<&'a mut Context>) {
+         let last_task = match self.cleanup_job {
+             Some(GiveTask(~ref task, _)) => {
+                 Some(task)
+             }
+             Some(DoNothing) => {
+                 None
+             }
++            None => fail!("all context switches should have a cleanup job")
+         };
+         // XXX: Pattern matching mutable pointers above doesn't work
+         // because borrowck thinks the three patterns are conflicting
+         // borrows
+         unsafe {
+             let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
+             let last_task_context = match last_task {
+                 Some(t) => Some(&mut t.saved_context), None => None
+             };
+             let next_task_context = match self.current_task {
+                 Some(ref mut t) => Some(&mut t.saved_context), None => None
+             };
+             // XXX: These transmutes can be removed after snapshot
+             return (transmute(&mut self.saved_context),
+                     last_task_context,
+                     transmute(next_task_context));
+         }
+     }
+ }
+ static TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
+ pub struct Task {
+     /// The segment of stack on which the task is currently running or,
+     /// if the task is blocked, on which the task will resume execution
+     priv current_stack_segment: StackSegment,
+     /// These are always valid when the task is not running, unless
+     /// the task is dead
+     priv saved_context: Context,
+     /// The heap, GC, unwinding, local storage, logging
+     local_services: LocalServices
+ }
+ pub impl Task {
+     fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
+         Task::with_local(stack_pool, LocalServices::new(), start)
+     }
+     fn with_local(stack_pool: &mut StackPool,
+                   local_services: LocalServices,
+                   start: ~fn()) -> Task {
+         let start = Task::build_start_wrapper(start);
+         let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
+         // NB: Context holds a pointer to that ~fn
+         let initial_context = Context::new(start, &mut stack);
+         return Task {
+             current_stack_segment: stack,
+             saved_context: initial_context,
+             local_services: local_services
+         };
+     }
+     priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
+         // XXX: The old code didn't have this extra allocation
+         let wrapper: ~fn() = || {
+             // This is the first code to execute after the initial
+             // context switch to the task. The previous context may
+             // have asked us to do some cleanup.
+             unsafe {
+                 let sched = local_sched::unsafe_borrow();
+                 (*sched).run_cleanup_job();
+                 let sched = local_sched::unsafe_borrow();
+                 let task = (*sched).current_task.get_mut_ref();
+                 // FIXME #6141: shouldn't neet to put `start()` in another closure
+                 task.local_services.run(||start());
+             }
+             let sched = local_sched::take();
+             sched.terminate_current_task();
+         };
+         return wrapper;
+     }
+     /// Destroy the task and try to reuse its components
+     fn recycle(~self, stack_pool: &mut StackPool) {
+         match self {
+             ~Task {current_stack_segment, _} => {
+                 stack_pool.give_segment(current_stack_segment);
+             }
+         }
+     }
+ }
+ #[test]
+ fn test_simple_scheduling() {
+     do run_in_bare_thread {
+         let mut task_ran = false;
+         let task_ran_ptr: *mut bool = &mut task_ran;
+         let mut sched = ~UvEventLoop::new_scheduler();
+         let task = ~do Task::new(&mut sched.stack_pool) {
+             unsafe { *task_ran_ptr = true; }
+         };
+         sched.task_queue.push_back(task);
+         sched.run();
+         assert!(task_ran);
+     }
+ }
+ #[test]
+ fn test_several_tasks() {
+     do run_in_bare_thread {
+         let total = 10;
+         let mut task_count = 0;
+         let task_count_ptr: *mut int = &mut task_count;
+         let mut sched = ~UvEventLoop::new_scheduler();
+         for int::range(0, total) |_| {
+             let task = ~do Task::new(&mut sched.stack_pool) {
+                 unsafe { *task_count_ptr = *task_count_ptr + 1; }
+             };
+             sched.task_queue.push_back(task);
+         }
+         sched.run();
+         assert!(task_count == total);
+     }
+ }
+ #[test]
+ fn test_swap_tasks_then() {
+     do run_in_bare_thread {
+         let mut count = 0;
+         let count_ptr: *mut int = &mut count;
+         let mut sched = ~UvEventLoop::new_scheduler();
+         let task1 = ~do Task::new(&mut sched.stack_pool) {
+             unsafe { *count_ptr = *count_ptr + 1; }
+             let mut sched = local_sched::take();
+             let task2 = ~do Task::new(&mut sched.stack_pool) {
+                 unsafe { *count_ptr = *count_ptr + 1; }
+             };
+             // Context switch directly to the new task
+             do sched.switch_running_tasks_and_then(task2) |task1| {
+                 let task1 = Cell(task1);
+                 do local_sched::borrow |sched| {
+                     sched.task_queue.push_front(task1.take());
+                 }
+             }
+             unsafe { *count_ptr = *count_ptr + 1; }
+         };
+         sched.task_queue.push_back(task1);
+         sched.run();
+         assert!(count == 3);
+     }
+ }
+ #[bench] #[test] #[ignore(reason = "long test")]
+ fn test_run_a_lot_of_tasks_queued() {
+     do run_in_bare_thread {
+         static MAX: int = 1000000;
+         let mut count = 0;
+         let count_ptr: *mut int = &mut count;
+         let mut sched = ~UvEventLoop::new_scheduler();
+         let start_task = ~do Task::new(&mut sched.stack_pool) {
+             run_task(count_ptr);
+         };
+         sched.task_queue.push_back(start_task);
+         sched.run();
+         assert!(count == MAX);
+         fn run_task(count_ptr: *mut int) {
+             do local_sched::borrow |sched| {
+                 let task = ~do Task::new(&mut sched.stack_pool) {
+                     unsafe {
+                         *count_ptr = *count_ptr + 1;
+                         if *count_ptr != MAX {
+                             run_task(count_ptr);
+                         }
+                     }
+                 };
+                 sched.task_queue.push_back(task);
+             }
+         };
+     }
+ }
+ #[test]
+ fn test_block_task() {
+     do run_in_bare_thread {
+         let mut sched = ~UvEventLoop::new_scheduler();
+         let task = ~do Task::new(&mut sched.stack_pool) {
+             let sched = local_sched::take();
+             assert!(sched.in_task_context());
+             do sched.deschedule_running_task_and_then() |task| {
+                 let task = Cell(task);
+                 do local_sched::borrow |sched| {
+                     assert!(!sched.in_task_context());
+                     sched.task_queue.push_back(task.take());
+                 }
+             }
+         };
+         sched.task_queue.push_back(task);
+         sched.run();
+     }
+ }
index 3a4e9307d3b5069ba83b7115fa56a593dafa698d,068bc834ce6a3b7aea7269284706f031e5a60734..019540ce76b3fda2c9ccb35e7735e10cd97395f0
@@@ -8,12 -8,13 +8,15 @@@
  // option. This file may not be copied, modified, or distributed
  // except according to those terms.
  
 +use container::Container;
 +use ptr::Ptr;
  use vec;
+ use ops::Drop;
+ use libc::{c_uint, uintptr_t};
  
  pub struct StackSegment {
-     buf: ~[u8]
+     buf: ~[u8],
+     valgrind_id: c_uint
  }
  
  pub impl StackSegment {
@@@ -49,3 -71,8 +73,8 @@@ impl StackPool 
      fn give_segment(&self, _stack: StackSegment) {
      }
  }
 -}
+ extern {
+     fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint;
+     fn rust_valgrind_stack_deregister(id: c_uint);
++}
index 0000000000000000000000000000000000000000,ef376199fcbdc6945ae12f3d7c5568117a7e0938..8e7bf72fa63087457b606b9b00ad282c4eea3bf3
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,182 +1,184 @@@
+ // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+ //! A very simple unsynchronized channel type for sending buffered data from
+ //! scheduler context to task context.
+ //!
+ //! XXX: This would be safer to use if split into two types like Port/Chan
+ use option::*;
+ use clone::Clone;
+ use super::rc::RC;
+ use rt::sched::Task;
+ use rt::{context, TaskContext, SchedulerContext};
+ use rt::local_sched;
++use vec::OwnedVector;
++use container::Container;
+ struct TubeState<T> {
+     blocked_task: Option<~Task>,
+     buf: ~[T]
+ }
+ pub struct Tube<T> {
+     p: RC<TubeState<T>>
+ }
+ impl<T> Tube<T> {
+     pub fn new() -> Tube<T> {
+         Tube {
+             p: RC::new(TubeState {
+                 blocked_task: None,
+                 buf: ~[]
+             })
+         }
+     }
+     pub fn send(&mut self, val: T) {
+         rtdebug!("tube send");
+         assert!(context() == SchedulerContext);
+         unsafe {
+             let state = self.p.unsafe_borrow_mut();
+             (*state).buf.push(val);
+             if (*state).blocked_task.is_some() {
+                 // There's a waiting task. Wake it up
+                 rtdebug!("waking blocked tube");
+                 let task = (*state).blocked_task.swap_unwrap();
+                 let sched = local_sched::take();
+                 sched.resume_task_immediately(task);
+             }
+         }
+     }
+     pub fn recv(&mut self) -> T {
+         assert!(context() == TaskContext);
+         unsafe {
+             let state = self.p.unsafe_borrow_mut();
+             if !(*state).buf.is_empty() {
+                 return (*state).buf.shift();
+             } else {
+                 // Block and wait for the next message
+                 rtdebug!("blocking on tube recv");
+                 assert!(self.p.refcount() > 1); // There better be somebody to wake us up
+                 assert!((*state).blocked_task.is_none());
+                 let sched = local_sched::take();
+                 do sched.deschedule_running_task_and_then |task| {
+                     (*state).blocked_task = Some(task);
+                 }
+                 rtdebug!("waking after tube recv");
+                 let buf = &mut (*state).buf;
+                 assert!(!buf.is_empty());
+                 return buf.shift();
+             }
+         }
+     }
+ }
+ impl<T> Clone for Tube<T> {
+     fn clone(&self) -> Tube<T> {
+         Tube { p: self.p.clone() }
+     }
+ }
+ #[cfg(test)]
+ mod test {
+     use int;
+     use cell::Cell;
+     use rt::local_sched;
+     use rt::test::*;
+     use rt::rtio::EventLoop;
+     use super::*;
+     #[test]
+     fn simple_test() {
+         do run_in_newsched_task {
+             let mut tube: Tube<int> = Tube::new();
+             let tube_clone = tube.clone();
+             let tube_clone_cell = Cell(tube_clone);
+             let sched = local_sched::take();
+             do sched.deschedule_running_task_and_then |task| {
+                 let mut tube_clone = tube_clone_cell.take();
+                 tube_clone.send(1);
+                 let sched = local_sched::take();
+                 sched.resume_task_immediately(task);
+             }
+             assert!(tube.recv() == 1);
+         }
+     }
+     #[test]
+     fn blocking_test() {
+         do run_in_newsched_task {
+             let mut tube: Tube<int> = Tube::new();
+             let tube_clone = tube.clone();
+             let tube_clone = Cell(Cell(Cell(tube_clone)));
+             let sched = local_sched::take();
+             do sched.deschedule_running_task_and_then |task| {
+                 let tube_clone = tube_clone.take();
+                 do local_sched::borrow |sched| {
+                     let tube_clone = tube_clone.take();
+                     do sched.event_loop.callback {
+                         let mut tube_clone = tube_clone.take();
+                         // The task should be blocked on this now and
+                         // sending will wake it up.
+                         tube_clone.send(1);
+                     }
+                 }
+                 let sched = local_sched::take();
+                 sched.resume_task_immediately(task);
+             }
+             assert!(tube.recv() == 1);
+         }
+     }
+     #[test]
+     fn many_blocking_test() {
+         static MAX: int = 100;
+         do run_in_newsched_task {
+             let mut tube: Tube<int> = Tube::new();
+             let tube_clone = tube.clone();
+             let tube_clone = Cell(tube_clone);
+             let sched = local_sched::take();
+             do sched.deschedule_running_task_and_then |task| {
+                 callback_send(tube_clone.take(), 0);
+                 fn callback_send(tube: Tube<int>, i: int) {
+                     if i == 100 { return; }
+                     let tube = Cell(Cell(tube));
+                     do local_sched::borrow |sched| {
+                         let tube = tube.take();
+                         do sched.event_loop.callback {
+                             let mut tube = tube.take();
+                             // The task should be blocked on this now and
+                             // sending will wake it up.
+                             tube.send(i);
+                             callback_send(tube, i + 1);
+                         }
+                     }
+                 }
+                 let sched = local_sched::take();
+                 sched.resume_task_immediately(task);
+             }
+             for int::range(0, MAX) |i| {
+                 let j = tube.recv();
+                 assert!(j == i);
+             }
+         }
+     }
+ }
index 6499f0a3efdcf92e5aac820ee21dba196381829f,684099d7fd11fa2ace8f236cb46c357b953e7493..e71944913975812c25e82b685f9ab6165b665c34
@@@ -34,13 -34,14 +34,19 @@@ via `close` and `delete` methods
  
  */
  
 +use container::Container;
 +use option::*;
 +use str::raw::from_c_str;
 +use to_str::ToStr;
++use ptr::Ptr;
+ use libc;
  use vec;
  use ptr;
- use ptr::Ptr;
+ use cast;
+ use str;
+ use option::*;
+ use str::raw::from_c_str;
+ use to_str::ToStr;
  use libc::{c_void, c_int, size_t, malloc, free};
  use cast::transmute;
  use ptr::null;
index 3e6aa657c57dd4273211f007dcd14214c845d42d,fede71ec67931df0238c19b494ef98e5166f5210..fd78b552119b5c82f5dad8ee8184dfafa24d7d87
@@@ -355,93 -328,109 +328,109 @@@ impl NativeHandle<*uvll::uv_write_t> fo
  }
  
  
- #[test]
- fn connect_close() {
-     do run_in_bare_thread() {
-         let mut loop_ = Loop::new();
-         let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-         // Connect to a port where nobody is listening
-         let addr = next_test_ip4();
-         do tcp_watcher.connect(addr) |stream_watcher, status| {
-             rtdebug!("tcp_watcher.connect!");
-             assert!(status.is_some());
-             assert!(status.get().name() == ~"ECONNREFUSED");
-             stream_watcher.close(||());
+ #[cfg(test)]
+ mod test {
+     use super::*;
+     use util::ignore;
+     use cell::Cell;
+     use vec;
+     use unstable::run_in_bare_thread;
+     use rt::thread::Thread;
+     use rt::test::*;
+     use rt::uv::{Loop, AllocCallback};
+     use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
+     #[test]
+     fn connect_close() {
+         do run_in_bare_thread() {
+             let mut loop_ = Loop::new();
+             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+             // Connect to a port where nobody is listening
+             let addr = next_test_ip4();
+             do tcp_watcher.connect(addr) |stream_watcher, status| {
+                 rtdebug!("tcp_watcher.connect!");
+                 assert!(status.is_some());
+                 assert!(status.get().name() == ~"ECONNREFUSED");
+                 stream_watcher.close(||());
+             }
+             loop_.run();
+             loop_.close();
          }
-         loop_.run();
-         loop_.close();
      }
- }
  
- #[test]
- fn listen() {
-     do run_in_bare_thread() {
-         static MAX: int = 10;
-         let mut loop_ = Loop::new();
-         let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
-         let addr = next_test_ip4();
-         server_tcp_watcher.bind(addr);
-         let loop_ = loop_;
-         rtdebug!("listening");
-         do server_tcp_watcher.listen |server_stream_watcher, status| {
-             rtdebug!("listened!");
-             assert!(status.is_none());
-             let mut server_stream_watcher = server_stream_watcher;
-             let mut loop_ = loop_;
-             let client_tcp_watcher = TcpWatcher::new(&mut loop_);
-             let mut client_tcp_watcher = client_tcp_watcher.as_stream();
-             server_stream_watcher.accept(client_tcp_watcher);
-             let count_cell = Cell(0);
-             let server_stream_watcher = server_stream_watcher;
-             rtdebug!("starting read");
-             let alloc: AllocCallback = |size| {
-                 vec_to_uv_buf(vec::from_elem(size, 0))
-             };
-             do client_tcp_watcher.read_start(alloc)
-                 |stream_watcher, nread, buf, status| {
-                 rtdebug!("i'm reading!");
-                 let buf = vec_from_uv_buf(buf);
-                 let mut count = count_cell.take();
-                 if status.is_none() {
-                     rtdebug!("got %d bytes", nread);
-                     let buf = buf.unwrap();
-                     for buf.slice(0, nread as uint).each |byte| {
-                         assert!(*byte == count as u8);
-                         rtdebug!("%u", *byte as uint);
-                         count += 1;
-                     }
-                 } else {
-                     assert!(count == MAX);
-                     do stream_watcher.close {
-                         server_stream_watcher.close(||());
+     #[test]
+     fn listen() {
+         do run_in_bare_thread() {
+             static MAX: int = 10;
+             let mut loop_ = Loop::new();
+             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
+             let addr = next_test_ip4();
+             server_tcp_watcher.bind(addr);
+             let loop_ = loop_;
+             rtdebug!("listening");
+             do server_tcp_watcher.listen |server_stream_watcher, status| {
+                 rtdebug!("listened!");
+                 assert!(status.is_none());
+                 let mut server_stream_watcher = server_stream_watcher;
+                 let mut loop_ = loop_;
+                 let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
+                 server_stream_watcher.accept(client_tcp_watcher);
+                 let count_cell = Cell(0);
+                 let server_stream_watcher = server_stream_watcher;
+                 rtdebug!("starting read");
+                 let alloc: AllocCallback = |size| {
+                     vec_to_uv_buf(vec::from_elem(size, 0))
+                 };
+                 do client_tcp_watcher.read_start(alloc)
+                     |stream_watcher, nread, buf, status| {
+                     rtdebug!("i'm reading!");
+                     let buf = vec_from_uv_buf(buf);
+                     let mut count = count_cell.take();
+                     if status.is_none() {
+                         rtdebug!("got %d bytes", nread);
+                         let buf = buf.unwrap();
+                         for buf.slice(0, nread as uint).each |byte| {
+                             assert!(*byte == count as u8);
+                             rtdebug!("%u", *byte as uint);
+                             count += 1;
+                         }
+                     } else {
+                         assert!(count == MAX);
+                         do stream_watcher.close {
+                             server_stream_watcher.close(||());
+                         }
                      }
+                     count_cell.put_back(count);
                  }
-                 count_cell.put_back(count);
              }
-         }
  
-         let _client_thread = do Thread::start {
-             rtdebug!("starting client thread");
-             let mut loop_ = Loop::new();
-             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-             do tcp_watcher.connect(addr) |stream_watcher, status| {
-                 rtdebug!("connecting");
-                 assert!(status.is_none());
-                 let mut stream_watcher = stream_watcher;
-                 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
-                 do stream_watcher.write(msg) |stream_watcher, status| {
-                     rtdebug!("writing");
+             let _client_thread = do Thread::start {
+                 rtdebug!("starting client thread");
+                 let mut loop_ = Loop::new();
+                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+                 do tcp_watcher.connect(addr) |stream_watcher, status| {
+                     rtdebug!("connecting");
                      assert!(status.is_none());
-                     stream_watcher.close(||());
+                     let mut stream_watcher = stream_watcher;
+                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
+                     let buf = slice_to_uv_buf(msg);
+                     let msg_cell = Cell(msg);
+                     do stream_watcher.write(buf) |stream_watcher, status| {
+                         rtdebug!("writing");
+                         assert!(status.is_none());
+                         let msg_cell = Cell(msg_cell.take());
+                         stream_watcher.close(||ignore(msg_cell.take()));
+                     }
                  }
-             }
+                 loop_.run();
+                 loop_.close();
+             };
+             let mut loop_ = loop_;
              loop_.run();
              loop_.close();
-         };
-         let mut loop_ = loop_;
-         loop_.run();
-         loop_.close();
+         }
      }
 -}
 +}
index 0000000000000000000000000000000000000000,c031d7a1a6961257b3402c887d08f5c2a879cb34..cc9eb2ada4d17f4d7b23e25e9f87dc04d41443a9
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,481 +1,483 @@@
 -
+ // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+ use option::*;
+ use result::*;
+ use ops::Drop;
++use old_iter::CopyableIter;
+ use cell::{Cell, empty_cell};
+ use cast::transmute;
 -                let mut buf = [1, .. 2048];
++use clone::Clone;
+ use rt::io::IoError;
+ use rt::io::net::ip::IpAddr;
+ use rt::uv::*;
+ use rt::uv::idle::IdleWatcher;
+ use rt::rtio::*;
+ use rt::sched::{Scheduler, local_sched};
+ use rt::io::{standard_error, OtherIoError};
+ use rt::tube::Tube;
++#[cfg(test)] use container::Container;
+ #[cfg(test)] use uint;
+ #[cfg(test)] use unstable::run_in_bare_thread;
+ #[cfg(test)] use rt::test::*;
+ pub struct UvEventLoop {
+     uvio: UvIoFactory
+ }
+ pub impl UvEventLoop {
+     fn new() -> UvEventLoop {
+         UvEventLoop {
+             uvio: UvIoFactory(Loop::new())
+         }
+     }
+     /// A convenience constructor
+     fn new_scheduler() -> Scheduler {
+         Scheduler::new(~UvEventLoop::new())
+     }
+ }
+ impl Drop for UvEventLoop {
+     fn finalize(&self) {
+         // XXX: Need mutable finalizer
+         let this = unsafe {
+             transmute::<&UvEventLoop, &mut UvEventLoop>(self)
+         };
+         this.uvio.uv_loop().close();
+     }
+ }
+ impl EventLoop for UvEventLoop {
+     fn run(&mut self) {
+         self.uvio.uv_loop().run();
+     }
+     fn callback(&mut self, f: ~fn()) {
+         let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
+         do idle_watcher.start |idle_watcher, status| {
+             assert!(status.is_none());
+             let mut idle_watcher = idle_watcher;
+             idle_watcher.stop();
+             idle_watcher.close(||());
+             f();
+         }
+     }
+     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
+         Some(&mut self.uvio)
+     }
+ }
+ #[test]
+ fn test_callback_run_once() {
+     do run_in_bare_thread {
+         let mut event_loop = UvEventLoop::new();
+         let mut count = 0;
+         let count_ptr: *mut int = &mut count;
+         do event_loop.callback {
+             unsafe { *count_ptr += 1 }
+         }
+         event_loop.run();
+         assert!(count == 1);
+     }
+ }
+ pub struct UvIoFactory(Loop);
+ pub impl UvIoFactory {
+     fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
+         match self { &UvIoFactory(ref mut ptr) => ptr }
+     }
+ }
+ impl IoFactory for UvIoFactory {
+     // Connect to an address and return a new stream
+     // NB: This blocks the task waiting on the connection.
+     // It would probably be better to return a future
+     fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
+         // Create a cell in the task to hold the result. We will fill
+         // the cell before resuming the task.
+         let result_cell = empty_cell();
+         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
+         let scheduler = local_sched::take();
+         assert!(scheduler.in_task_context());
+         // Block this task and take ownership, switch to scheduler context
+         do scheduler.deschedule_running_task_and_then |task| {
+             rtdebug!("connect: entered scheduler context");
+             do local_sched::borrow |scheduler| {
+                 assert!(!scheduler.in_task_context());
+             }
+             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+             let task_cell = Cell(task);
+             // Wait for a connection
+             do tcp_watcher.connect(addr) |stream_watcher, status| {
+                 rtdebug!("connect: in connect callback");
+                 if status.is_none() {
+                     rtdebug!("status is none");
+                     let res = Ok(~UvTcpStream { watcher: stream_watcher });
+                     // Store the stream in the task's stack
+                     unsafe { (*result_cell_ptr).put_back(res); }
+                     // Context switch
+                     let scheduler = local_sched::take();
+                     scheduler.resume_task_immediately(task_cell.take());
+                 } else {
+                     rtdebug!("status is some");
+                     let task_cell = Cell(task_cell.take());
+                     do stream_watcher.close {
+                         let res = Err(uv_error_to_io_error(status.get()));
+                         unsafe { (*result_cell_ptr).put_back(res); }
+                         let scheduler = local_sched::take();
+                         scheduler.resume_task_immediately(task_cell.take());
+                     }
+                 };
+             }
+         }
+         assert!(!result_cell.is_empty());
+         return result_cell.take();
+     }
+     fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
+         let mut watcher = TcpWatcher::new(self.uv_loop());
+         match watcher.bind(addr) {
+             Ok(_) => Ok(~UvTcpListener::new(watcher)),
+             Err(uverr) => {
+                 let scheduler = local_sched::take();
+                 do scheduler.deschedule_running_task_and_then |task| {
+                     let task_cell = Cell(task);
+                     do watcher.as_stream().close {
+                         let scheduler = local_sched::take();
+                         scheduler.resume_task_immediately(task_cell.take());
+                     }
+                 }
+                 Err(uv_error_to_io_error(uverr))
+             }
+         }
+     }
+ }
+ // FIXME #6090: Prefer newtype structs but Drop doesn't work
+ pub struct UvTcpListener {
+     watcher: TcpWatcher,
+     listening: bool,
+     incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+ }
+ impl UvTcpListener {
+     fn new(watcher: TcpWatcher) -> UvTcpListener {
+         UvTcpListener {
+             watcher: watcher,
+             listening: false,
+             incoming_streams: Tube::new()
+         }
+     }
+     fn watcher(&self) -> TcpWatcher { self.watcher }
+ }
+ impl Drop for UvTcpListener {
+     fn finalize(&self) {
+         let watcher = self.watcher();
+         let scheduler = local_sched::take();
+         do scheduler.deschedule_running_task_and_then |task| {
+             let task_cell = Cell(task);
+             do watcher.as_stream().close {
+                 let scheduler = local_sched::take();
+                 scheduler.resume_task_immediately(task_cell.take());
+             }
+         }
+     }
+ }
+ impl RtioTcpListener for UvTcpListener {
+     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
+         rtdebug!("entering listen");
+         if self.listening {
+             return self.incoming_streams.recv();
+         }
+         self.listening = true;
+         let server_tcp_watcher = self.watcher();
+         let incoming_streams_cell = Cell(self.incoming_streams.clone());
+         let incoming_streams_cell = Cell(incoming_streams_cell.take());
+         let mut server_tcp_watcher = server_tcp_watcher;
+         do server_tcp_watcher.listen |server_stream_watcher, status| {
+             let maybe_stream = if status.is_none() {
+                 let mut server_stream_watcher = server_stream_watcher;
+                 let mut loop_ = server_stream_watcher.event_loop();
+                 let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                 let client_tcp_watcher = client_tcp_watcher.as_stream();
+                 // XXX: Need's to be surfaced in interface
+                 server_stream_watcher.accept(client_tcp_watcher);
+                 Ok(~UvTcpStream { watcher: client_tcp_watcher })
+             } else {
+                 Err(standard_error(OtherIoError))
+             };
+             let mut incoming_streams = incoming_streams_cell.take();
+             incoming_streams.send(maybe_stream);
+             incoming_streams_cell.put_back(incoming_streams);
+         }
+         return self.incoming_streams.recv();
+     }
+ }
+ // FIXME #6090: Prefer newtype structs but Drop doesn't work
+ pub struct UvTcpStream {
+     watcher: StreamWatcher
+ }
+ impl UvTcpStream {
+     fn watcher(&self) -> StreamWatcher { self.watcher }
+ }
+ impl Drop for UvTcpStream {
+     fn finalize(&self) {
+         rtdebug!("closing tcp stream");
+         let watcher = self.watcher();
+         let scheduler = local_sched::take();
+         do scheduler.deschedule_running_task_and_then |task| {
+             let task_cell = Cell(task);
+             do watcher.close {
+                 let scheduler = local_sched::take();
+                 scheduler.resume_task_immediately(task_cell.take());
+             }
+         }
+     }
+ }
+ impl RtioTcpStream for UvTcpStream {
+     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+         let result_cell = empty_cell();
+         let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
+         let scheduler = local_sched::take();
+         assert!(scheduler.in_task_context());
+         let watcher = self.watcher();
+         let buf_ptr: *&mut [u8] = &buf;
+         do scheduler.deschedule_running_task_and_then |task| {
+             rtdebug!("read: entered scheduler context");
+             do local_sched::borrow |scheduler| {
+                 assert!(!scheduler.in_task_context());
+             }
+             let mut watcher = watcher;
+             let task_cell = Cell(task);
+             // XXX: We shouldn't reallocate these callbacks every
+             // call to read
+             let alloc: AllocCallback = |_| unsafe {
+                 slice_to_uv_buf(*buf_ptr)
+             };
+             do watcher.read_start(alloc) |watcher, nread, _buf, status| {
+                 // Stop reading so that no read callbacks are
+                 // triggered before the user calls `read` again.
+                 // XXX: Is there a performance impact to calling
+                 // stop here?
+                 let mut watcher = watcher;
+                 watcher.read_stop();
+                 let result = if status.is_none() {
+                     assert!(nread >= 0);
+                     Ok(nread as uint)
+                 } else {
+                     Err(standard_error(OtherIoError))
+                 };
+                 unsafe { (*result_cell_ptr).put_back(result); }
+                 let scheduler = local_sched::take();
+                 scheduler.resume_task_immediately(task_cell.take());
+             }
+         }
+         assert!(!result_cell.is_empty());
+         return result_cell.take();
+     }
+     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+         let result_cell = empty_cell();
+         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+         let scheduler = local_sched::take();
+         assert!(scheduler.in_task_context());
+         let watcher = self.watcher();
+         let buf_ptr: *&[u8] = &buf;
+         do scheduler.deschedule_running_task_and_then |task| {
+             let mut watcher = watcher;
+             let task_cell = Cell(task);
+             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+             do watcher.write(buf) |_watcher, status| {
+                 let result = if status.is_none() {
+                     Ok(())
+                 } else {
+                     Err(standard_error(OtherIoError))
+                 };
+                 unsafe { (*result_cell_ptr).put_back(result); }
+                 let scheduler = local_sched::take();
+                 scheduler.resume_task_immediately(task_cell.take());
+             }
+         }
+         assert!(!result_cell.is_empty());
+         return result_cell.take();
+     }
+ }
+ #[test]
+ fn test_simple_io_no_connect() {
+     do run_in_newsched_task {
+         unsafe {
+             let io = local_sched::unsafe_borrow_io();
+             let addr = next_test_ip4();
+             let maybe_chan = (*io).tcp_connect(addr);
+             assert!(maybe_chan.is_err());
+         }
+     }
+ }
+ #[test]
+ fn test_simple_tcp_server_and_client() {
+     do run_in_newsched_task {
+         let addr = next_test_ip4();
+         // Start the server first so it's listening when we connect
+         do spawntask_immediately {
+             unsafe {
+                 let io = local_sched::unsafe_borrow_io();
+                 let mut listener = (*io).tcp_bind(addr).unwrap();
+                 let mut stream = listener.accept().unwrap();
+                 let mut buf = [0, .. 2048];
+                 let nread = stream.read(buf).unwrap();
+                 assert!(nread == 8);
+                 for uint::range(0, nread) |i| {
+                     rtdebug!("%u", buf[i] as uint);
+                     assert!(buf[i] == i as u8);
+                 }
+             }
+         }
+         do spawntask_immediately {
+             unsafe {
+                 let io = local_sched::unsafe_borrow_io();
+                 let mut stream = (*io).tcp_connect(addr).unwrap();
+                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+             }
+         }
+     }
+ }
+ #[test] #[ignore(reason = "busted")]
+ fn test_read_and_block() {
+     do run_in_newsched_task {
+         let addr = next_test_ip4();
+         do spawntask_immediately {
+             let io = unsafe { local_sched::unsafe_borrow_io() };
+             let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
+             let mut stream = listener.accept().unwrap();
+             let mut buf = [0, .. 2048];
+             let expected = 32;
+             let mut current = 0;
+             let mut reads = 0;
+             while current < expected {
+                 let nread = stream.read(buf).unwrap();
+                 for uint::range(0, nread) |i| {
+                     let val = buf[i] as uint;
+                     assert!(val == current % 8);
+                     current += 1;
+                 }
+                 reads += 1;
+                 let scheduler = local_sched::take();
+                 // Yield to the other task in hopes that it
+                 // will trigger a read callback while we are
+                 // not ready for it
+                 do scheduler.deschedule_running_task_and_then |task| {
+                     let task = Cell(task);
+                     do local_sched::borrow |scheduler| {
+                         scheduler.task_queue.push_back(task.take());
+                     }
+                 }
+             }
+             // Make sure we had multiple reads
+             assert!(reads > 1);
+         }
+         do spawntask_immediately {
+             unsafe {
+                 let io = local_sched::unsafe_borrow_io();
+                 let mut stream = (*io).tcp_connect(addr).unwrap();
+                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+             }
+         }
+     }
+ }
+ #[test]
+ fn test_read_read_read() {
+     do run_in_newsched_task {
+         let addr = next_test_ip4();
+         static MAX: uint = 500000;
+         do spawntask_immediately {
+             unsafe {
+                 let io = local_sched::unsafe_borrow_io();
+                 let mut listener = (*io).tcp_bind(addr).unwrap();
+                 let mut stream = listener.accept().unwrap();
++                let buf = [1, .. 2048];
+                 let mut total_bytes_written = 0;
+                 while total_bytes_written < MAX {
+                     stream.write(buf);
+                     total_bytes_written += buf.len();
+                 }
+             }
+         }
+         do spawntask_immediately {
+             unsafe {
+                 let io = local_sched::unsafe_borrow_io();
+                 let mut stream = (*io).tcp_connect(addr).unwrap();
+                 let mut buf = [0, .. 2048];
+                 let mut total_bytes_read = 0;
+                 while total_bytes_read < MAX {
+                     let nread = stream.read(buf).unwrap();
+                     rtdebug!("read %u bytes", nread as uint);
+                     total_bytes_read += nread;
+                     for uint::range(0, nread) |i| {
+                         assert!(buf[i] == 1);
+                     }
+                 }
+                 rtdebug!("read %u bytes total", total_bytes_read as uint);
+             }
+         }
+     }
+ }
index bef7a7f87d3bd7a8622ab9c925a8ba395952d4c7,0000000000000000000000000000000000000000..18a6262f17de99b2b78ccef78a120b8a8e1d57c0
mode 100644,000000..100644
--- /dev/null
@@@ -1,78 -1,0 +1,77 @@@
- pub mod exchange_alloc;
 +// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
 +// file at the top-level directory of this distribution and at
 +// http://rust-lang.org/COPYRIGHT.
 +//
 +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
 +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
 +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
 +// option. This file may not be copied, modified, or distributed
 +// except according to those terms.
 +
 +#[doc(hidden)];
 +
 +use libc;
 +use comm::{GenericChan, GenericPort};
 +use prelude::*;
 +use task;
 +
 +pub mod at_exit;
 +pub mod global;
 +pub mod finally;
 +pub mod weak_task;
 +pub mod intrinsics;
 +pub mod simd;
 +pub mod extfmt;
 +#[cfg(not(test))]
 +pub mod lang;
 +pub mod sync;
 +
 +/**
 +
 +Start a new thread outside of the current runtime context and wait
 +for it to terminate.
 +
 +The executing thread has no access to a task pointer and will be using
 +a normal large stack.
 +*/
 +pub fn run_in_bare_thread(f: ~fn()) {
 +    let (port, chan) = comm::stream();
 +    // FIXME #4525: Unfortunate that this creates an extra scheduler but it's
 +    // necessary since rust_raw_thread_join_delete is blocking
 +    do task::spawn_sched(task::SingleThreaded) {
 +        unsafe {
 +            let closure: &fn() = || {
 +                f()
 +            };
 +            let thread = rust_raw_thread_start(&closure);
 +            rust_raw_thread_join_delete(thread);
 +            chan.send(());
 +        }
 +    }
 +    port.recv();
 +}
 +
 +#[test]
 +fn test_run_in_bare_thread() {
 +    let i = 100;
 +    do run_in_bare_thread {
 +        assert!(i == 100);
 +    }
 +}
 +
 +#[test]
 +fn test_run_in_bare_thread_exchange() {
 +    // Does the exchange heap work without the runtime?
 +    let i = ~100;
 +    do run_in_bare_thread {
 +        assert!(i == ~100);
 +    }
 +}
 +
 +#[allow(non_camel_case_types)] // runtime type
 +pub type raw_thread = libc::c_void;
 +
 +extern {
 +    fn rust_raw_thread_start(f: &(&fn())) -> *raw_thread;
 +    fn rust_raw_thread_join_delete(thread: *raw_thread);
 +}