1 // NB: transitionary, de-mode-ing.
2 // tjc: Re-forbid deprecated modes once a snapshot fixes the
4 #[forbid(deprecated_pattern)];
13 fn rust_task_weaken(ch: rust_port_id);
14 fn rust_task_unweaken(ch: rust_port_id);
17 fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
18 oldval: libc::uintptr_t,
19 newval: libc::uintptr_t) -> bool;
21 fn rust_create_little_lock() -> rust_little_lock;
22 fn rust_destroy_little_lock(lock: rust_little_lock);
23 fn rust_lock_little_lock(lock: rust_little_lock);
24 fn rust_unlock_little_lock(lock: rust_little_lock);
27 #[abi = "rust-intrinsic"]
30 #[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
31 fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
32 fn atomic_xadd(dst: &mut int, src: int) -> int;
33 fn atomic_xsub(dst: &mut int, src: int) -> int;
36 #[allow(non_camel_case_types)] // runtime type
37 type rust_port_id = uint;
39 type GlobalPtr = *libc::uintptr_t;
41 // TODO: Remove once snapshots have atomic_cxchg
43 fn compare_and_swap(address: &mut libc::uintptr_t,
44 oldval: libc::uintptr_t,
45 newval: libc::uintptr_t) -> bool {
46 rustrt::rust_compare_and_swap_ptr(address, oldval, newval)
52 fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
53 let old = rusti::atomic_cxchg(address, oldval, newval);
58 * Atomically gets a channel from a pointer to a pointer-sized memory location
59 * or, if no channel exists creates and installs a new channel and sets up a
60 * new task to receive from it.
62 pub unsafe fn chan_from_global_ptr<T: Send>(
64 task_fn: fn() -> task::TaskBuilder,
73 log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
74 let is_probably_zero = *global == 0u;
75 log(debug,~"after is_prob_zero check");
77 log(debug,~"is probably zero...");
78 // There's no global channel. We must make it
80 let (setup_po, setup_ch) = do task_fn().spawn_conversation
81 |move f, setup_po, setup_ch| {
82 let po = comm::Port::<T>();
83 let ch = comm::Chan(&po);
84 comm::send(setup_ch, ch);
86 // Wait to hear if we are the official instance of
88 match comm::recv::<Msg>(setup_po) {
89 Proceed => f(move po),
94 log(debug,~"before setup recv..");
95 // This is the proposed global channel
96 let ch = comm::recv(setup_po);
97 // 0 is our sentinal value. It is not a valid channel
100 // Install the channel
101 log(debug,~"BEFORE COMPARE AND SWAP");
102 let swapped = compare_and_swap(
103 cast::reinterpret_cast(&global),
104 0, cast::reinterpret_cast(&ch));
105 log(debug,fmt!("AFTER .. swapped? %?", swapped));
109 comm::send(setup_ch, Proceed);
112 // Somebody else got in before we did
113 comm::send(setup_ch, Abort);
114 cast::reinterpret_cast(&*global)
117 log(debug, ~"global != 0");
118 cast::reinterpret_cast(&*global)
123 pub fn test_from_global_chan1() {
125 // This is unreadable, right?
127 // The global channel
129 let globchanp = ptr::addr_of(&globchan);
131 // Create the global channel, attached to a new task
133 do chan_from_global_ptr(globchanp, task::task) |po| {
134 let ch = comm::recv(po);
135 comm::send(ch, true);
136 let ch = comm::recv(po);
137 comm::send(ch, true);
141 let po = comm::Port();
142 comm::send(ch, comm::Chan(&po));
143 assert comm::recv(po) == true;
145 // This one just reuses the previous channel
147 do chan_from_global_ptr(globchanp, task::task) |po| {
148 let ch = comm::recv(po);
149 comm::send(ch, false);
153 // Talk to the original global task
154 let po = comm::Port();
155 comm::send(ch, comm::Chan(&po));
156 assert comm::recv(po) == true;
160 pub fn test_from_global_chan2() {
162 for iter::repeat(100) {
163 // The global channel
165 let globchanp = ptr::addr_of(&globchan);
167 let resultpo = comm::Port();
168 let resultch = comm::Chan(&resultpo);
170 // Spawn a bunch of tasks that all want to compete to
171 // create the global channel
172 for uint::range(0, 10) |i| {
175 do chan_from_global_ptr(
176 globchanp, task::task) |po| {
178 for uint::range(0, 10) |_j| {
179 let ch = comm::recv(po);
184 let po = comm::Port();
185 comm::send(ch, comm::Chan(&po));
186 // We are The winner if our version of the
187 // task was installed
188 let winner = comm::recv(po);
189 comm::send(resultch, winner == i);
192 // There should be only one winner
193 let mut winners = 0u;
194 for uint::range(0u, 10u) |_i| {
195 let res = comm::recv(resultpo);
196 if res { winners += 1u };
198 assert winners == 1u;
203 * Convert the current task to a 'weak' task temporarily
205 * As a weak task it will not be counted towards the runtime's set
206 * of live tasks. When there are no more outstanding live (non-weak) tasks
207 * the runtime will send an exit message on the provided channel.
209 * This function is super-unsafe. Do not use.
213 * * Weak tasks must either die on their own or exit upon receipt of
214 * the exit message. Failure to do so will cause the runtime to never
216 * * Tasks must not call `weaken_task` multiple times. This will
217 * break the kernel's accounting of live tasks.
218 * * Weak tasks must not be supervised. A supervised task keeps
219 * a reference to its parent, so the parent will not die.
221 pub unsafe fn weaken_task(f: fn(comm::Port<()>)) {
222 let po = comm::Port();
223 let ch = comm::Chan(&po);
225 rustrt::rust_task_weaken(cast::reinterpret_cast(&ch));
227 let _unweaken = Unweaken(ch);
233 rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch));
237 fn Unweaken(ch: comm::Chan<()>) -> Unweaken {
245 pub fn test_weaken_task_then_unweaken() {
248 do weaken_task |_po| {
255 pub fn test_weaken_task_wait() {
256 do task::spawn_unlinked {
258 do weaken_task |po| {
266 pub fn test_weaken_task_stress() {
267 // Create a bunch of weak tasks
268 for iter::repeat(100u) {
271 do weaken_task |_po| {
275 do task::spawn_unlinked {
277 do weaken_task |po| {
278 // Wait for it to tell us to die
287 #[ignore(cfg(windows))]
288 pub fn test_weaken_task_fail() {
289 let res = do task::try {
291 do weaken_task |_po| {
296 assert result::is_err(&res);
299 /****************************************************************************
300 * Shared state & exclusive ARC
301 ****************************************************************************/
303 // An unwrapper uses this protocol to communicate with the "other" task that
304 // drops the last refcount on an arc. Unfortunately this can't be a proper
305 // pipe protocol because the unwrapper has to access both stages at once.
306 type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>;
309 mut count: libc::intptr_t,
310 mut unwrapper: libc::uintptr_t, // either a UnwrapProto or 0
311 // FIXME(#3224) should be able to make this non-option to save memory, and
312 // in unwrap() use "let ~ArcData { data: result, _ } = thing" to unwrap it
316 struct ArcDestruct<T> {
317 mut data: *libc::c_void,
319 if self.data.is_null() {
320 return; // Happens when destructing an unwrapper's handle.
322 do task::unkillable {
323 let data: ~ArcData<T> = cast::reinterpret_cast(&self.data);
324 let new_count = rusti::atomic_xsub(&mut data.count, 1) - 1;
325 assert new_count >= 0;
327 // Were we really last, or should we hand off to an unwrapper?
328 // It's safe to not xchg because the unwrapper will set the
329 // unwrap lock *before* dropping his/her reference. In effect,
330 // being here means we're the only *awake* task with the data.
331 if data.unwrapper != 0 {
333 cast::reinterpret_cast(&data.unwrapper);
334 let (message, response) = option::swap_unwrap(p);
335 // Send 'ready' and wait for a response.
336 pipes::send_one(move message, ());
337 // Unkillable wait. Message guaranteed to come.
338 if pipes::recv_one(move response) {
339 // Other task got the data.
340 cast::forget(move data);
342 // Other task was killed. drop glue takes over.
345 // drop glue takes over.
348 cast::forget(move data);
354 fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
360 pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
362 struct DeathThroes<T> {
363 mut ptr: Option<~ArcData<T>>,
364 mut response: Option<pipes::ChanOne<bool>>,
366 let response = option::swap_unwrap(&mut self.response);
367 // In case we get killed early, we need to tell the person who
368 // tried to wake us whether they should hand-off the data to us.
370 pipes::send_one(move response, false);
371 // Either this swap_unwrap or the one below (at "Got here")
373 cast::forget(option::swap_unwrap(&mut self.ptr));
375 assert self.ptr.is_none();
376 pipes::send_one(move response, true);
381 do task::unkillable {
382 let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
383 let (c1,p1) = pipes::oneshot(); // ()
384 let (c2,p2) = pipes::oneshot(); // bool
385 let server: UnwrapProto = ~mut Some((move c1,move p2));
386 let serverp: libc::uintptr_t = cast::transmute(move server);
387 // Try to put our server end in the unwrapper slot.
388 if rustrt::rust_compare_and_swap_ptr(&mut ptr.unwrapper, 0, serverp) {
389 // Got in. Step 0: Tell destructor not to run. We are now it.
390 rc.data = ptr::null();
391 // Step 1 - drop our own reference.
392 let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
393 //assert new_count >= 0;
395 // We were the last owner. Can unwrap immediately.
396 // Also we have to free the server endpoints.
397 let _server: UnwrapProto = cast::transmute(move serverp);
398 option::swap_unwrap(&mut ptr.data)
399 // drop glue takes over.
401 // The *next* person who sees the refcount hit 0 will wake us.
403 DeathThroes { ptr: Some(move ptr),
404 response: Some(move c2) };
405 let mut p1 = Some(move p1); // argh
406 do task::rekillable {
407 pipes::recv_one(option::swap_unwrap(&mut p1));
409 // Got here. Back in the 'unkillable' without getting killed.
410 // Recover ownership of ptr, then take the data out.
411 let ptr = option::swap_unwrap(&mut end_result.ptr);
412 option::swap_unwrap(&mut ptr.data)
413 // drop glue takes over.
416 // Somebody else was trying to unwrap. Avoid guaranteed deadlock.
417 cast::forget(move ptr);
418 // Also we have to free the (rejected) server endpoints.
419 let _server: UnwrapProto = cast::transmute(move serverp);
420 fail ~"Another task is already unwrapping this ARC!";
426 * COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
428 * Data races between tasks can result in crashes and, with sufficient
429 * cleverness, arbitrary type coercion.
431 pub type SharedMutableState<T: Send> = ArcDestruct<T>;
433 pub unsafe fn shared_mutable_state<T: Send>(data: T) ->
434 SharedMutableState<T> {
435 let data = ~ArcData { count: 1, unwrapper: 0, data: Some(move data) };
437 let ptr = cast::transmute(move data);
443 pub unsafe fn get_shared_mutable_state<T: Send>(rc: &a/SharedMutableState<T>)
446 let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
447 assert ptr.count > 0;
448 // Cast us back into the correct region
449 let r = cast::transmute_region(option::get_ref(&ptr.data));
450 cast::forget(move ptr);
451 return cast::transmute_mut(r);
455 pub unsafe fn get_shared_immutable_state<T: Send>(
456 rc: &a/SharedMutableState<T>) -> &a/T {
458 let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
459 assert ptr.count > 0;
460 // Cast us back into the correct region
461 let r = cast::transmute_region(option::get_ref(&ptr.data));
462 cast::forget(move ptr);
467 pub unsafe fn clone_shared_mutable_state<T: Send>(rc: &SharedMutableState<T>)
468 -> SharedMutableState<T> {
470 let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
471 let new_count = rusti::atomic_xadd(&mut ptr.count, 1) + 1;
472 assert new_count >= 2;
473 cast::forget(move ptr);
475 ArcDestruct((*rc).data)
478 /****************************************************************************/
480 #[allow(non_camel_case_types)] // runtime type
481 type rust_little_lock = *libc::c_void;
485 drop { rustrt::rust_destroy_little_lock(self.l); }
488 fn LittleLock() -> LittleLock {
490 l: rustrt::rust_create_little_lock()
496 unsafe fn lock<T>(f: fn() -> T) -> T {
499 drop { rustrt::rust_unlock_little_lock(self.l); }
502 fn Unlock(l: rust_little_lock) -> Unlock {
509 rustrt::rust_lock_little_lock(self.l);
510 let _r = Unlock(self.l);
516 struct ExData<T: Send> { lock: LittleLock, mut failed: bool, mut data: T, }
518 * An arc over mutable data that is protected by a lock. For library use only.
520 pub struct Exclusive<T: Send> { x: SharedMutableState<ExData<T>> }
522 pub fn exclusive<T:Send >(user_data: T) -> Exclusive<T> {
524 lock: LittleLock(), mut failed: false, mut data: move user_data
526 Exclusive { x: unsafe { shared_mutable_state(move data) } }
529 impl<T: Send> Exclusive<T> {
530 // Duplicate an exclusive ARC, as std::arc::clone.
531 fn clone() -> Exclusive<T> {
532 Exclusive { x: unsafe { clone_shared_mutable_state(&self.x) } }
535 // Exactly like std::arc::mutex_arc,access(), but with the little_lock
536 // instead of a proper mutex. Same reason for being unsafe.
538 // Currently, scheduling operations (i.e., yielding, receiving on a pipe,
539 // accessing the provided condition variable) are prohibited while inside
540 // the exclusive. Supporting that is a work in progress.
542 unsafe fn with<U>(f: fn(x: &mut T) -> U) -> U {
543 let rec = unsafe { get_shared_mutable_state(&self.x) };
546 fail ~"Poisoned exclusive - another task failed inside!";
549 let result = f(&mut rec.data);
556 unsafe fn with_imm<U>(f: fn(x: &T) -> U) -> U {
558 f(cast::transmute_immut(x))
563 // FIXME(#3724) make this a by-move method on the exclusive
564 pub fn unwrap_exclusive<T: Send>(arc: Exclusive<T>) -> T {
565 let Exclusive { x: x } <- arc;
566 let inner = unsafe { unwrap_shared_mutable_state(move x) };
567 let ExData { data: data, _ } <- inner;
574 pub fn exclusive_arc() {
575 let mut futures = ~[];
580 let total = exclusive(~mut 0);
582 for uint::range(0, num_tasks) |_i| {
583 let total = total.clone();
584 futures.push(future::spawn(|move total| {
585 for uint::range(0, count) |_i| {
586 do total.with |count| {
593 for futures.each |f| { f.get() }
595 do total.with |total| {
596 assert **total == num_tasks * count
600 #[test] #[should_fail] #[ignore(cfg(windows))]
601 pub fn exclusive_poison() {
602 // Tests that if one task fails inside of an exclusive, subsequent
603 // accesses will also fail.
604 let x = exclusive(1);
606 do task::try |move x2| {
617 pub fn exclusive_unwrap_basic() {
618 let x = exclusive(~~"hello");
619 assert unwrap_exclusive(move x) == ~~"hello";
623 pub fn exclusive_unwrap_contended() {
624 let x = exclusive(~~"hello");
625 let x2 = ~mut Some(x.clone());
626 do task::spawn |move x2| {
627 let x2 = option::swap_unwrap(x2);
628 do x2.with |_hello| { }
631 assert unwrap_exclusive(move x) == ~~"hello";
633 // Now try the same thing, but with the child task blocking.
634 let x = exclusive(~~"hello");
635 let x2 = ~mut Some(x.clone());
637 do task::task().future_result(|+r| res = Some(move r)).spawn
639 let x2 = option::swap_unwrap(x2);
640 assert unwrap_exclusive(move x2) == ~~"hello";
642 // Have to get rid of our reference before blocking.
643 { let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
644 let res = option::swap_unwrap(&mut res);
648 #[test] #[should_fail] #[ignore(cfg(windows))]
649 pub fn exclusive_unwrap_conflict() {
650 let x = exclusive(~~"hello");
651 let x2 = ~mut Some(x.clone());
653 do task::task().future_result(|+r| res = Some(move r)).spawn
655 let x2 = option::swap_unwrap(x2);
656 assert unwrap_exclusive(move x2) == ~~"hello";
658 assert unwrap_exclusive(move x) == ~~"hello";
659 let res = option::swap_unwrap(&mut res);
663 #[test] #[ignore(cfg(windows))]
664 pub fn exclusive_unwrap_deadlock() {
665 // This is not guaranteed to get to the deadlock before being killed,
666 // but it will show up sometimes, and if the deadlock were not there,
667 // the test would nondeterministically fail.
668 let result = do task::try {
669 // a task that has two references to the same exclusive will
670 // deadlock when it unwraps. nothing to be done about that.
671 let x = exclusive(~~"hello");
674 for 10.times { task::yield(); } // try to let the unwrapper go
675 fail; // punt it awake from its deadlock
677 let _z = unwrap_exclusive(move x);
678 do x2.with |_hello| { }
680 assert result.is_err();