diff --git a/CHANGES.md b/CHANGES.md index 779741195c9..4bc158c5e24 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -83,6 +83,10 @@ are now sparse, i.e. they will no longer be fleshed-out with defaults. ### Fixes +[#4511](https://github.com/cylc/cylc-flow/pull/4511) - Fix clock xtriggers for +large inexact offsets (year, months); restore time check for old-style +(task-property) clock triggers. + [#4553](https://github.com/cylc/cylc-flow/pull/4553) - Add job submit time to the datastore. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 703c60e5408..957ad4b5696 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -26,7 +26,7 @@ structures. """ -import contextlib +from contextlib import suppress from copy import copy from fnmatch import fnmatchcase import os @@ -727,7 +727,7 @@ def process_final_cycle_point(self) -> None: self.initial_point ).standardise() else: - with contextlib.suppress(IsodatetimeError): + with suppress(IsodatetimeError): # Relative, ISO8601 cycling. self.final_point = get_point_relative( fcp_str, self.initial_point).standardise() @@ -1646,12 +1646,19 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, xtrig = SubFuncContext( 'wall_clock', 'wall_clock', [], {}) - if (xtrig.func_name == 'wall_clock' and - self.cfg['scheduling']['cycling mode'] == ( - INTEGER_CYCLING_TYPE)): - sig = xtrig.get_signature() - raise WorkflowConfigError( - f"clock xtriggers need date-time cycling: {label} = {sig}") + if xtrig.func_name == 'wall_clock': + if self.cycling_type == INTEGER_CYCLING_TYPE: + raise WorkflowConfigError( + "Clock xtriggers require datetime cycling:" + f" {label} = {xtrig.get_signature()}" + ) + else: + # Convert offset arg to kwarg for certainty later. + if "offset" not in xtrig.func_kwargs: + xtrig.func_kwargs["offset"] = None + with suppress(IndexError): + xtrig.func_kwargs["offset"] = xtrig.func_args[0] + if self.xtrigger_mgr is None: XtriggerManager.validate_xtrigger(label, xtrig, self.fdir) else: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 1731ef16514..d2c7a843aad 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1501,6 +1501,13 @@ async def main_loop(self): ): self.pool.queue_task(itask) + if ( + itask.tdef.clocktrigger_offset is not None + and itask.is_waiting_clock_done() + ): + # Old-style clock-trigger tasks. + self.pool.queue_task(itask) + if housekeep_xtriggers: # (Could do this periodically?) self.xtrigger_mgr.housekeep(self.pool.get_tasks()) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index f3551c3ac19..bb9c854e7c2 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -867,7 +867,7 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False): itask.identity )) kwargs = { - 'absolute_as_seconds': wallclock_time + 'trigger_time': wallclock_time } # if this isn't the first retry the xtrigger will already exist diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 893af5ffdc7..18e76f9d6b7 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -24,7 +24,6 @@ from metomi.isodatetime.timezone import get_local_time_zone -import cylc.flow.cycling.iso8601 from cylc.flow import LOG from cylc.flow.cycling.loader import standardise_point_string from cylc.flow.exceptions import PointParsingError @@ -34,6 +33,11 @@ from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING from cylc.flow.taskdef import generate_graph_children from cylc.flow.wallclock import get_unix_time_from_time_string as str2time +from cylc.flow.cycling.iso8601 import ( + point_parse, + interval_parse, + ISO8601Interval +) if TYPE_CHECKING: from cylc.flow.cycling import PointBase @@ -47,6 +51,7 @@ class TaskProxy: Attributes: .clock_trigger_time: Clock trigger time in seconds since epoch. + (Used for both old-style clock triggers and wall_clock xtrigger). .expire_time: Time in seconds since epoch when this task is considered expired. .identity: @@ -267,7 +272,7 @@ def copy_to_reload_successor(self, reload_successor): @staticmethod def get_offset_as_seconds(offset): """Return an ISO interval as seconds.""" - iso_offset = cylc.flow.cycling.iso8601.interval_parse(str(offset)) + iso_offset = interval_parse(str(offset)) return int(iso_offset.get_seconds()) def get_late_time(self): @@ -285,8 +290,7 @@ def get_late_time(self): def get_point_as_seconds(self): """Compute and store my cycle point as seconds since epoch.""" if self.point_as_seconds is None: - iso_timepoint = cylc.flow.cycling.iso8601.point_parse( - str(self.point)) + iso_timepoint = point_parse(str(self.point)) self.point_as_seconds = int(iso_timepoint.get( 'seconds_since_unix_epoch')) if iso_timepoint.time_zone.unknown: @@ -297,6 +301,26 @@ def get_point_as_seconds(self): self.point_as_seconds += utc_offset_in_seconds return self.point_as_seconds + def get_clock_trigger_time(self, offset_str): + """Compute, cache, and return trigger time relative to cycle point. + + Args: + offset_str: ISO8601Interval string, e.g. "PT2M". + Can be None for zero offset. + Returns: + Absolute trigger time in seconds since Unix epoch. + + """ + if self.clock_trigger_time is None: + if offset_str is None: + trigger_time = self.point + else: + trigger_time = self.point + ISO8601Interval(offset_str) + self.clock_trigger_time = int( + point_parse(str(trigger_time)).get('seconds_since_unix_epoch') + ) + return self.clock_trigger_time + def get_try_num(self): """Return the number of automatic tries (try number).""" try: @@ -351,17 +375,16 @@ def set_summary_time(self, event_key, time_str=None): self.summary[event_key + '_time_string'] = time_str def is_waiting_clock_done(self): - """Is this task done waiting for its clock trigger time? + """Is this task done waiting for its old-style clock trigger time? Return True if there is no clock trigger or when clock trigger is done. """ if self.tdef.clocktrigger_offset is None: return True - if self.clock_trigger_time is None: - self.clock_trigger_time = ( - self.get_point_as_seconds() + - self.get_offset_as_seconds(self.tdef.clocktrigger_offset)) - return time() >= self.clock_trigger_time + return ( + time() > + self.get_clock_trigger_time(str(self.tdef.clocktrigger_offset)) + ) def is_task_prereqs_not_done(self): """Are some task prerequisites not satisfied?""" diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 0dfe1d1869a..45d292623ac 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -82,7 +82,7 @@ class XtriggerManager: previous one has returned via the xtrigger callback. The interval (in "name(args):INTVL") determines frequency of calls (default PT10S). - Once a trigger is satisfied, remember it until the cleanup cutoff point. + Delete satisfied xtriggers no longer needed by any current tasks. Clock triggers are treated separately and called synchronously in the main process, because they are guaranteed to be quick (but they are still @@ -247,18 +247,32 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: } farg_templ.update(self.farg_templ) ctx = deepcopy(self.functx_map[label]) - kwargs = {} + args = [] - for val in ctx.func_args: - with suppress(TypeError): - val = val % farg_templ - args.append(val) - for key, val in ctx.func_kwargs.items(): - with suppress(TypeError): - val = val % farg_templ - kwargs[key] = val + kwargs = {} + if ctx.func_name == 'wall_clock': + # Handle clock xtriggers. + if "trigger_time" not in ctx.func_kwargs: + # External (clock xtrigger) convert offset to trigger_time. + kwargs["trigger_time"] = itask.get_clock_trigger_time( + ctx.func_kwargs["offset"] + ) + else: + # Internal (retry timer) we set trigger_time directly. + kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"] + else: + # Other xtrig functions: substitute template values. + for val in ctx.func_args: + with suppress(TypeError): + val = val % farg_templ + args.append(val) + for key, val in ctx.func_kwargs.items(): + with suppress(TypeError): + val = val % farg_templ + kwargs[key] = val ctx.func_args = args ctx.func_kwargs = kwargs + ctx.update_command(self.workflow_run_dir) return ctx @@ -267,19 +281,12 @@ def call_xtriggers_async(self, itask: TaskProxy): ...if previous call not still in-process and retry period is up. - Args: itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): if sig.startswith("wall_clock"): # Special case: quick synchronous clock check. - if 'absolute_as_seconds' not in ctx.func_kwargs: - ctx.func_kwargs.update( - { - 'point_as_seconds': itask.get_point_as_seconds() - } - ) if wall_clock(*ctx.func_args, **ctx.func_kwargs): itask.state.xtriggers[label] = True self.sat_xtrig[sig] = {} @@ -359,7 +366,7 @@ def check_xtriggers( Return True if satisfied, else False Args: - itasks: task proxs to check + itasks: task proxies to check db_update_func: method to update xtriggers in the DB """ if itask.state.xtriggers_all_satisfied(): diff --git a/cylc/flow/xtriggers/wall_clock.py b/cylc/flow/xtriggers/wall_clock.py index 008f1be0b1f..3c50d956f0a 100644 --- a/cylc/flow/xtriggers/wall_clock.py +++ b/cylc/flow/xtriggers/wall_clock.py @@ -14,37 +14,16 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""xtrigger function to check cycle point offset against the wall clock. - -""" +"""xtrigger function to trigger off of a wall clock time.""" from time import time -from cylc.flow.cycling.iso8601 import interval_parse - -def wall_clock(offset=None, absolute_as_seconds=None, point_as_seconds=None): - """Return True if now > (point + offset) else False. - Either provide an offset from the current cycle point *or* a wall-clock - time. +def wall_clock(trigger_time=None): + """Return True after the desired wall clock time, False. Args: - offset (str): - Satisfy this xtrigger after an offset from the current cycle point. - Should be a duration in ISO8601 format. - absolute_as_seconds (int): - Satisfy this xtrigger after the specified time. - Should be a datetime in the unix time format. - point_as_seconds (int): - Provided by Cylc. The cycle point in unix time format. - + trigger_time (int): + Trigger time as seconds since Unix epoch. """ - offset_as_seconds = 0 - if offset is not None: - offset_as_seconds = int(interval_parse(offset).get_seconds()) - if absolute_as_seconds: - trigger_time = absolute_as_seconds - else: - trigger_time = point_as_seconds + offset_as_seconds - return time() > trigger_time diff --git a/tests/flakyfunctional/xtriggers/00-wall_clock.t b/tests/flakyfunctional/xtriggers/00-wall_clock.t index efc99257b7c..9e966f8cd52 100644 --- a/tests/flakyfunctional/xtriggers/00-wall_clock.t +++ b/tests/flakyfunctional/xtriggers/00-wall_clock.t @@ -26,13 +26,14 @@ run_workflow() { set_test_number 5 install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" -NOW="$(date '+%Y%m%dT%H')" +NOW="$(date -u '+%Y%m%dT%H')" -# Validate and run with "now" clock trigger (satisfied). +# Initial cycle point is the hour just passed. START="$NOW" -HOUR="$(date +%H)" +HOUR="$(date -u +%H)" OFFSET="PT0S" +# Validate and run with "now" clock trigger (satisfied). run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}" \ -s "START='${START}'" -s "HOUR='${HOUR}'" -s "OFFSET='${OFFSET}'" diff --git a/tests/flakyfunctional/xtriggers/00-wall_clock/flow.cylc b/tests/flakyfunctional/xtriggers/00-wall_clock/flow.cylc index 9edcc7b5cd6..eff07b589cd 100644 --- a/tests/flakyfunctional/xtriggers/00-wall_clock/flow.cylc +++ b/tests/flakyfunctional/xtriggers/00-wall_clock/flow.cylc @@ -1,10 +1,10 @@ #!Jinja2 # Test wall_clock xtrigger: workflow will run to completion and exit if the -# clock trigger is not satisfied, else stall and abort. +# clock trigger is not satisfied, else abort on inactivity. [scheduler] - cycle point time zone = '+0530' + # Default to time zone Z [[events]] abort on inactivity timeout = True inactivity timeout = PT15S diff --git a/tests/functional/clock-trigger-inexact/00-big-offset.t b/tests/functional/clock-trigger-inexact/00-big-offset.t new file mode 100644 index 00000000000..1dcbec06528 --- /dev/null +++ b/tests/functional/clock-trigger-inexact/00-big-offset.t @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Test clock triggers (xtrigger and old-style) with a large inexact offset. + +. "$(dirname "$0")/test_header" +set_test_number 5 +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" + +cylc cat-log "${WORKFLOW_NAME}" > log +START_HOUR=$(grep 'Workflow:' log | cut -c 1-13) +START_MINU=$(grep 'Workflow:' log | cut -c 15-16) +TRIGG_MINU=$(( 10#${START_MINU} + 1)) +[[ $START_MINU == 0* ]] && TRIGG_MINU=0${TRIGG_MINU} + +for NAME in foo bar baz; do + grep_ok "${START_HOUR}:${TRIGG_MINU}.* INFO - \[${NAME}\..*\] => waiting$" log +done + +purge diff --git a/tests/functional/clock-trigger-inexact/00-big-offset/flow.cylc b/tests/functional/clock-trigger-inexact/00-big-offset/flow.cylc new file mode 100644 index 00000000000..9f75d95e4dd --- /dev/null +++ b/tests/functional/clock-trigger-inexact/00-big-offset/flow.cylc @@ -0,0 +1,32 @@ +#!Jinja2 + +# Test clock-trigger offset involving inexact intervals (months and years), +# which requires adding the offset to the cycle point before conversion to +# absolute seconds. Initial cycle point is next minute minus P1Y7M, with +# opposite clock-offset to get real time triggering once per minute. + +{% set OFFSET = "P1Y7M" %} + +[scheduler] + [[events]] + # May take up to 60 secs to finish, allow some extra time. + inactivity timeout = PT80S + abort on inactivity timeout = True +[scheduling] + initial cycle point = next(T--00) - {{OFFSET}} # next minute - P1Y7M + final cycle point = +P0Y + [[xtriggers]] + clock1 = wall_clock({{OFFSET}}) # xtrigger with arg + clock2 = wall_clock(offset={{OFFSET}}) # xtrigger with kwarg + [[special tasks]] + clock-trigger = baz({{OFFSET}}) # old-style clock-triggered task + [[graph]] + PT1M = """ + # These should all trigger at once, at the first minute boundary + # after start-up + @clock1 => foo + @clock2 => bar + baz + """ +[runtime] + [[foo, bar, baz]] diff --git a/tests/functional/clock-trigger-inexact/test_header b/tests/functional/clock-trigger-inexact/test_header new file mode 120000 index 00000000000..90bd5a36f92 --- /dev/null +++ b/tests/functional/clock-trigger-inexact/test_header @@ -0,0 +1 @@ +../lib/bash/test_header \ No newline at end of file diff --git a/tests/functional/validate/70-no-clock-int-cycle.t b/tests/functional/validate/70-no-clock-int-cycle.t index ddce73c6065..7fd5d2b10ed 100644 --- a/tests/functional/validate/70-no-clock-int-cycle.t +++ b/tests/functional/validate/70-no-clock-int-cycle.t @@ -34,5 +34,5 @@ __FLOW_CONFIG__ run_fail "${TEST_NAME_BASE}-val" cylc validate 'flow.cylc' contains_ok "${TEST_NAME_BASE}-val.stderr" <<'__END__' -WorkflowConfigError: clock xtriggers need date-time cycling: c1 = wall_clock(offset=P0Y) +WorkflowConfigError: Clock xtriggers require datetime cycling: c1 = wall_clock(offset=P0Y) __END__ diff --git a/tests/unit/test_task_proxy.py b/tests/unit/test_task_proxy.py index e96a8f2258c..c974822a0c8 100644 --- a/tests/unit/test_task_proxy.py +++ b/tests/unit/test_task_proxy.py @@ -46,6 +46,47 @@ def test_point_match( assert TaskProxy.point_match(mock_itask, point_str) is expected +@pytest.mark.parametrize( + 'itask_point, offset_str, expected', + [ + param( # date -u -d 19700101 "+%s" + ISO8601Point('19700101T00Z'), 'PT0M', 0, id="zero epoch" + ), + param( # 2025 is not a leap year: Jan 1 + P2M = P59D + ISO8601Point('20250101T00Z'), 'PT0M', 1735689600, id="nonleap base" + ), + param( + ISO8601Point('20250101T00Z'), 'P59D', 1740787200, id="nonleap off1" + ), + param( + ISO8601Point('20250101T00Z'), 'P2M', 1740787200, id="nonleap off2" + ), + param( # 2024 is a leap year: Jan 1 + P2M = P60D + ISO8601Point('20240101T00Z'), 'PT0M', 1704067200, id="leap base" + ), + param( + ISO8601Point('20240101T00Z'), 'P60D', 1709251200, id="leap off1" + ), + param( + ISO8601Point('20240101T00Z'), 'P2M', 1709251200, id="leap off2" + ), + ] +) +def test_get_clock_trigger_time( + itask_point: PointBase, + offset_str: str, + expected: int, + set_cycling_type: Callable +) -> None: + """Test get_clock_trigger_time() for exact and inexact offsets.""" + set_cycling_type(itask_point.TYPE) + mock_itask = Mock( + point=itask_point.standardise(), + clock_trigger_time=None + ) + assert TaskProxy.get_clock_trigger_time(mock_itask, offset_str) == expected + + @pytest.mark.parametrize( 'name_str, expected', [('beer', True),