Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: ensure the DB is updated when tasks are removed #5619

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ Various enhancements to `cylc lint`:

### Fixes

[#5619](https://github.com/cylc/cylc-flow/pull/5619) -
Fix an issue where the `task_pool` table in the database wasn't being updated
in a timely fashion when tasks completed.

[#5606](https://github.com/cylc/cylc-flow/pull/5606) -
Task outputs and messages are now validated to avoid conflicts with built-in
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,11 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
"""Update DB, UIS, Summary data elements"""
updated_tasks = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = self.is_updated or updated_tasks
has_updated = (
self.is_updated
or updated_tasks
or self.pool.tasks_removed
)
reloaded = self.is_reloaded
# Add tasks that have moved moved from runahead to live pool.
if has_updated or self.data_store_mgr.updates_pending:
Expand All @@ -1716,6 +1720,7 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
for itask in updated_tasks:
itask.state.is_updated = False
self.update_data_store()
self.pool.tasks_removed = False
return has_updated

def check_workflow_timers(self):
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def __init__(
self._hidden_pool_list: List[TaskProxy] = []
self.main_pool_changed = False
self.hidden_pool_changed = False
self.tasks_removed = False

self.hold_point: Optional['PointBase'] = None
self.abs_outputs_done: Set[Tuple[str, str, str]] = set()
Expand Down Expand Up @@ -736,6 +737,7 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:

def remove(self, itask, reason=""):
"""Remove a task from the pool (e.g. after a reload)."""
self.tasks_removed = True
msg = "task proxy removed"
if reason:
msg += f" ({reason})"
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,3 +989,67 @@ async def test_runahead_limit_for_sequence_before_start_cycle(
schd = scheduler(id_, startcp='2005')
async with start(schd):
assert str(schd.pool.runahead_limit_point) == '20070101T0000Z'


def list_pool_from_db(schd):
"""Returns the task pool table as a sorted list."""
db_task_pool = []
schd.workflow_db_mgr.pri_dao.select_task_pool(
lambda _, row: db_task_pool.append(row)
)
return sorted(db_task_pool)


async def test_db_update_on_removal(
flow,
scheduler,
start,
):
"""It should updated the task_pool table when tasks complete.

There was a bug where the task_pool table was only being updated when tasks
in the pool were updated. This meant that if a task was removed the DB
would not reflect this change and would hold a record of the task in the
wrong state.

This test ensures that the DB is updated when a task is removed from the
pool.

See: https://github.com/cylc/cylc-flow/issues/5598
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'true',
},
'scheduling': {
'graph': {
'R1': 'a',
},
},
})
schd = scheduler(id_)
async with start(schd):
task_a = schd.pool.get_tasks()[0]

# set the task to running
task_a.state_reset('running')

# update the db
await schd.update_data_structure()
schd.workflow_db_mgr.process_queued_ops()

# the task should appear in the DB
assert list_pool_from_db(schd) == [
['1', 'a', 'running', 0],
]

# mark the task as succeeded and allow it to be removed from the pool
task_a.state_reset('succeeded')
schd.pool.remove_if_complete(task_a)

# update the DB, note no new tasks have been added to the pool
await schd.update_data_structure()
schd.workflow_db_mgr.process_queued_ops()

# the task should be gone from the DB
assert list_pool_from_db(schd) == []