}
}
- #[allow(non_camel_case_types)] // runtime type
- type rust_port_id = uint;
-
- type GlobalPtr = *libc::uintptr_t;
-
fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
- let old = rusti::atomic_cxchg(address, oldval, newval);
- old == oldval
+ unsafe {
+ let old = rusti::atomic_cxchg(address, oldval, newval);
+ old == oldval
+ }
}
- /**
- * Atomically gets a channel from a pointer to a pointer-sized memory location
- * or, if no channel exists creates and installs a new channel and sets up a
- * new task to receive from it.
- */
- pub unsafe fn chan_from_global_ptr<T: Owned>(
- global: GlobalPtr,
- task_fn: fn() -> task::TaskBuilder,
- f: fn~(oldcomm::Port<T>)
- ) -> oldcomm::Chan<T> {
-
- enum Msg {
- Proceed,
- Abort
- }
-
- log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
- let is_probably_zero = *global == 0u;
- log(debug,~"after is_prob_zero check");
- if is_probably_zero {
- log(debug,~"is probably zero...");
- // There's no global channel. We must make it
-
- let (setup1_po, setup1_ch) = pipes::stream();
- let (setup2_po, setup2_ch) = pipes::stream();
-
- // FIXME #4422: Ugly type inference hint
- let setup2_po: pipes::Port<Msg> = setup2_po;
-
- do task_fn().spawn |move f, move setup1_ch, move setup2_po| {
- let po = oldcomm::Port::<T>();
- let ch = oldcomm::Chan(&po);
- setup1_ch.send(ch);
-
- // Wait to hear if we are the official instance of
- // this global task
- match setup2_po.recv() {
- Proceed => f(move po),
- Abort => ()
- }
- };
-
- log(debug,~"before setup recv..");
- // This is the proposed global channel
- let ch = setup1_po.recv();
- // 0 is our sentinal value. It is not a valid channel
- assert *ch != 0;
-
- // Install the channel
- log(debug,~"BEFORE COMPARE AND SWAP");
- let swapped = compare_and_swap(
- cast::reinterpret_cast(&global),
- 0, cast::reinterpret_cast(&ch));
- log(debug,fmt!("AFTER .. swapped? %?", swapped));
-
- if swapped {
- // Success!
- setup2_ch.send(Proceed);
- ch
- } else {
- // Somebody else got in before we did
- setup2_ch.send(Abort);
- cast::reinterpret_cast(&*global)
- }
- } else {
- log(debug, ~"global != 0");
- cast::reinterpret_cast(&*global)
- }
- }
-
- #[test]
- pub fn test_from_global_chan1() {
-
- // This is unreadable, right?
-
- // The global channel
- let globchan = 0;
- let globchanp = ptr::addr_of(&globchan);
-
- // Create the global channel, attached to a new task
- let ch = unsafe {
- do chan_from_global_ptr(globchanp, task::task) |po| {
- let ch = oldcomm::recv(po);
- oldcomm::send(ch, true);
- let ch = oldcomm::recv(po);
- oldcomm::send(ch, true);
- }
- };
- // Talk to it
- let po = oldcomm::Port();
- oldcomm::send(ch, oldcomm::Chan(&po));
- assert oldcomm::recv(po) == true;
-
- // This one just reuses the previous channel
- let ch = unsafe {
- do chan_from_global_ptr(globchanp, task::task) |po| {
- let ch = oldcomm::recv(po);
- oldcomm::send(ch, false);
- }
- };
-
- // Talk to the original global task
- let po = oldcomm::Port();
- oldcomm::send(ch, oldcomm::Chan(&po));
- assert oldcomm::recv(po) == true;
- }
-
- #[test]
- pub fn test_from_global_chan2() {
-
- for iter::repeat(100) {
- // The global channel
- let globchan = 0;
- let globchanp = ptr::addr_of(&globchan);
-
- let resultpo = oldcomm::Port();
- let resultch = oldcomm::Chan(&resultpo);
-
- // Spawn a bunch of tasks that all want to compete to
- // create the global channel
- for uint::range(0, 10) |i| {
- do task::spawn {
- let ch = unsafe {
- do chan_from_global_ptr(
- globchanp, task::task) |po| {
-
- for uint::range(0, 10) |_j| {
- let ch = oldcomm::recv(po);
- oldcomm::send(ch, {i});
- }
- }
- };
- let po = oldcomm::Port();
- oldcomm::send(ch, oldcomm::Chan(&po));
- // We are The winner if our version of the
- // task was installed
- let winner = oldcomm::recv(po);
- oldcomm::send(resultch, winner == i);
- }
- }
- // There should be only one winner
- let mut winners = 0u;
- for uint::range(0u, 10u) |_i| {
- let res = oldcomm::recv(resultpo);
- if res { winners += 1u };
- }
- assert winners == 1u;
- }
- }
-
- /**
- * Convert the current task to a 'weak' task temporarily
- *
- * As a weak task it will not be counted towards the runtime's set
- * of live tasks. When there are no more outstanding live (non-weak) tasks
- * the runtime will send an exit message on the provided channel.
- *
- * This function is super-unsafe. Do not use.
- *
- * # Safety notes
- *
- * * Weak tasks must either die on their own or exit upon receipt of
- * the exit message. Failure to do so will cause the runtime to never
- * exit
- * * Tasks must not call `weaken_task` multiple times. This will
- * break the kernel's accounting of live tasks.
- * * Weak tasks must not be supervised. A supervised task keeps
- * a reference to its parent, so the parent will not die.
- */
- pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) {
- let po = oldcomm::Port();
- let ch = oldcomm::Chan(&po);
- unsafe {
- rustrt::rust_task_weaken(cast::reinterpret_cast(&ch));
- }
- let _unweaken = Unweaken(ch);
- f(po);
-
- struct Unweaken {
- ch: oldcomm::Chan<()>,
- drop {
- unsafe {
- rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch));
- }
- }
- }
-
- fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken {
- Unweaken {
- ch: ch
- }
- }
- }
-
- #[test]
- pub fn test_weaken_task_then_unweaken() {
- do task::try {
- unsafe {
- do weaken_task |_po| {
- }
- }
- };
- }
-
- #[test]
- pub fn test_weaken_task_wait() {
- do task::spawn_unlinked {
- unsafe {
- do weaken_task |po| {
- oldcomm::recv(po);
- }
- }
- }
- }
-
- #[test]
- pub fn test_weaken_task_stress() {
- // Create a bunch of weak tasks
- for iter::repeat(100u) {
- do task::spawn {
- unsafe {
- do weaken_task |_po| {
- }
- }
- }
- do task::spawn_unlinked {
- unsafe {
- do weaken_task |po| {
- // Wait for it to tell us to die
- oldcomm::recv(po);
- }
- }
- }
- }
- }
-
- #[test]
- #[ignore(cfg(windows))]
- pub fn test_weaken_task_fail() {
- let res = do task::try {
- unsafe {
- do weaken_task |_po| {
- fail;
- }
- }
- };
- assert result::is_err(&res);
- }
-
/****************************************************************************
* Shared state & exclusive ARC
****************************************************************************/
ArcDestruct((*rc).data)
}
- fn clone(&self) -> SharedMutableState<T> unsafe {
- clone_shared_mutable_state(self)
+ impl<T: Owned> SharedMutableState<T>: Clone {
++ fn clone(&self) -> SharedMutableState<T> {
++ unsafe {
++ clone_shared_mutable_state(self)
++ }
+ }
+ }
+
/****************************************************************************/
#[allow(non_camel_case_types)] // runtime type
--- /dev/null
-pub fn at_exit(f: ~fn()) unsafe {
- let runner: &fn(*ExitFunctions) = exit_runner;
- let runner_pair: sys::Closure = cast::transmute(runner);
- let runner_ptr = runner_pair.code;
- let runner_ptr = cast::transmute(runner_ptr);
- rustrt::rust_register_exit_function(runner_ptr, ~f);
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ use sys;
+ use cast;
+ use ptr;
+ use task;
+ use uint;
+ use vec;
+ use rand;
+ use libc::{c_void, size_t};
+
+ /**
+ Register a function to be run during runtime shutdown.
+
+ After all non-weak tasks have exited, registered exit functions will
+ execute, in random order, on the primary scheduler. Each function runs
+ in its own unsupervised task.
+ */
-fn exit_runner(exit_fns: *ExitFunctions) unsafe {
- let exit_fns = &*exit_fns;
++pub fn at_exit(f: ~fn()) {
++ unsafe {
++ let runner: &fn(*ExitFunctions) = exit_runner;
++ let runner_pair: sys::Closure = cast::transmute(runner);
++ let runner_ptr = runner_pair.code;
++ let runner_ptr = cast::transmute(runner_ptr);
++ rustrt::rust_register_exit_function(runner_ptr, ~f);
++ }
+ }
+
+ // NB: The double pointer indirection here is because ~fn() is a fat
+ // pointer and due to FFI problems I am more comfortable making the
+ // interface use a normal pointer
+ extern mod rustrt {
+ fn rust_register_exit_function(runner: *c_void, f: ~~fn());
+ }
+
+ struct ExitFunctions {
+ // The number of exit functions
+ count: size_t,
+ // The buffer of exit functions
+ start: *~~fn()
+ }
+
- let mut exit_fns_vec = vec::from_buf(start, count as uint);
++fn exit_runner(exit_fns: *ExitFunctions) {
++ let exit_fns = unsafe { &*exit_fns };
+ let count = (*exit_fns).count;
+ let start = (*exit_fns).start;
+
+ // NB: from_buf memcpys from the source, which will
+ // give us ownership of the array of functions
- while exit_fns_vec.is_not_empty() {
++ let mut exit_fns_vec = unsafe { vec::from_buf(start, count as uint) };
+ // Let's not make any promises about execution order
+ rand::Rng().shuffle_mut(exit_fns_vec);
+
+ debug!("running %u exit functions", exit_fns_vec.len());
+
++ while !exit_fns_vec.is_empty() {
+ match exit_fns_vec.pop() {
+ ~f => {
+ task::task().supervised().spawn(f);
+ }
+ }
+ }
+ }
+
+ #[abi = "rust-intrinsic"]
+ pub extern mod rusti {
+ fn move_val_init<T>(dst: &mut T, -src: T);
+ fn init<T>() -> T;
+ }
+
+ #[test]
+ fn test_at_exit() {
+ let i = 10;
+ do at_exit {
+ debug!("at_exit1");
+ assert i == 10;
+ }
+ }
+
+ #[test]
+ fn test_at_exit_many() {
+ let i = 10;
+ for uint::range(20, 100) |j| {
+ do at_exit {
+ debug!("at_exit2");
+ assert i == 10;
+ assert j > i;
+ }
+ }
+ }
--- /dev/null
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ /*!
+ The Finally trait provides a method, `finally` on
+ stack closures that emulates Java-style try/finally blocks.
+
+ # Example
+
+ ~~~
+ do || {
+ ...
+ }.finally {
+ alway_run_this();
+ }
+ ~~~
+ */
+
+ use ops::Drop;
+ use task::{spawn, failing};
+
+ pub trait Finally<T> {
+ fn finally(&self, +dtor: &fn()) -> T;
+ }
+
+ impl<T> &fn() -> T: Finally<T> {
+ // XXX: Should not require a mode here
+ fn finally(&self, +dtor: &fn()) -> T {
+ let _d = Finallyalizer {
+ dtor: dtor
+ };
+
+ (*self)()
+ }
+ }
+
+ struct Finallyalizer {
+ dtor: &fn()
+ }
+
+ impl Finallyalizer: Drop {
+ fn finalize(&self) {
+ (self.dtor)();
+ }
+ }
+
+ #[test]
+ fn test_success() {
+ let mut i = 0;
+ do (|| {
+ i = 10;
+ }).finally {
+ assert !failing();
+ assert i == 10;
+ i = 20;
+ }
+ assert i == 20;
+ }
+
+ #[test]
+ #[ignore(cfg(windows))]
+ #[should_fail]
+ fn test_fail() {
+ let mut i = 0;
+ do (|| {
+ i = 10;
+ fail;
+ }).finally {
+ assert failing();
+ assert i == 10;
+ }
+ }
+
+ #[test]
+ fn test_retval() {
+ let i = do (fn&() -> int {
+ 10
+ }).finally { };
+ assert i == 10;
+ }
+
+ #[test]
+ fn test_compact() {
+ // XXX Should be able to use a fn item instead
+ // of a closure for do_some_fallible_work,
+ // but it's a type error.
+ let do_some_fallible_work: &fn() = || { };
+ fn but_always_run_this_function() { }
+ do_some_fallible_work.finally(
+ but_always_run_this_function);
+ }
--- /dev/null
-use send_map::linear::LinearMap;
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ /*!
+ Global data
+
+ An interface for creating and retrieving values with global
+ (per-runtime) scope.
+
+ Global values are stored in a map and protected by a single global
+ mutex. Operations are provided for accessing and cloning the value
+ under the mutex.
+
+ Because all globals go through a single mutex, they should be used
+ sparingly. The interface is intended to be used with clonable,
+ atomically reference counted synchronization types, like ARCs, in
+ which case the value should be cached locally whenever possible to
+ avoid hitting the mutex.
+ */
+
+ use cast::{transmute, reinterpret_cast};
+ use clone::Clone;
+ use kinds::Owned;
+ use libc::{c_void, uintptr_t};
+ use option::{Option, Some, None};
+ use ops::Drop;
+ use pipes;
+ use private::{Exclusive, exclusive};
+ use private::{SharedMutableState, shared_mutable_state};
+ use private::{get_shared_immutable_state};
+ use private::at_exit::at_exit;
- do get_global_state().with |gs| unsafe {
++use hashmap::linear::LinearMap;
+ use sys::Closure;
+ use task::spawn;
+ use uint;
+
+ pub type GlobalDataKey<T: Owned> = &fn(v: T);
+
+ pub unsafe fn global_data_clone_create<T: Owned Clone>(
+ key: GlobalDataKey<T>, create: &fn() -> ~T) -> T {
+ /*!
+ * Clone a global value or, if it has not been created,
+ * first construct the value then return a clone.
+ *
+ * # Safety note
+ *
+ * Both the clone operation and the constructor are
+ * called while the global lock is held. Recursive
+ * use of the global interface in either of these
+ * operations will result in deadlock.
+ */
+ global_data_clone_create_(key_ptr(key), create)
+ }
+
+ unsafe fn global_data_clone_create_<T: Owned Clone>(
+ key: uint, create: &fn() -> ~T) -> T {
+
+ let mut clone_value: Option<T> = None;
+ do global_data_modify_(key) |value: Option<~T>| {
+ match value {
+ None => {
+ let value = create();
+ clone_value = Some(value.clone());
+ Some(value)
+ }
+ Some(value) => {
+ clone_value = Some(value.clone());
+ Some(value)
+ }
+ }
+ }
+ return clone_value.unwrap();
+ }
+
+ unsafe fn global_data_modify<T: Owned>(
+ key: GlobalDataKey<T>, op: &fn(Option<~T>) -> Option<~T>) {
+
+ global_data_modify_(key_ptr(key), op)
+ }
+
+ unsafe fn global_data_modify_<T: Owned>(
+ key: uint, op: &fn(Option<~T>) -> Option<~T>) {
+
+ let mut old_dtor = None;
-fn get_global_state() -> Exclusive<GlobalState> unsafe {
++ do get_global_state().with |gs| {
+ let (maybe_new_value, maybe_dtor) = match gs.map.pop(&key) {
+ Some((ptr, dtor)) => {
+ let value: ~T = transmute(ptr);
+ (op(Some(value)), Some(dtor))
+ }
+ None => {
+ (op(None), None)
+ }
+ };
+ match maybe_new_value {
+ Some(value) => {
+ let data: *c_void = transmute(value);
+ let dtor: ~fn() = match maybe_dtor {
+ Some(dtor) => dtor,
+ None => {
+ let dtor: ~fn() = || unsafe {
+ let _destroy_value: ~T = transmute(data);
+ };
+ dtor
+ }
+ };
+ let value = (data, dtor);
+ gs.map.insert(key, value);
+ }
+ None => {
+ match maybe_dtor {
+ Some(dtor) => old_dtor = Some(dtor),
+ None => ()
+ }
+ }
+ }
+ }
+ }
+
+ pub unsafe fn global_data_clone<T: Owned Clone>(
+ key: GlobalDataKey<T>) -> Option<T> {
+ let mut maybe_clone: Option<T> = None;
+ do global_data_modify(key) |current| {
+ match ¤t {
+ &Some(~ref value) => {
+ maybe_clone = Some(value.clone());
+ }
+ &None => ()
+ }
+ current
+ }
+ return maybe_clone;
+ }
+
+ // GlobalState is a map from keys to unique pointers and a
+ // destructor. Keys are pointers derived from the type of the
+ // global value. There is a single GlobalState instance per runtime.
+ struct GlobalState {
+ map: LinearMap<uint, (*c_void, ~fn())>
+ }
+
+ impl GlobalState: Drop {
+ fn finalize(&self) {
+ for self.map.each_value |v| {
+ match v {
+ &(_, ref dtor) => (*dtor)()
+ }
+ }
+ }
+ }
+
- let global_ptr = rust_get_global_data_ptr();
++fn get_global_state() -> Exclusive<GlobalState> {
+
+ const POISON: int = -1;
+
+ // XXX: Doing atomic_cxchg to initialize the global state
+ // lazily, which wouldn't be necessary with a runtime written
+ // in Rust
- if *global_ptr == 0 {
++ let global_ptr = unsafe { rust_get_global_data_ptr() };
+
- map: LinearMap()
++ if unsafe { *global_ptr } == 0 {
+ // Global state doesn't exist yet, probably
+
+ // The global state object
+ let state = GlobalState {
- let state_i: int = transmute(state_ptr);
++ map: LinearMap::new()
+ };
+
+ // It's under a reference-counted mutex
+ let state = ~exclusive(state);
+
+ // Convert it to an integer
+ let state_ptr: &Exclusive<GlobalState> = state;
- let prev_i = atomic_cxchg(&mut *global_ptr, 0, state_i);
++ let state_i: int = unsafe { transmute(state_ptr) };
+
+ // Swap our structure into the global pointer
- do at_exit || unsafe {
++ let prev_i = unsafe { atomic_cxchg(&mut *global_ptr, 0, state_i) };
+
+ // Sanity check that we're not trying to reinitialize after shutdown
+ assert prev_i != POISON;
+
+ if prev_i == 0 {
+ // Successfully installed the global pointer
+
+ // Take a handle to return
+ let clone = state.clone();
+
+ // Install a runtime exit function to destroy the global object
- let prev_i = atomic_cxchg(&mut *global_ptr, state_i, POISON);
++ do at_exit {
+ // Poison the global pointer
- let state: &Exclusive<GlobalState> = transmute(prev_i);
++ let prev_i = unsafe {
++ atomic_cxchg(&mut *global_ptr, state_i, POISON)
++ };
+ assert prev_i == state_i;
+
+ // Capture the global state object in the at_exit closure
+ // so that it is destroyed at the right time
+ let _capture_global_state = &state;
+ };
+ return clone;
+ } else {
+ // Somebody else initialized the globals first
- let state: &Exclusive<GlobalState> = transmute(*global_ptr);
++ let state: &Exclusive<GlobalState> = unsafe { transmute(prev_i) };
+ return state.clone();
+ }
+ } else {
-fn key_ptr<T: Owned>(key: GlobalDataKey<T>) -> uint unsafe {
- let closure: Closure = reinterpret_cast(&key);
- return transmute(closure.code);
++ let state: &Exclusive<GlobalState> = unsafe {
++ transmute(*global_ptr)
++ };
+ return state.clone();
+ }
+ }
+
-fn test_clone_rc() unsafe {
++fn key_ptr<T: Owned>(key: GlobalDataKey<T>) -> uint {
++ unsafe {
++ let closure: Closure = reinterpret_cast(&key);
++ return transmute(closure.code);
++ }
+ }
+
+ extern {
+ fn rust_get_global_data_ptr() -> *mut int;
+ }
+
+ #[abi = "rust-intrinsic"]
+ extern {
+ fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
+ }
+
+ #[test]
- do spawn unsafe {
- let val = do global_data_clone_create(key) {
- ~shared_mutable_state(10)
- };
++fn test_clone_rc() {
+ type MyType = SharedMutableState<int>;
+
+ fn key(_v: SharedMutableState<int>) { }
+
+ for uint::range(0, 100) |_| {
- assert get_shared_immutable_state(&val) == &10;
++ do spawn {
++ unsafe {
++ let val = do global_data_clone_create(key) {
++ ~shared_mutable_state(10)
++ };
+
-fn test_modify() unsafe {
++ assert get_shared_immutable_state(&val) == &10;
++ }
+ }
+ }
+ }
+
+ #[test]
- do global_data_modify(key) |v| unsafe {
- match v {
- None => {
- Some(~shared_mutable_state(10))
++fn test_modify() {
+ type MyType = SharedMutableState<int>;
+
+ fn key(_v: SharedMutableState<int>) { }
+
- _ => fail
++ unsafe {
++ do global_data_modify(key) |v| {
++ match v {
++ None => {
++ unsafe {
++ Some(~shared_mutable_state(10))
++ }
++ }
++ _ => fail
+ }
- }
+ }
- do global_data_modify(key) |v| {
- match v {
- Some(sms) => {
- let v = get_shared_immutable_state(sms);
- assert *v == 10;
- None
- },
- _ => fail
+
- }
++ do global_data_modify(key) |v| {
++ match v {
++ Some(sms) => {
++ let v = get_shared_immutable_state(sms);
++ assert *v == 10;
++ None
++ },
++ _ => fail
++ }
+ }
- do global_data_modify(key) |v| unsafe {
- match v {
- None => {
- Some(~shared_mutable_state(10))
+
- _ => fail
++ do global_data_modify(key) |v| {
++ match v {
++ None => {
++ unsafe {
++ Some(~shared_mutable_state(10))
++ }
++ }
++ _ => fail
+ }
+ }
+ }
+ }
--- /dev/null
-use send_map::linear::LinearMap;
++// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
++// file at the top-level directory of this distribution and at
++// http://rust-lang.org/COPYRIGHT.
++//
++// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
++// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
++// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
++// option. This file may not be copied, modified, or distributed
++// except according to those terms.
++
+ /*!
+ Weak tasks
+
+ Weak tasks are a runtime feature for building global services that
+ do not keep the runtime alive. Normally the runtime exits when all
+ tasks exits, but if a task is weak then the runtime may exit while
+ it is running, sending a notification to the task that the runtime
+ is trying to shut down.
+ */
+
+ use option::{Some, None, swap_unwrap};
+ use private::at_exit::at_exit;
+ use private::global::global_data_clone_create;
+ use private::finally::Finally;
+ use pipes::{Port, Chan, SharedChan, stream};
+ use task::{Task, task, spawn};
+ use task::rt::{task_id, get_task_id};
- let mut shutdown_map = LinearMap();
++use hashmap::linear::LinearMap;
+ use ops::Drop;
+
+ type ShutdownMsg = ();
+
+ // XXX: This could be a PortOne but I've experienced bugginess
+ // with oneshot pipes and try_send
+ pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
+ let service = global_data_clone_create(global_data_key,
+ create_global_service);
+ let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
+ let shutdown_port = ~mut Some(shutdown_port);
+ let task = get_task_id();
+ // Expect the weak task service to be alive
+ assert service.try_send(RegisterWeakTask(task, shutdown_chan));
+ unsafe { rust_inc_weak_task_count(); }
+ do fn&() {
+ let shutdown_port = swap_unwrap(&mut *shutdown_port);
+ f(shutdown_port)
+ }.finally || {
+ unsafe { rust_dec_weak_task_count(); }
+ // Service my have already exited
+ service.send(UnregisterWeakTask(task));
+ }
+ }
+
+ type WeakTaskService = SharedChan<ServiceMsg>;
+ type TaskHandle = task_id;
+
+ fn global_data_key(_v: WeakTaskService) { }
+
+ enum ServiceMsg {
+ RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
+ UnregisterWeakTask(TaskHandle),
+ Shutdown
+ }
+
+ fn create_global_service() -> ~WeakTaskService {
+
+ debug!("creating global weak task service");
+ let (port, chan) = stream::<ServiceMsg>();
+ let port = ~mut Some(port);
+ let chan = SharedChan(chan);
+ let chan_clone = chan.clone();
+
+ do task().unlinked().spawn {
+ debug!("running global weak task service");
+ let port = swap_unwrap(&mut *port);
+ let port = ~mut Some(port);
+ do fn&() {
+ let port = swap_unwrap(&mut *port);
+ // The weak task service is itself a weak task
+ debug!("weakening the weak service task");
+ unsafe { rust_inc_weak_task_count(); }
+ run_weak_task_service(port);
+ }.finally {
+ debug!("unweakening the weak service task");
+ unsafe { rust_dec_weak_task_count(); }
+ }
+ }
+
+ do at_exit {
+ debug!("shutting down weak task service");
+ chan.send(Shutdown);
+ }
+
+ return ~chan_clone;
+ }
+
+ fn run_weak_task_service(port: Port<ServiceMsg>) {
+
-fn test_simple() unsafe {
++ let mut shutdown_map = LinearMap::new();
+
+ loop {
+ match port.recv() {
+ RegisterWeakTask(task, shutdown_chan) => {
+ let previously_unregistered =
+ shutdown_map.insert(task, shutdown_chan);
+ assert previously_unregistered;
+ }
+ UnregisterWeakTask(task) => {
+ match shutdown_map.pop(&task) {
+ Some(shutdown_chan) => {
+ // Oneshot pipes must send, even though
+ // nobody will receive this
+ shutdown_chan.send(());
+ }
+ None => fail
+ }
+ }
+ Shutdown => break
+ }
+ }
+
+ do shutdown_map.consume |_, shutdown_chan| {
+ // Weak task may have already exited
+ shutdown_chan.send(());
+ }
+ }
+
+ extern {
+ unsafe fn rust_inc_weak_task_count();
+ unsafe fn rust_dec_weak_task_count();
+ }
+
+ #[test]
- do spawn unsafe {
- do weaken_task |_signal| {
++fn test_simple() {
+ let (port, chan) = stream();
-fn test_weak_weak() unsafe {
++ do spawn {
++ unsafe {
++ do weaken_task |_signal| {
++ }
+ }
+ chan.send(());
+ }
+ port.recv();
+ }
+
+ #[test]
- do spawn unsafe {
- do weaken_task |_signal| {
- }
- do weaken_task |_signal| {
++fn test_weak_weak() {
+ let (port, chan) = stream();
-fn test_wait_for_signal() unsafe {
- do spawn unsafe {
- do weaken_task |signal| {
- signal.recv();
++ do spawn {
++ unsafe {
++ do weaken_task |_signal| {
++ }
++ do weaken_task |_signal| {
++ }
+ }
+ chan.send(());
+ }
+ port.recv();
+ }
+
+ #[test]
-fn test_wait_for_signal_many() unsafe {
++fn test_wait_for_signal() {
++ do spawn {
++ unsafe {
++ do weaken_task |signal| {
++ signal.recv();
++ }
+ }
+ }
+ }
+
+ #[test]
- do spawn unsafe {
- do weaken_task |signal| {
- signal.recv();
++fn test_wait_for_signal_many() {
+ use uint;
+ for uint::range(0, 100) |_| {
-fn test_select_stream_and_oneshot() unsafe {
++ do spawn {
++ unsafe {
++ do weaken_task |signal| {
++ signal.recv();
++ }
+ }
+ }
+ }
+ }
+
+ #[test]
- do spawn unsafe {
- do weaken_task |signal| {
- match select2i(&port, &signal) {
- Left(*) => (),
- Right(*) => fail
++fn test_select_stream_and_oneshot() {
+ use pipes::select2i;
+ use either::{Left, Right};
+
+ let (port, chan) = stream();
+ let (waitport, waitchan) = stream();
++ do spawn {
++ unsafe {
++ do weaken_task |signal| {
++ match select2i(&port, &signal) {
++ Left(*) => (),
++ Right(*) => fail
++ }
+ }
+ }
+ waitchan.send(());
+ }
+ chan.send(());
+ waitport.recv();
+ }
+
linked: bool,
supervised: bool,
mut notify_chan: Option<Chan<TaskResult>>,
- sched: Option<SchedOpts>,
- sched: SchedOpts,
-};
++ sched: SchedOpts
+}
/**
* The task builder type.
fn sched_mode(mode: SchedMode) -> TaskBuilder {
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder {
- opts: {
+ opts: TaskOpts {
linked: self.opts.linked,
supervised: self.opts.supervised,
- notify_chan: notify_chan,
- sched: Some(SchedOpts {
- mode: mode,
- foreign_stack_size: None,
- })
- mut notify_chan: move notify_chan,
- sched: { mode: mode, foreign_stack_size: None}
++ notify_chan: move notify_chan,
++ sched: SchedOpts { mode: mode, foreign_stack_size: None}
},
can_not_copy: None,
.. self.consume()
* into the same scheduler, and do not post lifecycle notifications.
*/
- {
+ TaskOpts {
linked: true,
supervised: false,
- mut notify_chan: None,
- sched: {
+ notify_chan: None,
- sched: None
++ sched: SchedOpts {
+ mode: DefaultScheduler,
+ foreign_stack_size: None
+ }
}
}
pub fn get_task() -> Task {
//! Get a handle to the running task
- TaskHandle(rt::get_task_id())
+ unsafe {
+ TaskHandle(rt::get_task_id())
+ }
}
- SchedulerHandle(rt::rust_get_sched_id())
+ pub fn get_scheduler() -> Scheduler {
++ SchedulerHandle(unsafe { rt::rust_get_sched_id() })
+ }
+
/**
* Temporarily make the task unkillable
*
#[test]
fn test_spawn_sched() {
- let po = oldcomm::Port();
- let ch = oldcomm::Chan(&po);
+ let (po, ch) = stream::<()>();
+ let ch = SharedChan(ch);
- fn f(i: int, ch: oldcomm::Chan<()>) {
- unsafe {
- let parent_sched_id = rt::rust_get_sched_id();
+ fn f(i: int, ch: SharedChan<()>) {
- let parent_sched_id = rt::rust_get_sched_id();
++ let parent_sched_id = unsafe { rt::rust_get_sched_id() };
- do spawn_sched(SingleThreaded) {
- unsafe {
- let child_sched_id = rt::rust_get_sched_id();
- assert parent_sched_id != child_sched_id;
-
- if (i == 0) {
- oldcomm::send(ch, ());
- } else {
- f(i - 1, ch);
- }
- }
- };
- }
+ do spawn_sched(SingleThreaded) {
- let child_sched_id = rt::rust_get_sched_id();
++ let child_sched_id = unsafe { rt::rust_get_sched_id() };
+ assert parent_sched_id != child_sched_id;
+
+ if (i == 0) {
+ ch.send(());
+ } else {
+ f(i - 1, ch.clone());
+ }
+ };
}
f(10, ch);
}
#[test]
- fn test_spawn_sched_childs_on_same_sched() {
- let po = oldcomm::Port();
- let ch = oldcomm::Chan(&po);
+ fn test_spawn_sched_childs_on_default_sched() {
+ let (po, ch) = stream();
+
+ // Assuming tests run on the default scheduler
- let default_id = rt::rust_get_sched_id();
++ let default_id = unsafe { rt::rust_get_sched_id() };
do spawn_sched(SingleThreaded) {
- unsafe {
- let parent_sched_id = rt::rust_get_sched_id();
- do spawn {
- unsafe {
- let child_sched_id = rt::rust_get_sched_id();
- // This should be on the same scheduler
- assert parent_sched_id == child_sched_id;
- oldcomm::send(ch, ());
- }
- };
- }
- let parent_sched_id = rt::rust_get_sched_id();
++ let parent_sched_id = unsafe { rt::rust_get_sched_id() };
+ do spawn {
- let child_sched_id = rt::rust_get_sched_id();
++ let child_sched_id = unsafe { rt::rust_get_sched_id() };
+ assert parent_sched_id != child_sched_id;
+ assert child_sched_id == default_id;
+ ch.send(());
+ };
};
- oldcomm::recv(po);
+ po.recv();
}
#[nolink]
let (port, chan) = pipes::stream();
do spawn_sched(ManualThreads(2)) |move chan| {
- let max_threads = rt::rust_sched_threads();
- assert(max_threads as int == 2);
- let running_threads = rt::rust_sched_current_nonlazy_threads();
- assert(running_threads as int == 1);
+ unsafe {
+ let max_threads = rt::rust_sched_threads();
+ assert(max_threads as int == 2);
+ let running_threads = rt::rust_sched_current_nonlazy_threads();
+ assert(running_threads as int == 1);
- let (port2, chan2) = pipes::stream();
+ let (port2, chan2) = pipes::stream();
- do spawn() |move chan2| {
- do spawn_sched(CurrentScheduler) |move chan2| {
- chan2.send(());
- }
++ do spawn_sched(CurrentScheduler) |move chan2| {
+ chan2.send(());
+ }
- let running_threads2 = rt::rust_sched_current_nonlazy_threads();
- assert(running_threads2 as int == 2);
+ let running_threads2 = rt::rust_sched_current_nonlazy_threads();
+ assert(running_threads2 as int == 2);
- port2.recv();
- chan.send(());
+ port2.recv();
+ chan.send(());
+ }
}
port.recv();
}
}
- fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task {
- unsafe {
- if opts.foreign_stack_size != None {
- fail ~"foreign_stack_size scheduler option unimplemented";
- }
+ fn new_task_in_sched(opts: SchedOpts) -> *rust_task {
+ if opts.foreign_stack_size != None {
+ fail ~"foreign_stack_size scheduler option unimplemented";
+ }
- let num_threads = match opts.mode {
- SingleThreaded => 1u,
- ThreadPerCore => rt::rust_num_threads(),
- ThreadPerTask => {
- fail ~"ThreadPerTask scheduling mode unimplemented"
- }
- ManualThreads(threads) => {
- if threads == 0u {
- fail ~"can not create a scheduler with no threads";
- }
- threads
- }
- PlatformThread => 0u /* Won't be used */
- };
+ let num_threads = match opts.mode {
+ DefaultScheduler
+ | CurrentScheduler
+ | ExistingScheduler(*)
+ | PlatformThread => 0u, /* Won't be used */
+ SingleThreaded => 1u,
- ThreadPerCore => rt::rust_num_threads(),
++ ThreadPerCore => unsafe { rt::rust_num_threads() },
+ ThreadPerTask => {
+ fail ~"ThreadPerTask scheduling mode unimplemented"
+ }
+ ManualThreads(threads) => {
+ if threads == 0u {
+ fail ~"can not create a scheduler with no threads";
+ }
+ threads
+ }
+ };
- let sched_id = if opts.mode != PlatformThread {
- rt::rust_new_sched(num_threads)
- } else {
- rt::rust_osmain_sched_id()
- let sched_id = match opts.mode {
- CurrentScheduler => rt::rust_get_sched_id(),
- ExistingScheduler(SchedulerHandle(id)) => id,
- PlatformThread => rt::rust_osmain_sched_id(),
- _ => rt::rust_new_sched(num_threads)
- };
- rt::rust_new_task_in_sched(sched_id)
++ unsafe {
++ let sched_id = match opts.mode {
++ CurrentScheduler => rt::rust_get_sched_id(),
++ ExistingScheduler(SchedulerHandle(id)) => id,
++ PlatformThread => rt::rust_osmain_sched_id(),
++ _ => rt::rust_new_sched(num_threads)
+ };
+ rt::rust_new_task_in_sched(sched_id)
+ }
}
}
* a vector of `ip_addr` results, in the case of success, or an error
* object in the case of failure
*/
- pub fn get_addr(node: &str, iotask: iotask)
+ pub fn get_addr(node: &str, iotask: &iotask)
-> result::Result<~[IpAddr], IpGetAddrErr> {
do oldcomm::listen |output_ch| {
- do str::as_buf(node) |node_ptr, len| unsafe {
- log(debug, fmt!("slice len %?", len));
- let handle = create_uv_getaddrinfo_t();
- let handle_ptr = ptr::addr_of(&handle);
- let handle_data: GetAddrData = {
- output_ch: output_ch
- };
- let handle_data_ptr = ptr::addr_of(&handle_data);
- do interact(iotask) |loop_ptr| unsafe {
- let result = uv_getaddrinfo(
- loop_ptr,
- handle_ptr,
- get_addr_cb,
- node_ptr,
- ptr::null(),
- ptr::null());
- match result {
- 0i32 => {
- set_data_for_req(handle_ptr, handle_data_ptr);
- }
- _ => {
- output_ch.send(result::Err(GetAddrUnknownError));
- }
- }
- };
- output_ch.recv()
+ do str::as_buf(node) |node_ptr, len| {
+ unsafe {
+ log(debug, fmt!("slice len %?", len));
+ let handle = create_uv_getaddrinfo_t();
+ let handle_ptr = ptr::addr_of(&handle);
+ let handle_data = GetAddrData {
+ output_ch: output_ch
+ };
+ let handle_data_ptr = ptr::addr_of(&handle_data);
+ do interact(iotask) |loop_ptr| {
+ unsafe {
+ let result = uv_getaddrinfo(
+ loop_ptr,
+ handle_ptr,
+ get_addr_cb,
+ node_ptr,
+ ptr::null(),
+ ptr::null());
+ match result {
+ 0i32 => {
+ set_data_for_req(handle_ptr, handle_data_ptr);
+ }
+ _ => {
+ output_ch.send(result::Err(GetAddrUnknownError));
+ }
+ }
+ }
+ };
+ output_ch.recv()
+ }
}
}
}
* `net::tcp::tcp_connect_err_data` instance will be returned
*/
pub fn connect(input_ip: ip::IpAddr, port: uint,
- iotask: IoTask)
+ iotask: &IoTask)
- -> result::Result<TcpSocket, TcpConnectErrData> unsafe {
- let result_po = oldcomm::Port::<ConnAttempt>();
- let closed_signal_po = oldcomm::Port::<()>();
- let conn_data = {
- result_ch: oldcomm::Chan(&result_po),
- closed_signal_ch: oldcomm::Chan(&closed_signal_po)
- };
- let conn_data_ptr = ptr::addr_of(&conn_data);
- let reader_po = oldcomm::Port::<result::Result<~[u8], TcpErrData>>();
- let stream_handle_ptr = malloc_uv_tcp_t();
- *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
- let socket_data = @{
- reader_po: reader_po,
- reader_ch: oldcomm::Chan(&reader_po),
- stream_handle_ptr: stream_handle_ptr,
- connect_req: uv::ll::connect_t(),
- write_req: uv::ll::write_t(),
- ipv6: match input_ip {
- ip::Ipv4(_) => { false }
- ip::Ipv6(_) => { true }
- },
- iotask: iotask.clone()
- };
- let socket_data_ptr = ptr::addr_of(&(*socket_data));
- log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
- // get an unsafe representation of our stream_handle_ptr that
- // we can send into the interact cb to be handled in libuv..
- log(debug, fmt!("stream_handle_ptr outside interact %?",
- stream_handle_ptr));
- do iotask::interact(iotask) |move input_ip, loop_ptr| unsafe {
- log(debug, ~"in interact cb for tcp client connect..");
- log(debug, fmt!("stream_handle_ptr in interact %?",
- stream_handle_ptr));
- match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
- 0i32 => {
- log(debug, ~"tcp_init successful");
- log(debug, ~"dealing w/ ipv4 connection..");
- let connect_req_ptr =
- ptr::addr_of(&((*socket_data_ptr).connect_req));
- let addr_str = ip::format_addr(&input_ip);
- let connect_result = match input_ip {
- ip::Ipv4(ref addr) => {
- // have to "recreate" the sockaddr_in/6
- // since the ip_addr discards the port
- // info.. should probably add an additional
- // rust type that actually is closer to
- // what the libuv API expects (ip str + port num)
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip4_addr(addr_str, port as int);
- uv::ll::tcp_connect(
- connect_req_ptr,
- stream_handle_ptr,
- ptr::addr_of(&in_addr),
- tcp_connect_on_connect_cb)
- }
- ip::Ipv6(ref addr) => {
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip6_addr(addr_str, port as int);
- uv::ll::tcp_connect6(
- connect_req_ptr,
- stream_handle_ptr,
- ptr::addr_of(&in_addr),
- tcp_connect_on_connect_cb)
- }
- };
- match connect_result {
- 0i32 => {
- log(debug, ~"tcp_connect successful");
- // reusable data that we'll have for the
- // duration..
- uv::ll::set_data_for_uv_handle(stream_handle_ptr,
- socket_data_ptr as
- *libc::c_void);
- // just so the connect_cb can send the
- // outcome..
- uv::ll::set_data_for_req(connect_req_ptr,
- conn_data_ptr);
- log(debug, ~"leaving tcp_connect interact cb...");
- // let tcp_connect_on_connect_cb send on
- // the result_ch, now..
- }
- _ => {
- // immediate connect failure.. probably a garbage
- // ip or somesuch
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send((*conn_data_ptr).result_ch,
- ConnFailure(err_data.to_tcp_err()));
- uv::ll::set_data_for_uv_handle(stream_handle_ptr,
- conn_data_ptr);
- uv::ll::close(stream_handle_ptr, stream_error_close_cb);
- }
+ -> result::Result<TcpSocket, TcpConnectErrData> {
+ unsafe {
+ let result_po = oldcomm::Port::<ConnAttempt>();
+ let closed_signal_po = oldcomm::Port::<()>();
+ let conn_data = {
+ result_ch: oldcomm::Chan(&result_po),
+ closed_signal_ch: oldcomm::Chan(&closed_signal_po)
+ };
+ let conn_data_ptr = ptr::addr_of(&conn_data);
+ let reader_po = oldcomm::Port::<result::Result<~[u8], TcpErrData>>();
+ let stream_handle_ptr = malloc_uv_tcp_t();
+ *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
+ let socket_data = @TcpSocketData {
+ reader_po: reader_po,
+ reader_ch: oldcomm::Chan(&reader_po),
+ stream_handle_ptr: stream_handle_ptr,
+ connect_req: uv::ll::connect_t(),
+ write_req: uv::ll::write_t(),
+ ipv6: match input_ip {
+ ip::Ipv4(_) => { false }
+ ip::Ipv6(_) => { true }
+ },
- iotask: iotask
++ iotask: iotask.clone()
+ };
+ let socket_data_ptr = ptr::addr_of(&(*socket_data));
+ log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
+ // get an unsafe representation of our stream_handle_ptr that
+ // we can send into the interact cb to be handled in libuv..
+ log(debug, fmt!("stream_handle_ptr outside interact %?",
- stream_handle_ptr));
++ stream_handle_ptr));
+ do iotask::interact(iotask) |move input_ip, loop_ptr| {
+ unsafe {
+ log(debug, ~"in interact cb for tcp client connect..");
+ log(debug, fmt!("stream_handle_ptr in interact %?",
- stream_handle_ptr));
++ stream_handle_ptr));
+ match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
- 0i32 => {
- log(debug, ~"tcp_init successful");
- log(debug, ~"dealing w/ ipv4 connection..");
- let connect_req_ptr =
- ptr::addr_of(&((*socket_data_ptr).connect_req));
- let addr_str = ip::format_addr(&input_ip);
- let connect_result = match input_ip {
- ip::Ipv4(ref addr) => {
- // have to "recreate" the sockaddr_in/6
- // since the ip_addr discards the port
- // info.. should probably add an additional
- // rust type that actually is closer to
- // what the libuv API expects (ip str + port num)
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip4_addr(addr_str, port as int);
- uv::ll::tcp_connect(
- connect_req_ptr,
- stream_handle_ptr,
- ptr::addr_of(&in_addr),
- tcp_connect_on_connect_cb)
- }
- ip::Ipv6(ref addr) => {
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip6_addr(addr_str, port as int);
- uv::ll::tcp_connect6(
- connect_req_ptr,
- stream_handle_ptr,
- ptr::addr_of(&in_addr),
- tcp_connect_on_connect_cb)
- }
- };
- match connect_result {
- 0i32 => {
- log(debug, ~"tcp_connect successful");
- // reusable data that we'll have for the
- // duration..
- uv::ll::set_data_for_uv_handle(stream_handle_ptr,
- socket_data_ptr as
- *libc::c_void);
- // just so the connect_cb can send the
- // outcome..
- uv::ll::set_data_for_req(connect_req_ptr,
- conn_data_ptr);
- log(debug, ~"leaving tcp_connect interact cb...");
- // let tcp_connect_on_connect_cb send on
- // the result_ch, now..
- }
- _ => {
- // immediate connect failure.. probably a garbage
- // ip or somesuch
++ 0i32 => {
++ log(debug, ~"tcp_init successful");
++ log(debug, ~"dealing w/ ipv4 connection..");
++ let connect_req_ptr =
++ ptr::addr_of(&((*socket_data_ptr).connect_req));
++ let addr_str = ip::format_addr(&input_ip);
++ let connect_result = match input_ip {
++ ip::Ipv4(ref addr) => {
++ // have to "recreate" the
++ // sockaddr_in/6 since the ip_addr
++ // discards the port info.. should
++ // probably add an additional rust
++ // type that actually is closer to
++ // what the libuv API expects (ip str
++ // + port num)
++ log(debug, fmt!("addr: %?", addr));
++ let in_addr = uv::ll::ip4_addr(addr_str,
++ port as int);
++ uv::ll::tcp_connect(
++ connect_req_ptr,
++ stream_handle_ptr,
++ ptr::addr_of(&in_addr),
++ tcp_connect_on_connect_cb)
++ }
++ ip::Ipv6(ref addr) => {
++ log(debug, fmt!("addr: %?", addr));
++ let in_addr = uv::ll::ip6_addr(addr_str,
++ port as int);
++ uv::ll::tcp_connect6(
++ connect_req_ptr,
++ stream_handle_ptr,
++ ptr::addr_of(&in_addr),
++ tcp_connect_on_connect_cb)
++ }
++ };
++ match connect_result {
++ 0i32 => {
++ log(debug, ~"tcp_connect successful");
++ // reusable data that we'll have for the
++ // duration..
++ uv::ll::set_data_for_uv_handle(
++ stream_handle_ptr,
++ socket_data_ptr as
++ *libc::c_void);
++ // just so the connect_cb can send the
++ // outcome..
++ uv::ll::set_data_for_req(connect_req_ptr,
++ conn_data_ptr);
++ log(debug,
++ ~"leaving tcp_connect interact cb...");
++ // let tcp_connect_on_connect_cb send on
++ // the result_ch, now..
++ }
++ _ => {
++ // immediate connect
++ // failure.. probably a garbage ip or
++ // somesuch
++ let err_data =
++ uv::ll::get_last_err_data(loop_ptr);
++ oldcomm::send((*conn_data_ptr).result_ch,
++ ConnFailure(err_data));
++ uv::ll::set_data_for_uv_handle(
++ stream_handle_ptr,
++ conn_data_ptr);
++ uv::ll::close(stream_handle_ptr,
++ stream_error_close_cb);
++ }
++ }
++ }
++ _ => {
++ // failure to create a tcp handle
+ let err_data = uv::ll::get_last_err_data(loop_ptr);
+ oldcomm::send((*conn_data_ptr).result_ch,
- ConnFailure(err_data));
- uv::ll::set_data_for_uv_handle(stream_handle_ptr,
- conn_data_ptr);
- uv::ll::close(stream_handle_ptr,
- stream_error_close_cb);
- }
++ ConnFailure(err_data));
+ }
- }
- _ => {
- // failure to create a tcp handle
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send((*conn_data_ptr).result_ch,
- ConnFailure(err_data));
- }
+ }
}
- };
+ }
- _ => {
- // failure to create a tcp handle
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send((*conn_data_ptr).result_ch,
- ConnFailure(err_data.to_tcp_err()));
- }
+ match oldcomm::recv(result_po) {
- ConnSuccess => {
- log(debug, ~"tcp::connect - received success on result_po");
- result::Ok(TcpSocket(socket_data))
- }
- ConnFailure(ref err_data) => {
- oldcomm::recv(closed_signal_po);
- log(debug, ~"tcp::connect - received failure on result_po");
- // still have to free the malloc'd stream handle..
- rustrt::rust_uv_current_kernel_free(stream_handle_ptr
- as *libc::c_void);
- let tcp_conn_err = match err_data.err_name {
- ~"ECONNREFUSED" => ConnectionRefused,
- _ => GenericConnectErr(err_data.err_name, err_data.err_msg)
- };
- result::Err(tcp_conn_err)
- }
++ ConnSuccess => {
++ log(debug, ~"tcp::connect - received success on result_po");
++ result::Ok(TcpSocket(socket_data))
++ }
++ ConnFailure(ref err_data) => {
++ oldcomm::recv(closed_signal_po);
++ log(debug, ~"tcp::connect - received failure on result_po");
++ // still have to free the malloc'd stream handle..
++ rustrt::rust_uv_current_kernel_free(stream_handle_ptr
++ as *libc::c_void);
++ let tcp_conn_err = match err_data.err_name {
++ ~"ECONNREFUSED" => ConnectionRefused,
++ _ => GenericConnectErr(err_data.err_name,
++ err_data.err_msg)
++ };
++ result::Err(tcp_conn_err)
++ }
}
- };
- match oldcomm::recv(result_po) {
- ConnSuccess => {
- log(debug, ~"tcp::connect - received success on result_po");
- result::Ok(TcpSocket(socket_data))
- }
- ConnFailure(ref err_data) => {
- oldcomm::recv(closed_signal_po);
- log(debug, ~"tcp::connect - received failure on result_po");
- // still have to free the malloc'd stream handle..
- rustrt::rust_uv_current_kernel_free(stream_handle_ptr
- as *libc::c_void);
- let tcp_conn_err = match err_data.err_name {
- ~"ECONNREFUSED" => ConnectionRefused,
- _ => GenericConnectErr(err_data.err_name, err_data.err_msg)
- };
- result::Err(tcp_conn_err)
- }
}
}
* as the `err` variant of a `result`.
*/
pub fn accept(new_conn: TcpNewConnection)
- -> result::Result<TcpSocket, TcpErrData> unsafe {
-
- match new_conn{
- NewTcpConn(server_handle_ptr) => {
- let server_data_ptr = uv::ll::get_data_for_uv_handle(
- server_handle_ptr) as *TcpListenFcData;
- let reader_po = oldcomm::Port();
- let iotask = &(*server_data_ptr).iotask;
- let stream_handle_ptr = malloc_uv_tcp_t();
- *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
- let client_socket_data: @TcpSocketData = @{
- reader_po: reader_po,
- reader_ch: oldcomm::Chan(&reader_po),
- stream_handle_ptr : stream_handle_ptr,
- connect_req : uv::ll::connect_t(),
- write_req : uv::ll::write_t(),
- ipv6: (*server_data_ptr).ipv6,
- iotask : iotask.clone()
- };
- let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
- let client_stream_handle_ptr =
- (*client_socket_data_ptr).stream_handle_ptr;
-
- let result_po = oldcomm::Port::<Option<TcpErrData>>();
- let result_ch = oldcomm::Chan(&result_po);
-
- // UNSAFE LIBUV INTERACTION BEGIN
- // .. normally this happens within the context of
- // a call to uv::hl::interact.. but we're breaking
- // the rules here because this always has to be
- // called within the context of a listen() new_connect_cb
- // callback (or it will likely fail and drown your cat)
- log(debug, ~"in interact cb for tcp::accept");
- let loop_ptr = uv::ll::get_loop_for_uv_handle(
- server_handle_ptr);
- match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
- 0i32 => {
- log(debug, ~"uv_tcp_init successful for client stream");
- match uv::ll::accept(
- server_handle_ptr as *libc::c_void,
- client_stream_handle_ptr as *libc::c_void) {
- 0i32 => {
- log(debug, ~"successfully accepted client connection");
- uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
- client_socket_data_ptr
- as *libc::c_void);
- oldcomm::send(result_ch, None);
- }
- _ => {
- log(debug, ~"failed to accept client conn");
- oldcomm::send(result_ch, Some(
- uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
- }
+ -> result::Result<TcpSocket, TcpErrData> {
+ unsafe {
- match new_conn {
- NewTcpConn(server_handle_ptr) => {
- let server_data_ptr = uv::ll::get_data_for_uv_handle(
- server_handle_ptr) as *TcpListenFcData;
- let reader_po = oldcomm::Port();
- let iotask = (*server_data_ptr).iotask;
- let stream_handle_ptr = malloc_uv_tcp_t();
- *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
- let client_socket_data = @TcpSocketData {
- reader_po: reader_po,
- reader_ch: oldcomm::Chan(&reader_po),
- stream_handle_ptr : stream_handle_ptr,
- connect_req : uv::ll::connect_t(),
- write_req : uv::ll::write_t(),
- ipv6: (*server_data_ptr).ipv6,
- iotask : iotask
- };
- let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
- let client_stream_handle_ptr =
- (*client_socket_data_ptr).stream_handle_ptr;
-
- let result_po = oldcomm::Port::<Option<TcpErrData>>();
- let result_ch = oldcomm::Chan(&result_po);
-
- // UNSAFE LIBUV INTERACTION BEGIN
- // .. normally this happens within the context of
- // a call to uv::hl::interact.. but we're breaking
- // the rules here because this always has to be
- // called within the context of a listen() new_connect_cb
- // callback (or it will likely fail and drown your cat)
- log(debug, ~"in interact cb for tcp::accept");
- let loop_ptr = uv::ll::get_loop_for_uv_handle(
- server_handle_ptr);
- match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
- 0i32 => {
- log(debug, ~"uv_tcp_init successful for client stream");
- match uv::ll::accept(
- server_handle_ptr as *libc::c_void,
- client_stream_handle_ptr as *libc::c_void) {
- 0i32 => {
- log(debug, ~"successfully accepted client connection");
- uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
- client_socket_data_ptr
- as *libc::c_void);
- oldcomm::send(result_ch, None);
- }
- _ => {
- log(debug, ~"failed to accept client conn");
- oldcomm::send(result_ch, Some(
- uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
- }
++ match new_conn{
++ NewTcpConn(server_handle_ptr) => {
++ let server_data_ptr = uv::ll::get_data_for_uv_handle(
++ server_handle_ptr) as *TcpListenFcData;
++ let reader_po = oldcomm::Port();
++ let iotask = &(*server_data_ptr).iotask;
++ let stream_handle_ptr = malloc_uv_tcp_t();
++ *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
++ uv::ll::tcp_t();
++ let client_socket_data: @TcpSocketData = @TcpSocketData {
++ reader_po: reader_po,
++ reader_ch: oldcomm::Chan(&reader_po),
++ stream_handle_ptr : stream_handle_ptr,
++ connect_req : uv::ll::connect_t(),
++ write_req : uv::ll::write_t(),
++ ipv6: (*server_data_ptr).ipv6,
++ iotask : iotask.clone()
++ };
++ let client_socket_data_ptr = ptr::addr_of(
++ &(*client_socket_data));
++ let client_stream_handle_ptr =
++ (*client_socket_data_ptr).stream_handle_ptr;
++
++ let result_po = oldcomm::Port::<Option<TcpErrData>>();
++ let result_ch = oldcomm::Chan(&result_po);
++
++ // UNSAFE LIBUV INTERACTION BEGIN
++ // .. normally this happens within the context of
++ // a call to uv::hl::interact.. but we're breaking
++ // the rules here because this always has to be
++ // called within the context of a listen() new_connect_cb
++ // callback (or it will likely fail and drown your cat)
++ log(debug, ~"in interact cb for tcp::accept");
++ let loop_ptr = uv::ll::get_loop_for_uv_handle(
++ server_handle_ptr);
++ match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
++ 0i32 => {
++ log(debug, ~"uv_tcp_init successful for \
++ client stream");
++ match uv::ll::accept(
++ server_handle_ptr as *libc::c_void,
++ client_stream_handle_ptr as *libc::c_void) {
++ 0i32 => {
++ log(debug,
++ ~"successfully accepted client \
++ connection");
++ uv::ll::set_data_for_uv_handle(
++ client_stream_handle_ptr,
++ client_socket_data_ptr
++ as *libc::c_void);
++ oldcomm::send(result_ch, None);
++ }
++ _ => {
++ log(debug, ~"failed to accept client conn");
++ oldcomm::send(result_ch, Some(
++ uv::ll::get_last_err_data(
++ loop_ptr).to_tcp_err()));
++ }
++ }
++ }
++ _ => {
++ log(debug, ~"failed to accept client stream");
++ oldcomm::send(result_ch, Some(
++ uv::ll::get_last_err_data(
++ loop_ptr).to_tcp_err()));
++ }
++ }
++ // UNSAFE LIBUV INTERACTION END
++ match oldcomm::recv(result_po) {
++ Some(copy err_data) => result::Err(err_data),
++ None => result::Ok(TcpSocket(client_socket_data))
+ }
- }
- _ => {
- log(debug, ~"failed to init client stream");
- oldcomm::send(result_ch, Some(
- uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
- }
- }
- // UNSAFE LIBUV INTERACTION END
- match oldcomm::recv(result_po) {
- Some(copy err_data) => result::Err(err_data),
- None => result::Ok(TcpSocket(client_socket_data))
}
- }
- _ => {
- log(debug, ~"failed to init client stream");
- oldcomm::send(result_ch, Some(
- uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
-- }
- }
- // UNSAFE LIBUV INTERACTION END
- match oldcomm::recv(result_po) {
- Some(copy err_data) => result::Err(err_data),
- None => result::Ok(TcpSocket(client_socket_data))
}
- }
}
}
* of listen exiting because of an error
*/
pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
- iotask: IoTask,
- on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
- new_connect_cb: fn~(TcpNewConnection,
- oldcomm::Chan<Option<TcpErrData>>))
+ iotask: &IoTask,
+ on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
+ new_connect_cb: fn~(TcpNewConnection,
+ oldcomm::Chan<Option<TcpErrData>>))
- -> result::Result<(), TcpListenErrData> unsafe {
+ -> result::Result<(), TcpListenErrData> {
- unsafe {
- do listen_common(move host_ip, port, backlog, iotask,
- move on_establish_cb)
- // on_connect_cb
- |move new_connect_cb, handle| {
- unsafe {
- let server_data_ptr =
- uv::ll::get_data_for_uv_handle(handle)
- as *TcpListenFcData;
- let new_conn = NewTcpConn(handle);
- let kill_ch = (*server_data_ptr).kill_ch;
- new_connect_cb(new_conn, kill_ch);
- }
- }
+ do listen_common(move host_ip, port, backlog, iotask,
+ move on_establish_cb)
+ // on_connect_cb
- |move new_connect_cb, handle| unsafe {
++ |move new_connect_cb, handle| {
++ unsafe {
+ let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
+ as *TcpListenFcData;
+ let new_conn = NewTcpConn(handle);
+ let kill_ch = (*server_data_ptr).kill_ch;
+ new_connect_cb(new_conn, kill_ch);
++ }
}
}
fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
- iotask: IoTask,
+ iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
on_connect_cb: fn~(*uv::ll::uv_tcp_t))
- -> result::Result<(), TcpListenErrData> unsafe {
- let stream_closed_po = oldcomm::Port::<()>();
- let kill_po = oldcomm::Port::<Option<TcpErrData>>();
- let kill_ch = oldcomm::Chan(&kill_po);
- let server_stream = uv::ll::tcp_t();
- let server_stream_ptr = ptr::addr_of(&server_stream);
- let server_data: TcpListenFcData = {
- server_stream_ptr: server_stream_ptr,
- stream_closed_ch: oldcomm::Chan(&stream_closed_po),
- kill_ch: kill_ch,
- on_connect_cb: move on_connect_cb,
- iotask: iotask.clone(),
- ipv6: match &host_ip {
- &ip::Ipv4(_) => { false }
- &ip::Ipv6(_) => { true }
- },
- mut active: true
- };
- let server_data_ptr = ptr::addr_of(&server_data);
-
- let setup_result = do oldcomm::listen |setup_ch| {
- // this is to address a compiler warning about
- // an implicit copy.. it seems that double nested
- // will defeat a move sigil, as is done to the host_ip
- // arg above.. this same pattern works w/o complaint in
- // tcp::connect (because the iotask::interact cb isn't
- // nested within a core::comm::listen block)
- let loc_ip = copy(host_ip);
- do iotask::interact(iotask) |move loc_ip, loop_ptr| unsafe {
- match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
- 0i32 => {
- uv::ll::set_data_for_uv_handle(
- server_stream_ptr,
- server_data_ptr);
- let addr_str = ip::format_addr(&loc_ip);
- let bind_result = match loc_ip {
- ip::Ipv4(ref addr) => {
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip4_addr(addr_str, port as int);
- uv::ll::tcp_bind(server_stream_ptr,
- ptr::addr_of(&in_addr))
- }
- ip::Ipv6(ref addr) => {
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip6_addr(addr_str, port as int);
- uv::ll::tcp_bind6(server_stream_ptr,
- ptr::addr_of(&in_addr))
- }
- };
- match bind_result {
- 0i32 => {
- match uv::ll::listen(server_stream_ptr,
- backlog as libc::c_int,
- tcp_lfc_on_connection_cb) {
- 0i32 => oldcomm::send(setup_ch, None),
- _ => {
- log(debug, ~"failure to uv_listen()");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(setup_ch, Some(err_data));
- }
+ -> result::Result<(), TcpListenErrData> {
+ unsafe {
+ let stream_closed_po = oldcomm::Port::<()>();
+ let kill_po = oldcomm::Port::<Option<TcpErrData>>();
+ let kill_ch = oldcomm::Chan(&kill_po);
+ let server_stream = uv::ll::tcp_t();
+ let server_stream_ptr = ptr::addr_of(&server_stream);
- let server_data = {
++ let server_data: TcpListenFcData = TcpListenFcData {
+ server_stream_ptr: server_stream_ptr,
+ stream_closed_ch: oldcomm::Chan(&stream_closed_po),
+ kill_ch: kill_ch,
+ on_connect_cb: move on_connect_cb,
- iotask: iotask,
++ iotask: iotask.clone(),
+ ipv6: match &host_ip {
+ &ip::Ipv4(_) => { false }
+ &ip::Ipv6(_) => { true }
+ },
+ mut active: true
+ };
+ let server_data_ptr = ptr::addr_of(&server_data);
+
+ let setup_result = do oldcomm::listen |setup_ch| {
+ // this is to address a compiler warning about
+ // an implicit copy.. it seems that double nested
+ // will defeat a move sigil, as is done to the host_ip
+ // arg above.. this same pattern works w/o complaint in
+ // tcp::connect (because the iotask::interact cb isn't
+ // nested within a core::comm::listen block)
+ let loc_ip = copy(host_ip);
+ do iotask::interact(iotask) |move loc_ip, loop_ptr| {
+ unsafe {
+ match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
- 0i32 => {
- uv::ll::set_data_for_uv_handle(
- server_stream_ptr,
- server_data_ptr);
- let addr_str = ip::format_addr(&loc_ip);
- let bind_result = match loc_ip {
- ip::Ipv4(ref addr) => {
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip4_addr(addr_str,
- port as int);
- uv::ll::tcp_bind(server_stream_ptr,
- ptr::addr_of(&in_addr))
- }
- ip::Ipv6(ref addr) => {
- log(debug, fmt!("addr: %?", addr));
- let in_addr = uv::ll::ip6_addr(addr_str,
- port as int);
- uv::ll::tcp_bind6(server_stream_ptr,
- ptr::addr_of(&in_addr))
- }
- };
- match bind_result {
- 0i32 => {
- match uv::ll::listen(server_stream_ptr,
- backlog as libc::c_int,
- tcp_lfc_on_connection_cb) {
- 0i32 => oldcomm::send(setup_ch, None),
- _ => {
- log(debug, ~"failure to uv_listen()");
- let err_data = uv::ll::get_last_err_data(
- loop_ptr);
- oldcomm::send(setup_ch, Some(err_data));
- }
++ 0i32 => {
++ uv::ll::set_data_for_uv_handle(
++ server_stream_ptr,
++ server_data_ptr);
++ let addr_str = ip::format_addr(&loc_ip);
++ let bind_result = match loc_ip {
++ ip::Ipv4(ref addr) => {
++ log(debug, fmt!("addr: %?", addr));
++ let in_addr = uv::ll::ip4_addr(
++ addr_str,
++ port as int);
++ uv::ll::tcp_bind(server_stream_ptr,
++ ptr::addr_of(&in_addr))
++ }
++ ip::Ipv6(ref addr) => {
++ log(debug, fmt!("addr: %?", addr));
++ let in_addr = uv::ll::ip6_addr(
++ addr_str,
++ port as int);
++ uv::ll::tcp_bind6(server_stream_ptr,
++ ptr::addr_of(&in_addr))
++ }
++ };
++ match bind_result {
++ 0i32 => {
++ match uv::ll::listen(
++ server_stream_ptr,
++ backlog as libc::c_int,
++ tcp_lfc_on_connection_cb) {
++ 0i32 => oldcomm::send(setup_ch, None),
++ _ => {
++ log(debug,
++ ~"failure to uv_tcp_init");
++ let err_data =
++ uv::ll::get_last_err_data(
++ loop_ptr);
++ oldcomm::send(setup_ch,
++ Some(err_data));
++ }
++ }
++ }
++ _ => {
++ log(debug, ~"failure to uv_tcp_bind");
++ let err_data = uv::ll::get_last_err_data(
++ loop_ptr);
++ oldcomm::send(setup_ch, Some(err_data));
++ }
+ }
- }
- _ => {
++ }
++ _ => {
+ log(debug, ~"failure to uv_tcp_bind");
+ let err_data = uv::ll::get_last_err_data(
+ loop_ptr);
+ oldcomm::send(setup_ch, Some(err_data));
- }
+ }
- }
- _ => {
- log(debug, ~"failure to uv_tcp_init");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(setup_ch, Some(err_data));
- }
}
- };
- }
- _ => {
- log(debug, ~"failure to uv_tcp_bind");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(setup_ch, Some(err_data));
- }
+ }
- }
- _ => {
- log(debug, ~"failure to uv_tcp_init");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(setup_ch, Some(err_data));
- }
}
+ setup_ch.recv()
};
- setup_ch.recv()
- };
- match setup_result {
- Some(ref err_data) => {
- do iotask::interact(iotask) |loop_ptr| unsafe {
- log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
- loop_ptr));
- (*server_data_ptr).active = false;
- uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
- };
- stream_closed_po.recv();
- match err_data.err_name {
- ~"EACCES" => {
- log(debug, ~"Got EACCES error");
- result::Err(AccessDenied)
- }
- ~"EADDRINUSE" => {
- log(debug, ~"Got EADDRINUSE error");
- result::Err(AddressInUse)
- }
- _ => {
- log(debug, fmt!("Got '%s' '%s' libuv error",
- err_data.err_name, err_data.err_msg));
- result::Err(
- GenericListenErr(err_data.err_name, err_data.err_msg))
- }
- }
- }
- None => {
- on_establish_cb(kill_ch);
- let kill_result = oldcomm::recv(kill_po);
- do iotask::interact(iotask) |loop_ptr| unsafe {
- log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
- loop_ptr));
- (*server_data_ptr).active = false;
- uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
- };
- stream_closed_po.recv();
- match kill_result {
- // some failure post bind/listen
- Some(ref err_data) => result::Err(GenericListenErr(
- err_data.err_name,
- err_data.err_msg)),
- // clean exit
- None => result::Ok(())
+ match setup_result {
- Some(ref err_data) => {
- do iotask::interact(iotask) |loop_ptr| {
- unsafe {
- log(debug,
- fmt!("tcp::listen post-kill recv hl interact %?",
- loop_ptr));
- (*server_data_ptr).active = false;
- uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++ Some(ref err_data) => {
++ do iotask::interact(iotask) |loop_ptr| {
++ unsafe {
++ log(debug,
++ fmt!("tcp::listen post-kill recv hl interact %?",
++ loop_ptr));
++ (*server_data_ptr).active = false;
++ uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++ }
++ };
++ stream_closed_po.recv();
++ match err_data.err_name {
++ ~"EACCES" => {
++ log(debug, ~"Got EACCES error");
++ result::Err(AccessDenied)
++ }
++ ~"EADDRINUSE" => {
++ log(debug, ~"Got EADDRINUSE error");
++ result::Err(AddressInUse)
++ }
++ _ => {
++ log(debug, fmt!("Got '%s' '%s' libuv error",
++ err_data.err_name, err_data.err_msg));
++ result::Err(
++ GenericListenErr(err_data.err_name,
++ err_data.err_msg))
++ }
+ }
- };
- stream_closed_po.recv();
- match err_data.err_name {
- ~"EACCES" => {
- log(debug, ~"Got EACCES error");
- result::Err(AccessDenied)
- }
- ~"EADDRINUSE" => {
- log(debug, ~"Got EADDRINUSE error");
- result::Err(AddressInUse)
- }
- _ => {
- log(debug, fmt!("Got '%s' '%s' libuv error",
- err_data.err_name, err_data.err_msg));
- result::Err(
- GenericListenErr(err_data.err_name, err_data.err_msg))
- }
+ }
- }
- None => {
- on_establish_cb(kill_ch);
- let kill_result = oldcomm::recv(kill_po);
- do iotask::interact(iotask) |loop_ptr| {
- unsafe {
- log(debug,
- fmt!("tcp::listen post-kill recv hl interact %?",
- loop_ptr));
- (*server_data_ptr).active = false;
- uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++ None => {
++ on_establish_cb(kill_ch);
++ let kill_result = oldcomm::recv(kill_po);
++ do iotask::interact(iotask) |loop_ptr| {
++ unsafe {
++ log(debug,
++ fmt!("tcp::listen post-kill recv hl interact %?",
++ loop_ptr));
++ (*server_data_ptr).active = false;
++ uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
++ }
++ };
++ stream_closed_po.recv();
++ match kill_result {
++ // some failure post bind/listen
++ Some(ref err_data) => result::Err(GenericListenErr(
++ err_data.err_name,
++ err_data.err_msg)),
++ // clean exit
++ None => result::Ok(())
+ }
- };
- stream_closed_po.recv();
- match kill_result {
- // some failure post bind/listen
- Some(ref err_data) => result::Err(GenericListenErr(
- err_data.err_name,
- err_data.err_msg)),
- // clean exit
- None => result::Ok(())
+ }
- }
}
- }
}
}
++
/**
* Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
*
// INTERNAL API
-fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe {
- let closed_po = oldcomm::Port::<()>();
- let closed_ch = oldcomm::Chan(&closed_po);
- let close_data = {
- closed_ch: closed_ch
- };
- let close_data_ptr = ptr::addr_of(&close_data);
- let stream_handle_ptr = (*socket_data).stream_handle_ptr;
- do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
- log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
- stream_handle_ptr, loop_ptr));
- uv::ll::set_data_for_uv_handle(stream_handle_ptr,
- close_data_ptr);
- uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
- };
- oldcomm::recv(closed_po);
- //the line below will most likely crash
- //log(debug, fmt!("about to free socket_data at %?", socket_data));
- rustrt::rust_uv_current_kernel_free(stream_handle_ptr
- as *libc::c_void);
- log(debug, ~"exiting dtor for tcp_socket");
+fn tear_down_socket_data(socket_data: @TcpSocketData) {
+ unsafe {
+ let closed_po = oldcomm::Port::<()>();
+ let closed_ch = oldcomm::Chan(&closed_po);
+ let close_data = {
+ closed_ch: closed_ch
+ };
+ let close_data_ptr = ptr::addr_of(&close_data);
+ let stream_handle_ptr = (*socket_data).stream_handle_ptr;
- do iotask::interact((*socket_data).iotask) |loop_ptr| {
++ do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
+ unsafe {
+ log(debug,
+ fmt!("interact dtor for tcp_socket stream %? loop %?",
- stream_handle_ptr, loop_ptr));
++ stream_handle_ptr, loop_ptr));
+ uv::ll::set_data_for_uv_handle(stream_handle_ptr,
+ close_data_ptr);
+ uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
+ }
+ };
+ oldcomm::recv(closed_po);
+ //the line below will most likely crash
+ //log(debug, fmt!("about to free socket_data at %?", socket_data));
+ rustrt::rust_uv_current_kernel_free(stream_handle_ptr
- as *libc::c_void);
++ as *libc::c_void);
+ log(debug, ~"exiting dtor for tcp_socket");
+ }
}
// shared implementation for tcp::read
fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
- -> result::Result<~[u8],TcpErrData> unsafe {
- use timer;
-
- log(debug, ~"starting tcp::read");
- let iotask = &(*socket_data).iotask;
- let rs_result = read_start_common_impl(socket_data);
- if result::is_err(&rs_result) {
- let err_data = result::get_err(&rs_result);
- result::Err(err_data)
- }
- else {
- log(debug, ~"tcp::read before recv_timeout");
- let read_result = if timeout_msecs > 0u {
- timer::recv_timeout(
- iotask, timeout_msecs, result::get(&rs_result))
- } else {
- Some(oldcomm::recv(result::get(&rs_result)))
- };
- log(debug, ~"tcp::read after recv_timeout");
- match move read_result {
- None => {
- log(debug, ~"tcp::read: timed out..");
- let err_data = {
- err_name: ~"TIMEOUT",
- err_msg: ~"req timed out"
- };
- read_stop_common_impl(socket_data);
+ -> result::Result<~[u8],TcpErrData> {
+ unsafe {
+ use timer;
+
+ log(debug, ~"starting tcp::read");
- let iotask = (*socket_data).iotask;
++ let iotask = &(*socket_data).iotask;
+ let rs_result = read_start_common_impl(socket_data);
+ if result::is_err(&rs_result) {
+ let err_data = result::get_err(&rs_result);
result::Err(err_data)
- }
- Some(move data_result) => {
- log(debug, ~"tcp::read got data");
- read_stop_common_impl(socket_data);
- data_result
- }
+ }
+ else {
+ log(debug, ~"tcp::read before recv_timeout");
+ let read_result = if timeout_msecs > 0u {
+ timer::recv_timeout(
- iotask, timeout_msecs, result::get(&rs_result))
++ iotask, timeout_msecs, result::get(&rs_result))
+ } else {
+ Some(oldcomm::recv(result::get(&rs_result)))
+ };
+ log(debug, ~"tcp::read after recv_timeout");
+ match move read_result {
- None => {
- log(debug, ~"tcp::read: timed out..");
- let err_data = TcpErrData {
- err_name: ~"TIMEOUT",
- err_msg: ~"req timed out"
- };
- read_stop_common_impl(socket_data);
- result::Err(err_data)
- }
- Some(move data_result) => {
- log(debug, ~"tcp::read got data");
- read_stop_common_impl(socket_data);
- data_result
- }
++ None => {
++ log(debug, ~"tcp::read: timed out..");
++ let err_data = TcpErrData {
++ err_name: ~"TIMEOUT",
++ err_msg: ~"req timed out"
++ };
++ read_stop_common_impl(socket_data);
++ result::Err(err_data)
++ }
++ Some(move data_result) => {
++ log(debug, ~"tcp::read got data");
++ read_stop_common_impl(socket_data);
++ data_result
++ }
+ }
}
}
}
// shared impl for read_stop
fn read_stop_common_impl(socket_data: *TcpSocketData) ->
- result::Result<(), TcpErrData> unsafe {
- let stream_handle_ptr = (*socket_data).stream_handle_ptr;
- let stop_po = oldcomm::Port::<Option<TcpErrData>>();
- let stop_ch = oldcomm::Chan(&stop_po);
- do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
- log(debug, ~"in interact cb for tcp::read_stop");
- match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
- 0i32 => {
- log(debug, ~"successfully called uv_read_stop");
- oldcomm::send(stop_ch, None);
- }
- _ => {
- log(debug, ~"failure in calling uv_read_stop");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
- }
+ result::Result<(), TcpErrData> {
+ unsafe {
+ let stream_handle_ptr = (*socket_data).stream_handle_ptr;
+ let stop_po = oldcomm::Port::<Option<TcpErrData>>();
+ let stop_ch = oldcomm::Chan(&stop_po);
- do iotask::interact((*socket_data).iotask) |loop_ptr| {
++ do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
+ unsafe {
+ log(debug, ~"in interact cb for tcp::read_stop");
- match uv::ll::read_stop(stream_handle_ptr as
- *uv::ll::uv_stream_t) {
- 0i32 => {
- log(debug, ~"successfully called uv_read_stop");
- oldcomm::send(stop_ch, None);
- }
- _ => {
- log(debug, ~"failure in calling uv_read_stop");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
- }
++ match uv::ll::read_stop(stream_handle_ptr
++ as *uv::ll::uv_stream_t) {
++ 0i32 => {
++ log(debug, ~"successfully called uv_read_stop");
++ oldcomm::send(stop_ch, None);
++ }
++ _ => {
++ log(debug, ~"failure in calling uv_read_stop");
++ let err_data = uv::ll::get_last_err_data(loop_ptr);
++ oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
++ }
+ }
+ }
- };
-
++ }
+ match oldcomm::recv(stop_po) {
- Some(move err_data) => Err(err_data),
- None => Ok(())
++ Some(move err_data) => Err(err_data),
++ None => Ok(())
}
- };
- match oldcomm::recv(stop_po) {
- Some(ref err_data) => result::Err(err_data.to_tcp_err()),
- None => result::Ok(())
}
}
// shared impl for read_start
fn read_start_common_impl(socket_data: *TcpSocketData)
-> result::Result<oldcomm::Port<
- result::Result<~[u8], TcpErrData>>, TcpErrData> unsafe {
- let stream_handle_ptr = (*socket_data).stream_handle_ptr;
- let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
- let start_ch = oldcomm::Chan(&start_po);
- log(debug, ~"in tcp::read_start before interact loop");
- do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
- log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
- match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
- on_alloc_cb,
- on_tcp_read_cb) {
- 0i32 => {
- log(debug, ~"success doing uv_read_start");
- oldcomm::send(start_ch, None);
- }
- _ => {
- log(debug, ~"error attempting uv_read_start");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(start_ch, Some(err_data));
- }
+ result::Result<~[u8], TcpErrData>>, TcpErrData> {
+ unsafe {
+ let stream_handle_ptr = (*socket_data).stream_handle_ptr;
+ let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
+ let start_ch = oldcomm::Chan(&start_po);
+ log(debug, ~"in tcp::read_start before interact loop");
- do iotask::interact((*socket_data).iotask) |loop_ptr| {
++ do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
+ unsafe {
- log(debug,
- fmt!("in tcp::read_start interact cb %?", loop_ptr));
- match uv::ll::read_start(stream_handle_ptr as
- *uv::ll::uv_stream_t,
++ log(debug, fmt!("in tcp::read_start interact cb %?",
++ loop_ptr));
++ match uv::ll::read_start(stream_handle_ptr
++ as *uv::ll::uv_stream_t,
+ on_alloc_cb,
+ on_tcp_read_cb) {
- 0i32 => {
- log(debug, ~"success doing uv_read_start");
- oldcomm::send(start_ch, None);
- }
- _ => {
- log(debug, ~"error attempting uv_read_start");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send(start_ch, Some(err_data));
- }
++ 0i32 => {
++ log(debug, ~"success doing uv_read_start");
++ oldcomm::send(start_ch, None);
++ }
++ _ => {
++ log(debug, ~"error attempting uv_read_start");
++ let err_data = uv::ll::get_last_err_data(loop_ptr);
++ oldcomm::send(start_ch, Some(err_data));
++ }
+ }
+ }
- };
++ }
+ match oldcomm::recv(start_po) {
- Some(ref err_data) => result::Err(err_data.to_tcp_err()),
- None => result::Ok((*socket_data).reader_po)
++ Some(ref err_data) => result::Err(err_data.to_tcp_err()),
++ None => result::Ok((*socket_data).reader_po)
}
- };
- match oldcomm::recv(start_po) {
- Some(ref err_data) => result::Err(err_data.to_tcp_err()),
- None => result::Ok((*socket_data).reader_po)
}
}
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *TcpSocketData,
raw_write_data: ~[u8])
- -> result::Result<(), TcpErrData> unsafe {
- let write_req_ptr = ptr::addr_of(&((*socket_data_ptr).write_req));
- let stream_handle_ptr =
- (*socket_data_ptr).stream_handle_ptr;
- let write_buf_vec = ~[ uv::ll::buf_init(
- vec::raw::to_ptr(raw_write_data),
- vec::len(raw_write_data)) ];
- let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
- let result_po = oldcomm::Port::<TcpWriteResult>();
- let write_data = {
- result_ch: oldcomm::Chan(&result_po)
- };
- let write_data_ptr = ptr::addr_of(&write_data);
- do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe {
- log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
- match uv::ll::write(write_req_ptr,
- stream_handle_ptr,
- write_buf_vec_ptr,
- tcp_write_complete_cb) {
- 0i32 => {
- log(debug, ~"uv_write() invoked successfully");
- uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
- }
- _ => {
- log(debug, ~"error invoking uv_write()");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send((*write_data_ptr).result_ch,
- TcpWriteError(err_data.to_tcp_err()));
- }
+ -> result::Result<(), TcpErrData> {
+ unsafe {
+ let write_req_ptr = ptr::addr_of(&((*socket_data_ptr).write_req));
+ let stream_handle_ptr =
+ (*socket_data_ptr).stream_handle_ptr;
+ let write_buf_vec = ~[ uv::ll::buf_init(
+ vec::raw::to_ptr(raw_write_data),
+ vec::len(raw_write_data)) ];
+ let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
+ let result_po = oldcomm::Port::<TcpWriteResult>();
+ let write_data = {
+ result_ch: oldcomm::Chan(&result_po)
+ };
+ let write_data_ptr = ptr::addr_of(&write_data);
- do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
++ do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| {
+ unsafe {
+ log(debug, fmt!("in interact cb for tcp::write %?",
+ loop_ptr));
+ match uv::ll::write(write_req_ptr,
- stream_handle_ptr,
- write_buf_vec_ptr,
- tcp_write_complete_cb) {
- 0i32 => {
- log(debug, ~"uv_write() invoked successfully");
- uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
- }
- _ => {
- log(debug, ~"error invoking uv_write()");
- let err_data = uv::ll::get_last_err_data(loop_ptr);
- oldcomm::send((*write_data_ptr).result_ch,
- TcpWriteError(err_data.to_tcp_err()));
- }
++ stream_handle_ptr,
++ write_buf_vec_ptr,
++ tcp_write_complete_cb) {
++ 0i32 => {
++ log(debug, ~"uv_write() invoked successfully");
++ uv::ll::set_data_for_req(write_req_ptr,
++ write_data_ptr);
++ }
++ _ => {
++ log(debug, ~"error invoking uv_write()");
++ let err_data = uv::ll::get_last_err_data(loop_ptr);
++ oldcomm::send((*write_data_ptr).result_ch,
++ TcpWriteError(err_data.to_tcp_err()));
++ }
+ }
+ }
- };
++ }
+ // FIXME (#2656): Instead of passing unsafe pointers to local data,
+ // and waiting here for the write to complete, we should transfer
+ // ownership of everything to the I/O task and let it deal with the
+ // aftermath, so we don't have to sit here blocking.
+ match oldcomm::recv(result_po) {
+ TcpWriteSuccess => Ok(()),
+ TcpWriteError(move err_data) => Err(err_data)
}
- };
- // FIXME (#2656): Instead of passing unsafe pointers to local data,
- // and waiting here for the write to complete, we should transfer
- // ownership of everything to the I/O task and let it deal with the
- // aftermath, so we don't have to sit here blocking.
- match oldcomm::recv(result_po) {
- TcpWriteSuccess => result::Ok(()),
- TcpWriteError(ref err_data) => result::Err(err_data.to_tcp_err())
}
}
}
#[doc(hidden)]
-fn get_monitor_task_gl() -> IoTask unsafe {
+fn get_monitor_task_gl() -> IoTask {
- unsafe {
- let monitor_loop_chan_ptr =
- rustrt::rust_uv_get_kernel_global_chan_ptr();
-
- debug!("ENTERING global_loop::get() loop chan: %?",
- monitor_loop_chan_ptr);
-
- debug!("before priv::chan_from_global_ptr");
- type MonChan = Chan<IoTask>;
-
- let monitor_ch =
- do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr,
- || {
- task::task().sched_mode
- (task::SingleThreaded)
- .unlinked()
- }) |msg_po| {
- unsafe {
- debug!("global monitor task starting");
-
- // As a weak task the runtime will notify us when to exit
- do weaken_task() |weak_exit_po| {
- debug!("global monitor task is now weak");
- let hl_loop = spawn_loop();
- loop {
- debug!("in outer_loop...");
- match select2(weak_exit_po, msg_po) {
- Left(weak_exit) => {
- // all normal tasks have ended, tell the
- // libuv loop to tear_down, then exit
- debug!("weak_exit_po recv'd msg: %?", weak_exit);
- iotask::exit(hl_loop);
- break;
- }
- Right(fetch_ch) => {
- debug!("hl_loop req recv'd: %?", fetch_ch);
- fetch_ch.send(hl_loop);
- }
- }
+
+ type MonChan = Chan<IoTask>;
+
+ struct GlobalIoTask(IoTask);
+
+ impl GlobalIoTask: Clone {
+ fn clone(&self) -> GlobalIoTask {
+ GlobalIoTask((**self).clone())
+ }
+ }
+
+ fn key(_: GlobalIoTask) { }
+
- match global_data_clone(key) {
++ match unsafe { global_data_clone(key) } {
+ Some(GlobalIoTask(iotask)) => iotask,
+ None => {
+ let iotask: IoTask = spawn_loop();
+ let mut installed = false;
- let final_iotask = do global_data_clone_create(key) {
- installed = true;
- ~GlobalIoTask(iotask.clone())
++ let final_iotask = unsafe {
++ do global_data_clone_create(key) {
++ installed = true;
++ ~GlobalIoTask(iotask.clone())
++ }
+ };
+ if installed {
- do task().unlinked().spawn() unsafe {
- debug!("global monitor task starting");
- // As a weak task the runtime will notify us when to exit
- do weaken_task |weak_exit_po| {
- debug!("global monitor task is weak");
- weak_exit_po.recv();
- iotask::exit(&iotask);
- debug!("global monitor task is unweak");
- };
- debug!("global monitor task exiting");
++ do task().unlinked().spawn() {
++ unsafe {
++ debug!("global monitor task starting");
++ // As a weak task the runtime will notify us
++ // when to exit
++ do weaken_task |weak_exit_po| {
++ debug!("global monitor task is weak");
++ weak_exit_po.recv();
++ iotask::exit(&iotask);
++ debug!("global monitor task is unweak");
++ };
++ debug!("global monitor task exiting");
+ }
- debug!("global monitor task is leaving weakend state");
- };
- debug!("global monitor task exiting");
+ }
+ } else {
+ iotask::exit(&iotask);
}
- };
- // once we have a chan to the monitor loop, we ask it for
- // the libuv loop's async handle
- do listen |fetch_ch| {
- monitor_ch.send(fetch_ch);
- fetch_ch.recv()
+ match final_iotask {
+ GlobalIoTask(iotask) => iotask
+ }
}
}
}
use core::iter;
use core::libc;
- use core::oldcomm;
use core::ptr;
use core::task;
+ use core::cast::transmute;
+ use core::libc::c_void;
+ use core::pipes::{stream, SharedChan, Chan};
- extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) unsafe {
- let exit_ch_ptr = ll::get_data_for_uv_handle(
- timer_ptr as *libc::c_void);
- let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
- exit_ch.send(true);
- log(debug, fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
- exit_ch_ptr));
+ extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
+ unsafe {
+ let exit_ch_ptr = ll::get_data_for_uv_handle(
- timer_ptr as *libc::c_void) as *oldcomm::Chan<bool>;
- let exit_ch = *exit_ch_ptr;
- oldcomm::send(exit_ch, true);
++ timer_ptr as *libc::c_void);
++ let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
++ exit_ch.send(true);
+ log(debug,
+ fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
+ exit_ch_ptr));
+ }
}
extern fn simple_timer_cb(timer_ptr: *ll::uv_timer_t,
- _status: libc::c_int) unsafe {
- log(debug, ~"in simple timer cb");
- ll::timer_stop(timer_ptr);
- let hl_loop = &get_gl();
- do iotask::interact(hl_loop) |_loop_ptr| unsafe {
- log(debug, ~"closing timer");
- ll::close(timer_ptr, simple_timer_close_cb);
- log(debug, ~"about to deref exit_ch_ptr");
- log(debug, ~"after msg sent on deref'd exit_ch");
- };
- log(debug, ~"exiting simple timer cb");
+ _status: libc::c_int) {
+ unsafe {
+ log(debug, ~"in simple timer cb");
+ ll::timer_stop(timer_ptr);
- let hl_loop = get_gl();
++ let hl_loop = &get_gl();
+ do iotask::interact(hl_loop) |_loop_ptr| {
++ log(debug, ~"closing timer");
+ unsafe {
- log(debug, ~"closing timer");
+ ll::close(timer_ptr, simple_timer_close_cb);
- log(debug, ~"about to deref exit_ch_ptr");
- log(debug, ~"after msg sent on deref'd exit_ch");
+ }
++ log(debug, ~"about to deref exit_ch_ptr");
++ log(debug, ~"after msg sent on deref'd exit_ch");
+ };
+ log(debug, ~"exiting simple timer cb");
+ }
}
- fn impl_uv_hl_simple_timer(iotask: IoTask) {
- fn impl_uv_hl_simple_timer(iotask: &IoTask) unsafe {
- let (exit_po, exit_ch) = stream::<bool>();
- let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
- log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
- exit_ch_ptr));
- let timer_handle = ll::timer_t();
- let timer_ptr = ptr::addr_of(&timer_handle);
- do iotask::interact(iotask) |loop_ptr| unsafe {
- log(debug, ~"user code inside interact loop!!!");
- let init_status = ll::timer_init(loop_ptr, timer_ptr);
- if(init_status == 0i32) {
- ll::set_data_for_uv_handle(
- timer_ptr as *libc::c_void,
- exit_ch_ptr);
- let start_status = ll::timer_start(timer_ptr, simple_timer_cb,
- 1u, 0u);
- if(start_status == 0i32) {
++ fn impl_uv_hl_simple_timer(iotask: &IoTask) {
+ unsafe {
- let exit_po = oldcomm::Port::<bool>();
- let exit_ch = oldcomm::Chan(&exit_po);
- let exit_ch_ptr = ptr::addr_of(&exit_ch);
++ let (exit_po, exit_ch) = stream::<bool>();
++ let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
+ log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
- exit_ch_ptr));
++ exit_ch_ptr));
+ let timer_handle = ll::timer_t();
+ let timer_ptr = ptr::addr_of(&timer_handle);
+ do iotask::interact(iotask) |loop_ptr| {
+ unsafe {
+ log(debug, ~"user code inside interact loop!!!");
+ let init_status = ll::timer_init(loop_ptr, timer_ptr);
+ if(init_status == 0i32) {
+ ll::set_data_for_uv_handle(
+ timer_ptr as *libc::c_void,
- exit_ch_ptr as *libc::c_void);
++ exit_ch_ptr);
+ let start_status = ll::timer_start(timer_ptr,
+ simple_timer_cb,
- 1u,
- 0u);
- if start_status != 0 {
++ 1u, 0u);
++ if(start_status == 0i32) {
++ }
++ else {
+ fail ~"failure on ll::timer_start()";
+ }
- } else {
++ }
++ else {
+ fail ~"failure on ll::timer_init()";
+ }
}
- else {
- fail ~"failure on ll::timer_start()";
- }
- }
- else {
- fail ~"failure on ll::timer_init()";
- }
- };
- exit_po.recv();
- log(debug, ~"global_loop timer test: msg recv on exit_po, done..");
+ };
- oldcomm::recv(exit_po);
++ exit_po.recv();
+ log(debug,
+ ~"global_loop timer test: msg recv on exit_po, done..");
+ }
}
#[test]
- fn test_gl_uv_global_loop_high_level_global_timer() unsafe {
+ fn test_gl_uv_global_loop_high_level_global_timer() {
- unsafe {
- let hl_loop = get_gl();
- let exit_po = oldcomm::Port::<()>();
- let exit_ch = oldcomm::Chan(&exit_po);
- task::spawn_sched(task::ManualThreads(1u), || {
- impl_uv_hl_simple_timer(hl_loop);
- oldcomm::send(exit_ch, ());
- });
+ let hl_loop = &get_gl();
+ let (exit_po, exit_ch) = stream::<()>();
+ task::spawn_sched(task::ManualThreads(1u), || {
+ let hl_loop = &get_gl();
impl_uv_hl_simple_timer(hl_loop);
- oldcomm::recv(exit_po);
- }
+ exit_ch.send(());
+ });
+ impl_uv_hl_simple_timer(hl_loop);
+ exit_po.recv();
}
// keeping this test ignored until some kind of stress-test-harness
// is set up for the build bots
#[test]
#[ignore]
- fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe {
+ fn test_stress_gl_uv_global_loop_high_level_global_timer() {
- unsafe {
- let hl_loop = get_gl();
- let exit_po = oldcomm::Port::<()>();
- let exit_ch = oldcomm::Chan(&exit_po);
- let cycles = 5000u;
- for iter::repeat(cycles) {
- task::spawn_sched(task::ManualThreads(1u), || {
- impl_uv_hl_simple_timer(hl_loop);
- oldcomm::send(exit_ch, ());
- });
- };
- for iter::repeat(cycles) {
- oldcomm::recv(exit_po);
- };
- log(debug,
- ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
- ~" exiting sucessfully!");
- }
+ let (exit_po, exit_ch) = stream::<()>();
+ let exit_ch = SharedChan(exit_ch);
+ let cycles = 5000u;
+ for iter::repeat(cycles) {
+ let exit_ch_clone = exit_ch.clone();
+ task::spawn_sched(task::ManualThreads(1u), || {
+ let hl_loop = &get_gl();
+ impl_uv_hl_simple_timer(hl_loop);
+ exit_ch_clone.send(());
+ });
+ };
+ for iter::repeat(cycles) {
+ exit_po.recv();
+ };
+ log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
+ ~" exiting sucessfully!");
}
}
* async handle and do a sanity check to make sure that all other handles are
* closed, causing a failure otherwise.
*/
- pub fn exit(iotask: IoTask) {
-pub fn exit(iotask: &IoTask) unsafe {
- send_msg(iotask, TeardownLoop);
++pub fn exit(iotask: &IoTask) {
+ unsafe {
+ send_msg(iotask, TeardownLoop);
+ }
}
}
/// Run the loop and begin handling messages
- fn run_loop(iotask_ch: Chan<IoTask>) {
-fn run_loop(iotask_ch: &Chan<IoTask>) unsafe {
++fn run_loop(iotask_ch: &Chan<IoTask>) {
+
- debug!("creating loop");
- let loop_ptr = ll::loop_new();
+ unsafe {
++ debug!("creating loop");
+ let loop_ptr = ll::loop_new();
- // set up the special async handle we'll use to allow multi-task
- // communication with this loop
- let async = ll::async_t();
- let async_handle = addr_of(&async);
+ // set up the special async handle we'll use to allow multi-task
+ // communication with this loop
+ let async = ll::async_t();
+ let async_handle = addr_of(&async);
- // associate the async handle with the loop
- ll::async_init(loop_ptr, async_handle, wake_up_cb);
+ // associate the async handle with the loop
+ ll::async_init(loop_ptr, async_handle, wake_up_cb);
- let (msg_po, msg_ch) = stream::<IoTaskMsg>();
++ let (msg_po, msg_ch) = stream::<IoTaskMsg>();
+
- // initialize our loop data and store it in the loop
- let data: IoTaskLoopData = {
- async_handle: async_handle,
- msg_po: msg_po
- };
- ll::set_data_for_uv_handle(async_handle, addr_of(&data));
-
- // Send out a handle through which folks can talk to us
- // while we dwell in the I/O loop
- let iotask = IoTask_({
- async_handle: async_handle,
- op_chan: SharedChan(msg_ch)
- });
- iotask_ch.send(iotask);
-
- log(debug, ~"about to run uv loop");
- // enter the loop... this blocks until the loop is done..
- ll::run(loop_ptr);
- log(debug, ~"uv loop ended");
- ll::loop_delete(loop_ptr);
+ // initialize our loop data and store it in the loop
- let data = IoTaskLoopData {
++ let data: IoTaskLoopData = IoTaskLoopData {
+ async_handle: async_handle,
- msg_po: Port()
++ msg_po: msg_po
+ };
+ ll::set_data_for_uv_handle(async_handle, addr_of(&data));
+
+ // Send out a handle through which folks can talk to us
+ // while we dwell in the I/O loop
+ let iotask = IoTask_({
+ async_handle: async_handle,
- op_chan: data.msg_po.chan()
++ op_chan: SharedChan(msg_ch)
+ });
+ iotask_ch.send(iotask);
+
+ log(debug, ~"about to run uv loop");
+ // enter the loop... this blocks until the loop is done..
+ ll::run(loop_ptr);
+ log(debug, ~"uv loop ended");
+ ll::loop_delete(loop_ptr);
+ }
}
// data that lives for the lifetime of the high-evel oo
-type IoTaskLoopData = {
+struct IoTaskLoopData {
async_handle: *ll::uv_async_t,
- msg_po: Port<IoTaskMsg>
-};
+ msg_po: Port<IoTaskMsg>,
+}
- fn send_msg(iotask: IoTask, msg: IoTaskMsg) {
+ fn send_msg(iotask: &IoTask,
- msg: IoTaskMsg) unsafe {
++ msg: IoTaskMsg) {
+ iotask.op_chan.send(move msg);
- ll::async_send(iotask.async_handle);
+ unsafe {
- iotask.op_chan.send(move msg);
+ ll::async_send(iotask.async_handle);
+ }
}
/// Dispatch all pending messages
extern fn wake_up_cb(async_handle: *ll::uv_async_t,
- status: int) unsafe {
+ status: int) {
- unsafe {
- log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
- async_handle, status));
- let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
- let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
- let msg_po = &(*data).msg_po;
-
- while msg_po.peek() {
- match msg_po.recv() {
- Interaction(ref cb) => (*cb)(loop_ptr),
- TeardownLoop => begin_teardown(data)
+ log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
+ async_handle, status));
+
- let data = ll::get_data_for_uv_handle(async_handle)
- as *IoTaskLoopData;
- let msg_po = (*data).msg_po;
++ unsafe {
+ let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
- Interaction(ref cb) => (*cb)(loop_ptr),
- TeardownLoop => begin_teardown(data)
++ let data =
++ ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
++ let msg_po = &(*data).msg_po;
+
+ while msg_po.peek() {
+ match msg_po.recv() {
++ Interaction(ref cb) => (*cb)(loop_ptr),
++ TeardownLoop => begin_teardown(data)
+ }
}
}
}
use core::ptr;
use core::task;
- extern fn async_close_cb(handle: *ll::uv_async_t) unsafe {
- log(debug, fmt!("async_close_cb handle %?", handle));
- let exit_ch = (*(ll::get_data_for_uv_handle(handle)
- as *AhData)).exit_ch;
- oldcomm::send(exit_ch, ());
+ extern fn async_close_cb(handle: *ll::uv_async_t) {
+ unsafe {
+ log(debug, fmt!("async_close_cb handle %?", handle));
+ let exit_ch = (*(ll::get_data_for_uv_handle(handle)
+ as *AhData)).exit_ch;
+ oldcomm::send(exit_ch, ());
+ }
}
- extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int)
+ extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) {
unsafe {
- log(debug, fmt!("async_handle_cb handle %? status %?",handle,status));
- ll::close(handle, async_close_cb);
+ log(debug,
+ fmt!("async_handle_cb handle %? status %?",handle,status));
+ ll::close(handle, async_close_cb);
+ }
}
- type AhData = {
+ struct AhData {
iotask: IoTask,
- exit_ch: oldcomm::Chan<()>,
+ exit_ch: oldcomm::Chan<()>
- };
- fn impl_uv_iotask_async(iotask: &IoTask) unsafe {
- let async_handle = ll::async_t();
- let ah_ptr = ptr::addr_of(&async_handle);
- let exit_po = oldcomm::Port::<()>();
- let exit_ch = oldcomm::Chan(&exit_po);
- let ah_data = {
- iotask: iotask.clone(),
- exit_ch: exit_ch
- };
- let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data);
- debug!("about to interact");
- do interact(iotask) |loop_ptr| unsafe {
- debug!("interacting");
- ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
- ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
- ll::async_send(ah_ptr);
- };
- debug!("waiting for async close");
- oldcomm::recv(exit_po);
+ }
- fn impl_uv_iotask_async(iotask: IoTask) {
++ fn impl_uv_iotask_async(iotask: &IoTask) {
+ unsafe {
+ let async_handle = ll::async_t();
+ let ah_ptr = ptr::addr_of(&async_handle);
+ let exit_po = oldcomm::Port::<()>();
+ let exit_ch = oldcomm::Chan(&exit_po);
- let ah_data = {
- iotask: iotask,
++ let ah_data = AhData {
++ iotask: iotask.clone(),
+ exit_ch: exit_ch
+ };
- let ah_data_ptr = ptr::addr_of(&ah_data);
++ let ah_data_ptr: *AhData = unsafe {
++ ptr::to_unsafe_ptr(&ah_data)
++ };
++ debug!("about to interact");
+ do interact(iotask) |loop_ptr| {
+ unsafe {
++ debug!("interacting");
+ ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
- ll::set_data_for_uv_handle(ah_ptr,
- ah_data_ptr as *libc::c_void);
++ ll::set_data_for_uv_handle(
++ ah_ptr, ah_data_ptr as *libc::c_void);
+ ll::async_send(ah_ptr);
+ }
+ };
++ debug!("waiting for async close");
+ oldcomm::recv(exit_po);
+ }
}
// this fn documents the bear minimum neccesary to roll your own
// high_level_loop
unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask {
- let iotask_port = oldcomm::Port::<IoTask>();
- let iotask_ch = oldcomm::Chan(&iotask_port);
+ let (iotask_port, iotask_ch) = stream::<IoTask>();
do task::spawn_sched(task::ManualThreads(1u)) {
- run_loop(iotask_ch);
+ debug!("about to run a test loop");
+ run_loop(&iotask_ch);
exit_ch.send(());
};
- return oldcomm::recv(iotask_port);
+ return iotask_port.recv();
}
- extern fn lifetime_handle_close(handle: *libc::c_void) unsafe {
- log(debug, fmt!("lifetime_handle_close ptr %?", handle));
+ extern fn lifetime_handle_close(handle: *libc::c_void) {
+ unsafe {
+ log(debug, fmt!("lifetime_handle_close ptr %?", handle));
+ }
}
extern fn lifetime_async_callback(handle: *libc::c_void,
}
#[test]
- fn test_uv_iotask_async() unsafe {
- let exit_po = oldcomm::Port::<()>();
- let exit_ch = oldcomm::Chan(&exit_po);
- let iotask = &spawn_test_loop(exit_ch);
-
- debug!("spawned iotask");
-
- // using this handle to manage the lifetime of the high_level_loop,
- // as it will exit the first time one of the impl_uv_hl_async() is
- // cleaned up with no one ref'd handles on the loop (Which can happen
- // under race-condition type situations.. this ensures that the loop
- // lives until, at least, all of the impl_uv_hl_async() runs have been
- // called, at least.
- let work_exit_po = oldcomm::Port::<()>();
- let work_exit_ch = oldcomm::Chan(&work_exit_po);
- for iter::repeat(7u) {
- let iotask_clone = iotask.clone();
- do task::spawn_sched(task::ManualThreads(1u)) {
- debug!("async");
- impl_uv_iotask_async(&iotask_clone);
- debug!("done async");
- oldcomm::send(work_exit_ch, ());
+ fn test_uv_iotask_async() {
+ unsafe {
+ let exit_po = oldcomm::Port::<()>();
+ let exit_ch = oldcomm::Chan(&exit_po);
- let iotask = spawn_test_loop(exit_ch);
++ let iotask = &spawn_test_loop(exit_ch);
++
++ debug!("spawned iotask");
+
+ // using this handle to manage the lifetime of the
- // high_level_loop, as it will exit the first time one of the
- // impl_uv_hl_async() is cleaned up with no one ref'd handles on
- // the loop (Which can happen under race-condition type
- // situations.. this ensures that the loop lives until, at least,
- // all of the impl_uv_hl_async() runs have been called, at least.
++ // high_level_loop, as it will exit the first time one of
++ // the impl_uv_hl_async() is cleaned up with no one ref'd
++ // handles on the loop (Which can happen under
++ // race-condition type situations.. this ensures that the
++ // loop lives until, at least, all of the
++ // impl_uv_hl_async() runs have been called, at least.
+ let work_exit_po = oldcomm::Port::<()>();
+ let work_exit_ch = oldcomm::Chan(&work_exit_po);
+ for iter::repeat(7u) {
++ let iotask_clone = iotask.clone();
+ do task::spawn_sched(task::ManualThreads(1u)) {
- impl_uv_iotask_async(iotask);
++ debug!("async");
++ impl_uv_iotask_async(&iotask_clone);
++ debug!("done async");
+ oldcomm::send(work_exit_ch, ());
+ };
};
- };
- for iter::repeat(7u) {
- debug!("waiting");
- oldcomm::recv(work_exit_po);
- };
- log(debug, ~"sending teardown_loop msg..");
- exit(iotask);
- oldcomm::recv(exit_po);
- log(debug, ~"after recv on exit_po.. exiting..");
+ for iter::repeat(7u) {
++ debug!("waiting");
+ oldcomm::recv(work_exit_po);
+ };
+ log(debug, ~"sending teardown_loop msg..");
+ exit(iotask);
+ oldcomm::recv(exit_po);
+ log(debug, ~"after recv on exit_po.. exiting..");
+ }
}
}