]> git.lizzy.rs Git - rust.git/blob - src/libcore/private.rs
Fix compare_and_swap to not break tests.
[rust.git] / src / libcore / private.rs
1 // NB: transitionary, de-mode-ing.
2 // tjc: Re-forbid deprecated modes once a snapshot fixes the
3 // function problem
4 #[forbid(deprecated_pattern)];
5
6 #[doc(hidden)];
7
8 use task::TaskBuilder;
9 use task::atomically;
10
11 extern mod rustrt {
12     #[legacy_exports];
13     fn rust_task_weaken(ch: rust_port_id);
14     fn rust_task_unweaken(ch: rust_port_id);
15
16     #[rust_stack]
17     fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
18                                  oldval: libc::uintptr_t,
19                                  newval: libc::uintptr_t) -> bool;
20
21     fn rust_create_little_lock() -> rust_little_lock;
22     fn rust_destroy_little_lock(lock: rust_little_lock);
23     fn rust_lock_little_lock(lock: rust_little_lock);
24     fn rust_unlock_little_lock(lock: rust_little_lock);
25 }
26
27 #[abi = "rust-intrinsic"]
28 extern mod rusti {
29
30     #[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]    
31     fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
32     fn atomic_xadd(dst: &mut int, src: int) -> int;
33     fn atomic_xsub(dst: &mut int, src: int) -> int;
34 }
35
36 #[allow(non_camel_case_types)] // runtime type
37 type rust_port_id = uint;
38
39 type GlobalPtr = *libc::uintptr_t;
40
41 // TODO: Remove once snapshots have atomic_cxchg
42 #[cfg(stage0)]
43 fn compare_and_swap(address: &mut libc::uintptr_t,
44                     oldval: libc::uintptr_t,
45                     newval: libc::uintptr_t) -> bool {
46     rustrt::rust_compare_and_swap_ptr(address, oldval, newval)
47 }
48
49 #[cfg(stage1)]
50 #[cfg(stage2)]
51 #[cfg(stage3)]
52 fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
53     let old = rusti::atomic_cxchg(address, oldval, newval);
54     old == oldval
55 }
56
57 /**
58  * Atomically gets a channel from a pointer to a pointer-sized memory location
59  * or, if no channel exists creates and installs a new channel and sets up a
60  * new task to receive from it.
61  */
62 pub unsafe fn chan_from_global_ptr<T: Send>(
63     global: GlobalPtr,
64     task_fn: fn() -> task::TaskBuilder,
65     f: fn~(comm::Port<T>)
66 ) -> comm::Chan<T> {
67
68     enum Msg {
69         Proceed,
70         Abort
71     }
72
73     log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
74     let is_probably_zero = *global == 0u;
75     log(debug,~"after is_prob_zero check");
76     if is_probably_zero {
77         log(debug,~"is probably zero...");
78         // There's no global channel. We must make it
79
80         let (setup_po, setup_ch) = do task_fn().spawn_conversation
81             |move f, setup_po, setup_ch| {
82             let po = comm::Port::<T>();
83             let ch = comm::Chan(&po);
84             comm::send(setup_ch, ch);
85
86             // Wait to hear if we are the official instance of
87             // this global task
88             match comm::recv::<Msg>(setup_po) {
89               Proceed => f(move po),
90               Abort => ()
91             }
92         };
93
94         log(debug,~"before setup recv..");
95         // This is the proposed global channel
96         let ch = comm::recv(setup_po);
97         // 0 is our sentinal value. It is not a valid channel
98         assert *ch != 0;
99
100         // Install the channel
101         log(debug,~"BEFORE COMPARE AND SWAP");
102         let swapped = compare_and_swap(
103             cast::reinterpret_cast(&global),
104             0, cast::reinterpret_cast(&ch));
105         log(debug,fmt!("AFTER .. swapped? %?", swapped));
106
107         if swapped {
108             // Success!
109             comm::send(setup_ch, Proceed);
110             ch
111         } else {
112             // Somebody else got in before we did
113             comm::send(setup_ch, Abort);
114             cast::reinterpret_cast(&*global)
115         }
116     } else {
117         log(debug, ~"global != 0");
118         cast::reinterpret_cast(&*global)
119     }
120 }
121
122 #[test]
123 pub fn test_from_global_chan1() {
124
125     // This is unreadable, right?
126
127     // The global channel
128     let globchan = 0;
129     let globchanp = ptr::addr_of(&globchan);
130
131     // Create the global channel, attached to a new task
132     let ch = unsafe {
133         do chan_from_global_ptr(globchanp, task::task) |po| {
134             let ch = comm::recv(po);
135             comm::send(ch, true);
136             let ch = comm::recv(po);
137             comm::send(ch, true);
138         }
139     };
140     // Talk to it
141     let po = comm::Port();
142     comm::send(ch, comm::Chan(&po));
143     assert comm::recv(po) == true;
144
145     // This one just reuses the previous channel
146     let ch = unsafe {
147         do chan_from_global_ptr(globchanp, task::task) |po| {
148             let ch = comm::recv(po);
149             comm::send(ch, false);
150         }
151     };
152
153     // Talk to the original global task
154     let po = comm::Port();
155     comm::send(ch, comm::Chan(&po));
156     assert comm::recv(po) == true;
157 }
158
159 #[test]
160 pub fn test_from_global_chan2() {
161
162     for iter::repeat(100) {
163         // The global channel
164         let globchan = 0;
165         let globchanp = ptr::addr_of(&globchan);
166
167         let resultpo = comm::Port();
168         let resultch = comm::Chan(&resultpo);
169
170         // Spawn a bunch of tasks that all want to compete to
171         // create the global channel
172         for uint::range(0, 10) |i| {
173             do task::spawn {
174                 let ch = unsafe {
175                     do chan_from_global_ptr(
176                         globchanp, task::task) |po| {
177
178                         for uint::range(0, 10) |_j| {
179                             let ch = comm::recv(po);
180                             comm::send(ch, {i});
181                         }
182                     }
183                 };
184                 let po = comm::Port();
185                 comm::send(ch, comm::Chan(&po));
186                 // We are The winner if our version of the
187                 // task was installed
188                 let winner = comm::recv(po);
189                 comm::send(resultch, winner == i);
190             }
191         }
192         // There should be only one winner
193         let mut winners = 0u;
194         for uint::range(0u, 10u) |_i| {
195             let res = comm::recv(resultpo);
196             if res { winners += 1u };
197         }
198         assert winners == 1u;
199     }
200 }
201
202 /**
203  * Convert the current task to a 'weak' task temporarily
204  *
205  * As a weak task it will not be counted towards the runtime's set
206  * of live tasks. When there are no more outstanding live (non-weak) tasks
207  * the runtime will send an exit message on the provided channel.
208  *
209  * This function is super-unsafe. Do not use.
210  *
211  * # Safety notes
212  *
213  * * Weak tasks must either die on their own or exit upon receipt of
214  *   the exit message. Failure to do so will cause the runtime to never
215  *   exit
216  * * Tasks must not call `weaken_task` multiple times. This will
217  *   break the kernel's accounting of live tasks.
218  * * Weak tasks must not be supervised. A supervised task keeps
219  *   a reference to its parent, so the parent will not die.
220  */
221 pub unsafe fn weaken_task(f: fn(comm::Port<()>)) {
222     let po = comm::Port();
223     let ch = comm::Chan(&po);
224     unsafe {
225         rustrt::rust_task_weaken(cast::reinterpret_cast(&ch));
226     }
227     let _unweaken = Unweaken(ch);
228     f(po);
229
230     struct Unweaken {
231       ch: comm::Chan<()>,
232       drop unsafe {
233         rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch));
234       }
235     }
236
237     fn Unweaken(ch: comm::Chan<()>) -> Unweaken {
238         Unweaken {
239             ch: ch
240         }
241     }
242 }
243
244 #[test]
245 pub fn test_weaken_task_then_unweaken() {
246     do task::try {
247         unsafe {
248             do weaken_task |_po| {
249             }
250         }
251     };
252 }
253
254 #[test]
255 pub fn test_weaken_task_wait() {
256     do task::spawn_unlinked {
257         unsafe {
258             do weaken_task |po| {
259                 comm::recv(po);
260             }
261         }
262     }
263 }
264
265 #[test]
266 pub fn test_weaken_task_stress() {
267     // Create a bunch of weak tasks
268     for iter::repeat(100u) {
269         do task::spawn {
270             unsafe {
271                 do weaken_task |_po| {
272                 }
273             }
274         }
275         do task::spawn_unlinked {
276             unsafe {
277                 do weaken_task |po| {
278                     // Wait for it to tell us to die
279                     comm::recv(po);
280                 }
281             }
282         }
283     }
284 }
285
286 #[test]
287 #[ignore(cfg(windows))]
288 pub fn test_weaken_task_fail() {
289     let res = do task::try {
290         unsafe {
291             do weaken_task |_po| {
292                 fail;
293             }
294         }
295     };
296     assert result::is_err(&res);
297 }
298
299 /****************************************************************************
300  * Shared state & exclusive ARC
301  ****************************************************************************/
302
303 // An unwrapper uses this protocol to communicate with the "other" task that
304 // drops the last refcount on an arc. Unfortunately this can't be a proper
305 // pipe protocol because the unwrapper has to access both stages at once.
306 type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>;
307
308 struct ArcData<T> {
309     mut count:     libc::intptr_t,
310     mut unwrapper: libc::uintptr_t, // either a UnwrapProto or 0
311     // FIXME(#3224) should be able to make this non-option to save memory, and
312     // in unwrap() use "let ~ArcData { data: result, _ } = thing" to unwrap it
313     mut data:      Option<T>,
314 }
315
316 struct ArcDestruct<T> {
317     mut data: *libc::c_void,
318     drop unsafe {
319         if self.data.is_null() {
320             return; // Happens when destructing an unwrapper's handle.
321         }
322         do task::unkillable {
323             let data: ~ArcData<T> = cast::reinterpret_cast(&self.data);
324             let new_count = rusti::atomic_xsub(&mut data.count, 1) - 1;
325             assert new_count >= 0;
326             if new_count == 0 {
327                 // Were we really last, or should we hand off to an unwrapper?
328                 // It's safe to not xchg because the unwrapper will set the
329                 // unwrap lock *before* dropping his/her reference. In effect,
330                 // being here means we're the only *awake* task with the data.
331                 if data.unwrapper != 0 {
332                     let p: UnwrapProto =
333                         cast::reinterpret_cast(&data.unwrapper);
334                     let (message, response) = option::swap_unwrap(p);
335                     // Send 'ready' and wait for a response.
336                     pipes::send_one(move message, ());
337                     // Unkillable wait. Message guaranteed to come.
338                     if pipes::recv_one(move response) {
339                         // Other task got the data.
340                         cast::forget(move data);
341                     } else {
342                         // Other task was killed. drop glue takes over.
343                     }
344                 } else {
345                     // drop glue takes over.
346                 }
347             } else {
348                 cast::forget(move data);
349             }
350         }
351     }
352 }
353
354 fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
355     ArcDestruct {
356         data: data
357     }
358 }
359
360 pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
361         -> T {
362     struct DeathThroes<T> {
363         mut ptr:      Option<~ArcData<T>>,
364         mut response: Option<pipes::ChanOne<bool>>,
365         drop unsafe {
366             let response = option::swap_unwrap(&mut self.response);
367             // In case we get killed early, we need to tell the person who
368             // tried to wake us whether they should hand-off the data to us.
369             if task::failing() {
370                 pipes::send_one(move response, false);
371                 // Either this swap_unwrap or the one below (at "Got here")
372                 // ought to run.
373                 cast::forget(option::swap_unwrap(&mut self.ptr));
374             } else {
375                 assert self.ptr.is_none();
376                 pipes::send_one(move response, true);
377             }
378         }
379     }
380
381     do task::unkillable {
382         let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
383         let (c1,p1) = pipes::oneshot(); // ()
384         let (c2,p2) = pipes::oneshot(); // bool
385         let server: UnwrapProto = ~mut Some((move c1,move p2));
386         let serverp: libc::uintptr_t = cast::transmute(move server);
387         // Try to put our server end in the unwrapper slot.
388         if rustrt::rust_compare_and_swap_ptr(&mut ptr.unwrapper, 0, serverp) {
389             // Got in. Step 0: Tell destructor not to run. We are now it.
390             rc.data = ptr::null();
391             // Step 1 - drop our own reference.
392             let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
393             //assert new_count >= 0;
394             if new_count == 0 {
395                 // We were the last owner. Can unwrap immediately.
396                 // Also we have to free the server endpoints.
397                 let _server: UnwrapProto = cast::transmute(move serverp);
398                 option::swap_unwrap(&mut ptr.data)
399                 // drop glue takes over.
400             } else {
401                 // The *next* person who sees the refcount hit 0 will wake us.
402                 let end_result =
403                     DeathThroes { ptr: Some(move ptr),
404                                   response: Some(move c2) };
405                 let mut p1 = Some(move p1); // argh
406                 do task::rekillable {
407                     pipes::recv_one(option::swap_unwrap(&mut p1));
408                 }
409                 // Got here. Back in the 'unkillable' without getting killed.
410                 // Recover ownership of ptr, then take the data out.
411                 let ptr = option::swap_unwrap(&mut end_result.ptr);
412                 option::swap_unwrap(&mut ptr.data)
413                 // drop glue takes over.
414             }
415         } else {
416             // Somebody else was trying to unwrap. Avoid guaranteed deadlock.
417             cast::forget(move ptr);
418             // Also we have to free the (rejected) server endpoints.
419             let _server: UnwrapProto = cast::transmute(move serverp);
420             fail ~"Another task is already unwrapping this ARC!";
421         }
422     }
423 }
424
425 /**
426  * COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
427  *
428  * Data races between tasks can result in crashes and, with sufficient
429  * cleverness, arbitrary type coercion.
430  */
431 pub type SharedMutableState<T: Send> = ArcDestruct<T>;
432
433 pub unsafe fn shared_mutable_state<T: Send>(data: T) ->
434         SharedMutableState<T> {
435     let data = ~ArcData { count: 1, unwrapper: 0, data: Some(move data) };
436     unsafe {
437         let ptr = cast::transmute(move data);
438         ArcDestruct(ptr)
439     }
440 }
441
442 #[inline(always)]
443 pub unsafe fn get_shared_mutable_state<T: Send>(rc: &a/SharedMutableState<T>)
444         -> &a/mut T {
445     unsafe {
446         let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
447         assert ptr.count > 0;
448         // Cast us back into the correct region
449         let r = cast::transmute_region(option::get_ref(&ptr.data));
450         cast::forget(move ptr);
451         return cast::transmute_mut(r);
452     }
453 }
454 #[inline(always)]
455 pub unsafe fn get_shared_immutable_state<T: Send>(
456         rc: &a/SharedMutableState<T>) -> &a/T {
457     unsafe {
458         let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
459         assert ptr.count > 0;
460         // Cast us back into the correct region
461         let r = cast::transmute_region(option::get_ref(&ptr.data));
462         cast::forget(move ptr);
463         return r;
464     }
465 }
466
467 pub unsafe fn clone_shared_mutable_state<T: Send>(rc: &SharedMutableState<T>)
468         -> SharedMutableState<T> {
469     unsafe {
470         let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
471         let new_count = rusti::atomic_xadd(&mut ptr.count, 1) + 1;
472         assert new_count >= 2;
473         cast::forget(move ptr);
474     }
475     ArcDestruct((*rc).data)
476 }
477
478 /****************************************************************************/
479
480 #[allow(non_camel_case_types)] // runtime type
481 type rust_little_lock = *libc::c_void;
482
483 struct LittleLock {
484     l: rust_little_lock,
485     drop { rustrt::rust_destroy_little_lock(self.l); }
486 }
487
488 fn LittleLock() -> LittleLock {
489     LittleLock {
490         l: rustrt::rust_create_little_lock()
491     }
492 }
493
494 impl LittleLock {
495     #[inline(always)]
496     unsafe fn lock<T>(f: fn() -> T) -> T {
497         struct Unlock {
498             l: rust_little_lock,
499             drop { rustrt::rust_unlock_little_lock(self.l); }
500         }
501
502         fn Unlock(l: rust_little_lock) -> Unlock {
503             Unlock {
504                 l: l
505             }
506         }
507
508         do atomically {
509             rustrt::rust_lock_little_lock(self.l);
510             let _r = Unlock(self.l);
511             f()
512         }
513     }
514 }
515
516 struct ExData<T: Send> { lock: LittleLock, mut failed: bool, mut data: T, }
517 /**
518  * An arc over mutable data that is protected by a lock. For library use only.
519  */
520 pub struct Exclusive<T: Send> { x: SharedMutableState<ExData<T>> }
521
522 pub fn exclusive<T:Send >(user_data: T) -> Exclusive<T> {
523     let data = ExData {
524         lock: LittleLock(), mut failed: false, mut data: move user_data
525     };
526     Exclusive { x: unsafe { shared_mutable_state(move data) } }
527 }
528
529 impl<T: Send> Exclusive<T> {
530     // Duplicate an exclusive ARC, as std::arc::clone.
531     fn clone() -> Exclusive<T> {
532         Exclusive { x: unsafe { clone_shared_mutable_state(&self.x) } }
533     }
534
535     // Exactly like std::arc::mutex_arc,access(), but with the little_lock
536     // instead of a proper mutex. Same reason for being unsafe.
537     //
538     // Currently, scheduling operations (i.e., yielding, receiving on a pipe,
539     // accessing the provided condition variable) are prohibited while inside
540     // the exclusive. Supporting that is a work in progress.
541     #[inline(always)]
542     unsafe fn with<U>(f: fn(x: &mut T) -> U) -> U {
543         let rec = unsafe { get_shared_mutable_state(&self.x) };
544         do rec.lock.lock {
545             if rec.failed {
546                 fail ~"Poisoned exclusive - another task failed inside!";
547             }
548             rec.failed = true;
549             let result = f(&mut rec.data);
550             rec.failed = false;
551             move result
552         }
553     }
554
555     #[inline(always)]
556     unsafe fn with_imm<U>(f: fn(x: &T) -> U) -> U {
557         do self.with |x| {
558             f(cast::transmute_immut(x))
559         }
560     }
561 }
562
563 // FIXME(#3724) make this a by-move method on the exclusive
564 pub fn unwrap_exclusive<T: Send>(arc: Exclusive<T>) -> T {
565     let Exclusive { x: x } <- arc;
566     let inner = unsafe { unwrap_shared_mutable_state(move x) };
567     let ExData { data: data, _ } <- inner;
568     move data
569 }
570
571 #[cfg(test)]
572 pub mod tests {
573     #[test]
574     pub fn exclusive_arc() {
575         let mut futures = ~[];
576
577         let num_tasks = 10;
578         let count = 10;
579
580         let total = exclusive(~mut 0);
581
582         for uint::range(0, num_tasks) |_i| {
583             let total = total.clone();
584             futures.push(future::spawn(|move total| {
585                 for uint::range(0, count) |_i| {
586                     do total.with |count| {
587                         **count += 1;
588                     }
589                 }
590             }));
591         };
592
593         for futures.each |f| { f.get() }
594
595         do total.with |total| {
596             assert **total == num_tasks * count
597         };
598     }
599
600     #[test] #[should_fail] #[ignore(cfg(windows))]
601     pub fn exclusive_poison() {
602         // Tests that if one task fails inside of an exclusive, subsequent
603         // accesses will also fail.
604         let x = exclusive(1);
605         let x2 = x.clone();
606         do task::try |move x2| {
607             do x2.with |one| {
608                 assert *one == 2;
609             }
610         };
611         do x.with |one| {
612             assert *one == 1;
613         }
614     }
615
616     #[test]
617     pub fn exclusive_unwrap_basic() {
618         let x = exclusive(~~"hello");
619         assert unwrap_exclusive(move x) == ~~"hello";
620     }
621
622     #[test]
623     pub fn exclusive_unwrap_contended() {
624         let x = exclusive(~~"hello");
625         let x2 = ~mut Some(x.clone());
626         do task::spawn |move x2| {
627             let x2 = option::swap_unwrap(x2);
628             do x2.with |_hello| { }
629             task::yield();
630         }
631         assert unwrap_exclusive(move x) == ~~"hello";
632
633         // Now try the same thing, but with the child task blocking.
634         let x = exclusive(~~"hello");
635         let x2 = ~mut Some(x.clone());
636         let mut res = None;
637         do task::task().future_result(|+r| res = Some(move r)).spawn
638               |move x2| {
639             let x2 = option::swap_unwrap(x2);
640             assert unwrap_exclusive(move x2) == ~~"hello";
641         }
642         // Have to get rid of our reference before blocking.
643         { let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
644         let res = option::swap_unwrap(&mut res);
645         future::get(&res);
646     }
647
648     #[test] #[should_fail] #[ignore(cfg(windows))]
649     pub fn exclusive_unwrap_conflict() {
650         let x = exclusive(~~"hello");
651         let x2 = ~mut Some(x.clone());
652         let mut res = None;
653         do task::task().future_result(|+r| res = Some(move r)).spawn
654            |move x2| {
655             let x2 = option::swap_unwrap(x2);
656             assert unwrap_exclusive(move x2) == ~~"hello";
657         }
658         assert unwrap_exclusive(move x) == ~~"hello";
659         let res = option::swap_unwrap(&mut res);
660         future::get(&res);
661     }
662
663     #[test] #[ignore(cfg(windows))]
664     pub fn exclusive_unwrap_deadlock() {
665         // This is not guaranteed to get to the deadlock before being killed,
666         // but it will show up sometimes, and if the deadlock were not there,
667         // the test would nondeterministically fail.
668         let result = do task::try {
669             // a task that has two references to the same exclusive will
670             // deadlock when it unwraps. nothing to be done about that.
671             let x = exclusive(~~"hello");
672             let x2 = x.clone();
673             do task::spawn {
674                 for 10.times { task::yield(); } // try to let the unwrapper go
675                 fail; // punt it awake from its deadlock
676             }
677             let _z = unwrap_exclusive(move x);
678             do x2.with |_hello| { }
679         };
680         assert result.is_err();
681     }
682 }