Skip to content

Commit

Permalink
Remove some redundant DB updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Aug 18, 2022
1 parent cdc49b3 commit 6fa54c6
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 39 deletions.
1 change: 0 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
itask.submit_num += 1
itask.state_reset(TASK_STATUS_PREPARING)
self.data_store_mgr.delta_task_state(itask)
self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
Expand Down
65 changes: 34 additions & 31 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,32 @@ def load_from_point(self):
point = tdef.first_point(self.config.start_point)
self.spawn_to_rh_limit(tdef, point, {flow_num})

def add_to_pool(self, itask, is_new: bool = True) -> None:
def db_add_new_flow_rows(self, itask: TaskProxy) -> None:
    """Add new rows to DB task tables that record flow_nums.

    Call when a new task is spawned or a flow merge occurs.

    Args:
        itask: task proxy whose flow membership is to be recorded.

    """
    # Add row to task_states table.
    # A fresh row starts with identical created/updated timestamps.
    now = get_current_time_string()
    self.workflow_db_mgr.put_insert_task_states(
        itask,
        {
            "time_created": now,
            "time_updated": now,
            "status": itask.state.status,
            # flow_nums serialised to a string for DB storage.
            "flow_nums": serialise(itask.flow_nums),
            "flow_wait": itask.flow_wait,
            "is_manual_submit": itask.is_manual_submit
        }
    )
    # Add row to task_outputs table:
    self.workflow_db_mgr.put_insert_task_outputs(itask)

def add_to_pool(self, itask) -> None:
"""Add a task to the hidden (if not satisfied) or main task pool.
If the task already exists in the hidden pool and is satisfied, move it
to the main pool.
(is_new is False indicates a load from the DB at restart).
"""
if itask.is_task_prereqs_not_done() and not itask.is_manual_submit:
# Add to hidden pool if not satisfied.
Expand All @@ -209,23 +228,6 @@ def add_to_pool(self, itask, is_new: bool = True) -> None:

self.create_data_store_elements(itask)

if is_new:
# Add row to "task_states" table.
now = get_current_time_string()
self.workflow_db_mgr.put_insert_task_states(
itask,
{
"time_created": now,
"time_updated": now,
"status": itask.state.status,
"flow_nums": serialise(itask.flow_nums),
"flow_wait": itask.flow_wait,
"is_manual_submit": itask.is_manual_submit
}
)
# Add row to "task_outputs" table:
self.workflow_db_mgr.put_insert_task_outputs(itask)

if itask.tdef.max_future_prereq_offset is not None:
# (Must do this once added to the pool).
self.set_max_future_offset()
Expand Down Expand Up @@ -497,7 +499,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):

if itask.state_reset(status, is_runahead=True):
self.data_store_mgr.delta_task_runahead(itask)
self.add_to_pool(itask, is_new=False)
self.add_to_pool(itask)

# All tasks load as runahead-limited, but finished and manually
# triggered tasks (incl. --start-task's) can be released now.
Expand Down Expand Up @@ -634,8 +636,9 @@ def _get_spawned_or_merged_task(

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit."""
if not flow_nums:
# force-triggered no-flow task.
if not flow_nums or point is None:
# Force-triggered no-flow task.
# Or called with an invalid next_point.
return
if self.runahead_limit_point is None:
self.compute_runahead()
Expand Down Expand Up @@ -1211,7 +1214,6 @@ def spawn_on_output(self, itask, output, forced=False):
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
self.workflow_db_mgr.put_update_task_state(c_task)
elif (
c_task is None
and (itask.flow_nums or forced)
Expand Down Expand Up @@ -1481,6 +1483,7 @@ def spawn_task(
return None

LOG.info(f"[{itask}] spawned")
self.db_add_new_flow_rows(itask)
return itask

def force_spawn_children(
Expand Down Expand Up @@ -1587,7 +1590,6 @@ def force_trigger_tasks(
)
if itask is None:
continue
self.add_to_pool(itask, is_new=True)
itasks.append(itask)

# Trigger matched tasks if not already active.
Expand Down Expand Up @@ -1615,7 +1617,6 @@ def force_trigger_tasks(
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)

self.workflow_db_mgr.put_update_task_state(itask)
return len(unmatched)

def sim_time_check(self, message_queue):
Expand Down Expand Up @@ -1916,24 +1917,26 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
# and via suicide triggers ("A =>!A": A tries to spawn itself).
return

merge_with_no_flow = not itask.flow_nums

itask.merge_flows(flow_nums)
# Merged tasks get a new row in the db task_states table.
self.db_add_new_flow_rows(itask)

if (
itask.state(*TASK_STATUSES_FINAL)
and itask.state.outputs.get_incomplete()
):
# Re-queue incomplete task to run again in the merged flow.
LOG.info(f"[{itask}] incomplete task absorbed by new flow.")
itask.merge_flows(flow_nums)
itask.state_reset(TASK_STATUS_WAITING)
self.queue_task(itask)
self.data_store_mgr.delta_task_state(itask)

elif not itask.flow_nums or itask.flow_wait:
elif merge_with_no_flow or itask.flow_wait:
# 2. Retro-spawn on completed outputs and continue as merged flow.
LOG.info(f"[{itask}] spawning on pre-merge outputs")
itask.merge_flows(flow_nums)
itask.flow_wait = False
self.spawn_on_all_outputs(itask, completed_only=True)
self.spawn_to_rh_limit(
itask.tdef, itask.next_point(), itask.flow_nums)
else:
itask.merge_flows(flow_nums)
17 changes: 11 additions & 6 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,9 @@ def put_xtriggers(self, sat_xtrig):
def put_update_task_state(self, itask):
"""Update task_states table for current state of itask.
For final event-driven update before removing finished tasks.
No need to update task_pool table as finished tasks are immediately
removed from the pool.
NOTE the task_states table is normally updated along with the task pool
table. This method is only needed as a final update for finished tasks,
when they get removed from the task_pool.
"""
set_args = {
"time_updated": itask.state.time_updated,
Expand All @@ -452,10 +452,15 @@ def put_update_task_state(self, itask):
(set_args, where_args))

def put_task_pool(self, pool: 'TaskPool') -> None:
"""Update various task tables for current pool, in runtime database.
"""Delete task pool table content and recreate from current task pool.
Queue delete (everything) statements to wipe the tables, and queue the
relevant insert statements for the current tasks in the pool.
Also recreate:
- prerequisites table
- timeout timers table
- action timers table
And update:
- task states table
"""
self.db_deletes_map[self.TABLE_TASK_POOL].append({})
# Comment this out to retain the trigger-time prereq status of past
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/flow-triggers/11-wait-merge.t
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ cmp_ok "${TEST_NAME}.stdout" <<\__END__
1|b|[1]|["submitted", "started", "succeeded"]
1|a|[2]|["submitted", "started", "succeeded"]
1|c|[2]|["submitted", "started", "x"]
1|x|[1, 2]|["submitted", "started", "succeeded"]
1|c|[1, 2]|["submitted", "started", "succeeded", "x"]
1|x|[1, 2]|["submitted", "started", "succeeded"]
1|d|[1, 2]|["submitted", "started", "succeeded"]
1|b|[2]|["submitted", "started", "succeeded"]
__END__
Expand Down

0 comments on commit 6fa54c6

Please sign in to comment.