Merge pull request #291 from CABLE-LSM/180-log-run-summary-of-tasks-to-standard-output

Log status of fluxsite and comparison runs
SeanBryan51 authored Jul 10, 2024
2 parents fc625c0 + d490ed9 commit a314471
Showing 10 changed files with 211 additions and 19 deletions.
26 changes: 24 additions & 2 deletions src/benchcab/benchcab.py
@@ -157,6 +157,16 @@ def _get_models(self, config: dict) -> list[Model]:
self._models.append(Model(repo=repo, model_id=id, **sub_config))
return self._models

def _fluxsite_show_task_composition(self, config: dict) -> str:
n_models = len(self._get_models(config))
n_sites = len(get_met_forcing_file_names(config["fluxsite"]["experiment"]))
n_science_configurations = len(config["science_configurations"])
return (
f"models: {n_models}, "
f"sites: {n_sites}, "
f"science configurations: {n_science_configurations}"
)

def _get_fluxsite_tasks(self, config: dict) -> list[fluxsite.FluxsiteTask]:
if not self._fluxsite_tasks:
self._fluxsite_tasks = fluxsite.get_fluxsite_tasks(
@@ -295,12 +305,18 @@ def fluxsite_run_tasks(self, config_path: str):
tasks = self._get_fluxsite_tasks(config)

logger.info("Running fluxsite tasks...")
logger.info(
f"tasks: {len(tasks)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
fluxsite.run_tasks_in_parallel(tasks, n_processes=ncpus)
else:
fluxsite.run_tasks(tasks)
logger.info("Successfully ran fluxsite tasks")

tasks_failed = [task for task in tasks if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")

def fluxsite_bitwise_cmp(self, config_path: str):
"""Endpoint for `benchcab fluxsite-bitwise-cmp`."""
@@ -318,12 +334,18 @@ def fluxsite_bitwise_cmp(self, config_path: str):
)

logger.info("Running comparison tasks...")
logger.info(
f"tasks: {len(comparisons)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
run_comparisons_in_parallel(comparisons, n_processes=ncpus)
else:
run_comparisons(comparisons)
logger.info("Successfully ran comparison tasks")

tasks_failed = [task for task in comparisons if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")

def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
"""Endpoint for `benchcab fluxsite`."""
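Taken together, the changes to `fluxsite_run_tasks` and `fluxsite_bitwise_cmp` log a task-composition line before the runs start and a pass/fail summary once they finish. For a hypothetical run with 2 models, 3 sites and 2 science configurations (12 tasks in total; counts illustrative), the fluxsite output would look roughly like:

    Running fluxsite tasks...
    tasks: 12 (models: 2, sites: 3, science configurations: 2)
    Successfully ran fluxsite tasks
    0 failed, 12 passed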
32 changes: 27 additions & 5 deletions src/benchcab/comparison.py
@@ -11,6 +11,7 @@

from benchcab import internal
from benchcab.utils import get_logger
from benchcab.utils.state import State
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface


@@ -37,8 +38,29 @@ def __init__(
self.files = files
self.task_name = task_name
self.logger = get_logger()
self.state = State(
state_dir=internal.STATE_DIR / "fluxsite" / "comparisons" / self.task_name
)
self.output_file = (
internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt"
)

def is_done(self) -> bool:
"""Return status of current task."""
return self.state.is_set("done")

def run(self) -> None:
"""Runs a single comparison task."""
self.clean()
self.execute_comparison()

def clean(self):
"""Cleans output files if they exist and resets the task state."""
if self.output_file.exists():
self.output_file.unlink()
self.state.reset()

def execute_comparison(self) -> None:
"""Executes `nccmp -df` on the NetCDF files pointed to by `self.files`."""
file_a, file_b = self.files
self.logger.debug(f"Comparing files {file_a.name} and {file_b.name} bitwise...")
Expand All @@ -51,15 +73,15 @@ def run(self) -> None:
self.logger.info(
f"Success: files {file_a.name} {file_b.name} are identical"
)
self.state.set("done")
except CalledProcessError as exc:
output_file = (
internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt"
)
with output_file.open("w", encoding="utf-8") as file:
with self.output_file.open("w", encoding="utf-8") as file:
file.write(exc.stdout)

self.logger.error(f"Failure: files {file_a.name} {file_b.name} differ. ")
self.logger.error(f"Results of diff have been written to {output_file}")
self.logger.error(
f"Results of diff have been written to {self.output_file}"
)

sys.stdout.flush()

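A sketch of the resulting `ComparisonTask` lifecycle (driver code and task name are illustrative; the constructor takes more arguments than this hunk shows):

    # Hypothetical usage, assuming two NetCDF output paths to compare:
    task = ComparisonTask(files=(path_a, path_b), task_name="site_R0_S1")
    task.run()                # clean() removes stale diff output and resets
                              # state, then execute_comparison() invokes nccmp
    if not task.is_done():    # "done" is set only when the files are identical
        print(task.output_file.read_text())  # captured `nccmp -df` output

Because the "done" attribute persists on disk via `State`, the summary in `fluxsite_bitwise_cmp` can count failures after the fact, even when the comparisons ran in separate worker processes.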
15 changes: 13 additions & 2 deletions src/benchcab/fluxsite.py
@@ -19,6 +19,7 @@
from benchcab.utils import get_logger
from benchcab.utils.fs import chdir, mkdir
from benchcab.utils.namelist import patch_namelist, patch_remove_namelist
from benchcab.utils.state import State
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface

f90_logical_repr = {True: ".true.", False: ".false."}
@@ -59,6 +60,13 @@ def __init__(
self.sci_conf_id = sci_conf_id
self.sci_config = sci_config
self.logger = get_logger()
self.state = State(
state_dir=internal.STATE_DIR / "fluxsite" / "runs" / self.get_task_name()
)

def is_done(self) -> bool:
"""Return status of current task."""
return self.state.is_set("done")

def get_task_name(self) -> str:
"""Returns the file name convention used for this task."""
@@ -150,7 +158,7 @@ def setup_task(self):
patch_remove_namelist(nml_path, self.model.patch_remove)

def clean_task(self):
"""Cleans output files, namelist files, log files and cable executables if they exist."""
"""Cleans output files, namelist files, log files and cable executables if they exist and resets the task state."""
self.logger.debug(" Cleaning task")

task_dir = internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
@@ -179,6 +187,8 @@ def clean_task(self):
if log_file.exists():
log_file.unlink()

self.state.reset()

return self

def fetch_files(self):
@@ -215,6 +225,7 @@ def run(self):
try:
self.run_cable()
self.add_provenance_info()
self.state.set("done")
except CableError:
# Note: here we suppress CABLE specific errors so that `benchcab`
# exits successfully. This then allows us to run bitwise comparisons
@@ -239,7 +250,7 @@ def run_cable(self):
output_file=stdout_path.relative_to(task_dir),
)
except CalledProcessError as exc:
self.logger.debug(f"Error: CABLE returned an error for task {task_name}")
self.logger.error(f"Error: CABLE returned an error for task {task_name}")
raise CableError from exc

def add_provenance_info(self):
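Note how the suppressed error path interacts with the new state: when `run_cable()` raises `CableError`, `run()` swallows it so that `benchcab` still exits successfully, but `self.state.set("done")` is never reached, and the task is counted under `n_failed` in the end-of-run summary. A minimal sketch of that control flow (unrelated details elided):

    def run(self):
        try:
            self.run_cable()
            self.add_provenance_info()
            self.state.set("done")  # reached only if both calls succeed
        except CableError:
            # Suppressed so bitwise comparisons can still run; the failure
            # remains visible through is_done().
            pass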
4 changes: 4 additions & 0 deletions src/benchcab/internal.py
@@ -30,6 +30,10 @@

# DIRECTORY PATHS/STRUCTURE:

# Path to hidden state directory:
STATE_DIR = Path(".state")
STATE_PREFIX = ".state_attr_"

# Default system paths in Unix
SYSTEM_PATHS = ["/bin", "/usr/bin", "/usr/local/bin"]

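Given these constants and the `State` instantiations above, each task gets a hidden per-task state directory, and a completed task is marked by an empty file (task names illustrative):

    .state/fluxsite/runs/<task_name>/.state_attr_done
    .state/fluxsite/comparisons/<task_name>/.state_attr_done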
51 changes: 51 additions & 0 deletions src/benchcab/utils/state.py
@@ -0,0 +1,51 @@
from pathlib import Path

from benchcab.internal import STATE_PREFIX


class StateAttributeError(Exception):
"""Exception class for signalling state attribute errors."""


class State:
"""Stores state which persists on the file system."""

def __init__(self, state_dir: Path) -> None:
"""Instantiate a State object.
Parameters
----------
state_dir: Path
Path to the directory in which state is stored. If the specified
directory does not exist, create the directory.
"""
self.state_dir = state_dir
self.state_dir.mkdir(parents=True, exist_ok=True)

def reset(self):
"""Clear all state attributes."""
for path in self.state_dir.glob(f"{STATE_PREFIX}*"):
path.unlink()

def set(self, attr: str):
"""Set state attribute."""
(self.state_dir / (STATE_PREFIX + attr)).touch()

def is_set(self, attr: str):
"""Return True if the state attribute has been set, False otherwise."""
return (self.state_dir / (STATE_PREFIX + attr)).exists()

def get(self) -> str:
"""Get the state of the most recent state attribute."""
attrs = list(self.state_dir.glob(f"{STATE_PREFIX}*"))

def get_mtime(path: Path):
return path.stat().st_mtime

attrs.sort(key=get_mtime)
try:
return attrs.pop().name.removeprefix(STATE_PREFIX)
except IndexError as exc:
msg = "No attributes have been set."
raise StateAttributeError(msg) from exc
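
A minimal usage sketch of the new `State` class (directory path illustrative):

    from pathlib import Path

    from benchcab.utils.state import State

    state = State(state_dir=Path(".state") / "demo")  # creates the directory
    state.set("done")             # touches .state/demo/.state_attr_done
    assert state.is_set("done")
    assert state.get() == "done"  # most recently modified attribute wins
    state.reset()                 # unlinks all .state_attr_* files

Attributes are plain empty files, so they persist across processes and separate `benchcab` invocations until `clean_submission_files()` removes the `.state` directory.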
3 changes: 3 additions & 0 deletions src/benchcab/workdir.py
@@ -24,6 +24,9 @@ def clean_submission_files():
if internal.RUN_DIR.exists():
shutil.rmtree(internal.RUN_DIR)

if internal.STATE_DIR.exists():
shutil.rmtree(internal.STATE_DIR)

for pbs_job_file in Path.cwd().glob(f"{internal.QSUB_FNAME}*"):
pbs_job_file.unlink()

41 changes: 32 additions & 9 deletions tests/test_comparison.py
@@ -8,7 +8,6 @@
from pathlib import Path

import pytest

from benchcab import internal
from benchcab.comparison import ComparisonTask

@@ -30,21 +29,45 @@ def comparison_task(files, mock_subprocess_handler):
return _comparison_task


class TestRun:
"""Tests for `ComparisonTask.run()`."""
@pytest.fixture(autouse=True)
def bitwise_cmp_dir():
"""Create and return the fluxsite bitwise comparison directory."""
internal.FLUXSITE_DIRS["BITWISE_CMP"].mkdir(parents=True)
return internal.FLUXSITE_DIRS["BITWISE_CMP"]


class TestClean:
"""Tests for `ComparisonTask.clean()`."""

def test_error_logs_are_removed(self, comparison_task):
"""Success case: test error logs are removed."""
output_file = comparison_task.output_file
output_file.touch()
comparison_task.clean()
assert not output_file.exists()

@pytest.fixture(autouse=True)
def bitwise_cmp_dir(self):
"""Create and return the fluxsite bitwise comparison directory."""
internal.FLUXSITE_DIRS["BITWISE_CMP"].mkdir(parents=True)
return internal.FLUXSITE_DIRS["BITWISE_CMP"]
def test_task_state_is_reset(self, comparison_task):
"""Success case: test task state is reset."""
state = comparison_task.state
state.set("dirty")
comparison_task.clean()
assert not state.is_set("dirty")


class TestExecuteComparison:
"""Tests for `ComparisonTask.execute_comparison()`."""

def test_nccmp_execution(self, comparison_task, files, mock_subprocess_handler):
"""Success case: test nccmp is executed."""
file_a, file_b = files
comparison_task.run()
comparison_task.execute_comparison()
assert f"nccmp -df {file_a} {file_b}" in mock_subprocess_handler.commands

def test_task_is_done_on_success(self, comparison_task):
"""Success case: test task is done on success."""
comparison_task.execute_comparison()
assert comparison_task.is_done()

def test_failed_comparison_check(
self, comparison_task, mock_subprocess_handler, bitwise_cmp_dir
):
7 changes: 7 additions & 0 deletions tests/test_fluxsite.py
@@ -152,6 +152,13 @@ def test_clean_files(self, task):
).exists()
assert not (internal.FLUXSITE_DIRS["LOG"] / task.get_log_filename()).exists()

def test_state_is_reset(self, task):
"""Success case: test state is reset on clean."""
state = task.state
state.set("foo")
task.clean_task()
assert not state.is_set("foo")


class TestSetupTask:
"""Tests for `FluxsiteTask.setup_task()`."""
39 changes: 39 additions & 0 deletions tests/test_state.py
@@ -0,0 +1,39 @@
import time
from pathlib import Path

import pytest
from benchcab.utils.state import State, StateAttributeError


@pytest.fixture()
def state():
"""Return a State object."""
return State(state_dir=Path("my_state"))


def test_state_is_set(state):
"""Success case: test state is set."""
state.set("foo")
assert state.is_set("foo")


def test_state_reset(state):
"""Success case: test state is reset."""
state.set("foo")
state.reset()
assert not state.is_set("foo")


def test_state_get(state):
"""Success case: test get() returns the most recent state attribute."""
state.set("foo")
# This is done so that time stamps can be resolved between state attributes
time.sleep(0.01)
state.set("bar")
assert state.get() == "bar"


def test_state_get_raises_exception(state):
"""Failure case: test get() raises an exception when no attributes are set."""
with pytest.raises(StateAttributeError):
state.get()
12 changes: 11 additions & 1 deletion tests/test_workdir.py
@@ -86,6 +86,13 @@ def runs_path(self) -> Path:
runs_path.mkdir()
return runs_path

@pytest.fixture()
def state_path(self) -> Path:
"""Mock internal.STATE_DIR."""
state_path = Path(".state")
state_path.mkdir()
return state_path

@pytest.fixture()
def pbs_job_files(self) -> List[Path]:
"""Create sample files of the form benchmark_cable_qsub.sh*."""
@@ -133,8 +140,11 @@ def test_clean_realisation_files_git(self, src_path_with_git: Path):
clean_realisation_files()
assert not src_path_with_git.exists()

def test_clean_submission_files(self, runs_path, pbs_job_files: List[Path]):
def test_clean_submission_files(
self, runs_path, state_path, pbs_job_files: List[Path]
):
"""Success case: Submission files created by benchcab are removed after clean."""
clean_submission_files()
assert not runs_path.exists()
assert not state_path.exists()
assert not self._check_if_any_files_exist(pbs_job_files)
