]> git.lizzy.rs Git - rust.git/commitdiff
Merge remote-tracking branch 'brson/nocommupstream'
authorBrian Anderson <banderson@mozilla.com>
Sat, 26 Jan 2013 01:51:53 +0000 (17:51 -0800)
committerBrian Anderson <banderson@mozilla.com>
Sat, 26 Jan 2013 02:06:30 +0000 (18:06 -0800)
Conflicts:
src/libcore/private.rs
src/libcore/task/mod.rs
src/libcore/task/spawn.rs
src/libstd/net_tcp.rs
src/libstd/uv_global_loop.rs
src/libstd/uv_iotask.rs

17 files changed:
1  2 
src/libcore/os.rs
src/libcore/pipes.rs
src/libcore/private.rs
src/libcore/private/at_exit.rs
src/libcore/private/finally.rs
src/libcore/private/global.rs
src/libcore/private/weak_task.rs
src/libcore/run.rs
src/libcore/task/mod.rs
src/libcore/task/spawn.rs
src/libstd/flatpipes.rs
src/libstd/net_ip.rs
src/libstd/net_tcp.rs
src/libstd/timer.rs
src/libstd/uv_global_loop.rs
src/libstd/uv_iotask.rs
src/rt/rust_kernel.h

Simple merge
Simple merge
index ad27729cc9fa6e629f6855971d85f6c395e2dd03,23268b1b778d875682ecae3d96433edcda94fcfa..332c763f151e85b4bbbadad69c7a4e7b733e3571
@@@ -87,267 -89,11 +93,13 @@@ fn test_run_in_bare_thread() 
      }
  }
  
- #[allow(non_camel_case_types)] // runtime type
- type rust_port_id = uint;
- type GlobalPtr = *libc::uintptr_t;
  fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
 -    let old = rusti::atomic_cxchg(address, oldval, newval);
 -    old == oldval
 +    unsafe {
 +        let old = rusti::atomic_cxchg(address, oldval, newval);
 +        old == oldval
 +    }
  }
  
- /**
-  * Atomically gets a channel from a pointer to a pointer-sized memory location
-  * or, if no channel exists creates and installs a new channel and sets up a
-  * new task to receive from it.
-  */
- pub unsafe fn chan_from_global_ptr<T: Owned>(
-     global: GlobalPtr,
-     task_fn: fn() -> task::TaskBuilder,
-     f: fn~(oldcomm::Port<T>)
- ) -> oldcomm::Chan<T> {
-     enum Msg {
-         Proceed,
-         Abort
-     }
-     log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
-     let is_probably_zero = *global == 0u;
-     log(debug,~"after is_prob_zero check");
-     if is_probably_zero {
-         log(debug,~"is probably zero...");
-         // There's no global channel. We must make it
-         let (setup1_po, setup1_ch) = pipes::stream();
-         let (setup2_po, setup2_ch) = pipes::stream();
-         // FIXME #4422: Ugly type inference hint
-         let setup2_po: pipes::Port<Msg> = setup2_po;
-         do task_fn().spawn |move f, move setup1_ch, move setup2_po| {
-             let po = oldcomm::Port::<T>();
-             let ch = oldcomm::Chan(&po);
-             setup1_ch.send(ch);
-             // Wait to hear if we are the official instance of
-             // this global task
-             match setup2_po.recv() {
-               Proceed => f(move po),
-               Abort => ()
-             }
-         };
-         log(debug,~"before setup recv..");
-         // This is the proposed global channel
-         let ch = setup1_po.recv();
-         // 0 is our sentinal value. It is not a valid channel
-         assert *ch != 0;
-         // Install the channel
-         log(debug,~"BEFORE COMPARE AND SWAP");
-         let swapped = compare_and_swap(
-             cast::reinterpret_cast(&global),
-             0, cast::reinterpret_cast(&ch));
-         log(debug,fmt!("AFTER .. swapped? %?", swapped));
-         if swapped {
-             // Success!
-             setup2_ch.send(Proceed);
-             ch
-         } else {
-             // Somebody else got in before we did
-             setup2_ch.send(Abort);
-             cast::reinterpret_cast(&*global)
-         }
-     } else {
-         log(debug, ~"global != 0");
-         cast::reinterpret_cast(&*global)
-     }
- }
- #[test]
- pub fn test_from_global_chan1() {
-     // This is unreadable, right?
-     // The global channel
-     let globchan = 0;
-     let globchanp = ptr::addr_of(&globchan);
-     // Create the global channel, attached to a new task
-     let ch = unsafe {
-         do chan_from_global_ptr(globchanp, task::task) |po| {
-             let ch = oldcomm::recv(po);
-             oldcomm::send(ch, true);
-             let ch = oldcomm::recv(po);
-             oldcomm::send(ch, true);
-         }
-     };
-     // Talk to it
-     let po = oldcomm::Port();
-     oldcomm::send(ch, oldcomm::Chan(&po));
-     assert oldcomm::recv(po) == true;
-     // This one just reuses the previous channel
-     let ch = unsafe {
-         do chan_from_global_ptr(globchanp, task::task) |po| {
-             let ch = oldcomm::recv(po);
-             oldcomm::send(ch, false);
-         }
-     };
-     // Talk to the original global task
-     let po = oldcomm::Port();
-     oldcomm::send(ch, oldcomm::Chan(&po));
-     assert oldcomm::recv(po) == true;
- }
- #[test]
- pub fn test_from_global_chan2() {
-     for iter::repeat(100) {
-         // The global channel
-         let globchan = 0;
-         let globchanp = ptr::addr_of(&globchan);
-         let resultpo = oldcomm::Port();
-         let resultch = oldcomm::Chan(&resultpo);
-         // Spawn a bunch of tasks that all want to compete to
-         // create the global channel
-         for uint::range(0, 10) |i| {
-             do task::spawn {
-                 let ch = unsafe {
-                     do chan_from_global_ptr(
-                         globchanp, task::task) |po| {
-                         for uint::range(0, 10) |_j| {
-                             let ch = oldcomm::recv(po);
-                             oldcomm::send(ch, {i});
-                         }
-                     }
-                 };
-                 let po = oldcomm::Port();
-                 oldcomm::send(ch, oldcomm::Chan(&po));
-                 // We are The winner if our version of the
-                 // task was installed
-                 let winner = oldcomm::recv(po);
-                 oldcomm::send(resultch, winner == i);
-             }
-         }
-         // There should be only one winner
-         let mut winners = 0u;
-         for uint::range(0u, 10u) |_i| {
-             let res = oldcomm::recv(resultpo);
-             if res { winners += 1u };
-         }
-         assert winners == 1u;
-     }
- }
- /**
-  * Convert the current task to a 'weak' task temporarily
-  *
-  * As a weak task it will not be counted towards the runtime's set
-  * of live tasks. When there are no more outstanding live (non-weak) tasks
-  * the runtime will send an exit message on the provided channel.
-  *
-  * This function is super-unsafe. Do not use.
-  *
-  * # Safety notes
-  *
-  * * Weak tasks must either die on their own or exit upon receipt of
-  *   the exit message. Failure to do so will cause the runtime to never
-  *   exit
-  * * Tasks must not call `weaken_task` multiple times. This will
-  *   break the kernel's accounting of live tasks.
-  * * Weak tasks must not be supervised. A supervised task keeps
-  *   a reference to its parent, so the parent will not die.
-  */
- pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) {
-     let po = oldcomm::Port();
-     let ch = oldcomm::Chan(&po);
-     unsafe {
-         rustrt::rust_task_weaken(cast::reinterpret_cast(&ch));
-     }
-     let _unweaken = Unweaken(ch);
-     f(po);
-     struct Unweaken {
-       ch: oldcomm::Chan<()>,
-       drop {
-         unsafe {
-             rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch));
-         }
-       }
-     }
-     fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken {
-         Unweaken {
-             ch: ch
-         }
-     }
- }
- #[test]
- pub fn test_weaken_task_then_unweaken() {
-     do task::try {
-         unsafe {
-             do weaken_task |_po| {
-             }
-         }
-     };
- }
- #[test]
- pub fn test_weaken_task_wait() {
-     do task::spawn_unlinked {
-         unsafe {
-             do weaken_task |po| {
-                 oldcomm::recv(po);
-             }
-         }
-     }
- }
- #[test]
- pub fn test_weaken_task_stress() {
-     // Create a bunch of weak tasks
-     for iter::repeat(100u) {
-         do task::spawn {
-             unsafe {
-                 do weaken_task |_po| {
-                 }
-             }
-         }
-         do task::spawn_unlinked {
-             unsafe {
-                 do weaken_task |po| {
-                     // Wait for it to tell us to die
-                     oldcomm::recv(po);
-                 }
-             }
-         }
-     }
- }
- #[test]
- #[ignore(cfg(windows))]
- pub fn test_weaken_task_fail() {
-     let res = do task::try {
-         unsafe {
-             do weaken_task |_po| {
-                 fail;
-             }
-         }
-     };
-     assert result::is_err(&res);
- }
  /****************************************************************************
   * Shared state & exclusive ARC
   ****************************************************************************/
@@@ -533,6 -273,12 +285,14 @@@ pub unsafe fn clone_shared_mutable_stat
      ArcDestruct((*rc).data)
  }
  
 -    fn clone(&self) -> SharedMutableState<T> unsafe {
 -        clone_shared_mutable_state(self)
+ impl<T: Owned> SharedMutableState<T>: Clone {
++    fn clone(&self) -> SharedMutableState<T> {
++        unsafe {
++            clone_shared_mutable_state(self)
++        }
+     }
+ }
  /****************************************************************************/
  
  #[allow(non_camel_case_types)] // runtime type
index 0000000000000000000000000000000000000000,7ac252ea1021179848ebc3d159c1ed23dae82627..a87301dbe07bb8235fc6e5dfd36afb0758de7c14
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,86 +1,98 @@@
 -pub fn at_exit(f: ~fn()) unsafe {
 -    let runner: &fn(*ExitFunctions) = exit_runner;
 -    let runner_pair: sys::Closure = cast::transmute(runner);
 -    let runner_ptr = runner_pair.code;
 -    let runner_ptr = cast::transmute(runner_ptr);
 -    rustrt::rust_register_exit_function(runner_ptr, ~f);
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ use sys;
+ use cast;
+ use ptr;
+ use task;
+ use uint;
+ use vec;
+ use rand;
+ use libc::{c_void, size_t};
+ /**
+ Register a function to be run during runtime shutdown.
+ After all non-weak tasks have exited, registered exit functions will
+ execute, in random order, on the primary scheduler. Each function runs
+ in its own unsupervised task.
+ */
 -fn exit_runner(exit_fns: *ExitFunctions) unsafe {
 -    let exit_fns = &*exit_fns;
++pub fn at_exit(f: ~fn()) {
++    unsafe {
++        let runner: &fn(*ExitFunctions) = exit_runner;
++        let runner_pair: sys::Closure = cast::transmute(runner);
++        let runner_ptr = runner_pair.code;
++        let runner_ptr = cast::transmute(runner_ptr);
++        rustrt::rust_register_exit_function(runner_ptr, ~f);
++    }
+ }
+ // NB: The double pointer indirection here is because ~fn() is a fat
+ // pointer and due to FFI problems I am more comfortable making the
+ // interface use a normal pointer
+ extern mod rustrt {
+     fn rust_register_exit_function(runner: *c_void, f: ~~fn());
+ }
+ struct ExitFunctions {
+     // The number of exit functions
+     count: size_t,
+     // The buffer of exit functions
+     start: *~~fn()
+ }
 -    let mut exit_fns_vec = vec::from_buf(start, count as uint);
++fn exit_runner(exit_fns: *ExitFunctions) {
++    let exit_fns = unsafe { &*exit_fns };
+     let count = (*exit_fns).count;
+     let start = (*exit_fns).start;
+     // NB: from_buf memcpys from the source, which will
+     // give us ownership of the array of functions
 -    while exit_fns_vec.is_not_empty() {
++    let mut exit_fns_vec = unsafe { vec::from_buf(start, count as uint) };
+     // Let's not make any promises about execution order
+     rand::Rng().shuffle_mut(exit_fns_vec);
+     debug!("running %u exit functions", exit_fns_vec.len());
++    while !exit_fns_vec.is_empty() {
+         match exit_fns_vec.pop() {
+             ~f => {
+                 task::task().supervised().spawn(f);
+             }
+         }
+     }
+ }
+ #[abi = "rust-intrinsic"]
+ pub extern mod rusti {
+     fn move_val_init<T>(dst: &mut T, -src: T);
+     fn init<T>() -> T;
+ }
+ #[test]
+ fn test_at_exit() {
+     let i = 10;
+     do at_exit {
+         debug!("at_exit1");
+         assert i == 10;
+     }
+ }
+ #[test]
+ fn test_at_exit_many() {
+     let i = 10;
+     for uint::range(20, 100) |j| {
+         do at_exit {
+             debug!("at_exit2");
+             assert i == 10;
+             assert j > i;
+         }
+     }
+ }
index 0000000000000000000000000000000000000000,f4d76dfd54db08bc4560cffd865b8f29e9094d3e..66e23ff4336021a12db5c3341c1fcb7448abaa15
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,88 +1,98 @@@
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ /*!
+ The Finally trait provides a method, `finally` on
+ stack closures that emulates Java-style try/finally blocks.
+ # Example
+ ~~~
+ do || {
+     ...
+ }.finally {
+     alway_run_this();
+ }
+ ~~~
+ */
+ use ops::Drop;
+ use task::{spawn, failing};
+ pub trait Finally<T> {
+     fn finally(&self, +dtor: &fn()) -> T;
+ }
+ impl<T> &fn() -> T: Finally<T> {
+     // XXX: Should not require a mode here
+     fn finally(&self, +dtor: &fn()) -> T {
+         let _d = Finallyalizer {
+             dtor: dtor
+         };
+         (*self)()
+     }
+ }
+ struct Finallyalizer {
+     dtor: &fn()
+ }
+ impl Finallyalizer: Drop {
+     fn finalize(&self) {
+         (self.dtor)();
+     }
+ }
+ #[test]
+ fn test_success() {
+     let mut i = 0;
+     do (|| {
+         i = 10;
+     }).finally {
+         assert !failing();
+         assert i == 10;
+         i = 20;
+     }
+     assert i == 20;
+ }
+ #[test]
+ #[ignore(cfg(windows))]
+ #[should_fail]
+ fn test_fail() {
+     let mut i = 0;
+     do (|| {
+         i = 10;
+         fail;
+     }).finally {
+         assert failing();
+         assert i == 10;
+     }
+ }
+ #[test]
+ fn test_retval() {
+     let i = do (fn&() -> int {
+         10
+     }).finally { };
+     assert i == 10;
+ }
+ #[test]
+ fn test_compact() {
+     // XXX Should be able to use a fn item instead
+     // of a closure for do_some_fallible_work,
+     // but it's a type error.
+     let do_some_fallible_work: &fn() = || { };
+     fn but_always_run_this_function() { }
+     do_some_fallible_work.finally(
+         but_always_run_this_function);
+ }
index 0000000000000000000000000000000000000000,d9230e08dc76d98954283300447230edc9ad49e7..69319abc0093071e48eefaabc3c28ce74c58aa3b
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,272 +1,296 @@@
 -use send_map::linear::LinearMap;
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ /*!
+ Global data
+ An interface for creating and retrieving values with global
+ (per-runtime) scope.
+ Global values are stored in a map and protected by a single global
+ mutex. Operations are provided for accessing and cloning the value
+ under the mutex.
+ Because all globals go through a single mutex, they should be used
+ sparingly.  The interface is intended to be used with clonable,
+ atomically reference counted synchronization types, like ARCs, in
+ which case the value should be cached locally whenever possible to
+ avoid hitting the mutex.
+ */
+ use cast::{transmute, reinterpret_cast};
+ use clone::Clone;
+ use kinds::Owned;
+ use libc::{c_void, uintptr_t};
+ use option::{Option, Some, None};
+ use ops::Drop;
+ use pipes;
+ use private::{Exclusive, exclusive};
+ use private::{SharedMutableState, shared_mutable_state};
+ use private::{get_shared_immutable_state};
+ use private::at_exit::at_exit;
 -    do get_global_state().with |gs| unsafe {
++use hashmap::linear::LinearMap;
+ use sys::Closure;
+ use task::spawn;
+ use uint;
+ pub type GlobalDataKey<T: Owned> = &fn(v: T);
+ pub unsafe fn global_data_clone_create<T: Owned Clone>(
+     key: GlobalDataKey<T>, create: &fn() -> ~T) -> T {
+     /*!
+      * Clone a global value or, if it has not been created,
+      * first construct the value then return a clone.
+      *
+      * # Safety note
+      *
+      * Both the clone operation and the constructor are
+      * called while the global lock is held. Recursive
+      * use of the global interface in either of these
+      * operations will result in deadlock.
+      */
+     global_data_clone_create_(key_ptr(key), create)
+ }
+ unsafe fn global_data_clone_create_<T: Owned Clone>(
+     key: uint, create: &fn() -> ~T) -> T {
+     let mut clone_value: Option<T> = None;
+     do global_data_modify_(key) |value: Option<~T>| {
+         match value {
+             None => {
+                 let value = create();
+                 clone_value = Some(value.clone());
+                 Some(value)
+             }
+             Some(value) => {
+                 clone_value = Some(value.clone());
+                 Some(value)
+             }
+         }
+     }
+     return clone_value.unwrap();
+ }
+ unsafe fn global_data_modify<T: Owned>(
+     key: GlobalDataKey<T>, op: &fn(Option<~T>) -> Option<~T>) {
+     global_data_modify_(key_ptr(key), op)
+ }
+ unsafe fn global_data_modify_<T: Owned>(
+     key: uint, op: &fn(Option<~T>) -> Option<~T>) {
+     let mut old_dtor = None;
 -fn get_global_state() -> Exclusive<GlobalState> unsafe {
++    do get_global_state().with |gs| {
+         let (maybe_new_value, maybe_dtor) = match gs.map.pop(&key) {
+             Some((ptr, dtor)) => {
+                 let value: ~T = transmute(ptr);
+                 (op(Some(value)), Some(dtor))
+             }
+             None => {
+                 (op(None), None)
+             }
+         };
+         match maybe_new_value {
+             Some(value) => {
+                 let data: *c_void = transmute(value);
+                 let dtor: ~fn() = match maybe_dtor {
+                     Some(dtor) => dtor,
+                     None => {
+                         let dtor: ~fn() = || unsafe {
+                             let _destroy_value: ~T = transmute(data);
+                         };
+                         dtor
+                     }
+                 };
+                 let value = (data, dtor);
+                 gs.map.insert(key, value);
+             }
+             None => {
+                 match maybe_dtor {
+                     Some(dtor) => old_dtor = Some(dtor),
+                     None => ()
+                 }
+             }
+         }
+     }
+ }
+ pub unsafe fn global_data_clone<T: Owned Clone>(
+     key: GlobalDataKey<T>) -> Option<T> {
+     let mut maybe_clone: Option<T> = None;
+     do global_data_modify(key) |current| {
+         match &current {
+             &Some(~ref value) => {
+                 maybe_clone = Some(value.clone());
+             }
+             &None => ()
+         }
+         current
+     }
+     return maybe_clone;
+ }
+ // GlobalState is a map from keys to unique pointers and a
+ // destructor. Keys are pointers derived from the type of the
+ // global value.  There is a single GlobalState instance per runtime.
+ struct GlobalState {
+     map: LinearMap<uint, (*c_void, ~fn())>
+ }
+ impl GlobalState: Drop {
+     fn finalize(&self) {
+         for self.map.each_value |v| {
+             match v {
+                 &(_, ref dtor) => (*dtor)()
+             }
+         }
+     }
+ }
 -    let global_ptr = rust_get_global_data_ptr();
++fn get_global_state() -> Exclusive<GlobalState> {
+     const POISON: int = -1;
+     // XXX: Doing atomic_cxchg to initialize the global state
+     // lazily, which wouldn't be necessary with a runtime written
+     // in Rust
 -    if *global_ptr == 0 {
++    let global_ptr = unsafe { rust_get_global_data_ptr() };
 -            map: LinearMap()
++    if unsafe { *global_ptr } == 0 {
+         // Global state doesn't exist yet, probably
+         // The global state object
+         let state = GlobalState {
 -        let state_i: int = transmute(state_ptr);
++            map: LinearMap::new()
+         };
+         // It's under a reference-counted mutex
+         let state = ~exclusive(state);
+         // Convert it to an integer
+         let state_ptr: &Exclusive<GlobalState> = state;
 -        let prev_i = atomic_cxchg(&mut *global_ptr, 0, state_i);
++        let state_i: int = unsafe { transmute(state_ptr) };
+         // Swap our structure into the global pointer
 -            do at_exit || unsafe {
++        let prev_i = unsafe { atomic_cxchg(&mut *global_ptr, 0, state_i) };
+         // Sanity check that we're not trying to reinitialize after shutdown
+         assert prev_i != POISON;
+         if prev_i == 0 {
+             // Successfully installed the global pointer
+             // Take a handle to return
+             let clone = state.clone();
+             // Install a runtime exit function to destroy the global object
 -                let prev_i = atomic_cxchg(&mut *global_ptr, state_i, POISON);
++            do at_exit {
+                 // Poison the global pointer
 -            let state: &Exclusive<GlobalState> = transmute(prev_i);
++                let prev_i = unsafe {
++                    atomic_cxchg(&mut *global_ptr, state_i, POISON)
++                };
+                 assert prev_i == state_i;
+                 // Capture the global state object in the at_exit closure
+                 // so that it is destroyed at the right time
+                 let _capture_global_state = &state;
+             };
+             return clone;
+         } else {
+             // Somebody else initialized the globals first
 -        let state: &Exclusive<GlobalState> = transmute(*global_ptr);
++            let state: &Exclusive<GlobalState> = unsafe { transmute(prev_i) };
+             return state.clone();
+         }
+     } else {
 -fn key_ptr<T: Owned>(key: GlobalDataKey<T>) -> uint unsafe {
 -    let closure: Closure = reinterpret_cast(&key);
 -    return transmute(closure.code);
++        let state: &Exclusive<GlobalState> = unsafe {
++            transmute(*global_ptr)
++        };
+         return state.clone();
+     }
+ }
 -fn test_clone_rc() unsafe {
++fn key_ptr<T: Owned>(key: GlobalDataKey<T>) -> uint {
++    unsafe {
++        let closure: Closure = reinterpret_cast(&key);
++        return transmute(closure.code);
++    }
+ }
+ extern {
+     fn rust_get_global_data_ptr() -> *mut int;
+ }
+ #[abi = "rust-intrinsic"]
+ extern {
+     fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
+ }
+ #[test]
 -        do spawn unsafe {
 -            let val = do global_data_clone_create(key) {
 -                ~shared_mutable_state(10)
 -            };
++fn test_clone_rc() {
+     type MyType = SharedMutableState<int>;
+     fn key(_v: SharedMutableState<int>) { }
+     for uint::range(0, 100) |_| {
 -            assert get_shared_immutable_state(&val) == &10;
++        do spawn {
++            unsafe {
++                let val = do global_data_clone_create(key) {
++                    ~shared_mutable_state(10)
++                };
 -fn test_modify() unsafe {
++                assert get_shared_immutable_state(&val) == &10;
++            }
+         }
+     }
+ }
+ #[test]
 -    do global_data_modify(key) |v| unsafe {
 -        match v {
 -            None => {
 -                Some(~shared_mutable_state(10))
++fn test_modify() {
+     type MyType = SharedMutableState<int>;
+     fn key(_v: SharedMutableState<int>) { }
 -            _ => fail
++    unsafe {
++        do global_data_modify(key) |v| {
++            match v {
++                None => {
++                    unsafe {
++                        Some(~shared_mutable_state(10))
++                    }
++                }
++                _ => fail
+             }
 -    }
+         }
 -    do global_data_modify(key) |v| {
 -        match v {
 -            Some(sms) => {
 -                let v = get_shared_immutable_state(sms);
 -                assert *v == 10;
 -                None
 -            },
 -            _ => fail
 -    }
++        do global_data_modify(key) |v| {
++            match v {
++                Some(sms) => {
++                    let v = get_shared_immutable_state(sms);
++                    assert *v == 10;
++                    None
++                },
++                _ => fail
++            }
+         }
 -    do global_data_modify(key) |v| unsafe {
 -        match v {
 -            None => {
 -                Some(~shared_mutable_state(10))
 -            _ => fail
++        do global_data_modify(key) |v| {
++            match v {
++                None => {
++                    unsafe {
++                        Some(~shared_mutable_state(10))
++                    }
++                }
++                _ => fail
+             }
+         }
+     }
+ }
index 0000000000000000000000000000000000000000,868361b0e6078abafda7460a874684c1a41fdfc6..25a03ff960f46209bbe2729e05d3c1dcd9fda255
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,187 +1,207 @@@
 -use send_map::linear::LinearMap;
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ /*!
+ Weak tasks
+ Weak tasks are a runtime feature for building global services that
+ do not keep the runtime alive. Normally the runtime exits when all
+ tasks exits, but if a task is weak then the runtime may exit while
+ it is running, sending a notification to the task that the runtime
+ is trying to shut down.
+ */
+ use option::{Some, None, swap_unwrap};
+ use private::at_exit::at_exit;
+ use private::global::global_data_clone_create;
+ use private::finally::Finally;
+ use pipes::{Port, Chan, SharedChan, stream};
+ use task::{Task, task, spawn};
+ use task::rt::{task_id, get_task_id};
 -    let mut shutdown_map = LinearMap();
++use hashmap::linear::LinearMap;
+ use ops::Drop;
+ type ShutdownMsg = ();
+ // XXX: This could be a PortOne but I've experienced bugginess
+ // with oneshot pipes and try_send
+ pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
+     let service = global_data_clone_create(global_data_key,
+                                            create_global_service);
+     let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
+     let shutdown_port = ~mut Some(shutdown_port);
+     let task = get_task_id();
+     // Expect the weak task service to be alive
+     assert service.try_send(RegisterWeakTask(task, shutdown_chan));
+     unsafe { rust_inc_weak_task_count(); }
+     do fn&() {
+         let shutdown_port = swap_unwrap(&mut *shutdown_port);
+         f(shutdown_port)
+     }.finally || {
+         unsafe { rust_dec_weak_task_count(); }
+         // Service my have already exited
+         service.send(UnregisterWeakTask(task));
+     }
+ }
+ type WeakTaskService = SharedChan<ServiceMsg>;
+ type TaskHandle = task_id;
+ fn global_data_key(_v: WeakTaskService) { }
+ enum ServiceMsg {
+     RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
+     UnregisterWeakTask(TaskHandle),
+     Shutdown
+ }
+ fn create_global_service() -> ~WeakTaskService {
+     debug!("creating global weak task service");
+     let (port, chan) = stream::<ServiceMsg>();
+     let port = ~mut Some(port);
+     let chan = SharedChan(chan);
+     let chan_clone = chan.clone();
+     do task().unlinked().spawn {
+         debug!("running global weak task service");
+         let port = swap_unwrap(&mut *port);
+         let port = ~mut Some(port);
+         do fn&() {
+             let port = swap_unwrap(&mut *port);
+             // The weak task service is itself a weak task
+             debug!("weakening the weak service task");
+             unsafe { rust_inc_weak_task_count(); }
+             run_weak_task_service(port);
+         }.finally {
+             debug!("unweakening the weak service task");
+             unsafe { rust_dec_weak_task_count(); }
+         }
+     }
+     do at_exit {
+         debug!("shutting down weak task service");
+         chan.send(Shutdown);
+     }
+     return ~chan_clone;
+ }
+ fn run_weak_task_service(port: Port<ServiceMsg>) {
 -fn test_simple() unsafe {
++    let mut shutdown_map = LinearMap::new();
+     loop {
+         match port.recv() {
+             RegisterWeakTask(task, shutdown_chan) => {
+                 let previously_unregistered =
+                     shutdown_map.insert(task, shutdown_chan);
+                 assert previously_unregistered;
+             }
+             UnregisterWeakTask(task) => {
+                 match shutdown_map.pop(&task) {
+                     Some(shutdown_chan) => {
+                         // Oneshot pipes must send, even though
+                         // nobody will receive this
+                         shutdown_chan.send(());
+                     }
+                     None => fail
+                 }
+             }
+             Shutdown => break
+         }
+     }
+     do shutdown_map.consume |_, shutdown_chan| {
+         // Weak task may have already exited
+         shutdown_chan.send(());
+     }
+ }
+ extern {
+     unsafe fn rust_inc_weak_task_count();
+     unsafe fn rust_dec_weak_task_count();
+ }
+ #[test]
 -    do spawn unsafe {
 -        do weaken_task |_signal| {
++fn test_simple() {
+     let (port, chan) = stream();
 -fn test_weak_weak() unsafe {
++    do spawn {
++        unsafe {
++            do weaken_task |_signal| {
++            }
+         }
+         chan.send(());
+     }
+     port.recv();
+ }
+ #[test]
 -    do spawn unsafe {
 -        do weaken_task |_signal| {
 -        }
 -        do weaken_task |_signal| {
++fn test_weak_weak() {
+     let (port, chan) = stream();
 -fn test_wait_for_signal() unsafe {
 -    do spawn unsafe {
 -        do weaken_task |signal| {
 -            signal.recv();
++    do spawn {
++        unsafe {
++            do weaken_task |_signal| {
++            }
++            do weaken_task |_signal| {
++            }
+         }
+         chan.send(());
+     }
+     port.recv();
+ }
+ #[test]
 -fn test_wait_for_signal_many() unsafe {
++fn test_wait_for_signal() {
++    do spawn {
++        unsafe {
++            do weaken_task |signal| {
++                signal.recv();
++            }
+         }
+     }
+ }
+ #[test]
 -        do spawn unsafe {
 -            do weaken_task |signal| {
 -                signal.recv();
++fn test_wait_for_signal_many() {
+     use uint;
+     for uint::range(0, 100) |_| {
 -fn test_select_stream_and_oneshot() unsafe {
++        do spawn {
++            unsafe {
++                do weaken_task |signal| {
++                    signal.recv();
++                }
+             }
+         }
+     }
+ }
+ #[test]
 -    do spawn unsafe {
 -        do weaken_task |signal| {
 -            match select2i(&port, &signal) {
 -                Left(*) => (),
 -                Right(*) => fail
++fn test_select_stream_and_oneshot() {
+     use pipes::select2i;
+     use either::{Left, Right};
+     let (port, chan) = stream();
+     let (waitport, waitchan) = stream();
++    do spawn {
++        unsafe {
++            do weaken_task |signal| {
++                match select2i(&port, &signal) {
++                    Left(*) => (),
++                    Right(*) => fail
++                }
+             }
+         }
+         waitchan.send(());
+     }
+     chan.send(());
+     waitport.recv();
+ }
Simple merge
index a4d99bf5db4a6cb78ea9efcd977a6783d2cc9441,315a2843af6e143608920be0024b712316c7367f..aa82309c78aecf2fd854d8a2be7af5c4ec2cf297
@@@ -204,8 -176,8 +176,8 @@@ pub struct TaskOpts 
      linked: bool,
      supervised: bool,
      mut notify_chan: Option<Chan<TaskResult>>,
-     sched: Option<SchedOpts>,
 -    sched: SchedOpts,
 -};
++    sched: SchedOpts
 +}
  
  /**
   * The task builder type.
@@@ -366,14 -338,11 +338,11 @@@ impl TaskBuilder 
      fn sched_mode(mode: SchedMode) -> TaskBuilder {
          let notify_chan = replace(&mut self.opts.notify_chan, None);
          TaskBuilder {
 -            opts: {
 +            opts: TaskOpts {
                  linked: self.opts.linked,
                  supervised: self.opts.supervised,
-                 notify_chan: notify_chan,
-                 sched: Some(SchedOpts {
-                     mode: mode,
-                     foreign_stack_size: None,
-                 })
 -                mut notify_chan: move notify_chan,
 -                sched: { mode: mode, foreign_stack_size: None}
++                notify_chan: move notify_chan,
++                sched: SchedOpts { mode: mode, foreign_stack_size: None}
              },
              can_not_copy: None,
              .. self.consume()
@@@ -485,11 -453,14 +453,14 @@@ pub fn default_task_opts() -> TaskOpts 
       * into the same scheduler, and do not post lifecycle notifications.
       */
  
 -    {
 +    TaskOpts {
          linked: true,
          supervised: false,
 -        mut notify_chan: None,
 -        sched: {
 +        notify_chan: None,
-         sched: None
++        sched: SchedOpts {
+             mode: DefaultScheduler,
+             foreign_stack_size: None
+         }
      }
  }
  
@@@ -594,11 -560,13 +564,15 @@@ pub fn failing() -> bool 
  pub fn get_task() -> Task {
      //! Get a handle to the running task
  
 -    TaskHandle(rt::get_task_id())
 +    unsafe {
 +        TaskHandle(rt::get_task_id())
 +    }
  }
  
 -    SchedulerHandle(rt::rust_get_sched_id())
+ pub fn get_scheduler() -> Scheduler {
++    SchedulerHandle(unsafe { rt::rust_get_sched_id() })
+ }
  /**
   * Temporarily make the task unkillable
   *
@@@ -929,26 -878,22 +900,22 @@@ fn test_spawn_sched_no_threads() 
  
  #[test]
  fn test_spawn_sched() {
-     let po = oldcomm::Port();
-     let ch = oldcomm::Chan(&po);
+     let (po, ch) = stream::<()>();
+     let ch = SharedChan(ch);
  
-     fn f(i: int, ch: oldcomm::Chan<()>) {
-         unsafe {
-             let parent_sched_id = rt::rust_get_sched_id();
+     fn f(i: int, ch: SharedChan<()>) {
 -        let parent_sched_id = rt::rust_get_sched_id();
++        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
  
-             do spawn_sched(SingleThreaded) {
-                 unsafe {
-                     let child_sched_id = rt::rust_get_sched_id();
-                     assert parent_sched_id != child_sched_id;
-                     if (i == 0) {
-                         oldcomm::send(ch, ());
-                     } else {
-                         f(i - 1, ch);
-                     }
-                 }
-             };
-         }
+         do spawn_sched(SingleThreaded) {
 -            let child_sched_id = rt::rust_get_sched_id();
++            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+             assert parent_sched_id != child_sched_id;
+             if (i == 0) {
+                 ch.send(());
+             } else {
+                 f(i - 1, ch.clone());
+             }
+         };
  
      }
      f(10, ch);
  }
  
  #[test]
- fn test_spawn_sched_childs_on_same_sched() {
-     let po = oldcomm::Port();
-     let ch = oldcomm::Chan(&po);
+ fn test_spawn_sched_childs_on_default_sched() {
+     let (po, ch) = stream();
+     // Assuming tests run on the default scheduler
 -    let default_id = rt::rust_get_sched_id();
++    let default_id = unsafe { rt::rust_get_sched_id() };
  
      do spawn_sched(SingleThreaded) {
-         unsafe {
-             let parent_sched_id = rt::rust_get_sched_id();
-             do spawn {
-                 unsafe {
-                     let child_sched_id = rt::rust_get_sched_id();
-                     // This should be on the same scheduler
-                     assert parent_sched_id == child_sched_id;
-                     oldcomm::send(ch, ());
-                 }
-             };
-         }
 -        let parent_sched_id = rt::rust_get_sched_id();
++        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
+         do spawn {
 -            let child_sched_id = rt::rust_get_sched_id();
++            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+             assert parent_sched_id != child_sched_id;
+             assert child_sched_id == default_id;
+             ch.send(());
+         };
      };
  
-     oldcomm::recv(po);
+     po.recv();
  }
  
  #[nolink]
@@@ -1234,24 -1168,22 +1192,24 @@@ fn test_spawn_thread_on_demand() 
      let (port, chan) = pipes::stream();
  
      do spawn_sched(ManualThreads(2)) |move chan| {
 -        let max_threads = rt::rust_sched_threads();
 -        assert(max_threads as int == 2);
 -        let running_threads = rt::rust_sched_current_nonlazy_threads();
 -        assert(running_threads as int == 1);
 +        unsafe {
 +            let max_threads = rt::rust_sched_threads();
 +            assert(max_threads as int == 2);
 +            let running_threads = rt::rust_sched_current_nonlazy_threads();
 +            assert(running_threads as int == 1);
  
 -        let (port2, chan2) = pipes::stream();
 +            let (port2, chan2) = pipes::stream();
  
-             do spawn() |move chan2| {
 -        do spawn_sched(CurrentScheduler) |move chan2| {
 -            chan2.send(());
 -        }
++            do spawn_sched(CurrentScheduler) |move chan2| {
 +                chan2.send(());
 +            }
  
 -        let running_threads2 = rt::rust_sched_current_nonlazy_threads();
 -        assert(running_threads2 as int == 2);
 +            let running_threads2 = rt::rust_sched_current_nonlazy_threads();
 +            assert(running_threads2 as int == 2);
  
 -        port2.recv();
 -        chan.send(());
 +            port2.recv();
 +            chan.send(());
 +        }
      }
  
      port.recv();
index edeacb31e1d0993059b3b422b41b0cc550b16ec8,a844542c214ecfc51efa6b2e6e1b532cfba0de26..a5ab4af40bef71b50f65f4616d5581377b6f463e
@@@ -642,34 -631,36 +642,38 @@@ pub fn spawn_raw(opts: TaskOpts, f: fn~
          }
      }
  
-     fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task {
-         unsafe {
-             if opts.foreign_stack_size != None {
-                 fail ~"foreign_stack_size scheduler option unimplemented";
-             }
+     fn new_task_in_sched(opts: SchedOpts) -> *rust_task {
+         if opts.foreign_stack_size != None {
+             fail ~"foreign_stack_size scheduler option unimplemented";
+         }
  
-             let num_threads = match opts.mode {
-               SingleThreaded => 1u,
-               ThreadPerCore => rt::rust_num_threads(),
-               ThreadPerTask => {
-                 fail ~"ThreadPerTask scheduling mode unimplemented"
-               }
-               ManualThreads(threads) => {
-                 if threads == 0u {
-                     fail ~"can not create a scheduler with no threads";
-                 }
-                 threads
-               }
-               PlatformThread => 0u /* Won't be used */
-             };
+         let num_threads = match opts.mode {
+           DefaultScheduler
+           | CurrentScheduler
+           | ExistingScheduler(*)
+           | PlatformThread => 0u, /* Won't be used */
+           SingleThreaded => 1u,
 -          ThreadPerCore => rt::rust_num_threads(),
++          ThreadPerCore => unsafe { rt::rust_num_threads() },
+           ThreadPerTask => {
+             fail ~"ThreadPerTask scheduling mode unimplemented"
+           }
+           ManualThreads(threads) => {
+             if threads == 0u {
+                 fail ~"can not create a scheduler with no threads";
+             }
+             threads
+           }
+         };
  
-             let sched_id = if opts.mode != PlatformThread {
-                 rt::rust_new_sched(num_threads)
-             } else {
-                 rt::rust_osmain_sched_id()
 -        let sched_id = match opts.mode {
 -            CurrentScheduler => rt::rust_get_sched_id(),
 -            ExistingScheduler(SchedulerHandle(id)) => id,
 -            PlatformThread => rt::rust_osmain_sched_id(),
 -            _ => rt::rust_new_sched(num_threads)
 -        };
 -        rt::rust_new_task_in_sched(sched_id)
++        unsafe {
++            let sched_id = match opts.mode {
++                CurrentScheduler => rt::rust_get_sched_id(),
++                ExistingScheduler(SchedulerHandle(id)) => id,
++                PlatformThread => rt::rust_osmain_sched_id(),
++                _ => rt::rust_new_sched(num_threads)
 +            };
 +            rt::rust_new_task_in_sched(sched_id)
 +        }
      }
  }
  
Simple merge
index 84c3b75564984a82572875270f45bda7b1f20da1,080c5514ac8bc3320fa40f8ca24c0b3ccab2e715..72e58cbd5d3687211019a78ff40419e9d721b220
@@@ -114,39 -114,35 +114,39 @@@ enum IpGetAddrErr 
   * a vector of `ip_addr` results, in the case of success, or an error
   * object in the case of failure
   */
- pub fn get_addr(node: &str, iotask: iotask)
+ pub fn get_addr(node: &str, iotask: &iotask)
          -> result::Result<~[IpAddr], IpGetAddrErr> {
      do oldcomm::listen |output_ch| {
 -        do str::as_buf(node) |node_ptr, len| unsafe {
 -            log(debug, fmt!("slice len %?", len));
 -            let handle = create_uv_getaddrinfo_t();
 -            let handle_ptr = ptr::addr_of(&handle);
 -            let handle_data: GetAddrData = {
 -                output_ch: output_ch
 -            };
 -            let handle_data_ptr = ptr::addr_of(&handle_data);
 -            do interact(iotask) |loop_ptr| unsafe {
 -                let result = uv_getaddrinfo(
 -                    loop_ptr,
 -                    handle_ptr,
 -                    get_addr_cb,
 -                    node_ptr,
 -                    ptr::null(),
 -                    ptr::null());
 -                match result {
 -                  0i32 => {
 -                    set_data_for_req(handle_ptr, handle_data_ptr);
 -                  }
 -                  _ => {
 -                    output_ch.send(result::Err(GetAddrUnknownError));
 -                  }
 -                }
 -            };
 -            output_ch.recv()
 +        do str::as_buf(node) |node_ptr, len| {
 +            unsafe {
 +                log(debug, fmt!("slice len %?", len));
 +                let handle = create_uv_getaddrinfo_t();
 +                let handle_ptr = ptr::addr_of(&handle);
 +                let handle_data = GetAddrData {
 +                    output_ch: output_ch
 +                };
 +                let handle_data_ptr = ptr::addr_of(&handle_data);
 +                do interact(iotask) |loop_ptr| {
 +                    unsafe {
 +                        let result = uv_getaddrinfo(
 +                            loop_ptr,
 +                            handle_ptr,
 +                            get_addr_cb,
 +                            node_ptr,
 +                            ptr::null(),
 +                            ptr::null());
 +                        match result {
 +                          0i32 => {
 +                            set_data_for_req(handle_ptr, handle_data_ptr);
 +                          }
 +                          _ => {
 +                            output_ch.send(result::Err(GetAddrUnknownError));
 +                          }
 +                        }
 +                    }
 +                };
 +                output_ch.recv()
 +            }
          }
      }
  }
index aa5eec2b43ce2f701ecbf53472e1514daba48ee5,75c7a7cbfb9f27074f06bd6247225c933f0a3011..8d6de36947934402a0e4bde7e1af643aba5876f7
@@@ -143,130 -142,125 +143,140 @@@ pub enum TcpConnectErrData 
   * `net::tcp::tcp_connect_err_data` instance will be returned
   */
  pub fn connect(input_ip: ip::IpAddr, port: uint,
-            iotask: IoTask)
+                iotask: &IoTask)
 -    -> result::Result<TcpSocket, TcpConnectErrData> unsafe {
 -    let result_po = oldcomm::Port::<ConnAttempt>();
 -    let closed_signal_po = oldcomm::Port::<()>();
 -    let conn_data = {
 -        result_ch: oldcomm::Chan(&result_po),
 -        closed_signal_ch: oldcomm::Chan(&closed_signal_po)
 -    };
 -    let conn_data_ptr = ptr::addr_of(&conn_data);
 -    let reader_po = oldcomm::Port::<result::Result<~[u8], TcpErrData>>();
 -    let stream_handle_ptr = malloc_uv_tcp_t();
 -    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
 -    let socket_data = @{
 -        reader_po: reader_po,
 -        reader_ch: oldcomm::Chan(&reader_po),
 -        stream_handle_ptr: stream_handle_ptr,
 -        connect_req: uv::ll::connect_t(),
 -        write_req: uv::ll::write_t(),
 -        ipv6: match input_ip {
 -            ip::Ipv4(_) => { false }
 -            ip::Ipv6(_) => { true }
 -        },
 -        iotask: iotask.clone()
 -    };
 -    let socket_data_ptr = ptr::addr_of(&(*socket_data));
 -    log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
 -    // get an unsafe representation of our stream_handle_ptr that
 -    // we can send into the interact cb to be handled in libuv..
 -    log(debug, fmt!("stream_handle_ptr outside interact %?",
 -        stream_handle_ptr));
 -    do iotask::interact(iotask) |move input_ip, loop_ptr| unsafe {
 -        log(debug, ~"in interact cb for tcp client connect..");
 -        log(debug, fmt!("stream_handle_ptr in interact %?",
 -            stream_handle_ptr));
 -        match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
 -          0i32 => {
 -            log(debug, ~"tcp_init successful");
 -            log(debug, ~"dealing w/ ipv4 connection..");
 -            let connect_req_ptr =
 -                ptr::addr_of(&((*socket_data_ptr).connect_req));
 -            let addr_str = ip::format_addr(&input_ip);
 -            let connect_result = match input_ip {
 -              ip::Ipv4(ref addr) => {
 -                // have to "recreate" the sockaddr_in/6
 -                // since the ip_addr discards the port
 -                // info.. should probably add an additional
 -                // rust type that actually is closer to
 -                // what the libuv API expects (ip str + port num)
 -                log(debug, fmt!("addr: %?", addr));
 -                let in_addr = uv::ll::ip4_addr(addr_str, port as int);
 -                uv::ll::tcp_connect(
 -                    connect_req_ptr,
 -                    stream_handle_ptr,
 -                    ptr::addr_of(&in_addr),
 -                    tcp_connect_on_connect_cb)
 -              }
 -              ip::Ipv6(ref addr) => {
 -                log(debug, fmt!("addr: %?", addr));
 -                let in_addr = uv::ll::ip6_addr(addr_str, port as int);
 -                uv::ll::tcp_connect6(
 -                    connect_req_ptr,
 -                    stream_handle_ptr,
 -                    ptr::addr_of(&in_addr),
 -                    tcp_connect_on_connect_cb)
 -              }
 -            };
 -            match connect_result {
 -              0i32 => {
 -                log(debug, ~"tcp_connect successful");
 -                // reusable data that we'll have for the
 -                // duration..
 -                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
 -                                           socket_data_ptr as
 -                                              *libc::c_void);
 -                // just so the connect_cb can send the
 -                // outcome..
 -                uv::ll::set_data_for_req(connect_req_ptr,
 -                                         conn_data_ptr);
 -                log(debug, ~"leaving tcp_connect interact cb...");
 -                // let tcp_connect_on_connect_cb send on
 -                // the result_ch, now..
 -              }
 -              _ => {
 -                // immediate connect failure.. probably a garbage
 -                // ip or somesuch
 -                let err_data = uv::ll::get_last_err_data(loop_ptr);
 -                oldcomm::send((*conn_data_ptr).result_ch,
 -                           ConnFailure(err_data.to_tcp_err()));
 -                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
 -                                               conn_data_ptr);
 -                uv::ll::close(stream_handle_ptr, stream_error_close_cb);
 -              }
 +    -> result::Result<TcpSocket, TcpConnectErrData> {
 +    unsafe {
 +        let result_po = oldcomm::Port::<ConnAttempt>();
 +        let closed_signal_po = oldcomm::Port::<()>();
 +        let conn_data = {
 +            result_ch: oldcomm::Chan(&result_po),
 +            closed_signal_ch: oldcomm::Chan(&closed_signal_po)
 +        };
 +        let conn_data_ptr = ptr::addr_of(&conn_data);
 +        let reader_po = oldcomm::Port::<result::Result<~[u8], TcpErrData>>();
 +        let stream_handle_ptr = malloc_uv_tcp_t();
 +        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
 +        let socket_data = @TcpSocketData {
 +            reader_po: reader_po,
 +            reader_ch: oldcomm::Chan(&reader_po),
 +            stream_handle_ptr: stream_handle_ptr,
 +            connect_req: uv::ll::connect_t(),
 +            write_req: uv::ll::write_t(),
 +            ipv6: match input_ip {
 +                ip::Ipv4(_) => { false }
 +                ip::Ipv6(_) => { true }
 +            },
-             iotask: iotask
++            iotask: iotask.clone()
 +        };
 +        let socket_data_ptr = ptr::addr_of(&(*socket_data));
 +        log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
 +        // get an unsafe representation of our stream_handle_ptr that
 +        // we can send into the interact cb to be handled in libuv..
 +        log(debug, fmt!("stream_handle_ptr outside interact %?",
-             stream_handle_ptr));
++                        stream_handle_ptr));
 +        do iotask::interact(iotask) |move input_ip, loop_ptr| {
 +            unsafe {
 +                log(debug, ~"in interact cb for tcp client connect..");
 +                log(debug, fmt!("stream_handle_ptr in interact %?",
-                     stream_handle_ptr));
++                                stream_handle_ptr));
 +                match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
-                   0i32 => {
-                     log(debug, ~"tcp_init successful");
-                     log(debug, ~"dealing w/ ipv4 connection..");
-                     let connect_req_ptr =
-                         ptr::addr_of(&((*socket_data_ptr).connect_req));
-                     let addr_str = ip::format_addr(&input_ip);
-                     let connect_result = match input_ip {
-                       ip::Ipv4(ref addr) => {
-                         // have to "recreate" the sockaddr_in/6
-                         // since the ip_addr discards the port
-                         // info.. should probably add an additional
-                         // rust type that actually is closer to
-                         // what the libuv API expects (ip str + port num)
-                         log(debug, fmt!("addr: %?", addr));
-                         let in_addr = uv::ll::ip4_addr(addr_str, port as int);
-                         uv::ll::tcp_connect(
-                             connect_req_ptr,
-                             stream_handle_ptr,
-                             ptr::addr_of(&in_addr),
-                             tcp_connect_on_connect_cb)
-                       }
-                       ip::Ipv6(ref addr) => {
-                         log(debug, fmt!("addr: %?", addr));
-                         let in_addr = uv::ll::ip6_addr(addr_str, port as int);
-                         uv::ll::tcp_connect6(
-                             connect_req_ptr,
-                             stream_handle_ptr,
-                             ptr::addr_of(&in_addr),
-                             tcp_connect_on_connect_cb)
-                       }
-                     };
-                     match connect_result {
-                       0i32 => {
-                         log(debug, ~"tcp_connect successful");
-                         // reusable data that we'll have for the
-                         // duration..
-                         uv::ll::set_data_for_uv_handle(stream_handle_ptr,
-                                                    socket_data_ptr as
-                                                       *libc::c_void);
-                         // just so the connect_cb can send the
-                         // outcome..
-                         uv::ll::set_data_for_req(connect_req_ptr,
-                                                  conn_data_ptr);
-                         log(debug, ~"leaving tcp_connect interact cb...");
-                         // let tcp_connect_on_connect_cb send on
-                         // the result_ch, now..
-                       }
-                       _ => {
-                         // immediate connect failure.. probably a garbage
-                         // ip or somesuch
++                    0i32 => {
++                        log(debug, ~"tcp_init successful");
++                        log(debug, ~"dealing w/ ipv4 connection..");
++                        let connect_req_ptr =
++                            ptr::addr_of(&((*socket_data_ptr).connect_req));
++                        let addr_str = ip::format_addr(&input_ip);
++                        let connect_result = match input_ip {
++                            ip::Ipv4(ref addr) => {
++                                // have to "recreate" the
++                                // sockaddr_in/6 since the ip_addr
++                                // discards the port info.. should
++                                // probably add an additional rust
++                                // type that actually is closer to
++                                // what the libuv API expects (ip str
++                                // + port num)
++                                log(debug, fmt!("addr: %?", addr));
++                                let in_addr = uv::ll::ip4_addr(addr_str,
++                                                               port as int);
++                                uv::ll::tcp_connect(
++                                    connect_req_ptr,
++                                    stream_handle_ptr,
++                                    ptr::addr_of(&in_addr),
++                                    tcp_connect_on_connect_cb)
++                            }
++                            ip::Ipv6(ref addr) => {
++                                log(debug, fmt!("addr: %?", addr));
++                                let in_addr = uv::ll::ip6_addr(addr_str,
++                                                               port as int);
++                                uv::ll::tcp_connect6(
++                                    connect_req_ptr,
++                                    stream_handle_ptr,
++                                    ptr::addr_of(&in_addr),
++                                    tcp_connect_on_connect_cb)
++                            }
++                        };
++                        match connect_result {
++                            0i32 => {
++                                log(debug, ~"tcp_connect successful");
++                                // reusable data that we'll have for the
++                                // duration..
++                                uv::ll::set_data_for_uv_handle(
++                                    stream_handle_ptr,
++                                    socket_data_ptr as
++                                    *libc::c_void);
++                                // just so the connect_cb can send the
++                                // outcome..
++                                uv::ll::set_data_for_req(connect_req_ptr,
++                                                         conn_data_ptr);
++                                log(debug,
++                                    ~"leaving tcp_connect interact cb...");
++                                // let tcp_connect_on_connect_cb send on
++                                // the result_ch, now..
++                            }
++                            _ => {
++                                // immediate connect
++                                // failure.. probably a garbage ip or
++                                // somesuch
++                                let err_data =
++                                    uv::ll::get_last_err_data(loop_ptr);
++                                oldcomm::send((*conn_data_ptr).result_ch,
++                                              ConnFailure(err_data));
++                                uv::ll::set_data_for_uv_handle(
++                                    stream_handle_ptr,
++                                    conn_data_ptr);
++                                uv::ll::close(stream_handle_ptr,
++                                              stream_error_close_cb);
++                            }
++                        }
++                    }
++                    _ => {
++                        // failure to create a tcp handle
 +                        let err_data = uv::ll::get_last_err_data(loop_ptr);
 +                        oldcomm::send((*conn_data_ptr).result_ch,
-                                    ConnFailure(err_data));
-                         uv::ll::set_data_for_uv_handle(stream_handle_ptr,
-                                                        conn_data_ptr);
-                         uv::ll::close(stream_handle_ptr,
-                                       stream_error_close_cb);
-                       }
++                                      ConnFailure(err_data));
 +                    }
-                   }
-                   _ => {
-                     // failure to create a tcp handle
-                     let err_data = uv::ll::get_last_err_data(loop_ptr);
-                     oldcomm::send((*conn_data_ptr).result_ch,
-                                ConnFailure(err_data));
-                   }
 +                }
              }
-         };
+         }
 -          _ => {
 -            // failure to create a tcp handle
 -            let err_data = uv::ll::get_last_err_data(loop_ptr);
 -            oldcomm::send((*conn_data_ptr).result_ch,
 -                       ConnFailure(err_data.to_tcp_err()));
 -          }
 +        match oldcomm::recv(result_po) {
-           ConnSuccess => {
-             log(debug, ~"tcp::connect - received success on result_po");
-             result::Ok(TcpSocket(socket_data))
-           }
-           ConnFailure(ref err_data) => {
-             oldcomm::recv(closed_signal_po);
-             log(debug, ~"tcp::connect - received failure on result_po");
-             // still have to free the malloc'd stream handle..
-             rustrt::rust_uv_current_kernel_free(stream_handle_ptr
-                                                as *libc::c_void);
-             let tcp_conn_err = match err_data.err_name {
-               ~"ECONNREFUSED" => ConnectionRefused,
-               _ => GenericConnectErr(err_data.err_name, err_data.err_msg)
-             };
-             result::Err(tcp_conn_err)
-           }
++            ConnSuccess => {
++                log(debug, ~"tcp::connect - received success on result_po");
++                result::Ok(TcpSocket(socket_data))
++            }
++            ConnFailure(ref err_data) => {
++                oldcomm::recv(closed_signal_po);
++                log(debug, ~"tcp::connect - received failure on result_po");
++                // still have to free the malloc'd stream handle..
++                rustrt::rust_uv_current_kernel_free(stream_handle_ptr
++                                                    as *libc::c_void);
++                let tcp_conn_err = match err_data.err_name {
++                    ~"ECONNREFUSED" => ConnectionRefused,
++                    _ => GenericConnectErr(err_data.err_name,
++                                           err_data.err_msg)
++                };
++                result::Err(tcp_conn_err)
++            }
          }
 -    };
 -    match oldcomm::recv(result_po) {
 -      ConnSuccess => {
 -        log(debug, ~"tcp::connect - received success on result_po");
 -        result::Ok(TcpSocket(socket_data))
 -      }
 -      ConnFailure(ref err_data) => {
 -        oldcomm::recv(closed_signal_po);
 -        log(debug, ~"tcp::connect - received failure on result_po");
 -        // still have to free the malloc'd stream handle..
 -        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
 -                                           as *libc::c_void);
 -        let tcp_conn_err = match err_data.err_name {
 -          ~"ECONNREFUSED" => ConnectionRefused,
 -          _ => GenericConnectErr(err_data.err_name, err_data.err_msg)
 -        };
 -        result::Err(tcp_conn_err)
 -      }
      }
  }
  
@@@ -504,74 -489,73 +514,82 @@@ fn read_future(sock: &TcpSocket, timeou
   * as the `err` variant of a `result`.
   */
  pub fn accept(new_conn: TcpNewConnection)
 -    -> result::Result<TcpSocket, TcpErrData> unsafe {
 -
 -    match new_conn{
 -      NewTcpConn(server_handle_ptr) => {
 -        let server_data_ptr = uv::ll::get_data_for_uv_handle(
 -            server_handle_ptr) as *TcpListenFcData;
 -        let reader_po = oldcomm::Port();
 -        let iotask = &(*server_data_ptr).iotask;
 -        let stream_handle_ptr = malloc_uv_tcp_t();
 -        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
 -        let client_socket_data: @TcpSocketData = @{
 -            reader_po: reader_po,
 -            reader_ch: oldcomm::Chan(&reader_po),
 -            stream_handle_ptr : stream_handle_ptr,
 -            connect_req : uv::ll::connect_t(),
 -            write_req : uv::ll::write_t(),
 -            ipv6: (*server_data_ptr).ipv6,
 -            iotask : iotask.clone()
 -        };
 -        let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
 -        let client_stream_handle_ptr =
 -            (*client_socket_data_ptr).stream_handle_ptr;
 -
 -        let result_po = oldcomm::Port::<Option<TcpErrData>>();
 -        let result_ch = oldcomm::Chan(&result_po);
 -
 -        // UNSAFE LIBUV INTERACTION BEGIN
 -        // .. normally this happens within the context of
 -        // a call to uv::hl::interact.. but we're breaking
 -        // the rules here because this always has to be
 -        // called within the context of a listen() new_connect_cb
 -        // callback (or it will likely fail and drown your cat)
 -        log(debug, ~"in interact cb for tcp::accept");
 -        let loop_ptr = uv::ll::get_loop_for_uv_handle(
 -            server_handle_ptr);
 -        match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
 -          0i32 => {
 -            log(debug, ~"uv_tcp_init successful for client stream");
 -            match uv::ll::accept(
 -                server_handle_ptr as *libc::c_void,
 -                client_stream_handle_ptr as *libc::c_void) {
 -              0i32 => {
 -                log(debug, ~"successfully accepted client connection");
 -                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
 -                                               client_socket_data_ptr
 -                                                   as *libc::c_void);
 -                oldcomm::send(result_ch, None);
 -              }
 -              _ => {
 -                log(debug, ~"failed to accept client conn");
 -                oldcomm::send(result_ch, Some(
 -                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
 -              }
 +    -> result::Result<TcpSocket, TcpErrData> {
 +    unsafe {
-         match new_conn {
-           NewTcpConn(server_handle_ptr) => {
-             let server_data_ptr = uv::ll::get_data_for_uv_handle(
-                 server_handle_ptr) as *TcpListenFcData;
-             let reader_po = oldcomm::Port();
-             let iotask = (*server_data_ptr).iotask;
-             let stream_handle_ptr = malloc_uv_tcp_t();
-             *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
-             let client_socket_data = @TcpSocketData {
-                 reader_po: reader_po,
-                 reader_ch: oldcomm::Chan(&reader_po),
-                 stream_handle_ptr : stream_handle_ptr,
-                 connect_req : uv::ll::connect_t(),
-                 write_req : uv::ll::write_t(),
-                 ipv6: (*server_data_ptr).ipv6,
-                 iotask : iotask
-             };
-             let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
-             let client_stream_handle_ptr =
-                 (*client_socket_data_ptr).stream_handle_ptr;
-             let result_po = oldcomm::Port::<Option<TcpErrData>>();
-             let result_ch = oldcomm::Chan(&result_po);
-             // UNSAFE LIBUV INTERACTION BEGIN
-             // .. normally this happens within the context of
-             // a call to uv::hl::interact.. but we're breaking
-             // the rules here because this always has to be
-             // called within the context of a listen() new_connect_cb
-             // callback (or it will likely fail and drown your cat)
-             log(debug, ~"in interact cb for tcp::accept");
-             let loop_ptr = uv::ll::get_loop_for_uv_handle(
-                 server_handle_ptr);
-             match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
-               0i32 => {
-                 log(debug, ~"uv_tcp_init successful for client stream");
-                 match uv::ll::accept(
-                     server_handle_ptr as *libc::c_void,
-                     client_stream_handle_ptr as *libc::c_void) {
-                   0i32 => {
-                     log(debug, ~"successfully accepted client connection");
-                     uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
-                                                    client_socket_data_ptr
-                                                        as *libc::c_void);
-                     oldcomm::send(result_ch, None);
-                   }
-                   _ => {
-                     log(debug, ~"failed to accept client conn");
-                     oldcomm::send(result_ch, Some(
-                         uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
-                   }
++        match new_conn{
++            NewTcpConn(server_handle_ptr) => {
++                let server_data_ptr = uv::ll::get_data_for_uv_handle(
++                    server_handle_ptr) as *TcpListenFcData;
++                let reader_po = oldcomm::Port();
++                let iotask = &(*server_data_ptr).iotask;
++                let stream_handle_ptr = malloc_uv_tcp_t();
++                *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
++                    uv::ll::tcp_t();
++                let client_socket_data: @TcpSocketData = @TcpSocketData {
++                    reader_po: reader_po,
++                    reader_ch: oldcomm::Chan(&reader_po),
++                    stream_handle_ptr : stream_handle_ptr,
++                    connect_req : uv::ll::connect_t(),
++                    write_req : uv::ll::write_t(),
++                    ipv6: (*server_data_ptr).ipv6,
++                    iotask : iotask.clone()
++                };
++                let client_socket_data_ptr = ptr::addr_of(
++                    &(*client_socket_data));
++                let client_stream_handle_ptr =
++                    (*client_socket_data_ptr).stream_handle_ptr;
++
++                let result_po = oldcomm::Port::<Option<TcpErrData>>();
++                let result_ch = oldcomm::Chan(&result_po);
++
++                // UNSAFE LIBUV INTERACTION BEGIN
++                // .. normally this happens within the context of
++                // a call to uv::hl::interact.. but we're breaking
++                // the rules here because this always has to be
++                // called within the context of a listen() new_connect_cb
++                // callback (or it will likely fail and drown your cat)
++                log(debug, ~"in interact cb for tcp::accept");
++                let loop_ptr = uv::ll::get_loop_for_uv_handle(
++                    server_handle_ptr);
++                match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
++                    0i32 => {
++                        log(debug, ~"uv_tcp_init successful for \
++                                     client stream");
++                        match uv::ll::accept(
++                            server_handle_ptr as *libc::c_void,
++                            client_stream_handle_ptr as *libc::c_void) {
++                            0i32 => {
++                                log(debug,
++                                    ~"successfully accepted client \
++                                      connection");
++                                uv::ll::set_data_for_uv_handle(
++                                    client_stream_handle_ptr,
++                                    client_socket_data_ptr
++                                    as *libc::c_void);
++                                oldcomm::send(result_ch, None);
++                            }
++                            _ => {
++                                log(debug, ~"failed to accept client conn");
++                                oldcomm::send(result_ch, Some(
++                                    uv::ll::get_last_err_data(
++                                        loop_ptr).to_tcp_err()));
++                            }
++                        }
++                    }
++                    _ => {
++                        log(debug, ~"failed to accept client stream");
++                        oldcomm::send(result_ch, Some(
++                            uv::ll::get_last_err_data(
++                                loop_ptr).to_tcp_err()));
++                    }
++                }
++                // UNSAFE LIBUV INTERACTION END
++                match oldcomm::recv(result_po) {
++                    Some(copy err_data) => result::Err(err_data),
++                    None => result::Ok(TcpSocket(client_socket_data))
 +                }
-               }
-               _ => {
-                 log(debug, ~"failed to init client stream");
-                 oldcomm::send(result_ch, Some(
-                     uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
-               }
-             }
-             // UNSAFE LIBUV INTERACTION END
-             match oldcomm::recv(result_po) {
-               Some(copy err_data) => result::Err(err_data),
-               None => result::Ok(TcpSocket(client_socket_data))
              }
 -          }
 -          _ => {
 -            log(debug, ~"failed to init client stream");
 -            oldcomm::send(result_ch, Some(
 -                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
--          }
 -        }
 -        // UNSAFE LIBUV INTERACTION END
 -        match oldcomm::recv(result_po) {
 -          Some(copy err_data) => result::Err(err_data),
 -          None => result::Ok(TcpSocket(client_socket_data))
          }
 -      }
      }
  }
  
   * of listen exiting because of an error
   */
  pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
-           iotask: IoTask,
-           on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
-           new_connect_cb: fn~(TcpNewConnection,
-                                oldcomm::Chan<Option<TcpErrData>>))
+               iotask: &IoTask,
+               on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
+               new_connect_cb: fn~(TcpNewConnection,
+                                   oldcomm::Chan<Option<TcpErrData>>))
 -    -> result::Result<(), TcpListenErrData> unsafe {
 +    -> result::Result<(), TcpListenErrData> {
-     unsafe {
-         do listen_common(move host_ip, port, backlog, iotask,
-                          move on_establish_cb)
-             // on_connect_cb
-             |move new_connect_cb, handle| {
-                 unsafe {
-                     let server_data_ptr =
-                         uv::ll::get_data_for_uv_handle(handle)
-                         as *TcpListenFcData;
-                     let new_conn = NewTcpConn(handle);
-                     let kill_ch = (*server_data_ptr).kill_ch;
-                     new_connect_cb(new_conn, kill_ch);
-                 }
-             }
+     do listen_common(move host_ip, port, backlog, iotask,
+                      move on_establish_cb)
+         // on_connect_cb
 -        |move new_connect_cb, handle| unsafe {
++        |move new_connect_cb, handle| {
++        unsafe {
+             let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
+                 as *TcpListenFcData;
+             let new_conn = NewTcpConn(handle);
+             let kill_ch = (*server_data_ptr).kill_ch;
+             new_connect_cb(new_conn, kill_ch);
++        }
      }
  }
  
  fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
-           iotask: IoTask,
+           iotask: &IoTask,
            on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
            on_connect_cb: fn~(*uv::ll::uv_tcp_t))
 -    -> result::Result<(), TcpListenErrData> unsafe {
 -    let stream_closed_po = oldcomm::Port::<()>();
 -    let kill_po = oldcomm::Port::<Option<TcpErrData>>();
 -    let kill_ch = oldcomm::Chan(&kill_po);
 -    let server_stream = uv::ll::tcp_t();
 -    let server_stream_ptr = ptr::addr_of(&server_stream);
 -    let server_data: TcpListenFcData = {
 -        server_stream_ptr: server_stream_ptr,
 -        stream_closed_ch: oldcomm::Chan(&stream_closed_po),
 -        kill_ch: kill_ch,
 -        on_connect_cb: move on_connect_cb,
 -        iotask: iotask.clone(),
 -        ipv6: match &host_ip {
 -            &ip::Ipv4(_) => { false }
 -            &ip::Ipv6(_) => { true }
 -        },
 -        mut active: true
 -    };
 -    let server_data_ptr = ptr::addr_of(&server_data);
 -
 -    let setup_result = do oldcomm::listen |setup_ch| {
 -        // this is to address a compiler warning about
 -        // an implicit copy.. it seems that double nested
 -        // will defeat a move sigil, as is done to the host_ip
 -        // arg above.. this same pattern works w/o complaint in
 -        // tcp::connect (because the iotask::interact cb isn't
 -        // nested within a core::comm::listen block)
 -        let loc_ip = copy(host_ip);
 -        do iotask::interact(iotask) |move loc_ip, loop_ptr| unsafe {
 -            match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
 -              0i32 => {
 -                uv::ll::set_data_for_uv_handle(
 -                    server_stream_ptr,
 -                    server_data_ptr);
 -                let addr_str = ip::format_addr(&loc_ip);
 -                let bind_result = match loc_ip {
 -                  ip::Ipv4(ref addr) => {
 -                    log(debug, fmt!("addr: %?", addr));
 -                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
 -                    uv::ll::tcp_bind(server_stream_ptr,
 -                                     ptr::addr_of(&in_addr))
 -                  }
 -                  ip::Ipv6(ref addr) => {
 -                    log(debug, fmt!("addr: %?", addr));
 -                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
 -                    uv::ll::tcp_bind6(server_stream_ptr,
 -                                     ptr::addr_of(&in_addr))
 -                  }
 -                };
 -                match bind_result {
 -                  0i32 => {
 -                    match uv::ll::listen(server_stream_ptr,
 -                                       backlog as libc::c_int,
 -                                       tcp_lfc_on_connection_cb) {
 -                      0i32 => oldcomm::send(setup_ch, None),
 -                      _ => {
 -                        log(debug, ~"failure to uv_listen()");
 -                        let err_data = uv::ll::get_last_err_data(loop_ptr);
 -                        oldcomm::send(setup_ch, Some(err_data));
 -                      }
 +    -> result::Result<(), TcpListenErrData> {
 +    unsafe {
 +        let stream_closed_po = oldcomm::Port::<()>();
 +        let kill_po = oldcomm::Port::<Option<TcpErrData>>();
 +        let kill_ch = oldcomm::Chan(&kill_po);
 +        let server_stream = uv::ll::tcp_t();
 +        let server_stream_ptr = ptr::addr_of(&server_stream);
-         let server_data = {
++        let server_data: TcpListenFcData = TcpListenFcData {
 +            server_stream_ptr: server_stream_ptr,
 +            stream_closed_ch: oldcomm::Chan(&stream_closed_po),
 +            kill_ch: kill_ch,
 +            on_connect_cb: move on_connect_cb,
-             iotask: iotask,
++            iotask: iotask.clone(),
 +            ipv6: match &host_ip {
 +                &ip::Ipv4(_) => { false }
 +                &ip::Ipv6(_) => { true }
 +            },
 +            mut active: true
 +        };
 +        let server_data_ptr = ptr::addr_of(&server_data);
 +
 +        let setup_result = do oldcomm::listen |setup_ch| {
 +            // this is to address a compiler warning about
 +            // an implicit copy.. it seems that double nested
 +            // will defeat a move sigil, as is done to the host_ip
 +            // arg above.. this same pattern works w/o complaint in
 +            // tcp::connect (because the iotask::interact cb isn't
 +            // nested within a core::comm::listen block)
 +            let loc_ip = copy(host_ip);
 +            do iotask::interact(iotask) |move loc_ip, loop_ptr| {
 +                unsafe {
 +                    match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
-                       0i32 => {
-                         uv::ll::set_data_for_uv_handle(
-                             server_stream_ptr,
-                             server_data_ptr);
-                         let addr_str = ip::format_addr(&loc_ip);
-                         let bind_result = match loc_ip {
-                           ip::Ipv4(ref addr) => {
-                             log(debug, fmt!("addr: %?", addr));
-                             let in_addr = uv::ll::ip4_addr(addr_str,
-                                                            port as int);
-                             uv::ll::tcp_bind(server_stream_ptr,
-                                              ptr::addr_of(&in_addr))
-                           }
-                           ip::Ipv6(ref addr) => {
-                             log(debug, fmt!("addr: %?", addr));
-                             let in_addr = uv::ll::ip6_addr(addr_str,
-                                                            port as int);
-                             uv::ll::tcp_bind6(server_stream_ptr,
-                                              ptr::addr_of(&in_addr))
-                           }
-                         };
-                         match bind_result {
-                           0i32 => {
-                             match uv::ll::listen(server_stream_ptr,
-                                                backlog as libc::c_int,
-                                                tcp_lfc_on_connection_cb) {
-                               0i32 => oldcomm::send(setup_ch, None),
-                               _ => {
-                                 log(debug, ~"failure to uv_listen()");
-                                 let err_data = uv::ll::get_last_err_data(
-                                     loop_ptr);
-                                 oldcomm::send(setup_ch, Some(err_data));
-                               }
++                        0i32 => {
++                            uv::ll::set_data_for_uv_handle(
++                                server_stream_ptr,
++                                server_data_ptr);
++                            let addr_str = ip::format_addr(&loc_ip);
++                            let bind_result = match loc_ip {
++                                ip::Ipv4(ref addr) => {
++                                    log(debug, fmt!("addr: %?", addr));
++                                    let in_addr = uv::ll::ip4_addr(
++                                        addr_str,
++                                        port as int);
++                                    uv::ll::tcp_bind(server_stream_ptr,
++                                                     ptr::addr_of(&in_addr))
++                                }
++                                ip::Ipv6(ref addr) => {
++                                    log(debug, fmt!("addr: %?", addr));
++                                    let in_addr = uv::ll::ip6_addr(
++                                        addr_str,
++                                        port as int);
++                                    uv::ll::tcp_bind6(server_stream_ptr,
++                                                      ptr::addr_of(&in_addr))
++                                }
++                            };
++                            match bind_result {
++                                0i32 => {
++                                    match uv::ll::listen(
++                                        server_stream_ptr,
++                                        backlog as libc::c_int,
++                                        tcp_lfc_on_connection_cb) {
++                                        0i32 => oldcomm::send(setup_ch, None),
++                                        _ => {
++                                            log(debug,
++                                                ~"failure to uv_tcp_init");
++                                            let err_data =
++                                                uv::ll::get_last_err_data(
++                                                    loop_ptr);
++                                            oldcomm::send(setup_ch,
++                                                          Some(err_data));
++                                        }
++                                    }
++                                }
++                                _ => {
++                                    log(debug, ~"failure to uv_tcp_bind");
++                                    let err_data = uv::ll::get_last_err_data(
++                                        loop_ptr);
++                                    oldcomm::send(setup_ch, Some(err_data));
++                                }
 +                            }
-                           }
-                           _ => {
++                        }
++                        _ => {
 +                            log(debug, ~"failure to uv_tcp_bind");
 +                            let err_data = uv::ll::get_last_err_data(
 +                                loop_ptr);
 +                            oldcomm::send(setup_ch, Some(err_data));
-                           }
 +                        }
-                       }
-                       _ => {
-                         log(debug, ~"failure to uv_tcp_init");
-                         let err_data = uv::ll::get_last_err_data(loop_ptr);
-                         oldcomm::send(setup_ch, Some(err_data));
-                       }
                      }
-                 };
 -                  }
 -                  _ => {
 -                    log(debug, ~"failure to uv_tcp_bind");
 -                    let err_data = uv::ll::get_last_err_data(loop_ptr);
 -                    oldcomm::send(setup_ch, Some(err_data));
 -                  }
+                 }
 -              }
 -              _ => {
 -                log(debug, ~"failure to uv_tcp_init");
 -                let err_data = uv::ll::get_last_err_data(loop_ptr);
 -                oldcomm::send(setup_ch, Some(err_data));
 -              }
              }
 +            setup_ch.recv()
          };
 -        setup_ch.recv()
 -    };
 -    match setup_result {
 -      Some(ref err_data) => {
 -        do iotask::interact(iotask) |loop_ptr| unsafe {
 -            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
 -                            loop_ptr));
 -            (*server_data_ptr).active = false;
 -            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
 -        };
 -        stream_closed_po.recv();
 -        match err_data.err_name {
 -          ~"EACCES" => {
 -            log(debug, ~"Got EACCES error");
 -            result::Err(AccessDenied)
 -          }
 -          ~"EADDRINUSE" => {
 -            log(debug, ~"Got EADDRINUSE error");
 -            result::Err(AddressInUse)
 -          }
 -          _ => {
 -            log(debug, fmt!("Got '%s' '%s' libuv error",
 -                            err_data.err_name, err_data.err_msg));
 -            result::Err(
 -                GenericListenErr(err_data.err_name, err_data.err_msg))
 -          }
 -        }
 -      }
 -      None => {
 -        on_establish_cb(kill_ch);
 -        let kill_result = oldcomm::recv(kill_po);
 -        do iotask::interact(iotask) |loop_ptr| unsafe {
 -            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
 -                            loop_ptr));
 -            (*server_data_ptr).active = false;
 -            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
 -        };
 -        stream_closed_po.recv();
 -        match kill_result {
 -          // some failure post bind/listen
 -          Some(ref err_data) => result::Err(GenericListenErr(
 -              err_data.err_name,
 -              err_data.err_msg)),
 -          // clean exit
 -          None => result::Ok(())
 +        match setup_result {
-           Some(ref err_data) => {
-             do iotask::interact(iotask) |loop_ptr| {
-                 unsafe {
-                     log(debug,
-                         fmt!("tcp::listen post-kill recv hl interact %?",
-                              loop_ptr));
-                     (*server_data_ptr).active = false;
-                     uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++            Some(ref err_data) => {
++                do iotask::interact(iotask) |loop_ptr| {
++                    unsafe {
++                        log(debug,
++                            fmt!("tcp::listen post-kill recv hl interact %?",
++                                 loop_ptr));
++                        (*server_data_ptr).active = false;
++                        uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++                    }
++                };
++                stream_closed_po.recv();
++                match err_data.err_name {
++                    ~"EACCES" => {
++                        log(debug, ~"Got EACCES error");
++                        result::Err(AccessDenied)
++                    }
++                    ~"EADDRINUSE" => {
++                        log(debug, ~"Got EADDRINUSE error");
++                        result::Err(AddressInUse)
++                    }
++                    _ => {
++                        log(debug, fmt!("Got '%s' '%s' libuv error",
++                                        err_data.err_name, err_data.err_msg));
++                        result::Err(
++                            GenericListenErr(err_data.err_name,
++                                             err_data.err_msg))
++                    }
 +                }
-             };
-             stream_closed_po.recv();
-             match err_data.err_name {
-               ~"EACCES" => {
-                 log(debug, ~"Got EACCES error");
-                 result::Err(AccessDenied)
-               }
-               ~"EADDRINUSE" => {
-                 log(debug, ~"Got EADDRINUSE error");
-                 result::Err(AddressInUse)
-               }
-               _ => {
-                 log(debug, fmt!("Got '%s' '%s' libuv error",
-                                 err_data.err_name, err_data.err_msg));
-                 result::Err(
-                     GenericListenErr(err_data.err_name, err_data.err_msg))
-               }
 +            }
-           }
-           None => {
-             on_establish_cb(kill_ch);
-             let kill_result = oldcomm::recv(kill_po);
-             do iotask::interact(iotask) |loop_ptr| {
-                 unsafe {
-                     log(debug,
-                         fmt!("tcp::listen post-kill recv hl interact %?",
-                              loop_ptr));
-                     (*server_data_ptr).active = false;
-                     uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++            None => {
++                on_establish_cb(kill_ch);
++                let kill_result = oldcomm::recv(kill_po);
++                do iotask::interact(iotask) |loop_ptr| {
++                    unsafe {
++                        log(debug,
++                            fmt!("tcp::listen post-kill recv hl interact %?",
++                                 loop_ptr));
++                        (*server_data_ptr).active = false;
++                        uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++                    }
++                };
++                stream_closed_po.recv();
++                match kill_result {
++                    // some failure post bind/listen
++                    Some(ref err_data) => result::Err(GenericListenErr(
++                        err_data.err_name,
++                        err_data.err_msg)),
++                    // clean exit
++                    None => result::Ok(())
 +                }
-             };
-             stream_closed_po.recv();
-             match kill_result {
-               // some failure post bind/listen
-               Some(ref err_data) => result::Err(GenericListenErr(
-                   err_data.err_name,
-                   err_data.err_msg)),
-               // clean exit
-               None => result::Ok(())
 +            }
-           }
          }
 -      }
      }
  }
  
++
  /**
   * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
   *
@@@ -927,141 -887,122 +951,140 @@@ impl TcpSocketBuf: io::Writer 
  
  // INTERNAL API
  
 -fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe {
 -    let closed_po = oldcomm::Port::<()>();
 -    let closed_ch = oldcomm::Chan(&closed_po);
 -    let close_data = {
 -        closed_ch: closed_ch
 -    };
 -    let close_data_ptr = ptr::addr_of(&close_data);
 -    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
 -    do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
 -        log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
 -            stream_handle_ptr, loop_ptr));
 -        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
 -                                       close_data_ptr);
 -        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
 -    };
 -    oldcomm::recv(closed_po);
 -    //the line below will most likely crash
 -    //log(debug, fmt!("about to free socket_data at %?", socket_data));
 -    rustrt::rust_uv_current_kernel_free(stream_handle_ptr
 -                                       as *libc::c_void);
 -    log(debug, ~"exiting dtor for tcp_socket");
 +fn tear_down_socket_data(socket_data: @TcpSocketData) {
 +    unsafe {
 +        let closed_po = oldcomm::Port::<()>();
 +        let closed_ch = oldcomm::Chan(&closed_po);
 +        let close_data = {
 +            closed_ch: closed_ch
 +        };
 +        let close_data_ptr = ptr::addr_of(&close_data);
 +        let stream_handle_ptr = (*socket_data).stream_handle_ptr;
-         do iotask::interact((*socket_data).iotask) |loop_ptr| {
++        do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
 +            unsafe {
 +                log(debug,
 +                    fmt!("interact dtor for tcp_socket stream %? loop %?",
-                     stream_handle_ptr, loop_ptr));
++                         stream_handle_ptr, loop_ptr));
 +                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
 +                                               close_data_ptr);
 +                uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
 +            }
 +        };
 +        oldcomm::recv(closed_po);
 +        //the line below will most likely crash
 +        //log(debug, fmt!("about to free socket_data at %?", socket_data));
 +        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
-                                            as *libc::c_void);
++                                            as *libc::c_void);
 +        log(debug, ~"exiting dtor for tcp_socket");
 +    }
  }
  
  // shared implementation for tcp::read
  fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
 -    -> result::Result<~[u8],TcpErrData> unsafe {
 -    use timer;
 -
 -    log(debug, ~"starting tcp::read");
 -    let iotask = &(*socket_data).iotask;
 -    let rs_result = read_start_common_impl(socket_data);
 -    if result::is_err(&rs_result) {
 -        let err_data = result::get_err(&rs_result);
 -        result::Err(err_data)
 -    }
 -    else {
 -        log(debug, ~"tcp::read before recv_timeout");
 -        let read_result = if timeout_msecs > 0u {
 -            timer::recv_timeout(
 -               iotask, timeout_msecs, result::get(&rs_result))
 -        } else {
 -            Some(oldcomm::recv(result::get(&rs_result)))
 -        };
 -        log(debug, ~"tcp::read after recv_timeout");
 -        match move read_result {
 -          None => {
 -            log(debug, ~"tcp::read: timed out..");
 -            let err_data = {
 -                err_name: ~"TIMEOUT",
 -                err_msg: ~"req timed out"
 -            };
 -            read_stop_common_impl(socket_data);
 +    -> result::Result<~[u8],TcpErrData> {
 +    unsafe {
 +        use timer;
 +
 +        log(debug, ~"starting tcp::read");
-         let iotask = (*socket_data).iotask;
++        let iotask = &(*socket_data).iotask;
 +        let rs_result = read_start_common_impl(socket_data);
 +        if result::is_err(&rs_result) {
 +            let err_data = result::get_err(&rs_result);
              result::Err(err_data)
 -          }
 -          Some(move data_result) => {
 -            log(debug, ~"tcp::read got data");
 -            read_stop_common_impl(socket_data);
 -            data_result
 -          }
 +        }
 +        else {
 +            log(debug, ~"tcp::read before recv_timeout");
 +            let read_result = if timeout_msecs > 0u {
 +                timer::recv_timeout(
-                    iotask, timeout_msecs, result::get(&rs_result))
++                    iotask, timeout_msecs, result::get(&rs_result))
 +            } else {
 +                Some(oldcomm::recv(result::get(&rs_result)))
 +            };
 +            log(debug, ~"tcp::read after recv_timeout");
 +            match move read_result {
-               None => {
-                 log(debug, ~"tcp::read: timed out..");
-                 let err_data = TcpErrData {
-                     err_name: ~"TIMEOUT",
-                     err_msg: ~"req timed out"
-                 };
-                 read_stop_common_impl(socket_data);
-                 result::Err(err_data)
-               }
-               Some(move data_result) => {
-                 log(debug, ~"tcp::read got data");
-                 read_stop_common_impl(socket_data);
-                 data_result
-               }
++                None => {
++                    log(debug, ~"tcp::read: timed out..");
++                    let err_data = TcpErrData {
++                        err_name: ~"TIMEOUT",
++                        err_msg: ~"req timed out"
++                    };
++                    read_stop_common_impl(socket_data);
++                    result::Err(err_data)
++                }
++                Some(move data_result) => {
++                    log(debug, ~"tcp::read got data");
++                    read_stop_common_impl(socket_data);
++                    data_result
++                }
 +            }
          }
      }
  }
  
  // shared impl for read_stop
  fn read_stop_common_impl(socket_data: *TcpSocketData) ->
 -    result::Result<(), TcpErrData> unsafe {
 -    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
 -    let stop_po = oldcomm::Port::<Option<TcpErrData>>();
 -    let stop_ch = oldcomm::Chan(&stop_po);
 -    do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
 -        log(debug, ~"in interact cb for tcp::read_stop");
 -        match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
 -          0i32 => {
 -            log(debug, ~"successfully called uv_read_stop");
 -            oldcomm::send(stop_ch, None);
 -          }
 -          _ => {
 -            log(debug, ~"failure in calling uv_read_stop");
 -            let err_data = uv::ll::get_last_err_data(loop_ptr);
 -            oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
 -          }
 +    result::Result<(), TcpErrData> {
 +    unsafe {
 +        let stream_handle_ptr = (*socket_data).stream_handle_ptr;
 +        let stop_po = oldcomm::Port::<Option<TcpErrData>>();
 +        let stop_ch = oldcomm::Chan(&stop_po);
-         do iotask::interact((*socket_data).iotask) |loop_ptr| {
++        do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
 +            unsafe {
 +                log(debug, ~"in interact cb for tcp::read_stop");
-                 match uv::ll::read_stop(stream_handle_ptr as
-                                         *uv::ll::uv_stream_t) {
-                   0i32 => {
-                     log(debug, ~"successfully called uv_read_stop");
-                     oldcomm::send(stop_ch, None);
-                   }
-                   _ => {
-                     log(debug, ~"failure in calling uv_read_stop");
-                     let err_data = uv::ll::get_last_err_data(loop_ptr);
-                     oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
-                   }
++                match uv::ll::read_stop(stream_handle_ptr
++                                        as *uv::ll::uv_stream_t) {
++                    0i32 => {
++                        log(debug, ~"successfully called uv_read_stop");
++                        oldcomm::send(stop_ch, None);
++                    }
++                    _ => {
++                        log(debug, ~"failure in calling uv_read_stop");
++                        let err_data = uv::ll::get_last_err_data(loop_ptr);
++                        oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
++                    }
 +                }
 +            }
-         };
++        }
 +        match oldcomm::recv(stop_po) {
-           Some(move err_data) => Err(err_data),
-           None => Ok(())
++            Some(move err_data) => Err(err_data),
++            None => Ok(())
          }
 -    };
 -    match oldcomm::recv(stop_po) {
 -      Some(ref err_data) => result::Err(err_data.to_tcp_err()),
 -      None => result::Ok(())
      }
  }
  
  // shared impl for read_start
  fn read_start_common_impl(socket_data: *TcpSocketData)
      -> result::Result<oldcomm::Port<
 -        result::Result<~[u8], TcpErrData>>, TcpErrData> unsafe {
 -    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
 -    let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
 -    let start_ch = oldcomm::Chan(&start_po);
 -    log(debug, ~"in tcp::read_start before interact loop");
 -    do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
 -        log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
 -        match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
 -                               on_alloc_cb,
 -                               on_tcp_read_cb) {
 -          0i32 => {
 -            log(debug, ~"success doing uv_read_start");
 -            oldcomm::send(start_ch, None);
 -          }
 -          _ => {
 -            log(debug, ~"error attempting uv_read_start");
 -            let err_data = uv::ll::get_last_err_data(loop_ptr);
 -            oldcomm::send(start_ch, Some(err_data));
 -          }
 +        result::Result<~[u8], TcpErrData>>, TcpErrData> {
 +    unsafe {
 +        let stream_handle_ptr = (*socket_data).stream_handle_ptr;
 +        let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
 +        let start_ch = oldcomm::Chan(&start_po);
 +        log(debug, ~"in tcp::read_start before interact loop");
-         do iotask::interact((*socket_data).iotask) |loop_ptr| {
++        do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
 +            unsafe {
-                 log(debug,
-                     fmt!("in tcp::read_start interact cb %?", loop_ptr));
-                 match uv::ll::read_start(stream_handle_ptr as
-                                          *uv::ll::uv_stream_t,
++                log(debug, fmt!("in tcp::read_start interact cb %?",
++                                loop_ptr));
++                match uv::ll::read_start(stream_handle_ptr
++                                         as *uv::ll::uv_stream_t,
 +                                         on_alloc_cb,
 +                                         on_tcp_read_cb) {
-                   0i32 => {
-                     log(debug, ~"success doing uv_read_start");
-                     oldcomm::send(start_ch, None);
-                   }
-                   _ => {
-                     log(debug, ~"error attempting uv_read_start");
-                     let err_data = uv::ll::get_last_err_data(loop_ptr);
-                     oldcomm::send(start_ch, Some(err_data));
-                   }
++                    0i32 => {
++                        log(debug, ~"success doing uv_read_start");
++                        oldcomm::send(start_ch, None);
++                    }
++                    _ => {
++                        log(debug, ~"error attempting uv_read_start");
++                        let err_data = uv::ll::get_last_err_data(loop_ptr);
++                        oldcomm::send(start_ch, Some(err_data));
++                    }
 +                }
 +            }
-         };
++        }
 +        match oldcomm::recv(start_po) {
-           Some(ref err_data) => result::Err(err_data.to_tcp_err()),
-           None => result::Ok((*socket_data).reader_po)
++            Some(ref err_data) => result::Err(err_data.to_tcp_err()),
++            None => result::Ok((*socket_data).reader_po)
          }
 -    };
 -    match oldcomm::recv(start_po) {
 -      Some(ref err_data) => result::Err(err_data.to_tcp_err()),
 -      None => result::Ok((*socket_data).reader_po)
      }
  }
  
  // shared implementation used by write and write_future
  fn write_common_impl(socket_data_ptr: *TcpSocketData,
                       raw_write_data: ~[u8])
 -    -> result::Result<(), TcpErrData> unsafe {
 -    let write_req_ptr = ptr::addr_of(&((*socket_data_ptr).write_req));
 -    let stream_handle_ptr =
 -        (*socket_data_ptr).stream_handle_ptr;
 -    let write_buf_vec =  ~[ uv::ll::buf_init(
 -        vec::raw::to_ptr(raw_write_data),
 -        vec::len(raw_write_data)) ];
 -    let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
 -    let result_po = oldcomm::Port::<TcpWriteResult>();
 -    let write_data = {
 -        result_ch: oldcomm::Chan(&result_po)
 -    };
 -    let write_data_ptr = ptr::addr_of(&write_data);
 -    do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe {
 -        log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
 -        match uv::ll::write(write_req_ptr,
 -                          stream_handle_ptr,
 -                          write_buf_vec_ptr,
 -                          tcp_write_complete_cb) {
 -          0i32 => {
 -            log(debug, ~"uv_write() invoked successfully");
 -            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
 -          }
 -          _ => {
 -            log(debug, ~"error invoking uv_write()");
 -            let err_data = uv::ll::get_last_err_data(loop_ptr);
 -            oldcomm::send((*write_data_ptr).result_ch,
 -                       TcpWriteError(err_data.to_tcp_err()));
 -          }
 +    -> result::Result<(), TcpErrData> {
 +    unsafe {
 +        let write_req_ptr = ptr::addr_of(&((*socket_data_ptr).write_req));
 +        let stream_handle_ptr =
 +            (*socket_data_ptr).stream_handle_ptr;
 +        let write_buf_vec =  ~[ uv::ll::buf_init(
 +            vec::raw::to_ptr(raw_write_data),
 +            vec::len(raw_write_data)) ];
 +        let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
 +        let result_po = oldcomm::Port::<TcpWriteResult>();
 +        let write_data = {
 +            result_ch: oldcomm::Chan(&result_po)
 +        };
 +        let write_data_ptr = ptr::addr_of(&write_data);
-         do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
++        do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| {
 +            unsafe {
 +                log(debug, fmt!("in interact cb for tcp::write %?",
 +                                loop_ptr));
 +                match uv::ll::write(write_req_ptr,
-                                   stream_handle_ptr,
-                                   write_buf_vec_ptr,
-                                   tcp_write_complete_cb) {
-                   0i32 => {
-                     log(debug, ~"uv_write() invoked successfully");
-                     uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
-                   }
-                   _ => {
-                     log(debug, ~"error invoking uv_write()");
-                     let err_data = uv::ll::get_last_err_data(loop_ptr);
-                     oldcomm::send((*write_data_ptr).result_ch,
-                                TcpWriteError(err_data.to_tcp_err()));
-                   }
++                                    stream_handle_ptr,
++                                    write_buf_vec_ptr,
++                                    tcp_write_complete_cb) {
++                    0i32 => {
++                        log(debug, ~"uv_write() invoked successfully");
++                        uv::ll::set_data_for_req(write_req_ptr,
++                                                 write_data_ptr);
++                    }
++                    _ => {
++                        log(debug, ~"error invoking uv_write()");
++                        let err_data = uv::ll::get_last_err_data(loop_ptr);
++                        oldcomm::send((*write_data_ptr).result_ch,
++                                      TcpWriteError(err_data.to_tcp_err()));
++                    }
 +                }
 +            }
-         };
++        }
 +        // FIXME (#2656): Instead of passing unsafe pointers to local data,
 +        // and waiting here for the write to complete, we should transfer
 +        // ownership of everything to the I/O task and let it deal with the
 +        // aftermath, so we don't have to sit here blocking.
 +        match oldcomm::recv(result_po) {
 +            TcpWriteSuccess => Ok(()),
 +            TcpWriteError(move err_data) => Err(err_data)
          }
 -    };
 -    // FIXME (#2656): Instead of passing unsafe pointers to local data,
 -    // and waiting here for the write to complete, we should transfer
 -    // ownership of everything to the I/O task and let it deal with the
 -    // aftermath, so we don't have to sit here blocking.
 -    match oldcomm::recv(result_po) {
 -      TcpWriteSuccess => result::Ok(()),
 -      TcpWriteError(ref err_data) => result::Err(err_data.to_tcp_err())
      }
  }
  
Simple merge
index 3a2c3b7c135e0a2f70316407e5825f73069b82c4,8637470dd5407b0b9c79387ae503780f4acc4a75..8ae3e24abee40bfea28cd461fe5c6127060e10d9
@@@ -47,58 -47,48 +47,53 @@@ pub fn get() -> IoTask 
  }
  
  #[doc(hidden)]
 -fn get_monitor_task_gl() -> IoTask unsafe {
 +fn get_monitor_task_gl() -> IoTask {
-     unsafe {
-         let monitor_loop_chan_ptr =
-             rustrt::rust_uv_get_kernel_global_chan_ptr();
-         debug!("ENTERING global_loop::get() loop chan: %?",
-                monitor_loop_chan_ptr);
-         debug!("before priv::chan_from_global_ptr");
-         type MonChan = Chan<IoTask>;
-         let monitor_ch =
-             do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr,
-                                                || {
-                                                     task::task().sched_mode
-                                                     (task::SingleThreaded)
-                                                     .unlinked()
-                                                }) |msg_po| {
-             unsafe {
-                 debug!("global monitor task starting");
-                 // As a weak task the runtime will notify us when to exit
-                 do weaken_task() |weak_exit_po| {
-                     debug!("global monitor task is now weak");
-                     let hl_loop = spawn_loop();
-                     loop {
-                         debug!("in outer_loop...");
-                         match select2(weak_exit_po, msg_po) {
-                           Left(weak_exit) => {
-                             // all normal tasks have ended, tell the
-                             // libuv loop to tear_down, then exit
-                             debug!("weak_exit_po recv'd msg: %?", weak_exit);
-                             iotask::exit(hl_loop);
-                             break;
-                           }
-                           Right(fetch_ch) => {
-                             debug!("hl_loop req recv'd: %?", fetch_ch);
-                             fetch_ch.send(hl_loop);
-                           }
-                         }
+     type MonChan = Chan<IoTask>;
+     struct GlobalIoTask(IoTask);
+     impl GlobalIoTask: Clone {
+         fn clone(&self) -> GlobalIoTask {
+             GlobalIoTask((**self).clone())
+         }
+     }
+     fn key(_: GlobalIoTask) { }
 -    match global_data_clone(key) {
++    match unsafe { global_data_clone(key) } {
+         Some(GlobalIoTask(iotask)) => iotask,
+         None => {
+             let iotask: IoTask = spawn_loop();
+             let mut installed = false;
 -            let final_iotask = do global_data_clone_create(key) {
 -                installed = true;
 -                ~GlobalIoTask(iotask.clone())
++            let final_iotask = unsafe {
++                do global_data_clone_create(key) {
++                    installed = true;
++                    ~GlobalIoTask(iotask.clone())
++                }
+             };
+             if installed {
 -                do task().unlinked().spawn() unsafe {
 -                    debug!("global monitor task starting");
 -                    // As a weak task the runtime will notify us when to exit
 -                    do weaken_task |weak_exit_po| {
 -                        debug!("global monitor task is weak");
 -                        weak_exit_po.recv();
 -                        iotask::exit(&iotask);
 -                        debug!("global monitor task is unweak");
 -                    };
 -                    debug!("global monitor task exiting");
++                do task().unlinked().spawn() {
++                    unsafe {
++                        debug!("global monitor task starting");
++                        // As a weak task the runtime will notify us
++                        // when to exit
++                        do weaken_task |weak_exit_po| {
++                            debug!("global monitor task is weak");
++                            weak_exit_po.recv();
++                            iotask::exit(&iotask);
++                            debug!("global monitor task is unweak");
++                        };
++                        debug!("global monitor task exiting");
 +                    }
-                     debug!("global monitor task is leaving weakend state");
-                 };
-                 debug!("global monitor task exiting");
+                 }
+             } else {
+                 iotask::exit(&iotask);
              }
-         };
  
-         // once we have a chan to the monitor loop, we ask it for
-         // the libuv loop's async handle
-         do listen |fetch_ch| {
-             monitor_ch.send(fetch_ch);
-             fetch_ch.recv()
+             match final_iotask {
+                 GlobalIoTask(iotask) => iotask
+             }
          }
      }
  }
@@@ -135,111 -126,97 +131,110 @@@ mod test 
  
      use core::iter;
      use core::libc;
-     use core::oldcomm;
      use core::ptr;
      use core::task;
+     use core::cast::transmute;
+     use core::libc::c_void;
+     use core::pipes::{stream, SharedChan, Chan};
  
 -    extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) unsafe {
 -        let exit_ch_ptr = ll::get_data_for_uv_handle(
 -            timer_ptr as *libc::c_void);
 -        let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
 -        exit_ch.send(true);
 -        log(debug, fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
 -                       exit_ch_ptr));
 +    extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
 +        unsafe {
 +            let exit_ch_ptr = ll::get_data_for_uv_handle(
-                 timer_ptr as *libc::c_void) as *oldcomm::Chan<bool>;
-             let exit_ch = *exit_ch_ptr;
-             oldcomm::send(exit_ch, true);
++                timer_ptr as *libc::c_void);
++            let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
++            exit_ch.send(true);
 +            log(debug,
 +                fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
 +                     exit_ch_ptr));
 +        }
      }
      extern fn simple_timer_cb(timer_ptr: *ll::uv_timer_t,
 -                             _status: libc::c_int) unsafe {
 -        log(debug, ~"in simple timer cb");
 -        ll::timer_stop(timer_ptr);
 -        let hl_loop = &get_gl();
 -        do iotask::interact(hl_loop) |_loop_ptr| unsafe {
 -            log(debug, ~"closing timer");
 -            ll::close(timer_ptr, simple_timer_close_cb);
 -            log(debug, ~"about to deref exit_ch_ptr");
 -            log(debug, ~"after msg sent on deref'd exit_ch");
 -        };
 -        log(debug, ~"exiting simple timer cb");
 +                             _status: libc::c_int) {
 +        unsafe {
 +            log(debug, ~"in simple timer cb");
 +            ll::timer_stop(timer_ptr);
-             let hl_loop = get_gl();
++            let hl_loop = &get_gl();
 +            do iotask::interact(hl_loop) |_loop_ptr| {
++                log(debug, ~"closing timer");
 +                unsafe {
-                     log(debug, ~"closing timer");
 +                    ll::close(timer_ptr, simple_timer_close_cb);
-                     log(debug, ~"about to deref exit_ch_ptr");
-                     log(debug, ~"after msg sent on deref'd exit_ch");
 +                }
++                log(debug, ~"about to deref exit_ch_ptr");
++                log(debug, ~"after msg sent on deref'd exit_ch");
 +            };
 +            log(debug, ~"exiting simple timer cb");
 +        }
      }
  
-     fn impl_uv_hl_simple_timer(iotask: IoTask) {
 -    fn impl_uv_hl_simple_timer(iotask: &IoTask) unsafe {
 -        let (exit_po, exit_ch) = stream::<bool>();
 -        let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
 -        log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
 -                       exit_ch_ptr));
 -        let timer_handle = ll::timer_t();
 -        let timer_ptr = ptr::addr_of(&timer_handle);
 -        do iotask::interact(iotask) |loop_ptr| unsafe {
 -            log(debug, ~"user code inside interact loop!!!");
 -            let init_status = ll::timer_init(loop_ptr, timer_ptr);
 -            if(init_status == 0i32) {
 -                ll::set_data_for_uv_handle(
 -                    timer_ptr as *libc::c_void,
 -                    exit_ch_ptr);
 -                let start_status = ll::timer_start(timer_ptr, simple_timer_cb,
 -                                                   1u, 0u);
 -                if(start_status == 0i32) {
++    fn impl_uv_hl_simple_timer(iotask: &IoTask) {
 +        unsafe {
-             let exit_po = oldcomm::Port::<bool>();
-             let exit_ch = oldcomm::Chan(&exit_po);
-             let exit_ch_ptr = ptr::addr_of(&exit_ch);
++            let (exit_po, exit_ch) = stream::<bool>();
++            let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
 +            log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
-                            exit_ch_ptr));
++                            exit_ch_ptr));
 +            let timer_handle = ll::timer_t();
 +            let timer_ptr = ptr::addr_of(&timer_handle);
 +            do iotask::interact(iotask) |loop_ptr| {
 +                unsafe {
 +                    log(debug, ~"user code inside interact loop!!!");
 +                    let init_status = ll::timer_init(loop_ptr, timer_ptr);
 +                    if(init_status == 0i32) {
 +                        ll::set_data_for_uv_handle(
 +                            timer_ptr as *libc::c_void,
-                             exit_ch_ptr as *libc::c_void);
++                            exit_ch_ptr);
 +                        let start_status = ll::timer_start(timer_ptr,
 +                                                           simple_timer_cb,
-                                                            1u,
-                                                            0u);
-                         if start_status != 0 {
++                                                           1u, 0u);
++                        if(start_status == 0i32) {
++                        }
++                        else {
 +                            fail ~"failure on ll::timer_start()";
 +                        }
-                     } else {
++                    }
++                    else {
 +                        fail ~"failure on ll::timer_init()";
 +                    }
                  }
 -                else {
 -                    fail ~"failure on ll::timer_start()";
 -                }
 -            }
 -            else {
 -                fail ~"failure on ll::timer_init()";
 -            }
 -        };
 -        exit_po.recv();
 -        log(debug, ~"global_loop timer test: msg recv on exit_po, done..");
 +            };
-             oldcomm::recv(exit_po);
++            exit_po.recv();
 +            log(debug,
 +                ~"global_loop timer test: msg recv on exit_po, done..");
 +        }
      }
  
      #[test]
 -    fn test_gl_uv_global_loop_high_level_global_timer() unsafe {
 +    fn test_gl_uv_global_loop_high_level_global_timer() {
-         unsafe {
-             let hl_loop = get_gl();
-             let exit_po = oldcomm::Port::<()>();
-             let exit_ch = oldcomm::Chan(&exit_po);
-             task::spawn_sched(task::ManualThreads(1u), || {
-                 impl_uv_hl_simple_timer(hl_loop);
-                 oldcomm::send(exit_ch, ());
-             });
+         let hl_loop = &get_gl();
+         let (exit_po, exit_ch) = stream::<()>();
+         task::spawn_sched(task::ManualThreads(1u), || {
+             let hl_loop = &get_gl();
              impl_uv_hl_simple_timer(hl_loop);
-             oldcomm::recv(exit_po);
-         }
+             exit_ch.send(());
+         });
+         impl_uv_hl_simple_timer(hl_loop);
+         exit_po.recv();
      }
  
      // keeping this test ignored until some kind of stress-test-harness
      // is set up for the build bots
      #[test]
      #[ignore]
 -    fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe {
 +    fn test_stress_gl_uv_global_loop_high_level_global_timer() {
-         unsafe {
-             let hl_loop = get_gl();
-             let exit_po = oldcomm::Port::<()>();
-             let exit_ch = oldcomm::Chan(&exit_po);
-             let cycles = 5000u;
-             for iter::repeat(cycles) {
-                 task::spawn_sched(task::ManualThreads(1u), || {
-                     impl_uv_hl_simple_timer(hl_loop);
-                     oldcomm::send(exit_ch, ());
-                 });
-             };
-             for iter::repeat(cycles) {
-                 oldcomm::recv(exit_po);
-             };
-             log(debug,
-                 ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
-                 ~" exiting sucessfully!");
-         }
+         let (exit_po, exit_ch) = stream::<()>();
+         let exit_ch = SharedChan(exit_ch);
+         let cycles = 5000u;
+         for iter::repeat(cycles) {
+             let exit_ch_clone = exit_ch.clone();
+             task::spawn_sched(task::ManualThreads(1u), || {
+                 let hl_loop = &get_gl();
+                 impl_uv_hl_simple_timer(hl_loop);
+                 exit_ch_clone.send(());
+             });
+         };
+         for iter::repeat(cycles) {
+             exit_po.recv();
+         };
+         log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
+             ~" exiting sucessfully!");
      }
  }
index 0a3d64a02a4eab4862a81a1b67b2e171a11ae886,c50a19cc5c17c7ed588b1b6cd55ce392ab019944..dc0092aadfaf2aafd54f5ce969f0f0f4e5fe34b5
@@@ -83,10 -91,8 +91,10 @@@ pub unsafe fn interact(iotask: &IoTask
   * async handle and do a sanity check to make sure that all other handles are
   * closed, causing a failure otherwise.
   */
- pub fn exit(iotask: IoTask) {
 -pub fn exit(iotask: &IoTask) unsafe {
 -    send_msg(iotask, TeardownLoop);
++pub fn exit(iotask: &IoTask) {
 +    unsafe {
 +        send_msg(iotask, TeardownLoop);
 +    }
  }
  
  
@@@ -98,71 -104,70 +106,77 @@@ enum IoTaskMsg 
  }
  
  /// Run the loop and begin handling messages
- fn run_loop(iotask_ch: Chan<IoTask>) {
 -fn run_loop(iotask_ch: &Chan<IoTask>) unsafe {
++fn run_loop(iotask_ch: &Chan<IoTask>) {
 -    debug!("creating loop");
 -    let loop_ptr = ll::loop_new();
 +    unsafe {
++        debug!("creating loop");
 +        let loop_ptr = ll::loop_new();
  
 -    // set up the special async handle we'll use to allow multi-task
 -    // communication with this loop
 -    let async = ll::async_t();
 -    let async_handle = addr_of(&async);
 +        // set up the special async handle we'll use to allow multi-task
 +        // communication with this loop
 +        let async = ll::async_t();
 +        let async_handle = addr_of(&async);
  
 -    // associate the async handle with the loop
 -    ll::async_init(loop_ptr, async_handle, wake_up_cb);
 +        // associate the async handle with the loop
 +        ll::async_init(loop_ptr, async_handle, wake_up_cb);
  
 -    let (msg_po, msg_ch) = stream::<IoTaskMsg>();
++        let (msg_po, msg_ch) = stream::<IoTaskMsg>();
 -    // initialize our loop data and store it in the loop
 -    let data: IoTaskLoopData = {
 -        async_handle: async_handle,
 -        msg_po: msg_po
 -    };
 -    ll::set_data_for_uv_handle(async_handle, addr_of(&data));
 -
 -    // Send out a handle through which folks can talk to us
 -    // while we dwell in the I/O loop
 -    let iotask = IoTask_({
 -        async_handle: async_handle,
 -        op_chan: SharedChan(msg_ch)
 -    });
 -    iotask_ch.send(iotask);
 -
 -    log(debug, ~"about to run uv loop");
 -    // enter the loop... this blocks until the loop is done..
 -    ll::run(loop_ptr);
 -    log(debug, ~"uv loop ended");
 -    ll::loop_delete(loop_ptr);
 +        // initialize our loop data and store it in the loop
-         let data = IoTaskLoopData {
++        let data: IoTaskLoopData = IoTaskLoopData {
 +            async_handle: async_handle,
-             msg_po: Port()
++            msg_po: msg_po
 +        };
 +        ll::set_data_for_uv_handle(async_handle, addr_of(&data));
 +
 +        // Send out a handle through which folks can talk to us
 +        // while we dwell in the I/O loop
 +        let iotask = IoTask_({
 +            async_handle: async_handle,
-             op_chan: data.msg_po.chan()
++            op_chan: SharedChan(msg_ch)
 +        });
 +        iotask_ch.send(iotask);
 +
 +        log(debug, ~"about to run uv loop");
 +        // enter the loop... this blocks until the loop is done..
 +        ll::run(loop_ptr);
 +        log(debug, ~"uv loop ended");
 +        ll::loop_delete(loop_ptr);
 +    }
  }
  
  // data that lives for the lifetime of the high-evel oo
 -type IoTaskLoopData = {
 +struct IoTaskLoopData {
      async_handle: *ll::uv_async_t,
 -    msg_po: Port<IoTaskMsg>
 -};
 +    msg_po: Port<IoTaskMsg>,
 +}
  
- fn send_msg(iotask: IoTask, msg: IoTaskMsg) {
+ fn send_msg(iotask: &IoTask,
 -            msg: IoTaskMsg) unsafe {
++            msg: IoTaskMsg) {
+     iotask.op_chan.send(move msg);
 -    ll::async_send(iotask.async_handle);
 +    unsafe {
-         iotask.op_chan.send(move msg);
 +        ll::async_send(iotask.async_handle);
 +    }
  }
  
  /// Dispatch all pending messages
  extern fn wake_up_cb(async_handle: *ll::uv_async_t,
 -                    status: int) unsafe {
 +                    status: int) {
-     unsafe {
-         log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
-                          async_handle, status));
  
 -    let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
 -    let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
 -    let msg_po = &(*data).msg_po;
 -
 -    while msg_po.peek() {
 -        match msg_po.recv() {
 -          Interaction(ref cb) => (*cb)(loop_ptr),
 -          TeardownLoop => begin_teardown(data)
+     log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
+                      async_handle, status));
-         let data = ll::get_data_for_uv_handle(async_handle)
-             as *IoTaskLoopData;
-         let msg_po = (*data).msg_po;
++    unsafe {
 +        let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
-               Interaction(ref cb) => (*cb)(loop_ptr),
-               TeardownLoop => begin_teardown(data)
++        let data =
++            ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
++        let msg_po = &(*data).msg_po;
 +
 +        while msg_po.peek() {
 +            match msg_po.recv() {
++                Interaction(ref cb) => (*cb)(loop_ptr),
++                TeardownLoop => begin_teardown(data)
 +            }
          }
      }
  }
@@@ -199,64 -199,56 +213,69 @@@ mod test 
      use core::ptr;
      use core::task;
  
 -    extern fn async_close_cb(handle: *ll::uv_async_t) unsafe {
 -        log(debug, fmt!("async_close_cb handle %?", handle));
 -        let exit_ch = (*(ll::get_data_for_uv_handle(handle)
 -                        as *AhData)).exit_ch;
 -        oldcomm::send(exit_ch, ());
 +    extern fn async_close_cb(handle: *ll::uv_async_t) {
 +        unsafe {
 +            log(debug, fmt!("async_close_cb handle %?", handle));
 +            let exit_ch = (*(ll::get_data_for_uv_handle(handle)
 +                            as *AhData)).exit_ch;
 +            oldcomm::send(exit_ch, ());
 +        }
      }
 -    extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int)
 +    extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) {
          unsafe {
 -        log(debug, fmt!("async_handle_cb handle %? status %?",handle,status));
 -        ll::close(handle, async_close_cb);
 +            log(debug,
 +                fmt!("async_handle_cb handle %? status %?",handle,status));
 +            ll::close(handle, async_close_cb);
 +        }
      }
 -    type AhData = {
 +    struct AhData {
          iotask: IoTask,
-         exit_ch: oldcomm::Chan<()>,
+         exit_ch: oldcomm::Chan<()>
 -    };
 -    fn impl_uv_iotask_async(iotask: &IoTask) unsafe {
 -        let async_handle = ll::async_t();
 -        let ah_ptr = ptr::addr_of(&async_handle);
 -        let exit_po = oldcomm::Port::<()>();
 -        let exit_ch = oldcomm::Chan(&exit_po);
 -        let ah_data = {
 -            iotask: iotask.clone(),
 -            exit_ch: exit_ch
 -        };
 -        let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data);
 -        debug!("about to interact");
 -        do interact(iotask) |loop_ptr| unsafe {
 -            debug!("interacting");
 -            ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
 -            ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
 -            ll::async_send(ah_ptr);
 -        };
 -        debug!("waiting for async close");
 -        oldcomm::recv(exit_po);
 +    }
-     fn impl_uv_iotask_async(iotask: IoTask) {
++    fn impl_uv_iotask_async(iotask: &IoTask) {
 +        unsafe {
 +            let async_handle = ll::async_t();
 +            let ah_ptr = ptr::addr_of(&async_handle);
 +            let exit_po = oldcomm::Port::<()>();
 +            let exit_ch = oldcomm::Chan(&exit_po);
-             let ah_data = {
-                 iotask: iotask,
++            let ah_data = AhData {
++                iotask: iotask.clone(),
 +                exit_ch: exit_ch
 +            };
-             let ah_data_ptr = ptr::addr_of(&ah_data);
++            let ah_data_ptr: *AhData = unsafe {
++                ptr::to_unsafe_ptr(&ah_data)
++            };
++            debug!("about to interact");
 +            do interact(iotask) |loop_ptr| {
 +                unsafe {
++                    debug!("interacting");
 +                    ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
-                     ll::set_data_for_uv_handle(ah_ptr,
-                                                ah_data_ptr as *libc::c_void);
++                    ll::set_data_for_uv_handle(
++                        ah_ptr, ah_data_ptr as *libc::c_void);
 +                    ll::async_send(ah_ptr);
 +                }
 +            };
++            debug!("waiting for async close");
 +            oldcomm::recv(exit_po);
 +        }
      }
  
      // this fn documents the bear minimum neccesary to roll your own
      // high_level_loop
      unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask {
-         let iotask_port = oldcomm::Port::<IoTask>();
-         let iotask_ch = oldcomm::Chan(&iotask_port);
+         let (iotask_port, iotask_ch) = stream::<IoTask>();
          do task::spawn_sched(task::ManualThreads(1u)) {
-             run_loop(iotask_ch);
+             debug!("about to run a test loop");
+             run_loop(&iotask_ch);
              exit_ch.send(());
          };
-         return oldcomm::recv(iotask_port);
+         return iotask_port.recv();
      }
  
 -    extern fn lifetime_handle_close(handle: *libc::c_void) unsafe {
 -        log(debug, fmt!("lifetime_handle_close ptr %?", handle));
 +    extern fn lifetime_handle_close(handle: *libc::c_void) {
 +        unsafe {
 +            log(debug, fmt!("lifetime_handle_close ptr %?", handle));
 +        }
      }
  
      extern fn lifetime_async_callback(handle: *libc::c_void,
      }
  
      #[test]
 -    fn test_uv_iotask_async() unsafe {
 -        let exit_po = oldcomm::Port::<()>();
 -        let exit_ch = oldcomm::Chan(&exit_po);
 -        let iotask = &spawn_test_loop(exit_ch);
 -
 -        debug!("spawned iotask");
 -
 -        // using this handle to manage the lifetime of the high_level_loop,
 -        // as it will exit the first time one of the impl_uv_hl_async() is
 -        // cleaned up with no one ref'd handles on the loop (Which can happen
 -        // under race-condition type situations.. this ensures that the loop
 -        // lives until, at least, all of the impl_uv_hl_async() runs have been
 -        // called, at least.
 -        let work_exit_po = oldcomm::Port::<()>();
 -        let work_exit_ch = oldcomm::Chan(&work_exit_po);
 -        for iter::repeat(7u) {
 -            let iotask_clone = iotask.clone();
 -            do task::spawn_sched(task::ManualThreads(1u)) {
 -                debug!("async");
 -                impl_uv_iotask_async(&iotask_clone);
 -                debug!("done async");
 -                oldcomm::send(work_exit_ch, ());
 +    fn test_uv_iotask_async() {
 +        unsafe {
 +            let exit_po = oldcomm::Port::<()>();
 +            let exit_ch = oldcomm::Chan(&exit_po);
-             let iotask = spawn_test_loop(exit_ch);
++            let iotask = &spawn_test_loop(exit_ch);
++
++            debug!("spawned iotask");
 +
 +            // using this handle to manage the lifetime of the
-             // high_level_loop, as it will exit the first time one of the
-             // impl_uv_hl_async() is cleaned up with no one ref'd handles on
-             // the loop (Which can happen under race-condition type
-             // situations.. this ensures that the loop lives until, at least,
-             // all of the impl_uv_hl_async() runs have been called, at least.
++            // high_level_loop, as it will exit the first time one of
++            // the impl_uv_hl_async() is cleaned up with no one ref'd
++            // handles on the loop (Which can happen under
++            // race-condition type situations.. this ensures that the
++            // loop lives until, at least, all of the
++            // impl_uv_hl_async() runs have been called, at least.
 +            let work_exit_po = oldcomm::Port::<()>();
 +            let work_exit_ch = oldcomm::Chan(&work_exit_po);
 +            for iter::repeat(7u) {
++                let iotask_clone = iotask.clone();
 +                do task::spawn_sched(task::ManualThreads(1u)) {
-                     impl_uv_iotask_async(iotask);
++                    debug!("async");
++                    impl_uv_iotask_async(&iotask_clone);
++                    debug!("done async");
 +                    oldcomm::send(work_exit_ch, ());
 +                };
              };
 -        };
 -        for iter::repeat(7u) {
 -            debug!("waiting");
 -            oldcomm::recv(work_exit_po);
 -        };
 -        log(debug, ~"sending teardown_loop msg..");
 -        exit(iotask);
 -        oldcomm::recv(exit_po);
 -        log(debug, ~"after recv on exit_po.. exiting..");
 +            for iter::repeat(7u) {
++                debug!("waiting");
 +                oldcomm::recv(work_exit_po);
 +            };
 +            log(debug, ~"sending teardown_loop msg..");
 +            exit(iotask);
 +            oldcomm::recv(exit_po);
 +            log(debug, ~"after recv on exit_po.. exiting..");
 +        }
      }
  }
Simple merge