Revert "Db store force triggered (cylc#5023)"
This reverts commit 20bc9a0.
MetRonnie committed Oct 3, 2022
1 parent ece609d commit 207b3f5
Showing 11 changed files with 56 additions and 135 deletions.
3 changes: 0 additions & 3 deletions CHANGES.md
@@ -28,16 +28,13 @@ creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->

-------------------------------------------------------------------------------
## __cylc-8.0.3 (<span actions:bind='release-date'>Pending YYYY-MM-DD</span>)__

Maintenance release.

### Fixes

[#5023](https://github.com/cylc/cylc-flow/pull/5023) - tasks force-triggered
after a shutdown was ordered should submit to run immediately on restart.

[#5137](https://github.com/cylc/cylc-flow/pull/5137) -
Install the `ana/` directory to remote platforms by default.
6 changes: 2 additions & 4 deletions cylc/flow/rundb.py
@@ -297,7 +297,6 @@ class CylcWorkflowDAO:
["submit_num", {"datatype": "INTEGER"}],
["status"],
["flow_wait", {"datatype": "INTEGER"}],
["is_manual_submit", {"datatype": "INTEGER"}],
],
TABLE_TASK_TIMEOUT_TIMERS: [
["cycle", {"is_primary_key": True}],
@@ -803,15 +802,14 @@ def select_task_pool_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.
Invoke callback(row_idx, row) on each row, where each row contains:
the fields in the SELECT statement below.
[cycle, name, is_late, status, is_held, submit_num,
try_num, platform_name, time_submit, time_run, timeout, outputs]
"""
form_stmt = r"""
SELECT
%(task_pool)s.cycle,
%(task_pool)s.name,
%(task_pool)s.flow_nums,
%(task_states)s.flow_wait,
%(task_states)s.is_manual_submit,
%(task_late_flags)s.value,
%(task_pool)s.status,
%(task_pool)s.is_held,
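
As an aside, here is a minimal standalone sketch (not part of this diff) of the callback contract select_task_pool_for_restart feeds: each row is passed positionally, in the order unpacked by the restored load_db_task_pool_for_restart further down. The sample row values are invented.

    # Illustrative only: field order follows the restored unpacking in
    # task_pool.load_db_task_pool_for_restart; the sample row is fabricated.
    from typing import Tuple

    def restart_row_callback(row_idx: int, row: Tuple) -> None:
        """Summarise one task_pool/task_states/task_jobs row loaded at restart."""
        (cycle, name, flow_nums, is_late, status, is_held, submit_num, _,
         platform_name, time_submit, time_run, timeout, outputs_str) = row
        print(f"{cycle}/{name} flows={flow_nums} status={status} "
              f"submit_num={submit_num} held={bool(is_held)}")

    restart_row_callback(
        0,
        ('1', 'foo', '[1]', 0, 'waiting', 0, 1, 1,
         'localhost', None, None, None, '[]'),
    )
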
1 change: 1 addition & 0 deletions cylc/flow/task_job_mgr.py
@@ -238,6 +238,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
itask.submit_num += 1
itask.state_reset(TASK_STATUS_PREPARING)
self.data_store_mgr.delta_task_state(itask)
self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
80 changes: 40 additions & 40 deletions cylc/flow/task_pool.py
@@ -173,32 +173,13 @@ def load_from_point(self):
point = tdef.first_point(self.config.start_point)
self.spawn_to_rh_limit(tdef, point, {flow_num})

def db_add_new_flow_rows(self, itask: TaskProxy) -> None:
"""Add new rows to DB task tables that record flow_nums.
Call when a new task is spawned or a flow merge occurs.
"""
# Add row to task_states table.
now = get_current_time_string()
self.workflow_db_mgr.put_insert_task_states(
itask,
{
"time_created": now,
"time_updated": now,
"status": itask.state.status,
"flow_nums": serialise(itask.flow_nums),
"flow_wait": itask.flow_wait,
"is_manual_submit": itask.is_manual_submit
}
)
# Add row to task_outputs table:
self.workflow_db_mgr.put_insert_task_outputs(itask)

def add_to_pool(self, itask) -> None:
def add_to_pool(self, itask, is_new: bool = True) -> None:
"""Add a task to the hidden (if not satisfied) or main task pool.
If the task already exists in the hidden pool and is satisfied, move it
to the main pool.
(is_new is False indicates a load from the DB at restart).
"""
if itask.is_task_prereqs_not_done() and not itask.is_manual_submit:
# Add to hidden pool if not satisfied.
@@ -224,6 +205,21 @@ def add_to_pool(self, itask) -> None:

self.create_data_store_elements(itask)

if is_new:
# Add row to "task_states" table.
now = get_current_time_string()
self.workflow_db_mgr.put_insert_task_states(
itask,
{
"time_created": now,
"time_updated": now,
"status": itask.state.status,
"flow_nums": serialise(itask.flow_nums)
}
)
# Add row to "task_outputs" table:
self.workflow_db_mgr.put_insert_task_outputs(itask)

if itask.tdef.max_future_prereq_offset is not None:
# (Must do this once added to the pool).
self.set_max_future_offset()
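
For illustration, a rough standalone approximation (not cylc code) of the "task_states" row the is_new branch above queues for a brand-new task after the revert; json.dumps and datetime stand in for cylc's serialise() and get_current_time_string(), and the task values are invented.

    import json
    from datetime import datetime, timezone

    # Stand-ins for cylc's get_current_time_string() and serialise().
    now = datetime.now(timezone.utc).isoformat(timespec='seconds')
    task_states_row = {
        "time_created": now,
        "time_updated": now,
        "status": "waiting",           # itask.state.status
        "flow_nums": json.dumps([1]),  # serialise(itask.flow_nums) -> '[1]'
    }
    print(task_states_row)
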
@@ -420,9 +416,9 @@ def load_db_task_pool_for_restart(self, row_idx, row):
if row_idx == 0:
LOG.info("LOADING task proxies")
# Create a task proxy corresponding to this DB entry.
(cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status,
is_held, submit_num, _, platform_name, time_submit, time_run, timeout,
outputs_str) = row
(cycle, name, flow_nums, is_late, status, is_held, submit_num, _,
platform_name, time_submit, time_run, timeout, outputs_str) = row

try:
itask = TaskProxy(
self.config.get_taskdef(name),
@@ -431,9 +427,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):
status=status,
is_held=is_held,
submit_num=submit_num,
is_late=bool(is_late),
flow_wait=bool(flow_wait),
is_manual_submit=bool(is_manual_submit)
is_late=bool(is_late)
)
except WorkflowConfigError:
LOG.exception(
@@ -497,7 +491,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):

if itask.state_reset(status, is_runahead=True):
self.data_store_mgr.delta_task_runahead(itask)
self.add_to_pool(itask)
self.add_to_pool(itask, is_new=False)

# All tasks load as runahead-limited, but finished and manually
# triggered tasks (incl. --start-task's) can be released now.
@@ -634,9 +628,8 @@ def _get_spawned_or_merged_task(

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit."""
if not flow_nums or point is None:
# Force-triggered no-flow task.
# Or called with an invalid next_point.
if not flow_nums:
# force-triggered no-flow task.
return
if self.runahead_limit_point is None:
self.compute_runahead()
@@ -1212,6 +1205,14 @@ def spawn_on_output(self, itask, output, forced=False):
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
self.workflow_db_mgr.put_insert_task_states(
c_task,
{
"status": c_task.state.status,
"flow_nums": serialise(c_task.flow_nums)
}
)
# self.workflow_db_mgr.process_queued_ops()
elif (
c_task is None
and (itask.flow_nums or forced)
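
A step the diff leaves implicit: flow_nums is part of the task_states primary key (see the schema.out hunk below), so after a flow merge the proxy's new flow_nums needs its own row rather than an UPDATE of the pre-merge one. A standalone sqlite3 sketch of that effect, using the reverted schema and invented values:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    # Schema copied from the reverted schema.out below.
    conn.execute(
        "CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, "
        "time_created TEXT, time_updated TEXT, submit_num INTEGER, "
        "status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums))"
    )
    # Row recorded when task c first ran in flow [2] (values invented).
    conn.execute(
        "INSERT INTO task_states VALUES ('c', '1', '[2]', 't0', 't0', 1, 'running', 0)")
    # After merging flow 1 into the proxy, flow_nums becomes [1, 2], giving a
    # second, distinct row for the same task instance.
    conn.execute(
        "INSERT INTO task_states VALUES ('c', '1', '[1, 2]', 't0', 't1', 1, 'running', 0)")
    print(conn.execute("SELECT name, cycle, flow_nums FROM task_states").fetchall())
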
@@ -1481,7 +1482,6 @@ def spawn_task(
return None

LOG.info(f"[{itask}] spawned")
self.db_add_new_flow_rows(itask)
return itask

def force_spawn_children(
@@ -1588,6 +1588,7 @@ def force_trigger_tasks(
)
if itask is None:
continue
self.add_to_pool(itask, is_new=True)
itasks.append(itask)

# Trigger matched tasks if not already active.
@@ -1615,6 +1616,7 @@ def force_trigger_tasks(
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)

self.workflow_db_mgr.put_update_task_state(itask)
return len(unmatched)

def sim_time_check(self, message_queue):
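
Putting the two restored calls in this hunk together, a toy sketch (invented stub, not cylc's API) of the database operations a force-triggered task now generates: insert rows when it is added to the pool as new, then a final task_states update once it has been queued or released.

    class RecordingDbMgr:
        """Stand-in for workflow_db_mgr that just records queued operations."""
        def __init__(self):
            self.ops = []

        def put_insert_task_states(self, task_id, args):
            self.ops.append(("INSERT task_states", task_id, args))

        def put_insert_task_outputs(self, task_id):
            self.ops.append(("INSERT task_outputs", task_id))

        def put_update_task_state(self, task_id):
            self.ops.append(("UPDATE task_states", task_id))

    db = RecordingDbMgr()
    # add_to_pool(itask, is_new=True) queues the inserts ...
    db.put_insert_task_states("2/foo", {"status": "waiting", "flow_nums": "[1]"})
    db.put_insert_task_outputs("2/foo")
    # ... and force_trigger_tasks ends with a state update for the trigger.
    db.put_update_task_state("2/foo")
    print(db.ops)
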
@@ -1917,26 +1919,24 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
# and via suicide triggers ("A =>!A": A tries to spawn itself).
return

merge_with_no_flow = not itask.flow_nums

itask.merge_flows(flow_nums)
# Merged tasks get a new row in the db task_states table.
self.db_add_new_flow_rows(itask)

if (
itask.state(*TASK_STATUSES_FINAL)
and itask.state.outputs.get_incomplete()
):
# Re-queue incomplete task to run again in the merged flow.
LOG.info(f"[{itask}] incomplete task absorbed by new flow.")
itask.merge_flows(flow_nums)
itask.state_reset(TASK_STATUS_WAITING)
self.queue_task(itask)
self.data_store_mgr.delta_task_state(itask)

elif merge_with_no_flow or itask.flow_wait:
elif not itask.flow_nums or itask.flow_wait:
# 2. Retro-spawn on completed outputs and continue as merged flow.
LOG.info(f"[{itask}] spawning on pre-merge outputs")
itask.merge_flows(flow_nums)
itask.flow_wait = False
self.spawn_on_all_outputs(itask, completed_only=True)
self.spawn_to_rh_limit(
itask.tdef, itask.next_point(), itask.flow_nums)
else:
itask.merge_flows(flow_nums)
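
To summarise the restored merge_flows branches above, a toy standalone restatement (not cylc code) of the three outcomes, operating on plain values; the helper name and arguments are invented.

    def merge_outcome(finished_incomplete: bool, flow_nums: set, flow_wait: bool) -> str:
        """Toy restatement of the three merge_flows outcomes."""
        if finished_incomplete:
            # 1. Finished-but-incomplete task: re-queue to run in the merged flow.
            return "requeue in merged flow"
        if not flow_nums or flow_wait:
            # 2. No-flow or flow-wait task: spawn children on pre-merge outputs,
            #    then continue as the merged flow.
            return "spawn on pre-merge outputs"
        # 3. Otherwise the proxy simply adopts the merged flow numbers.
        return "merge only"

    print(merge_outcome(False, set(), False))  # no-flow task  -> retro-spawn
    print(merge_outcome(True, {1}, False))     # incomplete    -> requeue
    print(merge_outcome(False, {1}, False))    # plain merge   -> merge only
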
20 changes: 7 additions & 13 deletions cylc/flow/workflow_db_mgr.py
@@ -431,15 +431,14 @@ def put_xtriggers(self, sat_xtrig):
def put_update_task_state(self, itask):
"""Update task_states table for current state of itask.
NOTE the task_states table is normally updated along with the task pool
table. This method is only needed as a final update for finished tasks,
when they get removed from the task_pool.
For final event-driven update before removing finished tasks.
No need to update task_pool table as finished tasks are immediately
removed from the pool.
"""
set_args = {
"time_updated": itask.state.time_updated,
"status": itask.state.status,
"flow_wait": itask.flow_wait,
"is_manual_submit": itask.is_manual_submit
"flow_wait": itask.flow_wait
}
where_args = {
"cycle": str(itask.point),
@@ -452,15 +451,10 @@ def put_update_task_state(self, itask):
(set_args, where_args))

def put_task_pool(self, pool: 'TaskPool') -> None:
"""Delete task pool table content and recreate from current task pool.
"""Update various task tables for current pool, in runtime database.
Also recreate:
- prerequisites table
- timeout timers table
- action timers table
And update:
- task states table
Queue delete (everything) statements to wipe the tables, and queue the
relevant insert statements for the current tasks in the pool.
"""
self.db_deletes_map[self.TABLE_TASK_POOL].append({})
# Comment this out to retain the trigger-time prereq status of past
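
For reference, a standalone sqlite3 sketch of the UPDATE that put_update_task_state queues after the revert. The SET columns mirror set_args above; the WHERE clause is assumed to cover the full (name, cycle, flow_nums) primary key, since the rest of where_args is cut off in this hunk, and all values are invented.

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, "
        "time_created TEXT, time_updated TEXT, submit_num INTEGER, "
        "status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums))"
    )
    conn.execute(
        "INSERT INTO task_states VALUES ('foo', '1', '[1]', 't0', 't0', 1, 'running', 0)")

    set_args = {"time_updated": "t1", "status": "succeeded", "flow_wait": 0}
    where_args = {"cycle": "1", "name": "foo", "flow_nums": "[1]"}
    conn.execute(
        "UPDATE task_states SET time_updated=:time_updated, status=:status, "
        "flow_wait=:flow_wait WHERE cycle=:cycle AND name=:name "
        "AND flow_nums=:flow_nums",
        {**set_args, **where_args},
    )
    print(conn.execute(
        "SELECT status, time_updated, flow_wait FROM task_states").fetchall())
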
2 changes: 1 addition & 1 deletion tests/flakyfunctional/database/00-simple/schema.out
@@ -10,7 +10,7 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT);
CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num));
2 changes: 1 addition & 1 deletion tests/functional/flow-triggers/11-wait-merge.t
@@ -34,8 +34,8 @@ cmp_ok "${TEST_NAME}.stdout" <<\__END__
1|b|[1]|["submitted", "started", "succeeded"]
1|a|[2]|["submitted", "started", "succeeded"]
1|c|[2]|["submitted", "started", "x"]
1|c|[1, 2]|["submitted", "started", "succeeded", "x"]
1|x|[1, 2]|["submitted", "started", "succeeded"]
1|c|[1, 2]|["submitted", "started", "succeeded", "x"]
1|d|[1, 2]|["submitted", "started", "succeeded"]
1|b|[2]|["submitted", "started", "succeeded"]
__END__
@@ -18,8 +18,8 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','["1", "2"]','waiting', 0);
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0', '0');
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0');
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature));
4 changes: 2 additions & 2 deletions tests/functional/restart/57-ghost-job/db.sqlite3
@@ -19,8 +19,8 @@ INSERT INTO task_outputs VALUES('1','foo','[1]','[]');
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','[1]','preparing',0);
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL, '0');
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL);
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT);
CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num));
47 changes: 0 additions & 47 deletions tests/functional/restart/58-waiting-manual-triggered.t

This file was deleted.

22 changes: 0 additions & 22 deletions tests/functional/restart/58-waiting-manual-triggered/flow.cylc

This file was deleted.
