X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Flibstd%2Fthread.rs;h=cc9d7492441cd86175a138ad7b325c6332d33573;hb=7d2404cb420591588684d4681cf81fe8cff1ace3;hp=4d932b3e7779d21b0b535dffc7be4029df3589c5;hpb=78567f124465f6259edbaf09678415a537d57a78;p=rust.git diff --git a/src/libstd/thread.rs b/src/libstd/thread.rs index 4d932b3e777..cc9d7492441 100644 --- a/src/libstd/thread.rs +++ b/src/libstd/thread.rs @@ -123,7 +123,8 @@ //! * The `Thread::park()` function blocks the current thread unless or until //! the token is available for its thread handle, at which point It atomically //! consumes the token. It may also return *spuriously*, without consuming the -//! token. +//! token. `Thread::park_timeout()` does the same, but allows specifying a +//! maximum time to block the thread for. //! //! * The `unpark()` method on a `Thread` atomically makes the token available //! if it wasn't already. @@ -144,7 +145,7 @@ //! //! * It can be implemented highly efficiently on many platforms. -#![stable] +#![stable(feature = "rust1", since = "1.0.0")] use any::Any; use boxed::Box; @@ -160,13 +161,14 @@ use rt::{self, unwind}; use old_io::{Writer, stdio}; use thunk::Thunk; +use time::Duration; use sys::thread as imp; use sys_common::{stack, thread_info}; /// Thread configuration. Provides detailed control over the properties /// and behavior of new threads. -#[stable] +#[stable(feature = "rust1", since = "1.0.0")] pub struct Builder { // A name for the thread-to-be, for identification in panic messages name: Option, @@ -181,7 +183,7 @@ pub struct Builder { impl Builder { /// Generate the base configuration for spawning a thread, from which /// configuration methods can be chained. - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn new() -> Builder { Builder { name: None, @@ -193,28 +195,30 @@ pub fn new() -> Builder { /// Name the thread-to-be. Currently the name is used for identification /// only in panic messages. - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn name(mut self, name: String) -> Builder { self.name = Some(name); self } /// Set the size of the stack for the new thread. - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn stack_size(mut self, size: uint) -> Builder { self.stack_size = Some(size); self } /// Redirect thread-local stdout. - #[unstable = "Will likely go away after proc removal"] + #[unstable(feature = "std_misc", + reason = "Will likely go away after proc removal")] pub fn stdout(mut self, stdout: Box) -> Builder { self.stdout = Some(stdout); self } /// Redirect thread-local stderr. - #[unstable = "Will likely go away after proc removal"] + #[unstable(feature = "std_misc", + reason = "Will likely go away after proc removal")] pub fn stderr(mut self, stderr: Box) -> Builder { self.stderr = Some(stderr); self @@ -223,7 +227,8 @@ pub fn stderr(mut self, stderr: Box) -> Builder { /// Spawn a new detached thread, and return a handle to it. /// /// See `Thead::spawn` and the module doc for more details. - #[unstable = "may change with specifics of new Send semantics"] + #[unstable(feature = "std_misc", + reason = "may change with specifics of new Send semantics")] pub fn spawn(self, f: F) -> Thread where F: FnOnce(), F: Send + 'static { let (native, thread) = self.spawn_inner(Thunk::new(f), Thunk::with_arg(|_| {})); unsafe { imp::detach(native) }; @@ -234,13 +239,14 @@ pub fn spawn(self, f: F) -> Thread where F: FnOnce(), F: Send + 'static { /// scope, and return a `JoinGuard`. /// /// See `Thead::scoped` and the module doc for more details. - #[unstable = "may change with specifics of new Send semantics"] + #[unstable(feature = "std_misc", + reason = "may change with specifics of new Send semantics")] pub fn scoped<'a, T, F>(self, f: F) -> JoinGuard<'a, T> where T: Send + 'a, F: FnOnce() -> T, F: Send + 'a { let my_packet = Packet(Arc::new(UnsafeCell::new(None))); let their_packet = Packet(my_packet.0.clone()); - let (native, thread) = self.spawn_inner(Thunk::new(f), Thunk::with_arg(move |: ret| unsafe { + let (native, thread) = self.spawn_inner(Thunk::new(f), Thunk::with_arg(move |ret| unsafe { *their_packet.0.get() = Some(ret); })); @@ -267,7 +273,7 @@ fn spawn_inner(self, f: Thunk<(), T>, finish: Thunk, ()>) // because by the time that this function is executing we've already // consumed at least a little bit of stack (we don't know the exact byte // address at which our stack started). - let main = move |:| { + let main = move || { let something_around_the_top_of_the_stack = 1; let addr = &something_around_the_top_of_the_stack as *const int; let my_stack_top = addr as uint; @@ -275,6 +281,10 @@ fn spawn_inner(self, f: Thunk<(), T>, finish: Thunk, ()>) unsafe { stack::record_os_managed_stack_bounds(my_stack_bottom, my_stack_top); } + match their_thread.name() { + Some(name) => unsafe { imp::set_name(name.as_slice()); }, + None => {} + } thread_info::set( (my_stack_bottom, my_stack_top), unsafe { imp::guard::current() }, @@ -283,7 +293,7 @@ fn spawn_inner(self, f: Thunk<(), T>, finish: Thunk, ()>) let mut output = None; let f: Thunk<(), T> = if stdout.is_some() || stderr.is_some() { - Thunk::new(move |:| { + Thunk::new(move || { let _ = stdout.map(stdio::set_stdout); let _ = stderr.map(stdio::set_stderr); f.invoke(()) @@ -326,7 +336,7 @@ struct Inner { unsafe impl Sync for Inner {} #[derive(Clone)] -#[stable] +#[stable(feature = "rust1", since = "1.0.0")] /// A handle to a thread. pub struct Thread { inner: Arc, @@ -350,7 +360,8 @@ fn new(name: Option) -> Thread { /// main thread; the whole process is terminated when the main thread /// finishes.) The thread handle can be used for low-level /// synchronization. See the module documentation for additional details. - #[unstable = "may change with specifics of new Send semantics"] + #[unstable(feature = "std_misc", + reason = "may change with specifics of new Send semantics")] pub fn spawn(f: F) -> Thread where F: FnOnce(), F: Send + 'static { Builder::new().spawn(f) } @@ -363,7 +374,8 @@ pub fn spawn(f: F) -> Thread where F: FnOnce(), F: Send + 'static { /// current thread's stack (hence the "scoped" name), it cannot be detached; /// it *must* be joined before the relevant stack frame is popped. See the /// module documentation for additional details. - #[unstable = "may change with specifics of new Send semantics"] + #[unstable(feature = "std_misc", + reason = "may change with specifics of new Send semantics")] pub fn scoped<'a, T, F>(f: F) -> JoinGuard<'a, T> where T: Send + 'a, F: FnOnce() -> T, F: Send + 'a { @@ -371,20 +383,20 @@ pub fn scoped<'a, T, F>(f: F) -> JoinGuard<'a, T> where } /// Gets a handle to the thread that invokes it. - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn current() -> Thread { thread_info::current_thread() } /// Cooperatively give up a timeslice to the OS scheduler. - #[unstable = "name may change"] + #[unstable(feature = "std_misc", reason = "name may change")] pub fn yield_now() { unsafe { imp::yield_now() } } /// Determines whether the current thread is unwinding because of panic. #[inline] - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn panicking() -> bool { unwind::panicking() } @@ -398,7 +410,7 @@ pub fn panicking() -> bool { // future, this will be implemented in a more efficient way, perhaps along the lines of // http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp // or futuxes, and in either case may allow spurious wakeups. - #[unstable = "recently introduced"] + #[unstable(feature = "std_misc", reason = "recently introduced")] pub fn park() { let thread = Thread::current(); let mut guard = thread.inner.lock.lock().unwrap(); @@ -408,10 +420,31 @@ pub fn park() { *guard = false; } + /// Block unless or until the current thread's token is made available or + /// the specified duration has been reached (may wake spuriously). + /// + /// The semantics of this function are equivalent to `park()` except that the + /// thread will be blocked for roughly no longer than dur. This method + /// should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely dur + /// + /// See the module doc for more detail. + #[unstable(feature = "std_misc", reason = "recently introduced")] + pub fn park_timeout(dur: Duration) { + let thread = Thread::current(); + let mut guard = thread.inner.lock.lock().unwrap(); + if !*guard { + let (g, _) = thread.inner.cvar.wait_timeout(guard, dur).unwrap(); + guard = g; + } + *guard = false; + } + /// Atomically makes the handle's token available if it is not already. /// /// See the module doc for more detail. - #[unstable = "recently introduced"] + #[unstable(feature = "std_misc", reason = "recently introduced")] pub fn unpark(&self) { let mut guard = self.inner.lock.lock().unwrap(); if !*guard { @@ -421,9 +454,9 @@ pub fn unpark(&self) { } /// Get the thread's name. - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn name(&self) -> Option<&str> { - self.inner.name.as_ref().map(|s| s.as_slice()) + self.inner.name.as_ref().map(|s| &**s) } } @@ -435,7 +468,7 @@ fn new(name: Option) -> Thread { Thread::new(name) } /// Indicates the manner in which a thread exited. /// /// A thread that completes without panicking is considered to exit successfully. -#[stable] +#[stable(feature = "rust1", since = "1.0.0")] pub type Result = ::result::Result>; struct Packet(Arc>>>); @@ -447,7 +480,8 @@ unsafe impl Sync for Packet {} /// /// The type `T` is the return type for the thread's main function. #[must_use] -#[unstable = "may change with specifics of new Send semantics"] +#[unstable(feature = "std_misc", + reason = "may change with specifics of new Send semantics")] pub struct JoinGuard<'a, T: 'a> { native: imp::rust_thread, thread: Thread, @@ -455,12 +489,12 @@ pub struct JoinGuard<'a, T: 'a> { packet: Packet, } -#[stable] +#[stable(feature = "rust1", since = "1.0.0")] unsafe impl<'a, T: Send + 'a> Sync for JoinGuard<'a, T> {} impl<'a, T: Send + 'a> JoinGuard<'a, T> { /// Extract a handle to the thread this guard will join on. - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn thread(&self) -> &Thread { &self.thread } @@ -470,7 +504,7 @@ pub fn thread(&self) -> &Thread { /// /// If the child thread panics, `Err` is returned with the parameter given /// to `panic`. - #[stable] + #[stable(feature = "rust1", since = "1.0.0")] pub fn join(mut self) -> Result { assert!(!self.joined); unsafe { imp::join(self.native) }; @@ -483,7 +517,8 @@ pub fn join(mut self) -> Result { impl JoinGuard<'static, T> { /// Detaches the child thread, allowing it to outlive its parent. - #[unstable = "unsure whether this API imposes limitations elsewhere"] + #[unstable(feature = "std_misc", + reason = "unsure whether this API imposes limitations elsewhere")] pub fn detach(mut self) { unsafe { imp::detach(self.native) }; self.joined = true; // avoid joining in the destructor @@ -491,7 +526,7 @@ pub fn detach(mut self) { } #[unsafe_destructor] -#[stable] +#[stable(feature = "rust1", since = "1.0.0")] impl<'a, T: Send + 'a> Drop for JoinGuard<'a, T> { fn drop(&mut self) { if !self.joined { @@ -511,6 +546,7 @@ mod test { use std::old_io::{ChanReader, ChanWriter}; use super::{Thread, Builder}; use thunk::Thunk; + use time::Duration; // !!! These tests are dangerous. If something is buggy, they will hang, !!! // !!! instead of exiting cleanly. This might wedge the buildbots. !!! @@ -542,7 +578,7 @@ fn test_run_basic() { fn test_join_success() { match Thread::scoped(move|| -> String { "Success!".to_string() - }).join().as_ref().map(|s| s.as_slice()) { + }).join().as_ref().map(|s| &**s) { result::Result::Ok("Success!") => (), _ => panic!() } @@ -725,6 +761,37 @@ fn test_stdout() { assert_eq!(output, "Hello, world!".to_string()); } + #[test] + fn test_park_timeout_unpark_before() { + for _ in 0..10 { + Thread::current().unpark(); + Thread::park_timeout(Duration::seconds(10_000_000)); + } + } + + #[test] + fn test_park_timeout_unpark_not_called() { + for _ in 0..10 { + Thread::park_timeout(Duration::milliseconds(10)); + } + } + + #[test] + fn test_park_timeout_unpark_called_other_thread() { + use std::old_io; + + for _ in 0..10 { + let th = Thread::current(); + + let _guard = Thread::scoped(move || { + old_io::timer::sleep(Duration::milliseconds(50)); + th.unpark(); + }); + + Thread::park_timeout(Duration::seconds(10_000_000)); + } + } + // NOTE: the corresponding test for stderr is in run-pass/task-stderr, due // to the test harness apparently interfering with stderr configuration. }