SkyBenchmark: fix job_status is None for failed candidates. (#2767)
* SkyBenchmark: fix job_status is None for failed candidates.

* Fix --gpus parsing

* Logic updates

* Simplify and add smoke test.
concretevitamin authored Nov 28, 2023
1 parent f34ff09 commit c1be039
Showing 3 changed files with 85 additions and 30 deletions.
96 changes: 68 additions & 28 deletions sky/benchmark/benchmark_utils.py
@@ -301,6 +301,11 @@ def _update_benchmark_result(benchmark_result: Dict[str, Any]) -> Optional[str]:
     run_end_path = os.path.join(local_dir, _RUN_END)
     end_time = None
     if os.path.exists(run_end_path):
+        # The job has terminated with a zero exit code. See
+        # generate_benchmark_configs() which ensures the 'run' commands write
+        # out end_time remotely on success; and the caller of this func which
+        # downloads all benchmark log files including the end_time file to
+        # local.
         end_time = _read_timestamp(run_end_path)
 
     # Get the status of the benchmarking cluster and job.
@@ -310,39 +315,74 @@ def _update_benchmark_result(benchmark_result: Dict[str, Any]) -> Optional[str]:
     if record is not None:
         cluster_status, handle = backend_utils.refresh_cluster_status_handle(
             cluster)
-        backend = backend_utils.get_backend_from_handle(handle)
-        assert isinstance(backend, backends.CloudVmRayBackend)
+        if handle is not None:
+            backend = backend_utils.get_backend_from_handle(handle)
+            assert isinstance(backend, backends.CloudVmRayBackend)
 
-        if cluster_status == status_lib.ClusterStatus.UP:
-            # NOTE: The id of the benchmarking job must be 1.
-            # TODO(woosuk): Handle exceptions.
-            job_status = backend.get_job_status(handle,
-                                                job_ids=[1],
-                                                stream_logs=False)[1]
+
+            if cluster_status == status_lib.ClusterStatus.UP:
+                # NOTE: The id of the benchmarking job must be 1.
+                # TODO(woosuk): Handle exceptions.
+                job_status = backend.get_job_status(handle,
+                                                    job_ids=[1],
+                                                    stream_logs=False)[1]
+    logger.debug(f'Cluster {cluster}, cluster_status: {cluster_status}, '
+                 f'benchmark_status {benchmark_status}, job_status: '
+                 f'{job_status}, start_time {start_time}, end_time {end_time}')
 
     # Update the benchmark status.
-    if (cluster_status == status_lib.ClusterStatus.INIT or
-            job_status < job_lib.JobStatus.RUNNING):
+    if end_time is not None:
+        # The job has terminated with zero exit code.
+        benchmark_status = benchmark_state.BenchmarkStatus.FINISHED
+    elif cluster_status is None:
+        # Candidate cluster: preempted or never successfully launched.
+        #
+        # Note that benchmark record is only inserted after all clusters
+        # finished launch() (successful or not). See
+        # launch_benchmark_clusters(). So this case doesn't include "just before
+        # candidate cluster's launch() is called".
+
+        # See above: if cluster_status is not UP, job_status is defined as None.
+        assert job_status is None, job_status
+        benchmark_status = benchmark_state.BenchmarkStatus.TERMINATED
+    elif cluster_status == status_lib.ClusterStatus.INIT:
+        # Candidate cluster's launch has something gone wrong, or is still
+        # launching.
+
+        # See above: if cluster_status is not UP, job_status is defined as None.
+        assert job_status is None, job_status
         benchmark_status = benchmark_state.BenchmarkStatus.INIT
-    elif job_status == job_lib.JobStatus.RUNNING:
-        benchmark_status = benchmark_state.BenchmarkStatus.RUNNING
-    elif (cluster_status is None or
-          cluster_status == status_lib.ClusterStatus.STOPPED or
-          (job_status is not None and job_status.is_terminal())):
-        # The cluster has terminated or stopped, or
-        # the cluster is UP and the job has terminated.
-        if end_time is not None:
-            # The job has terminated with zero exit code.
-            benchmark_status = benchmark_state.BenchmarkStatus.FINISHED
-        elif job_status == job_lib.JobStatus.SUCCEEDED:
-            # Since we download the benchmark logs before checking the cluster
-            # status, there is a chance that the end timestamp is saved
-            # and the cluster is stopped AFTER we download the logs.
-            # In this case, we consider the current timestamp as the end time.
-            end_time = time.time()
-            benchmark_status = benchmark_state.BenchmarkStatus.FINISHED
+    elif cluster_status == status_lib.ClusterStatus.STOPPED:
+        # Candidate cluster is auto-stopped, or user manually stops it at any
+        # time. Also, end_time is None.
+
+        # See above: if cluster_status is not UP, job_status is defined as None.
+        assert job_status is None, job_status
+        benchmark_status = benchmark_state.BenchmarkStatus.TERMINATED
+    else:
+        assert cluster_status == status_lib.ClusterStatus.UP, (
+            'ClusterStatus enum should have been handled')
+        if job_status is None:
+            benchmark_status = benchmark_state.BenchmarkStatus.INIT
         else:
-            benchmark_status = benchmark_state.BenchmarkStatus.TERMINATED
+            if job_status < job_lib.JobStatus.RUNNING:
+                benchmark_status = benchmark_state.BenchmarkStatus.INIT
+            elif job_status == job_lib.JobStatus.RUNNING:
+                benchmark_status = benchmark_state.BenchmarkStatus.RUNNING
+            else:
+                assert job_status.is_terminal(), '> RUNNING means terminal'
+                # Case: cluster_status UP, job_status.is_terminal()
+                if job_status == job_lib.JobStatus.SUCCEEDED:
+                    # Since we download the benchmark logs before checking the
+                    # cluster status, there is a chance that the end timestamp
+                    # is saved and the cluster is stopped AFTER we download the
+                    # logs. In this case, we consider the current timestamp as
+                    # the end time.
+                    end_time = time.time()
+                    benchmark_status = benchmark_state.BenchmarkStatus.FINISHED
+                else:
+                    benchmark_status = (
+                        benchmark_state.BenchmarkStatus.TERMINATED)
 
     callback_log_dirs = glob.glob(os.path.join(local_dir, 'sky-callback-*'))
     if callback_log_dirs:
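For context on the reordered checks above, here is a small, self-contained sketch (illustration only; the JobStatus enum below is a stand-in, not sky.skylet.job_lib.JobStatus) of why a failed candidate tripped up the old branch ordering: a candidate cluster that is preempted or never launches gets no benchmarking job, so job_status stays None, and ordering None against a job status cannot work.

import enum


class JobStatus(enum.Enum):
    # Stand-in for the real job status enum, for illustration only.
    INIT = 0
    RUNNING = 1
    SUCCEEDED = 2

    def __lt__(self, other: 'JobStatus') -> bool:
        return self.value < other.value


# A failed candidate: the cluster was preempted or never launched, so no
# benchmarking job was ever submitted and job_status stays None.
job_status = None

try:
    # Pre-fix style check: consults job_status before ruling out the
    # "no cluster / cluster not UP" cases.
    if job_status < JobStatus.RUNNING:
        print('benchmark status: INIT')
except TypeError as exc:
    print(f'cannot order None against a job status: {exc}')

The commit instead orders the checks as: end_time written by the run commands, then cluster status (None, INIT, STOPPED), and only consults job_status once the cluster is known to be UP, asserting job_status is None in the non-UP branches.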
4 changes: 2 additions & 2 deletions sky/cli.py
@@ -4754,13 +4754,13 @@ def benchmark_launch(
     if gpus is not None:
         gpu_list = gpus.split(',')
         gpu_list = [gpu.strip() for gpu in gpu_list]
-        if '' in gpus:
+        if ' ' in gpus:
             raise click.BadParameter('Remove blanks in --gpus.')
 
         if len(gpu_list) == 1:
             override_gpu = gpu_list[0]
         else:
-            # If len(gpus) > 1, gpus is intrepreted
+            # If len(gpu_list) > 1, gpus is interpreted
             # as a list of benchmark candidates.
             if candidates is None:
                 candidates = [{'accelerators': gpu} for gpu in gpu_list]
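As a quick illustration of the corrected --gpus handling, here is a standalone sketch (parse_gpus is a hypothetical helper, not the actual click command in sky/cli.py): a single value overrides the task's accelerators, a comma-separated list becomes the benchmark candidates, and values containing blanks are rejected. The old check tested '' in gpus, which is true for every string, so any --gpus value was rejected.

from typing import Any, Dict, List, Optional, Tuple


def parse_gpus(gpus: str) -> Tuple[Optional[str], List[Dict[str, Any]]]:
    """Hypothetical helper mirroring the fixed --gpus logic."""
    gpu_list = [gpu.strip() for gpu in gpus.split(',')]
    if ' ' in gpus:
        # Reject blanks, e.g. 'V100, T4'; the old "'' in gpus" check was a bug.
        raise ValueError('Remove blanks in --gpus.')
    if len(gpu_list) == 1:
        # Single accelerator: override the task resources directly.
        return gpu_list[0], []
    # Multiple accelerators: interpret them as benchmark candidates.
    return None, [{'accelerators': gpu} for gpu in gpu_list]


print(parse_gpus('V100:4'))   # ('V100:4', [])
print(parse_gpus('V100,T4'))  # (None, [{'accelerators': 'V100'}, {'accelerators': 'T4'}])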
15 changes: 15 additions & 0 deletions tests/test_smoke.py
@@ -3855,3 +3855,18 @@ def test_multiple_resources():
         f'sky down -y {name}',
     )
     run_one_test(test)
+
+
+# ---------- Sky Benchmark ----------
+def test_sky_bench(generic_cloud: str):
+    name = _get_cluster_name()
+    test = Test(
+        'sky-bench',
+        [
+            f'sky bench launch -y -b {name} --cloud {generic_cloud} -i0 tests/test_yamls/minimal.yaml',
+            'sleep 120',
+            f'sky bench show {name} | grep sky-bench-{name} | grep FINISHED',
+        ],
+        f'sky bench down {name} -y; sky bench delete {name} -y',
+    )
+    run_one_test(test)

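Assuming the repository's usual smoke-test options (the generic_cloud fixture is typically driven by a --generic-cloud pytest flag defined in tests/conftest.py), the new test can be run on its own with something like:

pytest tests/test_smoke.py::test_sky_bench --generic-cloud gcp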