Skip to content

Commit

Permalink
[jobs] make status updates robust when controller dies (#4602)
Browse files Browse the repository at this point in the history
* [jobs] CANCELLING is not terminal

This discrepancy caused issues, such as jobs getting stuck as
CANCELLING when the job controller process crashes during cleanup.

* revamp nonterminal status checking

* lint

* fix stream_logs_by_id

* remove set_failed_controller

* address PR comments

* Apply suggestions from code review

Co-authored-by: Zhanghao Wu <[email protected]>

* address PR review

---------

Co-authored-by: Zhanghao Wu <[email protected]>
  • Loading branch information
cg505 and Michaelvll authored Jan 24, 2025
1 parent 1c94d0f commit 485b1cd
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 156 deletions.
5 changes: 4 additions & 1 deletion sky/jobs/controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Controller: handles the life cycle of a managed job."""
"""Controller: handles the life cycle of a managed job.
TODO(cooperc): Document lifecycle, and multiprocess layout.
"""
import argparse
import multiprocessing
import os
Expand Down
93 changes: 79 additions & 14 deletions sky/jobs/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,12 @@ class ManagedJobStatus(enum.Enum):
# RECOVERING: The cluster is preempted, and the controller process is
# recovering the cluster (relaunching/failover).
RECOVERING = 'RECOVERING'
# Terminal statuses
# SUCCEEDED: The job is finished successfully.
SUCCEEDED = 'SUCCEEDED'
# CANCELLING: The job is requested to be cancelled by the user, and the
# controller is cleaning up the cluster.
CANCELLING = 'CANCELLING'
# Terminal statuses
# SUCCEEDED: The job is finished successfully.
SUCCEEDED = 'SUCCEEDED'
# CANCELLED: The job is cancelled by the user. When the managed job is in
# CANCELLED status, the cluster has been cleaned up.
CANCELLED = 'CANCELLED'
Expand Down Expand Up @@ -281,7 +281,6 @@ def terminal_statuses(cls) -> List['ManagedJobStatus']:
cls.FAILED_PRECHECKS,
cls.FAILED_NO_RESOURCE,
cls.FAILED_CONTROLLER,
cls.CANCELLING,
cls.CANCELLED,
]

Expand Down Expand Up @@ -512,8 +511,12 @@ def set_failed(
failure_reason: str,
callback_func: Optional[CallbackType] = None,
end_time: Optional[float] = None,
override_terminal: bool = False,
):
"""Set an entire job or task to failed, if they are in non-terminal states.
"""Set an entire job or task to failed.
By default, don't override tasks that are already terminal (that is, for
which end_at is already set).
Args:
job_id: The job id.
Expand All @@ -522,12 +525,13 @@ def set_failed(
failure_type: The failure type. One of ManagedJobStatus.FAILED_*.
failure_reason: The failure reason.
end_time: The end time. If None, the current time will be used.
override_terminal: If True, override the current status even if end_at
is already set.
"""
assert failure_type.is_failed(), failure_type
end_time = time.time() if end_time is None else end_time

fields_to_set = {
'end_at': end_time,
fields_to_set: Dict[str, Any] = {
'status': failure_type.value,
'failure_reason': failure_reason,
}
Expand All @@ -542,14 +546,31 @@ def set_failed(
# affect the job duration calculation.
fields_to_set['last_recovered_at'] = end_time
set_str = ', '.join(f'{k}=(?)' for k in fields_to_set)
task_str = '' if task_id is None else f' AND task_id={task_id}'
task_query_str = '' if task_id is None else 'AND task_id=(?)'
task_value = [] if task_id is None else [
task_id,
]

cursor.execute(
f"""\
UPDATE spot SET
{set_str}
WHERE spot_job_id=(?){task_str} AND end_at IS null""",
(*list(fields_to_set.values()), job_id))
if override_terminal:
# Use COALESCE for end_at to avoid overriding the existing end_at if
# it's already set.
cursor.execute(
f"""\
UPDATE spot SET
end_at = COALESCE(end_at, ?),
{set_str}
WHERE spot_job_id=(?) {task_query_str}""",
(end_time, *list(fields_to_set.values()), job_id, *task_value))
else:
# Only set if end_at is null, i.e. the previous status is not
# terminal.
cursor.execute(
f"""\
UPDATE spot SET
end_at = (?),
{set_str}
WHERE spot_job_id=(?) {task_query_str} AND end_at IS null""",
(end_time, *list(fields_to_set.values()), job_id, *task_value))
if callback_func:
callback_func('FAILED')
logger.info(failure_reason)
Expand Down Expand Up @@ -677,6 +698,50 @@ def get_schedule_live_jobs(job_id: Optional[int]) -> List[Dict[str, Any]]:
return jobs


def get_jobs_to_check_status(job_id: Optional[int] = None) -> List[int]:
"""Get jobs that need controller process checking.
Args:
job_id: Optional job ID to check. If None, checks all jobs.
Returns a list of job_ids, including the following:
- For jobs with schedule state: jobs that have schedule state not DONE
- For legacy jobs (no schedule state): jobs that are in non-terminal status
"""
job_filter = '' if job_id is None else 'AND spot.spot_job_id=(?)'
job_value = () if job_id is None else (job_id,)

status_filter_str = ', '.join(['?'] *
len(ManagedJobStatus.terminal_statuses()))
terminal_status_values = [
status.value for status in ManagedJobStatus.terminal_statuses()
]

# Get jobs that are either:
# 1. Have schedule state that is not DONE, or
# 2. Have no schedule state (legacy) AND are in non-terminal status
with db_utils.safe_cursor(_DB_PATH) as cursor:
rows = cursor.execute(
f"""\
SELECT DISTINCT spot.spot_job_id
FROM spot
LEFT OUTER JOIN job_info
ON spot.spot_job_id=job_info.spot_job_id
WHERE (
(job_info.schedule_state IS NOT NULL AND
job_info.schedule_state IS NOT ?)
OR
(job_info.schedule_state IS NULL AND
status NOT IN ({status_filter_str}))
)
{job_filter}
ORDER BY spot.spot_job_id DESC""", [
ManagedJobScheduleState.DONE.value, *terminal_status_values,
*job_value
]).fetchall()
return [row[0] for row in rows if row[0] is not None]


def get_all_job_ids_by_name(name: Optional[str]) -> List[int]:
"""Get all job ids by name."""
name_filter = ''
Expand Down
Loading

0 comments on commit 485b1cd

Please sign in to comment.