From bb20f5ebd94adebe2bbe8025a896a37006247645 Mon Sep 17 00:00:00 2001 From: Sean Bryan Date: Thu, 9 May 2024 13:56:07 +1000 Subject: [PATCH 1/2] Make error message visible on failure --- src/benchcab/fluxsite.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/benchcab/fluxsite.py b/src/benchcab/fluxsite.py index 816f5a4..4208810 100644 --- a/src/benchcab/fluxsite.py +++ b/src/benchcab/fluxsite.py @@ -239,7 +239,7 @@ def run_cable(self): output_file=stdout_path.relative_to(task_dir), ) except CalledProcessError as exc: - self.logger.debug(f"Error: CABLE returned an error for task {task_name}") + self.logger.error(f"Error: CABLE returned an error for task {task_name}") raise CableError from exc def add_provenance_info(self): From d490ed949e4831ff661a4c96e788c57810b820cc Mon Sep 17 00:00:00 2001 From: Sean Bryan Date: Wed, 8 May 2024 10:49:20 +1000 Subject: [PATCH 2/2] Log status of fluxsite and comparison runs This change introduces a State object as a minimal way of having state persist between separate processes. This is necessary for correctly showing the status of fluxsite and comparison runs as these tasks are run inside child processes which do not share the same data structures in the parent process. Fixes #180 --- src/benchcab/benchcab.py | 26 +++++++++++++++++-- src/benchcab/comparison.py | 32 +++++++++++++++++++---- src/benchcab/fluxsite.py | 13 +++++++++- src/benchcab/internal.py | 4 +++ src/benchcab/utils/state.py | 51 +++++++++++++++++++++++++++++++++++++ src/benchcab/workdir.py | 3 +++ tests/test_comparison.py | 41 ++++++++++++++++++++++------- tests/test_fluxsite.py | 7 +++++ tests/test_state.py | 39 ++++++++++++++++++++++++++++ tests/test_workdir.py | 12 ++++++++- 10 files changed, 210 insertions(+), 18 deletions(-) create mode 100644 src/benchcab/utils/state.py create mode 100644 tests/test_state.py diff --git a/src/benchcab/benchcab.py b/src/benchcab/benchcab.py index ac6ef08..1f0ad76 100644 --- a/src/benchcab/benchcab.py +++ b/src/benchcab/benchcab.py @@ -157,6 +157,16 @@ def _get_models(self, config: dict) -> list[Model]: self._models.append(Model(repo=repo, model_id=id, **sub_config)) return self._models + def _fluxsite_show_task_composition(self, config: dict) -> str: + n_models = len(self._get_models(config)) + n_sites = len(get_met_forcing_file_names(config["fluxsite"]["experiment"])) + n_science_configurations = len(config["science_configurations"]) + return ( + f"models: {n_models}, " + f"sites: {n_sites}, " + f"science configurations: {n_science_configurations}" + ) + def _get_fluxsite_tasks(self, config: dict) -> list[fluxsite.FluxsiteTask]: if not self._fluxsite_tasks: self._fluxsite_tasks = fluxsite.get_fluxsite_tasks( @@ -295,12 +305,18 @@ def fluxsite_run_tasks(self, config_path: str): tasks = self._get_fluxsite_tasks(config) logger.info("Running fluxsite tasks...") + logger.info( + f"tasks: {len(tasks)} ({self._fluxsite_show_task_composition(config)})" + ) if config["fluxsite"]["multiprocess"]: ncpus = config["fluxsite"]["pbs"]["ncpus"] fluxsite.run_tasks_in_parallel(tasks, n_processes=ncpus) else: fluxsite.run_tasks(tasks) - logger.info("Successfully ran fluxsite tasks") + + tasks_failed = [task for task in tasks if not task.is_done()] + n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed) + logger.info(f"{n_failed} failed, {n_success} passed") def fluxsite_bitwise_cmp(self, config_path: str): """Endpoint for `benchcab fluxsite-bitwise-cmp`.""" @@ -318,12 +334,18 @@ def fluxsite_bitwise_cmp(self, config_path: str): ) logger.info("Running comparison tasks...") + logger.info( + f"tasks: {len(comparisons)} ({self._fluxsite_show_task_composition(config)})" + ) if config["fluxsite"]["multiprocess"]: ncpus = config["fluxsite"]["pbs"]["ncpus"] run_comparisons_in_parallel(comparisons, n_processes=ncpus) else: run_comparisons(comparisons) - logger.info("Successfully ran comparison tasks") + + tasks_failed = [task for task in comparisons if not task.is_done()] + n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed) + logger.info(f"{n_failed} failed, {n_success} passed") def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]): """Endpoint for `benchcab fluxsite`.""" diff --git a/src/benchcab/comparison.py b/src/benchcab/comparison.py index 495b9d7..df3da2b 100644 --- a/src/benchcab/comparison.py +++ b/src/benchcab/comparison.py @@ -11,6 +11,7 @@ from benchcab import internal from benchcab.utils import get_logger +from benchcab.utils.state import State from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface @@ -37,8 +38,29 @@ def __init__( self.files = files self.task_name = task_name self.logger = get_logger() + self.state = State( + state_dir=internal.STATE_DIR / "fluxsite" / "comparisons" / self.task_name + ) + self.output_file = ( + internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt" + ) + + def is_done(self) -> bool: + """Return status of current task.""" + return self.state.is_set("done") def run(self) -> None: + """Runs a single comparison task.""" + self.clean() + self.execute_comparison() + + def clean(self): + """Cleans output files if they exist and resets the task state.""" + if self.output_file.exists(): + self.output_file.unlink() + self.state.reset() + + def execute_comparison(self) -> None: """Executes `nccmp -df` on the NetCDF files pointed to by `self.files`.""" file_a, file_b = self.files self.logger.debug(f"Comparing files {file_a.name} and {file_b.name} bitwise...") @@ -51,15 +73,15 @@ def run(self) -> None: self.logger.info( f"Success: files {file_a.name} {file_b.name} are identical" ) + self.state.set("done") except CalledProcessError as exc: - output_file = ( - internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt" - ) - with output_file.open("w", encoding="utf-8") as file: + with self.output_file.open("w", encoding="utf-8") as file: file.write(exc.stdout) self.logger.error(f"Failure: files {file_a.name} {file_b.name} differ. ") - self.logger.error(f"Results of diff have been written to {output_file}") + self.logger.error( + f"Results of diff have been written to {self.output_file}" + ) sys.stdout.flush() diff --git a/src/benchcab/fluxsite.py b/src/benchcab/fluxsite.py index 4208810..90e948b 100644 --- a/src/benchcab/fluxsite.py +++ b/src/benchcab/fluxsite.py @@ -19,6 +19,7 @@ from benchcab.utils import get_logger from benchcab.utils.fs import chdir, mkdir from benchcab.utils.namelist import patch_namelist, patch_remove_namelist +from benchcab.utils.state import State from benchcab.utils.subprocess import SubprocessWrapper, SubprocessWrapperInterface f90_logical_repr = {True: ".true.", False: ".false."} @@ -59,6 +60,13 @@ def __init__( self.sci_conf_id = sci_conf_id self.sci_config = sci_config self.logger = get_logger() + self.state = State( + state_dir=internal.STATE_DIR / "fluxsite" / "runs" / self.get_task_name() + ) + + def is_done(self) -> bool: + """Return status of current task.""" + return self.state.is_set("done") def get_task_name(self) -> str: """Returns the file name convention used for this task.""" @@ -150,7 +158,7 @@ def setup_task(self): patch_remove_namelist(nml_path, self.model.patch_remove) def clean_task(self): - """Cleans output files, namelist files, log files and cable executables if they exist.""" + """Cleans output files, namelist files, log files and cable executables if they exist and resets the task state.""" self.logger.debug(" Cleaning task") task_dir = internal.FLUXSITE_DIRS["TASKS"] / self.get_task_name() @@ -179,6 +187,8 @@ def clean_task(self): if log_file.exists(): log_file.unlink() + self.state.reset() + return self def fetch_files(self): @@ -215,6 +225,7 @@ def run(self): try: self.run_cable() self.add_provenance_info() + self.state.set("done") except CableError: # Note: here we suppress CABLE specific errors so that `benchcab` # exits successfully. This then allows us to run bitwise comparisons diff --git a/src/benchcab/internal.py b/src/benchcab/internal.py index 08deb07..ddaa741 100644 --- a/src/benchcab/internal.py +++ b/src/benchcab/internal.py @@ -30,6 +30,10 @@ # DIRECTORY PATHS/STRUCTURE: +# Path to hidden state directory: +STATE_DIR = Path(".state") +STATE_PREFIX = ".state_attr_" + # Default system paths in Unix SYSTEM_PATHS = ["/bin", "/usr/bin", "/usr/local/bin"] diff --git a/src/benchcab/utils/state.py b/src/benchcab/utils/state.py new file mode 100644 index 0000000..3523b33 --- /dev/null +++ b/src/benchcab/utils/state.py @@ -0,0 +1,51 @@ +from pathlib import Path + +from benchcab.internal import STATE_PREFIX + + +class StateAttributeError(Exception): + """Exception class for signalling state attribute errors.""" + + +class State: + """Stores state which persists on the file system.""" + + def __init__(self, state_dir: Path) -> None: + """Instantiate a State object. + + Parameters + ---------- + state_dir: Path + Path to the directory in which state is stored. If the specified + directory does not exist, create the directory. + + """ + self.state_dir = state_dir + self.state_dir.mkdir(parents=True, exist_ok=True) + + def reset(self): + """Clear all state attributes.""" + for path in self.state_dir.glob(f"{STATE_PREFIX}*"): + path.unlink() + + def set(self, attr: str): + """Set state attribute.""" + (self.state_dir / (STATE_PREFIX + attr)).touch() + + def is_set(self, attr: str): + """Return True if the state attribute has been set, False otherwise.""" + return (self.state_dir / (STATE_PREFIX + attr)).exists() + + def get(self) -> str: + """Get the state of the most recent state attribute.""" + attrs = list(self.state_dir.glob(f"{STATE_PREFIX}*")) + + def get_mtime(path: Path): + return path.stat().st_mtime + + attrs.sort(key=get_mtime) + try: + return attrs.pop().name.removeprefix(STATE_PREFIX) + except IndexError as exc: + msg = "No attributes have been set." + raise StateAttributeError(msg) from exc diff --git a/src/benchcab/workdir.py b/src/benchcab/workdir.py index 5961876..c174d75 100644 --- a/src/benchcab/workdir.py +++ b/src/benchcab/workdir.py @@ -24,6 +24,9 @@ def clean_submission_files(): if internal.RUN_DIR.exists(): shutil.rmtree(internal.RUN_DIR) + if internal.STATE_DIR.exists(): + shutil.rmtree(internal.STATE_DIR) + for pbs_job_file in Path.cwd().glob(f"{internal.QSUB_FNAME}*"): pbs_job_file.unlink() diff --git a/tests/test_comparison.py b/tests/test_comparison.py index ef0bdcf..cec5125 100644 --- a/tests/test_comparison.py +++ b/tests/test_comparison.py @@ -8,7 +8,6 @@ from pathlib import Path import pytest - from benchcab import internal from benchcab.comparison import ComparisonTask @@ -30,21 +29,45 @@ def comparison_task(files, mock_subprocess_handler): return _comparison_task -class TestRun: - """Tests for `ComparisonTask.run()`.""" +@pytest.fixture(autouse=True) +def bitwise_cmp_dir(): + """Create and return the fluxsite bitwise comparison directory.""" + internal.FLUXSITE_DIRS["BITWISE_CMP"].mkdir(parents=True) + return internal.FLUXSITE_DIRS["BITWISE_CMP"] + + +class TestClean: + """Tests for `ComparisonTask.clean()`.""" + + def test_error_logs_are_removed(self, comparison_task): + """Success case: test error logs are removed.""" + output_file = comparison_task.output_file + output_file.touch() + comparison_task.clean() + assert not output_file.exists() - @pytest.fixture(autouse=True) - def bitwise_cmp_dir(self): - """Create and return the fluxsite bitwise comparison directory.""" - internal.FLUXSITE_DIRS["BITWISE_CMP"].mkdir(parents=True) - return internal.FLUXSITE_DIRS["BITWISE_CMP"] + def test_task_state_is_reset(self, comparison_task): + """Success case: test task state is reset.""" + state = comparison_task.state + state.set("dirty") + comparison_task.clean() + assert not state.is_set("dirty") + + +class TestExecuteComparison: + """Tests for `ComparisonTask.execute_comparison()`.""" def test_nccmp_execution(self, comparison_task, files, mock_subprocess_handler): """Success case: test nccmp is executed.""" file_a, file_b = files - comparison_task.run() + comparison_task.execute_comparison() assert f"nccmp -df {file_a} {file_b}" in mock_subprocess_handler.commands + def test_task_is_done_on_success(self, comparison_task): + """Success case: test task is done on success.""" + comparison_task.execute_comparison() + assert comparison_task.is_done() + def test_failed_comparison_check( self, comparison_task, mock_subprocess_handler, bitwise_cmp_dir ): diff --git a/tests/test_fluxsite.py b/tests/test_fluxsite.py index 4154632..e4732ba 100644 --- a/tests/test_fluxsite.py +++ b/tests/test_fluxsite.py @@ -152,6 +152,13 @@ def test_clean_files(self, task): ).exists() assert not (internal.FLUXSITE_DIRS["LOG"] / task.get_log_filename()).exists() + def test_state_is_reset(self, task): + """Success case: test state is reset on clean.""" + state = task.state + state.set("foo") + task.clean_task() + assert not state.is_set("foo") + class TestSetupTask: """Tests for `FluxsiteTask.setup_task()`.""" diff --git a/tests/test_state.py b/tests/test_state.py new file mode 100644 index 0000000..51738bd --- /dev/null +++ b/tests/test_state.py @@ -0,0 +1,39 @@ +import time +from pathlib import Path + +import pytest +from benchcab.utils.state import State, StateAttributeError + + +@pytest.fixture() +def state(): + """Return a State object.""" + return State(state_dir=Path("my_state")) + + +def test_state_is_set(state): + """Success case: test state is set.""" + state.set("foo") + assert state.is_set("foo") + + +def test_state_reset(state): + """Success case: test state is reset.""" + state.set("foo") + state.reset() + assert not state.is_set("foo") + + +def test_state_get(state): + """Success case: test get() returns the most recent state attribute.""" + state.set("foo") + # This is done so that time stamps can be resolved between state attributes + time.sleep(0.01) + state.set("bar") + assert state.get() == "bar" + + +def test_state_get_raises_exception(state): + """Failure case: test get() raises an exception when no attributes are set.""" + with pytest.raises(StateAttributeError): + state.get() diff --git a/tests/test_workdir.py b/tests/test_workdir.py index 1f2dd0f..b6cba07 100644 --- a/tests/test_workdir.py +++ b/tests/test_workdir.py @@ -86,6 +86,13 @@ def runs_path(self) -> Path: runs_path.mkdir() return runs_path + @pytest.fixture() + def state_path(self) -> Path: + """Mock internal.STATE_DIR.""" + state_path = Path(".state") + state_path.mkdir() + return state_path + @pytest.fixture() def pbs_job_files(self) -> List[Path]: """Create sample files of the form benchmark_cable_qsub.sh*.""" @@ -133,8 +140,11 @@ def test_clean_realisation_files_git(self, src_path_with_git: Path): clean_realisation_files() assert not src_path_with_git.exists() - def test_clean_submission_files(self, runs_path, pbs_job_files: List[Path]): + def test_clean_submission_files( + self, runs_path, state_path, pbs_job_files: List[Path] + ): """Success case: Submission files created by benchcab are removed after clean.""" clean_submission_files() assert not runs_path.exists() + assert not state_path.exists() assert not self._check_if_any_files_exist(pbs_job_files)