Fix spawning of mixed-parentage tasks. #4906

Merged 18 commits on Jul 20, 2022.
Changes from all commits
7 changes: 7 additions & 0 deletions CHANGES.md
@@ -60,6 +60,13 @@ triggers in back-compat mode.
[#4887](https://github.com/cylc/cylc-flow/pull/4887) - Disallow relative paths
in `global.cylc[install]source dirs`.

[#4906](https://github.com/cylc/cylc-flow/pull/4906)
- Fix delayed spawning of parentless tasks that do have parents in a previous
cycle point.
- Make integer-interval runahead limits consistent with time-interval limits:
  `P0` means just the runahead base point; `P1` means the base point and the
  next point (i.e. one cycle interval), and so on.
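The new integer-limit semantics can be illustrated with a minimal sketch (this is not cylc's implementation; `active_points` is a hypothetical helper for illustration only):

```python
# Sketch of the integer runahead-limit semantics described above:
# a limit of Pn allows the base point plus n further cycle points
# to be active at once.

def active_points(base_point: int, limit_n: int, interval: int = 1) -> list:
    """Return the cycle points allowed to be active under a limit of P<limit_n>."""
    return [base_point + i * interval for i in range(limit_n + 1)]

# P0 -> just the base point; P1 -> the base point and the next point.
print(active_points(1, 0))  # [1]
print(active_points(1, 1))  # [1, 2]
```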

[#4936](https://github.com/cylc/cylc-flow/pull/4936) - Fix incorrect
error messages when workflow CLI commands fail.

54 changes: 24 additions & 30 deletions cylc/flow/cfgspec/workflow.py
@@ -615,41 +615,32 @@ def get_script_common_text(this: str, example: Optional[str] = None):
datetime calendars: 360 day (12 months of 30 days in a year),
365 day (never a leap year) and 366 day (always a leap year).
''')
Conf('runahead limit', VDR.V_STRING, 'P5', desc='''
How many cycles ahead of the slowest tasks the fastest may run.
Conf('runahead limit', VDR.V_STRING, 'P4', desc='''
The scheduler runahead limit determines how many consecutive cycle
points can be active at once. The base point of the runahead
calculation is the lowest-valued point with :term:`active` or
:term:`incomplete` tasks present.

Runahead limiting prevents the fastest tasks in a workflow from
getting too far ahead of the slowest ones, as documented in
:ref:`RunaheadLimit`.
An integer interval value of ``Pn`` allows up to ``n+1`` cycle
points (including the base point) to be active at once.

This limit on the number of consecutive spawned cycle points is
specified by an interval between the least and most recent: either
an integer (e.g. ``P3`` - works for both :term:`integer cycling`
and :term:`datetime cycling`), or a time interval (e.g. ``PT12H`` -
only works for datetime cycling). Alternatively, if a raw number is
given, e.g. ``7``, it will be taken to mean ``PT7H``, though this
usage is deprecated.
The default runahead limit is ``P4``, i.e. 5 active points
including the base point.

.. note::

The integer limit format is irrespective of the labelling of
cycle points. For example, if the runahead limit is ``P3`` and
you have a workflow *solely* consisting of a task that repeats
"every four cycles", it would still spawn three consecutive
cycle points at a time (starting with 1, 5 and 9). This is
because the workflow is functionally equivalent to one where the
task repeats every cycle.
Datetime cycling workflows can optionally use a datetime interval
value instead, in which case the number of active cycle points
within the interval depends on the cycling intervals present.

.. note::
.. seealso::

The runahead limit may be automatically raised if this is
necessary to allow a future task to be triggered, preventing
the workflow from stalling.
:ref:`RunaheadLimit`

.. versionchanged:: 8.0.0

The integer (``Pn``) type limit was introduced to replace the
deprecated ``[scheduling]max active cycle points = n`` setting.
The integer format ``Pn`` was introduced to replace the
deprecated ``[scheduling]max active cycle points = m``
(with ``n = m-1``) and unify it with the existing datetime
interval ``runahead limit`` setting.
''')
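As a usage sketch, the new default and an explicit integer limit might appear in a workflow's `flow.cylc` like this (illustrative fragment, not taken from this PR):

```
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    # Allow 3 consecutive cycle points (base point plus two) at once.
    runahead limit = P2
```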

with Conf('queues', desc='''
@@ -676,7 +667,7 @@ def get_script_common_text(this: str, example: Optional[str] = None):

.. seealso::

See also :ref:`InternalQueues`.
:ref:`InternalQueues`.
'''):
with Conf('<queue name>', desc='''
Section heading for configuration of a single queue.
@@ -1573,8 +1564,11 @@ def get_script_common_text(this: str, example: Optional[str] = None):
identity variables, which are exported earlier in the task job
script. Variable assignment expressions can use cylc
utility commands because access to cylc is also configured
earlier in the script. See also
:ref:`TaskExecutionEnvironment`.
earlier in the script.

.. seealso::

:ref:`TaskExecutionEnvironment`.

You can also specify job environment templates here for
:ref:`parameterized tasks <User Guide Param>`.
20 changes: 4 additions & 16 deletions cylc/flow/config.py
@@ -213,8 +213,7 @@ def __init__(
self._start_point_for_actual_first_point: Optional['PointBase'] = None

self.task_param_vars = {} # type: ignore # TODO figure out type
self.custom_runahead_limit: Optional['IntervalBase'] = None
self.max_num_active_cycle_points = None
self.runahead_limit: Optional['IntervalBase'] = None

# runtime hierarchy dicts keyed by namespace name:
self.runtime: Dict[str, dict] = { # TODO figure out type
@@ -1181,7 +1180,7 @@ def compute_inheritance(self):
# LOG.info(log_msg)

def process_runahead_limit(self):
"""Extract the runahead limits information."""
"""Extract runahead limit information."""
limit = self.cfg['scheduling']['runahead limit']
if limit.isdigit():
limit = f'PT{limit}H'
@@ -1193,28 +1192,17 @@ def process_runahead_limit(self):
time_limit_regexes = DurationParser.DURATION_REGEXES

if number_limit_regex.fullmatch(limit):
self.custom_runahead_limit = IntegerInterval(limit)
# Handle "runahead limit = P0":
if self.custom_runahead_limit.is_null():
self.custom_runahead_limit = IntegerInterval('P1')
self.runahead_limit = IntegerInterval(limit)
elif ( # noqa: SIM106
self.cycling_type == ISO8601_CYCLING_TYPE
and any(tlr.fullmatch(limit) for tlr in time_limit_regexes)
):
self.custom_runahead_limit = ISO8601Interval(limit)
self.runahead_limit = ISO8601Interval(limit)
else:
raise WorkflowConfigError(
f'bad runahead limit "{limit}" for {self.cycling_type} '
'cycling type')
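The classification logic in `process_runahead_limit` can be sketched in isolation as follows. This is a simplified stand-in: the regexes here are illustrative, not cylc's real `DurationParser` patterns, and `classify_limit` is a hypothetical helper:

```python
# Sketch of the runahead-limit classification above: an integer
# interval like "P3" is valid for both cycling types, while an
# ISO 8601 duration like "PT12H" is only valid for datetime cycling.
import re

INTEGER_INTERVAL = re.compile(r'P\d+')
ISO8601_DURATION = re.compile(r'P(\d+[YMWD])*(T(\d+[HMS])+)?')

def classify_limit(limit: str, cycling_type: str) -> str:
    if INTEGER_INTERVAL.fullmatch(limit):
        return 'integer'
    if cycling_type == 'iso8601' and ISO8601_DURATION.fullmatch(limit):
        return 'iso8601'
    raise ValueError(
        f'bad runahead limit "{limit}" for {cycling_type} cycling type')

print(classify_limit('P3', 'integer'))     # integer
print(classify_limit('PT12H', 'iso8601'))  # iso8601
```

Note that under the new code `P0` is kept as-is rather than being silently promoted to `P1`, consistent with the changelog entry in this PR.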

def get_custom_runahead_limit(self):
"""Return the custom runahead limit (may be None)."""
return self.custom_runahead_limit

def get_max_num_active_cycle_points(self):
"""Return the maximum allowed number of pool cycle points."""
return self.max_num_active_cycle_points

def get_config(self, args, sparse=False):
return self.pcfg.get(args, sparse)

2 changes: 1 addition & 1 deletion cylc/flow/id_match.py
@@ -215,7 +215,7 @@ def filter_ids(
if warn:
LOG.warning(f"No active tasks matching: {id_}")
else:
_cycles.extend(cycles)
_cycles.extend(list(cycles))
_tasks.extend(tasks)

ret: List[Any] = []
3 changes: 2 additions & 1 deletion cylc/flow/rundb.py
@@ -820,7 +820,8 @@ def select_task_pool_for_restart(self, callback):
LEFT OUTER JOIN
%(task_outputs)s
ON %(task_pool)s.cycle == %(task_outputs)s.cycle AND
%(task_pool)s.name == %(task_outputs)s.name
%(task_pool)s.name == %(task_outputs)s.name AND
%(task_pool)s.flow_nums == %(task_outputs)s.flow_nums
Member Author comment:
Bug fix! Loading the task pool from the DB at restart, on master I get multiple rows per task proxy if a waiting task had also run in previous flows (and hence an attempt to spawn multiple copies of the same task).

"""
form_data = {
"task_pool": self.TABLE_TASK_POOL,
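The effect of the extra `flow_nums` join condition can be demonstrated with a self-contained sqlite3 sketch (simplified schemas, not cylc's real tables):

```python
# Demo of the restart bug fixed above: joining task_pool to
# task_outputs without matching flow_nums yields duplicate rows
# for a task that also ran in a previous flow.
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE task_pool (cycle TEXT, name TEXT, flow_nums TEXT);
    CREATE TABLE task_outputs (cycle TEXT, name TEXT, flow_nums TEXT);
    -- a waiting task in flow [2] ...
    INSERT INTO task_pool VALUES ('1', 'foo', '[2]');
    -- ... which also has outputs recorded from an earlier flow [1]:
    INSERT INTO task_outputs VALUES ('1', 'foo', '[1]');
    INSERT INTO task_outputs VALUES ('1', 'foo', '[2]');
""")

without_flow = conn.execute("""
    SELECT task_pool.cycle, task_pool.name FROM task_pool
    LEFT OUTER JOIN task_outputs
    ON task_pool.cycle == task_outputs.cycle
    AND task_pool.name == task_outputs.name
""").fetchall()

with_flow = conn.execute("""
    SELECT task_pool.cycle, task_pool.name FROM task_pool
    LEFT OUTER JOIN task_outputs
    ON task_pool.cycle == task_outputs.cycle
    AND task_pool.name == task_outputs.name
    AND task_pool.flow_nums == task_outputs.flow_nums
""").fetchall()

print(len(without_flow), len(with_flow))  # 2 1
```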
54 changes: 13 additions & 41 deletions cylc/flow/scheduler.py
@@ -689,27 +689,7 @@ def _load_pool_from_point(self):
else "Cold"
)
LOG.info(f"{start_type} start from {self.config.start_point}")

flow_num = self.flow_mgr.get_new_flow(
f"original flow from {self.config.start_point}"
)
for name in self.config.get_task_name_list():
tdef = self.config.get_taskdef(name)
try:
point = sorted([
point for point in
(seq.get_first_point(self.config.start_point)
for seq in tdef.sequences) if point
])[0]
except IndexError:
# No points
continue
parent_points = tdef.get_parent_points(point)
if not parent_points or all(
x < self.config.start_point for x in parent_points):
self.pool.add_to_pool(
TaskProxy(tdef, point, {flow_num})
)
self.pool.load_from_point()

def _load_pool_from_db(self):
"""Load task pool from DB, for a restart."""
@@ -898,23 +878,20 @@ def command_stop(
self.pool.stop_flow(flow_num)
return

if cycle_point:
if cycle_point is not None:
# schedule shutdown after tasks pass provided cycle point
point = TaskID.get_standardised_point(cycle_point)
if self.pool.set_stop_point(point):
if point is not None and self.pool.set_stop_point(point):
self.options.stopcp = str(point)
self.workflow_db_mgr.put_workflow_stop_cycle_point(
self.options.stopcp)
else:
# TODO: yield warning
pass
elif clock_time:
elif clock_time is not None:
# schedule shutdown after wallclock time passes provided time
parser = TimePointParser()
self.set_stop_clock(
int(parser.parse(clock_time).seconds_since_unix_epoch)
)
elif task:
elif task is not None:
# schedule shutdown after task succeeds
task_id = TaskID.get_standardised_taskid(task)
self.pool.set_stop_task(task_id)
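The switch from `if cycle_point:` to `if cycle_point is not None:` in this hunk guards against the classic truthiness pitfall, sketched below with a hypothetical `handle` function (not cylc code):

```python
# A bare `if cycle_point:` would wrongly treat falsy-but-valid
# values (e.g. a hypothetical 0 cycle point) as "no value given";
# an explicit None check does not.
def handle(cycle_point):
    if cycle_point is not None:
        return f'stop after {cycle_point}'
    return 'no stop point given'

print(handle('20220720T00Z'))  # stop after 20220720T00Z
print(handle(0))               # stop after 0 (a truthiness test would miss this)
print(handle(None))            # no stop point given
```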
@@ -1293,6 +1270,7 @@ def release_queued_tasks(self):

# Start the job submission process.
self.is_updated = True
self.reset_inactivity_timer()

self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
@@ -1500,15 +1478,12 @@ async def main_loop(self):
self.workflow_db_mgr.pri_dao.select_jobs_for_restart(
self.data_store_mgr.insert_db_job)
LOG.info("Reload completed.")
if self.pool.compute_runahead(force=True):
self.pool.release_runahead_tasks()
self.is_reloaded = True
self.is_updated = True

self.process_command_queue()

if self.pool.release_runahead_tasks():
self.is_updated = True
self.reset_inactivity_timer()

self.proc_pool.process()

# Tasks in the main pool that are waiting but not queued must be
@@ -1563,7 +1538,6 @@ async def main_loop(self):
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()

self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
@@ -1835,8 +1809,6 @@ def check_auto_shutdown(self):
# Don't if paused.
return False

self.pool.release_runahead_tasks()

if self.check_workflow_stalled():
return False

@@ -1847,19 +1819,19 @@
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
)
):
# Don't if there are more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
return False

# Can shut down.
if self.pool.stop_point:
self.options.stopcp = None
self.pool.stop_point = None
# Forget early stop point in case of a restart.
self.workflow_db_mgr.delete_workflow_stop_cycle_point()
self.update_data_store()

return True

11 changes: 4 additions & 7 deletions cylc/flow/task_events_mgr.py
@@ -577,6 +577,10 @@ def process_message(
event_time = get_current_time_string()
if submit_num is None:
submit_num = itask.submit_num

# Any message represents activity.
self.reset_inactivity_timer_func()

if not self._process_message_check(
itask, severity, message, event_time, flag, submit_num):
return None
@@ -710,7 +714,6 @@ def process_message(
itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
itask.job_vacated = True
# Believe this and change state without polling (could poll?).
self.reset_inactivity_timer_func()
if itask.state_reset(TASK_STATUS_SUBMITTED):
itask.state_reset(is_queued=False)
self.data_store_mgr.delta_task_state(itask)
@@ -1075,7 +1078,6 @@ def _process_message_failed(self, itask, event_time, message):
"run_status": 1,
"time_run_exit": event_time,
})
self.reset_inactivity_timer_func()
if (
TimerFlags.EXECUTION_RETRY not in itask.try_timers
or itask.try_timers[TimerFlags.EXECUTION_RETRY].next() is None
@@ -1102,7 +1104,6 @@ def _process_message_started(self, itask, event_time):
if itask.job_vacated:
itask.job_vacated = False
LOG.warning(f"[{itask}] Vacated job restarted")
self.reset_inactivity_timer_func()
job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id
self.data_store_mgr.delta_job_time(job_d, 'started', event_time)
self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_RUNNING)
@@ -1124,7 +1125,6 @@ def _process_message_succeeded(self, itask, event_time):
job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id
self.data_store_mgr.delta_job_time(job_d, 'finished', event_time)
self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_SUCCEEDED)
self.reset_inactivity_timer_func()
itask.set_summary_time('finished', event_time)
self.workflow_db_mgr.put_update_task_jobs(itask, {
"run_status": 0,
@@ -1155,7 +1155,6 @@ def _process_message_submit_failed(self, itask, event_time, submit_num):
"submit_status": 1,
})
itask.summary['submit_method_id'] = None
self.reset_inactivity_timer_func()
if (
TimerFlags.SUBMISSION_RETRY not in itask.try_timers
or itask.try_timers[TimerFlags.SUBMISSION_RETRY].next() is None
@@ -1216,8 +1215,6 @@ def _process_message_submitted(self, itask, event_time, submit_num):
# Unset started and finished times in case of resubmission.
itask.set_summary_time('started')
itask.set_summary_time('finished')

self.reset_inactivity_timer_func()
if itask.state.status == TASK_STATUS_PREPARING:
# The job started message can (rarely) come in before the
# submit command returns - in which case do not go back to