--- /dev/null
- fail!(~"Failure in malloc_raw: result ptr is null");
+ // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+
+ use sys::{TypeDesc, size_of};
+ use libc::{c_void, size_t};
+ use c_malloc = libc::malloc;
+ use c_free = libc::free;
+ use managed::raw::{BoxHeaderRepr, BoxRepr};
+ use cast::transmute;
+ use unstable::intrinsics::{atomic_xadd,atomic_xsub};
+ use ptr::null;
+ use intrinsic::TyDesc;
+
+ pub unsafe fn malloc(td: *TypeDesc, size: uint) -> *c_void {
+ assert!(td.is_not_null());
+
+ let total_size = get_box_size(size, (*td).align);
+ let p = c_malloc(total_size as size_t);
+ assert!(p.is_not_null());
+
+ // FIXME #3475: Converting between our two different tydesc types
+ let td: *TyDesc = transmute(td);
+
+ let box: &mut BoxRepr = transmute(p);
+ box.header.ref_count = -1; // Exchange values not ref counted
+ box.header.type_desc = td;
+ box.header.prev = null();
+ box.header.next = null();
+
+ let exchange_count = &mut *rust_get_exchange_count_ptr();
+ atomic_xadd(exchange_count, 1);
+
+ return transmute(box);
+ }
+
+ /**
+ Thin wrapper around libc::malloc, without the box header
+ setup done in global_heap::malloc
+ */
+ pub unsafe fn malloc_raw(size: uint) -> *c_void {
+ let p = c_malloc(size as size_t);
+ if p.is_null() {
++ fail!("Failure in malloc_raw: result ptr is null");
+ }
+ p
+ }
+
+ pub unsafe fn free(ptr: *c_void) {
+ let exchange_count = &mut *rust_get_exchange_count_ptr();
+ atomic_xsub(exchange_count, 1);
+
+ assert!(ptr.is_not_null());
+ c_free(ptr);
+ }
+ /// Thin wrapper around libc::free, as with global_heap::malloc_raw
+ pub unsafe fn free_raw(ptr: *c_void) {
+ c_free(ptr);
+ }
+
+ fn get_box_size(body_size: uint, body_align: uint) -> uint {
+ let header_size = size_of::<BoxHeaderRepr>();
+ // FIXME (#2699): This alignment calculation is suspicious. Is it right?
+ let total_size = align_to(header_size, body_align) + body_size;
+ return total_size;
+ }
+
+ // Rounds |size| up to the nearest multiple of |align|. Invariant: |align| is
+ // a power of two.
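+ // For example, align_to(9, 8) == 16 and align_to(16, 8) == 16.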
+ fn align_to(size: uint, align: uint) -> uint {
+ assert!(align != 0);
+ (size + align - 1) & !(align - 1)
+ }
+
+ extern {
+ #[rust_stack]
+ fn rust_get_exchange_count_ptr() -> *mut int;
+ }
#[doc(hidden)];
use libc::c_char;
+use ptr::Ptr;
- #[path = "sched/mod.rs"]
+ /// The global (exchange) heap.
+ pub mod global_heap;
+
+ /// The Scheduler and Task types.
mod sched;
+
+ /// Thread-local access to the current Scheduler.
+ pub mod local_sched;
+
+ /// Synchronous I/O.
+ #[path = "io/mod.rs"]
+ pub mod io;
+
+ /// Thread-local implementations of language-critical runtime features like @.
+ pub mod local_services;
+
+ /// The EventLoop and internal synchronous I/O interface.
mod rtio;
- pub mod uvll;
- mod uvio;
+
+ /// libuv and default rtio implementation.
#[path = "uv/mod.rs"]
- mod uv;
- #[path = "io/mod.rs"]
- mod io;
+ pub mod uv;
+
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
+ /// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;
+
+ /// A parallel work-stealing deque.
mod work_queue;
+
+ /// Stack segments and caching.
mod stack;
+
+ /// CPU context swapping.
mod context;
+
+ /// Bindings to system threading libraries.
mod thread;
+
+ /// The runtime configuration, read from environment variables.
pub mod env;
- pub mod local_services;
+
+ /// The local, managed heap.
mod local_heap;
+ /// The Logger trait and implementations.
+ pub mod logging;
+
/// Tools for testing the runtime
#[cfg(test)]
pub mod test;
--- /dev/null
+ // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+
+ use option::*;
+ use sys;
+ use cast::transmute;
+
+ use super::work_queue::WorkQueue;
+ use super::stack::{StackPool, StackSegment};
+ use super::rtio::{EventLoop, EventLoopObject};
+ use super::context::Context;
+ use super::local_services::LocalServices;
+ use cell::Cell;
+
+ #[cfg(test)] use rt::uv::uvio::UvEventLoop;
+ #[cfg(test)] use unstable::run_in_bare_thread;
+ #[cfg(test)] use int;
+
+ // A more convenient name for external callers, e.g. `local_sched::take()`
+ pub mod local_sched;
+
+ /// The Scheduler is responsible for coordinating execution of Tasks
+ /// on a single thread. When the scheduler is running it is owned by
+ /// thread local storage and the running task is owned by the
+ /// scheduler.
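+ ///
+ /// Rough lifecycle (see the tests at the bottom of this module): create a
+ /// Scheduler with `Scheduler::new(event_loop)`, push tasks onto `task_queue`,
+ /// then call `run()`, which hands the scheduler to thread-local storage and
+ /// drives the event loop until the task queue is drained.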
+ pub struct Scheduler {
+ task_queue: WorkQueue<~Task>,
+ stack_pool: StackPool,
+ /// The event loop used to drive the scheduler and perform I/O
+ event_loop: ~EventLoopObject,
+ /// The scheduler's saved context.
+ /// Always valid when a task is executing, otherwise not
+ priv saved_context: Context,
+ /// The currently executing task
+ current_task: Option<~Task>,
+ /// An action performed after a context switch on behalf of the
+ /// code running before the context switch
+ priv cleanup_job: Option<CleanupJob>
+ }
+
+ // XXX: Some hacks to put a &fn in Scheduler without borrowck
+ // complaining
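+ // The &fn is stashed as an opaque sys::Closure value and reconstituted with
+ // `to_fn()` when the cleanup job finally runs.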
+ type UnsafeTaskReceiver = sys::Closure;
+ trait ClosureConverter {
+ fn from_fn(&fn(~Task)) -> Self;
+ fn to_fn(self) -> &fn(~Task);
+ }
+ impl ClosureConverter for UnsafeTaskReceiver {
+ fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+ fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } }
+ }
+
+ enum CleanupJob {
+ DoNothing,
+ GiveTask(~Task, UnsafeTaskReceiver)
+ }
+
+ pub impl Scheduler {
+
+ fn in_task_context(&self) -> bool { self.current_task.is_some() }
+
+ fn new(event_loop: ~EventLoopObject) -> Scheduler {
+
+ // Lazily initialize the global state, currently the scheduler TLS key
+ unsafe { rust_initialize_global_state(); }
+ extern {
+ fn rust_initialize_global_state();
+ }
+
+ Scheduler {
+ event_loop: event_loop,
+ task_queue: WorkQueue::new(),
+ stack_pool: StackPool::new(),
+ saved_context: Context::empty(),
+ current_task: None,
+ cleanup_job: None
+ }
+ }
+
+ // XXX: This may eventually need to be refactored so that
+ // the scheduler itself doesn't have to call event_loop.run.
+ // That will be important for embedding the runtime into external
+ // event loops.
+ fn run(~self) -> ~Scheduler {
+ assert!(!self.in_task_context());
+
+ // Give ownership of the scheduler (self) to the thread
+ local_sched::put(self);
+
+ unsafe {
+ let scheduler = local_sched::unsafe_borrow();
+ fn run_scheduler_once() {
+ let scheduler = local_sched::take();
+ if scheduler.resume_task_from_queue() {
+ // Ok, a task ran. Nice! We'll do it again later
+ do local_sched::borrow |scheduler| {
+ scheduler.event_loop.callback(run_scheduler_once);
+ }
+ }
+ }
+
+ let scheduler = &mut *scheduler;
+ scheduler.event_loop.callback(run_scheduler_once);
+ scheduler.event_loop.run();
+ }
+
+ return local_sched::take();
+ }
+
+ // * Scheduler-context operations
+
+ fn resume_task_from_queue(~self) -> bool {
+ assert!(!self.in_task_context());
+
+ let mut this = self;
+ match this.task_queue.pop_front() {
+ Some(task) => {
+ this.resume_task_immediately(task);
+ return true;
+ }
+ None => {
+ rtdebug!("no tasks in queue");
+ local_sched::put(this);
+ return false;
+ }
+ }
+ }
+
+ // * Task-context operations
+
+ /// Called by a running task to end execution, after which it will
+ /// be recycled by the scheduler for reuse in a new task.
+ fn terminate_current_task(~self) {
+ assert!(self.in_task_context());
+
+ rtdebug!("ending running task");
+
+ do self.deschedule_running_task_and_then |dead_task| {
+ let dead_task = Cell(dead_task);
+ do local_sched::borrow |sched| {
+ dead_task.take().recycle(&mut sched.stack_pool);
+ }
+ }
+
+ abort!("control reached end of task");
+ }
+
+ fn schedule_new_task(~self, task: ~Task) {
+ assert!(self.in_task_context());
+
+ do self.switch_running_tasks_and_then(task) |last_task| {
+ let last_task = Cell(last_task);
+ do local_sched::borrow |sched| {
+ sched.task_queue.push_front(last_task.take());
+ }
+ }
+ }
+
+ // Core scheduling ops
+
+ fn resume_task_immediately(~self, task: ~Task) {
+ let mut this = self;
+ assert!(!this.in_task_context());
+
+ rtdebug!("scheduling a task");
+
+ // Store the task in the scheduler so it can be grabbed later
+ this.current_task = Some(task);
+ this.enqueue_cleanup_job(DoNothing);
+
+ local_sched::put(this);
+
+ // Take pointers to both the task and scheduler's saved registers.
+ unsafe {
+ let sched = local_sched::unsafe_borrow();
+ let (sched_context, _, next_task_context) = (*sched).get_contexts();
+ let next_task_context = next_task_context.unwrap();
+ // Context switch to the task, restoring its registers
+ // and saving the scheduler's
+ Context::swap(sched_context, next_task_context);
+
+ let sched = local_sched::unsafe_borrow();
+ // The running task should have passed ownership elsewhere
+ assert!((*sched).current_task.is_none());
+
+ // Running tasks may have asked us to do some cleanup
+ (*sched).run_cleanup_job();
+ }
+ }
+
+ /// Block a running task, context switch to the scheduler, then pass the
+ /// blocked task to a closure.
+ ///
+ /// # Safety note
+ ///
+ /// The closure here is a *stack* closure that lives in the
+ /// running task. It gets transmuted to the scheduler's lifetime
+ /// and called while the task is blocked.
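+ ///
+ /// Typical use (a sketch, mirroring the tests below): the closure stores the
+ /// blocked ~Task somewhere another context can reach it, and that context
+ /// later wakes it with `resume_task_immediately` or by requeueing it.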
+ fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
+ let mut this = self;
+ assert!(this.in_task_context());
+
+ rtdebug!("blocking task");
+
+ unsafe {
+ let blocked_task = this.current_task.swap_unwrap();
+ let f_fake_region = transmute::<&fn(~Task), &fn(~Task)>(f);
+ let f_opaque = ClosureConverter::from_fn(f_fake_region);
+ this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
+ }
+
+ local_sched::put(this);
+
+ unsafe {
+ let sched = local_sched::unsafe_borrow();
+ let (sched_context, last_task_context, _) = (*sched).get_contexts();
+ let last_task_context = last_task_context.unwrap();
+ Context::swap(last_task_context, sched_context);
+
+ // We could be executing in a different thread now
+ let sched = local_sched::unsafe_borrow();
+ (*sched).run_cleanup_job();
+ }
+ }
+
+ /// Switch directly to another task, without going through the scheduler.
+ /// Think hard before doing this: if there are pending I/O events, for
+ /// example, it is a bad idea.
+ fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(~Task)) {
+ let mut this = self;
+ assert!(this.in_task_context());
+
+ rtdebug!("switching tasks");
+
+ let old_running_task = this.current_task.swap_unwrap();
+ let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
+ let f_opaque = ClosureConverter::from_fn(f_fake_region);
+ this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
+ this.current_task = Some(next_task);
+
+ local_sched::put(this);
+
+ unsafe {
+ let sched = local_sched::unsafe_borrow();
+ let (_, last_task_context, next_task_context) = (*sched).get_contexts();
+ let last_task_context = last_task_context.unwrap();
+ let next_task_context = next_task_context.unwrap();
+ Context::swap(last_task_context, next_task_context);
+
+ // We could be executing in a different thread now
+ let sched = local_sched::unsafe_borrow();
+ (*sched).run_cleanup_job();
+ }
+ }
+
+
+ // * Other stuff
+
+ fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
+ assert!(self.cleanup_job.is_none());
+ self.cleanup_job = Some(job);
+ }
+
+ fn run_cleanup_job(&mut self) {
+ rtdebug!("running cleanup job");
+
+ assert!(self.cleanup_job.is_some());
+
+ let cleanup_job = self.cleanup_job.swap_unwrap();
+ match cleanup_job {
+ DoNothing => { }
+ GiveTask(task, f) => (f.to_fn())(task)
+ }
+ }
+
+ /// Get mutable references to all the contexts that may be involved in a
+ /// context switch.
+ ///
+ /// Returns (the scheduler context, the optional context of the
+ /// task in the cleanup list, the optional context of the task in
+ /// the current task slot). When context switching to a task,
+ /// callers should first arrange for that task to be located in the
+ /// Scheduler's current_task slot and set up the
+ /// post-context-switch cleanup job.
+ fn get_contexts<'a>(&'a mut self) -> (&'a mut Context,
+ Option<&'a mut Context>,
+ Option<&'a mut Context>) {
+ let last_task = match self.cleanup_job {
+ Some(GiveTask(~ref task, _)) => {
+ Some(task)
+ }
+ Some(DoNothing) => {
+ None
+ }
+ None => fail!("all context switches should have a cleanup job")
+ };
+ // XXX: Pattern matching mutable pointers above doesn't work
+ // because borrowck thinks the three patterns are conflicting
+ // borrows
+ unsafe {
+ let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
+ let last_task_context = match last_task {
+ Some(t) => Some(&mut t.saved_context), None => None
+ };
+ let next_task_context = match self.current_task {
+ Some(ref mut t) => Some(&mut t.saved_context), None => None
+ };
+ // XXX: These transmutes can be removed after snapshot
+ return (transmute(&mut self.saved_context),
+ last_task_context,
+ transmute(next_task_context));
+ }
+ }
+ }
+
+ static TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
+
+ pub struct Task {
+ /// The segment of stack on which the task is currently running or,
+ /// if the task is blocked, on which the task will resume execution
+ priv current_stack_segment: StackSegment,
+ /// These are always valid when the task is not running, unless
+ /// the task is dead
+ priv saved_context: Context,
+ /// The heap, GC, unwinding, local storage, logging
+ local_services: LocalServices
+ }
+
+ pub impl Task {
+ fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
+ Task::with_local(stack_pool, LocalServices::new(), start)
+ }
+
+ fn with_local(stack_pool: &mut StackPool,
+ local_services: LocalServices,
+ start: ~fn()) -> Task {
+ let start = Task::build_start_wrapper(start);
+ let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
+ // NB: Context holds a pointer to that ~fn
+ let initial_context = Context::new(start, &mut stack);
+ return Task {
+ current_stack_segment: stack,
+ saved_context: initial_context,
+ local_services: local_services
+ };
+ }
+
+ priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
+ // XXX: The old code didn't have this extra allocation
+ let wrapper: ~fn() = || {
+ // This is the first code to execute after the initial
+ // context switch to the task. The previous context may
+ // have asked us to do some cleanup.
+ unsafe {
+ let sched = local_sched::unsafe_borrow();
+ (*sched).run_cleanup_job();
+
+ let sched = local_sched::unsafe_borrow();
+ let task = (*sched).current_task.get_mut_ref();
+ // FIXME #6141: shouldn't need to put `start()` in another closure
+ task.local_services.run(||start());
+ }
+
+ let sched = local_sched::take();
+ sched.terminate_current_task();
+ };
+ return wrapper;
+ }
+
+ /// Destroy the task and try to reuse its components
+ fn recycle(~self, stack_pool: &mut StackPool) {
+ match self {
+ ~Task {current_stack_segment, _} => {
+ stack_pool.give_segment(current_stack_segment);
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn test_simple_scheduling() {
+ do run_in_bare_thread {
+ let mut task_ran = false;
+ let task_ran_ptr: *mut bool = &mut task_ran;
+
+ let mut sched = ~UvEventLoop::new_scheduler();
+ let task = ~do Task::new(&mut sched.stack_pool) {
+ unsafe { *task_ran_ptr = true; }
+ };
+ sched.task_queue.push_back(task);
+ sched.run();
+ assert!(task_ran);
+ }
+ }
+
+ #[test]
+ fn test_several_tasks() {
+ do run_in_bare_thread {
+ let total = 10;
+ let mut task_count = 0;
+ let task_count_ptr: *mut int = &mut task_count;
+
+ let mut sched = ~UvEventLoop::new_scheduler();
+ for int::range(0, total) |_| {
+ let task = ~do Task::new(&mut sched.stack_pool) {
+ unsafe { *task_count_ptr = *task_count_ptr + 1; }
+ };
+ sched.task_queue.push_back(task);
+ }
+ sched.run();
+ assert!(task_count == total);
+ }
+ }
+
+ #[test]
+ fn test_swap_tasks_then() {
+ do run_in_bare_thread {
+ let mut count = 0;
+ let count_ptr: *mut int = &mut count;
+
+ let mut sched = ~UvEventLoop::new_scheduler();
+ let task1 = ~do Task::new(&mut sched.stack_pool) {
+ unsafe { *count_ptr = *count_ptr + 1; }
+ let mut sched = local_sched::take();
+ let task2 = ~do Task::new(&mut sched.stack_pool) {
+ unsafe { *count_ptr = *count_ptr + 1; }
+ };
+ // Context switch directly to the new task
+ do sched.switch_running_tasks_and_then(task2) |task1| {
+ let task1 = Cell(task1);
+ do local_sched::borrow |sched| {
+ sched.task_queue.push_front(task1.take());
+ }
+ }
+ unsafe { *count_ptr = *count_ptr + 1; }
+ };
+ sched.task_queue.push_back(task1);
+ sched.run();
+ assert!(count == 3);
+ }
+ }
+
+ #[bench] #[test] #[ignore(reason = "long test")]
+ fn test_run_a_lot_of_tasks_queued() {
+ do run_in_bare_thread {
+ static MAX: int = 1000000;
+ let mut count = 0;
+ let count_ptr: *mut int = &mut count;
+
+ let mut sched = ~UvEventLoop::new_scheduler();
+
+ let start_task = ~do Task::new(&mut sched.stack_pool) {
+ run_task(count_ptr);
+ };
+ sched.task_queue.push_back(start_task);
+ sched.run();
+
+ assert!(count == MAX);
+
+ fn run_task(count_ptr: *mut int) {
+ do local_sched::borrow |sched| {
+ let task = ~do Task::new(&mut sched.stack_pool) {
+ unsafe {
+ *count_ptr = *count_ptr + 1;
+ if *count_ptr != MAX {
+ run_task(count_ptr);
+ }
+ }
+ };
+ sched.task_queue.push_back(task);
+ }
+ };
+ }
+ }
+
+ #[test]
+ fn test_block_task() {
+ do run_in_bare_thread {
+ let mut sched = ~UvEventLoop::new_scheduler();
+ let task = ~do Task::new(&mut sched.stack_pool) {
+ let sched = local_sched::take();
+ assert!(sched.in_task_context());
+ do sched.deschedule_running_task_and_then() |task| {
+ let task = Cell(task);
+ do local_sched::borrow |sched| {
+ assert!(!sched.in_task_context());
+ sched.task_queue.push_back(task.take());
+ }
+ }
+ };
+ sched.task_queue.push_back(task);
+ sched.run();
+ }
+ }
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+use container::Container;
+use ptr::Ptr;
use vec;
+ use ops::Drop;
+ use libc::{c_uint, uintptr_t};
pub struct StackSegment {
- buf: ~[u8]
+ buf: ~[u8],
+ valgrind_id: c_uint
}
pub impl StackSegment {
fn give_segment(&self, _stack: StackSegment) {
}
}
-}
+
+ extern {
+ fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint;
+ fn rust_valgrind_stack_deregister(id: c_uint);
+ }
--- /dev/null
+ // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+
+ //! A very simple unsynchronized channel type for sending buffered data from
+ //! scheduler context to task context.
+ //!
+ //! XXX: This would be safer to use if split into two types like Port/Chan
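+ //!
+ //! Rough flow (see the tests below): a task calls `recv()`, which returns a
+ //! buffered value if one is available and otherwise blocks the task; code in
+ //! scheduler context calls `send()`, which buffers the value and, if a task
+ //! is blocked on the tube, resumes it immediately.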
+
+ use option::*;
+ use clone::Clone;
+ use super::rc::RC;
+ use rt::sched::Task;
+ use rt::{context, TaskContext, SchedulerContext};
+ use rt::local_sched;
+ use vec::OwnedVector;
+ use container::Container;
+
+ struct TubeState<T> {
+ blocked_task: Option<~Task>,
+ buf: ~[T]
+ }
+
+ pub struct Tube<T> {
+ p: RC<TubeState<T>>
+ }
+
+ impl<T> Tube<T> {
+ pub fn new() -> Tube<T> {
+ Tube {
+ p: RC::new(TubeState {
+ blocked_task: None,
+ buf: ~[]
+ })
+ }
+ }
+
+ pub fn send(&mut self, val: T) {
+ rtdebug!("tube send");
+ assert!(context() == SchedulerContext);
+
+ unsafe {
+ let state = self.p.unsafe_borrow_mut();
+ (*state).buf.push(val);
+
+ if (*state).blocked_task.is_some() {
+ // There's a waiting task. Wake it up
+ rtdebug!("waking blocked tube");
+ let task = (*state).blocked_task.swap_unwrap();
+ let sched = local_sched::take();
+ sched.resume_task_immediately(task);
+ }
+ }
+ }
+
+ pub fn recv(&mut self) -> T {
+ assert!(context() == TaskContext);
+
+ unsafe {
+ let state = self.p.unsafe_borrow_mut();
+ if !(*state).buf.is_empty() {
+ return (*state).buf.shift();
+ } else {
+ // Block and wait for the next message
+ rtdebug!("blocking on tube recv");
+ assert!(self.p.refcount() > 1); // There better be somebody to wake us up
+ assert!((*state).blocked_task.is_none());
+ let sched = local_sched::take();
+ do sched.deschedule_running_task_and_then |task| {
+ (*state).blocked_task = Some(task);
+ }
+ rtdebug!("waking after tube recv");
+ let buf = &mut (*state).buf;
+ assert!(!buf.is_empty());
+ return buf.shift();
+ }
+ }
+ }
+ }
+
+ impl<T> Clone for Tube<T> {
+ fn clone(&self) -> Tube<T> {
+ Tube { p: self.p.clone() }
+ }
+ }
+
+ #[cfg(test)]
+ mod test {
+ use int;
+ use cell::Cell;
+ use rt::local_sched;
+ use rt::test::*;
+ use rt::rtio::EventLoop;
+ use super::*;
+
+ #[test]
+ fn simple_test() {
+ do run_in_newsched_task {
+ let mut tube: Tube<int> = Tube::new();
+ let tube_clone = tube.clone();
+ let tube_clone_cell = Cell(tube_clone);
+ let sched = local_sched::take();
+ do sched.deschedule_running_task_and_then |task| {
+ let mut tube_clone = tube_clone_cell.take();
+ tube_clone.send(1);
+ let sched = local_sched::take();
+ sched.resume_task_immediately(task);
+ }
+
+ assert!(tube.recv() == 1);
+ }
+ }
+
+ #[test]
+ fn blocking_test() {
+ do run_in_newsched_task {
+ let mut tube: Tube<int> = Tube::new();
+ let tube_clone = tube.clone();
+ let tube_clone = Cell(Cell(Cell(tube_clone)));
+ let sched = local_sched::take();
+ do sched.deschedule_running_task_and_then |task| {
+ let tube_clone = tube_clone.take();
+ do local_sched::borrow |sched| {
+ let tube_clone = tube_clone.take();
+ do sched.event_loop.callback {
+ let mut tube_clone = tube_clone.take();
+ // The task should be blocked on this now and
+ // sending will wake it up.
+ tube_clone.send(1);
+ }
+ }
+ let sched = local_sched::take();
+ sched.resume_task_immediately(task);
+ }
+
+ assert!(tube.recv() == 1);
+ }
+ }
+
+ #[test]
+ fn many_blocking_test() {
+ static MAX: int = 100;
+
+ do run_in_newsched_task {
+ let mut tube: Tube<int> = Tube::new();
+ let tube_clone = tube.clone();
+ let tube_clone = Cell(tube_clone);
+ let sched = local_sched::take();
+ do sched.deschedule_running_task_and_then |task| {
+ callback_send(tube_clone.take(), 0);
+
+ fn callback_send(tube: Tube<int>, i: int) {
+ if i == 100 { return; }
+
+ let tube = Cell(Cell(tube));
+ do local_sched::borrow |sched| {
+ let tube = tube.take();
+ do sched.event_loop.callback {
+ let mut tube = tube.take();
+ // The task should be blocked on this now and
+ // sending will wake it up.
+ tube.send(i);
+ callback_send(tube, i + 1);
+ }
+ }
+ }
+
+ let sched = local_sched::take();
+ sched.resume_task_immediately(task);
+ }
+
+ for int::range(0, MAX) |i| {
+ let j = tube.recv();
+ assert!(j == i);
+ }
+ }
+ }
+ }
*/
+use container::Container;
+use option::*;
+use str::raw::from_c_str;
+use to_str::ToStr;
+use ptr::Ptr;
+ use libc;
use vec;
use ptr;
- use ptr::Ptr;
+ use cast;
+ use str;
+ use option::*;
+ use str::raw::from_c_str;
+ use to_str::ToStr;
use libc::{c_void, c_int, size_t, malloc, free};
use cast::transmute;
use ptr::null;
}
- #[test]
- fn connect_close() {
- do run_in_bare_thread() {
- let mut loop_ = Loop::new();
- let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
- // Connect to a port where nobody is listening
- let addr = next_test_ip4();
- do tcp_watcher.connect(addr) |stream_watcher, status| {
- rtdebug!("tcp_watcher.connect!");
- assert!(status.is_some());
- assert!(status.get().name() == ~"ECONNREFUSED");
- stream_watcher.close(||());
+ #[cfg(test)]
+ mod test {
+ use super::*;
+ use util::ignore;
+ use cell::Cell;
+ use vec;
+ use unstable::run_in_bare_thread;
+ use rt::thread::Thread;
+ use rt::test::*;
+ use rt::uv::{Loop, AllocCallback};
+ use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
+
+ #[test]
+ fn connect_close() {
+ do run_in_bare_thread() {
+ let mut loop_ = Loop::new();
+ let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+ // Connect to a port where nobody is listening
+ let addr = next_test_ip4();
+ do tcp_watcher.connect(addr) |stream_watcher, status| {
+ rtdebug!("tcp_watcher.connect!");
+ assert!(status.is_some());
+ assert!(status.get().name() == ~"ECONNREFUSED");
+ stream_watcher.close(||());
+ }
+ loop_.run();
+ loop_.close();
}
- loop_.run();
- loop_.close();
}
- }
- #[test]
- fn listen() {
- do run_in_bare_thread() {
- static MAX: int = 10;
- let mut loop_ = Loop::new();
- let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
- let addr = next_test_ip4();
- server_tcp_watcher.bind(addr);
- let loop_ = loop_;
- rtdebug!("listening");
- do server_tcp_watcher.listen |server_stream_watcher, status| {
- rtdebug!("listened!");
- assert!(status.is_none());
- let mut server_stream_watcher = server_stream_watcher;
- let mut loop_ = loop_;
- let client_tcp_watcher = TcpWatcher::new(&mut loop_);
- let mut client_tcp_watcher = client_tcp_watcher.as_stream();
- server_stream_watcher.accept(client_tcp_watcher);
- let count_cell = Cell(0);
- let server_stream_watcher = server_stream_watcher;
- rtdebug!("starting read");
- let alloc: AllocCallback = |size| {
- vec_to_uv_buf(vec::from_elem(size, 0))
- };
- do client_tcp_watcher.read_start(alloc)
- |stream_watcher, nread, buf, status| {
-
- rtdebug!("i'm reading!");
- let buf = vec_from_uv_buf(buf);
- let mut count = count_cell.take();
- if status.is_none() {
- rtdebug!("got %d bytes", nread);
- let buf = buf.unwrap();
- for buf.slice(0, nread as uint).each |byte| {
- assert!(*byte == count as u8);
- rtdebug!("%u", *byte as uint);
- count += 1;
- }
- } else {
- assert!(count == MAX);
- do stream_watcher.close {
- server_stream_watcher.close(||());
+ #[test]
+ fn listen() {
+ do run_in_bare_thread() {
+ static MAX: int = 10;
+ let mut loop_ = Loop::new();
+ let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
+ let addr = next_test_ip4();
+ server_tcp_watcher.bind(addr);
+ let loop_ = loop_;
+ rtdebug!("listening");
+ do server_tcp_watcher.listen |server_stream_watcher, status| {
+ rtdebug!("listened!");
+ assert!(status.is_none());
+ let mut server_stream_watcher = server_stream_watcher;
+ let mut loop_ = loop_;
+ let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+ let mut client_tcp_watcher = client_tcp_watcher.as_stream();
+ server_stream_watcher.accept(client_tcp_watcher);
+ let count_cell = Cell(0);
+ let server_stream_watcher = server_stream_watcher;
+ rtdebug!("starting read");
+ let alloc: AllocCallback = |size| {
+ vec_to_uv_buf(vec::from_elem(size, 0))
+ };
+ do client_tcp_watcher.read_start(alloc)
+ |stream_watcher, nread, buf, status| {
+
+ rtdebug!("i'm reading!");
+ let buf = vec_from_uv_buf(buf);
+ let mut count = count_cell.take();
+ if status.is_none() {
+ rtdebug!("got %d bytes", nread);
+ let buf = buf.unwrap();
+ for buf.slice(0, nread as uint).each |byte| {
+ assert!(*byte == count as u8);
+ rtdebug!("%u", *byte as uint);
+ count += 1;
+ }
+ } else {
+ assert!(count == MAX);
+ do stream_watcher.close {
+ server_stream_watcher.close(||());
+ }
}
+ count_cell.put_back(count);
}
- count_cell.put_back(count);
}
- }
- let _client_thread = do Thread::start {
- rtdebug!("starting client thread");
- let mut loop_ = Loop::new();
- let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
- do tcp_watcher.connect(addr) |stream_watcher, status| {
- rtdebug!("connecting");
- assert!(status.is_none());
- let mut stream_watcher = stream_watcher;
- let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
- do stream_watcher.write(msg) |stream_watcher, status| {
- rtdebug!("writing");
+ let _client_thread = do Thread::start {
+ rtdebug!("starting client thread");
+ let mut loop_ = Loop::new();
+ let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+ do tcp_watcher.connect(addr) |stream_watcher, status| {
+ rtdebug!("connecting");
assert!(status.is_none());
- stream_watcher.close(||());
+ let mut stream_watcher = stream_watcher;
+ let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
+ let buf = slice_to_uv_buf(msg);
+ let msg_cell = Cell(msg);
+ do stream_watcher.write(buf) |stream_watcher, status| {
+ rtdebug!("writing");
+ assert!(status.is_none());
+ let msg_cell = Cell(msg_cell.take());
+ stream_watcher.close(||ignore(msg_cell.take()));
+ }
}
- }
+ loop_.run();
+ loop_.close();
+ };
+
+ let mut loop_ = loop_;
loop_.run();
loop_.close();
- };
-
- let mut loop_ = loop_;
- loop_.run();
- loop_.close();
+ }
}
-}
+}
--- /dev/null
+ // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+ // file at the top-level directory of this distribution and at
+ // http://rust-lang.org/COPYRIGHT.
+ //
+ // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+ // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+ // option. This file may not be copied, modified, or distributed
+ // except according to those terms.
+
+ use option::*;
+ use result::*;
+ use ops::Drop;
+ use old_iter::CopyableIter;
+ use cell::{Cell, empty_cell};
+ use cast::transmute;
+ use clone::Clone;
+ use rt::io::IoError;
+ use rt::io::net::ip::IpAddr;
+ use rt::uv::*;
+ use rt::uv::idle::IdleWatcher;
+ use rt::rtio::*;
+ use rt::sched::{Scheduler, local_sched};
+ use rt::io::{standard_error, OtherIoError};
+ use rt::tube::Tube;
+
+ #[cfg(test)] use container::Container;
+ #[cfg(test)] use uint;
+ #[cfg(test)] use unstable::run_in_bare_thread;
+ #[cfg(test)] use rt::test::*;
+
+ pub struct UvEventLoop {
+ uvio: UvIoFactory
+ }
+
+ pub impl UvEventLoop {
+ fn new() -> UvEventLoop {
+ UvEventLoop {
+ uvio: UvIoFactory(Loop::new())
+ }
+ }
+
+ /// A convenience constructor
+ fn new_scheduler() -> Scheduler {
+ Scheduler::new(~UvEventLoop::new())
+ }
+ }
+
+ impl Drop for UvEventLoop {
+ fn finalize(&self) {
+ // XXX: Need mutable finalizer
+ let this = unsafe {
+ transmute::<&UvEventLoop, &mut UvEventLoop>(self)
+ };
+ this.uvio.uv_loop().close();
+ }
+ }
+
+ impl EventLoop for UvEventLoop {
+
+ fn run(&mut self) {
+ self.uvio.uv_loop().run();
+ }
+
+ fn callback(&mut self, f: ~fn()) {
+ let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
+ do idle_watcher.start |idle_watcher, status| {
+ assert!(status.is_none());
+ let mut idle_watcher = idle_watcher;
+ idle_watcher.stop();
+ idle_watcher.close(||());
+ f();
+ }
+ }
+
+ fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
+ Some(&mut self.uvio)
+ }
+ }
+
+ #[test]
+ fn test_callback_run_once() {
+ do run_in_bare_thread {
+ let mut event_loop = UvEventLoop::new();
+ let mut count = 0;
+ let count_ptr: *mut int = &mut count;
+ do event_loop.callback {
+ unsafe { *count_ptr += 1 }
+ }
+ event_loop.run();
+ assert!(count == 1);
+ }
+ }
+
+ pub struct UvIoFactory(Loop);
+
+ pub impl UvIoFactory {
+ fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
+ match self { &UvIoFactory(ref mut ptr) => ptr }
+ }
+ }
+
+ impl IoFactory for UvIoFactory {
+ // Connect to an address and return a new stream
+ // NB: This blocks the task waiting on the connection.
+ // It would probably be better to return a future
+ fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
+ // Create a cell in the task to hold the result. We will fill
+ // the cell before resuming the task.
+ let result_cell = empty_cell();
+ let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
+
+ let scheduler = local_sched::take();
+ assert!(scheduler.in_task_context());
+
+ // Block this task and take ownership, switch to scheduler context
+ do scheduler.deschedule_running_task_and_then |task| {
+
+ rtdebug!("connect: entered scheduler context");
+ do local_sched::borrow |scheduler| {
+ assert!(!scheduler.in_task_context());
+ }
+ let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+ let task_cell = Cell(task);
+
+ // Wait for a connection
+ do tcp_watcher.connect(addr) |stream_watcher, status| {
+ rtdebug!("connect: in connect callback");
+ if status.is_none() {
+ rtdebug!("status is none");
+ let res = Ok(~UvTcpStream { watcher: stream_watcher });
+
+ // Store the stream in the task's stack
+ unsafe { (*result_cell_ptr).put_back(res); }
+
+ // Context switch
+ let scheduler = local_sched::take();
+ scheduler.resume_task_immediately(task_cell.take());
+ } else {
+ rtdebug!("status is some");
+ let task_cell = Cell(task_cell.take());
+ do stream_watcher.close {
+ let res = Err(uv_error_to_io_error(status.get()));
+ unsafe { (*result_cell_ptr).put_back(res); }
+ let scheduler = local_sched::take();
+ scheduler.resume_task_immediately(task_cell.take());
+ }
+ };
+ }
+ }
+
+ assert!(!result_cell.is_empty());
+ return result_cell.take();
+ }
+
+ fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
+ let mut watcher = TcpWatcher::new(self.uv_loop());
+ match watcher.bind(addr) {
+ Ok(_) => Ok(~UvTcpListener::new(watcher)),
+ Err(uverr) => {
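+ // The watcher still needs to be closed, and closing is asynchronous:
+ // block this task until the close callback fires, then surface the
+ // original bind error.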
+ let scheduler = local_sched::take();
+ do scheduler.deschedule_running_task_and_then |task| {
+ let task_cell = Cell(task);
+ do watcher.as_stream().close {
+ let scheduler = local_sched::take();
+ scheduler.resume_task_immediately(task_cell.take());
+ }
+ }
+ Err(uv_error_to_io_error(uverr))
+ }
+ }
+ }
+ }
+
+ // FIXME #6090: Prefer newtype structs but Drop doesn't work
+ pub struct UvTcpListener {
+ watcher: TcpWatcher,
+ listening: bool,
+ incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+ }
+
+ impl UvTcpListener {
+ fn new(watcher: TcpWatcher) -> UvTcpListener {
+ UvTcpListener {
+ watcher: watcher,
+ listening: false,
+ incoming_streams: Tube::new()
+ }
+ }
+
+ fn watcher(&self) -> TcpWatcher { self.watcher }
+ }
+
+ impl Drop for UvTcpListener {
+ fn finalize(&self) {
+ let watcher = self.watcher();
+ let scheduler = local_sched::take();
+ do scheduler.deschedule_running_task_and_then |task| {
+ let task_cell = Cell(task);
+ do watcher.as_stream().close {
+ let scheduler = local_sched::take();
+ scheduler.resume_task_immediately(task_cell.take());
+ }
+ }
+ }
+ }
+
+ impl RtioTcpListener for UvTcpListener {
+
+ fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
+ rtdebug!("entering listen");
+
+ if self.listening {
+ return self.incoming_streams.recv();
+ }
+
+ self.listening = true;
+
+ let server_tcp_watcher = self.watcher();
+ let incoming_streams_cell = Cell(self.incoming_streams.clone());
+
+ let incoming_streams_cell = Cell(incoming_streams_cell.take());
+ let mut server_tcp_watcher = server_tcp_watcher;
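+ // First call only: install the uv listen callback. Each accepted
+ // connection (or error) is pushed into `incoming_streams`; this and all
+ // later calls to accept() then just block on `recv()` below.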
+ do server_tcp_watcher.listen |server_stream_watcher, status| {
+ let maybe_stream = if status.is_none() {
+ let mut server_stream_watcher = server_stream_watcher;
+ let mut loop_ = server_stream_watcher.event_loop();
+ let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+ let client_tcp_watcher = client_tcp_watcher.as_stream();
+ // XXX: Needs to be surfaced in interface
+ server_stream_watcher.accept(client_tcp_watcher);
+ Ok(~UvTcpStream { watcher: client_tcp_watcher })
+ } else {
+ Err(standard_error(OtherIoError))
+ };
+
+ let mut incoming_streams = incoming_streams_cell.take();
+ incoming_streams.send(maybe_stream);
+ incoming_streams_cell.put_back(incoming_streams);
+ }
+
+ return self.incoming_streams.recv();
+ }
+ }
+
+ // FIXME #6090: Prefer newtype structs but Drop doesn't work
+ pub struct UvTcpStream {
+ watcher: StreamWatcher
+ }
+
+ impl UvTcpStream {
+ fn watcher(&self) -> StreamWatcher { self.watcher }
+ }
+
+ impl Drop for UvTcpStream {
+ fn finalize(&self) {
+ rtdebug!("closing tcp stream");
+ let watcher = self.watcher();
+ let scheduler = local_sched::take();
+ do scheduler.deschedule_running_task_and_then |task| {
+ let task_cell = Cell(task);
+ do watcher.close {
+ let scheduler = local_sched::take();
+ scheduler.resume_task_immediately(task_cell.take());
+ }
+ }
+ }
+ }
+
+ impl RtioTcpStream for UvTcpStream {
+ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+ let result_cell = empty_cell();
+ let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
+
+ let scheduler = local_sched::take();
+ assert!(scheduler.in_task_context());
+ let watcher = self.watcher();
+ let buf_ptr: *&mut [u8] = &buf;
+ do scheduler.deschedule_running_task_and_then |task| {
+ rtdebug!("read: entered scheduler context");
+ do local_sched::borrow |scheduler| {
+ assert!(!scheduler.in_task_context());
+ }
+ let mut watcher = watcher;
+ let task_cell = Cell(task);
+ // XXX: We shouldn't reallocate these callbacks every
+ // call to read
+ let alloc: AllocCallback = |_| unsafe {
+ slice_to_uv_buf(*buf_ptr)
+ };
+ do watcher.read_start(alloc) |watcher, nread, _buf, status| {
+
+ // Stop reading so that no read callbacks are
+ // triggered before the user calls `read` again.
+ // XXX: Is there a performance impact to calling
+ // stop here?
+ let mut watcher = watcher;
+ watcher.read_stop();
+
+ let result = if status.is_none() {
+ assert!(nread >= 0);
+ Ok(nread as uint)
+ } else {
+ Err(standard_error(OtherIoError))
+ };
+
+ unsafe { (*result_cell_ptr).put_back(result); }
+
+ let scheduler = local_sched::take();
+ scheduler.resume_task_immediately(task_cell.take());
+ }
+ }
+
+ assert!(!result_cell.is_empty());
+ return result_cell.take();
+ }
+
+ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+ let result_cell = empty_cell();
+ let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+ let scheduler = local_sched::take();
+ assert!(scheduler.in_task_context());
+ let watcher = self.watcher();
+ let buf_ptr: *&[u8] = &buf;
+ do scheduler.deschedule_running_task_and_then |task| {
+ let mut watcher = watcher;
+ let task_cell = Cell(task);
+ let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+ do watcher.write(buf) |_watcher, status| {
+ let result = if status.is_none() {
+ Ok(())
+ } else {
+ Err(standard_error(OtherIoError))
+ };
+
+ unsafe { (*result_cell_ptr).put_back(result); }
+
+ let scheduler = local_sched::take();
+ scheduler.resume_task_immediately(task_cell.take());
+ }
+ }
+
+ assert!(!result_cell.is_empty());
+ return result_cell.take();
+ }
+ }
+
+ #[test]
+ fn test_simple_io_no_connect() {
+ do run_in_newsched_task {
+ unsafe {
+ let io = local_sched::unsafe_borrow_io();
+ let addr = next_test_ip4();
+ let maybe_chan = (*io).tcp_connect(addr);
+ assert!(maybe_chan.is_err());
+ }
+ }
+ }
+
+ #[test]
+ fn test_simple_tcp_server_and_client() {
+ do run_in_newsched_task {
+ let addr = next_test_ip4();
+
+ // Start the server first so it's listening when we connect
+ do spawntask_immediately {
+ unsafe {
+ let io = local_sched::unsafe_borrow_io();
+ let mut listener = (*io).tcp_bind(addr).unwrap();
+ let mut stream = listener.accept().unwrap();
+ let mut buf = [0, .. 2048];
+ let nread = stream.read(buf).unwrap();
+ assert!(nread == 8);
+ for uint::range(0, nread) |i| {
+ rtdebug!("%u", buf[i] as uint);
+ assert!(buf[i] == i as u8);
+ }
+ }
+ }
+
+ do spawntask_immediately {
+ unsafe {
+ let io = local_sched::unsafe_borrow_io();
+ let mut stream = (*io).tcp_connect(addr).unwrap();
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ }
+ }
+ }
+ }
+
+ #[test] #[ignore(reason = "busted")]
+ fn test_read_and_block() {
+ do run_in_newsched_task {
+ let addr = next_test_ip4();
+
+ do spawntask_immediately {
+ let io = unsafe { local_sched::unsafe_borrow_io() };
+ let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
+ let mut stream = listener.accept().unwrap();
+ let mut buf = [0, .. 2048];
+
+ let expected = 32;
+ let mut current = 0;
+ let mut reads = 0;
+
+ while current < expected {
+ let nread = stream.read(buf).unwrap();
+ for uint::range(0, nread) |i| {
+ let val = buf[i] as uint;
+ assert!(val == current % 8);
+ current += 1;
+ }
+ reads += 1;
+
+ let scheduler = local_sched::take();
+ // Yield to the other task in hopes that it
+ // will trigger a read callback while we are
+ // not ready for it
+ do scheduler.deschedule_running_task_and_then |task| {
+ let task = Cell(task);
+ do local_sched::borrow |scheduler| {
+ scheduler.task_queue.push_back(task.take());
+ }
+ }
+ }
+
+ // Make sure we had multiple reads
+ assert!(reads > 1);
+ }
+
+ do spawntask_immediately {
+ unsafe {
+ let io = local_sched::unsafe_borrow_io();
+ let mut stream = (*io).tcp_connect(addr).unwrap();
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ }
+ }
+
+ }
+ }
+
+ #[test]
+ fn test_read_read_read() {
+ do run_in_newsched_task {
+ let addr = next_test_ip4();
+ static MAX: uint = 500000;
+
+ do spawntask_immediately {
+ unsafe {
+ let io = local_sched::unsafe_borrow_io();
+ let mut listener = (*io).tcp_bind(addr).unwrap();
+ let mut stream = listener.accept().unwrap();
+ let buf = [1, .. 2048];
+ let mut total_bytes_written = 0;
+ while total_bytes_written < MAX {
+ stream.write(buf);
+ total_bytes_written += buf.len();
+ }
+ }
+ }
+
+ do spawntask_immediately {
+ unsafe {
+ let io = local_sched::unsafe_borrow_io();
+ let mut stream = (*io).tcp_connect(addr).unwrap();
+ let mut buf = [0, .. 2048];
+ let mut total_bytes_read = 0;
+ while total_bytes_read < MAX {
+ let nread = stream.read(buf).unwrap();
+ rtdebug!("read %u bytes", nread as uint);
+ total_bytes_read += nread;
+ for uint::range(0, nread) |i| {
+ assert!(buf[i] == 1);
+ }
+ }
+ rtdebug!("read %u bytes total", total_bytes_read as uint);
+ }
+ }
+ }
+ }
--- /dev/null
+// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#[doc(hidden)];
+
+use libc;
+use comm::{GenericChan, GenericPort};
+use prelude::*;
+use task;
+
+pub mod at_exit;
+pub mod global;
+pub mod finally;
+pub mod weak_task;
+pub mod intrinsics;
+pub mod simd;
+pub mod extfmt;
+#[cfg(not(test))]
+pub mod lang;
+pub mod sync;
+
+/**
+
+Start a new thread outside of the current runtime context and wait
+for it to terminate.
+
+The executing thread has no access to a task pointer and will be using
+a normal large stack.
+*/
+pub fn run_in_bare_thread(f: ~fn()) {
+ let (port, chan) = comm::stream();
+ // FIXME #4525: Unfortunate that this creates an extra scheduler but it's
+ // necessary since rust_raw_thread_join_delete is blocking
+ do task::spawn_sched(task::SingleThreaded) {
+ unsafe {
+ let closure: &fn() = || {
+ f()
+ };
+ let thread = rust_raw_thread_start(&closure);
+ rust_raw_thread_join_delete(thread);
+ chan.send(());
+ }
+ }
+ port.recv();
+}
+
+#[test]
+fn test_run_in_bare_thread() {
+ let i = 100;
+ do run_in_bare_thread {
+ assert!(i == 100);
+ }
+}
+
+#[test]
+fn test_run_in_bare_thread_exchange() {
+ // Does the exchange heap work without the runtime?
+ let i = ~100;
+ do run_in_bare_thread {
+ assert!(i == ~100);
+ }
+}
+
+#[allow(non_camel_case_types)] // runtime type
+pub type raw_thread = libc::c_void;
+
+extern {
+ fn rust_raw_thread_start(f: &(&fn())) -> *raw_thread;
+ fn rust_raw_thread_join_delete(thread: *raw_thread);
+}