use getopts;
use sort;
+ use stats::Stats;
use term;
-
- use core::comm::{stream, SharedChan};
- use core::either;
- use core::io;
- use core::option;
- use core::result;
- use core::task;
- use core::to_str::ToStr;
- use core::uint;
- use core::vec;
+ use time::precise_time_ns;
+
+ use std::comm::{stream, SharedChan};
+ use std::either;
+ use std::io;
+ use std::num;
+ use std::option;
+ use std::rand::RngUtil;
+ use std::rand;
+ use std::result;
+ use std::task;
+ use std::to_str::ToStr;
+ use std::u64;
+ use std::uint;
+ use std::vec;
-pub mod rustrt {
- use std::libc::size_t;
-
- #[abi = "cdecl"]
- pub extern {
- pub unsafe fn rust_sched_threads() -> size_t;
- }
-}
// The name of a test. By convention this follows the rules for rust
// paths; i.e. it should be a series of identifiers separated by double colons.
static sched_overcommit : uint = 4u;
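+// The test runner schedules up to `threads * sched_overcommit` tests
+// at once, oversubscribing the scheduler threads.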
fn get_concurrency() -> uint {
- use core::rt;
- unsafe {
- let threads = rustrt::rust_sched_threads() as uint;
- if threads == 1 { 1 }
- else { threads * sched_overcommit }
- }
++ use std::rt;
+ let threads = rt::util::default_sched_threads();
+ if threads == 1 { 1 }
+ else { threads * sched_overcommit }
}
#[allow(non_implicitly_copyable_typarams)]
pub mod traits {}
pub mod raw {
- use at_vec::{capacity, rustrt};
+ use at_vec::capacity;
++ use cast;
use cast::{transmute, transmute_copy};
use libc;
use ptr;
use sys;
use uint;
-- use unstable::intrinsics::{move_val_init};
++ use unstable::intrinsics;
++ use unstable::intrinsics::{move_val_init, TyDesc};
use vec;
- #[cfg(stage0)]
- use intrinsic::{get_tydesc};
- #[cfg(not(stage0))]
- use unstable::intrinsics::{get_tydesc};
+ use vec::UnboxedVecRepr;
- use sys::TypeDesc;
pub type VecRepr = vec::raw::VecRepr;
pub type SliceRepr = vec::raw::SliceRepr;
pub unsafe fn reserve<T>(v: &mut @[T], n: uint) {
// Only make the (slow) call into the runtime if we have to
if capacity(*v) < n {
- let ptr: **VecRepr = transmute(v);
- rustrt::vec_reserve_shared_actual(get_tydesc::<T>(),
- ptr, n as libc::size_t);
+ let ptr: *mut *mut VecRepr = transmute(v);
- let ty = sys::get_type_desc::<T>();
++ let ty = intrinsics::get_tydesc::<T>();
++ // XXX transmute shouldn't be necessary
++ let ty = cast::transmute(ty);
+ return reserve_raw(ty, ptr, n);
+ }
+ }
+
+ // Implementation detail. Shouldn't be public
+ #[allow(missing_doc)]
- pub fn reserve_raw(ty: *TypeDesc, ptr: *mut *mut VecRepr, n: uint) {
++ pub fn reserve_raw(ty: *TyDesc, ptr: *mut *mut VecRepr, n: uint) {
+
+ unsafe {
+ let size_in_bytes = n * (*ty).size;
+ if size_in_bytes > (**ptr).unboxed.alloc {
+ let total_size = size_in_bytes + sys::size_of::<UnboxedVecRepr>();
+ // XXX: UnboxedVecRepr has an extra u8 at the end
+ let total_size = total_size - sys::size_of::<u8>();
+ (*ptr) = local_realloc(*ptr as *(), total_size) as *mut VecRepr;
+ (**ptr).unboxed.alloc = size_in_bytes;
+ }
+ }
+
+ fn local_realloc(ptr: *(), size: uint) -> *() {
+ use rt;
+ use rt::OldTaskContext;
+ use rt::local::Local;
+ use rt::task::Task;
+
+ if rt::context() == OldTaskContext {
+ unsafe {
+ return rust_local_realloc(ptr, size as libc::size_t);
+ }
+
+ extern {
+ #[fast_ffi]
+ fn rust_local_realloc(ptr: *(), size: libc::size_t) -> *();
+ }
+ } else {
+ do Local::borrow::<Task, *()> |task| {
+ task.heap.realloc(ptr as *libc::c_void, size) as *()
+ }
+ }
}
}
#[doc(hidden)];
- use libc::{c_char, c_void, intptr_t, uintptr_t};
- use ptr::mut_null;
-use libc::{c_char, intptr_t, uintptr_t};
++use libc::c_void;
+ use ptr::{mut_null};
use repr::BoxRepr;
- use rt;
- use rt::OldTaskContext;
- use sys::TypeDesc;
use cast::transmute;
-#[cfg(not(test))] use unstable::lang::clear_task_borrow_list;
+ use unstable::intrinsics::TyDesc;
- #[cfg(not(test))] use ptr::to_unsafe_ptr;
-/**
- * Runtime structures
- *
- * NB: These must match the representation in the C++ runtime.
- */
--
- type DropGlue<'self> = &'self fn(**TypeDesc, *c_void);
-type TaskID = uintptr_t;
-
-struct StackSegment { priv opaque: () }
-struct Scheduler { priv opaque: () }
-struct SchedulerLoop { priv opaque: () }
-struct Kernel { priv opaque: () }
-struct Env { priv opaque: () }
-struct AllocHeader { priv opaque: () }
-struct MemoryRegion { priv opaque: () }
-
-#[cfg(target_arch="x86")]
-struct Registers {
- data: [u32, ..16]
-}
-
-#[cfg(target_arch="arm")]
-#[cfg(target_arch="mips")]
-struct Registers {
- data: [u32, ..32]
-}
-
-#[cfg(target_arch="x86")]
-#[cfg(target_arch="arm")]
-#[cfg(target_arch="mips")]
-struct Context {
- regs: Registers,
- next: *Context,
- pad: [u32, ..3]
-}
-
-#[cfg(target_arch="x86_64")]
-struct Registers {
- data: [u64, ..22]
-}
-
-#[cfg(target_arch="x86_64")]
-struct Context {
- regs: Registers,
- next: *Context,
- pad: uintptr_t
-}
-
-struct BoxedRegion {
- env: *Env,
- backing_region: *MemoryRegion,
- live_allocs: *BoxRepr
-}
-
-#[cfg(target_arch="x86")]
-#[cfg(target_arch="arm")]
-#[cfg(target_arch="mips")]
-struct Task {
- // Public fields
- refcount: intptr_t, // 0
- id: TaskID, // 4
- pad: [u32, ..2], // 8
- ctx: Context, // 16
- stack_segment: *StackSegment, // 96
- runtime_sp: uintptr_t, // 100
- scheduler: *Scheduler, // 104
- scheduler_loop: *SchedulerLoop, // 108
-
- // Fields known only to the runtime
- kernel: *Kernel, // 112
- name: *c_char, // 116
- list_index: i32, // 120
- boxed_region: BoxedRegion // 128
-}
-
-#[cfg(target_arch="x86_64")]
-struct Task {
- // Public fields
- refcount: intptr_t,
- id: TaskID,
- ctx: Context,
- stack_segment: *StackSegment,
- runtime_sp: uintptr_t,
- scheduler: *Scheduler,
- scheduler_loop: *SchedulerLoop,
-
- // Fields known only to the runtime
- kernel: *Kernel,
- name: *c_char,
- list_index: i32,
- boxed_region: BoxedRegion
-}
++type DropGlue<'self> = &'self fn(**TyDesc, *c_void);
/*
* Box annihilation
#[cfg(unix)]
fn debug_mem() -> bool {
- ::rt::env::get().debug_mem
++ use rt;
++ use rt::OldTaskContext;
+ // XXX: Need to port the environment struct to newsched
+ match rt::context() {
+ OldTaskContext => ::rt::env::get().debug_mem,
+ _ => false
+ }
}
#[cfg(windows)]
fn debug_mem() -> bool {
false
}
+ #[inline]
+ #[cfg(not(stage0))]
+ unsafe fn call_drop_glue(tydesc: *TyDesc, data: *i8) {
+ // This function should be inlined when stage0 is gone
+ ((*tydesc).drop_glue)(data);
+ }
+
+ #[inline]
+ #[cfg(stage0)]
+ unsafe fn call_drop_glue(tydesc: *TyDesc, data: *i8) {
+ ((*tydesc).drop_glue)(0 as **TyDesc, data);
+ }
+
/// Destroys all managed memory (i.e. @ boxes) held by the current task.
-#[cfg(not(test))]
-#[lang="annihilate"]
pub unsafe fn annihilate() {
- use unstable::lang::local_free;
+ use rt::local_heap::local_free;
use io::WriterUtil;
use io;
use libc;
// callback, as the original value may have been freed.
for each_live_alloc(false) |box, uniq| {
if !uniq {
- let tydesc: *TypeDesc = transmute(copy (*box).header.type_desc);
- let drop_glue: DropGlue = transmute(((*tydesc).drop_glue, 0));
- drop_glue(&tydesc, transmute(&(*box).data));
- let tydesc = (*box).header.type_desc;
++ let tydesc: *TyDesc = transmute(copy (*box).header.type_desc);
+ let data = transmute(&(*box).data);
+ call_drop_glue(tydesc, data);
}
}
/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
- ch: Exclusive<pipesy::Chan<T>>
+ inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
}
- impl<T: Owned> SharedChan<T> {
+ impl<T: Send> SharedChan<T> {
/// Converts a `chan` into a `shared_chan`.
pub fn new(c: Chan<T>) -> SharedChan<T> {
let Chan { inner } = c;
}
}
- impl<T: Owned> GenericChan<T> for SharedChan<T> {
+ impl<T: Send> GenericChan<T> for SharedChan<T> {
fn send(&self, x: T) {
- unsafe {
- let mut xx = Some(x);
- do self.ch.with_imm |chan| {
- let x = replace(&mut xx, None);
- chan.send(x.unwrap())
+ match self.inner {
+ Left(ref chan) => {
+ unsafe {
+ let mut xx = Some(x);
+ do chan.with_imm |chan| {
+ let x = replace(&mut xx, None);
+ chan.send(x.unwrap())
+ }
+ }
}
+ Right(ref chan) => chan.send(x)
}
}
}
- impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
+ impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, x: T) -> bool {
- unsafe {
- let mut xx = Some(x);
- do self.ch.with_imm |chan| {
- let x = replace(&mut xx, None);
- chan.try_send(x.unwrap())
+ match self.inner {
+ Left(ref chan) => {
+ unsafe {
+ let mut xx = Some(x);
+ do chan.with_imm |chan| {
+ let x = replace(&mut xx, None);
+ chan.try_send(x.unwrap())
+ }
+ }
}
+ Right(ref chan) => chan.try_send(x)
}
}
}
- impl<T: Owned> ::clone::Clone for SharedChan<T> {
+ impl<T: Send> ::clone::Clone for SharedChan<T> {
fn clone(&self) -> SharedChan<T> {
- SharedChan { ch: self.ch.clone() }
+ SharedChan { inner: self.inner.clone() }
}
}
use os::win32::{
as_utf16_p
};
- use rt::global_heap::{malloc_raw, free_raw};
+ use rt::global_heap::malloc_raw;
++
#[nolink]
extern {
unsafe fn rust_list_dir_wfd_size() -> libc::size_t;
#[cfg(target_os = "android")]
#[cfg(target_os = "freebsd")]
pub fn real_args() -> ~[~str] {
- unsafe {
- let argc = rustrt::rust_get_argc();
- let argv = rustrt::rust_get_argv();
- load_argc_and_argv(argc, argv)
++ use rt;
++ use rt::TaskContext;
++
+ if rt::context() == TaskContext {
+ match rt::args::clone() {
+ Some(args) => args,
+ None => fail!("process arguments not initialized")
+ }
+ } else {
+ unsafe {
+ let argc = rustrt::rust_get_argc();
+ let argv = rustrt::rust_get_argv();
+ load_argc_and_argv(argc, argv)
+ }
}
}
use cast;
use util;
use ops::Drop;
- use kinds::Owned;
- use rt::sched::{Scheduler};
+use rt::task::Task;
+ use kinds::Send;
-use rt::sched::{Scheduler, Coroutine};
++use rt::sched::Scheduler;
use rt::local::Local;
-use unstable::intrinsics::{atomic_xchg, atomic_load};
+use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
+use unstable::sync::UnsafeAtomicRcBox;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
suppress_finalize: bool
}
- pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
+ pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
let packet: ~Packet<T> = ~Packet {
- state: STATE_BOTH,
+ state: AtomicUint::new(STATE_BOTH),
payload: None
};
/// A port with unbounded size.
pub struct Port<T> {
// FIXME #5372. Using Cell because we don't take &mut self
- next: Cell<PortOne<StreamPayload<T>>>
+ next: Cell<StreamPortOne<T>>
}
- pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
+ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
let (pone, cone) = oneshot();
let port = Port { next: Cell::new(pone) };
let chan = Chan { next: Cell::new(cone) };
}
}
- impl<T: Owned> GenericChan<T> for SharedChan<T> {
+pub struct SharedChan<T> {
+ // Just like Chan, but a shared AtomicOption instead of Cell
+ priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
+}
+
+impl<T> SharedChan<T> {
+ pub fn new(chan: Chan<T>) -> SharedChan<T> {
+ let next = chan.next.take();
+ let next = AtomicOption::new(~next);
+ SharedChan { next: UnsafeAtomicRcBox::new(next) }
+ }
+}
+
- impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
++impl<T: Send> GenericChan<T> for SharedChan<T> {
+ fn send(&self, val: T) {
+ self.try_send(val);
+ }
+}
+
- impl<T: Owned> GenericPort<T> for SharedPort<T> {
++impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
+ fn try_send(&self, val: T) -> bool {
+ unsafe {
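+ // Swap a fresh one-shot chan into the shared slot; the old
+ // chan we take out is owned by us alone, so the send below
+ // cannot race with another sender.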
+ let (next_pone, next_cone) = oneshot();
+ let cone = (*self.next.get()).swap(~next_cone, SeqCst);
+ cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
+ }
+ }
+}
+
+impl<T> Clone for SharedChan<T> {
+ fn clone(&self) -> SharedChan<T> {
+ SharedChan {
+ next: self.next.clone()
+ }
+ }
+}
+
+pub struct SharedPort<T> {
+ // The link port: each recv on it yields the next data port, which in turn yields a T
+ priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
+}
+
+impl<T> SharedPort<T> {
+ pub fn new(port: Port<T>) -> SharedPort<T> {
+ // Put the data port into a new link pipe
+ let next_data_port = port.next.take();
+ let (next_link_port, next_link_chan) = oneshot();
+ next_link_chan.send(next_data_port);
+ let next_link = AtomicOption::new(~next_link_port);
+ SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
+ }
+}
+
- pub fn megapipe<T: Owned>() -> MegaPipe<T> {
++impl<T: Send> GenericPort<T> for SharedPort<T> {
+ fn recv(&self) -> T {
+ match self.try_recv() {
+ Some(val) => val,
+ None => {
+ fail!("receiving on a closed channel");
+ }
+ }
+ }
+
+ fn try_recv(&self) -> Option<T> {
+ unsafe {
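+ // Atomically publish a fresh link port and take exclusive
+ // ownership of the old one; competing receivers block on
+ // the port published here until we send the next data port
+ // at the end of this function.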
+ let (next_link_port, next_link_chan) = oneshot();
+ let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
+ let link_port = link_port.unwrap();
+ let data_port = link_port.recv();
+ let (next_data_port, res) = match data_port.try_recv() {
+ Some(StreamPayload { val, next }) => {
+ (next, Some(val))
+ }
+ None => {
+ let (next_data_port, _) = oneshot();
+ (next_data_port, None)
+ }
+ };
+ next_link_chan.send(next_data_port);
+ return res;
+ }
+ }
+}
+
+impl<T> Clone for SharedPort<T> {
+ fn clone(&self) -> SharedPort<T> {
+ SharedPort {
+ next_link: self.next_link.clone()
+ }
+ }
+}
+
+// XXX: Need better name
+type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
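+// Both halves are cloneable; the impls below let the pair itself be
+// used directly as a channel and a port.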
+
- impl<T: Owned> GenericChan<T> for MegaPipe<T> {
++pub fn megapipe<T: Send>() -> MegaPipe<T> {
+ let (port, chan) = stream();
+ (SharedPort::new(port), SharedChan::new(chan))
+}
+
- impl<T: Owned> GenericSmartChan<T> for MegaPipe<T> {
++impl<T: Send> GenericChan<T> for MegaPipe<T> {
+ fn send(&self, val: T) {
+ match *self {
+ (_, ref c) => c.send(val)
+ }
+ }
+}
+
- impl<T: Owned> GenericPort<T> for MegaPipe<T> {
++impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
+ fn try_send(&self, val: T) -> bool {
+ match *self {
+ (_, ref c) => c.try_send(val)
+ }
+ }
+}
+
++impl<T: Send> GenericPort<T> for MegaPipe<T> {
+ fn recv(&self) -> T {
+ match *self {
+ (ref p, _) => p.recv()
+ }
+ }
+
+ fn try_recv(&self) -> Option<T> {
+ match *self {
+ (ref p, _) => p.try_recv()
+ }
+ }
+}
+
#[cfg(test)]
mod test {
use super::*;
// option. This file may not be copied, modified, or distributed
// except according to those terms.
- use sys::{TypeDesc, size_of};
- use libc::{c_void, size_t, uintptr_t};
- use c_malloc = libc::malloc;
- use c_free = libc::free;
-use libc::{c_char, c_void, size_t, uintptr_t, free, malloc, realloc};
++use libc::{c_void, c_char, size_t, uintptr_t, free, malloc, realloc};
use managed::raw::{BoxHeaderRepr, BoxRepr};
- use cast::transmute;
- use unstable::intrinsics::{atomic_xadd,atomic_xsub, atomic_load};
- use ptr::null;
- use intrinsic::TyDesc;
+ use unstable::intrinsics::TyDesc;
+ use sys::size_of;
- pub unsafe fn malloc(td: *TypeDesc, size: uint) -> *c_void {
- assert!(td.is_not_null());
-
- let total_size = get_box_size(size, (*td).align);
- let p = c_malloc(total_size as size_t);
- assert!(p.is_not_null());
-
- // FIXME #3475: Converting between our two different tydesc types
- let td: *TyDesc = transmute(td);
-
- let box: &mut BoxRepr = transmute(p);
- box.header.ref_count = -1; // Exchange values not ref counted
- box.header.type_desc = td;
- box.header.prev = null();
- box.header.next = null();
+ extern {
+ #[rust_stack]
+ fn abort();
+ }
- inc_count();
+ #[inline]
+ fn get_box_size(body_size: uint, body_align: uint) -> uint {
+ let header_size = size_of::<BoxHeaderRepr>();
+ // FIXME (#2699): This alignment calculation is suspicious. Is it right?
+ let total_size = align_to(header_size, body_align) + body_size;
+ total_size
+ }
- return transmute(box);
+ // Rounds |size| to the nearest |alignment|. Invariant: |alignment| is a power
+ // of two.
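+ // e.g. align_to(20, 8) == 24 and align_to(24, 8) == 24.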
+ #[inline]
+ fn align_to(size: uint, align: uint) -> uint {
+ assert!(align != 0);
+ (size + align - 1) & !(align - 1)
}
- /**
- Thin wrapper around libc::malloc, none of the box header
- stuff in exchange_alloc::malloc
- */
+
+ /// A wrapper around libc::malloc, aborting on out-of-memory
+ #[inline]
pub unsafe fn malloc_raw(size: uint) -> *c_void {
- let p = c_malloc(size as size_t);
+ let p = malloc(size as size_t);
if p.is_null() {
- fail!("Failure in malloc_raw: result ptr is null");
+ // we need a non-allocating way to print an error here
+ abort();
}
- inc_count();
p
}
- pub unsafe fn free(ptr: *c_void) {
- assert!(ptr.is_not_null());
- dec_count();
- c_free(ptr);
+ /// A wrapper around libc::realloc, aborting on out-of-memory
+ #[inline]
+ pub unsafe fn realloc_raw(ptr: *mut c_void, size: uint) -> *mut c_void {
+ let p = realloc(ptr, size as size_t);
+ if p.is_null() {
+ // we need a non-allocating way to print an error here
+ abort();
+ }
+ p
}
- ///Thin wrapper around libc::free, as with exchange_alloc::malloc_raw
- pub unsafe fn free_raw(ptr: *c_void) {
- assert!(ptr.is_not_null());
- dec_count();
- c_free(ptr);
+
+ // FIXME #4942: Make these signatures agree with exchange_alloc's signatures
+ #[cfg(stage0, not(test))]
+ #[lang="exchange_malloc"]
+ #[inline]
++pub unsafe fn exchange_malloc_(td: *c_char, size: uintptr_t) -> *c_char {
++ exchange_malloc(td, size)
+}
+
- fn inc_count() {
- unsafe {
- let exchange_count = &mut *exchange_count_ptr();
- atomic_xadd(exchange_count, 1);
- }
++#[cfg(stage0)]
++#[inline]
+ pub unsafe fn exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
+ let td = td as *TyDesc;
+ let size = size as uint;
+
+ assert!(td.is_not_null());
+
+ let total_size = get_box_size(size, (*td).align);
+ let p = malloc_raw(total_size as uint);
+
+ let box: *mut BoxRepr = p as *mut BoxRepr;
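+ // A ref_count of -1 marks this as an exchange allocation,
+ // which is not reference counted.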
+ (*box).header.ref_count = -1;
+ (*box).header.type_desc = td;
+
+ box as *c_char
}
- fn dec_count() {
- unsafe {
- let exchange_count = &mut *exchange_count_ptr();
- atomic_xsub(exchange_count, 1);
- }
+ // FIXME #4942: Make these signatures agree with exchange_alloc's signatures
+ #[cfg(not(stage0), not(test))]
+ #[lang="exchange_malloc"]
+ #[inline]
++pub unsafe fn exchange_malloc_(align: u32, size: uintptr_t) -> *c_char {
++ exchange_malloc(align, size)
+}
+
- pub fn cleanup() {
- unsafe {
- let count_ptr = exchange_count_ptr();
- let allocations = atomic_load(&*count_ptr);
- if allocations != 0 {
- rtabort!("exchange heap not empty on exit - %i dangling allocations", allocations);
- }
- }
++#[cfg(not(stage0))]
++#[inline]
+ pub unsafe fn exchange_malloc(align: u32, size: uintptr_t) -> *c_char {
+ let total_size = get_box_size(size as uint, align as uint);
+ malloc_raw(total_size as uint) as *c_char
}
- fn get_box_size(body_size: uint, body_align: uint) -> uint {
- let header_size = size_of::<BoxHeaderRepr>();
- // FIXME (#2699): This alignment calculation is suspicious. Is it right?
- let total_size = align_to(header_size, body_align) + body_size;
- return total_size;
+ // FIXME: #7496
+ #[cfg(not(test))]
+ #[lang="closure_exchange_malloc"]
++#[inline]
++pub unsafe fn closure_exchange_malloc_(td: *c_char, size: uintptr_t) -> *c_char {
++ closure_exchange_malloc(td, size)
+}
+
- // Rounds |size| to the nearest |alignment|. Invariant: |alignment| is a power
- // of two.
- fn align_to(size: uint, align: uint) -> uint {
- assert!(align != 0);
- (size + align - 1) & !(align - 1)
+ #[inline]
+ pub unsafe fn closure_exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
+ let td = td as *TyDesc;
+ let size = size as uint;
+
+ assert!(td.is_not_null());
+
+ let total_size = get_box_size(size, (*td).align);
+ let p = malloc_raw(total_size as uint);
+
+ let box: *mut BoxRepr = p as *mut BoxRepr;
+ (*box).header.type_desc = td;
+
+ box as *c_char
}
- fn exchange_count_ptr() -> *mut int {
- // XXX: Need mutable globals
- unsafe { transmute(&rust_exchange_count) }
+ // NB: Calls to free CANNOT be allowed to fail, as throwing an exception from
+ // inside a landing pad may corrupt the state of the exception handler.
+ #[cfg(not(test))]
+ #[lang="exchange_free"]
++#[inline]
++pub unsafe fn exchange_free_(ptr: *c_char) {
++ exchange_free(ptr)
+}
+
- extern {
- static rust_exchange_count: uintptr_t;
+ #[inline]
+ pub unsafe fn exchange_free(ptr: *c_char) {
+ free(ptr as *c_void);
}
--- /dev/null
- fn finalize(&self) {
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! The JoinLatch is a concurrent type that establishes the task
+//! tree and propagates failure.
+//!
+//! Each task gets a JoinLatch that is derived from the JoinLatch
+//! of its parent task. Every latch must be released by either calling
+//! the non-blocking `release` method or the task-blocking `wait` method.
+//! Releasing a latch does not complete until all of its child latches
+//! complete.
+//!
+//! Latches carry a `success` flag that is set to `false` during task
+//! failure and is propagated both from children to parents and parents
+//! to children. The status of this flag may be queried for the purposes
+//! of linked failure.
+//!
+//! In addition to failure propagation the task tree serves to keep the
+//! default task schedulers alive. The runtime only sends the shutdown
+//! message to schedulers once the root task exits.
+//!
+//! Under this scheme a task that terminates before its children do
+//! becomes a 'zombie', since it may not exit until its children have.
+//! Zombie tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the
+//! tasks themselves are allowed to terminate.
+//!
+//! XXX: Propagate flag from parents to children.
+//! XXX: Tombstoning actually doesn't work.
+//! XXX: This could probably be done in a way that doesn't leak tombstones
+//! longer than the life of the child tasks.
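+//!
+//! A minimal usage sketch (the spawn plumbing that moves the child
+//! latch into the new task is elided):
+//!
+//!     let mut parent = JoinLatch::new_root();
+//!     let child = parent.new_child();
+//!     // ...move `child` into the new task, which must consume it
+//!     // with `child.release(success)` or `child.wait(success)`...
+//!     let all_children_succeeded = parent.wait(true);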
+
+use comm::{GenericPort, Peekable, GenericSmartChan};
+use clone::Clone;
+use container::Container;
+use option::{Option, Some, None};
+use ops::Drop;
+use rt::comm::{SharedChan, Port, stream};
+use rt::local::Local;
+use rt::sched::Scheduler;
+use unstable::atomics::{AtomicUint, SeqCst};
+use util;
+use vec::OwnedVector;
+
+// FIXME #7026: Would prefer this to be an enum
+pub struct JoinLatch {
+ priv parent: Option<ParentLink>,
+ priv child: Option<ChildLink>,
+ closed: bool,
+}
+
+// Shared between parents and all their children.
+struct SharedState {
+ /// Reference count, held by a parent and all children.
+ count: AtomicUint,
+ success: bool
+}
+
+struct ParentLink {
+ shared: *mut SharedState,
+ // For communicating with the parent.
+ chan: SharedChan<Message>
+}
+
+struct ChildLink {
+ shared: ~SharedState,
+ // For receiving from children.
+ port: Port<Message>,
+ chan: SharedChan<Message>,
+ // Prevents dropping the child SharedState reference counts multiple times.
+ dropped_child: bool
+}
+
+// Messages from child latches to parent.
+enum Message {
+ Tombstone(~JoinLatch),
+ ChildrenTerminated
+}
+
+impl JoinLatch {
+ pub fn new_root() -> ~JoinLatch {
+ let this = ~JoinLatch {
+ parent: None,
+ child: None,
+ closed: false
+ };
+ rtdebug!("new root latch %x", this.id());
+ return this;
+ }
+
+ fn id(&self) -> uint {
+ unsafe { ::cast::transmute(&*self) }
+ }
+
+ pub fn new_child(&mut self) -> ~JoinLatch {
+ rtassert!(!self.closed);
+
+ if self.child.is_none() {
+ // This is the first time spawning a child
+ let shared = ~SharedState {
+ count: AtomicUint::new(1),
+ success: true
+ };
+ let (port, chan) = stream();
+ let chan = SharedChan::new(chan);
+ let child = ChildLink {
+ shared: shared,
+ port: port,
+ chan: chan,
+ dropped_child: false
+ };
+ self.child = Some(child);
+ }
+
+ let child_link: &mut ChildLink = self.child.get_mut_ref();
+ let shared_state: *mut SharedState = &mut *child_link.shared;
+
+ child_link.shared.count.fetch_add(1, SeqCst);
+
+ let child = ~JoinLatch {
+ parent: Some(ParentLink {
+ shared: shared_state,
+ chan: child_link.chan.clone()
+ }),
+ child: None,
+ closed: false
+ };
+ rtdebug!("NEW child latch %x", child.id());
+ return child;
+ }
+
+ pub fn release(~self, local_success: bool) {
+ // XXX: This should not block, but there's a bug in the below
+ // code that I can't figure out.
+ self.wait(local_success);
+ }
+
+ // XXX: Should not require ~self
+ fn release_broken(~self, local_success: bool) {
+ rtassert!(!self.closed);
+
+ rtdebug!("releasing %x", self.id());
+
+ let id = self.id();
+ let _ = id; // XXX: `id` is only used in debug statements so appears unused
+ let mut this = self;
+ let mut child_success = true;
+ let mut children_done = false;
+
+ if this.child.is_some() {
+ rtdebug!("releasing children");
+ let child_link: &mut ChildLink = this.child.get_mut_ref();
+ let shared: &mut SharedState = &mut *child_link.shared;
+
+ if !child_link.dropped_child {
+ let last_count = shared.count.fetch_sub(1, SeqCst);
+ rtdebug!("child count before sub %u %x", last_count, id);
+ if last_count == 1 {
+ assert!(child_link.chan.try_send(ChildrenTerminated));
+ }
+ child_link.dropped_child = true;
+ }
+
+ // Wait for messages from children
+ let mut tombstones = ~[];
+ loop {
+ if child_link.port.peek() {
+ match child_link.port.recv() {
+ Tombstone(t) => {
+ tombstones.push(t);
+ },
+ ChildrenTerminated => {
+ children_done = true;
+ break;
+ }
+ }
+ } else {
+ break
+ }
+ }
+
+ rtdebug!("releasing %u tombstones %x", tombstones.len(), id);
+
+ // Try to release the tombstones. Those that still have
+ // outstanding children will be re-enqueued. When this task's
+ // parents release their latch we'll end up back here
+ // trying them again.
+ while !tombstones.is_empty() {
+ tombstones.pop().release(true);
+ }
+
+ if children_done {
+ let count = shared.count.load(SeqCst);
+ assert!(count == 0);
+ // self_count is the acquire-read barrier
+ child_success = shared.success;
+ }
+ } else {
+ children_done = true;
+ }
+
+ let total_success = local_success && child_success;
+
+ rtassert!(this.parent.is_some());
+
+ unsafe {
+ {
+ let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+ let shared: *mut SharedState = parent_link.shared;
+
+ if !total_success {
+ // parent_count is the write-wait barrier
+ (*shared).success = false;
+ }
+ }
+
+ if children_done {
+ rtdebug!("children done");
+ do Local::borrow::<Scheduler, ()> |sched| {
+ sched.metrics.release_tombstone += 1;
+ }
+ {
+ rtdebug!("RELEASING parent %x", id);
+ let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+ let shared: *mut SharedState = parent_link.shared;
+ let last_count = (*shared).count.fetch_sub(1, SeqCst);
+ rtdebug!("count before parent sub %u %x", last_count, id);
+ if last_count == 1 {
+ assert!(parent_link.chan.try_send(ChildrenTerminated));
+ }
+ }
+ this.closed = true;
+ util::ignore(this);
+ } else {
+ rtdebug!("children not done");
+ rtdebug!("TOMBSTONING %x", id);
+ do Local::borrow::<Scheduler, ()> |sched| {
+ sched.metrics.release_no_tombstone += 1;
+ }
+ let chan = {
+ let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+ parent_link.chan.clone()
+ };
+ assert!(chan.try_send(Tombstone(this)));
+ }
+ }
+ }
+
+ // XXX: Should not require ~self
+ pub fn wait(~self, local_success: bool) -> bool {
+ rtassert!(!self.closed);
+
+ rtdebug!("WAITING %x", self.id());
+
+ let mut this = self;
+ let mut child_success = true;
+
+ if this.child.is_some() {
+ rtdebug!("waiting for children");
+ let child_link: &mut ChildLink = this.child.get_mut_ref();
+ let shared: &mut SharedState = &mut *child_link.shared;
+
+ if !child_link.dropped_child {
+ let last_count = shared.count.fetch_sub(1, SeqCst);
+ rtdebug!("child count before sub %u", last_count);
+ if last_count == 1 {
+ assert!(child_link.chan.try_send(ChildrenTerminated));
+ }
+ child_link.dropped_child = true;
+ }
+
+ // Wait for messages from children
+ loop {
+ match child_link.port.recv() {
+ Tombstone(t) => {
+ t.wait(true);
+ }
+ ChildrenTerminated => break
+ }
+ }
+
+ let count = shared.count.load(SeqCst);
+ if count != 0 { ::io::println(fmt!("%u", count)); }
+ assert!(count == 0);
+ // self_count is the acquire-read barrier
+ child_success = shared.success;
+ }
+
+ let total_success = local_success && child_success;
+
+ if this.parent.is_some() {
+ rtdebug!("releasing parent");
+ unsafe {
+ let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+ let shared: *mut SharedState = parent_link.shared;
+
+ if !total_success {
+ // parent_count is the write-wait barrier
+ (*shared).success = false;
+ }
+
+ let last_count = (*shared).count.fetch_sub(1, SeqCst);
+ rtdebug!("count before parent sub %u", last_count);
+ if last_count == 1 {
+ assert!(parent_link.chan.try_send(ChildrenTerminated));
+ }
+ }
+ }
+
+ this.closed = true;
+ util::ignore(this);
+
+ return total_success;
+ }
+}
+
+impl Drop for JoinLatch {
- use old_iter::BaseIter;
++ fn drop(&self) {
+ rtdebug!("DESTROYING %x", self.id());
+ rtassert!(self.closed);
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use cell::Cell;
+ use container::Container;
+ use iter::Times;
- for my_orders.each |&order| {
+ use rt::test::*;
+ use rand;
+ use rand::RngUtil;
+ use vec::{CopyableVector, ImmutableVector};
+
+ #[test]
+ fn success_immediately() {
+ do run_in_newsched_task {
+ let mut latch = JoinLatch::new_root();
+
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_immediately {
+ let child_latch = child_latch.take();
+ assert!(child_latch.wait(true));
+ }
+
+ assert!(latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn success_later() {
+ do run_in_newsched_task {
+ let mut latch = JoinLatch::new_root();
+
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_later {
+ let child_latch = child_latch.take();
+ assert!(child_latch.wait(true));
+ }
+
+ assert!(latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn mt_success() {
+ do run_in_mt_newsched_task {
+ let mut latch = JoinLatch::new_root();
+
+ for 10.times {
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_random {
+ let child_latch = child_latch.take();
+ assert!(child_latch.wait(true));
+ }
+ }
+
+ assert!(latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn mt_failure() {
+ do run_in_mt_newsched_task {
+ let mut latch = JoinLatch::new_root();
+
+ let spawn = |status| {
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_random {
+ let child_latch = child_latch.take();
+ child_latch.wait(status);
+ }
+ };
+
+ for 10.times { spawn(true) }
+ spawn(false);
+ for 10.times { spawn(true) }
+
+ assert!(!latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn mt_multi_level_success() {
+ do run_in_mt_newsched_task {
+ let mut latch = JoinLatch::new_root();
+
+ fn child(latch: &mut JoinLatch, i: int) {
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_random {
+ let mut child_latch = child_latch.take();
+ if i != 0 {
+ child(&mut *child_latch, i - 1);
+ child_latch.wait(true);
+ } else {
+ child_latch.wait(true);
+ }
+ }
+ }
+
+ child(&mut *latch, 10);
+
+ assert!(latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn mt_multi_level_failure() {
+ do run_in_mt_newsched_task {
+ let mut latch = JoinLatch::new_root();
+
+ fn child(latch: &mut JoinLatch, i: int) {
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_random {
+ let mut child_latch = child_latch.take();
+ if i != 0 {
+ child(&mut *child_latch, i - 1);
+ child_latch.wait(false);
+ } else {
+ child_latch.wait(true);
+ }
+ }
+ }
+
+ child(&mut *latch, 10);
+
+ assert!(!latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn release_child() {
+ do run_in_newsched_task {
+ let mut latch = JoinLatch::new_root();
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+
+ do spawntask_immediately {
+ let latch = child_latch.take();
+ latch.release(false);
+ }
+
+ assert!(!latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn release_child_tombstone() {
+ do run_in_newsched_task {
+ let mut latch = JoinLatch::new_root();
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+
+ do spawntask_immediately {
+ let mut latch = child_latch.take();
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_later {
+ let latch = child_latch.take();
+ latch.release(false);
+ }
+ latch.release(true);
+ }
+
+ assert!(!latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn release_child_no_tombstone() {
+ do run_in_newsched_task {
+ let mut latch = JoinLatch::new_root();
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+
+ do spawntask_later {
+ let mut latch = child_latch.take();
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ do spawntask_immediately {
+ let latch = child_latch.take();
+ latch.release(false);
+ }
+ latch.release(true);
+ }
+
+ assert!(!latch.wait(true));
+ }
+ }
+
+ #[test]
+ fn release_child_tombstone_stress() {
+ fn rand_orders() -> ~[bool] {
+ let mut v = ~[false, ..5];
+ v[0] = true;
+ let mut rng = rand::rng();
+ return rng.shuffle(v);
+ }
+
+ fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) {
+ if orders.is_empty() {
+ return (~[], ~[]);
+ } else if orders.len() <= 2 {
+ return (orders.to_owned(), ~[]);
+ }
+ let mut rng = rand::rng();
+ let n = rng.gen_uint_range(1, orders.len());
+ let first = orders.slice(0, n).to_owned();
+ let last = orders.slice(n, orders.len()).to_owned();
+ assert!(first.len() + last.len() == orders.len());
+ return (first, last);
+ }
+
+ for stress_factor().times {
+ do run_in_newsched_task {
+ fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) {
+ let (my_orders, remaining_orders) = split_orders(orders);
+ rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders));
+ rtdebug!("depth: %u", depth);
+ let mut remaining_orders = remaining_orders;
+ let mut num = 0;
- for orders.each |order| {
++ for my_orders.iter().advance |&order| {
+ let child_latch = latch.new_child();
+ let child_latch = Cell::new(child_latch);
+ let (child_orders, remaining) = split_orders(remaining_orders);
+ rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining));
+ remaining_orders = remaining;
+ let child_orders = Cell::new(child_orders);
+ let child_num = num;
+ let _ = child_num; // XXX unused except in rtdebug!
+ do spawntask_random {
+ rtdebug!("depth %u num %u", depth, child_num);
+ let mut child_latch = child_latch.take();
+ let child_orders = child_orders.take();
+ doit(&mut *child_latch, child_orders, depth + 1);
+ child_latch.release(order);
+ }
+
+ num += 1;
+ }
+ }
+
+ let mut latch = JoinLatch::new_root();
+ let orders = rand_orders();
+ rtdebug!("orders: %?", orders);
+
+ doit(&mut *latch, orders, 0);
+
+ assert!(!latch.wait(true));
+ }
+ }
+ }
+
+ #[test]
+ fn whateverman() {
+ struct Order {
+ immediate: bool,
+ succeed: bool,
+ orders: ~[Order]
+ }
+ fn next(latch: &mut JoinLatch, orders: ~[Order]) {
++ for orders.iter().advance |order| {
+ let suborders = copy order.orders;
+ let child_latch = Cell::new(latch.new_child());
+ let succeed = order.succeed;
+ if order.immediate {
+ do spawntask_immediately {
+ let mut child_latch = child_latch.take();
+ next(&mut *child_latch, copy suborders);
+ rtdebug!("immediate releasing");
+ child_latch.release(succeed);
+ }
+ } else {
+ do spawntask_later {
+ let mut child_latch = child_latch.take();
+ next(&mut *child_latch, copy suborders);
+ rtdebug!("later releasing");
+ child_latch.release(succeed);
+ }
+ }
+ }
+ }
+
+ do run_in_newsched_task {
+ let mut latch = JoinLatch::new_root();
+ let orders = ~[ Order { // 0 0
+ immediate: true,
+ succeed: true,
+ orders: ~[ Order { // 1 0
+ immediate: true,
+ succeed: false,
+ orders: ~[ Order { // 2 0
+ immediate: false,
+ succeed: false,
+ orders: ~[ Order { // 3 0
+ immediate: true,
+ succeed: false,
+ orders: ~[]
+ }, Order { // 3 1
+ immediate: false,
+ succeed: false,
+ orders: ~[]
+ }]
+ }]
+ }]
+ }];
+
+ next(&mut *latch, orders);
+ assert!(!latch.wait(true));
+ }
+ }
+}
+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+
+//! A concurrent queue that supports multiple producers and a
+//! single consumer.
use container::Container;
- use kinds::Owned;
+ use kinds::Send;
use vec::OwnedVector;
use cell::Cell;
use option::*;
/// # Return value
///
/// The return value is used as the process return code. 0 on success, 101 on error.
-pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
+pub fn start(argc: int, argv: **u8, crate_map: *u8, main: ~fn()) -> int {
- use self::sched::{Scheduler, Coroutine};
- use self::uv::uvio::UvEventLoop;
+ init(argc, argv, crate_map);
+ let exit_code = run(main);
+ cleanup();
- init(crate_map);
+ return exit_code;
+}
- let loop_ = ~UvEventLoop::new();
- let mut sched = ~Scheduler::new(loop_);
- let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
+/// One-time runtime initialization.
+///
+/// Initializes global state, including frobbing
+/// the crate's logging flags, registering GC
+/// metadata, and storing the process arguments.
+pub fn init(argc: int, argv: **u8, crate_map: *u8) {
+ // XXX: Derefing these pointers is not safe.
+ // Need to propagate the unsafety to `start`.
+ unsafe {
+ args::init(argc, argv);
+ logging::init(crate_map);
+ rust_update_gc_metadata(crate_map);
+ }
- sched.enqueue_task(main_task);
- sched.run();
+ extern {
+ fn rust_update_gc_metadata(crate_map: *u8);
+ }
+}
- return 0;
+/// One-time runtime cleanup.
+pub fn cleanup() {
+ args::cleanup();
- global_heap::cleanup();
}
-/// One-time runtime initialization. Currently all this does is set up logging
-/// based on the RUST_LOG environment variable.
-pub fn init(crate_map: *u8) {
- logging::init(crate_map);
+/// Execute the main function in a scheduler.
+///
+/// Configures the runtime according to the environment, by default
+/// using a task scheduler with the same number of threads as cores.
+/// Returns a process exit code.
+pub fn run(main: ~fn()) -> int {
+
+ static DEFAULT_ERROR_CODE: int = 101;
+
+ let nthreads = util::default_sched_threads();
+
+ // The shared list of sleeping schedulers. Schedulers wake each other
+ // occasionally to do new work.
+ let sleepers = SleeperList::new();
+ // The shared work queue. Temporary until work stealing is implemented.
+ let work_queue = WorkQueue::new();
+
+ // The schedulers.
+ let mut scheds = ~[];
+ // Handles to the schedulers. When the main task ends these will be
+ // sent the Shutdown message to terminate the schedulers.
+ let mut handles = ~[];
+
+ for nthreads.times {
+ // Every scheduler is driven by an I/O event loop.
+ let loop_ = ~UvEventLoop::new();
+ let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+ let handle = sched.make_handle();
+
+ scheds.push(sched);
+ handles.push(handle);
+ }
+
+ // Create a shared cell for transmitting the process exit
+ // code from the main task to this function.
+ let exit_code = UnsafeAtomicRcBox::new(AtomicInt::new(0));
+ let exit_code_clone = exit_code.clone();
+
+ // When the main task exits, after all the tasks in the main
+ // task tree, shut down the schedulers and set the exit code.
+ let handles = Cell::new(handles);
+ let on_exit: ~fn(bool) = |exit_success| {
+
+ let mut handles = handles.take();
+ for handles.mut_iter().advance |handle| {
+ handle.send(Shutdown);
+ }
+
+ unsafe {
+ let exit_code = if exit_success { 0 } else { DEFAULT_ERROR_CODE };
+ (*exit_code_clone.get()).store(exit_code, SeqCst);
+ }
+ };
+
+ // Create and enqueue the main task.
+ let main_cell = Cell::new(main);
+ let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
+ main_cell.take());
+ main_task.on_exit = Some(on_exit);
+ scheds[0].enqueue_task(main_task);
+
+ // Run each scheduler in a thread.
+ let mut threads = ~[];
+ while !scheds.is_empty() {
+ let sched = scheds.pop();
+ let sched_cell = Cell::new(sched);
+ let thread = do Thread::start {
+ let sched = sched_cell.take();
+ sched.run();
+ };
+
+ threads.push(thread);
+ }
+
+ // Wait for schedulers
+ { let _threads = threads; }
+
+ // Return the exit code
+ unsafe {
+ (*exit_code.get()).load(SeqCst)
+ }
}
/// Possible contexts in which Rust code may be executing.
}
}
}
- let mut (p, ch1) = stream();
+
+ #[test]
+ fn handle() {
+ use rt::comm::*;
+
+ do run_in_bare_thread {
+ let (port, chan) = oneshot::<()>();
+ let port_cell = Cell::new(port);
+ let chan_cell = Cell::new(chan);
+ let mut sched1 = ~new_test_uv_sched();
+ let handle1 = sched1.make_handle();
+ let handle1_cell = Cell::new(handle1);
+ let task1 = ~do Task::new_root(&mut sched1.stack_pool) {
+ chan_cell.take().send(());
+ };
+ sched1.enqueue_task(task1);
+
+ let mut sched2 = ~new_test_uv_sched();
+ let task2 = ~do Task::new_root(&mut sched2.stack_pool) {
+ port_cell.take().recv();
+ // Release the other scheduler's handle so it can exit
+ handle1_cell.take();
+ };
+ sched2.enqueue_task(task2);
+
+ let sched1_cell = Cell::new(sched1);
+ let _thread1 = do Thread::start {
+ let sched1 = sched1_cell.take();
+ sched1.run();
+ };
+
+ let sched2_cell = Cell::new(sched2);
+ let _thread2 = do Thread::start {
+ let sched2 = sched2_cell.take();
+ sched2.run();
+ };
+ }
+ }
+
+ #[test]
+ fn multithreading() {
+ use rt::comm::*;
+ use iter::Times;
+ use vec::OwnedVector;
+ use container::Container;
+
+ do run_in_mt_newsched_task {
+ let mut ports = ~[];
+ for 10.times {
+ let (port, chan) = oneshot();
+ let chan_cell = Cell::new(chan);
+ do spawntask_later {
+ chan_cell.take().send(());
+ }
+ ports.push(port);
+ }
+
+ while !ports.is_empty() {
+ ports.pop().recv();
+ }
+ }
+ }
+
+ #[test]
+ fn thread_ring() {
+ use rt::comm::*;
+ use comm::{GenericPort, GenericChan};
+
+ do run_in_mt_newsched_task {
+ let (end_port, end_chan) = oneshot();
+
+ let n_tasks = 10;
+ let token = 2000;
+
- fn finalize(&self) {
++ let (p, ch1) = stream();
++ let mut p = p;
+ ch1.send((token, end_chan));
+ let mut i = 2;
+ while i <= n_tasks {
+ let (next_p, ch) = stream();
+ let imm_i = i;
+ let imm_p = p;
+ do spawntask_random {
+ roundtrip(imm_i, n_tasks, &imm_p, &ch);
+ };
+ p = next_p;
+ i += 1;
+ }
+ let imm_p = p;
+ let imm_ch = ch1;
+ do spawntask_random {
+ roundtrip(1, n_tasks, &imm_p, &imm_ch);
+ }
+
+ end_port.recv();
+ }
+
+ fn roundtrip(id: int, n_tasks: int,
+ p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) {
+ loop {
+ match p.recv() {
+ (1, end_chan) => {
+ debug!("%d\n", id);
+ end_chan.send(());
+ return;
+ }
+ (token, end_chan) => {
+ debug!("thread: %d got token: %d", id, token);
+ ch.send((token - 1, end_chan));
+ if token <= n_tasks {
+ return;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn start_closure_dtor() {
+ use ops::Drop;
+
+ // Regression test that the `start` task entrypoint can
+ // contain dtors that use task resources
+ do run_in_newsched_task {
+ struct S { field: () }
+
+ impl Drop for S {
++ fn drop(&self) {
+ let _foo = @0;
+ }
+ }
+
+ let s = S { field: () };
+
+ do spawntask {
+ let _ss = &s;
+ }
+ }
+ }
}
}
impl Drop for Task {
- fn finalize(&self) { assert!(self.destroyed) }
+ fn drop(&self) { assert!(self.destroyed) }
}
+// Coroutines represent nothing more than a context and a stack
+// segment.
+
+impl Coroutine {
+
+ pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
+ static MIN_STACK_SIZE: uint = 100000; // XXX: Too much stack
+
+ let start = Coroutine::build_start_wrapper(start);
+ let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
+ let initial_context = Context::new(start, &mut stack);
+ Coroutine {
+ current_stack_segment: stack,
+ saved_context: initial_context
+ }
+ }
+
+ fn build_start_wrapper(start: ~fn()) -> ~fn() {
+ let start_cell = Cell::new(start);
+ let wrapper: ~fn() = || {
+ // First code after swap to this new context. Run our
+ // cleanup job.
+ unsafe {
+ let sched = Local::unsafe_borrow::<Scheduler>();
+ (*sched).run_cleanup_job();
+
+ let sched = Local::unsafe_borrow::<Scheduler>();
+ let task = (*sched).current_task.get_mut_ref();
+
+ do task.run {
+ // N.B. Removing `start` from the start wrapper
+ // closure by emptying a cell is critical for
+ // correctness. The ~Task pointer, and in turn the
+ // closure used to initialize the first call
+ // frame, is destroyed in the scheduler context,
+ // not task context. So any captured closures must
+ // not contain user-definable dtors that expect to
+ // be in task context. By moving `start` out of
+ // the closure, all the user code goes out of
+ // scope while the task is still running.
+ let start = start_cell.take();
+ start();
+ };
+ }
+
+ let sched = Local::take::<Scheduler>();
+ sched.terminate_current_task();
+ };
+ return wrapper;
+ }
+
+ /// Destroy coroutine and try to reuse stack segment.
+ pub fn recycle(~self, stack_pool: &mut StackPool) {
+ match self {
+ ~Coroutine { current_stack_segment, _ } => {
+ stack_pool.give_segment(current_stack_segment);
+ }
+ }
+ }
+
+}
+
+
// Just a sanity check to make sure we are catching a Rust-thrown exception
static UNWIND_TOKEN: uintptr_t = 839147;
}
}
- fn finalize(&self) {
+pub struct UvRemoteCallback {
+ // The uv async handle for triggering the callback
+ async: AsyncWatcher,
+ // A flag to tell the callback to exit, set from the dtor. This is
+ // almost never contested - only in rare races with the dtor.
+ exit_flag: Exclusive<bool>
+}
+
+impl UvRemoteCallback {
+ pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
+ let exit_flag = exclusive(false);
+ let exit_flag_clone = exit_flag.clone();
+ let async = do AsyncWatcher::new(loop_) |watcher, status| {
+ assert!(status.is_none());
+ f();
+ unsafe {
+ do exit_flag_clone.with_imm |&should_exit| {
+ if should_exit {
+ watcher.close(||());
+ }
+ }
+ }
+ };
+ UvRemoteCallback {
+ async: async,
+ exit_flag: exit_flag
+ }
+ }
+}
+
+impl RemoteCallback for UvRemoteCallback {
+ fn fire(&mut self) { self.async.send() }
+}
+
+impl Drop for UvRemoteCallback {
++ fn drop(&self) {
+ unsafe {
+ let this: &mut UvRemoteCallback = cast::transmute_mut(self);
+ do this.exit_flag.with |should_exit| {
+ // NB: These two things need to happen atomically. Otherwise
+ // the event handler could wake up due to a *previous*
+ // signal and see the exit flag, destroying the handle
+ // before the final send.
+ *should_exit = true;
+ this.async.send();
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod test_remote {
+ use cell::Cell;
+ use rt::test::*;
+ use rt::thread::Thread;
+ use rt::tube::Tube;
+ use rt::rtio::EventLoop;
+ use rt::local::Local;
+ use rt::sched::Scheduler;
+
+ #[test]
+ fn test_uv_remote() {
+ do run_in_newsched_task {
+ let mut tube = Tube::new();
+ let tube_clone = tube.clone();
+ let remote_cell = Cell::new_empty();
+ do Local::borrow::<Scheduler, ()>() |sched| {
+ let tube_clone = tube_clone.clone();
+ let tube_clone_cell = Cell::new(tube_clone);
+ let remote = do sched.event_loop.remote_callback {
+ tube_clone_cell.take().send(1);
+ };
+ remote_cell.put_back(remote);
+ }
+ let _thread = do Thread::start {
+ remote_cell.take().fire();
+ };
+
+ assert!(tube.recv() == 1);
+ }
+ }
+}
+
pub struct UvIoFactory(Loop);
impl UvIoFactory {
}
impl Drop for UvTcpListener {
- fn finalize(&self) {
+ fn drop(&self) {
let watcher = self.watcher();
let scheduler = Local::take::<Scheduler>();
- do scheduler.deschedule_running_task_and_then |task| {
+ do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.as_stream().close {
let scheduler = Local::take::<Scheduler>();
#[allow(missing_doc)];
--use option::{Some, None};
use cast;
use gc;
use io;
// FIXME #4427: Temporary until rt::rt_fail_ goes away
pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
- use option::Option;
+ use cell::Cell;
- use option::Option;
+ use either::Left;
use rt::{context, OldTaskContext, TaskContext};
-- use rt::task::{Task, Unwinder};
++ use rt::task::Task;
use rt::local::Local;
+ use rt::logging::Logger;
let context = context();
match context {
use util;
use unstable::sync::{Exclusive, exclusive};
use rt::local::Local;
- use iterator::{IteratorUtil};
+use rt::task::Task;
+ use iterator::IteratorUtil;
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
}
}
-fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) {
+fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
use rt::sched::*;
- let mut sched = Local::take::<Scheduler>();
- let task = ~Coroutine::new(&mut sched.stack_pool, f);
- sched.schedule_new_task(task);
+ let f = Cell::new(f);
+
+ let mut task = unsafe {
+ let sched = Local::unsafe_borrow::<Scheduler>();
+ rtdebug!("unsafe borrowed sched");
+
+ if opts.linked {
+ do Local::borrow::<Task, ~Task>() |running_task| {
+ ~running_task.new_child(&mut (*sched).stack_pool, f.take())
+ }
+ } else {
+ // An unlinked task is a new root in the task tree
+ ~Task::new_root(&mut (*sched).stack_pool, f.take())
+ }
+ };
+
+ if opts.notify_chan.is_some() {
+ let notify_chan = opts.notify_chan.swap_unwrap();
+ let notify_chan = Cell::new(notify_chan);
+ let on_exit: ~fn(bool) = |success| {
+ notify_chan.take().send(
+ if success { Success } else { Failure }
+ )
+ };
+ task.on_exit = Some(on_exit);
+ }
+
+ rtdebug!("spawn about to take scheduler");
+
- let mut sched = Local::take::<Scheduler>();
++ let sched = Local::take::<Scheduler>();
+ rtdebug!("took sched in spawn");
- // let task = ~Coroutine::with_task(&mut sched.stack_pool,
- // task, f);
- // let task = ~Task::new_root(&mut sched.stack_pool, f);
+ sched.schedule_task(task);
}
fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) {
//! Runtime calls emitted by the compiler.
--use iterator::IteratorUtil;
--use uint;
use cast::transmute;
--use libc::{c_char, c_uchar, c_void, size_t, uintptr_t, c_int, STDERR_FILENO};
-use managed::raw::BoxRepr;
++use libc::{c_char, c_uchar, c_void, size_t, uintptr_t, c_int};
use str;
use sys;
use rt::{context, OldTaskContext};
use rt::task::Task;
use rt::local::Local;
--use option::{Option, Some, None};
--use io;
- use rt::global_heap;
+use rt::borrowck;
- use borrow::to_uint;
#[allow(non_camel_case_types)]
pub type rust_task = c_void;
-pub static FROZEN_BIT: uint = 1 << (uint::bits - 1);
-pub static MUT_BIT: uint = 1 << (uint::bits - 2);
-static ALL_BITS: uint = FROZEN_BIT | MUT_BIT;
-
pub mod rustrt {
use unstable::lang::rust_task;
-- use libc::{c_void, c_char, uintptr_t};
++ use libc::{c_char, uintptr_t};
pub extern {
#[rust_stack]
}
}
- // FIXME #4942: Make these signatures agree with exchange_alloc's signatures
- #[lang="exchange_malloc"]
- #[inline]
- pub unsafe fn exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
- transmute(global_heap::malloc(transmute(td), transmute(size)))
-#[deriving(Eq)]
-struct BorrowRecord {
- box: *mut BoxRepr,
- file: *c_char,
- line: size_t
--}
--
- // NB: Calls to free CANNOT be allowed to fail, as throwing an exception from
- // inside a landing pad may corrupt the state of the exception handler. If a
- // problem occurs, call exit instead.
- #[lang="exchange_free"]
-fn try_take_task_borrow_list() -> Option<~[BorrowRecord]> {
- unsafe {
- let cur_task: *rust_task = rustrt::rust_try_get_task();
- if cur_task.is_not_null() {
- let ptr = rustrt::rust_take_task_borrow_list(cur_task);
- if ptr.is_null() {
- None
- } else {
- let v: ~[BorrowRecord] = transmute(ptr);
- Some(v)
- }
- } else {
- None
- }
- }
-}
-
-fn swap_task_borrow_list(f: &fn(~[BorrowRecord]) -> ~[BorrowRecord]) {
- unsafe {
- let cur_task: *rust_task = rustrt::rust_try_get_task();
- if cur_task.is_not_null() {
- let mut borrow_list: ~[BorrowRecord] = {
- let ptr = rustrt::rust_take_task_borrow_list(cur_task);
- if ptr.is_null() { ~[] } else { transmute(ptr) }
- };
- borrow_list = f(borrow_list);
- rustrt::rust_set_task_borrow_list(cur_task, transmute(borrow_list));
- }
- }
-}
-
-pub unsafe fn clear_task_borrow_list() {
- // pub because it is used by the box annihilator.
- let _ = try_take_task_borrow_list();
-}
-
-unsafe fn fail_borrowed(box: *mut BoxRepr, file: *c_char, line: size_t) {
- debug_borrow("fail_borrowed: ", box, 0, 0, file, line);
-
- match try_take_task_borrow_list() {
- None => { // not recording borrows
- let msg = "borrowed";
- do str::as_buf(msg) |msg_p, _| {
- fail_(msg_p as *c_char, file, line);
- }
- }
- Some(borrow_list) => { // recording borrows
- let mut msg = ~"borrowed";
- let mut sep = " at ";
- for borrow_list.rev_iter().advance |entry| {
- if entry.box == box {
- msg.push_str(sep);
- let filename = str::raw::from_c_str(entry.file);
- msg.push_str(filename);
- msg.push_str(fmt!(":%u", entry.line as uint));
- sep = " and at ";
- }
- }
- do str::as_buf(msg) |msg_p, _| {
- fail_(msg_p as *c_char, file, line)
- }
- }
- }
-}
-
-/// Because this code is so perf. sensitive, use a static constant so that
-/// debug printouts are compiled out most of the time.
-static ENABLE_DEBUG: bool = false;
-
--#[inline]
- pub unsafe fn exchange_free(ptr: *c_char) {
- global_heap::free(transmute(ptr))
-unsafe fn debug_borrow<T>(tag: &'static str,
- p: *const T,
- old_bits: uint,
- new_bits: uint,
- filename: *c_char,
- line: size_t) {
- //! A useful debugging function that prints a pointer + tag + newline
- //! without allocating memory.
-
- if ENABLE_DEBUG && ::rt::env::get().debug_borrow {
- debug_borrow_slow(tag, p, old_bits, new_bits, filename, line);
- }
-
- unsafe fn debug_borrow_slow<T>(tag: &'static str,
- p: *const T,
- old_bits: uint,
- new_bits: uint,
- filename: *c_char,
- line: size_t) {
- let dbg = STDERR_FILENO as io::fd_t;
- dbg.write_str(tag);
- dbg.write_hex(p as uint);
- dbg.write_str(" ");
- dbg.write_hex(old_bits);
- dbg.write_str(" ");
- dbg.write_hex(new_bits);
- dbg.write_str(" ");
- dbg.write_cstr(filename);
- dbg.write_str(":");
- dbg.write_hex(line as uint);
- dbg.write_str("\n");
- }
-}
-
-trait DebugPrints {
- fn write_hex(&self, val: uint);
- unsafe fn write_cstr(&self, str: *c_char);
-}
-
-impl DebugPrints for io::fd_t {
- fn write_hex(&self, mut i: uint) {
- let letters = ['0', '1', '2', '3', '4', '5', '6', '7', '8',
- '9', 'a', 'b', 'c', 'd', 'e', 'f'];
- static uint_nibbles: uint = ::uint::bytes << 1;
- let mut buffer = [0_u8, ..uint_nibbles+1];
- let mut c = uint_nibbles;
- while c > 0 {
- c -= 1;
- buffer[c] = letters[i & 0xF] as u8;
- i >>= 4;
- }
- self.write(buffer.slice(0, uint_nibbles));
- }
-
- unsafe fn write_cstr(&self, p: *c_char) {
- use libc::strlen;
- use vec;
-
- let len = strlen(p);
- let p: *u8 = transmute(p);
- do vec::raw::buf_as_slice(p, len as uint) |s| {
- self.write(s);
- }
- }
--}
--
#[lang="malloc"]
pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
match context() {
use cast::transmute;
use cast;
use container::{Container, Mutable};
+ use cmp;
use cmp::{Eq, Ord, TotalEq, TotalOrd, Ordering, Less, Equal, Greater};
use clone::Clone;
- use old_iter::BaseIter;
- use old_iter;
- use iterator::{Iterator};
+ use iterator::{FromIterator, Iterator, IteratorUtil};
use iter::FromIter;
use kinds::Copy;
--use libc;
+ use libc::c_void;
use num::Zero;
- use old_iter::CopyableIter;
+ use ops::Add;
use option::{None, Option, Some};
use ptr::to_unsafe_ptr;
use ptr;
#[cfg(not(test))] use cmp::Equiv;
-#[doc(hidden)]
--pub mod rustrt {
-- use libc;
- use sys;
-- use vec::raw;
- #[cfg(stage0)]
- use intrinsic::{TyDesc};
- #[cfg(not(stage0))]
- use unstable::intrinsics::{TyDesc};
--
-- #[abi = "cdecl"]
-- pub extern {
- // These names are terrible. reserve_shared applies
- // to ~[] and reserve_shared_actual applies to @[].
-- #[fast_ffi]
- unsafe fn vec_reserve_shared(t: *sys::TypeDesc,
- v: *mut *mut raw::VecRepr,
- n: libc::size_t);
- unsafe fn vec_reserve_shared_actual(t: *TyDesc,
- v: **raw::VecRepr,
- n: libc::size_t);
-- }
--}
--
/// Returns true if two vectors have the same length
- pub fn same_length<T, U>(xs: &const [T], ys: &const [U]) -> bool {
+ pub fn same_length<T, U>(xs: &[T], ys: &[U]) -> bool {
xs.len() == ys.len()
}
}
impl<T> OwnedVector<T> for ~[T] {
+ /**
+ * Reserves capacity for exactly `n` elements in the given vector.
+ *
+ * If the capacity for `self` is already equal to or greater than the requested
+ * capacity, then no action is taken.
+ *
+ * # Arguments
+ *
+ * * n - The number of elements to reserve space for
+ */
#[inline]
- fn push(&mut self, t: T) {
- push(self, t);
+ #[cfg(stage0)]
+ fn reserve(&mut self, n: uint) {
+ // Only make the (slow) call into the runtime if we have to
+ use managed;
+ if self.capacity() < n {
+ unsafe {
+ let ptr: *mut *mut raw::VecRepr = cast::transmute(self);
+ let td = get_tydesc::<T>();
+ if ((**ptr).box_header.ref_count ==
+ managed::raw::RC_MANAGED_UNIQUE) {
- rustrt::vec_reserve_shared_actual(td, ptr as **raw::VecRepr, n as libc::size_t);
++ // XXX transmute shouldn't be necessary
++ let td = cast::transmute(td);
++ ::at_vec::raw::reserve_raw(td, ptr, n);
+ } else {
+ let alloc = n * sys::nonzero_size_of::<T>();
+ *ptr = realloc_raw(*ptr as *mut c_void, alloc + size_of::<raw::VecRepr>())
+ as *mut raw::VecRepr;
+ (**ptr).unboxed.alloc = alloc;
+ }
+ }
+ }
}
+ /**
+ * Reserves capacity for exactly `n` elements in the given vector.
+ *
+ * If the capacity for `self` is already equal to or greater than the requested
+ * capacity, then no action is taken.
+ *
+ * # Arguments
+ *
+ * * n - The number of elements to reserve space for
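+ *
+ * # Example
+ *
+ * A minimal usage sketch:
+ *
+ * ~~~ {.rust}
+ * let mut v = ~[1, 2, 3];
+ * v.reserve(10);
+ * // capacity comes from this trait; length is unchanged
+ * assert!(v.capacity() >= 10);
+ * assert!(v.len() == 3);
+ * ~~~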
+ */
#[inline]
- fn push_all_move(&mut self, rhs: ~[T]) {
- push_all_move(self, rhs);
+ #[cfg(not(stage0))]
+ fn reserve(&mut self, n: uint) {
+ // Only make the (slow) call into the runtime if we have to
+ if self.capacity() < n {
+ unsafe {
+ let ptr: *mut *mut raw::VecRepr = cast::transmute(self);
+ let td = get_tydesc::<T>();
+ if contains_managed::<T>() {
- rustrt::vec_reserve_shared_actual(td, ptr as **raw::VecRepr, n as libc::size_t);
++ ::at_vec::raw::reserve_raw(td, ptr, n);
+ } else {
+ let alloc = n * sys::nonzero_size_of::<T>();
+ *ptr = realloc_raw(*ptr as *mut c_void, alloc + size_of::<raw::VecRepr>())
+ as *mut raw::VecRepr;
+ (**ptr).unboxed.alloc = alloc;
+ }
+ }
+ }
}
+ /**
+ * Reserves capacity for at least `n` elements in the given vector.
+ *
+ * This function will over-allocate in order to amortize the allocation costs
+ * in scenarios where the caller may need to repeatedly reserve additional
+ * space.
+ *
+ * If the capacity for `self` is already equal to or greater than the requested
+ * capacity, then no action is taken.
+ *
+ * # Arguments
+ *
+ * * n - The number of elements to reserve space for
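+ *
+ * # Example
+ *
+ * A minimal usage sketch (the request is rounded up to a power of
+ * two, so the resulting capacity is at least `n`):
+ *
+ * ~~~ {.rust}
+ * let mut v = ~[0, 1, 2];
+ * v.reserve_at_least(10);
+ * assert!(v.capacity() >= 10);
+ * ~~~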
+ */
+ fn reserve_at_least(&mut self, n: uint) {
+ self.reserve(uint::next_power_of_two(n));
+ }
+
+ /// Returns the number of elements the vector can hold without reallocating.
#[inline]
- fn pop(&mut self) -> T {
- pop(self)
+ fn capacity(&self) -> uint {
+ unsafe {
+ let repr: **raw::VecRepr = transmute(self);
+ (**repr).unboxed.alloc / sys::nonzero_size_of::<T>()
+ }
}
+ /// Append an element to a vector
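+ ///
+ /// # Example
+ ///
+ /// A minimal usage sketch:
+ ///
+ /// ~~~ {.rust}
+ /// let mut v = ~[1, 2];
+ /// v.push(3);
+ /// assert!(v == ~[1, 2, 3]);
+ /// ~~~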
#[inline]
+ fn push(&mut self, t: T) {
+ unsafe {
+ let repr: **raw::VecRepr = transmute(&mut *self);
+ let fill = (**repr).unboxed.fill;
+ if (**repr).unboxed.alloc <= fill {
+ // need more space
+ reserve_no_inline(self);
+ }
+
+ self.push_fast(t);
+ }
+
+ // This helper is a separate, non-inlined function because
+ // reserve_at_least is very large (it pulls in reserve) and
+ // would otherwise be inlined here, making push too large.
+ #[inline(never)]
+ fn reserve_no_inline<T>(v: &mut ~[T]) {
+ let new_len = v.len() + 1;
+ v.reserve_at_least(new_len);
+ }
+ }
+
+ // This doesn't bother to make sure we have space.
+ #[inline] // really pretty please
+ unsafe fn push_fast(&mut self, t: T) {
+ let repr: **mut raw::VecRepr = transmute(self);
+ let fill = (**repr).unboxed.fill;
+ (**repr).unboxed.fill += sys::nonzero_size_of::<T>();
+ let p = to_unsafe_ptr(&((**repr).unboxed.data));
+ let p = ptr::offset(p, fill) as *mut T;
+ intrinsics::move_val_init(&mut(*p), t);
+ }
+
+ /// Takes ownership of the vector `rhs`, moving all elements into
+ /// the current vector. This does not copy any elements, and it is
+ /// illegal to use the `rhs` vector after calling this method
+ /// (because it is moved here).
+ ///
+ /// # Example
+ ///
+ /// ~~~ {.rust}
+ /// let mut a = ~[~1];
+ /// a.push_all_move(~[~2, ~3, ~4]);
+ /// assert!(a == ~[~1, ~2, ~3, ~4]);
+ /// ~~~
+ #[inline]
+ fn push_all_move(&mut self, mut rhs: ~[T]) {
+ let new_len = self.len() + rhs.len();
+ self.reserve(new_len);
+ unsafe {
+ do as_mut_buf(rhs) |p, len| {
+ for uint::range(0, len) |i| {
+ let x = ptr::replace_ptr(ptr::mut_offset(p, i),
+ intrinsics::uninit());
+ self.push(x);
+ }
+ }
+ raw::set_len(&mut rhs, 0);
+ }
+ }
+
+ /// Remove the last element from a vector and return it
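+ ///
+ /// # Example
+ ///
+ /// A minimal usage sketch (`pop` fails on an empty vector):
+ ///
+ /// ~~~ {.rust}
+ /// let mut v = ~[1, 2, 3];
+ /// assert!(v.pop() == 3);
+ /// assert!(v == ~[1, 2]);
+ /// ~~~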
+ fn pop(&mut self) -> T {
+ let ln = self.len();
+ if ln == 0 {
+ fail!("sorry, cannot pop an empty vector")
+ }
+ let valptr = ptr::to_mut_unsafe_ptr(&mut self[ln - 1u]);
+ unsafe {
+ let val = ptr::replace_ptr(valptr, intrinsics::init());
+ raw::set_len(self, ln - 1u);
+ val
+ }
+ }
+
+ /// Remove the first element from a vector and return it
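+ ///
+ /// # Example
+ ///
+ /// A minimal usage sketch:
+ ///
+ /// ~~~ {.rust}
+ /// let mut v = ~[1, 2, 3];
+ /// assert!(v.shift() == 1);
+ /// assert!(v == ~[2, 3]);
+ /// ~~~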
fn shift(&mut self) -> T {
- shift(self)
+ unsafe {
+ assert!(!self.is_empty());
+
+ if self.len() == 1 { return self.pop() }
+
+ if self.len() == 2 {
+ let last = self.pop();
+ let first = self.pop();
+ self.push(last);
+ return first;
+ }
+
+ let ln = self.len();
+ let next_ln = self.len() - 1;
+
+ // Save the last element. We're going to overwrite its position
+ let work_elt = self.pop();
+ // We should still have room to work where the last element was
+ assert!(self.capacity() >= ln);
+ // Pretend we still have the original length so we can use
+ // copy_memory to overwrite the hole we just made
+ raw::set_len(self, ln);
+
+ // Memcopy the head element (the one we want) to the location we just
+ // popped. For the moment it unsafely exists at both the head and last
+ // positions
+ {
+ let first_slice = self.slice(0, 1);
+ let last_slice = self.slice(next_ln, ln);
+ raw::copy_memory(transmute(last_slice), first_slice, 1);
+ }
+
+ // Memcopy everything to the left one element
+ {
+ let init_slice = self.slice(0, next_ln);
+ let tail_slice = self.slice(1, ln);
+ raw::copy_memory(transmute(init_slice),
+ tail_slice,
+ next_ln);
+ }
+
+ // Set the new length. Now the vector is back to normal
+ raw::set_len(self, next_ln);
+
+ // Swap out the element we want from the end
+ let vp = raw::to_mut_ptr(*self);
+ let vp = ptr::mut_offset(vp, next_ln - 1);
+
+ ptr::replace_ptr(vp, work_elt)
+ }
}
- #[inline]
+ /// Prepend an element to the vector
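+ ///
+ /// # Example
+ ///
+ /// A minimal usage sketch:
+ ///
+ /// ~~~ {.rust}
+ /// let mut v = ~[2, 3];
+ /// v.unshift(1);
+ /// assert!(v == ~[1, 2, 3]);
+ /// ~~~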
fn unshift(&mut self, x: T) {
- unshift(self, x)
+ let v = util::replace(self, ~[x]);
+ self.push_all_move(v);
}
- #[inline]
+ /// Insert an element at position `i` within the vector, shifting
+ /// all elements after position `i` one position to the right.
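+ ///
+ /// # Example
+ ///
+ /// A minimal usage sketch:
+ ///
+ /// ~~~ {.rust}
+ /// let mut v = ~[1, 3];
+ /// v.insert(1, 2);
+ /// assert!(v == ~[1, 2, 3]);
+ /// ~~~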
fn insert(&mut self, i: uint, x:T) {
- insert(self, i, x)
+ let len = self.len();
+ assert!(i <= len);
+
+ self.push(x);
+ let mut j = len;
+ while j > i {
+ self.swap(j, j - 1);
+ j -= 1;
+ }
}
- #[inline]
+ /// Remove and return the element at position `i` within the vector,
+ /// shifting all elements after position `i` one position to the left.
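+ ///
+ /// # Example
+ ///
+ /// A minimal usage sketch:
+ ///
+ /// ~~~ {.rust}
+ /// let mut v = ~[1, 2, 3];
+ /// assert!(v.remove(1) == 2);
+ /// assert!(v == ~[1, 3]);
+ /// ~~~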
fn remove(&mut self, i: uint) -> T {
- remove(self, i)
+ let len = self.len();
+ assert!(i < len);
+
+ let mut j = i;
+ while j < len - 1 {
+ self.swap(j, j + 1);
+ j += 1;
+ }
+ self.pop()
}
- #[inline]
+ /**
+ * Remove an element from anywhere in the vector and return it, replacing it
+ * with the last element. This does not preserve ordering, but is O(1).
+ *
+ * Fails if index >= length.
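+ *
+ * # Example
+ *
+ * A minimal usage sketch (the last element is swapped into the hole):
+ *
+ * ~~~ {.rust}
+ * let mut v = ~[1, 2, 3, 4];
+ * assert!(v.swap_remove(1) == 2);
+ * assert!(v == ~[1, 4, 3]);
+ * ~~~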
+ */
fn swap_remove(&mut self, index: uint) -> T {
- swap_remove(self, index)
+ let ln = self.len();
+ if index >= ln {
+ fail!("vec::swap_remove - index %u >= length %u", index, ln);
+ }
+ if index < ln - 1 {
+ self.swap(index, ln - 1);
+ }
+ self.pop()
}
- #[inline]
+ /// Shorten a vector, dropping excess elements.
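+ ///
+ /// # Example
+ ///
+ /// A minimal usage sketch (`truncate` fails if `newlen` exceeds the
+ /// current length):
+ ///
+ /// ~~~ {.rust}
+ /// let mut v = ~[1, 2, 3, 4];
+ /// v.truncate(2);
+ /// assert!(v == ~[1, 2]);
+ /// ~~~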
fn truncate(&mut self, newlen: uint) {
- truncate(self, newlen);
+ do as_mut_buf(*self) |p, oldlen| {
+ assert!(newlen <= oldlen);
+ unsafe {
+ // This loop is optimized out for non-drop types.
+ for uint::range(newlen, oldlen) |i| {
+ ptr::replace_ptr(ptr::mut_offset(p, i), intrinsics::uninit());
+ }
+ }
+ }
+ unsafe { raw::set_len(self, newlen); }
}
- #[inline]
+
+ /**
+ * Like `filter()`, but in place. Preserves the order of the retained
+ * elements. Linear time.
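+ *
+ * # Example
+ *
+ * A minimal usage sketch (keep only the even elements):
+ *
+ * ~~~ {.rust}
+ * let mut v = ~[1, 2, 3, 4];
+ * v.retain(|x| *x % 2 == 0);
+ * assert!(v == ~[2, 4]);
+ * ~~~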
+ */
fn retain(&mut self, f: &fn(t: &T) -> bool) {
- retain(self, f);
+ let len = self.len();
+ let mut deleted: uint = 0;
+
+ for uint::range(0, len) |i| {
+ if !f(&self[i]) {
+ deleted += 1;
+ } else if deleted > 0 {
+ self.swap(i - deleted, i);
+ }
+ }
+
+ if deleted > 0 {
+ self.truncate(len - deleted);
+ }
}
#[inline]
}
#endif
-extern "C" CDECL void
-vec_reserve_shared_actual(type_desc* ty, rust_vec_box** vp,
- size_t n_elts) {
+extern "C" CDECL void *
+rust_local_realloc(rust_opaque_box *ptr, size_t size) {
rust_task *task = rust_get_current_task();
- reserve_vec_exact_shared(task, vp, n_elts * ty->size);
+ return task->boxed.realloc(ptr, size);
}
- // This is completely misnamed.
- extern "C" CDECL void
- vec_reserve_shared(type_desc* ty, rust_vec_box** vp,
- size_t n_elts) {
- reserve_vec_exact(vp, n_elts * ty->size);
- }
-
extern "C" CDECL size_t
rand_seed_size() {
return rng_seed_size();
rust_get_c_stack
rust_log_str
start_task
-vec_reserve_shared_actual
+rust_local_realloc
- vec_reserve_shared
task_clear_event_reject
task_wait_event
task_signal_event
rust_drop_env_lock
rust_update_log_settings
rust_running_on_valgrind
- rust_drop_global_args_lock
+rust_get_num_cpus
+rust_get_global_args_ptr
+rust_current_boxed_region
+rust_take_global_args_lock
++rust_drop_global_args_lock