"rt/isaac/randport.cpp", # public domain
"rt/isaac/rand.h", # public domain
"rt/isaac/standard.h", # public domain
- "libstd/sync/mpsc_queue.rs", # BSD
- "libstd/sync/spsc_queue.rs", # BSD
- "libstd/sync/mpmc_bounded_queue.rs", # BSD
+ "libstd/comm/mpsc_queue.rs", # BSD
+ "libstd/comm/spsc_queue.rs", # BSD
"test/bench/shootout-binarytrees.rs", # BSD
"test/bench/shootout-chameneos-redux.rs", # BSD
"test/bench/shootout-fannkuch-redux.rs", # BSD
mod shared;
mod stream;
mod sync;
+mod mpsc_queue;
+mod spsc_queue;
/// The receiving-half of Rust's channel type. This half can only be owned by
/// one task
#[unstable]
impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
- let (packet, sleeper) = match *unsafe { self.inner() } {
+ let (packet, sleeper, guard) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
- (*a.get()).postinit_lock();
+ let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
- oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
- oneshot::UpWoke(task) => (a, Some(task))
+ oneshot::UpSuccess |
+ oneshot::UpDisconnected => (a, None, guard),
+ oneshot::UpWoke(task) => (a, Some(task), guard)
}
}
}
Stream(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
- (*a.get()).postinit_lock();
+ let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
- stream::UpSuccess | stream::UpDisconnected => (a, None),
- stream::UpWoke(task) => (a, Some(task)),
+ stream::UpSuccess |
+ stream::UpDisconnected => (a, None, guard),
+ stream::UpWoke(task) => (a, Some(task), guard),
}
}
}
};
unsafe {
- (*packet.get()).inherit_blocker(sleeper);
+ (*packet.get()).inherit_blocker(sleeper, guard);
let tmp = Sender::new(Shared(packet.clone()));
mem::swap(self.inner_mut(), tmp.inner_mut());
use core::cmp;
use core::int;
use rustrt::local::Local;
-use rustrt::mutex::NativeMutex;
use rustrt::task::{Task, BlockedTask};
use rustrt::thread::Thread;
-use sync::atomic;
-use sync::mpsc_queue as mpsc;
+use sync::{atomic, Mutex, MutexGuard};
+use comm::mpsc_queue as mpsc;
const DISCONNECTED: int = int::MIN;
const FUDGE: int = 1024;
// this lock protects various portions of this implementation during
// select()
- select_lock: NativeMutex,
+ select_lock: Mutex<()>,
}
pub enum Failure {
channels: atomic::AtomicInt::new(2),
port_dropped: atomic::AtomicBool::new(false),
sender_drain: atomic::AtomicInt::new(0),
- select_lock: unsafe { NativeMutex::new() },
+ select_lock: Mutex::new(()),
};
return p;
}
// In other case mutex data will be duplicated while cloning
// and that could cause problems on platforms where it is
// represented by opaque data structure
- pub fn postinit_lock(&mut self) {
- unsafe { self.select_lock.lock_noguard() }
+ pub fn postinit_lock(&self) -> MutexGuard<()> {
+ self.select_lock.lock()
}
// This function is used at the creation of a shared packet to inherit a
// tasks in select().
//
// This can only be called at channel-creation time
- pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
+ pub fn inherit_blocker(&mut self,
+ task: Option<BlockedTask>,
+ guard: MutexGuard<()>) {
match task {
Some(task) => {
assert_eq!(self.cnt.load(atomic::SeqCst), 0);
// interfere with this method. After we unlock this lock, we're
// signifying that we're done modifying self.cnt and self.to_wake and
// the port is ready for the world to continue using it.
- unsafe { self.select_lock.unlock_noguard() }
+ drop(guard);
}
pub fn send(&mut self, t: T) -> Result<(), T> {
// done with. Without this bounce, we can race with inherit_blocker
// about looking at and dealing with to_wake. Once we have acquired the
// lock, we are guaranteed that inherit_blocker is done.
- unsafe {
+ {
let _guard = self.select_lock.lock();
}
use rustrt::thread::Thread;
use sync::atomic;
-use sync::spsc_queue as spsc;
+use comm::spsc_queue as spsc;
use comm::Receiver;
const DISCONNECTED: int = int::MIN;
}
pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, String> {
- use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
- static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
+ use sync::{StaticMutex, MUTEX_INIT};
+ static LOCK: StaticMutex = MUTEX_INIT;
unsafe {
// dlerror isn't thread safe, so we need to lock around this entire
// sequence
#![allow(unknown_features)]
#![feature(macro_rules, globs, linkage)]
#![feature(default_type_params, phase, lang_items, unsafe_destructor)]
-#![feature(import_shadowing, slicing_syntax)]
+#![feature(import_shadowing, slicing_syntax, tuple_indexing)]
// Don't link to std. We are std.
#![no_std]
Serialize access through a global lock.
*/
fn with_env_lock<T>(f: || -> T) -> T {
- use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+ use sync::{StaticMutex, MUTEX_INIT};
- static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
+ static LOCK: StaticMutex = MUTEX_INIT;
- unsafe {
- let _guard = LOCK.lock();
- f()
- }
+ let _guard = LOCK.lock();
+ f()
}
/// Returns a vector of (variable, value) pairs, for all the environment
use mem;
use option::{Some, None, Option};
use result::{Ok, Err};
- use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+ use sync::{StaticMutex, MUTEX_INIT};
/// As always - iOS on arm uses SjLj exceptions and
/// _Unwind_Backtrace is even not available there. Still,
// while it doesn't requires lock for work as everything is
// local, it still displays much nicer backtraces when a
// couple of tasks panic simultaneously
- static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
- let _g = unsafe { LOCK.lock() };
+ static LOCK: StaticMutex = MUTEX_INIT;
+ let _g = LOCK.lock();
try!(writeln!(w, "stack backtrace:"));
// 100 lines should be enough
// is semi-reasonable in terms of printing anyway, and we know that all
// I/O done here is blocking I/O, not green I/O, so we don't have to
// worry about this being a native vs green mutex.
- static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
- let _g = unsafe { LOCK.lock() };
+ static LOCK: StaticMutex = MUTEX_INIT;
+ let _g = LOCK.lock();
try!(writeln!(w, "stack backtrace:"));
use option::{Some, None};
use path::Path;
use result::{Ok, Err};
- use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+ use sync::{StaticMutex, MUTEX_INIT};
use slice::SlicePrelude;
use str::StrPrelude;
use dynamic_lib::DynamicLibrary;
pub fn write(w: &mut Writer) -> IoResult<()> {
// According to windows documentation, all dbghelp functions are
// single-threaded.
- static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
- let _g = unsafe { LOCK.lock() };
+ static LOCK: StaticMutex = MUTEX_INIT;
+ let _g = LOCK.lock();
// Open up dbghelp.dll, we don't link to it explicitly because it can't
// always be found. Additionally, it's nice having fewer dependencies.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
- pub fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
- dur: Duration) -> bool {
+ // Note that this method is *not* public, and this is quite intentional
+ // because we're not quite sure about the semantics of relative vs absolute
+ // durations or how the timing guarantees play into what the system APIs
+ // provide. There are also additional concerns about the unix-specific
+ // implementation which may need to be addressed.
+ #[allow(dead_code)]
+ fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
+ dur: Duration) -> bool {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout(mutex_guard, dur)
/// specified duration.
///
/// See `Condvar::wait_timeout`.
- pub fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
- dur: Duration) -> bool {
+ #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
+ fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
+ dur: Duration) -> bool {
unsafe {
let lock = mutex_guard.as_mutex_guard();
let sys = mutex::guard_lock(lock);
/// let data = Arc::new(Mutex::new(0));
///
/// let (tx, rx) = channel();
-/// for _ in range(0, 10) {
+/// for _ in range(0u, 10) {
/// let (data, tx) = (data.clone(), tx.clone());
/// spawn(proc() {
/// // The shared static can only be accessed once the lock is held.
//! can be created in the future and there must be no active timers at that
//! time.
+use prelude::*;
+
+use cell::UnsafeCell;
use mem;
use rustrt::bookkeeping;
-use rustrt::mutex::StaticNativeMutex;
use rustrt;
-use cell::UnsafeCell;
+use sync::{StaticMutex, StaticCondvar};
use sys::helper_signal;
-use prelude::*;
use task;
/// is for static initialization.
pub struct Helper<M> {
/// Internal lock which protects the remaining fields
- pub lock: StaticNativeMutex,
+ pub lock: StaticMutex,
+ pub cond: StaticCondvar,
// You'll notice that the remaining fields are UnsafeCell<T>, and this is
// because all helper thread operations are done through &self, but we need
/// Flag if this helper thread has booted and been initialized yet.
pub initialized: UnsafeCell<bool>,
+
+ /// Flag if this helper thread has shut down
+ pub shutdown: UnsafeCell<bool>,
}
impl<M: Send> Helper<M> {
task::spawn(proc() {
bookkeeping::decrement();
helper(receive, rx, t);
- self.lock.lock().signal()
+ let _g = self.lock.lock();
+ *self.shutdown.get() = true;
+ self.cond.notify_one()
});
rustrt::at_exit(proc() { self.shutdown() });
helper_signal::signal(*self.signal.get() as helper_signal::signal);
// Wait for the child to exit
- guard.wait();
+ while !*self.shutdown.get() {
+ self.cond.wait(&guard);
+ }
drop(guard);
// Clean up after ourselves
use mem;
use num::Int;
use ptr::{mod, null, null_mut};
-use rustrt::mutex;
use io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr};
use io::net::addrinfo;
use io::{IoResult, IoError};
use sys::{mod, retry, c, sock_t, last_error, last_net_error, last_gai_error, close_sock,
wrlen, msglen_t, os, wouldblock, set_nonblocking, timer, ms_to_timeval,
decode_error_detailed};
+use sync::{Mutex, MutexGuard};
use sys_common::{mod, keep_going, short_write, timeout};
use prelude::*;
use cmp;
// Unused on Linux, where this lock is not necessary.
#[allow(dead_code)]
- lock: mutex::NativeMutex
+ lock: Mutex<()>,
}
impl Inner {
fn new(fd: sock_t) -> Inner {
- Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
+ Inner { fd: fd, lock: Mutex::new(()) }
}
}
pub struct Guard<'a> {
pub fd: sock_t,
- pub guard: mutex::LockGuard<'a>,
+ pub guard: MutexGuard<'a, ()>,
}
#[unsafe_destructor]
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
- guard: unsafe { self.inner.lock.lock() },
+ guard: self.inner.lock.lock(),
};
assert!(set_nonblocking(self.fd(), true).is_ok());
ret
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
- guard: unsafe { self.inner.lock.lock() },
+ guard: self.inner.lock.lock(),
};
assert!(set_nonblocking(self.fd(), true).is_ok());
ret
macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
static $name: Helper<$m> = Helper {
- lock: ::rustrt::mutex::NATIVE_MUTEX_INIT,
+ lock: ::sync::MUTEX_INIT,
+ cond: ::sync::CONDVAR_INIT,
chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> },
signal: ::cell::UnsafeCell { value: 0 },
initialized: ::cell::UnsafeCell { value: false },
+ shutdown: ::cell::UnsafeCell { value: false },
};
) )
use libc;
use c_str::CString;
use mem;
-use rustrt::mutex;
-use sync::atomic;
+use sync::{atomic, Mutex};
use io::{mod, IoResult, IoError};
use prelude::*;
// Unused on Linux, where this lock is not necessary.
#[allow(dead_code)]
- lock: mutex::NativeMutex
+ lock: Mutex<()>,
}
impl Inner {
fn new(fd: fd_t) -> Inner {
- Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
+ Inner { fd: fd, lock: Mutex::new(()) }
}
}
macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
static $name: Helper<$m> = Helper {
- lock: ::rustrt::mutex::NATIVE_MUTEX_INIT,
+ lock: ::sync::MUTEX_INIT,
+ cond: ::sync::CONDVAR_INIT,
chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> },
signal: ::cell::UnsafeCell { value: 0 },
initialized: ::cell::UnsafeCell { value: false },
+ shutdown: ::cell::UnsafeCell { value: false },
};
) )
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+use prelude::*;
+
use sync::atomic;
use alloc::{mod, heap};
pub const MUTEX_INIT: Mutex = Mutex { inner: atomic::INIT_ATOMIC_UINT };
#[inline]
-pub unsafe fn raw(m: &super::Mutex) -> ffi::LPCRITICAL_SECTION {
- m.0.get()
+pub unsafe fn raw(m: &Mutex) -> ffi::LPCRITICAL_SECTION {
+ m.get()
}
impl Mutex {
use c_str::CString;
use mem;
use ptr;
-use sync::atomic;
-use rustrt::mutex;
+use sync::{atomic, Mutex};
use io::{mod, IoError, IoResult};
use prelude::*;
struct Inner {
handle: libc::HANDLE,
- lock: mutex::NativeMutex,
+ lock: Mutex<()>,
read_closed: atomic::AtomicBool,
write_closed: atomic::AtomicBool,
}
fn new(handle: libc::HANDLE) -> Inner {
Inner {
handle: handle,
- lock: unsafe { mutex::NativeMutex::new() },
+ lock: Mutex::new(()),
read_closed: atomic::AtomicBool::new(false),
write_closed: atomic::AtomicBool::new(false),
}
// ignore-lexer-test FIXME #15679
use std::os;
-use std::sync::{Arc, Future, Mutex};
+use std::sync::{Arc, Future, Mutex, Condvar};
use std::time::Duration;
use std::uint;
// A poor man's pipe.
-type pipe = Arc<Mutex<Vec<uint>>>;
+type pipe = Arc<(Mutex<Vec<uint>>, Condvar)>;
fn send(p: &pipe, msg: uint) {
- let mut arr = p.lock();
+ let &(ref lock, ref cond) = &**p;
+ let mut arr = lock.lock();
arr.push(msg);
- arr.cond.signal();
+ cond.notify_one();
}
fn recv(p: &pipe) -> uint {
- let mut arr = p.lock();
+ let &(ref lock, ref cond) = &**p;
+ let mut arr = lock.lock();
while arr.is_empty() {
- arr.cond.wait();
+ cond.wait(&arr);
}
arr.pop().unwrap()
}
fn init() -> (pipe,pipe) {
- let m = Arc::new(Mutex::new(Vec::new()));
+ let m = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
((&m).clone(), m)
}
+++ /dev/null
-// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-// This test creates a bunch of tasks that simultaneously send to each
-// other in a ring. The messages should all be basically
-// independent.
-// This is like msgsend-ring-pipes but adapted to use Arcs.
-
-// This also serves as a pipes test, because Arcs are implemented with pipes.
-
-// no-pretty-expanded FIXME #15189
-// ignore-lexer-test FIXME #15679
-
-use std::os;
-use std::sync::{RWLock, Arc, Future};
-use std::time::Duration;
-use std::uint;
-
-// A poor man's pipe.
-type pipe = Arc<RWLock<Vec<uint>>>;
-
-fn send(p: &pipe, msg: uint) {
- let mut arr = p.write();
- arr.push(msg);
- arr.cond.signal();
-}
-fn recv(p: &pipe) -> uint {
- let mut arr = p.write();
- while arr.is_empty() {
- arr.cond.wait();
- }
- arr.pop().unwrap()
-}
-
-fn init() -> (pipe,pipe) {
- let x = Arc::new(RWLock::new(Vec::new()));
- ((&x).clone(), x)
-}
-
-
-fn thread_ring(i: uint, count: uint, num_chan: pipe, num_port: pipe) {
- let mut num_chan = Some(num_chan);
- let mut num_port = Some(num_port);
- // Send/Receive lots of messages.
- for j in range(0u, count) {
- //println!("task %?, iter %?", i, j);
- let num_chan2 = num_chan.take().unwrap();
- let num_port2 = num_port.take().unwrap();
- send(&num_chan2, i * j);
- num_chan = Some(num_chan2);
- let _n = recv(&num_port2);
- //log(error, _n);
- num_port = Some(num_port2);
- };
-}
-
-fn main() {
- let args = os::args();
- let args = if os::getenv("RUST_BENCH").is_some() {
- vec!("".to_string(), "100".to_string(), "10000".to_string())
- } else if args.len() <= 1u {
- vec!("".to_string(), "10".to_string(), "100".to_string())
- } else {
- args.clone().into_iter().collect()
- };
-
- let num_tasks = from_str::<uint>(args[1].as_slice()).unwrap();
- let msg_per_task = from_str::<uint>(args[2].as_slice()).unwrap();
-
- let (mut num_chan, num_port) = init();
-
- let mut p = Some((num_chan, num_port));
- let dur = Duration::span(|| {
- let (mut num_chan, num_port) = p.take().unwrap();
-
- // create the ring
- let mut futures = Vec::new();
-
- for i in range(1u, num_tasks) {
- //println!("spawning %?", i);
- let (new_chan, num_port) = init();
- let num_chan_2 = num_chan.clone();
- let new_future = Future::spawn(proc() {
- thread_ring(i, msg_per_task, num_chan_2, num_port)
- });
- futures.push(new_future);
- num_chan = new_chan;
- };
-
- // do our iteration
- thread_ring(0, msg_per_task, num_chan, num_port);
-
- // synchronize
- for f in futures.iter_mut() {
- let _ = f.get();
- }
- });
-
- // all done, report stats.
- let num_msgs = num_tasks * msg_per_task;
- let rate = (num_msgs as f64) / (dur.num_milliseconds() as f64);
-
- println!("Sent {} messages in {} ms", num_msgs, dur.num_milliseconds());
- println!(" {} messages / second", rate / 1000.0);
- println!(" {} μs / message", 1000000. / rate / 1000.0);
-}