From 207b3f595ba5244e48df9273bda68a1237407a0d Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 3 Oct 2022 17:35:16 +0100
Subject: [PATCH] Revert "Db store force triggered (#5023)"

This reverts commit 20bc9a097668fd365163a9c87d03b6e5ee3c54c5.
---
CHANGES.md | 3 -
cylc/flow/rundb.py | 6 +-
cylc/flow/task_job_mgr.py | 1 +
cylc/flow/task_pool.py | 80 +++++++++----------
cylc/flow/workflow_db_mgr.py | 20 ++---
.../database/00-simple/schema.out | 2 +-
.../functional/flow-triggers/11-wait-merge.t | 2 +-
.../01-job-nn-localhost/db.sqlite3 | 4 +-
.../restart/57-ghost-job/db.sqlite3 | 4 +-
.../restart/58-waiting-manual-triggered.t | 47 -----------
.../58-waiting-manual-triggered/flow.cylc | 22 -----
11 files changed, 56 insertions(+), 135 deletions(-)
delete mode 100644 tests/functional/restart/58-waiting-manual-triggered.t
delete mode 100644 tests/functional/restart/58-waiting-manual-triggered/flow.cylc
diff --git a/CHANGES.md b/CHANGES.md
index 84a9dd8dc28..9d2f958bf90 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -28,7 +28,6 @@ creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->
-
-------------------------------------------------------------------------------
## __cylc-8.0.3 (Pending YYYY-MM-DD)__
@@ -36,8 +35,6 @@ Maintenance release.
### Fixes
-[#5023](https://github.com/cylc/cylc-flow/pull/5023) - tasks force-triggered
-after a shutdown was ordered should submit to run immediately on restart.
[#5137](https://github.com/cylc/cylc-flow/pull/5137) -
Install the `ana/` directory to remote platforms by default.
diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index 42a4cca3e9a..4cb3c630638 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -297,7 +297,6 @@ class CylcWorkflowDAO:
["submit_num", {"datatype": "INTEGER"}],
["status"],
["flow_wait", {"datatype": "INTEGER"}],
- ["is_manual_submit", {"datatype": "INTEGER"}],
],
TABLE_TASK_TIMEOUT_TIMERS: [
["cycle", {"is_primary_key": True}],
@@ -803,15 +802,14 @@ def select_task_pool_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.
Invoke callback(row_idx, row) on each row, where each row contains:
- the fields in the SELECT statement below.
+ [cycle, name, is_late, status, is_held, submit_num,
+ try_num, platform_name, time_submit, time_run, timeout, outputs]
"""
form_stmt = r"""
SELECT
%(task_pool)s.cycle,
%(task_pool)s.name,
%(task_pool)s.flow_nums,
- %(task_states)s.flow_wait,
- %(task_states)s.is_manual_submit,
%(task_late_flags)s.value,
%(task_pool)s.status,
%(task_pool)s.is_held,
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 1188e3cab01..7d18bd347a1 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -238,6 +238,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
itask.submit_num += 1
itask.state_reset(TASK_STATUS_PREPARING)
self.data_store_mgr.delta_task_state(itask)
+ self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index c9d7ca483ea..cdf10156dda 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -173,32 +173,13 @@ def load_from_point(self):
point = tdef.first_point(self.config.start_point)
self.spawn_to_rh_limit(tdef, point, {flow_num})
- def db_add_new_flow_rows(self, itask: TaskProxy) -> None:
- """Add new rows to DB task tables that record flow_nums.
-
- Call when a new task is spawned or a flow merge occurs.
- """
- # Add row to task_states table.
- now = get_current_time_string()
- self.workflow_db_mgr.put_insert_task_states(
- itask,
- {
- "time_created": now,
- "time_updated": now,
- "status": itask.state.status,
- "flow_nums": serialise(itask.flow_nums),
- "flow_wait": itask.flow_wait,
- "is_manual_submit": itask.is_manual_submit
- }
- )
- # Add row to task_outputs table:
- self.workflow_db_mgr.put_insert_task_outputs(itask)
-
- def add_to_pool(self, itask) -> None:
+ def add_to_pool(self, itask, is_new: bool = True) -> None:
"""Add a task to the hidden (if not satisfied) or main task pool.
If the task already exists in the hidden pool and is satisfied, move it
to the main pool.
+
+ (is_new is False indicates load from DB at restart).
"""
if itask.is_task_prereqs_not_done() and not itask.is_manual_submit:
# Add to hidden pool if not satisfied.
@@ -224,6 +205,21 @@ def add_to_pool(self, itask) -> None:
self.create_data_store_elements(itask)
+ if is_new:
+ # Add row to "task_states" table.
+ now = get_current_time_string()
+ self.workflow_db_mgr.put_insert_task_states(
+ itask,
+ {
+ "time_created": now,
+ "time_updated": now,
+ "status": itask.state.status,
+ "flow_nums": serialise(itask.flow_nums)
+ }
+ )
+ # Add row to "task_outputs" table:
+ self.workflow_db_mgr.put_insert_task_outputs(itask)
+
if itask.tdef.max_future_prereq_offset is not None:
# (Must do this once added to the pool).
self.set_max_future_offset()
@@ -420,9 +416,9 @@ def load_db_task_pool_for_restart(self, row_idx, row):
if row_idx == 0:
LOG.info("LOADING task proxies")
# Create a task proxy corresponding to this DB entry.
- (cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status,
- is_held, submit_num, _, platform_name, time_submit, time_run, timeout,
- outputs_str) = row
+ (cycle, name, flow_nums, is_late, status, is_held, submit_num, _,
+ platform_name, time_submit, time_run, timeout, outputs_str) = row
+
try:
itask = TaskProxy(
self.config.get_taskdef(name),
@@ -431,9 +427,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):
status=status,
is_held=is_held,
submit_num=submit_num,
- is_late=bool(is_late),
- flow_wait=bool(flow_wait),
- is_manual_submit=bool(is_manual_submit)
+ is_late=bool(is_late)
)
except WorkflowConfigError:
LOG.exception(
@@ -497,7 +491,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):
if itask.state_reset(status, is_runahead=True):
self.data_store_mgr.delta_task_runahead(itask)
- self.add_to_pool(itask)
+ self.add_to_pool(itask, is_new=False)
# All tasks load as runahead-limited, but finished and manually
# triggered tasks (incl. --start-task's) can be released now.
@@ -634,9 +628,8 @@ def _get_spawned_or_merged_task(
def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit."""
- if not flow_nums or point is None:
- # Force-triggered no-flow task.
- # Or called with an invalid next_point.
+ if not flow_nums:
+ # force-triggered no-flow task.
return
if self.runahead_limit_point is None:
self.compute_runahead()
@@ -1212,6 +1205,14 @@ def spawn_on_output(self, itask, output, forced=False):
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
+ self.workflow_db_mgr.put_insert_task_states(
+ c_task,
+ {
+ "status": c_task.state.status,
+ "flow_nums": serialise(c_task.flow_nums)
+ }
+ )
+ # self.workflow_db_mgr.process_queued_ops()
elif (
c_task is None
and (itask.flow_nums or forced)
@@ -1481,7 +1482,6 @@ def spawn_task(
return None
LOG.info(f"[{itask}] spawned")
- self.db_add_new_flow_rows(itask)
return itask
def force_spawn_children(
@@ -1588,6 +1588,7 @@ def force_trigger_tasks(
)
if itask is None:
continue
+ self.add_to_pool(itask, is_new=True)
itasks.append(itask)
# Trigger matched tasks if not already active.
@@ -1615,6 +1616,7 @@ def force_trigger_tasks(
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)
+ self.workflow_db_mgr.put_update_task_state(itask)
return len(unmatched)
def sim_time_check(self, message_queue):
@@ -1917,26 +1919,24 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
# and via suicide triggers ("A =>!A": A tries to spawn itself).
return
- merge_with_no_flow = not itask.flow_nums
-
- itask.merge_flows(flow_nums)
- # Merged tasks get a new row in the db task_states table.
- self.db_add_new_flow_rows(itask)
-
if (
itask.state(*TASK_STATUSES_FINAL)
and itask.state.outputs.get_incomplete()
):
# Re-queue incomplete task to run again in the merged flow.
LOG.info(f"[{itask}] incomplete task absorbed by new flow.")
+ itask.merge_flows(flow_nums)
itask.state_reset(TASK_STATUS_WAITING)
self.queue_task(itask)
self.data_store_mgr.delta_task_state(itask)
- elif merge_with_no_flow or itask.flow_wait:
+ elif not itask.flow_nums or itask.flow_wait:
# 2. Retro-spawn on completed outputs and continue as merged flow.
LOG.info(f"[{itask}] spawning on pre-merge outputs")
+ itask.merge_flows(flow_nums)
itask.flow_wait = False
self.spawn_on_all_outputs(itask, completed_only=True)
self.spawn_to_rh_limit(
itask.tdef, itask.next_point(), itask.flow_nums)
+ else:
+ itask.merge_flows(flow_nums)
diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py
index 18265d67b79..e8f82a27669 100644
--- a/cylc/flow/workflow_db_mgr.py
+++ b/cylc/flow/workflow_db_mgr.py
@@ -431,15 +431,14 @@ def put_xtriggers(self, sat_xtrig):
def put_update_task_state(self, itask):
"""Update task_states table for current state of itask.
- NOTE the task_states table is normally updated along with the task pool
- table. This method is only needed as a final update for finished tasks,
- when they get removed from the task_pool.
+ For final event-driven update before removing finished tasks.
+ No need to update task_pool table as finished tasks are immediately
+ removed from the pool.
"""
set_args = {
"time_updated": itask.state.time_updated,
"status": itask.state.status,
- "flow_wait": itask.flow_wait,
- "is_manual_submit": itask.is_manual_submit
+ "flow_wait": itask.flow_wait
}
where_args = {
"cycle": str(itask.point),
@@ -452,15 +451,10 @@ def put_update_task_state(self, itask):
(set_args, where_args))
def put_task_pool(self, pool: 'TaskPool') -> None:
- """Delete task pool table content and recreate from current task pool.
+ """Update various task tables for current pool, in runtime database.
- Also recreate:
- - prerequisites table
- - timeout timers table
- - action timers table
-
- And update:
- - task states table
+ Queue delete (everything) statements to wipe the tables, and queue the
+ relevant insert statements for the current tasks in the pool.
"""
self.db_deletes_map[self.TABLE_TASK_POOL].append({})
# Comment this out to retain the trigger-time prereq status of past
diff --git a/tests/flakyfunctional/database/00-simple/schema.out b/tests/flakyfunctional/database/00-simple/schema.out
index 814faac2a59..ac62d48639d 100644
--- a/tests/flakyfunctional/database/00-simple/schema.out
+++ b/tests/flakyfunctional/database/00-simple/schema.out
@@ -10,7 +10,7 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
-CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
+CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT);
CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num));
diff --git a/tests/functional/flow-triggers/11-wait-merge.t b/tests/functional/flow-triggers/11-wait-merge.t
index cb3218ae463..25b6443776a 100644
--- a/tests/functional/flow-triggers/11-wait-merge.t
+++ b/tests/functional/flow-triggers/11-wait-merge.t
@@ -34,8 +34,8 @@ cmp_ok "${TEST_NAME}.stdout" <<\__END__
1|b|[1]|["submitted", "started", "succeeded"]
1|a|[2]|["submitted", "started", "succeeded"]
1|c|[2]|["submitted", "started", "x"]
-1|c|[1, 2]|["submitted", "started", "succeeded", "x"]
1|x|[1, 2]|["submitted", "started", "succeeded"]
+1|c|[1, 2]|["submitted", "started", "succeeded", "x"]
1|d|[1, 2]|["submitted", "started", "succeeded"]
1|b|[2]|["submitted", "started", "succeeded"]
__END__
diff --git a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3
index 15d9e84418f..9453b6960e3 100644
--- a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3
+++ b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3
@@ -18,8 +18,8 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','["1", "2"]','waiting', 0);
-CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
-INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0', '0');
+CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
+INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0');
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature));
diff --git a/tests/functional/restart/57-ghost-job/db.sqlite3 b/tests/functional/restart/57-ghost-job/db.sqlite3
index 4230831602f..d6837d6bd0c 100644
--- a/tests/functional/restart/57-ghost-job/db.sqlite3
+++ b/tests/functional/restart/57-ghost-job/db.sqlite3
@@ -19,8 +19,8 @@ INSERT INTO task_outputs VALUES('1','foo','[1]','[]');
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','[1]','preparing',0);
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
-CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
-INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL, '0');
+CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
+INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL);
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT);
CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num));
diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/58-waiting-manual-triggered.t
deleted file mode 100644
index efba9f42b70..00000000000
--- a/tests/functional/restart/58-waiting-manual-triggered.t
+++ /dev/null
@@ -1,47 +0,0 @@
-#!/bin/bash
-# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
-# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-#-------------------------------------------------------------------------------
-# Test that a task manually triggered just before shutdown will run on restart.
-
-. "$(dirname "$0")/test_header"
-
-set_test_number 6
-
-install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
-
-run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
-
-workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}"
-
-DB_FILE="${WORKFLOW_RUN_DIR}/log/db"
-
-# It should have shut down with 2/foo waiting with the is_manual_submit flag on.
-TEST_NAME="${TEST_NAME_BASE}-db-task-states"
-QUERY='SELECT status, is_manual_submit FROM task_states WHERE cycle IS 2;'
-run_ok "$TEST_NAME" sqlite3 "$DB_FILE" "$QUERY"
-cmp_ok "${TEST_NAME}.stdout" << '__EOF__'
-waiting|1
-__EOF__
-
-# It should restart and shut down normally, not stall with 2/foo waiting on 1/foo.
-workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play --no-detach "${WORKFLOW_NAME}"
-# Check that 2/foo job 02 did run before shutdown.
-grep_workflow_log_ok "${TEST_NAME_BASE}-grep" "\[2\/foo running job:02 flows:1\] => succeeded"
-
-purge
-exit
diff --git a/tests/functional/restart/58-waiting-manual-triggered/flow.cylc b/tests/functional/restart/58-waiting-manual-triggered/flow.cylc
deleted file mode 100644
index ea5f47c46d7..00000000000
--- a/tests/functional/restart/58-waiting-manual-triggered/flow.cylc
+++ /dev/null
@@ -1,22 +0,0 @@
-[scheduler]
- [[events]]
- stall timeout = PT0S
- abort on stall timeout = True
-[scheduling]
- cycling mode = integer
- runahead limit = P1
- final cycle point = 3
- [[graph]]
- P1 = foo[-P1] => foo
-[runtime]
- [[foo]]
- script = """
- if (( CYLC_TASK_CYCLE_POINT == 3 )); then
- # Order a normal shutdown: no more job submissions, and shut
- # down after active jobs (i.e. this one) finish.
- cylc stop "$CYLC_WORKFLOW_ID"
- # Force-trigger 2/foo before shutdown. On restart it should be
- # in the waiting state with the force-triggered flag set.
- cylc trigger "${CYLC_WORKFLOW_ID}//2/foo"
- fi
- """