// 'do_resched' configures whether the scheduler immediately switches to
// the receiving task, or leaves the sending task still running.
- fn try_send_inner(self, val: T, do_resched: bool) -> bool {
+ fn try_send_inner(mut self, val: T, do_resched: bool) -> bool {
if do_resched {
rtassert!(!rt::in_sched_context());
}
sched.maybe_yield();
}
- let mut this = self;
let mut recvr_active = true;
- let packet = this.packet();
+ let packet = self.packet();
unsafe {
// done with the packet. NB: In case of do_resched, this *must*
// happen before waking up a blocked task (or be unkillable),
// because we might get a kill signal during the reschedule.
- this.suppress_finalize = true;
+ self.suppress_finalize = true;
match oldstate {
STATE_BOTH => {
}
STATE_ONE => {
// Port has closed. Need to clean up.
- let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+ let _packet: ~Packet<T> = cast::transmute(self.void_packet);
recvr_active = false;
}
task_as_state => {
}
/// As `recv`, but returns `None` if the send end is closed rather than failing.
- pub fn try_recv(self) -> Option<T> {
- let mut this = self;
-
+ pub fn try_recv(mut self) -> Option<T> {
// Optimistic check. If data was sent already, we don't even need to block.
// No release barrier needed here; we're not handing off our task pointer yet.
- if !this.optimistic_check() {
+ if !self.optimistic_check() {
// No data available yet.
// Switch to the scheduler to put the ~Task into the Packet state.
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |sched, task| {
- this.block_on(sched, task);
+ self.block_on(sched, task);
}
}
// Task resumes.
- this.recv_ready()
+ self.recv_ready()
}
}
impl<T> Select for PortOne<T> { }
impl<T> SelectPortInner<T> for PortOne<T> {
- fn recv_ready(self) -> Option<T> {
- let mut this = self;
- let packet = this.packet();
+ fn recv_ready(mut self) -> Option<T> {
+ let packet = self.packet();
// No further memory barrier is needed here to access the
// payload. Some scenarios:
let payload = (*packet).payload.take();
// The sender has closed up shop. Drop the packet.
- let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+ let _packet: ~Packet<T> = cast::transmute(self.void_packet);
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
- this.suppress_finalize = true;
+ self.suppress_finalize = true;
return payload;
}
}
if self.suppress_finalize { return }
unsafe {
- let this = cast::transmute_mut(self);
- let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
+ let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port still active. It will destroy the Packet.
},
STATE_ONE => {
- let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+ let _packet: ~Packet<T> = cast::transmute(self.void_packet);
},
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
- rtassert!((*this.packet()).payload.is_none());
+ rtassert!((*self.packet()).payload.is_none());
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map |woken_task| {
Scheduler::run_task(woken_task);
if self.suppress_finalize { return }
unsafe {
- let this = cast::transmute_mut(self);
- let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
+ let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Chan still active. It will destroy the packet.
},
STATE_ONE => {
- let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+ let _packet: ~Packet<T> = cast::transmute(self.void_packet);
}
task_as_state => {
// This case occurs during unwinding, when the blocked
// Take a main task to run, and a scheduler to run it in. Create a
// scheduler task and bootstrap into it.
- pub fn bootstrap(~self, task: ~Task) {
-
- let mut this = self;
+ pub fn bootstrap(mut ~self, task: ~Task) {
// Build an Idle callback.
- this.idle_callback = Some(this.event_loop.pausible_idle_callback());
+ self.idle_callback = Some(self.event_loop.pausible_idle_callback());
// Initialize the TLS key.
local_ptr::init_tls_key();
// Before starting our first task, make sure the idle callback
// is active. As we do not start in the sleep state this is
// important.
- this.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
+ self.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
// Now, as far as all the scheduler state is concerned, we are
// inside the "scheduler" context. So we can act like the
// scheduler and resume the provided task.
- this.resume_task_immediately(task);
+ self.resume_task_immediately(task);
// Now we are back in the scheduler context, having
// successfully run the input task. Start by running the
// This does not return a scheduler, as the scheduler is placed
// inside the task.
- pub fn run(~self) {
-
- let mut self_sched = self;
+ pub fn run(mut ~self) {
// This is unsafe because we need to place the scheduler, with
// the event_loop inside, inside our task. But we still need a
// mutable reference to the event_loop to give it the "run"
// command.
unsafe {
- let event_loop: *mut ~EventLoop = &mut self_sched.event_loop;
+ let event_loop: *mut ~EventLoop = &mut self.event_loop;
// Our scheduler must be in the task before the event loop
// is started.
- let self_sched = Cell::new(self_sched);
+ let self_sched = Cell::new(self);
do Local::borrow |stask: &mut Task| {
stask.sched = Some(self_sched.take());
};
// returns the still-available scheduler. At this point all
// message-handling will count as a turn of work, and as a result
// return None.
- fn interpret_message_queue(~self, effort: EffortLevel) -> Option<~Scheduler> {
-
- let mut this = self;
+ fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> {
let msg = if effort == DontTryTooHard {
// Do a cheap check that may miss messages
- this.message_queue.casual_pop()
+ self.message_queue.casual_pop()
} else {
- this.message_queue.pop()
+ self.message_queue.pop()
};
match msg {
Some(PinnedTask(task)) => {
let mut task = task;
- task.give_home(Sched(this.make_handle()));
- this.resume_task_immediately(task);
+ task.give_home(Sched(self.make_handle()));
+ self.resume_task_immediately(task);
return None;
}
Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!");
- this.process_task(task, Scheduler::resume_task_immediately_cl);
+ self.process_task(task, Scheduler::resume_task_immediately_cl);
return None;
}
Some(RunOnce(task)) => {
// bypass the process_task logic to force running this task once
// on this home scheduler. This is often used for I/O (homing).
- Scheduler::resume_task_immediately_cl(this, task);
+ Scheduler::resume_task_immediately_cl(self, task);
return None;
}
Some(Wake) => {
- this.sleepy = false;
- Local::put(this);
+ self.sleepy = false;
+ Local::put(self);
return None;
}
Some(Shutdown) => {
rtdebug!("shutting down");
- if this.sleepy {
+ if self.sleepy {
// There may be an outstanding handle on the
// sleeper list. Pop them all to make sure that's
// not the case.
loop {
- match this.sleeper_list.pop() {
+ match self.sleeper_list.pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake);
}
// No more sleeping. After there are no outstanding
// event loop references we will shut down.
- this.no_sleep = true;
- this.sleepy = false;
- Local::put(this);
+ self.no_sleep = true;
+ self.sleepy = false;
+ Local::put(self);
return None;
}
None => {
- return Some(this);
+ return Some(self);
}
}
}
- fn do_work(~self) -> Option<~Scheduler> {
- let mut this = self;
-
+ fn do_work(mut ~self) -> Option<~Scheduler> {
rtdebug!("scheduler calling do work");
- match this.find_work() {
+ match self.find_work() {
Some(task) => {
rtdebug!("found some work! processing the task");
- this.process_task(task, Scheduler::resume_task_immediately_cl);
+ self.process_task(task, Scheduler::resume_task_immediately_cl);
return None;
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
- return Some(this);
+ return Some(self);
}
}
}
// * Task Routing Functions - Make sure tasks send up in the right
// place.
- fn process_task(~self, task: ~Task,
+ fn process_task(mut ~self, mut task: ~Task,
schedule_fn: SchedulingFn) {
- let mut this = self;
- let mut task = task;
-
rtdebug!("processing a task");
let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
- if home_handle.sched_id != this.sched_id() {
+ if home_handle.sched_id != self.sched_id() {
rtdebug!("sending task home");
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
- Local::put(this);
+ Local::put(self);
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
- schedule_fn(this, task);
+ schedule_fn(self, task);
}
}
- AnySched if this.run_anything => {
+ AnySched if self.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
- schedule_fn(this, task);
+ schedule_fn(self, task);
}
AnySched => {
rtdebug!("sending task to friend");
task.give_home(AnySched);
- this.send_to_friend(task);
- Local::put(this);
+ self.send_to_friend(task);
+ Local::put(self);
}
}
}
/// to the work queue directly.
pub fn enqueue_task(&mut self, task: ~Task) {
- let this = self;
-
// We push the task onto our local queue clone.
- this.work_queue.push(task);
- this.idle_callback.get_mut_ref().resume();
+ self.work_queue.push(task);
+ self.idle_callback.get_mut_ref().resume();
// We've made work available. Notify a
// sleeping scheduler.
- match this.sleeper_list.casual_pop() {
+ match self.sleeper_list.casual_pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake)
// cleanup function f, which takes the scheduler and the
// old task as inputs.
- pub fn change_task_context(~self,
+ pub fn change_task_context(mut ~self,
next_task: ~Task,
f: &fn(&mut Scheduler, ~Task)) {
- let mut this = self;
-
// The current task is grabbed from TLS, not taken as an input.
// Doing an unsafe_take to avoid writing back a null pointer -
// We're going to call `put` later to do that.
// The current task is placed inside an enum with the cleanup
// function. This enum is then placed inside the scheduler.
- this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
+ self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
// The scheduler is then placed inside the next task.
let mut next_task = next_task;
- next_task.sched = Some(this);
+ next_task.sched = Some(self);
// However we still need an internal mutable pointer to the
// original task. The strategy here was "arrange memory, then
/// This passes a Scheduler pointer to the fn after the context switch
/// in order to prevent that fn from performing further scheduling operations.
/// Doing further scheduling could easily result in infinite recursion.
- pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
+ pub fn deschedule_running_task_and_then(mut ~self,
+ f: &fn(&mut Scheduler, BlockedTask)) {
// Trickier - we need to get the scheduler task out of self
// and use it as the destination.
- let mut this = self;
- let stask = this.sched_task.take_unwrap();
+ let stask = self.sched_task.take_unwrap();
// Otherwise this is the same as below.
- this.switch_running_tasks_and_then(stask, f);
+ self.switch_running_tasks_and_then(stask, f);
}
pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
- pub fn terminate_current_task(~self) {
+ pub fn terminate_current_task(mut ~self) {
// Similar to deschedule running task and then, but cannot go through
// the task-blocking path. The task is already dying.
- let mut this = self;
- let stask = this.sched_task.take_unwrap();
- do this.change_task_context(stask) |sched, mut dead_task| {
+ let stask = self.sched_task.take_unwrap();
+ do self.change_task_context(stask) |sched, mut dead_task| {
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
}
/// to introduce some amount of randomness to the scheduler. Currently the
/// randomness is a result of performing a round of work stealing (which
/// may end up stealing from the current scheduler).
- pub fn yield_now(~self) {
- let mut this = self;
- this.yield_check_count = reset_yield_check(&mut this.rng);
+ pub fn yield_now(mut ~self) {
+ self.yield_check_count = reset_yield_check(&mut self.rng);
// Tell the scheduler to start stealing on the next iteration
- this.steal_for_yield = true;
- do this.deschedule_running_task_and_then |sched, task| {
+ self.steal_for_yield = true;
+ do self.deschedule_running_task_and_then |sched, task| {
sched.enqueue_blocked_task(task);
}
}
- pub fn maybe_yield(~self) {
+ pub fn maybe_yield(mut ~self) {
// The number of times to do the yield check before yielding, chosen arbitrarily.
- let mut this = self;
- rtassert!(this.yield_check_count > 0);
- this.yield_check_count -= 1;
- if this.yield_check_count == 0 {
- this.yield_now();
+ rtassert!(self.yield_check_count > 0);
+ self.yield_check_count -= 1;
+ if self.yield_check_count == 0 {
+ self.yield_now();
} else {
- Local::put(this);
+ Local::put(self);
}
}