Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data store avoid graph re-walk #5660

Merged
merged 14 commits into from
Nov 2, 2023
3 changes: 3 additions & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ message PbWorkflow {
optional bool pruned = 37;
optional int32 is_runahead_total = 38;
optional bool states_updated = 39;
optional int32 n_edge_distance = 40;
}

// Selected runtime fields
Expand Down Expand Up @@ -227,6 +228,7 @@ message PbTaskProxy {
optional bool is_runahead = 26;
optional bool flow_wait = 27;
optional PbRuntime runtime = 28;
optional int32 graph_depth = 29;
}

message PbFamily {
Expand Down Expand Up @@ -264,6 +266,7 @@ message PbFamilyProxy {
optional bool is_runahead = 19;
optional int32 is_runahead_total = 20;
optional PbRuntime runtime = 21;
optional int32 graph_depth = 22;
}

message PbEdge {
Expand Down
137 changes: 68 additions & 69 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

669 changes: 486 additions & 183 deletions cylc/flow/data_store_mgr.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ def node_filter(node, node_type, args, state):
args.get('maxdepth', -1) < 0
or node.depth <= args['maxdepth']
)
and (
args.get('graph_depth', -1) < 0
or node.graph_depth <= args['graph_depth']
)
# Now filter node against id arg lists
and (
not args.get('ids')
Expand Down
21 changes: 19 additions & 2 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class SortArgs(InputObjectType):
'is_runahead': Boolean(),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
'graph_depth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}

Expand All @@ -218,6 +219,7 @@ class SortArgs(InputObjectType):
'is_runahead': Boolean(),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
'graph_depth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}

Expand All @@ -226,8 +228,6 @@ class SortArgs(InputObjectType):
'exids': graphene.List(ID, default_value=[]),
'states': graphene.List(String, default_value=[]),
'exstates': graphene.List(String, default_value=[]),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}

Expand Down Expand Up @@ -785,6 +785,12 @@ class Meta:
description='Any active workflow broadcasts.'
)
pruned = Boolean() # TODO: what is this? write description
n_edge_distance = Int(
description=sstrip('''
The maximum graph distance (n) from an active node
of the data-store graph window.
'''),
)


class RuntimeSetting(ObjectType):
Expand Down Expand Up @@ -1072,6 +1078,11 @@ class Meta:
depth = Int(
description='The family inheritance depth',
)
graph_depth = Int(
description=sstrip('''
The n-window graph edge depth from closet active task(s).
'''),
)
job_submits = Int(
description='The number of job submissions for this task instance.',
)
Expand Down Expand Up @@ -1222,6 +1233,12 @@ class Meta:
is_runahead = Boolean()
is_runahead_total = Int()
depth = Int()
graph_depth = Int(
description=sstrip('''
The n-window graph edge smallest child task/family depth
from closet active task(s).
'''),
)
child_tasks = graphene.List(
TaskProxy,
description="""Descendant task proxies.""",
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,13 +842,14 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':

return point_itasks

def get_task(self, point, name):
def get_task(self, point, name) -> Optional[TaskProxy]:
"""Retrieve a task from the pool."""
rel_id = f'{point}/{name}'
for pool in (self.main_pool, self.hidden_pool):
tasks = pool.get(point)
if tasks and rel_id in tasks:
return tasks[rel_id]
return None

def _get_hidden_task_by_id(self, id_: str) -> Optional[TaskProxy]:
"""Return runahead pool task by ID if it exists, or None."""
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,10 @@ def __init__(
self.state = TaskState(tdef, self.point, status, is_held)

# Determine graph children of this task (for spawning).
self.graph_children = generate_graph_children(tdef, self.point)
if data_mode:
self.graph_children = {}
else:
self.graph_children = generate_graph_children(tdef, self.point)

def __repr__(self) -> str:
return f"<{self.__class__.__name__} '{self.tokens}'>"
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/cylc-show/06-past-present-future/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
cylc stop --now --max-polls=10 --interval=1 $CYLC_WORKFLOW_ID
false
else
# Allow time for c submission => running
sleep 2
cylc show "$CYLC_WORKFLOW_ID//1/b" >> $CYLC_WORKFLOW_RUN_DIR/show-b.txt
cylc show "$CYLC_WORKFLOW_ID//1/c" >> $CYLC_WORKFLOW_RUN_DIR/show-c.txt
cylc show "$CYLC_WORKFLOW_ID//1/d" >> $CYLC_WORKFLOW_RUN_DIR/show-d.txt
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/graphql/01-workflow.t
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ query {
oldestActiveCyclePoint
reloaded
runMode
nEdgeDistance
stateTotals
workflowLogDir
timeZoneInfo {
Expand Down Expand Up @@ -96,6 +97,7 @@ cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-workflows.stdout" << __HERE__
"oldestActiveCyclePoint": "20210101T0000Z",
"reloaded": false,
"runMode": "live",
"nEdgeDistance": 1,
"stateTotals": {
"waiting": 1,
"expired": 0,
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/graphql/03-is-held-arg.t
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ query {
workflows {
name
isHeldTotal
taskProxies(isHeld: true) {
taskProxies(isHeld: true, graphDepth: 1) {
id
jobs {
submittedTime
startedTime
}
}
familyProxies(exids: [\"*/root\"], isHeld: true) {
familyProxies(exids: [\"*/root\"], isHeld: true, graphDepth: 1) {
id
}
}
Expand Down
66 changes: 66 additions & 0 deletions tests/functional/n-window/01-past-present-future.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test window size using graphql and cylc-show for all tasks.

. "$(dirname "$0")/test_header"

set_test_number 7

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

TEST_NAME="${TEST_NAME_BASE}-validate"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

TEST_NAME="${TEST_NAME_BASE}-run"
# 'a => b => c => d => e', 'a' sets window size to 2, 'c' uses cylc show on all.
workflow_run_ok "${TEST_NAME}" cylc play --no-detach --debug "${WORKFLOW_NAME}"

TEST_NAME="${TEST_NAME_BASE}-show-a.past"
contains_ok "$WORKFLOW_RUN_DIR/show-a.txt" <<__END__
state: succeeded
prerequisites: (None)
__END__

TEST_NAME="${TEST_NAME_BASE}-show-b.past"
contains_ok "$WORKFLOW_RUN_DIR/show-b.txt" <<__END__
state: succeeded
prerequisites: (n/a for past tasks)
__END__

TEST_NAME="${TEST_NAME_BASE}-show-c.present"
contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__
prerequisites: ('-': not satisfied)
+ 1/b succeeded
__END__

TEST_NAME="${TEST_NAME_BASE}-show-d.future"
contains_ok "${WORKFLOW_RUN_DIR}/show-d.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/c succeeded
__END__

TEST_NAME="${TEST_NAME_BASE}-show-e.future"
contains_ok "${WORKFLOW_RUN_DIR}/show-e.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/d succeeded
__END__

purge
41 changes: 41 additions & 0 deletions tests/functional/n-window/01-past-present-future/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[scheduler]
allow implicit tasks = True
[[events]]
inactivity timeout = PT1M
abort on inactivity timeout = True
[scheduling]
[[graph]]
R1 = """
a => b => c => d => e
"""
[runtime]
[[a]]
script = """
set +e

read -r -d '' gqlDoc <<_DOC_
{"request_string": "
mutation {
setGraphWindowExtent (
workflows: [\"${CYLC_WORKFLOW_ID}\"],
nEdgeDistance: 2) {
result
}
}",
"variables": null}
_DOC_

echo "${gqlDoc}"

cylc client "$CYLC_WORKFLOW_ID" graphql < <(echo ${gqlDoc}) 2>/dev/null

set -e
"""
[[c]]
script = """
cylc show "$CYLC_WORKFLOW_ID//1/a" >> $CYLC_WORKFLOW_RUN_DIR/show-a.txt
cylc show "$CYLC_WORKFLOW_ID//1/b" >> $CYLC_WORKFLOW_RUN_DIR/show-b.txt
cylc show "$CYLC_WORKFLOW_ID//1/c" >> $CYLC_WORKFLOW_RUN_DIR/show-c.txt
cylc show "$CYLC_WORKFLOW_ID//1/d" >> $CYLC_WORKFLOW_RUN_DIR/show-d.txt
cylc show "$CYLC_WORKFLOW_ID//1/e" >> $CYLC_WORKFLOW_RUN_DIR/show-e.txt
"""
56 changes: 56 additions & 0 deletions tests/functional/n-window/02-big-window.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test large window size using graphql and find tasks in window.
# This is helpful with coverage by using most the no-rewalk mechanics.
Comment on lines +19 to +20
Copy link
Member

@oliver-sanders oliver-sanders Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These show tests are integration testable which removes the potential for race conditions and gives us more flexibility to push tasks into whatever state we choose.

# untested
from cylc.flow.scripts import show
from cylc.flow.option_parsers import Options
from cylc.flow.cyclers.integer import IntegerPoint
from cylc.flow.task_status import TASK_STATUS_SUCCEEDED

ShowOptions = Options(get_option_parser())

def show_task(schd, task):
        return await show(
            schd.tokens.id,
            schd.tokens.duplicate(cycle='1', task=task),
            ShowOptions(json=True)
        )

async def test_whatevs(flow, scheduler, start)
    id_ = flow({
        'scheduler': {
            'allow implicit tasks': 'true',
        },
        'scheduling': {
             'initial cycle point': '1',
             'cycling mode': 'integer',
             'graph': {
                'R1': ...
            }
         }
    })
    schd = scheduler(id_)
    async with start(schd):
        itask = schd.pool.get_task(IntegerPoint(1), 'task-name')
        await schd.update_data_structure()

        assert await show_task(schd, 'task-name') == {...}

        itask.reset_state(TASK_STATUS_SUCCEEDED)
        schd.data_store_mgr.delta_task_status(itask)
        assert await show_task(schd, 'task-name') == {...}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this functional test still needed now that integration tests have been added? Or do they cover different things?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At min tests API and cylc-show on large windows, also larger window size than integration tests (I think)


. "$(dirname "$0")/test_header"

set_test_number 5

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

TEST_NAME="${TEST_NAME_BASE}-validate"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

TEST_NAME="${TEST_NAME_BASE}-run"
# 'a => b => . . . f => g => h', 'a' sets window size to 5,
# 'b => i => j => f', 'c' finds 'a', 'j', 'h'
workflow_run_ok "${TEST_NAME}" cylc play --no-detach --debug "${WORKFLOW_NAME}"

TEST_NAME="${TEST_NAME_BASE}-show-a.past"
contains_ok "$WORKFLOW_RUN_DIR/show-a.txt" <<__END__
state: succeeded
prerequisites: (None)
__END__

TEST_NAME="${TEST_NAME_BASE}-show-j.parallel"
contains_ok "${WORKFLOW_RUN_DIR}/show-j.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/i succeeded
__END__

TEST_NAME="${TEST_NAME_BASE}-show-h.future"
contains_ok "${WORKFLOW_RUN_DIR}/show-h.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/g succeeded
__END__

purge
52 changes: 52 additions & 0 deletions tests/functional/n-window/02-big-window/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[scheduler]
allow implicit tasks = True
[[events]]
inactivity timeout = PT1M
abort on inactivity timeout = True
[scheduling]
[[graph]]
R1 = """
a => b => c => d => e => f => g => h
b => i => j => f
"""
[runtime]
[[a]]
script = """
set +e

read -r -d '' gqlDoc <<_DOC_
{"request_string": "
mutation {
setGraphWindowExtent (
workflows: [\"${CYLC_WORKFLOW_ID}\"],
nEdgeDistance: 5) {
result
}
}",
"variables": null}
_DOC_

echo "${gqlDoc}"

cylc client "$CYLC_WORKFLOW_ID" graphql < <(echo ${gqlDoc}) 2>/dev/null

set -e
"""
[[c]]
script = """
cylc show "$CYLC_WORKFLOW_ID//1/a" >> $CYLC_WORKFLOW_RUN_DIR/show-a.txt
cylc show "$CYLC_WORKFLOW_ID//1/j" >> $CYLC_WORKFLOW_RUN_DIR/show-j.txt
cylc show "$CYLC_WORKFLOW_ID//1/h" >> $CYLC_WORKFLOW_RUN_DIR/show-h.txt
"""

[[i]]
script = """
# Slow 2nd branch down
sleep 5
"""

[[f]]
script = """
# test re-trigger of old point
cylc trigger "$CYLC_WORKFLOW_ID//1/b"
"""
1 change: 1 addition & 0 deletions tests/functional/n-window/test_header
Loading