Skip to content

Commit

Permalink
Fix reload after prereq change in the graph.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Feb 10, 2023
1 parent c3640f2 commit 637cee2
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 93 deletions.
4 changes: 2 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ ones in. -->

### Fixes

[#5334](https://github.com/cylc/cylc-flow/pull/5334) - Fix to prevent scheduler
crash if already-spawned tasks have new prerequisites added before a restart.
[#5334](https://github.com/cylc/cylc-flow/pull/5334) - Apply graph prerequisite
changes to already-spawned tasks after reload or restart.


-------------------------------------------------------------------------------
Expand Down
24 changes: 23 additions & 1 deletion cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from collections import Counter
from copy import copy
from contextlib import suppress
from fnmatch import fnmatchcase
from time import time
from typing import (
Expand Down Expand Up @@ -255,6 +256,7 @@ def __init__(
self.waiting_on_job_prep = False

self.state = TaskState(tdef, self.point, status, is_held)
#print(self.tdef.name, self.state.prerequisites)

# Determine graph children of this task (for spawning).
self.graph_children = generate_graph_children(tdef, self.point)
Expand Down Expand Up @@ -288,7 +290,27 @@ def copy_to_reload_successor(self, reload_successor):
reload_successor.state.is_held = self.state.is_held
reload_successor.state.is_runahead = self.state.is_runahead
reload_successor.state.is_updated = self.state.is_updated
reload_successor.state.prerequisites = self.state.prerequisites

# Prerequisites: the graph might have changed before reload, so
# we need to use the new prerequisites but update them with the
# pre-reload state of prerequisites that still exist post-reload.

# Get all prereq states, e.g. {('1', 'c', 'succeeded'): False, ...}
pre_reload = {
k: v
for pre in self.state.prerequisites
for (k, v) in pre.satisfied.items()
}
# Use them to update the new prerequisites.
# - unchanged prerequisites will keep their pre-reload state.
# - removed prerequisites will not be carried over
# - added prerequisites will be recorded as unsatisfied
# NOTE: even if the corresponding output was completed pre-reload!
for pre in reload_successor.state.prerequisites:
for k in pre.satisfied.keys():
with suppress(KeyError):
pre.satisfied[k] = pre_reload[k]

reload_successor.state.xtriggers.update({
# copy across any special "_cylc" xtriggers which were added
# dynamically at runtime (i.e. execution retry xtriggers)
Expand Down
247 changes: 157 additions & 90 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from cylc.flow import CYLC_LOG
from copy import deepcopy
import logging
from pathlib import Path
import pytest
from pytest import param
from typing import AsyncGenerator, Callable, Iterable, List, Tuple, Union
Expand All @@ -33,6 +34,9 @@
TASK_STATUS_SUCCEEDED,
)

from cylc.flow.pathutil import get_cylc_run_dir, get_workflow_run_dir
from .utils.flow_tools import _make_flow


# NOTE: foo and bar have no parents so at start-up (even with the workflow
# paused) they get spawned out to the runahead limit. 2/pub spawns
Expand Down Expand Up @@ -594,164 +598,227 @@ def list_tasks(schd):
)


async def tests_restart_graph_change(flow, scheduler, start):
"""It should handle prerequisite addition on restart.
If the graph has changed when a workflow is restarted those changes should
be applied to tasks already in the pool.
@pytest.mark.parametrize(
'graph_1, graph_2, '
'expected_1, expected_2, expected_3, expected_4',
[
param( # Restart after adding a prerequisite to task z
'''a => z
b => z''',
'''a => z
b => z
c => z''',
[
('1', 'a', 'running'),
('1', 'b', 'running'),
],
[
('1', 'b', 'running'),
('1', 'z', 'waiting'),
],
[
('1', 'b', 'running'),
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'b', 'succeeded'): False},
{('1', 'c', 'succeeded'): False},
]
),
param( # Restart after removing a prerequisite from task z
'''a => z
b => z
c => z''',
'''a => z
b => z''',
[
('1', 'a', 'running'),
('1', 'b', 'running'),
('1', 'c', 'running'),
],
[
('1', 'b', 'running'),
('1', 'c', 'running'),
('1', 'z', 'waiting'),
],
[
('1', 'b', 'running'),
('1', 'c', 'running'),
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'b', 'succeeded'): False},
]
)
]
)
async def test_restart_prereqs(
flow, scheduler, start,
graph_1, graph_2,
expected_1, expected_2, expected_3, expected_4
):
    """It should handle graph prerequisite changes on restart.
Prerequisite changes must be applied to tasks already in the pool.
See https://github.com/cylc/cylc-flow/pull/5334
"""

conf = {
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
'graph': {
'R1': '''
a => z
b => z
''',
'R1': graph_1
}
}
}
id_ = flow(conf)
schd = scheduler(id_, run_mode='simulation', paused_start=False)

async with start(schd):
# release tasks 1/a and 1/b
# Release tasks 1/a and 1/b
schd.pool.release_runahead_tasks()
schd.release_queued_tasks()
assert list_tasks(schd) == [
('1', 'a', 'running'),
('1', 'b', 'running'),
]
assert list_tasks(schd) == expected_1

# mark 1/a as succeeded and spawn 1/z
# Mark 1/a as succeeded and spawn 1/z
schd.pool.get_all_tasks()[0].state_reset('succeeded')
schd.pool.spawn_on_output(schd.pool.get_all_tasks()[0], 'succeeded')
assert list_tasks(schd) == [
('1', 'b', 'running'),
('1', 'z', 'waiting'),
]
assert list_tasks(schd) == expected_2

# save our progress
# Save our progress
schd.workflow_db_mgr.put_task_pool(schd.pool)

# edit the workflow to add a new dependency on "z"
conf['scheduling']['graph']['R1'] += '\n c => z'
        # Edit the workflow graph to add or remove a prerequisite of "z"
conf['scheduling']['graph']['R1'] = graph_2
id_ = flow(conf, id_=id_)

# and restart it
# Restart it
schd = scheduler(id_, run_mode='simulation', paused_start=False)
async with start(schd):
# load jobs from db
schd.pool.reload_taskdefs()
# Load jobs from db
schd.workflow_db_mgr.pri_dao.select_jobs_for_restart(
schd.data_store_mgr.insert_db_job
)
assert list_tasks(schd) == [
('1', 'b', 'running'),
('1', 'z', 'waiting'),
]
assert list_tasks(schd) == expected_3

# ensure the new dependency 1/c has been added to 1/z
# and is *not* satisfied
# Check resulting dependencies of task z
task_z = schd.pool.get_all_tasks()[0]
assert sorted(
(
p.satisfied
for p in task_z.state.prerequisites
),
key=lambda d: tuple(d.keys())[0],
) == [
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'b', 'succeeded'): False},
{('1', 'c', 'succeeded'): False},
) == expected_4

]

@pytest.mark.parametrize(
'graph_1, graph_2, '
'expected_1, expected_2, expected_3, expected_4',
[
param( # Reload after adding a prerequisite to task z
'''a => z
b => z''',
'''a => z
b => z
c => z''',
[
('1', 'a', 'running'),
('1', 'b', 'running'),
],
[
('1', 'b', 'running'),
('1', 'z', 'waiting'),
],
[
('1', 'b', 'running'),
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'b', 'succeeded'): False},
{('1', 'c', 'succeeded'): False},
]
),
param( # Reload after removing a prerequisite from task z
'''a => z
b => z
c => z''',
'''a => z
b => z''',
[
('1', 'a', 'running'),
('1', 'b', 'running'),
('1', 'c', 'running'),
],
[
('1', 'b', 'running'),
('1', 'c', 'running'),
('1', 'z', 'waiting'),
],
[
('1', 'b', 'running'),
('1', 'c', 'running'),
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'b', 'succeeded'): False},
]
)
]
)
async def test_reload_prereqs(
flow, scheduler, start,
graph_1, graph_2,
expected_1, expected_2, expected_3, expected_4
):
    """It should handle graph prerequisite changes on reload.
async def tests_restart_graph_change_2(flow, scheduler, start):
"""It should handle prerequisite removal on restart.
If the graph has changed when a workflow is restarted those changes should
be applied to tasks already in the pool.
Prerequisite changes must be applied to tasks already in the pool.
See https://github.com/cylc/cylc-flow/pull/5334
"""

conf = {
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
'graph': {
'R1': '''
a => z
b => z
c => z
''',
'R1': graph_1
}
}
}
id_ = flow(conf)
schd = scheduler(id_, run_mode='simulation', paused_start=False)

async with start(schd):
# release tasks 1/a and 1/b
# Release tasks 1/a and 1/b
schd.pool.release_runahead_tasks()
schd.release_queued_tasks()
assert list_tasks(schd) == [
('1', 'a', 'running'),
('1', 'b', 'running'),
('1', 'c', 'running'),
]
assert list_tasks(schd) == expected_1

# mark 1/a as succeeded and spawn 1/z
# Mark 1/a as succeeded and spawn 1/z
schd.pool.get_all_tasks()[0].state_reset('succeeded')
schd.pool.spawn_on_output(schd.pool.get_all_tasks()[0], 'succeeded')
assert sorted(list_tasks(schd)) == sorted([
('1', 'b', 'running'),
('1', 'c', 'running'),
('1', 'z', 'waiting'),
])

# save our progress
schd.workflow_db_mgr.put_task_pool(schd.pool)
assert list_tasks(schd) == expected_2

# edit the workflow to remove a dependency (c) from "z"
conf['scheduling']['graph']['R1'] = (
'''
a => z
b => z
'''
)

id_ = flow(conf, id_=id_)
        # Modify flow.cylc to add or remove a prerequisite of "z"
conf['scheduling']['graph']['R1'] = graph_2
run_dir = Path(get_workflow_run_dir(id_))
_make_flow(get_cylc_run_dir(), run_dir, conf, '')

# and restart it
schd = scheduler(id_, run_mode='simulation', paused_start=False)
async with start(schd):
# load jobs from db
# Reload the workflow config
schd.command_reload_workflow()
schd.pool.reload_taskdefs()
schd.workflow_db_mgr.pri_dao.select_jobs_for_restart(
schd.data_store_mgr.insert_db_job
)
assert sorted(list_tasks(schd)) == sorted([
('1', 'b', 'running'),
('1', 'c', 'running'),
('1', 'z', 'waiting'),
])
assert list_tasks(schd) == expected_3

# ensure the dependency on 1/c has been removed from 1/z
# Check resulting dependencies of task z
task_z = schd.pool.get_all_tasks()[0]
assert sorted(
(
p.satisfied
for p in task_z.state.prerequisites
),
key=lambda d: tuple(d.keys())[0],
) == [
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'b', 'succeeded'): False},
]
) == expected_4

0 comments on commit 637cee2

Please sign in to comment.