Skip to content

Commit

Permalink
Log status of fluxsite and comparison runs
Browse files Browse the repository at this point in the history
This change introduces a State object as a minimal way of persisting
state between separate processes. This is necessary for correctly
showing the status of fluxsite and comparison runs, because these tasks
run inside child processes, which do not share data structures with the
parent process.
  • Loading branch information
SeanBryan51 committed May 16, 2024
1 parent bb20f5e commit 4f4e54d
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/benchcab/benchcab.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ def _get_models(self, config: dict) -> list[Model]:
self._models.append(Model(repo=repo, model_id=id, **sub_config))
return self._models

def _fluxsite_show_task_composition(self, config: dict) -> str:
n_models = len(self._get_models(config))
n_sites = len(get_met_forcing_file_names(config["fluxsite"]["experiment"]))
n_science_configurations = len(config["science_configurations"])
return (

Check warning on line 164 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L161-L164

Added lines #L161 - L164 were not covered by tests
f"models: {n_models}, "
f"sites: {n_sites}, "
f"science configurations: {n_science_configurations}"
)

def _get_fluxsite_tasks(self, config: dict) -> list[fluxsite.FluxsiteTask]:
if not self._fluxsite_tasks:
self._fluxsite_tasks = fluxsite.get_fluxsite_tasks(
Expand Down Expand Up @@ -295,12 +305,18 @@ def fluxsite_run_tasks(self, config_path: str):
tasks = self._get_fluxsite_tasks(config)

logger.info("Running fluxsite tasks...")
logger.info(

Check warning on line 308 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L308

Added line #L308 was not covered by tests
f"tasks: {len(tasks)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
fluxsite.run_tasks_in_parallel(tasks, n_processes=ncpus)
else:
fluxsite.run_tasks(tasks)
logger.info("Successfully ran fluxsite tasks")

tasks_failed = [task for task in tasks if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")

Check warning on line 319 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L317-L319

Added lines #L317 - L319 were not covered by tests

def fluxsite_bitwise_cmp(self, config_path: str):
"""Endpoint for `benchcab fluxsite-bitwise-cmp`."""
Expand All @@ -318,12 +334,18 @@ def fluxsite_bitwise_cmp(self, config_path: str):
)

logger.info("Running comparison tasks...")
logger.info(

Check warning on line 337 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L337

Added line #L337 was not covered by tests
f"tasks: {len(comparisons)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
run_comparisons_in_parallel(comparisons, n_processes=ncpus)
else:
run_comparisons(comparisons)
logger.info("Successfully ran comparison tasks")

tasks_failed = [task for task in comparisons if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")

Check warning on line 348 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L346-L348

Added lines #L346 - L348 were not covered by tests

def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
"""Endpoint for `benchcab fluxsite`."""
Expand Down
10 changes: 10 additions & 0 deletions src/benchcab/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from benchcab import internal
from benchcab.utils import get_logger
from benchcab.utils.state import State
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface


Expand All @@ -37,12 +38,20 @@ def __init__(
self.files = files
self.task_name = task_name
self.logger = get_logger()
self.state = State(
state_dir=internal.STATE_DIR / "fluxsite" / "comparisons" / self.task_name
)

def is_done(self) -> bool:
    """Report whether the "done" state attribute is set for this task."""
    done_attr = "done"
    return self.state.is_set(done_attr)

Check warning on line 47 in src/benchcab/comparison.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/comparison.py#L47

Added line #L47 was not covered by tests

def run(self) -> None:
"""Executes `nccmp -df` on the NetCDF files pointed to by `self.files`."""
file_a, file_b = self.files
self.logger.debug(f"Comparing files {file_a.name} and {file_b.name} bitwise...")

self.state.reset()
try:
self.subprocess_handler.run_cmd(
f"nccmp -df {file_a} {file_b}",
Expand All @@ -51,6 +60,7 @@ def run(self) -> None:
self.logger.info(
f"Success: files {file_a.name} {file_b.name} are identical"
)
self.state.set("done")
except CalledProcessError as exc:
output_file = (
internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt"
Expand Down
10 changes: 10 additions & 0 deletions src/benchcab/fluxsite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from benchcab.utils import get_logger
from benchcab.utils.fs import chdir, mkdir
from benchcab.utils.namelist import patch_namelist, patch_remove_namelist
from benchcab.utils.state import State
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface

f90_logical_repr = {True: ".true.", False: ".false."}
Expand Down Expand Up @@ -59,6 +60,13 @@ def __init__(
self.sci_conf_id = sci_conf_id
self.sci_config = sci_config
self.logger = get_logger()
self.state = State(
state_dir=internal.STATE_DIR / "fluxsite" / "runs" / self.get_task_name()
)

def is_done(self) -> bool:
    """Report whether the "done" state attribute is set for this task."""
    done_attr = "done"
    return self.state.is_set(done_attr)

Check warning on line 69 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L69

Added line #L69 was not covered by tests

def get_task_name(self) -> str:
"""Returns the file name convention used for this task."""
Expand Down Expand Up @@ -212,9 +220,11 @@ def run(self):
self.logger.debug(f"Running task {task_name}... CABLE standard output ")
self.logger.debug(f"saved in {task_dir / internal.CABLE_STDOUT_FILENAME}")

self.state.reset()

Check warning on line 223 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L223

Added line #L223 was not covered by tests
try:
self.run_cable()
self.add_provenance_info()
self.state.set("done")

Check warning on line 227 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L227

Added line #L227 was not covered by tests
except CableError:
# Note: here we suppress CABLE specific errors so that `benchcab`
# exits successfully. This then allows us to run bitwise comparisons
Expand Down
4 changes: 4 additions & 0 deletions src/benchcab/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@

# DIRECTORY PATHS/STRUCTURE:

# Path to hidden state directory:
STATE_DIR = Path(".state")
STATE_PREFIX = ".attr_"

# Default system paths in Unix
SYSTEM_PATHS = ["/bin", "/usr/bin", "/usr/local/bin"]

Expand Down
51 changes: 51 additions & 0 deletions src/benchcab/utils/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from pathlib import Path

from benchcab.internal import STATE_PREFIX


class StateAttributeError(Exception):
    """Raised to signal an error relating to state attributes.

    Within this module it is raised by ``State.get()`` when no attributes
    have been set.
    """


class State:
    """Stores state which persists on the file system.

    Each state attribute is represented by an empty marker file named
    ``STATE_PREFIX + attr`` inside ``state_dir``. Because the markers live on
    disk, an attribute set by one process is visible to any other process
    that constructs a ``State`` over the same directory.
    """

    def __init__(self, state_dir: Path) -> None:
        """Instantiate a State object.

        Parameters
        ----------
        state_dir: Path
            Path to the directory in which state is stored. If the specified
            directory does not exist, create the directory.

        """
        self.state_dir = state_dir
        self.state_dir.mkdir(parents=True, exist_ok=True)

    def _attr_path(self, attr: str) -> Path:
        """Return the path of the marker file backing the attribute `attr`."""
        return self.state_dir / (STATE_PREFIX + attr)

    def reset(self) -> None:
        """Clear all state attributes."""
        for path in self.state_dir.glob(f"{STATE_PREFIX}*"):
            # A marker may vanish between the glob and the unlink (e.g. another
            # process resetting the same directory); it is already cleared, so
            # that is not an error.
            path.unlink(missing_ok=True)

    def set(self, attr: str) -> None:
        """Set state attribute.

        Setting an attribute that is already set refreshes the modification
        time of its marker file, making it the most recent attribute.
        """
        self._attr_path(attr).touch()

    def is_set(self, attr: str) -> bool:
        """Return True if the state attribute has been set, False otherwise."""
        return self._attr_path(attr).exists()

    def get(self) -> str:
        """Get the state of the most recent state attribute.

        Returns
        -------
        str
            Name (without the ``STATE_PREFIX``) of the attribute whose marker
            file has the latest modification time.

        Raises
        ------
        StateAttributeError
            If no attributes have been set.

        """
        attrs = list(self.state_dir.glob(f"{STATE_PREFIX}*"))
        if not attrs:
            msg = "No attributes have been set."
            raise StateAttributeError(msg)

        def get_mtime_ns(path: Path) -> int:
            # Integer nanoseconds: the float `st_mtime` cannot represent the
            # full timestamp resolution, so close-together sets could compare
            # equal.
            return path.stat().st_mtime_ns

        # The sort is stable and the last element is taken, so ties resolve to
        # the last tied entry in glob order.
        attrs.sort(key=get_mtime_ns)
        return attrs[-1].name.removeprefix(STATE_PREFIX)
3 changes: 3 additions & 0 deletions src/benchcab/workdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def clean_submission_files():
if internal.RUN_DIR.exists():
shutil.rmtree(internal.RUN_DIR)

if internal.STATE_DIR.exists():
shutil.rmtree(internal.STATE_DIR)

Check warning on line 28 in src/benchcab/workdir.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/workdir.py#L28

Added line #L28 was not covered by tests

for pbs_job_file in Path.cwd().glob(f"{internal.QSUB_FNAME}*"):
pbs_job_file.unlink()

Expand Down
39 changes: 39 additions & 0 deletions tests/test_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import time
from pathlib import Path

import pytest
from benchcab.utils.state import State, StateAttributeError


@pytest.fixture()
def state(tmp_path):
    """Return a State object backed by a fresh, per-test directory.

    The state directory is placed under pytest's ``tmp_path`` so that marker
    files written by one test cannot leak into another (for example, into the
    test which expects that no attributes have been set) and nothing is left
    behind in the current working directory.
    """
    return State(state_dir=tmp_path / "my_state")


def test_state_is_set(state):
    """Success case: an attribute reads back as set after calling set()."""
    attr = "foo"
    state.set(attr)
    assert state.is_set(attr)


def test_state_reset(state):
    """Success case: reset() clears an attribute that was previously set."""
    attr = "foo"
    state.set(attr)
    state.reset()
    assert not state.is_set(attr)


def test_state_get(state):
    """Success case: get() reports the attribute that was set last."""
    earlier, later = "foo", "bar"
    state.set(earlier)
    # Pause briefly so the two attributes end up with distinguishable
    # time stamps.
    time.sleep(0.01)
    state.set(later)
    assert state.get() == later


def test_state_get_raises_exception(state):
    """Failure case: get() raises StateAttributeError when nothing is set."""
    expected_error = StateAttributeError
    with pytest.raises(expected_error):
        state.get()

0 comments on commit 4f4e54d

Please sign in to comment.