1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 use option::{Option,Some,None};
17 use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,Relaxed,SeqCst};
18 use unstable::finally::Finally;
19 use unstable::mutex::Mutex;
25 /// An atomically reference counted pointer.
27 /// Enforces no shared-memory safety.
28 //#[unsafe_no_drop_flag] FIXME: #9758
29 pub struct UnsafeArc<T> {
30 data: *mut ArcData<T>,
33 pub enum UnsafeArcUnwrap<T> {
34 UnsafeArcSelf(UnsafeArc<T>),
38 impl<T> UnsafeArcUnwrap<T> {
39 fn expect_t(self, msg: &'static str) -> T {
41 UnsafeArcSelf(_) => fail!(msg),
46 fn is_self(&self) -> bool {
48 UnsafeArcSelf(_) => true,
49 UnsafeArcT(_) => false
56 // An unwrapper uses this protocol to communicate with the "other" task that
57 // drops the last refcount on an arc. Unfortunately this can't be a proper
58 // pipe protocol because the unwrapper has to access both stages at once.
59 // FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)?
60 unwrapper: AtomicOption<(comm::ChanOne<()>, comm::PortOne<bool>)>,
61 // FIXME(#3224) should be able to make this non-option to save memory
65 unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> {
66 let data = ~ArcData { count: AtomicUint::new(refcount),
67 unwrapper: AtomicOption::empty(),
72 impl<T: Send> UnsafeArc<T> {
73 pub fn new(data: T) -> UnsafeArc<T> {
74 unsafe { UnsafeArc { data: new_inner(data, 1) } }
77 /// As new(), but returns an extra pre-cloned handle.
78 pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) {
80 let ptr = new_inner(data, 2);
81 (UnsafeArc { data: ptr }, UnsafeArc { data: ptr })
85 /// As new(), but returns a vector of as many pre-cloned handles as requested.
86 pub fn newN(data: T, num_handles: uint) -> ~[UnsafeArc<T>] {
89 ~[] // need to free data here
91 let ptr = new_inner(data, num_handles);
92 vec::from_fn(num_handles, |_| UnsafeArc { data: ptr })
97 /// As newN(), but from an already-existing handle. Uses one xadd.
98 pub fn cloneN(self, num_handles: uint) -> ~[UnsafeArc<T>] {
100 ~[] // The "num_handles - 1" trick (below) fails in the 0 case.
103 // Minus one because we are recycling the given handle's refcount.
104 let old_count = (*self.data).count.fetch_add(num_handles - 1, Acquire);
105 // let old_count = (*self.data).count.fetch_add(num_handles, Acquire);
106 assert!(old_count >= 1);
108 cast::forget(self); // Don't run the destructor on this handle.
109 vec::from_fn(num_handles, |_| UnsafeArc { data: ptr })
115 pub fn get(&self) -> *mut T {
117 assert!((*self.data).count.load(Relaxed) > 0);
118 let r: *mut T = (*self.data).data.get_mut_ref();
124 pub fn get_immut(&self) -> *T {
126 assert!((*self.data).count.load(Relaxed) > 0);
127 let r: *T = (*self.data).data.get_ref();
132 /// Wait until all other handles are dropped, then retrieve the enclosed
133 /// data. See extra::arc::Arc for specific semantics documentation.
134 /// If called when the task is already unkillable, unwrap will unkillably
135 /// block; otherwise, an unwrapping task can be killed by linked failure.
136 pub fn unwrap(self) -> T {
139 // The ~ dtor needs to run if this code succeeds.
140 let mut data: ~ArcData<T> = cast::transmute(this.data);
141 // Set up the unwrap protocol.
142 let (p1,c1) = comm::oneshot(); // ()
143 let (p2,c2) = comm::oneshot(); // bool
144 // Try to put our server end in the unwrapper slot.
145 // This needs no barrier -- it's protected by the release barrier on
146 // the xadd, and the acquire+release barrier in the destructor's xadd.
147 if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
148 // Got in. Tell this handle's destructor not to run (we are now it).
149 this.data = ptr::mut_null();
150 // Drop our own reference.
151 let old_count = data.count.fetch_sub(1, Release);
152 assert!(old_count >= 1);
154 // We were the last owner. Can unwrap immediately.
155 // AtomicOption's destructor will free the server endpoint.
156 // FIXME(#3224): it should be like this
157 // let ~ArcData { data: user_data, _ } = data;
159 data.data.take_unwrap()
161 // The *next* person who sees the refcount hit 0 will wake us.
162 let p1 = Cell::new(p1); // argh
163 // Unlike the above one, this cell is necessary. It will get
164 // taken either in the do block or in the finally block.
165 let c2_and_data = Cell::new((c2,data));
168 // Got here. Back in the 'unkillable' without getting killed.
169 let (c2, data) = c2_and_data.take();
171 // FIXME(#3224): it should be like this
172 // let ~ArcData { data: user_data, _ } = data;
175 data.data.take_unwrap()
178 // Killed during wait. Because this might happen while
179 // someone else still holds a reference, we can't free
180 // the data now; the "other" last refcount will free it.
181 let (c2, data) = c2_and_data.take();
185 assert!(c2_and_data.is_empty());
190 // If 'put' returns the server end back to us, we were rejected;
191 // someone else was trying to unwrap. Avoid guaranteed deadlock.
193 fail!("Another task is already unwrapping this Arc!");
198 /// As unwrap above, but without blocking. Returns 'UnsafeArcSelf(self)' if this is
199 /// not the last reference; 'UnsafeArcT(unwrapped_data)' if so.
200 pub fn try_unwrap(mut self) -> UnsafeArcUnwrap<T> {
202 // The ~ dtor needs to run if this code succeeds.
203 let mut data: ~ArcData<T> = cast::transmute(self.data);
204 // This can of course race with anybody else who has a handle, but in
205 // such a case, the returned count will always be at least 2. If we
206 // see 1, no race was possible. All that matters is 1 or not-1.
207 let count = data.count.load(Acquire);
209 // The more interesting race is one with an unwrapper. They may have
210 // already dropped their count -- but if so, the unwrapper pointer
211 // will have been set first, which the barriers ensure we will see.
212 // (Note: using is_empty(), not take(), to not free the unwrapper.)
213 if count == 1 && data.unwrapper.is_empty(Acquire) {
214 // Tell this handle's destructor not to run (we are now it).
215 self.data = ptr::mut_null();
216 // FIXME(#3224) as above
217 UnsafeArcT(data.data.take_unwrap())
226 impl<T: Send> Clone for UnsafeArc<T> {
227 fn clone(&self) -> UnsafeArc<T> {
229 // This barrier might be unnecessary, but I'm not sure...
230 let old_count = (*self.data).count.fetch_add(1, Acquire);
231 assert!(old_count >= 1);
232 return UnsafeArc { data: self.data };
238 impl<T> Drop for UnsafeArc<T>{
241 // Happens when destructing an unwrapper's handle and from `#[unsafe_no_drop_flag]`
242 if self.data.is_null() {
245 let mut data: ~ArcData<T> = cast::transmute(self.data);
246 // Must be acquire+release, not just release, to make sure this
247 // doesn't get reordered to after the unwrapper pointer load.
248 let old_count = data.count.fetch_sub(1, SeqCst);
249 assert!(old_count >= 1);
251 // Were we really last, or should we hand off to an
252 // unwrapper? It's safe to not xchg because the unwrapper
253 // will set the unwrap lock *before* dropping his/her
254 // reference. In effect, being here means we're the only
255 // *awake* task with the data.
256 match data.unwrapper.take(Acquire) {
257 Some(~(message,response)) => {
258 // Send 'ready' and wait for a response.
260 // Unkillable wait. Message guaranteed to come.
262 // Other task got the data.
265 // Other task was killed. drop glue takes over.
269 // drop glue takes over.
280 /****************************************************************************/
283 * Enables a runtime assertion that no operation in the argument closure shall
284 * use scheduler operations (deschedule, recv, spawn, etc). This is for use with
285 * pthread mutexes, which may block the entire scheduler thread, rather than
286 * just one task, and is hence prone to deadlocks if mixed with descheduling.
288 * NOTE: THIS DOES NOT PROVIDE LOCKING, or any sort of critical-section
289 * synchronization whatsoever. It only makes sense to use for CPU-local issues.
291 // FIXME(#8140) should not be pub
292 pub unsafe fn atomically<U>(f: || -> U) -> U {
293 use rt::task::{Task, GreenTask, SchedTask};
294 use rt::local::Local;
296 let task_opt: Option<*mut Task> = Local::try_unsafe_borrow();
299 match (*t).task_type {
302 (*t).death.inhibit_deschedule();
304 }).finally(|| (*t).death.allow_deschedule())
313 pub struct LittleLock {
317 pub struct LittleGuard<'a> {
318 priv l: &'a mut Mutex,
321 impl Drop for LittleLock {
323 unsafe { self.l.destroy(); }
328 impl<'a> Drop for LittleGuard<'a> {
330 unsafe { self.l.unlock(); }
335 pub fn new() -> LittleLock {
336 unsafe { LittleLock { l: Mutex::new() } }
339 pub unsafe fn lock<'a>(&'a mut self) -> LittleGuard<'a> {
341 LittleGuard { l: &mut self.l }
344 pub unsafe fn try_lock<'a>(&'a mut self) -> Option<LittleGuard<'a>> {
345 if self.l.trylock() {
346 Some(LittleGuard { l: &mut self.l })
352 pub unsafe fn signal(&mut self) {
357 impl<'a> LittleGuard<'a> {
358 pub unsafe fn wait(&mut self) {
370 * An arc over mutable data that is protected by a lock. For library use only.
374 * This uses a pthread mutex, not one that's aware of the userspace scheduler.
375 * The user of an Exclusive must be careful not to invoke any functions that may
376 * reschedule the task while holding the lock, or deadlock may result. If you
377 * need to block or deschedule while accessing shared state, use extra::sync::RWArc.
379 pub struct Exclusive<T> {
380 priv x: UnsafeArc<ExData<T>>
383 impl<T:Send> Clone for Exclusive<T> {
384 // Duplicate an Exclusive Arc, as std::arc::clone.
385 fn clone(&self) -> Exclusive<T> {
386 Exclusive { x: self.x.clone() }
390 impl<T:Send> Exclusive<T> {
391 pub fn new(user_data: T) -> Exclusive<T> {
393 lock: LittleLock::new(),
398 x: UnsafeArc::new(data)
402 // Exactly like std::arc::MutexArc,access(), but with the LittleLock
403 // instead of a proper mutex. Same reason for being unsafe.
405 // Currently, scheduling operations (i.e., descheduling, receiving on a pipe,
406 // accessing the provided condition variable) are prohibited while inside
407 // the Exclusive. Supporting that is a work in progress.
409 pub unsafe fn with<U>(&self, f: |x: &mut T| -> U) -> U {
410 let rec = self.x.get();
411 let _l = (*rec).lock.lock();
413 fail!("Poisoned Exclusive::new - another task failed inside!");
415 (*rec).failed = true;
416 let result = f(&mut (*rec).data);
417 (*rec).failed = false;
422 pub unsafe fn with_imm<U>(&self, f: |x: &T| -> U) -> U {
427 pub unsafe fn hold_and_signal(&self, f: |x: &mut T|) {
428 let rec = self.x.get();
429 let _l = (*rec).lock.lock();
431 fail!("Poisoned Exclusive::new - another task failed inside!");
433 (*rec).failed = true;
435 (*rec).failed = false;
436 (*rec).lock.signal();
440 pub unsafe fn hold_and_wait(&self, f: |x: &T| -> bool) {
441 let rec = self.x.get();
442 let mut l = (*rec).lock.lock();
444 fail!("Poisoned Exclusive::new - another task failed inside!");
446 (*rec).failed = true;
447 let result = f(&(*rec).data);
448 (*rec).failed = false;
454 pub fn unwrap(self) -> T {
455 let Exclusive { x: x } = self;
456 // Someday we might need to unkillably unwrap an Exclusive, but not today.
457 let inner = x.unwrap();
458 let ExData { data: user_data, .. } = inner; // will destroy the LittleLock
468 use super::{Exclusive, UnsafeArc, atomically};
473 //#[unsafe_no_drop_flag] FIXME: #9758
477 assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>());
481 fn test_atomically() {
482 // NB. The whole runtime will abort on an 'atomic-sleep' violation,
483 // so we can't really test for the converse behaviour.
484 unsafe { atomically(|| ()) } task::deschedule(); // oughtn't fail
488 fn exclusive_new_arc() {
490 let mut futures = ~[];
495 let total = Exclusive::new(~0);
497 for _ in range(0u, num_tasks) {
498 let total = total.clone();
499 let (port, chan) = comm::stream();
503 for _ in range(0u, count) {
504 total.with(|count| **count += 1);
510 for f in futures.iter() { f.recv() }
512 total.with(|total| assert!(**total == num_tasks * count));
516 #[test] #[should_fail]
517 fn exclusive_new_poison() {
519 // Tests that if one task fails inside of an Exclusive::new, subsequent
520 // accesses will also fail.
521 let x = Exclusive::new(1);
524 x2.with(|one| assert_eq!(*one, 2))
526 x.with(|one| assert_eq!(*one, 1));
532 // Tests that the many-refcounts-at-once constructors don't leak.
533 let _ = UnsafeArc::new2(~~"hello");
534 let x = UnsafeArc::newN(~~"hello", 0);
535 assert_eq!(x.len(), 0)
536 let x = UnsafeArc::newN(~~"hello", 1);
537 assert_eq!(x.len(), 1)
538 let x = UnsafeArc::newN(~~"hello", 10);
539 assert_eq!(x.len(), 10)
543 fn arclike_cloneN() {
544 // Tests that the many-refcounts-at-once special-clone doesn't leak.
545 let x = UnsafeArc::new(~~"hello");
547 assert_eq!(x.len(), 0);
548 let x = UnsafeArc::new(~~"hello");
550 assert_eq!(x.len(), 1);
551 let x = UnsafeArc::new(~~"hello");
552 let x = x.cloneN(10);
553 assert_eq!(x.len(), 10);
557 fn arclike_unwrap_basic() {
558 let x = UnsafeArc::new(~~"hello");
559 assert!(x.unwrap() == ~~"hello");
563 fn arclike_try_unwrap() {
564 let x = UnsafeArc::new(~~"hello");
565 assert!(x.try_unwrap().expect_t("try_unwrap failed") == ~~"hello");
569 fn arclike_try_unwrap_fail() {
570 let x = UnsafeArc::new(~~"hello");
572 let left_x = x.try_unwrap();
573 assert!(left_x.is_self());
574 util::ignore(left_x);
575 assert!(x2.try_unwrap().expect_t("try_unwrap none") == ~~"hello");
579 fn arclike_try_unwrap_unwrap_race() {
580 // When an unwrap and a try_unwrap race, the unwrapper should always win.
581 let x = UnsafeArc::new(~~"hello");
583 let (p,c) = comm::stream();
586 assert!(x2.unwrap() == ~~"hello");
590 task::deschedule(); // Try to make the unwrapper get blocked first.
591 let left_x = x.try_unwrap();
592 assert!(left_x.is_self());
593 util::ignore(left_x);
598 fn exclusive_new_unwrap_basic() {
599 // Unlike the above, also tests no double-freeing of the LittleLock.
600 let x = Exclusive::new(~~"hello");
601 assert!(x.unwrap() == ~~"hello");
605 fn exclusive_new_unwrap_contended() {
606 let x = Exclusive::new(~~"hello");
609 unsafe { x2.with(|_hello| ()); }
612 assert!(x.unwrap() == ~~"hello");
614 // Now try the same thing, but with the child task blocking.
615 let x = Exclusive::new(~~"hello");
617 let mut builder = task::task();
618 let res = builder.future_result();
620 assert!(x2.unwrap() == ~~"hello");
622 // Have to get rid of our reference before blocking.
627 #[test] #[should_fail]
628 fn exclusive_new_unwrap_conflict() {
629 let x = Exclusive::new(~~"hello");
631 let mut builder = task::task();
632 let res = builder.future_result();
634 assert!(x2.unwrap() == ~~"hello");
636 assert!(x.unwrap() == ~~"hello");
637 assert!(res.recv().is_ok());