Skip to content

Commit

Permalink
Merge pull request #5286 from MetRonnie/clock-trigger
Browse files Browse the repository at this point in the history
Fix clock trigger execution retry delay bug
  • Loading branch information
MetRonnie authored Jan 11, 2023
2 parents c1dd151 + ae7a87a commit 3469a76
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 33 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ to task_jobs table in the workflow database.

### Fixes

[#5286](https://github.com/cylc/cylc-flow/pull/5286) - Fix bug where
`[scheduling][special tasks]clock-trigger` would skip execution retry delays.

[#5292](https://github.com/cylc/cylc-flow/pull/5292) -
Fix an issue where polling could be repeated if the job's platform
was not available.
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,13 @@ def sub(self, other):
def is_null(self):
return (self == self.get_null())

def __str__(self):
def __str__(self) -> str:
# Stringify.
return self.value

def __repr__(self) -> str:
return f"<{type(self).__name__} {self}>"

def __add__(self, other):
# Add other (point or interval) to self.
if self.TYPE != other.TYPE:
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1591,11 +1591,11 @@ async def main_loop(self) -> None:
):
self.pool.queue_task(itask)

# Old-style clock-trigger tasks:
if (
itask.tdef.clocktrigger_offset is not None
and itask.is_waiting_clock_done()
and all(itask.is_ready_to_run())
):
# Old-style clock-trigger tasks.
self.pool.queue_task(itask)

if housekeep_xtriggers:
Expand Down
48 changes: 19 additions & 29 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)
from shutil import rmtree
from time import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
Expand Down Expand Up @@ -959,38 +959,29 @@ def _run_job_cmd(
)

@staticmethod
def _set_retry_timers(itask, rtconfig=None, retry=True):
def _set_retry_timers(
itask: 'TaskProxy',
rtconfig: Optional[dict] = None
) -> None:
"""Set try number and retry delays."""
if rtconfig is None:
rtconfig = itask.tdef.rtconfig
if (
itask.tdef.run_mode + ' mode' in rtconfig and
'disable retries' in rtconfig[itask.tdef.run_mode + ' mode']
):
retry = False

if retry:
if rtconfig['submission retry delays']:
submit_delays = rtconfig['submission retry delays']
else:
submit_delays = itask.platform['submission retry delays']
submit_delays = (
rtconfig['submission retry delays']
or itask.platform['submission retry delays']
)

for key, delays in [
(
TimerFlags.SUBMISSION_RETRY,
submit_delays
),
(
TimerFlags.EXECUTION_RETRY,
rtconfig['execution retry delays']
)
]:
if delays is None:
delays = []
try:
itask.try_timers[key].set_delays(delays)
except KeyError:
itask.try_timers[key] = TaskActionTimer(delays=delays)
for key, delays in [
(TimerFlags.SUBMISSION_RETRY, submit_delays),
(TimerFlags.EXECUTION_RETRY, rtconfig['execution retry delays'])
]:
if delays is None:
delays = []
try:
itask.try_timers[key].set_delays(delays)
except KeyError:
itask.try_timers[key] = TaskActionTimer(delays=delays)

def _simulation_submit_task_jobs(self, itasks, workflow):
"""Simulation mode task jobs submission."""
Expand Down Expand Up @@ -1180,7 +1171,6 @@ def _prep_submit_task_job(
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
self._set_retry_timers(itask, rtconfig, False)
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def is_waiting_prereqs_done(self):
"""Are ALL prerequisites satisfied?"""
return (
all(pre.is_satisfied() for pre in self.state.prerequisites)
and all(tri for tri in self.state.external_triggers.values())
and self.state.external_triggers_all_satisfied()
and self.state.xtriggers_all_satisfied()
)

Expand Down
49 changes: 49 additions & 0 deletions tests/functional/special/08-clock-trigger-retry.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Old-style clock trigger should only go ahead if xtriggers are satisfied
# https://github.com/cylc/cylc-flow/issues/5217

. "$(dirname "$0")/test_header"
set_test_number 4

init_workflow "${TEST_NAME_BASE}" << __FLOW__
[scheduling]
initial cycle point = 2015
final cycle point = PT1S
[[special tasks]]
clock-trigger = foo(PT1H)
[[graph]]
T00 = foo
[runtime]
[[foo]]
execution retry delays = PT5S # hopefully enough time to check task doesn't resubmit immediately
script = ((CYLC_TASK_TRY_NUMBER > 1))
__FLOW__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "$WORKFLOW_NAME"

workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "$WORKFLOW_NAME"

log_scan "${TEST_NAME_BASE}-log-scan" \
"${WORKFLOW_RUN_DIR}/log/scheduler/log" 2 1 \
"\[20150101.*/foo .* job:01 .* retrying in PT5S" \
"xtrigger satisfied: _cylc_retry_20150101"
# (if task resubmits immediately instead of waiting PT5S, xtrigger msg will not appear)

purge

0 comments on commit 3469a76

Please sign in to comment.