Skip to content

Commit

Permalink
Log status of fluxsite and comparison runs
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanBryan51 committed May 10, 2024
1 parent bb20f5e commit e897894
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/benchcab/benchcab.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ def _get_models(self, config: dict) -> list[Model]:
self._models.append(Model(repo=repo, model_id=id, **sub_config))
return self._models

def _fluxsite_show_task_composition(self, config: dict) -> str:
    """Summarise the dimensions that make up the fluxsite task list.

    Returns a single-line string giving the number of models, the number of
    met forcing files for the configured experiment (reported as sites) and
    the number of science configurations found in `config`.
    """
    model_count = len(self._get_models(config))
    site_files = get_met_forcing_file_names(config["fluxsite"]["experiment"])
    breakdown = {
        "models": model_count,
        "sites": len(site_files),
        "science configurations": len(config["science_configurations"]),
    }
    # Dict insertion order fixes the order of the fields in the summary.
    return ", ".join(f"{label}: {count}" for label, count in breakdown.items())

def _get_fluxsite_tasks(self, config: dict) -> list[fluxsite.FluxsiteTask]:
if not self._fluxsite_tasks:
self._fluxsite_tasks = fluxsite.get_fluxsite_tasks(
Expand Down Expand Up @@ -295,12 +305,18 @@ def fluxsite_run_tasks(self, config_path: str):
tasks = self._get_fluxsite_tasks(config)

logger.info("Running fluxsite tasks...")
logger.info(

Check warning on line 308 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L308

Added line #L308 was not covered by tests
f"tasks: {len(tasks)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
fluxsite.run_tasks_in_parallel(tasks, n_processes=ncpus)
else:
fluxsite.run_tasks(tasks)
logger.info("Successfully ran fluxsite tasks")

tasks_failed = [task for task in tasks if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")

Check warning on line 319 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L317-L319

Added lines #L317 - L319 were not covered by tests

def fluxsite_bitwise_cmp(self, config_path: str):
"""Endpoint for `benchcab fluxsite-bitwise-cmp`."""
Expand All @@ -318,12 +334,18 @@ def fluxsite_bitwise_cmp(self, config_path: str):
)

logger.info("Running comparison tasks...")
logger.info(

Check warning on line 337 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L337

Added line #L337 was not covered by tests
f"tasks: {len(comparisons)} ({self._fluxsite_show_task_composition(config)})"
)
if config["fluxsite"]["multiprocess"]:
ncpus = config["fluxsite"]["pbs"]["ncpus"]
run_comparisons_in_parallel(comparisons, n_processes=ncpus)
else:
run_comparisons(comparisons)
logger.info("Successfully ran comparison tasks")

tasks_failed = [task for task in comparisons if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
logger.info(f"{n_failed} failed, {n_success} passed")

Check warning on line 348 in src/benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/benchcab.py#L346-L348

Added lines #L346 - L348 were not covered by tests

def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
"""Endpoint for `benchcab fluxsite`."""
Expand Down
16 changes: 16 additions & 0 deletions src/benchcab/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,26 @@ def __init__(
self.task_name = task_name
self.logger = get_logger()

def _get_state_dir(self) -> Path:
    """Return the directory that holds this comparison task's status markers."""
    return internal.STATE_DIR.joinpath("fluxsite", "comparisons", self.task_name)

def _state_init(self):
    """Create this task's state directory and clear any stale status markers.

    Status markers are plain files inside the directory returned by
    `self._get_state_dir()`. Leftover markers must be removed here: a
    "done" marker written by an earlier run would otherwise survive and
    `is_done()` would report success even if the task fails this time.
    """
    state_dir = self._get_state_dir()
    state_dir.mkdir(parents=True, exist_ok=True)
    # Start from a clean slate so markers only ever reflect the current run.
    for marker in state_dir.iterdir():
        if marker.is_file():
            marker.unlink()

def _state_set(self, attr: str):
    """Record the status flag `attr` as an empty marker file in the state directory."""
    marker_file = self._get_state_dir().joinpath(attr)
    marker_file.touch()

def is_done(self) -> bool:
    """Report whether a "done" marker exists in this task's state directory."""
    done_marker = self._get_state_dir().joinpath("done")
    return done_marker.exists()

Check warning on line 53 in src/benchcab/comparison.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/comparison.py#L53

Added line #L53 was not covered by tests

def run(self) -> None:
"""Executes `nccmp -df` on the NetCDF files pointed to by `self.files`."""
file_a, file_b = self.files
self.logger.debug(f"Comparing files {file_a.name} and {file_b.name} bitwise...")

self._state_init()
try:
self.subprocess_handler.run_cmd(
f"nccmp -df {file_a} {file_b}",
Expand All @@ -51,6 +66,7 @@ def run(self) -> None:
self.logger.info(
f"Success: files {file_a.name} {file_b.name} are identical"
)
self._state_set("done")
except CalledProcessError as exc:
output_file = (
internal.FLUXSITE_DIRS["BITWISE_CMP"] / f"{self.task_name}.txt"
Expand Down
17 changes: 17 additions & 0 deletions src/benchcab/fluxsite.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import operator
import shutil
import sys
from pathlib import Path
from subprocess import CalledProcessError

import f90nml
Expand Down Expand Up @@ -60,6 +61,20 @@ def __init__(
self.sci_config = sci_config
self.logger = get_logger()

def _get_state_dir(self) -> Path:
    """Return the directory that holds this fluxsite run's status markers."""
    return internal.STATE_DIR.joinpath("fluxsite", "runs", self.get_task_name())

Check warning on line 65 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L65

Added line #L65 was not covered by tests

def _state_init(self):
    """Create this task's state directory and clear any stale status markers.

    Status markers are plain files inside the directory returned by
    `self._get_state_dir()`. Leftover markers must be removed here: a
    "done" marker written by an earlier run would otherwise survive and
    `is_done()` would report success even if the task fails this time.
    """
    state_dir = self._get_state_dir()
    state_dir.mkdir(parents=True, exist_ok=True)
    # Start from a clean slate so markers only ever reflect the current run.
    for marker in state_dir.iterdir():
        if marker.is_file():
            marker.unlink()

Check warning on line 69 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L68-L69

Added lines #L68 - L69 were not covered by tests

def _state_set(self, attr: str):
    """Record the status flag `attr` as an empty marker file in the state directory."""
    marker_file = self._get_state_dir().joinpath(attr)
    marker_file.touch()

Check warning on line 72 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L72

Added line #L72 was not covered by tests

def is_done(self) -> bool:
    """Report whether a "done" marker exists in this task's state directory."""
    done_marker = self._get_state_dir().joinpath("done")
    return done_marker.exists()

Check warning on line 76 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L76

Added line #L76 was not covered by tests

def get_task_name(self) -> str:
"""Returns the file name convention used for this task."""
met_forcing_base_filename = self.met_forcing_file.split(".")[0]
Expand Down Expand Up @@ -212,9 +227,11 @@ def run(self):
self.logger.debug(f"Running task {task_name}... CABLE standard output ")
self.logger.debug(f"saved in {task_dir / internal.CABLE_STDOUT_FILENAME}")

self._state_init()

Check warning on line 230 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L230

Added line #L230 was not covered by tests
try:
self.run_cable()
self.add_provenance_info()
self._state_set("done")

Check warning on line 234 in src/benchcab/fluxsite.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/fluxsite.py#L234

Added line #L234 was not covered by tests
except CableError:
# Note: here we suppress CABLE specific errors so that `benchcab`
# exits successfully. This then allows us to run bitwise comparisons
Expand Down
3 changes: 3 additions & 0 deletions src/benchcab/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

# DIRECTORY PATHS/STRUCTURE:

# Path to the hidden state directory. This is a relative path, so it is
# resolved against the current working directory when it is used:
STATE_DIR = Path(".state")

# Default system paths in Unix
SYSTEM_PATHS = ["/bin", "/usr/bin", "/usr/local/bin"]

Expand Down
3 changes: 3 additions & 0 deletions src/benchcab/workdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def clean_submission_files():
if internal.RUN_DIR.exists():
shutil.rmtree(internal.RUN_DIR)

if internal.STATE_DIR.exists():
shutil.rmtree(internal.STATE_DIR)

Check warning on line 28 in src/benchcab/workdir.py

View check run for this annotation

Codecov / codecov/patch

src/benchcab/workdir.py#L28

Added line #L28 was not covered by tests

for pbs_job_file in Path.cwd().glob(f"{internal.QSUB_FNAME}*"):
pbs_job_file.unlink()

Expand Down

0 comments on commit e897894

Please sign in to comment.