#[cfg(test)]
mod test {
use comm::{DuplexStream, rendezvous};
- use std::rt::test::run_in_newsched_task;
+ use std::rt::test::run_in_uv_task;
use std::task::spawn_unlinked;
#[test]
fn recv_a_lot() {
// Rendezvous streams should be able to handle any number of messages being sent
- do run_in_newsched_task {
+ do run_in_uv_task {
let (port, chan) = rendezvous();
do spawn {
do 1000000.times { chan.send(()) }
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! This is a basic event loop implementation not meant for any "real purposes"
+//! other than testing the scheduler and proving that it's possible to have a
+//! pluggable event loop.
+
+use prelude::*;
+
+use cast;
+use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback};
+use unstable::sync::Exclusive;
+use util;
+
+/// This is the only exported function from this module.
+pub fn event_loop() -> ~EventLoop {
+ ~BasicLoop::new() as ~EventLoop
+}
+
+struct BasicLoop {
+ work: ~[~fn()], // pending work
+ idle: Option<*BasicPausible>, // only one is allowed
+ remotes: ~[(uint, ~fn())],
+ next_remote: uint,
+ messages: Exclusive<~[Message]>
+}
+
+enum Message { RunRemote(uint), RemoveRemote(uint) }
+
+struct Time {
+ sec: u64,
+ nsec: u64,
+}
+
+impl Ord for Time {
+ fn lt(&self, other: &Time) -> bool {
+ self.sec < other.sec || self.nsec < other.nsec
+ }
+}
+
+impl BasicLoop {
+ fn new() -> BasicLoop {
+ BasicLoop {
+ work: ~[],
+ idle: None,
+ next_remote: 0,
+ remotes: ~[],
+ messages: Exclusive::new(~[]),
+ }
+ }
+
+ /// Process everything in the work queue (continually)
+ fn work(&mut self) {
+ while self.work.len() > 0 {
+ for work in util::replace(&mut self.work, ~[]).move_iter() {
+ work();
+ }
+ }
+ }
+
+ fn remote_work(&mut self) {
+ let messages = unsafe {
+ do self.messages.with |messages| {
+ if messages.len() > 0 {
+ Some(util::replace(messages, ~[]))
+ } else {
+ None
+ }
+ }
+ };
+ let messages = match messages {
+ Some(m) => m, None => return
+ };
+ for message in messages.iter() {
+ self.message(*message);
+ }
+ }
+
+ fn message(&mut self, message: Message) {
+ match message {
+ RunRemote(i) => {
+ match self.remotes.iter().find(|& &(id, _)| id == i) {
+ Some(&(_, ref f)) => (*f)(),
+ None => unreachable!()
+ }
+ }
+ RemoveRemote(i) => {
+ match self.remotes.iter().position(|&(id, _)| id == i) {
+ Some(i) => { self.remotes.remove(i); }
+ None => unreachable!()
+ }
+ }
+ }
+ }
+
+ /// Run the idle callback if one is registered
+ fn idle(&mut self) {
+ unsafe {
+ match self.idle {
+ Some(idle) => {
+ if (*idle).active {
+ (*(*idle).work.get_ref())();
+ }
+ }
+ None => {}
+ }
+ }
+ }
+
+ fn has_idle(&self) -> bool {
+ unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
+ }
+}
+
+impl EventLoop for BasicLoop {
+ fn run(&mut self) {
+ // Not exactly efficient, but it gets the job done.
+ while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
+
+ self.work();
+ self.remote_work();
+
+ if self.has_idle() {
+ self.idle();
+ continue
+ }
+
+ unsafe {
+ // We block here if we have no messages to process and we may
+ // receive a message at a later date
+ do self.messages.hold_and_wait |messages| {
+ self.remotes.len() > 0 &&
+ messages.len() == 0 &&
+ self.work.len() == 0
+ }
+ }
+ }
+ }
+
+ fn callback(&mut self, f: ~fn()) {
+ self.work.push(f);
+ }
+
+ // XXX: Seems like a really weird requirement to have an event loop provide.
+ fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
+ let callback = ~BasicPausible::new(self);
+ rtassert!(self.idle.is_none());
+ unsafe {
+ let cb_ptr: &*BasicPausible = cast::transmute(&callback);
+ self.idle = Some(*cb_ptr);
+ }
+ return callback as ~PausibleIdleCallback;
+ }
+
+ fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
+ let id = self.next_remote;
+ self.next_remote += 1;
+ self.remotes.push((id, f));
+ ~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback
+ }
+
+ /// This has no bindings for local I/O
+ fn io<'a>(&'a mut self, _: &fn(&'a mut IoFactory)) {}
+}
+
+struct BasicRemote {
+ queue: Exclusive<~[Message]>,
+ id: uint,
+}
+
+impl BasicRemote {
+ fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote {
+ BasicRemote { queue: queue, id: id }
+ }
+}
+
+impl RemoteCallback for BasicRemote {
+ fn fire(&mut self) {
+ unsafe {
+ do self.queue.hold_and_signal |queue| {
+ queue.push(RunRemote(self.id));
+ }
+ }
+ }
+}
+
+impl Drop for BasicRemote {
+ fn drop(&mut self) {
+ unsafe {
+ do self.queue.hold_and_signal |queue| {
+ queue.push(RemoveRemote(self.id));
+ }
+ }
+ }
+}
+
+struct BasicPausible {
+ eloop: *mut BasicLoop,
+ work: Option<~fn()>,
+ active: bool,
+}
+
+impl BasicPausible {
+ fn new(eloop: &mut BasicLoop) -> BasicPausible {
+ BasicPausible {
+ active: false,
+ work: None,
+ eloop: eloop,
+ }
+ }
+}
+
+impl PausibleIdleCallback for BasicPausible {
+ fn start(&mut self, f: ~fn()) {
+ rtassert!(!self.active && self.work.is_none());
+ self.active = true;
+ self.work = Some(f);
+ }
+ fn pause(&mut self) {
+ self.active = false;
+ }
+ fn resume(&mut self) {
+ self.active = true;
+ }
+ fn close(&mut self) {
+ self.active = false;
+ self.work = None;
+ }
+}
+
+impl Drop for BasicPausible {
+ fn drop(&mut self) {
+ unsafe {
+ (*self.eloop).idle = None;
+ }
+ }
+}
+
+fn time() -> Time {
+ #[fixed_stack_segment]; #[inline(never)];
+ extern {
+ fn get_time(sec: &mut i64, nsec: &mut i32);
+ }
+ let mut sec = 0;
+ let mut nsec = 0;
+ unsafe { get_time(&mut sec, &mut nsec) }
+
+ Time { sec: sec as u64, nsec: nsec as u64 }
+}
detail: None
}
}
+ IoUnavailable => {
+ IoError {
+ kind: IoUnavailable,
+ desc: "I/O is unavailable",
+ detail: None
+ }
+ }
_ => fail!()
}
}
// Internal macros used by the runtime.
mod macros;
+/// Basic implementation of an EventLoop, provides no I/O interfaces
+mod basic;
+
/// The global (exchange) heap.
pub mod global_heap;
/// no longer try to go to sleep, but exit instead.
no_sleep: bool,
stack_pool: StackPool,
- /// The event loop used to drive the scheduler and perform I/O
- event_loop: ~EventLoop,
/// The scheduler runs on a special task. When it is not running
/// it is stored here instead of the work queue.
priv sched_task: Option<~Task>,
// destroyed before it's actually destroyed.
/// The event loop used to drive the scheduler and perform I/O
- event_loop: ~EventLoopObject,
+ event_loop: ~EventLoop,
}
/// An indication of how hard to work on a given operation, the difference
use cell::Cell;
use rt::thread::Thread;
use rt::task::{Task, Sched};
- use rt::rtio::EventLoop;
+ use rt::basic;
use rt::util;
use option::{Some};
#[test]
fn test_schedule_home_states() {
- use rt::uv::uvio::UvEventLoop;
use rt::sleeper_list::SleeperList;
use rt::work_queue::WorkQueue;
use rt::sched::Shutdown;
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
- ~UvEventLoop::new() as ~EventLoop,
+ basic::event_loop(),
normal_queue,
queues.clone(),
sleepers.clone());
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
- ~UvEventLoop::new() as ~EventLoop,
+ basic::event_loop(),
special_queue.clone(),
queues.clone(),
sleepers.clone(),
// in the work queue, but we are performing I/O, that once we do put
// something in the work queue again the scheduler picks it up and doesn't
// exit before emptying the work queue
- do run_in_newsched_task {
+ do run_in_uv_task {
do spawntask {
timer::sleep(10);
}
use rt::work_queue::WorkQueue;
use rt::sleeper_list::SleeperList;
use rt::stack::StackPool;
- use rt::uv::uvio::UvEventLoop;
use rt::sched::{Shutdown, TaskFromFriend};
use util;
let queues = ~[queue.clone()];
let mut sched = ~Scheduler::new(
- ~UvEventLoop::new() as ~EventLoop,
+ basic::event_loop(),
queue,
queues.clone(),
sleepers.clone());
#[test]
fn rng() {
- do run_in_newsched_task() {
+ do run_in_uv_task() {
use rand::{rng, Rng};
let mut r = rng();
let _ = r.next_u32();
#[test]
fn logging() {
- do run_in_newsched_task() {
+ do run_in_uv_task() {
info!("here i am. logging in a newsched task");
}
}
use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
use vec::{OwnedVector, MutableVector, ImmutableVector};
use path::GenericPath;
+use rt::basic;
use rt::sched::Scheduler;
use rt::rtio::EventLoop;
use unstable::{run_in_bare_thread};
}
+pub fn new_test_sched() -> Scheduler {
+
+ let queue = WorkQueue::new();
+ let queues = ~[queue.clone()];
+
+ let mut sched = Scheduler::new(basic::event_loop(),
+ queue,
+ queues,
+ SleeperList::new());
+
+ // Don't wait for the Shutdown message
+ sched.no_sleep = true;
+ return sched;
+}
+
+pub fn run_in_uv_task(f: ~fn()) {
+ let f = Cell::new(f);
+ do run_in_bare_thread {
+ run_in_uv_task_core(f.take());
+ }
+}
+
pub fn run_in_newsched_task(f: ~fn()) {
let f = Cell::new(f);
do run_in_bare_thread {
}
}
-pub fn run_in_newsched_task_core(f: ~fn()) {
+pub fn run_in_uv_task_core(f: ~fn()) {
use rt::sched::Shutdown;
sched.bootstrap(task);
}
+pub fn run_in_newsched_task_core(f: ~fn()) {
+
+ use rt::sched::Shutdown;
+
+ let mut sched = ~new_test_sched();
+ let exit_handle = Cell::new(sched.make_handle());
+
+ let on_exit: ~fn(bool) = |exit_status| {
+ exit_handle.take().send(Shutdown);
+ rtassert!(exit_status);
+ };
+ let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
+ task.death.on_exit = Some(on_exit);
+
+ sched.bootstrap(task);
+}
+
#[cfg(target_os="macos")]
#[allow(non_camel_case_types)]
mod darwin_fd_limit {
/// Get a ~Task for testing purposes other than actually scheduling it.
pub fn with_test_task(blk: ~fn(~Task) -> ~Task) {
do run_in_bare_thread {
- let mut sched = ~new_test_uv_sched();
+ let mut sched = ~new_test_sched();
let task = blk(~Task::new_root(&mut sched.stack_pool, None, ||{}));
cleanup_task(task);
}
#[test]
fn select_one() {
- do run_in_newsched_task { select_helper(1, [0]) }
+ do run_in_uv_task { select_helper(1, [0]) }
}
#[test]
// NB. I would like to have a test that tests the first one that is
// ready is the one that's returned, but that can't be reliably tested
// with the randomized behaviour of optimistic_check.
- do run_in_newsched_task { select_helper(2, [1]) }
- do run_in_newsched_task { select_helper(2, [0]) }
- do run_in_newsched_task { select_helper(2, [1,0]) }
+ do run_in_uv_task { select_helper(2, [1]) }
+ do run_in_uv_task { select_helper(2, [0]) }
+ do run_in_uv_task { select_helper(2, [1,0]) }
}
#[test]
fn select_a_lot() {
- do run_in_newsched_task { select_helper(12, [7,8,9]) }
+ do run_in_uv_task { select_helper(12, [7,8,9]) }
}
#[test]
// Sends 10 buffered packets, and uses select to retrieve them all.
// Puts the port in a different spot in the vector each time.
- do run_in_newsched_task {
+ do run_in_uv_task {
let (ports, _) = unzip(range(0u, 10).map(|_| stream::<int>()));
let (port, chan) = stream();
do 10.times { chan.send(31337); }
#[test]
fn select_unkillable() {
- do run_in_newsched_task {
+ do run_in_uv_task {
do task::unkillable { select_helper(2, [1]) }
}
}
select_blocking_helper(false);
fn select_blocking_helper(killable: bool) {
- do run_in_newsched_task {
+ do run_in_uv_task {
let (p1,_c) = oneshot();
let (p2,c2) = oneshot();
let mut ports = [p1,p2];
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
use rt::test::spawntask_random;
- do run_in_newsched_task {
+ do run_in_uv_task {
// A bit of stress, since ordinarily this is just smoke and mirrors.
do 4.times {
let send_on_chans = send_on_chans.clone();
#[test]
fn select_killed() {
- do run_in_newsched_task {
+ do run_in_uv_task {
let (success_p, success_c) = oneshot::<bool>();
let success_c = Cell::new(success_c);
do task::try {
// CPU, *after* the spawner is already switched-back-to (and passes the
// killed check at the start of its timeslice). As far as I know, it's not
// possible to make this race deterministic, or even more likely to happen.
- do run_in_newsched_task {
+ do run_in_uv_task {
do task::try {
do task::spawn {
fail!();
// Tests that when a kill signal is received, 'rekillable' and
// 'unkillable' unwind correctly in conjunction with each other.
- do run_in_newsched_task {
+ do run_in_uv_task {
do task::try {
do task::unkillable {
do task::rekillable {
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let (po, ch) = stream();
let ch = SharedChan::new(ch);
do spawn_unlinked {
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
do spawn_unlinked { fail!(); }
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
do spawn_supervised { fail!(); }
// Give child a chance to fail-but-not-kill-us.
do 16.times { task::deschedule(); }
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_sup_fail_down() {
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
do spawn_supervised { block_forever(); }
fail!(); // Shouldn't leave a child hanging around.
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// Unidirectional "parenting" shouldn't override bidirectional linked.
// We have to cheat with opts - the interface doesn't support them because
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// Default options are to spawn linked & unsupervised.
do spawn { fail!(); }
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// Default options are to spawn linked & unsupervised.
do spawn { block_forever(); }
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// Make sure the above test is the same as this one.
let mut builder = task();
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_failure_propagate_grandchild() {
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// Middle task exits; does grandparent's failure propagate across the gap?
do spawn_supervised {
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_failure_propagate_secondborn() {
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// First-born child exits; does parent's failure propagate to sibling?
do spawn_supervised {
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_failure_propagate_nephew_or_niece() {
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// Our sibling exits; does our failure propagate to sibling's child?
do spawn { // linked
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_sup_propagate_sibling() {
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result: Result<(),()> = do try {
// Middle sibling exits - does eldest's failure propagate to youngest?
do spawn { // linked
#[test]
fn test_unnamed_task() {
- use rt::test::run_in_newsched_task;
+ use rt::test::run_in_uv_task;
- do run_in_newsched_task {
+ do run_in_uv_task {
do spawn {
do with_task_name |name| {
assert!(name.is_none());
#[test]
fn test_owned_named_task() {
- use rt::test::run_in_newsched_task;
+ use rt::test::run_in_uv_task;
- do run_in_newsched_task {
+ do run_in_uv_task {
let mut t = task();
t.name(~"ada lovelace");
do t.spawn {
#[test]
fn test_static_named_task() {
- use rt::test::run_in_newsched_task;
+ use rt::test::run_in_uv_task;
- do run_in_newsched_task {
+ do run_in_uv_task {
let mut t = task();
t.name("ada lovelace");
do t.spawn {
#[test]
fn test_send_named_task() {
- use rt::test::run_in_newsched_task;
+ use rt::test::run_in_uv_task;
- do run_in_newsched_task {
+ do run_in_uv_task {
let mut t = task();
t.name("ada lovelace".into_send_str());
do t.spawn {
#[test]
fn test_simple_newsched_spawn() {
- use rt::test::run_in_newsched_task;
+ use rt::test::run_in_uv_task;
- do run_in_newsched_task {
+ do run_in_uv_task {
spawn(||())
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_watched() {
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result = do try {
let mut t = task();
t.unlinked();
#[ignore(reason = "linked failure")]
#[test]
fn test_indestructible() {
- use rt::test::run_in_newsched_task;
- do run_in_newsched_task {
+ use rt::test::run_in_uv_task;
+ do run_in_uv_task {
let result = do try {
let mut t = task();
t.watched();
}
}
}
+
+ pub unsafe fn signal(&self) {
+ rust_signal_little_lock(self.l);
+ }
+
+ pub unsafe fn lock_and_wait(&self, f: &fn() -> bool) {
+ do atomically {
+ rust_lock_little_lock(self.l);
+ do (|| {
+ if f() {
+ rust_wait_little_lock(self.l);
+ }
+ }).finally {
+ rust_unlock_little_lock(self.l);
+ }
+ }
+ }
}
struct ExData<T> {
}
}
+ #[inline]
+ pub unsafe fn hold_and_signal(&self, f: &fn(x: &mut T)) {
+ let rec = self.x.get();
+ do (*rec).lock.lock {
+ if (*rec).failed {
+ fail!("Poisoned Exclusive::new - another task failed inside!");
+ }
+ (*rec).failed = true;
+ f(&mut (*rec).data);
+ (*rec).failed = false;
+ (*rec).lock.signal();
+ }
+ }
+
+ #[inline]
+ pub unsafe fn hold_and_wait(&self, f: &fn(x: &T) -> bool) {
+ let rec = self.x.get();
+ do (*rec).lock.lock_and_wait {
+ if (*rec).failed {
+ fail!("Poisoned Exclusive::new - another task failed inside!");
+ }
+ (*rec).failed = true;
+ let result = f(&(*rec).data);
+ (*rec).failed = false;
+ result
+ }
+ }
+
pub fn unwrap(self) -> T {
let Exclusive { x: x } = self;
// Someday we might need to unkillably unwrap an Exclusive, but not today.
externfn!(fn rust_destroy_little_lock(lock: rust_little_lock))
externfn!(fn rust_lock_little_lock(lock: rust_little_lock))
externfn!(fn rust_unlock_little_lock(lock: rust_little_lock))
+externfn!(fn rust_signal_little_lock(lock: rust_little_lock))
+externfn!(fn rust_wait_little_lock(lock: rust_little_lock))
#[cfg(test)]
mod tests {
lock->unlock();
}
+extern "C" void
+rust_wait_little_lock(lock_and_signal *lock) {
+ lock->wait();
+}
+
+extern "C" void
+rust_signal_little_lock(lock_and_signal *lock) {
+ lock->signal();
+}
+
typedef void(startfn)(void*, void*);
class raw_thread: public rust_thread {
rust_destroy_little_lock
rust_lock_little_lock
rust_unlock_little_lock
+rust_signal_little_lock
+rust_wait_little_lock
tdefl_compress_mem_to_heap
tinfl_decompress_mem_to_heap
rust_uv_ip4_port