Skip to content

Commit

Permalink
Duplicate entity name prevention (#480)
Browse files Browse the repository at this point in the history
This PR prevents launching an entity whose name duplicates that of an existing entity. Completed entities are allowed to rerun.

[ committed by @amandarichardsonn ]
[ reviewed by @ankona @MattToast ]
  • Loading branch information
amandarichardsonn authored Feb 22, 2024
1 parent ccb25e4 commit 39354db
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 11 deletions.
40 changes: 32 additions & 8 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,14 +511,38 @@ def _launch_step(
:type entity: SmartSimEntity
:raises SmartSimError: if launch fails
"""
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
msg = f"An error occurred when launching {entity.name} \n"
msg += "Check error and output files for details.\n"
msg += f"{entity}"
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e
# attempt to retrieve entity name in JobManager.completed
completed_job = self._jobs.completed.get(entity.name, None)

# if no completed job exists under this name and the entity name is
# not running in JobManager.jobs or JobManager.db_jobs,
# launch the job
if completed_job is None and (
entity.name not in self._jobs.jobs and entity.name not in self._jobs.db_jobs
):
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
msg = f"An error occurred when launching {entity.name} \n"
msg += "Check error and output files for details.\n"
msg += f"{entity}"
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e
# if the completed job does exist and the entity passed in is the same
# one that has run and completed, relaunch the entity.
elif completed_job is not None and completed_job.entity is entity:
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
msg = f"An error occurred when launching {entity.name} \n"
msg += "Check error and output files for details.\n"
msg += f"{entity}"
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e
# the entity is using a duplicate name of an existing entity in
# the experiment, throw an error
else:
raise SSUnsupportedError("SmartSim entities cannot have duplicate names.")

# a job step is a task if it is not managed by a workload manager (i.e. Slurm)
# but is rather started, monitored, and exited through the Popen interface
Expand Down
91 changes: 90 additions & 1 deletion tests/test_controller_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,27 @@
import pytest

from smartsim._core.control import Controller, Manifest
from smartsim._core.launcher.step import Step
from smartsim.database import Orchestrator
from smartsim.entity import Model
from smartsim.entity.ensemble import Ensemble
from smartsim.error import SmartSimError, SSUnsupportedError
from smartsim.error.errors import SSUnsupportedError
from smartsim.settings import RunSettings
from smartsim.settings import RunSettings, SrunSettings

# The tests in this file belong to the group_a group
pytestmark = pytest.mark.group_a

# Run settings shared by the module-level entities defined below.
entity_settings = SrunSettings("echo", ["spam", "eggs"])
model_dup_setting = RunSettings("echo", ["spam_1", "eggs_2"])

# Two distinct Model objects that deliberately share the name "model_name";
# they differ only in the run settings they were built with.
model = Model(
    "model_name",
    params={},
    path="",
    run_settings=entity_settings,
)
model_2 = Model(
    "model_name",
    params={},
    path="",
    run_settings=model_dup_setting,
)

# Two distinct Ensemble objects that deliberately share the name
# "ensemble_name"; they differ only in their replica count.
ens = Ensemble(
    "ensemble_name",
    params={},
    run_settings=entity_settings,
    replicas=2,
)
ens_2 = Ensemble(
    "ensemble_name",
    params={},
    run_settings=entity_settings,
    replicas=3,
)

# Orchestrator used by the duplicate-name and restart tests in this module.
orc = Orchestrator(
    db_nodes=3,
    batch=True,
    launcher="slurm",
    run_command="srun",
)


def test_finished_entity_orc_error():
"""Orchestrators are never 'finished', either run forever or stopped by user"""
Expand Down Expand Up @@ -108,3 +120,80 @@ def test_bad_orc_checkpoint():
cont = Controller(launcher="local")
with pytest.raises(FileNotFoundError):
cont.reload_saved_db(checkpoint)


class MockStep(Step):
    """Concrete ``Step`` subclass for use in tests.

    It provides ``get_launch_cmd`` so that a step object can be
    instantiated and handed to the controller under test.
    """

    def get_launch_cmd(self):
        """Return a fixed placeholder launch command."""
        launch_cmd = ["echo", "spam"]
        return launch_cmd


@pytest.mark.parametrize(
    "entity",
    [
        pytest.param(ens, id="Ensemble_running"),
        pytest.param(model, id="Model_running"),
        pytest.param(orc, id="Orch_running"),
    ],
)
def test_duplicate_running_entity(test_dir, wlmutils, entity):
    """Validate that an entity name already tracked as a running job
    (in JobManager.jobs or JobManager.db_jobs) cannot be launched again.

    :param test_dir: directory passed to the mock step
    :param wlmutils: test utility fixture that supplies the test launcher
    :param entity: the entity registered as running and then relaunched
    """
    step_settings = RunSettings("echo")
    step = MockStep("mock-step", test_dir, step_settings)
    test_launcher = wlmutils.get_test_launcher()
    controller = Controller(test_launcher)

    # Register the entity as an active job so its name is already in use.
    controller._jobs.add_job(entity.name, job_id="1234", entity=entity)

    with pytest.raises(SSUnsupportedError) as ex:
        controller._launch_step(step, entity=entity)

    # The message check must come after the ``with`` block: any statement
    # placed inside the context after the raising call would never execute.
    assert ex.value.args[0] == "SmartSim entities cannot have duplicate names."


@pytest.mark.parametrize(
    "entity",
    [
        pytest.param(ens, id="Ensemble_running"),
        pytest.param(model, id="Model_running"),
    ],
)
def test_restarting_entity(test_dir, wlmutils, entity):
    """Relaunching the very same Model/Ensemble after its job has been
    moved to the completed set must not raise.
    """
    mock_step = MockStep("mock-step", test_dir, RunSettings("echo"))
    controller = Controller(wlmutils.get_test_launcher())
    job_manager = controller._jobs

    # Track the entity as a job, then mark that job as completed.
    job_manager.add_job(entity.name, job_id="1234", entity=entity)
    finished_job = job_manager.jobs.get(entity.name)
    job_manager.move_to_completed(finished_job)

    # Same entity object as the completed job: the launch is expected to succeed.
    controller._launch_step(mock_step, entity=entity)


def test_restarting_orch(test_dir, wlmutils):
    """Relaunching the same Orchestrator after its job has been moved to
    the completed set must not raise.
    """
    mock_step = MockStep("mock-step", test_dir, RunSettings("echo"))
    controller = Controller(wlmutils.get_test_launcher())
    job_manager = controller._jobs

    # Track the orchestrator as a job, then mark that job as completed;
    # the job is looked up in db_jobs rather than jobs.
    job_manager.add_job(orc.name, job_id="1234", entity=orc)
    finished_job = job_manager.db_jobs.get(orc.name)
    job_manager.move_to_completed(finished_job)

    # Same orchestrator object as the completed job: the launch is expected
    # to succeed.
    controller._launch_step(mock_step, entity=orc)


@pytest.mark.parametrize(
    "entity,entity_2",
    [
        pytest.param(ens, ens_2, id="Ensemble_running"),
        pytest.param(model, model_2, id="Model_running"),
    ],
)
def test_starting_entity(test_dir, wlmutils, entity, entity_2):
    """Validate that launching a *different* Model/Ensemble object whose
    name matches a completed job is rejected as a duplicate.

    :param test_dir: directory passed to the mock step
    :param wlmutils: test utility fixture that supplies the test launcher
    :param entity: the entity whose job is recorded as completed
    :param entity_2: a distinct entity with the same name as ``entity``
    """
    step_settings = RunSettings("echo")
    step = MockStep("mock-step", test_dir, step_settings)
    test_launcher = wlmutils.get_test_launcher()
    controller = Controller(test_launcher)

    # Record the first entity as a job and move that job to completed.
    controller._jobs.add_job(entity.name, job_id="1234", entity=entity)
    controller._jobs.move_to_completed(controller._jobs.jobs.get(entity.name))

    with pytest.raises(SSUnsupportedError) as ex:
        controller._launch_step(step, entity=entity_2)

    # The message check must come after the ``with`` block: any statement
    # placed inside the context after the raising call would never execute.
    assert ex.value.args[0] == "SmartSim entities cannot have duplicate names."
4 changes: 2 additions & 2 deletions tests/test_launch_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ def test_orchestrator_relaunch(test_dir, wlmutils):
exp_name = "test-orc-on-relaunch"
exp = Experiment(exp_name, launcher="local", exp_path=test_dir)

orc = Orchestrator(port=wlmutils.get_test_port())
orc = Orchestrator(port=wlmutils.get_test_port(), db_identifier="orch_1")
orc.set_path(test_dir)
orc_1 = Orchestrator(port=wlmutils.get_test_port() + 1)
orc_1 = Orchestrator(port=wlmutils.get_test_port() + 1, db_identifier="orch_2")
orc_1.set_path(test_dir)
try:
exp.start(orc)
Expand Down

0 comments on commit 39354db

Please sign in to comment.