From 9ad53661fabbfbc507dd4e07ff07c8709bc394b4 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 5 Jul 2023 13:46:06 +0100 Subject: [PATCH] pool: ensure the DB is updated when tasks are removed * Before this change, the `task_pool` table was only being updated when tasks were added, or changed state, but not when they were removed. * Closes #5598 --- CHANGES.md | 3 ++ cylc/flow/scheduler.py | 7 +++- cylc/flow/task_pool.py | 2 + tests/integration/test_task_pool.py | 64 +++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 6fad5e29ee5..2374ff5fcdd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,9 @@ Various enhancements to `cylc lint`: ### Fixes +[#5619](https://github.com/cylc/cylc-flow/pull/5619) - +Fix an issue where the `task_pool` table in the database wasn't being updated +in a timely fashion when tasks completed. [#5606](https://github.com/cylc/cylc-flow/pull/5606) - Task outputs and messages are now validated to avoid conflicts with built-in diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index e192185c388..7c782c58bae 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1691,7 +1691,11 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: """Update DB, UIS, Summary data elements""" updated_tasks = [ t for t in self.pool.get_tasks() if t.state.is_updated] - has_updated = self.is_updated or updated_tasks + has_updated = ( + self.is_updated + or updated_tasks + or self.pool.tasks_removed + ) reloaded = self.is_reloaded # Add tasks that have moved moved from runahead to live pool. if has_updated or self.data_store_mgr.updates_pending: @@ -1716,6 +1720,7 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: for itask in updated_tasks: itask.state.is_updated = False self.update_data_store() + self.pool.tasks_removed = False return has_updated def check_workflow_timers(self): diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index ae193c044a4..d03db8a9bef 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -123,6 +123,7 @@ def __init__( self._hidden_pool_list: List[TaskProxy] = [] self.main_pool_changed = False self.hidden_pool_changed = False + self.tasks_removed = False self.hold_point: Optional['PointBase'] = None self.abs_outputs_done: Set[Tuple[str, str, str]] = set() @@ -736,6 +737,7 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: def remove(self, itask, reason=""): """Remove a task from the pool (e.g. after a reload).""" + self.tasks_removed = True msg = "task proxy removed" if reason: msg += f" ({reason})" diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 4cf4ab78208..f25d2410ef8 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -989,3 +989,67 @@ async def test_runahead_limit_for_sequence_before_start_cycle( schd = scheduler(id_, startcp='2005') async with start(schd): assert str(schd.pool.runahead_limit_point) == '20070101T0000Z' + + +def list_pool_from_db(schd): + """Returns the task pool table as a sorted list.""" + db_task_pool = [] + schd.workflow_db_mgr.pri_dao.select_task_pool( + lambda _, row: db_task_pool.append(row) + ) + return sorted(db_task_pool) + + +async def test_db_update_on_removal( + flow, + scheduler, + start, +): + """It should updated the task_pool table when tasks complete. + + There was a bug where the task_pool table was only being updated when tasks + in the pool were updated. This meant that if a task was removed the DB + would not reflect this change and would hold a record of the task in the + wrong state. + + This test ensures that the DB is updated when a task is removed from the + pool. + + See: https://github.com/cylc/cylc-flow/issues/5598 + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'true', + }, + 'scheduling': { + 'graph': { + 'R1': 'a', + }, + }, + }) + schd = scheduler(id_) + async with start(schd): + task_a = schd.pool.get_tasks()[0] + + # set the task to running + task_a.state_reset('running') + + # update the db + await schd.update_data_structure() + schd.workflow_db_mgr.process_queued_ops() + + # the task should appear in the DB + assert list_pool_from_db(schd) == [ + ['1', 'a', 'running', 0], + ] + + # mark the task as succeeded and allow it to be removed from the pool + task_a.state_reset('succeeded') + schd.pool.remove_if_complete(task_a) + + # update the DB, note no new tasks have been added to the pool + await schd.update_data_structure() + schd.workflow_db_mgr.process_queued_ops() + + # the task should be gone from the DB + assert list_pool_from_db(schd) == []