From: Alex Crichton Date: Mon, 24 Nov 2014 19:16:40 +0000 (-0800) Subject: Fall out of the std::sync rewrite X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=c3adbd34c4e637d20a184eb03f09b30c69de8b6e;p=rust.git Fall out of the std::sync rewrite --- diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py index 9162edcb530..7669df36b04 100644 --- a/src/etc/licenseck.py +++ b/src/etc/licenseck.py @@ -38,9 +38,8 @@ exceptions = [ "rt/isaac/randport.cpp", # public domain "rt/isaac/rand.h", # public domain "rt/isaac/standard.h", # public domain - "libstd/sync/mpsc_queue.rs", # BSD - "libstd/sync/spsc_queue.rs", # BSD - "libstd/sync/mpmc_bounded_queue.rs", # BSD + "libstd/comm/mpsc_queue.rs", # BSD + "libstd/comm/spsc_queue.rs", # BSD "test/bench/shootout-binarytrees.rs", # BSD "test/bench/shootout-chameneos-redux.rs", # BSD "test/bench/shootout-fannkuch-redux.rs", # BSD diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 2b66e91c00d..d291ed72567 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -354,6 +354,8 @@ mod $name { mod shared; mod stream; mod sync; +mod mpsc_queue; +mod spsc_queue; /// The receiving-half of Rust's channel type. This half can only be owned by /// one task @@ -628,24 +630,26 @@ pub fn send_opt(&self, t: T) -> Result<(), T> { #[unstable] impl Clone for Sender { fn clone(&self) -> Sender { - let (packet, sleeper) = match *unsafe { self.inner() } { + let (packet, sleeper, guard) = match *unsafe { self.inner() } { Oneshot(ref p) => { let a = Arc::new(UnsafeCell::new(shared::Packet::new())); unsafe { - (*a.get()).postinit_lock(); + let guard = (*a.get()).postinit_lock(); match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - oneshot::UpSuccess | oneshot::UpDisconnected => (a, None), - oneshot::UpWoke(task) => (a, Some(task)) + oneshot::UpSuccess | + oneshot::UpDisconnected => (a, None, guard), + oneshot::UpWoke(task) => (a, Some(task), guard) } } } Stream(ref p) => { let a = Arc::new(UnsafeCell::new(shared::Packet::new())); unsafe { - (*a.get()).postinit_lock(); + let guard = (*a.get()).postinit_lock(); match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - stream::UpSuccess | stream::UpDisconnected => (a, None), - stream::UpWoke(task) => (a, Some(task)), + stream::UpSuccess | + stream::UpDisconnected => (a, None, guard), + stream::UpWoke(task) => (a, Some(task), guard), } } } @@ -657,7 +661,7 @@ fn clone(&self) -> Sender { }; unsafe { - (*packet.get()).inherit_blocker(sleeper); + (*packet.get()).inherit_blocker(sleeper, guard); let tmp = Sender::new(Shared(packet.clone())); mem::swap(self.inner_mut(), tmp.inner_mut()); diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 6396edbdbd1..13b5e10fcd3 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -26,12 +26,11 @@ use core::cmp; use core::int; use rustrt::local::Local; -use rustrt::mutex::NativeMutex; use rustrt::task::{Task, BlockedTask}; use rustrt::thread::Thread; -use sync::atomic; -use sync::mpsc_queue as mpsc; +use sync::{atomic, Mutex, MutexGuard}; +use comm::mpsc_queue as mpsc; const DISCONNECTED: int = int::MIN; const FUDGE: int = 1024; @@ -56,7 +55,7 @@ pub struct Packet { // this lock protects various portions of this implementation during // select() - select_lock: NativeMutex, + select_lock: Mutex<()>, } pub enum Failure { @@ -76,7 +75,7 @@ pub fn new() -> Packet { channels: atomic::AtomicInt::new(2), port_dropped: atomic::AtomicBool::new(false), sender_drain: atomic::AtomicInt::new(0), - select_lock: unsafe { NativeMutex::new() }, + select_lock: Mutex::new(()), }; return p; } @@ -86,8 +85,8 @@ pub fn new() -> Packet { // In other case mutex data will be duplicated while cloning // and that could cause problems on platforms where it is // represented by opaque data structure - pub fn postinit_lock(&mut self) { - unsafe { self.select_lock.lock_noguard() } + pub fn postinit_lock(&self) -> MutexGuard<()> { + self.select_lock.lock() } // This function is used at the creation of a shared packet to inherit a @@ -95,7 +94,9 @@ pub fn postinit_lock(&mut self) { // tasks in select(). // // This can only be called at channel-creation time - pub fn inherit_blocker(&mut self, task: Option) { + pub fn inherit_blocker(&mut self, + task: Option, + guard: MutexGuard<()>) { match task { Some(task) => { assert_eq!(self.cnt.load(atomic::SeqCst), 0); @@ -135,7 +136,7 @@ pub fn inherit_blocker(&mut self, task: Option) { // interfere with this method. After we unlock this lock, we're // signifying that we're done modifying self.cnt and self.to_wake and // the port is ready for the world to continue using it. - unsafe { self.select_lock.unlock_noguard() } + drop(guard); } pub fn send(&mut self, t: T) -> Result<(), T> { @@ -441,7 +442,7 @@ pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { // done with. Without this bounce, we can race with inherit_blocker // about looking at and dealing with to_wake. Once we have acquired the // lock, we are guaranteed that inherit_blocker is done. - unsafe { + { let _guard = self.select_lock.lock(); } diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 23d042960b1..06ab4f4427a 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -32,7 +32,7 @@ use rustrt::thread::Thread; use sync::atomic; -use sync::spsc_queue as spsc; +use comm::spsc_queue as spsc; use comm::Receiver; const DISCONNECTED: int = int::MIN; diff --git a/src/libstd/dynamic_lib.rs b/src/libstd/dynamic_lib.rs index 3cd0c0eeaf2..160365dac36 100644 --- a/src/libstd/dynamic_lib.rs +++ b/src/libstd/dynamic_lib.rs @@ -225,8 +225,8 @@ pub unsafe fn open_internal() -> *mut u8 { } pub fn check_for_errors_in(f: || -> T) -> Result { - use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; + use sync::{StaticMutex, MUTEX_INIT}; + static LOCK: StaticMutex = MUTEX_INIT; unsafe { // dlerror isn't thread safe, so we need to lock around this entire // sequence diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index f6b73f037f2..d4274d7e401 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -106,7 +106,7 @@ #![allow(unknown_features)] #![feature(macro_rules, globs, linkage)] #![feature(default_type_params, phase, lang_items, unsafe_destructor)] -#![feature(import_shadowing, slicing_syntax)] +#![feature(import_shadowing, slicing_syntax, tuple_indexing)] // Don't link to std. We are std. #![no_std] diff --git a/src/libstd/os.rs b/src/libstd/os.rs index 0abd030a163..a8adfec34ed 100644 --- a/src/libstd/os.rs +++ b/src/libstd/os.rs @@ -209,14 +209,12 @@ pub fn fill_utf16_buf_and_decode(f: |*mut u16, DWORD| -> DWORD) Serialize access through a global lock. */ fn with_env_lock(f: || -> T) -> T { - use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; + use sync::{StaticMutex, MUTEX_INIT}; - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; + static LOCK: StaticMutex = MUTEX_INIT; - unsafe { - let _guard = LOCK.lock(); - f() - } + let _guard = LOCK.lock(); + f() } /// Returns a vector of (variable, value) pairs, for all the environment diff --git a/src/libstd/rt/backtrace.rs b/src/libstd/rt/backtrace.rs index 0103fe670e7..159fc3080e8 100644 --- a/src/libstd/rt/backtrace.rs +++ b/src/libstd/rt/backtrace.rs @@ -238,7 +238,7 @@ mod imp { use mem; use option::{Some, None, Option}; use result::{Ok, Err}; - use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; + use sync::{StaticMutex, MUTEX_INIT}; /// As always - iOS on arm uses SjLj exceptions and /// _Unwind_Backtrace is even not available there. Still, @@ -264,8 +264,8 @@ fn backtrace(buf: *mut *mut libc::c_void, // while it doesn't requires lock for work as everything is // local, it still displays much nicer backtraces when a // couple of tasks panic simultaneously - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - let _g = unsafe { LOCK.lock() }; + static LOCK: StaticMutex = MUTEX_INIT; + let _g = LOCK.lock(); try!(writeln!(w, "stack backtrace:")); // 100 lines should be enough @@ -297,8 +297,8 @@ struct Context<'a> { // is semi-reasonable in terms of printing anyway, and we know that all // I/O done here is blocking I/O, not green I/O, so we don't have to // worry about this being a native vs green mutex. - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - let _g = unsafe { LOCK.lock() }; + static LOCK: StaticMutex = MUTEX_INIT; + let _g = LOCK.lock(); try!(writeln!(w, "stack backtrace:")); @@ -667,7 +667,7 @@ mod imp { use option::{Some, None}; use path::Path; use result::{Ok, Err}; - use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; + use sync::{StaticMutex, MUTEX_INIT}; use slice::SlicePrelude; use str::StrPrelude; use dynamic_lib::DynamicLibrary; @@ -928,8 +928,8 @@ impl Drop for Cleanup { pub fn write(w: &mut Writer) -> IoResult<()> { // According to windows documentation, all dbghelp functions are // single-threaded. - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - let _g = unsafe { LOCK.lock() }; + static LOCK: StaticMutex = MUTEX_INIT; + let _g = LOCK.lock(); // Open up dbghelp.dll, we don't link to it explicitly because it can't // always be found. Additionally, it's nice having fewer dependencies. diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index 581b6b4e412..0fdd57b2792 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -143,8 +143,14 @@ pub fn wait(&self, mutex_guard: &T) { /// /// Like `wait`, the lock specified will be re-acquired when this function /// returns, regardless of whether the timeout elapsed or not. - pub fn wait_timeout(&self, mutex_guard: &T, - dur: Duration) -> bool { + // Note that this method is *not* public, and this is quite intentional + // because we're not quite sure about the semantics of relative vs absolute + // durations or how the timing guarantees play into what the system APIs + // provide. There are also additional concerns about the unix-specific + // implementation which may need to be addressed. + #[allow(dead_code)] + fn wait_timeout(&self, mutex_guard: &T, + dur: Duration) -> bool { unsafe { let me: &'static Condvar = &*(self as *const _); me.inner.wait_timeout(mutex_guard, dur) @@ -195,8 +201,9 @@ pub fn wait(&'static self, mutex_guard: &T) { /// specified duration. /// /// See `Condvar::wait_timeout`. - pub fn wait_timeout(&'static self, mutex_guard: &T, - dur: Duration) -> bool { + #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above + fn wait_timeout(&'static self, mutex_guard: &T, + dur: Duration) -> bool { unsafe { let lock = mutex_guard.as_mutex_guard(); let sys = mutex::guard_lock(lock); diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs index 3d17f2bc64b..4e07d54c57e 100644 --- a/src/libstd/sync/mutex.rs +++ b/src/libstd/sync/mutex.rs @@ -45,7 +45,7 @@ /// let data = Arc::new(Mutex::new(0)); /// /// let (tx, rx) = channel(); -/// for _ in range(0, 10) { +/// for _ in range(0u, 10) { /// let (data, tx) = (data.clone(), tx.clone()); /// spawn(proc() { /// // The shared static can only be accessed once the lock is held. diff --git a/src/libstd/sys/common/helper_thread.rs b/src/libstd/sys/common/helper_thread.rs index 9508d8d9232..c0018c5d970 100644 --- a/src/libstd/sys/common/helper_thread.rs +++ b/src/libstd/sys/common/helper_thread.rs @@ -20,13 +20,14 @@ //! can be created in the future and there must be no active timers at that //! time. +use prelude::*; + +use cell::UnsafeCell; use mem; use rustrt::bookkeeping; -use rustrt::mutex::StaticNativeMutex; use rustrt; -use cell::UnsafeCell; +use sync::{StaticMutex, StaticCondvar}; use sys::helper_signal; -use prelude::*; use task; @@ -39,7 +40,8 @@ /// is for static initialization. pub struct Helper { /// Internal lock which protects the remaining fields - pub lock: StaticNativeMutex, + pub lock: StaticMutex, + pub cond: StaticCondvar, // You'll notice that the remaining fields are UnsafeCell, and this is // because all helper thread operations are done through &self, but we need @@ -53,6 +55,9 @@ pub struct Helper { /// Flag if this helper thread has booted and been initialized yet. pub initialized: UnsafeCell, + + /// Flag if this helper thread has shut down + pub shutdown: UnsafeCell, } impl Helper { @@ -80,7 +85,9 @@ pub fn boot(&'static self, task::spawn(proc() { bookkeeping::decrement(); helper(receive, rx, t); - self.lock.lock().signal() + let _g = self.lock.lock(); + *self.shutdown.get() = true; + self.cond.notify_one() }); rustrt::at_exit(proc() { self.shutdown() }); @@ -119,7 +126,9 @@ fn shutdown(&'static self) { helper_signal::signal(*self.signal.get() as helper_signal::signal); // Wait for the child to exit - guard.wait(); + while !*self.shutdown.get() { + self.cond.wait(&guard); + } drop(guard); // Clean up after ourselves diff --git a/src/libstd/sys/common/net.rs b/src/libstd/sys/common/net.rs index 029fc852742..ddc6dd021c3 100644 --- a/src/libstd/sys/common/net.rs +++ b/src/libstd/sys/common/net.rs @@ -16,13 +16,13 @@ use mem; use num::Int; use ptr::{mod, null, null_mut}; -use rustrt::mutex; use io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr}; use io::net::addrinfo; use io::{IoResult, IoError}; use sys::{mod, retry, c, sock_t, last_error, last_net_error, last_gai_error, close_sock, wrlen, msglen_t, os, wouldblock, set_nonblocking, timer, ms_to_timeval, decode_error_detailed}; +use sync::{Mutex, MutexGuard}; use sys_common::{mod, keep_going, short_write, timeout}; use prelude::*; use cmp; @@ -557,12 +557,12 @@ struct Inner { // Unused on Linux, where this lock is not necessary. #[allow(dead_code)] - lock: mutex::NativeMutex + lock: Mutex<()>, } impl Inner { fn new(fd: sock_t) -> Inner { - Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } + Inner { fd: fd, lock: Mutex::new(()) } } } @@ -572,7 +572,7 @@ impl Drop for Inner { pub struct Guard<'a> { pub fd: sock_t, - pub guard: mutex::LockGuard<'a>, + pub guard: MutexGuard<'a, ()>, } #[unsafe_destructor] @@ -666,7 +666,7 @@ fn lock_nonblocking(&self) {} fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { self.inner.lock.lock() }, + guard: self.inner.lock.lock(), }; assert!(set_nonblocking(self.fd(), true).is_ok()); ret @@ -805,7 +805,7 @@ fn lock_nonblocking(&self) {} fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { self.inner.lock.lock() }, + guard: self.inner.lock.lock(), }; assert!(set_nonblocking(self.fd(), true).is_ok()); ret diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs index 7b37fb3fb0f..4effedbe3ab 100644 --- a/src/libstd/sys/unix/mod.rs +++ b/src/libstd/sys/unix/mod.rs @@ -25,10 +25,12 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => ( static $name: Helper<$m> = Helper { - lock: ::rustrt::mutex::NATIVE_MUTEX_INIT, + lock: ::sync::MUTEX_INIT, + cond: ::sync::CONDVAR_INIT, chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> }, signal: ::cell::UnsafeCell { value: 0 }, initialized: ::cell::UnsafeCell { value: false }, + shutdown: ::cell::UnsafeCell { value: false }, }; ) ) diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index 3f70fb5c1a5..08e6f7059d8 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -12,8 +12,7 @@ use libc; use c_str::CString; use mem; -use rustrt::mutex; -use sync::atomic; +use sync::{atomic, Mutex}; use io::{mod, IoResult, IoError}; use prelude::*; @@ -60,12 +59,12 @@ struct Inner { // Unused on Linux, where this lock is not necessary. #[allow(dead_code)] - lock: mutex::NativeMutex + lock: Mutex<()>, } impl Inner { fn new(fd: fd_t) -> Inner { - Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } + Inner { fd: fd, lock: Mutex::new(()) } } } diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs index e9243c5040c..9fce308cb94 100644 --- a/src/libstd/sys/windows/mod.rs +++ b/src/libstd/sys/windows/mod.rs @@ -26,10 +26,12 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => ( static $name: Helper<$m> = Helper { - lock: ::rustrt::mutex::NATIVE_MUTEX_INIT, + lock: ::sync::MUTEX_INIT, + cond: ::sync::CONDVAR_INIT, chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> }, signal: ::cell::UnsafeCell { value: 0 }, initialized: ::cell::UnsafeCell { value: false }, + shutdown: ::cell::UnsafeCell { value: false }, }; ) ) diff --git a/src/libstd/sys/windows/mutex.rs b/src/libstd/sys/windows/mutex.rs index 10ebcf4bd09..ddd89070ed5 100644 --- a/src/libstd/sys/windows/mutex.rs +++ b/src/libstd/sys/windows/mutex.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::*; + use sync::atomic; use alloc::{mod, heap}; @@ -21,8 +23,8 @@ pub struct Mutex { inner: atomic::AtomicUint } pub const MUTEX_INIT: Mutex = Mutex { inner: atomic::INIT_ATOMIC_UINT }; #[inline] -pub unsafe fn raw(m: &super::Mutex) -> ffi::LPCRITICAL_SECTION { - m.0.get() +pub unsafe fn raw(m: &Mutex) -> ffi::LPCRITICAL_SECTION { + m.get() } impl Mutex { diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index ca7985aa35b..bf658d0efd0 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -89,8 +89,7 @@ use c_str::CString; use mem; use ptr; -use sync::atomic; -use rustrt::mutex; +use sync::{atomic, Mutex}; use io::{mod, IoError, IoResult}; use prelude::*; @@ -126,7 +125,7 @@ fn drop(&mut self) { struct Inner { handle: libc::HANDLE, - lock: mutex::NativeMutex, + lock: Mutex<()>, read_closed: atomic::AtomicBool, write_closed: atomic::AtomicBool, } @@ -135,7 +134,7 @@ impl Inner { fn new(handle: libc::HANDLE) -> Inner { Inner { handle: handle, - lock: unsafe { mutex::NativeMutex::new() }, + lock: Mutex::new(()), read_closed: atomic::AtomicBool::new(false), write_closed: atomic::AtomicBool::new(false), } diff --git a/src/test/bench/msgsend-ring-mutex-arcs.rs b/src/test/bench/msgsend-ring-mutex-arcs.rs index d06e6c8cd19..863c3c879a7 100644 --- a/src/test/bench/msgsend-ring-mutex-arcs.rs +++ b/src/test/bench/msgsend-ring-mutex-arcs.rs @@ -19,28 +19,30 @@ // ignore-lexer-test FIXME #15679 use std::os; -use std::sync::{Arc, Future, Mutex}; +use std::sync::{Arc, Future, Mutex, Condvar}; use std::time::Duration; use std::uint; // A poor man's pipe. -type pipe = Arc>>; +type pipe = Arc<(Mutex>, Condvar)>; fn send(p: &pipe, msg: uint) { - let mut arr = p.lock(); + let &(ref lock, ref cond) = &**p; + let mut arr = lock.lock(); arr.push(msg); - arr.cond.signal(); + cond.notify_one(); } fn recv(p: &pipe) -> uint { - let mut arr = p.lock(); + let &(ref lock, ref cond) = &**p; + let mut arr = lock.lock(); while arr.is_empty() { - arr.cond.wait(); + cond.wait(&arr); } arr.pop().unwrap() } fn init() -> (pipe,pipe) { - let m = Arc::new(Mutex::new(Vec::new())); + let m = Arc::new((Mutex::new(Vec::new()), Condvar::new())); ((&m).clone(), m) } diff --git a/src/test/bench/msgsend-ring-rw-arcs.rs b/src/test/bench/msgsend-ring-rw-arcs.rs deleted file mode 100644 index 03066d40512..00000000000 --- a/src/test/bench/msgsend-ring-rw-arcs.rs +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2012 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -// This test creates a bunch of tasks that simultaneously send to each -// other in a ring. The messages should all be basically -// independent. -// This is like msgsend-ring-pipes but adapted to use Arcs. - -// This also serves as a pipes test, because Arcs are implemented with pipes. - -// no-pretty-expanded FIXME #15189 -// ignore-lexer-test FIXME #15679 - -use std::os; -use std::sync::{RWLock, Arc, Future}; -use std::time::Duration; -use std::uint; - -// A poor man's pipe. -type pipe = Arc>>; - -fn send(p: &pipe, msg: uint) { - let mut arr = p.write(); - arr.push(msg); - arr.cond.signal(); -} -fn recv(p: &pipe) -> uint { - let mut arr = p.write(); - while arr.is_empty() { - arr.cond.wait(); - } - arr.pop().unwrap() -} - -fn init() -> (pipe,pipe) { - let x = Arc::new(RWLock::new(Vec::new())); - ((&x).clone(), x) -} - - -fn thread_ring(i: uint, count: uint, num_chan: pipe, num_port: pipe) { - let mut num_chan = Some(num_chan); - let mut num_port = Some(num_port); - // Send/Receive lots of messages. - for j in range(0u, count) { - //println!("task %?, iter %?", i, j); - let num_chan2 = num_chan.take().unwrap(); - let num_port2 = num_port.take().unwrap(); - send(&num_chan2, i * j); - num_chan = Some(num_chan2); - let _n = recv(&num_port2); - //log(error, _n); - num_port = Some(num_port2); - }; -} - -fn main() { - let args = os::args(); - let args = if os::getenv("RUST_BENCH").is_some() { - vec!("".to_string(), "100".to_string(), "10000".to_string()) - } else if args.len() <= 1u { - vec!("".to_string(), "10".to_string(), "100".to_string()) - } else { - args.clone().into_iter().collect() - }; - - let num_tasks = from_str::(args[1].as_slice()).unwrap(); - let msg_per_task = from_str::(args[2].as_slice()).unwrap(); - - let (mut num_chan, num_port) = init(); - - let mut p = Some((num_chan, num_port)); - let dur = Duration::span(|| { - let (mut num_chan, num_port) = p.take().unwrap(); - - // create the ring - let mut futures = Vec::new(); - - for i in range(1u, num_tasks) { - //println!("spawning %?", i); - let (new_chan, num_port) = init(); - let num_chan_2 = num_chan.clone(); - let new_future = Future::spawn(proc() { - thread_ring(i, msg_per_task, num_chan_2, num_port) - }); - futures.push(new_future); - num_chan = new_chan; - }; - - // do our iteration - thread_ring(0, msg_per_task, num_chan, num_port); - - // synchronize - for f in futures.iter_mut() { - let _ = f.get(); - } - }); - - // all done, report stats. - let num_msgs = num_tasks * msg_per_task; - let rate = (num_msgs as f64) / (dur.num_milliseconds() as f64); - - println!("Sent {} messages in {} ms", num_msgs, dur.num_milliseconds()); - println!(" {} messages / second", rate / 1000.0); - println!(" {} μs / message", 1000000. / rate / 1000.0); -}