Log status of fluxsite and comparison runs #291

Merged 2 commits on Jul 10, 2024
26 changes: 24 additions & 2 deletions src/benchcab/benchcab.py
@@ -157,6 +157,16 @@
self._models.append(Model(repo=repo, model_id=id, **sub_config))
return self._models

def _fluxsite_show_task_composition(self, config: dict) -> str:
n_models = len(self._get_models(config))
n_sites = len(get_met_forcing_file_names(config["fluxsite"]["experiment"]))
n_science_configurations = len(config["science_configurations"])
return (
f"models: {n_models}, "
f"sites: {n_sites}, "
f"science configurations: {n_science_configurations}"
)

def _get_fluxsite_tasks(self, config: dict) -> list[fluxsite.FluxsiteTask]:
if not self._fluxsite_tasks:
self._fluxsite_tasks = fluxsite.get_fluxsite_tasks(
@@ -295,12 +305,18 @@
tasks = self._get_fluxsite_tasks(config)

logger.info("Running fluxsite tasks...")
logger.info(
f"tasks: {len(tasks)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
fluxsite.run_tasks_in_parallel(tasks, n_processes=ncpus)
else:
fluxsite.run_tasks(tasks)
logger.info("Successfully ran fluxsite tasks")

tasks_failed = [task for task in tasks if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")

Member commented:
Would it be useful to users if there was a warning level log entry if there were any failed tasks? And some indication either of which tasks failed or how to find out which failed. This is valid for the bitwise comparison as well.
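
A minimal sketch of what such a warning could look like in the fluxsite-run-tasks endpoint, building on the tasks_failed list added in this PR; the warning wording and the second log line listing task names via get_task_name() are illustrative only, not part of the change:

tasks_failed = [task for task in tasks if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
if tasks_failed:
    # Surface failures at WARNING level so they stand out in the PBS log
    logger.warning(f"{n_failed} failed, {n_success} passed")
    # Illustrative: name the failed tasks so users know where to look
    logger.warning(
        "Failed tasks: " + ", ".join(task.get_task_name() for task in tasks_failed)
    )
else:
    logger.info(f"{n_failed} failed, {n_success} passed")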

Collaborator (Author) replied:
I've checked that failure cases always output to the PBS log file. Here is the PBS log of a benchcab run with a task that fails due to a malformed namelist file:

/scratch/tm70/sb8430/conda/envs/benchcab-dev/bin/benchcab fluxsite-run-tasks --config=config.yaml
2024-05-20 12:30:37,479 - INFO - benchcab.benchcab.py:307 - Running fluxsite tasks...
2024-05-20 12:30:37,483 - INFO - benchcab.benchcab.py:308 - tasks: 20 (models: 1, sites: 5, science configurations: 4)
2024-05-20 12:30:37,615 - ERROR - fluxsite.fluxsite.py:252 - Error: CABLE returned an error for task AU-Tum_2002-2017_OzFlux_Met_R0_S0
2024-05-20 12:33:47,687 - INFO - benchcab.benchcab.py:319 - 1 failed, 19 passed
/scratch/tm70/sb8430/conda/envs/benchcab-dev/bin/benchcab fluxsite-bitwise-cmp --config=config.yaml
2024-05-20 12:33:48,725 - INFO - benchcab.benchcab.py:336 - Running comparison tasks...
2024-05-20 12:33:48,729 - INFO - benchcab.benchcab.py:337 - tasks: 0 (models: 1, sites: 5, science configurations: 4)
2024-05-20 12:33:48,773 - INFO - benchcab.benchcab.py:348 - 0 failed, 0 passed

======================================================================================
                  Resource Usage on 2024-05-20 12:33:52:
   Job Id:             116077189.gadi-pbs
   Project:            tm70
   Exit Status:        0
   Service Units:      1.93
   NCPUs Requested:    18                     NCPUs Used: 18              
                                           CPU Time Used: 00:40:46        
   Memory Requested:   30.0GB                Memory Used: 1.86GB          
   Walltime requested: 06:00:00            Walltime Used: 00:03:13        
   JobFS requested:    100.0MB                JobFS used: 0B              
======================================================================================


def fluxsite_bitwise_cmp(self, config_path: str):
"""Endpoint for `benchcab fluxsite-bitwise-cmp`."""
@@ -318,12 +334,18 @@
)

logger.info("Running comparison tasks...")
logger.info(
f"tasks: {len(comparisons)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
run_comparisons_in_parallel(comparisons, n_processes=ncpus)
else:
run_comparisons(comparisons)
logger.info("Successfully ran comparison tasks")

tasks_failed = [task for task in comparisons if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")


def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
"""Endpoint for `benchcab fluxsite`."""
32 changes: 27 additions & 5 deletions src/benchcab/comparison.py
@@ -11,6 +11,7 @@

from benchcab import internal
from benchcab.utils import get_logger
from benchcab.utils.state import State
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface


@@ -37,8 +38,29 @@ def __init__(
self.files = files
self.task_name = task_name
self.logger = get_logger()
self.state = State(
state_dir=internal.STATE_DIR / "fluxsite" / "comparisons" / self.task_name
)
self.output_file = (
internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt"
)

def is_done(self) -> bool:
"""Return status of current task."""
return self.state.is_set("done")

def run(self) -> None:
"""Runs a single comparison task."""
self.clean()
self.execute_comparison()

def clean(self):
"""Cleans output files if they exist and resets the task state."""
if self.output_file.exists():
self.output_file.unlink()
self.state.reset()

def execute_comparison(self) -> None:
"""Executes `nccmp -df` on the NetCDF files pointed to by `self.files`."""
file_a, file_b = self.files
self.logger.debug(f"Comparing files {file_a.name} and {file_b.name} bitwise...")
@@ -51,15 +73,15 @@ def run(self) -> None:
self.logger.info(
f"Success: files {file_a.name} {file_b.name} are identical"
)
self.state.set("done")
except CalledProcessError as exc:
output_file = (
internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt"
)
with output_file.open("w", encoding="utf-8") as file:
with self.output_file.open("w", encoding="utf-8") as file:
file.write(exc.stdout)

self.logger.error(f"Failure: files {file_a.name} {file_b.name} differ. ")
self.logger.error(f"Results of diff have been written to {output_file}")
self.logger.error(
f"Results of diff have been written to {self.output_file}"
)

sys.stdout.flush()

15 changes: 13 additions & 2 deletions src/benchcab/fluxsite.py
@@ -19,6 +19,7 @@
from benchcab.utils import get_logger
from benchcab.utils.fs import chdir, mkdir
from benchcab.utils.namelist import patch_namelist, patch_remove_namelist
from benchcab.utils.state import State
from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface

f90_logical_repr = {True: ".true.", False: ".false."}
@@ -59,6 +60,13 @@
self.sci_conf_id = sci_conf_id
self.sci_config = sci_config
self.logger = get_logger()
self.state = State(
state_dir=internal.STATE_DIR / "fluxsite" / "runs" / self.get_task_name()
)

def is_done(self) -> bool:
"""Return status of current task."""
return self.state.is_set("done")


def get_task_name(self) -> str:
"""Returns the file name convention used for this task."""
@@ -150,7 +158,7 @@
patch_remove_namelist(nml_path, self.model.patch_remove)

def clean_task(self):
"""Cleans output files, namelist files, log files and cable executables if they exist."""
"""Cleans output files, namelist files, log files and cable executables if they exist and resets the task state."""
self.logger.debug(" Cleaning task")

task_dir = internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name()
@@ -179,6 +187,8 @@
if log_file.exists():
log_file.unlink()

self.state.reset()

return self

def fetch_files(self):
@@ -215,6 +225,7 @@
try:
self.run_cable()
self.add_provenance_info()
self.state.set("done")

except CableError:
# Note: here we suppress CABLE specific errors so that `benchcab`
# exits successfully. This then allows us to run bitwise comparisons
@@ -239,7 +250,7 @@
output_file=stdout_path.relative_to(task_dir),
)
except CalledProcessError as exc:
self.logger.debug(f"Error: CABLE returned an error for task {task_name}")
self.logger.error(f"Error: CABLE returned an error for task {task_name}")
raise CableError from exc

def add_provenance_info(self):
4 changes: 4 additions & 0 deletions src/benchcab/internal.py
@@ -30,6 +30,10 @@

# DIRECTORY PATHS/STRUCTURE:

# Path to hidden state directory:
STATE_DIR = Path(".state")
STATE_PREFIX = ".state_attr_"

# Default system paths in Unix
SYSTEM_PATHS = ["/bin", "/usr/bin", "/usr/local/bin"]

51 changes: 51 additions & 0 deletions src/benchcab/utils/state.py
@@ -0,0 +1,51 @@
from pathlib import Path

from benchcab.internal import STATE_PREFIX


class StateAttributeError(Exception):
"""Exception class for signalling state attribute errors."""


class State:
"""Stores state which persists on the file system."""

def __init__(self, state_dir: Path) -> None:
"""Instantiate a State object.

Parameters
----------
state_dir: Path
Path to the directory in which state is stored. If the specified
directory does not exist, create the directory.

"""
self.state_dir = state_dir
self.state_dir.mkdir(parents=True, exist_ok=True)

def reset(self):
"""Clear all state attributes."""
for path in self.state_dir.glob(f"{STATE_PREFIX}*"):
path.unlink()

def set(self, attr: str):
"""Set state attribute."""
(self.state_dir / (STATE_PREFIX + attr)).touch()

def is_set(self, attr: str):
"""Return True if the state attribute has been set, False otherwise."""
return (self.state_dir / (STATE_PREFIX + attr)).exists()

def get(self) -> str:
"""Get the state of the most recent state attribute."""
attrs = list(self.state_dir.glob(f"{STATE_PREFIX}*"))

def get_mtime(path: Path):
return path.stat().st_mtime

attrs.sort(key=get_mtime)
try:
return attrs.pop().name.removeprefix(STATE_PREFIX)
except IndexError as exc:
msg = "No attributes have been set."
raise StateAttributeError(msg) from exc
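
For reference, a minimal usage sketch of the new State class; the directory path below is illustrative only, not one used by benchcab:

from pathlib import Path

from benchcab.utils.state import State

state = State(state_dir=Path(".state/fluxsite/runs/some_task"))
state.set("done")              # touches a .state_attr_done marker file in the state directory
assert state.is_set("done")    # True once the marker file exists
assert state.get() == "done"   # most recently modified attribute
state.reset()                  # removes all .state_attr_* marker files
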
3 changes: 3 additions & 0 deletions src/benchcab/workdir.py
@@ -24,6 +24,9 @@ def clean_submission_files():
if internal.RUN_DIR.exists():
shutil.rmtree(internal.RUN_DIR)

if internal.STATE_DIR.exists():
shutil.rmtree(internal.STATE_DIR)

Member, commenting on lines +27 to +29:
It seems my previous comment about cleanup wasn't clear. I was talking about updating this test:

def test_clean_submission_files(self, runs_path, pbs_job_files: List[Path]):

To check that the STATE_DIR is removed.
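
A sketch of the assertion being asked for, assuming the state_path fixture mocks internal.STATE_DIR (the actual update appears in the tests/test_workdir.py diff below):

def test_clean_submission_files(self, runs_path, state_path, pbs_job_files: List[Path]):
    """Success case: Submission files created by benchcab are removed after clean."""
    clean_submission_files()
    assert not runs_path.exists()
    assert not state_path.exists()  # the hidden STATE_DIR must be removed too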

for pbs_job_file in Path.cwd().glob(f"{internal.QSUB_FNAME}*"):
pbs_job_file.unlink()

41 changes: 32 additions & 9 deletions tests/test_comparison.py
@@ -8,7 +8,6 @@
from pathlib import Path

import pytest

from benchcab import internal
from benchcab.comparison import ComparisonTask

@@ -30,21 +29,45 @@ def comparison_task(files, mock_subprocess_handler):
return _comparison_task


class TestRun:
"""Tests for `ComparisonTask.run()`."""
@pytest.fixture(autouse=True)
def bitwise_cmp_dir():
"""Create and return the fluxsite bitwise comparison directory."""
internal.FLUXSITE_DIRS["BITWISE_CMP"].mkdir(parents=True)
return internal.FLUXSITE_DIRS["BITWISE_CMP"]


class TestClean:
"""Tests for `ComparisonTask.clean()`."""

def test_error_logs_are_removed(self, comparison_task):
"""Success case: test error logs are removed."""
output_file = comparison_task.output_file
output_file.touch()
comparison_task.clean()
assert not output_file.exists()

@pytest.fixture(autouse=True)
def bitwise_cmp_dir(self):
"""Create and return the fluxsite bitwise comparison directory."""
internal.FLUXSITE_DIRS["BITWISE_CMP"].mkdir(parents=True)
return internal.FLUXSITE_DIRS["BITWISE_CMP"]
def test_task_state_is_reset(self, comparison_task):
"""Success case: test task state is reset."""
state = comparison_task.state
state.set("dirty")
comparison_task.clean()
assert not state.is_set("dirty")


class TestExecuteComparison:
"""Tests for `ComparisonTask.execute_comparison()`."""

def test_nccmp_execution(self, comparison_task, files, mock_subprocess_handler):
"""Success case: test nccmp is executed."""
file_a, file_b = files
comparison_task.run()
comparison_task.execute_comparison()
assert f"nccmp -df {file_a} {file_b}" in mock_subprocess_handler.commands

def test_task_is_done_on_success(self, comparison_task):
"""Success case: test task is done on success."""
comparison_task.execute_comparison()
assert comparison_task.is_done()

def test_failed_comparison_check(
self, comparison_task, mock_subprocess_handler, bitwise_cmp_dir
):
7 changes: 7 additions & 0 deletions tests/test_fluxsite.py
@@ -152,6 +152,13 @@ def test_clean_files(self, task):
).exists()
assert not (internal.FLUXSITE_DIRS["LOG"] / task.get_log_filename()).exists()

def test_state_is_reset(self, task):
"""Success case: test state is reset on clean."""
state = task.state
state.set("foo")
task.clean_task()
assert not state.is_set("foo")


class TestSetupTask:
"""Tests for `FluxsiteTask.setup_task()`."""
39 changes: 39 additions & 0 deletions tests/test_state.py
@@ -0,0 +1,39 @@
import time
from pathlib import Path

import pytest
from benchcab.utils.state import State, StateAttributeError


@pytest.fixture()
def state():
"""Return a State object."""
return State(state_dir=Path("my_state"))


def test_state_is_set(state):
"""Success case: test state is set."""
state.set("foo")
assert state.is_set("foo")


def test_state_reset(state):
"""Success case: test state is reset."""
state.set("foo")
state.reset()
assert not state.is_set("foo")


def test_state_get(state):
"""Success case: test get() returns the most recent state attribute."""
state.set("foo")
# This is done so that time stamps can be resolved between state attributes
time.sleep(0.01)
state.set("bar")
assert state.get() == "bar"


def test_state_get_raises_exception(state):
"""Failure case: test get() raises an exception when no attributes are set."""
with pytest.raises(StateAttributeError):
state.get()
12 changes: 11 additions & 1 deletion tests/test_workdir.py
@@ -86,6 +86,13 @@ def runs_path(self) -> Path:
runs_path.mkdir()
return runs_path

@pytest.fixture()
def state_path(self) -> Path:
"""Mock internal.STATE_DIR."""
state_path = Path(".state")
state_path.mkdir()
return state_path

@pytest.fixture()
def pbs_job_files(self) -> List[Path]:
"""Create sample files of the form benchmark_cable_qsub.sh*."""
@@ -133,8 +140,11 @@ def test_clean_realisation_files_git(self, src_path_with_git: Path):
clean_realisation_files()
assert not src_path_with_git.exists()

def test_clean_submission_files(self, runs_path, pbs_job_files: List[Path]):
def test_clean_submission_files(
self, runs_path, state_path, pbs_job_files: List[Path]
):
"""Success case: Submission files created by benchcab are removed after clean."""
clean_submission_files()
assert not runs_path.exists()
assert not state_path.exists()
assert not self._check_if_any_files_exist(pbs_job_files)