Skip to content

Commit

Permalink
Change Status Module (#509)
Browse files Browse the repository at this point in the history
Promote SmartSim statuses to a dedicated type named SmartSimStatus.

[ reviewed by @MattToast @al-rigazzi ]
[ committed by @amandarichardsonn ]
  • Loading branch information
amandarichardsonn authored Mar 11, 2024
1 parent 63836a9 commit 33ee012
Show file tree
Hide file tree
Showing 42 changed files with 346 additions and 282 deletions.
20 changes: 13 additions & 7 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
)
from ...log import get_logger
from ...servertype import CLUSTERED, STANDALONE
from ...status import STATUS_CANCELLED, STATUS_RUNNING, TERMINAL_STATUSES
from ...status import TERMINAL_STATUSES, SmartSimStatus
from ..config import CONFIG
from ..launcher import LocalLauncher, LSFLauncher, PBSLauncher, SlurmLauncher
from ..launcher.launcher import Launcher
Expand Down Expand Up @@ -243,7 +243,13 @@ def stop_db(self, db: Orchestrator) -> None:
continue

job = self._jobs[node.name]
job.set_status(STATUS_CANCELLED, "", 0, output=None, error=None)
job.set_status(
SmartSimStatus.STATUS_CANCELLED,
"",
0,
output=None,
error=None,
)
self._jobs.move_to_completed(job)

db.reset_hosts()
Expand Down Expand Up @@ -271,14 +277,14 @@ def get_jobs(self) -> t.Dict[str, Job]:

def get_entity_status(
self, entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]
) -> str:
) -> SmartSimStatus:
"""Get the status of an entity
:param entity: entity to get status of
:type entity: SmartSimEntity | EntitySequence
:raises TypeError: if not SmartSimEntity | EntitySequence
:return: status of entity
:rtype: str
:rtype: SmartSimStatus
"""
if not isinstance(entity, (SmartSimEntity, EntitySequence)):
raise TypeError(
Expand All @@ -289,14 +295,14 @@ def get_entity_status(

def get_entity_list_status(
self, entity_list: EntitySequence[SmartSimEntity]
) -> t.List[str]:
) -> t.List[SmartSimStatus]:
"""Get the statuses of an entity list
:param entity_list: entity list containing entities to
get statuses of
:type entity_list: EntitySequence
:raises TypeError: if not EntitySequence
:return: list of str statuses
:return: list of SmartSimStatus statuses
:rtype: list
"""
if not isinstance(entity_list, EntitySequence):
Expand Down Expand Up @@ -726,7 +732,7 @@ def _orchestrator_launch_wait(self, orchestrator: Orchestrator) -> None:

# _jobs.get_status acquires JM lock for main thread, no need for locking
statuses = self.get_entity_list_status(orchestrator)
if all(stat == STATUS_RUNNING for stat in statuses):
if all(stat == SmartSimStatus.STATUS_RUNNING for stat in statuses):
ready = True
# TODO: remove in favor of a per-node status check
time.sleep(CONFIG.jm_interval)
Expand Down
22 changes: 14 additions & 8 deletions smartsim/_core/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from dataclasses import dataclass

from ...entity import EntitySequence, SmartSimEntity
from ...status import STATUS_NEW
from ...status import SmartSimStatus


@dataclass(frozen=True)
Expand Down Expand Up @@ -96,7 +96,7 @@ def __init__(
self.name = job_name
self.jid = job_id
self.entity = entity
self.status = STATUS_NEW
self.status = SmartSimStatus.STATUS_NEW
# status before smartsim status mapping is applied
self.raw_status: t.Optional[str] = None
self.returncode: t.Optional[int] = None
Expand All @@ -116,7 +116,7 @@ def ename(self) -> str:

def set_status(
self,
new_status: str,
new_status: SmartSimStatus,
raw_status: str,
returncode: t.Optional[int],
error: t.Optional[str] = None,
Expand All @@ -125,9 +125,15 @@ def set_status(
"""Set the status of a job.
:param new_status: The new status of the job
:type new_status: str
:type new_status: SmartSimStatus
:param raw_status: The raw status of the launcher
:type raw_status: str
:param returncode: The return code for the job
:type return_code: str
:type returncode: int | None
:param error: Content produced by stderr
:type error: str
:param output: Content produced by stdout
:type output: str
"""
self.status = new_status
self.raw_status = raw_status
Expand Down Expand Up @@ -157,7 +163,7 @@ def reset(
"""
self.name = new_job_name
self.jid = new_job_id
self.status = STATUS_NEW
self.status = SmartSimStatus.STATUS_NEW
self.returncode = None
self.output = None
self.error = None
Expand Down Expand Up @@ -213,14 +219,14 @@ def __init__(self, runs: int = 0) -> None:
"""
self.runs = runs
self.jids: t.Dict[int, t.Optional[str]] = {}
self.statuses: t.Dict[int, str] = {}
self.statuses: t.Dict[int, SmartSimStatus] = {}
self.returns: t.Dict[int, t.Optional[int]] = {}
self.job_times: t.Dict[int, float] = {}

def record(
self,
job_id: t.Optional[str],
status: str,
status: SmartSimStatus,
returncode: t.Optional[int],
job_time: float,
) -> None:
Expand Down
9 changes: 5 additions & 4 deletions smartsim/_core/control/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from ...database import Orchestrator
from ...entity import DBNode, EntitySequence, SmartSimEntity
from ...log import ContextThread, get_logger
from ...status import STATUS_NEVER_STARTED, TERMINAL_STATUSES
from ...status import TERMINAL_STATUSES, SmartSimStatus
from ..config import CONFIG
from ..launcher import Launcher, LocalLauncher
from ..utils.network import get_ip_from_host
Expand Down Expand Up @@ -239,12 +239,13 @@ def check_jobs(self) -> None:
def get_status(
self,
entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]],
) -> str:
) -> SmartSimStatus:
"""Return the status of a job.
:param entity: SmartSimEntity or EntitySequence instance
:type entity: SmartSimEntity | EntitySequence
:returns: tuple of status
:returns: a SmartSimStatus status
:rtype: SmartSimStatus
"""
with self._lock:
if entity.name in self.completed:
Expand All @@ -254,7 +255,7 @@ def get_status(
job: Job = self[entity.name] # locked
return job.status

return STATUS_NEVER_STARTED
return SmartSimStatus.STATUS_NEVER_STARTED

def set_launcher(self, launcher: Launcher) -> None:
"""Set the launcher of the job manager to a specific launcher instance
Expand Down
4 changes: 2 additions & 2 deletions smartsim/_core/entrypoints/telemetrymonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from smartsim._core.utils.helpers import get_ts
from smartsim._core.utils.serialize import MANIFEST_FILENAME
from smartsim.error.errors import SmartSimError
from smartsim.status import STATUS_COMPLETED, TERMINAL_STATUSES
from smartsim.status import TERMINAL_STATUSES, SmartSimStatus

"""Telemetry Monitor entrypoint"""

Expand Down Expand Up @@ -286,7 +286,7 @@ def faux_return_code(step_info: StepInfo) -> t.Optional[int]:
if step_info.status not in TERMINAL_STATUSES:
return None

if step_info.status == STATUS_COMPLETED:
if step_info.status == SmartSimStatus.STATUS_COMPLETED:
return os.EX_OK

return 1
Expand Down
8 changes: 5 additions & 3 deletions smartsim/_core/launcher/lsf/lsfLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
RunSettings,
SettingsBase,
)
from ....status import STATUS_CANCELLED, STATUS_COMPLETED
from ....status import SmartSimStatus
from ...config import CONFIG
from ..launcher import WLMLauncher
from ..step import (
Expand Down Expand Up @@ -155,7 +155,9 @@ def stop(self, step_name: str) -> StepInfo:
if not step_info:
raise LauncherError(f"Could not get step_info for job step {step_name}")

step_info.status = STATUS_CANCELLED # set status to cancelled instead of failed
step_info.status = (
SmartSimStatus.STATUS_CANCELLED
) # set status to cancelled instead of failed
return step_info

@staticmethod
Expand Down Expand Up @@ -207,7 +209,7 @@ def _get_managed_step_update(self, step_ids: t.List[str]) -> t.List[StepInfo]:
# create LSFBatchStepInfo objects to return
batch_info = LSFBatchStepInfo(stat, None)
# account for case where job history is not logged by LSF
if batch_info.status == STATUS_COMPLETED:
if batch_info.status == SmartSimStatus.STATUS_COMPLETED:
batch_info.returncode = 0
updates.append(batch_info)
return updates
Expand Down
8 changes: 5 additions & 3 deletions smartsim/_core/launcher/pbs/pbsLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
RunSettings,
SettingsBase,
)
from ....status import STATUS_CANCELLED, STATUS_COMPLETED
from ....status import SmartSimStatus
from ...config import CONFIG
from ..launcher import WLMLauncher
from ..step import (
Expand Down Expand Up @@ -149,7 +149,9 @@ def stop(self, step_name: str) -> StepInfo:
if not step_info:
raise LauncherError(f"Could not get step_info for job step {step_name}")

step_info.status = STATUS_CANCELLED # set status to cancelled instead of failed
step_info.status = (
SmartSimStatus.STATUS_CANCELLED
) # set status to cancelled instead of failed
return step_info

@staticmethod
Expand Down Expand Up @@ -191,7 +193,7 @@ def _get_managed_step_update(self, step_ids: t.List[str]) -> t.List[StepInfo]:
for stat, _ in zip(stats, step_ids):
info = PBSStepInfo(stat, None)
# account for case where job history is not logged by PBS
if info.status == STATUS_COMPLETED:
if info.status == SmartSimStatus.STATUS_COMPLETED:
info.returncode = 0

updates.append(info)
Expand Down
6 changes: 4 additions & 2 deletions smartsim/_core/launcher/slurm/slurmLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
SettingsBase,
SrunSettings,
)
from ....status import STATUS_CANCELLED
from ....status import SmartSimStatus
from ...config import CONFIG
from ..launcher import WLMLauncher
from ..step import (
Expand Down Expand Up @@ -218,7 +218,9 @@ def stop(self, step_name: str) -> StepInfo:
if not step_info:
raise LauncherError(f"Could not get step_info for job step {step_name}")

step_info.status = STATUS_CANCELLED # set status to cancelled instead of failed
step_info.status = (
SmartSimStatus.STATUS_CANCELLED
) # set status to cancelled instead of failed
return step_info

@staticmethod
Expand Down
Loading

0 comments on commit 33ee012

Please sign in to comment.