Skip to content

Commit

Permalink
Symlink batch ensembles and batch models (#547)
Browse files Browse the repository at this point in the history
After testing a bunch of batch ensembles and batch models, I found that
I hadn't actually symlinked the substeps in the controller. This fix
should properly symlink the substeps.

[ committed by @AlyssaCote ]
[ reviewed by @ankona ]
  • Loading branch information
AlyssaCote authored Apr 23, 2024
1 parent d4a78b6 commit 62f2e8c
Show file tree
Hide file tree
Showing 6 changed files with 551 additions and 116 deletions.
3 changes: 3 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ To be released at some future point in time

Description

- Fix symlinking batch ensemble and model bug
- Remove defensive regexp in .gitignore
- Upgrade ubuntu to 22.04
- Remove helper function ``init_default``
Expand Down Expand Up @@ -45,6 +46,7 @@ Description

Detailed Notes

- Properly symlinks batch ensembles and batch models. (SmartSim-PR547_)
- Remove defensive regexp in .gitignore and ensure tests write to test_output.
(SmartSim-PR560_)
- After dropping support for Python 3.8, ubuntu needs to be upgraded.
Expand Down Expand Up @@ -121,6 +123,7 @@ Detailed Notes
handler. SmartSim will now attempt to kill any launched jobs before calling
the previously registered signal handler. (SmartSim-PR535_)

.. _SmartSim-PR547: https://github.com/CrayLabs/SmartSim/pull/547
.. _SmartSim-PR560: https://github.com/CrayLabs/SmartSim/pull/560
.. _SmartSim-PR559: https://github.com/CrayLabs/SmartSim/pull/559
.. _SmartSim-PR558: https://github.com/CrayLabs/SmartSim/pull/558
Expand Down
82 changes: 33 additions & 49 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
shutdown_db_node,
)
from ...database import Orchestrator
from ...entity import Ensemble, EntityList, EntitySequence, Model, SmartSimEntity
from ...entity import Ensemble, EntitySequence, Model, SmartSimEntity
from ...error import (
LauncherError,
SmartSimError,
Expand All @@ -70,6 +70,7 @@
from ..launcher import LocalLauncher, LSFLauncher, PBSLauncher, SlurmLauncher
from ..launcher.launcher import Launcher
from ..utils import check_cluster_status, create_cluster, serialize
from .controller_utils import _AnonymousBatchJob, _look_up_launched_data
from .job import Job
from .jobmanager import JobManager
from .manifest import LaunchedManifest, LaunchedManifestBuilder, Manifest
Expand Down Expand Up @@ -376,14 +377,17 @@ def symlink_output_files(
entity_out.unlink()
entity_err.unlink()

try:
historical_err.touch()
historical_out.touch()

if historical_err.exists() and historical_out.exists():
entity_out.symlink_to(historical_out)
entity_err.symlink_to(historical_err)
except FileNotFoundError as fnf:
else:
raise FileNotFoundError(
f"Output files for {entity.name} could not be found. "
"Symlinking files failed."
) from fnf
)

def _launch(
self, exp_name: str, exp_path: str, manifest: Manifest
Expand Down Expand Up @@ -432,13 +436,23 @@ def _launch(
steps: t.List[
t.Tuple[Step, t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]]
] = []

symlink_substeps: t.List[
t.Tuple[Step, t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]]
] = []

for elist in manifest.ensembles:
ens_telem_dir = manifest_builder.run_telemetry_subdirectory / "ensemble"
if elist.batch:
batch_step, substeps = self._create_batch_job_step(elist, ens_telem_dir)
manifest_builder.add_ensemble(
elist, [(batch_step.name, step) for step in substeps]
)

# symlink substeps to maintain directory structure
for substep, substep_entity in zip(substeps, elist.models):
symlink_substeps.append((substep, substep_entity))

steps.append((batch_step, elist))
else:
# if ensemble is to be run as separate job steps, aka not in a batch
Expand All @@ -456,19 +470,26 @@ def _launch(
model_telem_dir = manifest_builder.run_telemetry_subdirectory / "model"
if model.batch_settings:
anon_entity_list = _AnonymousBatchJob(model)
batch_step, _ = self._create_batch_job_step(
batch_step, substeps = self._create_batch_job_step(
anon_entity_list, model_telem_dir
)
manifest_builder.add_model(model, (batch_step.name, batch_step))

symlink_substeps.append((substeps[0], model))
steps.append((batch_step, model))
else:
job_step = self._create_job_step(model, model_telem_dir)
manifest_builder.add_model(model, (job_step.name, job_step))
steps.append((job_step, model))

# launch steps
# launch and symlink steps
for step, entity in steps:
self._launch_step(step, entity)
self.symlink_output_files(step, entity)

# symlink substeps to maintain directory structure
for substep, entity in symlink_substeps:
self.symlink_output_files(substep, entity)

return manifest_builder.finalize()

Expand Down Expand Up @@ -501,12 +522,13 @@ def _launch_orchestrator(
orchestrator, [(orc_batch_step.name, step) for step in substeps]
)

self._launch_step(orc_batch_step, orchestrator)
self.symlink_output_files(orc_batch_step, orchestrator)

# symlink substeps to maintain directory structure
for substep, substep_entity in zip(substeps, orchestrator.entities):
self.symlink_output_files(substep, substep_entity)

self._launch_step(orc_batch_step, orchestrator)

# if orchestrator was run on existing allocation, locally, or in allocation
else:
db_steps = [
Expand All @@ -518,6 +540,7 @@ def _launch_orchestrator(
)
for db_step in db_steps:
self._launch_step(*db_step)
self.symlink_output_files(*db_step)

# wait for orchestrator to spin up
self._orchestrator_launch_wait(orchestrator)
Expand Down Expand Up @@ -572,7 +595,6 @@ def _launch_step(
if completed_job is None and (
entity.name not in self._jobs.jobs and entity.name not in self._jobs.db_jobs
):
self.symlink_output_files(job_step, entity)
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
Expand All @@ -581,10 +603,10 @@ def _launch_step(
msg += f"{entity}"
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e

# if the completed job does exist and the entity passed in is the same
# that has ran and completed, relaunch the entity.
elif completed_job is not None and completed_job.entity is entity:
self.symlink_output_files(job_step, entity)
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
Expand All @@ -593,6 +615,7 @@ def _launch_step(
msg += f"{entity}"
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e

# the entity is using a duplicate name of an existing entity in
# the experiment, throw an error
else:
Expand Down Expand Up @@ -938,42 +961,3 @@ def _start_telemetry_monitor(self, exp_dir: str) -> None:
cwd=str(pathlib.Path(__file__).parent.parent.parent),
shell=False,
)


class _AnonymousBatchJob(EntityList[Model]):
@staticmethod
def _validate(model: Model) -> None:
if model.batch_settings is None:
msg = "Unable to create _AnonymousBatchJob without batch_settings"
raise SmartSimError(msg)

def __init__(self, model: Model) -> None:
self._validate(model)
super().__init__(model.name, model.path)
self.entities = [model]
self.batch_settings = model.batch_settings

def _initialize_entities(self, **kwargs: t.Any) -> None: ...


def _look_up_launched_data(
launcher: Launcher,
) -> t.Callable[[t.Tuple[str, Step]], "TStepLaunchMetaData"]:
def _unpack_launched_data(data: t.Tuple[str, Step]) -> "TStepLaunchMetaData":
# NOTE: we cannot assume that the name of the launched step
# ``launched_step_name`` is equal to the name of the step referring to
# the entity ``step.name`` as is the case when an entity list is
# launched as a batch job
launched_step_name, step = data
launched_step_map = launcher.step_mapping[launched_step_name]
out_file, err_file = step.get_output_files()
return (
launched_step_map.step_id,
launched_step_map.task_id,
launched_step_map.managed,
out_file,
err_file,
pathlib.Path(step.meta.get("status_dir", step.cwd)),
)

return _unpack_launched_data
77 changes: 77 additions & 0 deletions smartsim/_core/control/controller_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import pathlib
import typing as t

from ..._core.launcher.step import Step
from ...entity import EntityList, Model
from ...error import SmartSimError
from ..launcher.launcher import Launcher

if t.TYPE_CHECKING:
from ..utils.serialize import TStepLaunchMetaData


class _AnonymousBatchJob(EntityList[Model]):
@staticmethod
def _validate(model: Model) -> None:
if model.batch_settings is None:
msg = "Unable to create _AnonymousBatchJob without batch_settings"
raise SmartSimError(msg)

def __init__(self, model: Model) -> None:
self._validate(model)
super().__init__(model.name, model.path)
self.entities = [model]
self.batch_settings = model.batch_settings

def _initialize_entities(self, **kwargs: t.Any) -> None: ...


def _look_up_launched_data(
launcher: Launcher,
) -> t.Callable[[t.Tuple[str, Step]], "TStepLaunchMetaData"]:
def _unpack_launched_data(data: t.Tuple[str, Step]) -> "TStepLaunchMetaData":
# NOTE: we cannot assume that the name of the launched step
# ``launched_step_name`` is equal to the name of the step referring to
# the entity ``step.name`` as is the case when an entity list is
# launched as a batch job
launched_step_name, step = data
launched_step_map = launcher.step_mapping[launched_step_name]
out_file, err_file = step.get_output_files()
return (
launched_step_map.step_id,
launched_step_map.task_id,
launched_step_map.managed,
out_file,
err_file,
pathlib.Path(step.meta.get("status_dir", step.cwd)),
)

return _unpack_launched_data
Loading

0 comments on commit 62f2e8c

Please sign in to comment.