Skip to content

Commit

Permalink
remove remaining references to "spot" controller (#4631)
Browse files Browse the repository at this point in the history
* remove remaining references to "spot" controller

* fix test

* re-add LEGACY_JOB_CONTROLLER_NAME

* update comment

* remove managed_job_version completely
  • Loading branch information
cg505 authored Feb 6, 2025
1 parent ade003a commit 0b10721
Show file tree
Hide file tree
Showing 13 changed files with 27 additions and 110 deletions.
2 changes: 1 addition & 1 deletion sky/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def setup_lambda_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
config['auth']['ssh_public_key'] = PUBLIC_SSH_KEY_PATH

# TODO(zhwu): we need to avoid uploading the public ssh key to the
# nodes, as that will cause problem when the node is used as spot
# nodes, as that will cause problems when the node is used as a jobs
# controller, i.e., the public and private key on the node may
# not match.
file_mounts = config['file_mounts']
Expand Down
16 changes: 5 additions & 11 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3416,17 +3416,11 @@ def _dump_code_to_file(codegen: str) -> None:
managed_job_code = managed_job_codegen.set_pending(
job_id, managed_job_dag)
# Set the managed job to PENDING state to make sure that this
# managed job appears in the `sky jobs queue`, when there are
# already 2x vCPU controller processes running on the controller VM,
# e.g., 16 controller processes running on a controller with 8
# vCPUs.
# The managed job should be set to PENDING state *after* the
# controller process job has been queued, as our skylet on spot
# controller will set the managed job in FAILED state if the
# controller process job does not exist.
# We cannot set the managed job to PENDING state in the codegen for
# the controller process job, as it will stay in the job pending
# table and not be executed until there is an empty slot.
# managed job appears in the `sky jobs queue`, even if it needs to
# wait to be submitted.
# We cannot set the managed job to PENDING state in the job template
# (jobs-controller.yaml.j2), as it may need to wait for the run
# commands to be scheduled on the job controller in high-load cases.
job_submit_cmd = job_submit_cmd + ' && ' + managed_job_code

returncode, stdout, stderr = self.run_on_head(handle,
Expand Down
8 changes: 3 additions & 5 deletions sky/jobs/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(self, job_id: int, dag_yaml: str) -> None:
else:
assert task.name is not None, task
task_name = task.name
# This is guaranteed by the spot_launch API, where we fill in
# This is guaranteed by the jobs.launch API, where we fill in
# the task.name with
# dag_utils.maybe_infer_and_fill_dag_and_task_names.
assert task_name is not None, self._dag
Expand Down Expand Up @@ -137,8 +137,8 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool:
1. The optimizer cannot find a feasible solution.
2. Precheck errors: invalid cluster name, failure in getting
cloud user identity, or unsupported feature.
exceptions.SpotJobReachedMaxRetryError: This will be raised when
all prechecks passed but the maximum number of retries is
exceptions.ManagedJobReachedMaxRetriesError: This will be raised
when all prechecks passed but the maximum number of retries is
reached for `sky.launch`. The failure of `sky.launch` can be
due to:
1. Any of the underlying failover exceptions is due to resources
Expand Down Expand Up @@ -482,8 +482,6 @@ def _cleanup(job_id: int, dag_yaml: str):
when reaching here, as we currently only support chain DAGs, and only
one task is executed at a time.
"""
# NOTE: The code to get cluster name is same as what we did in the spot
# controller, we should keep it in sync with JobsController.__init__()
dag, _ = _get_dag_and_name(dag_yaml)
for task in dag.tasks:
assert task.name is not None, task
Expand Down
21 changes: 0 additions & 21 deletions sky/jobs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,24 +472,3 @@ def sync_down_logs(
job_name=name,
controller=controller,
local_dir=local_dir)


spot_launch = common_utils.deprecated_function(
launch,
name='sky.jobs.launch',
deprecated_name='spot_launch',
removing_version='0.8.0',
override_argument={'use_spot': True})
spot_queue = common_utils.deprecated_function(queue,
name='sky.jobs.queue',
deprecated_name='spot_queue',
removing_version='0.8.0')
spot_cancel = common_utils.deprecated_function(cancel,
name='sky.jobs.cancel',
deprecated_name='spot_cancel',
removing_version='0.8.0')
spot_tail_logs = common_utils.deprecated_function(
tail_logs,
name='sky.jobs.tail_logs',
deprecated_name='spot_tail_logs',
removing_version='0.8.0')
4 changes: 2 additions & 2 deletions sky/jobs/recovery_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ def _launch(self,
1. The optimizer cannot find a feasible solution.
2. Precheck errors: invalid cluster name, failure in getting
cloud user identity, or unsupported feature.
exceptions.SpotJobReachedMaxRetryError: This will be raised when
all prechecks passed but the maximum number of retries is
exceptions.ManagedJobReachedMaxRetriesError: This will be raised
when all prechecks passed but the maximum number of retries is
reached for `sky.launch`. The failure of `sky.launch` can be
due to:
1. Any of the underlying failover exceptions is due to resources
Expand Down
2 changes: 1 addition & 1 deletion sky/jobs/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
# the same content as the `task_name` column.
# The `job_id` is now not really a job id, but only a unique
# identifier/primary key for all the tasks. We will use `spot_job_id`
# to identify the spot job.
# to identify the job.
# TODO(zhwu): schema migration may be needed.
def create_table(cursor, conn):
# Enable WAL mode to avoid locking issues.
Expand Down
53 changes: 6 additions & 47 deletions sky/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
"""
import collections
import enum
import inspect
import os
import pathlib
import shlex
import shutil
import textwrap
import time
import traceback
Expand Down Expand Up @@ -53,7 +51,6 @@
LEGACY_JOB_CONTROLLER_NAME: str = (
f'sky-spot-controller-{common_utils.get_user_hash()}')
SIGNAL_FILE_PREFIX = '/tmp/sky_jobs_controller_signal_{}'
LEGACY_SIGNAL_FILE_PREFIX = '/tmp/sky_spot_controller_signal_{}'
# Controller checks its job's status every this many seconds.
JOB_STATUS_CHECK_GAP_SECONDS = 20

Expand Down Expand Up @@ -458,17 +455,12 @@ def cancel_jobs_by_id(job_ids: Optional[List[int]]) -> str:

# Send the signal to the jobs controller.
signal_file = pathlib.Path(SIGNAL_FILE_PREFIX.format(job_id))
legacy_signal_file = pathlib.Path(
LEGACY_SIGNAL_FILE_PREFIX.format(job_id))
# Filelock is needed to prevent race condition between signal
# check/removal and signal writing.
with filelock.FileLock(str(signal_file) + '.lock'):
with signal_file.open('w', encoding='utf-8') as f:
f.write(UserSignal.CANCEL.value)
f.flush()
# Backward compatibility for managed jobs launched before #3419. It
# can be removed in the future 0.8.0 release.
shutil.copy(str(signal_file), str(legacy_signal_file))
cancelled_job_ids.append(job_id)

if not cancelled_job_ids:
Expand Down Expand Up @@ -1116,28 +1108,15 @@ class ManagedJobCodeGen:
>> codegen = ManagedJobCodeGen.show_jobs(...)
"""
# TODO: the try..except.. block is for backward compatibility. Remove it in
# v0.8.0.
_PREFIX = textwrap.dedent("""\
managed_job_version = 0
try:
from sky.jobs import utils
from sky.jobs import constants as managed_job_constants
from sky.jobs import state as managed_job_state
managed_job_version = managed_job_constants.MANAGED_JOBS_VERSION
except ImportError:
from sky.spot import spot_state as managed_job_state
from sky.spot import spot_utils as utils
from sky.jobs import utils
from sky.jobs import state as managed_job_state
""")

@classmethod
def get_job_table(cls) -> str:
code = textwrap.dedent("""\
if managed_job_version < 1:
job_table = utils.dump_spot_job_queue()
else:
job_table = utils.dump_managed_job_queue()
job_table = utils.dump_managed_job_queue()
print(job_table, flush=True)
""")
return cls._build(code)
Expand Down Expand Up @@ -1173,29 +1152,9 @@ def stream_logs(cls,
job_id: Optional[int],
follow: bool = True,
controller: bool = False) -> str:
# We inspect the source code of the function here for backward
# compatibility.
# TODO: change to utils.stream_logs(job_id, job_name, follow) in v0.8.0.
# Import libraries required by `stream_logs`. The try...except... block
# should be removed in v0.8.0.
code = textwrap.dedent("""\
import os
import time
from sky.skylet import job_lib, log_lib
from sky.skylet import constants
from sky.utils import ux_utils
try:
from sky.jobs.utils import stream_logs_by_id
except ImportError:
from sky.spot.spot_utils import stream_logs_by_id
from typing import Optional
""")
code += inspect.getsource(stream_logs)
code += textwrap.dedent(f"""\
msg = stream_logs({job_id!r}, {job_name!r},
follow={follow}, controller={controller})
code = textwrap.dedent(f"""\
msg = utils.stream_logs({job_id!r}, {job_name!r},
follow={follow}, controller={controller})
print(msg, flush=True)
""")
return cls._build(code)
Expand Down
2 changes: 1 addition & 1 deletion sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@
FILE_MOUNTS_SUBPATH = 'job-{run_id}/local-file-mounts/{i}'
FILE_MOUNTS_TMP_SUBPATH = 'job-{run_id}/tmp-files'

# The default idle timeout for SkyPilot controllers. This include spot
# The default idle timeout for SkyPilot controllers. This includes the jobs
# controller and sky serve controller.
# TODO(tian): Refactor to controller_utils. Current blocker: circular import.
CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 10
Expand Down
6 changes: 1 addition & 5 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,6 @@ def get_controller_resources(
if custom_controller_resources_config is not None:
controller_resources_config_copied.update(
custom_controller_resources_config)
elif controller == Controllers.JOBS_CONTROLLER:
controller_resources_config_copied.update(
skypilot_config.get_nested(('spot', 'controller', 'resources'),
{}))

try:
controller_resources = resources.Resources.from_yaml_config(
Expand Down Expand Up @@ -938,7 +934,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
if (storage_obj.source is not None and
not data_utils.is_cloud_store_url(storage_obj.source)):
# Need to replace the local path with bucket URI, and remove the
# name field, so that the storage mount can work on the spot
# name field, so that the storage mount can work on the jobs
# controller.
store_types = list(storage_obj.stores.keys())
assert len(store_types) == 1, (
Expand Down
4 changes: 2 additions & 2 deletions sky/utils/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

logger = sky_logging.init_logger(__name__)

# Message thrown when APIs sky.{exec,launch,spot.launch}() received a string
# Message thrown when APIs sky.{exec,launch,jobs.launch}() received a string
# instead of a Dag. CLI (cli.py) is implemented by us so should not trigger
# this.
_ENTRYPOINT_STRING_AS_DAG_MESSAGE = """\
Expand All @@ -31,7 +31,7 @@
sky.launch(task, ...)
sky.spot.launch(task, ...)
sky.jobs.launch(task, ...)
""".strip()


Expand Down
3 changes: 2 additions & 1 deletion sky/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ def readable_time_duration(start: Optional[float],
e.g. "1h 2m 23s"
"""
# start < 0 means that the starting time is not specified yet.
# It is only used in spot_utils.show_jobs() for job duration calculation.
# It is only used in jobs_utils.format_job_table() for job duration
# calculation.
if start is None or start < 0:
return '-'
if end == start == 0:
Expand Down
10 changes: 0 additions & 10 deletions sky/utils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ def _get_single_resources_schema():
'use_spot': {
'type': 'boolean',
},
# Deprecated: use 'job_recovery' instead. This is for backward
# compatibility, and can be removed in 0.8.0.
'spot_recovery': {
'type': 'string',
},
'job_recovery': {
# Either a string or a dict.
'anyOf': [{
Expand Down Expand Up @@ -256,8 +251,6 @@ def get_resources_schema():
'items': multi_resources_schema,
}
},
# Avoid job_recovery and spot_recovery being present at the same time.
**_check_not_both_fields_present('job_recovery', 'spot_recovery')
}


Expand Down Expand Up @@ -974,14 +967,11 @@ def get_config_schema():
'additionalProperties': False,
'properties': {
'jobs': controller_resources_schema,
'spot': controller_resources_schema,
'serve': controller_resources_schema,
'allowed_clouds': allowed_clouds,
'admin_policy': admin_policy_schema,
'docker': docker_configs,
'nvidia_gpus': gpu_configs,
**cloud_configs,
},
# Avoid spot and jobs being present at the same time.
**_check_not_both_fields_present('spot', 'jobs')
}
6 changes: 3 additions & 3 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def test_valid_null_proxy_config(monkeypatch, tmp_path) -> None:
use_internal_ips: true
vpc_name: abc
spot:
jobs:
controller:
resources:
disk_size: 256
Expand Down Expand Up @@ -201,7 +201,7 @@ def test_invalid_indent_config(monkeypatch, tmp_path) -> None:
config_path = tmp_path / 'invalid.yaml'
config_path.open('w', encoding='utf-8').write(
textwrap.dedent(f"""\
spot:
jobs:
controller:
resources:
cloud: gcp
Expand All @@ -221,7 +221,7 @@ def test_invalid_enum_config(monkeypatch, tmp_path) -> None:
config_path = tmp_path / 'invalid.yaml'
config_path.open('w', encoding='utf-8').write(
textwrap.dedent(f"""\
spot:
jobs:
controller:
resources:
cloud: notacloud
Expand Down

0 comments on commit 0b10721

Please sign in to comment.