Skip to content

Commit

Permalink
Merge pull request #4511 from hjoliver/clock-trigger-fix
Browse files Browse the repository at this point in the history
Fix clock-trigger offset computation.
  • Loading branch information
hjoliver authored Jan 10, 2022
2 parents 17496dc + 64c448e commit 818c92d
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 69 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ are now sparse, i.e. they will no longer be fleshed-out with defaults.

### Fixes

[#4511](https://github.com/cylc/cylc-flow/pull/4511) - Fix clock xtriggers for
large inexact offsets (year, months); restore time check for old-style
(task-property) clock triggers.

[#4553](https://github.com/cylc/cylc-flow/pull/4553) - Add job submit time
to the datastore.

Expand Down
23 changes: 15 additions & 8 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
structures.
"""

import contextlib
from contextlib import suppress
from copy import copy
from fnmatch import fnmatchcase
import os
Expand Down Expand Up @@ -727,7 +727,7 @@ def process_final_cycle_point(self) -> None:
self.initial_point
).standardise()
else:
with contextlib.suppress(IsodatetimeError):
with suppress(IsodatetimeError):
# Relative, ISO8601 cycling.
self.final_point = get_point_relative(
fcp_str, self.initial_point).standardise()
Expand Down Expand Up @@ -1642,12 +1642,19 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})

if (xtrig.func_name == 'wall_clock' and
self.cfg['scheduling']['cycling mode'] == (
INTEGER_CYCLING_TYPE)):
sig = xtrig.get_signature()
raise WorkflowConfigError(
f"clock xtriggers need date-time cycling: {label} = {sig}")
if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)
else:
# Convert offset arg to kwarg for certainty later.
if "offset" not in xtrig.func_kwargs:
xtrig.func_kwargs["offset"] = None
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]

if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
Expand Down
7 changes: 7 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,13 @@ async def main_loop(self):
):
self.pool.queue_task(itask)

if (
itask.tdef.clocktrigger_offset is not None
and itask.is_waiting_clock_done()
):
# Old-style clock-trigger tasks.
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
itask.identity
))
kwargs = {
'absolute_as_seconds': wallclock_time
'trigger_time': wallclock_time
}

# if this isn't the first retry the xtrigger will already exist
Expand Down
43 changes: 33 additions & 10 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

from metomi.isodatetime.timezone import get_local_time_zone

import cylc.flow.cycling.iso8601
from cylc.flow import LOG
from cylc.flow.cycling.loader import standardise_point_string
from cylc.flow.exceptions import PointParsingError
Expand All @@ -34,6 +33,11 @@
from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING
from cylc.flow.taskdef import generate_graph_children
from cylc.flow.wallclock import get_unix_time_from_time_string as str2time
from cylc.flow.cycling.iso8601 import (
point_parse,
interval_parse,
ISO8601Interval
)

if TYPE_CHECKING:
from cylc.flow.cycling import PointBase
Expand All @@ -47,6 +51,7 @@ class TaskProxy:
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for both old-style clock triggers and wall_clock xtrigger).
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -267,7 +272,7 @@ def copy_to_reload_successor(self, reload_successor):
@staticmethod
def get_offset_as_seconds(offset):
"""Return an ISO interval as seconds."""
iso_offset = cylc.flow.cycling.iso8601.interval_parse(str(offset))
iso_offset = interval_parse(str(offset))
return int(iso_offset.get_seconds())

def get_late_time(self):
Expand All @@ -285,8 +290,7 @@ def get_late_time(self):
def get_point_as_seconds(self):
"""Compute and store my cycle point as seconds since epoch."""
if self.point_as_seconds is None:
iso_timepoint = cylc.flow.cycling.iso8601.point_parse(
str(self.point))
iso_timepoint = point_parse(str(self.point))
self.point_as_seconds = int(iso_timepoint.get(
'seconds_since_unix_epoch'))
if iso_timepoint.time_zone.unknown:
Expand All @@ -297,6 +301,26 @@ def get_point_as_seconds(self):
self.point_as_seconds += utc_offset_in_seconds
return self.point_as_seconds

def get_clock_trigger_time(self, offset_str):
"""Compute, cache, and return trigger time relative to cycle point.
Args:
offset_str: ISO8601Interval string, e.g. "PT2M".
Can be None for zero offset.
Returns:
Absolute trigger time in seconds since Unix epoch.
"""
if self.clock_trigger_time is None:
if offset_str is None:
trigger_time = self.point
else:
trigger_time = self.point + ISO8601Interval(offset_str)
self.clock_trigger_time = int(
point_parse(str(trigger_time)).get('seconds_since_unix_epoch')
)
return self.clock_trigger_time

def get_try_num(self):
"""Return the number of automatic tries (try number)."""
try:
Expand Down Expand Up @@ -351,17 +375,16 @@ def set_summary_time(self, event_key, time_str=None):
self.summary[event_key + '_time_string'] = time_str

def is_waiting_clock_done(self):
"""Is this task done waiting for its clock trigger time?
"""Is this task done waiting for its old-style clock trigger time?
Return True if there is no clock trigger or when clock trigger is done.
"""
if self.tdef.clocktrigger_offset is None:
return True
if self.clock_trigger_time is None:
self.clock_trigger_time = (
self.get_point_as_seconds() +
self.get_offset_as_seconds(self.tdef.clocktrigger_offset))
return time() >= self.clock_trigger_time
return (
time() >
self.get_clock_trigger_time(str(self.tdef.clocktrigger_offset))
)

def is_task_prereqs_not_done(self):
"""Are some task prerequisites not satisfied?"""
Expand Down
43 changes: 25 additions & 18 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class XtriggerManager:
previous one has returned via the xtrigger callback. The interval (in
"name(args):INTVL") determines frequency of calls (default PT10S).
Once a trigger is satisfied, remember it until the cleanup cutoff point.
Delete satisfied xtriggers no longer needed by any current tasks.
Clock triggers are treated separately and called synchronously in the main
process, because they are guaranteed to be quick (but they are still
Expand Down Expand Up @@ -247,18 +247,32 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
}
farg_templ.update(self.farg_templ)
ctx = deepcopy(self.functx_map[label])
kwargs = {}

args = []
for val in ctx.func_args:
with suppress(TypeError):
val = val % farg_templ
args.append(val)
for key, val in ctx.func_kwargs.items():
with suppress(TypeError):
val = val % farg_templ
kwargs[key] = val
kwargs = {}
if ctx.func_name == 'wall_clock':
# Handle clock xtriggers.
if "trigger_time" not in ctx.func_kwargs:
# External (clock xtrigger) convert offset to trigger_time.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
ctx.func_kwargs["offset"]
)
else:
# Internal (retry timer) we set trigger_time directly.
kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"]
else:
# Other xtrig functions: substitute template values.
for val in ctx.func_args:
with suppress(TypeError):
val = val % farg_templ
args.append(val)
for key, val in ctx.func_kwargs.items():
with suppress(TypeError):
val = val % farg_templ
kwargs[key] = val
ctx.func_args = args
ctx.func_kwargs = kwargs

ctx.update_command(self.workflow_run_dir)
return ctx

Expand All @@ -267,19 +281,12 @@ def call_xtriggers_async(self, itask: TaskProxy):
...if previous call not still in-process and retry period is up.
Args:
itask: task proxy to check.
"""
for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True):
if sig.startswith("wall_clock"):
# Special case: quick synchronous clock check.
if 'absolute_as_seconds' not in ctx.func_kwargs:
ctx.func_kwargs.update(
{
'point_as_seconds': itask.get_point_as_seconds()
}
)
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
itask.state.xtriggers[label] = True
self.sat_xtrig[sig] = {}
Expand Down Expand Up @@ -359,7 +366,7 @@ def check_xtriggers(
Return True if satisfied, else False
Args:
itasks: task proxs to check
itasks: task proxies to check
db_update_func: method to update xtriggers in the DB
"""
if itask.state.xtriggers_all_satisfied():
Expand Down
31 changes: 5 additions & 26 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""xtrigger function to check cycle point offset against the wall clock.
"""
"""xtrigger function to trigger off of a wall clock time."""

from time import time
from cylc.flow.cycling.iso8601 import interval_parse


def wall_clock(offset=None, absolute_as_seconds=None, point_as_seconds=None):
"""Return True if now > (point + offset) else False.

Either provide an offset from the current cycle point *or* a wall-clock
time.
def wall_clock(trigger_time=None):
"""Return True after the desired wall clock time, False.
Args:
offset (str):
Satisfy this xtrigger after an offset from the current cycle point.
Should be a duration in ISO8601 format.
absolute_as_seconds (int):
Satisfy this xtrigger after the specified time.
Should be a datetime in the unix time format.
point_as_seconds (int):
Provided by Cylc. The cycle point in unix time format.
trigger_time (int):
Trigger time as seconds since Unix epoch.
"""
offset_as_seconds = 0
if offset is not None:
offset_as_seconds = int(interval_parse(offset).get_seconds())
if absolute_as_seconds:
trigger_time = absolute_as_seconds
else:
trigger_time = point_as_seconds + offset_as_seconds

return time() > trigger_time
7 changes: 4 additions & 3 deletions tests/flakyfunctional/xtriggers/00-wall_clock.t
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ run_workflow() {
set_test_number 5
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

NOW="$(date '+%Y%m%dT%H')"
NOW="$(date -u '+%Y%m%dT%H')"

# Validate and run with "now" clock trigger (satisfied).
# Initial cycle point is the hour just passed.
START="$NOW"
HOUR="$(date +%H)"
HOUR="$(date -u +%H)"
OFFSET="PT0S"

# Validate and run with "now" clock trigger (satisfied).
run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}" \
-s "START='${START}'" -s "HOUR='${HOUR}'" -s "OFFSET='${OFFSET}'"

Expand Down
4 changes: 2 additions & 2 deletions tests/flakyfunctional/xtriggers/00-wall_clock/flow.cylc
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#!Jinja2

# Test wall_clock xtrigger: workflow will run to completion and exit if the
# clock trigger is not satisfied, else stall and abort.
# clock trigger is not satisfied, else abort on inactivity.

[scheduler]
cycle point time zone = '+0530'
# Default to time zone Z
[[events]]
abort on inactivity timeout = True
inactivity timeout = PT15S
Expand Down
38 changes: 38 additions & 0 deletions tests/functional/clock-trigger-inexact/00-big-offset.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test clock triggers (xtrigger and old-style) with a large inexact offset.

. "$(dirname "$0")/test_header"
set_test_number 5
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}"

cylc cat-log "${WORKFLOW_NAME}" > log
START_HOUR=$(grep 'Workflow:' log | cut -c 1-13)
START_MINU=$(grep 'Workflow:' log | cut -c 15-16)
TRIGG_MINU=$(( 10#${START_MINU} + 1))
[[ $START_MINU == 0* ]] && TRIGG_MINU=0${TRIGG_MINU}

for NAME in foo bar baz; do
grep_ok "${START_HOUR}:${TRIGG_MINU}.* INFO - \[${NAME}\..*\] => waiting$" log
done

purge
Loading

0 comments on commit 818c92d

Please sign in to comment.