diff --git a/CHANGES.md b/CHANGES.md index 2249f62d625..d93c5b8ed96 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -39,6 +39,9 @@ Second Release Candidate for Cylc 8 suitable for acceptance testing. validating/running a Jinja2 workflow (for users who have installed Cylc using `pip`.) +[#4737](https://github.com/cylc/cylc-flow/pull/4737) - +Fix issue which prevented tasks with incomplete outputs from being rerun by +subsequent flows. ------------------------------------------------------------------------------- ## __cylc-8.0rc1 (Released 2022-02-17)__ diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index 12c0e6423f8..3dc0d997760 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -23,6 +23,9 @@ from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager +FlowNums = Set[int] + + class FlowMgr: """Logic to manage flow counter and flow metadata.""" @@ -55,7 +58,7 @@ def get_new_flow(self, description: Optional[str] = None) -> int: ) return self.counter - def load_from_db(self, flow_nums: Set[int]) -> None: + def load_from_db(self, flow_nums: FlowNums) -> None: """Load flow data for scheduler restart. Sets the flow counter to the max flow number in the DB. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2a1cd690f53..ce4b2f38724 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -68,7 +68,7 @@ from cylc.flow.taskdef import TaskDef from cylc.flow.task_events_mgr import TaskEventsManager from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager - from cylc.flow.flow_mgr import FlowMgr + from cylc.flow.flow_mgr import FlowMgr, FlowNums Pool = Dict['PointBase', Dict[str, TaskProxy]] @@ -581,11 +581,7 @@ def spawn_successor_if_parentless( not next_task.flow_nums and not next_task.state.is_runahead ) - next_task.merge_flows(itask.flow_nums) - LOG.info( - f"[{next_task}] merged in flow(s) " - f"{','.join(str(f) for f in itask.flow_nums)}" - ) + self.merge_flows(next_task, itask.flow_nums) if retroactive_spawn: # Did not belong to a flow (force-triggered) before merge. # Now it does, so spawn successor, and children if needed. @@ -1173,15 +1169,7 @@ def spawn_on_output(self, itask, output, forced=False): ) if c_task is not None: # Child already exists, update it. - if not c_task.flow_nums: - # Child does not belong to a flow (force-triggered). Now - # (merging) it does, so spawn outputs completed so far. - self.spawn_on_all_outputs(c_task, completed_only=True) - c_task.merge_flows(itask.flow_nums) - LOG.info( - f"[{c_task}] merged in flow(s) " - f"{','.join(str(f) for f in itask.flow_nums)}" - ) + self.merge_flows(c_task, itask.flow_nums) self.workflow_db_mgr.put_insert_task_states( c_task, { @@ -1776,3 +1764,34 @@ def match_taskdefs( n_warnings += 1 continue return n_warnings, task_items + + def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None: + """Merge itask.flow_nums with flow_nums. + + This also performs required spawning / state changing for edge cases. + """ + # case 1: task has finished with incomplete outputs + # (we reset it to waiting and re-queue so it can run again as a task + # with complete outputs would) + if ( + itask.state(*TASK_STATUSES_FINAL) + and itask.state.outputs.get_incomplete() + ): + itask.state_reset(TASK_STATUS_WAITING) + if not itask.state.is_queued: + self.queue_task(itask) + self.data_store_mgr.delta_task_state(itask) + + # case 2: task does not belong to a flow (force-triggered no-flow). + # (we spawn the outputs complete so far to allow the flow to continue + # post merge) + # (note we don't do this if case 1 also applies) + elif not itask.flow_nums: + self.spawn_on_all_outputs(itask, completed_only=True) + + # merge the flows + itask.merge_flows(flow_nums) + LOG.info( + f"[{itask}] merged in flow(s) " + f"{','.join(str(f) for f in itask.flow_nums)}" + ) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 1f56814c52d..e054405ad75 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -439,6 +439,7 @@ def name_match(self, name: str) -> bool: def merge_flows(self, flow_nums: Set) -> None: """Merge another set of flow_nums with mine.""" + # update the flow nums self.flow_nums.update(flow_nums) def state_reset( diff --git a/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs.t b/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs.t new file mode 100644 index 00000000000..1458e606fec --- /dev/null +++ b/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs.t @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Check correct behaviour if a no-flow task is manually triggered just ahead of +# the main flow. See GitHub #4645 + +. "$(dirname "$0")/test_header" +set_test_number 2 +reftest +exit diff --git a/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs/flow.cylc b/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs/flow.cylc new file mode 100644 index 00000000000..5428c8a24a3 --- /dev/null +++ b/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs/flow.cylc @@ -0,0 +1,20 @@ +[scheduler] + [[events]] + expected task failures = 1/b + +[scheduling] + [[graph]] + R1 = """ + a => b => c + """ + +[runtime] + [[b]] + script = """ + # test $CYLC_TASK_SUBMIT_NUMBER -gt 1 + if [[ $CYLC_TASK_SUBMIT_NUMBER -eq 1 ]]; then + cylc trigger --reflow "$CYLC_WORKFLOW_NAME//1/a" + false + fi + """ + [[a,c]] diff --git a/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs/reference.log b/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs/reference.log new file mode 100644 index 00000000000..434bf4f37e4 --- /dev/null +++ b/tests/functional/spawn-on-demand/15-reflow-incomplete-outputs/reference.log @@ -0,0 +1,7 @@ +Initial point: 1 +Final point: 1 +1/a -triggered off [] +1/b -triggered off ['1/a'] +1/a -triggered off [] +1/b -triggered off ['1/a'] +1/c -triggered off ['1/b']