Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reload: fix submission errors for jobs awaiting preparation #4984

Merged
merged 5 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ default job runner directives for platforms.

### Fixes

[#4984](https://github.com/cylc/cylc-flow/pull/4984) -
Fixes an issue with `cylc reload` which could cause preparing tasks to become
stuck.

[#4976](https://github.com/cylc/cylc-flow/pull/4976) - Fix bug causing tasks
to be stuck in UI due to discontinued graph of optional outputs.

Expand Down
39 changes: 16 additions & 23 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ class Scheduler:
auto_restart_mode: Optional[AutoRestartMode] = None
auto_restart_time: Optional[float] = None

# queue-released tasks awaiting job preparation
pre_prep_tasks: Optional[List[TaskProxy]] = None

# profiling
_profile_amounts: Optional[dict] = None
_profile_update_times: Optional[dict] = None
Expand Down Expand Up @@ -264,7 +261,6 @@ def __init__(self, reg: str, options: Values) -> None:
# mutable defaults
self._profile_amounts = {}
self._profile_update_times = {}
self.pre_prep_tasks = []
self.bad_hosts: Set[str] = set()

self.restored_stop_task_id = None
Expand Down Expand Up @@ -1237,34 +1233,31 @@ def run_event_handlers(self, event, reason=""):
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self):
"""Release queued tasks, and submit task jobs.
"""Release queued tasks, and submit jobs.

The task queue manages references to task proxies in the task pool.

Newly released tasks are passed to job submission multiple times until
associated asynchronous host select, remote init, and remote install
processes are done.
Tasks which have entered the submission pipeline but not yet finished
(pre_prep_tasks) are passed to job submission multiple times until they
have passed through a series of asynchronous operations (host select,
remote init, remote file install, etc).

"""
# Forget tasks that are no longer preparing for job submission.
self.pre_prep_tasks = [
itask for itask in self.pre_prep_tasks if
itask.waiting_on_job_prep
]
Note:
We do not maintain a list of "pre_prep_tasks" between iterations
of this method as this creates an intermediate task staging pool
which has nasty consequences:

* https://github.com/cylc/cylc-flow/pull/4620
* https://github.com/cylc/cylc-flow/issues/4974

"""
if (
not self.is_paused
and self.stop_mode is None
and self.auto_restart_time is None
):
# Add newly released tasks to those still preparing.
self.pre_prep_tasks += self.pool.release_queued_tasks(
# the number of tasks waiting to go through the task
# submission pipeline
self.pre_prep_tasks
)

if not self.pre_prep_tasks:
pre_prep_tasks = self.pool.release_queued_tasks()
if not pre_prep_tasks:
# No tasks to submit.
return

Expand All @@ -1280,7 +1273,7 @@ def release_queued_tasks(self):
meth = LOG.info
for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
self.pre_prep_tasks,
pre_prep_tasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
self.config.run_mode('simulation')
Expand Down
26 changes: 13 additions & 13 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
prepared_tasks = []
bad_tasks = []
for itask in itasks:
if itask.state_reset(TASK_STATUS_PREPARING):
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
# state transition message reflects the correct submit_num
itask.submit_num += 1
itask.state_reset(TASK_STATUS_PREPARING)
self.data_store_mgr.delta_task_state(itask)
self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
Expand All @@ -259,6 +263,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,

Return (list): list of tasks that attempted submission.
"""

if is_simulation:
return self._simulation_submit_task_jobs(itasks, workflow)
# Prepare tasks for job submission
Expand Down Expand Up @@ -1000,6 +1005,7 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
"""Simulation mode task jobs submission."""
for itask in itasks:
itask.waiting_on_job_prep = False
itask.submit_num += 1
self._set_retry_timers(itask)
itask.platform = {'name': 'SIMULATION'}
itask.summary['job_runner_name'] = 'SIMULATION'
Expand Down Expand Up @@ -1041,11 +1047,8 @@ def _submit_task_job_callback_255(
self, workflow, itask, cmd_ctx, line
):
"""Helper for _submit_task_jobs_callback, on one task job."""
itask.submit_num -= 1
self.task_events_mgr._retry_task(
itask, time(), submit_retry=True
)
return
Comment on lines -1044 to -1048
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wxtim could you take a look at this bit to make sure you're happy.

  • It used to reset the task back to waiting and slap a submission-retry trigger on it.
  • It now leaves the task unchanged and sets a flag to send it back through job submission.

# send this task back for submission again
itask.waiting_on_job_prep = True # (task is in the preparing state)

def _submit_task_job_callback(self, workflow, itask, cmd_ctx, line):
"""Helper for _submit_task_jobs_callback, on one task job."""
Expand Down Expand Up @@ -1088,7 +1091,10 @@ def _prep_submit_task_job(
):
"""Prepare a task job submission.

Return itask on a good preparation.
Returns:
* itask - preparation complete.
* None - preparation in progress.
* False - perparation failed.

"""
if itask.local_job_file_path:
Expand Down Expand Up @@ -1140,9 +1146,7 @@ def _prep_submit_task_job(
rtconfig['platform'], PLATFORM_REC_COMMAND
)
except PlatformError as exc:
# Submit number not yet incremented
itask.waiting_on_job_prep = False
itask.submit_num += 1
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
Expand Down Expand Up @@ -1181,9 +1185,7 @@ def _prep_submit_task_job(
)

except PlatformLookupError as exc:
# Submit number not yet incremented
itask.waiting_on_job_prep = False
itask.submit_num += 1
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
Expand All @@ -1193,8 +1195,6 @@ def _prep_submit_task_job(
return False
else:
itask.platform = platform
# Submit number not yet incremented
itask.submit_num += 1
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)

Expand Down
62 changes: 40 additions & 22 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,27 +750,44 @@ def queue_task(self, itask: TaskProxy) -> None:
self.data_store_mgr.delta_task_queued(itask)
self.task_queue_mgr.push_task(itask)

def release_queued_tasks(self, pre_prep_tasks):
"""Return list of queue-released tasks for job prep."""
released = self.task_queue_mgr.release_tasks(
Counter(
[
# active tasks
t.tdef.name
for t in self.get_tasks()
if t.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
] + [
# tasks await job preparation which have not yet
# entered the preparing state
itask.tdef.name
for itask in pre_prep_tasks
]
)
)
def release_queued_tasks(self):
"""Return list of queue-released tasks awaiting job prep.

Note:
Tasks can hang about for a while between being released and
entering the PREPARING state for various reasons. This method
returns tasks which are awaiting job prep irrespective of whether
they have been previously returned.

"""
# count active tasks by name
# {task_name: number_of_active_instances, ...}
active_task_counter = Counter()

# tasks which have entered the submission pipeline but have not yet
# entered the PREPARING state
pre_prep_tasks = []

for itask in self.get_tasks():
# populate active_task_counter and pre_prep_tasks together to
# avoid iterating the task pool twice
if itask.waiting_on_job_prep:
# a task which has entered the submission pipeline
# for the purposes of queue limiting this should be treated
# the same as an active task
active_task_counter.update([itask.tdef.name])
pre_prep_tasks.append(itask)
elif itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
):
# an active task
active_task_counter.update([itask.tdef.name])

# release queued tasks
released = self.task_queue_mgr.release_tasks(active_task_counter)

for itask in released:
itask.state_reset(is_queued=False)
itask.waiting_on_job_prep = True
Expand All @@ -782,7 +799,8 @@ def release_queued_tasks(self, pre_prep_tasks):
# prerequisites (which result in incomplete tasks in Cylc 8).
self.spawn_on_all_outputs(itask)

return released
# Note: released and pre_prep_tasks can overlap
return list(set(released + pre_prep_tasks))

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class TaskProxy:
flow_wait:
wait for flow merge before spawning children
.waiting_on_job_prep:
task waiting on job prep
True whilst task is awaiting job prep, reset to False once the
preparation has completed.

Args:
tdef: The definition object of this task.
Expand Down Expand Up @@ -246,7 +247,7 @@ def __init__(
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
self.waiting_on_job_prep = True
self.waiting_on_job_prep = False

self.state = TaskState(tdef, self.point, status, is_held)

Expand Down
4 changes: 0 additions & 4 deletions tests/integration/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,12 @@ async def test_queue_release(
# (otherwise a number of tasks up to the limit should be released)
schd.pool.release_runahead_tasks()
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == expected_submissions
assert len(submitted_tasks) == expected_submissions

for _ in range(3):
# release runahead/queued tasks
# (no further tasks should be released)
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == expected_submissions
assert len(submitted_tasks) == expected_submissions


Expand Down Expand Up @@ -105,7 +103,6 @@ async def test_queue_held_tasks(
# release queued tasks
# (no tasks should be released from the queues because they are held)
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == 0
assert len(submitted_tasks) == 0

# un-hold tasks
Expand All @@ -114,5 +111,4 @@ async def test_queue_held_tasks(
# release queued tasks
# (tasks should now be released from the queues)
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == 1
assert len(submitted_tasks) == 1
4 changes: 0 additions & 4 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,12 @@ async def test_holding_tasks_whilst_scheduler_paused(
async with start(one):
# capture any job submissions
submitted_tasks = capture_submission(one)
assert one.pre_prep_tasks == []
assert submitted_tasks == set()

# release runahead/queued tasks
# (nothing should happen because the scheduler is paused)
one.pool.release_runahead_tasks()
one.release_queued_tasks()
assert one.pre_prep_tasks == []
assert submitted_tasks == set()

# hold all tasks & resume the workflow
Expand All @@ -180,7 +178,6 @@ async def test_holding_tasks_whilst_scheduler_paused(
# release queued tasks
# (there should be no change because the task is still held)
one.release_queued_tasks()
assert one.pre_prep_tasks == []
assert submitted_tasks == set()

# release all tasks
Expand All @@ -189,7 +186,6 @@ async def test_holding_tasks_whilst_scheduler_paused(
# release queued tasks
# (the task should be submitted)
one.release_queued_tasks()
assert len(one.pre_prep_tasks) == 1
assert len(submitted_tasks) == 1


Expand Down