Skip to content

Commit

Permalink
Refactor logging in LocalDaskProcessor to use instance logger
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexDo1 committed Nov 12, 2024
1 parent ef0e414 commit 6706027
Showing 1 changed file with 41 additions and 35 deletions.
76 changes: 41 additions & 35 deletions stgrid2area/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ def __init__(self, areas: List[Area], stgrid: xr.Dataset, variable: str, method:
self.logger = logger

# Set up basic logging if no handler is configured
if not self.logger.hasHandlers():
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
if not self.logger:
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.StreamHandler())

def clip_and_aggregate(self, area: Area) -> Union[pd.DataFrame, Exception]:
"""
Expand Down Expand Up @@ -81,7 +83,7 @@ def clip_and_aggregate(self, area: Area) -> Union[pd.DataFrame, Exception]:
try:
return area.aggregate(clipped, self.variable, "exact_extract", self.operations, save_result=True, skip_exist=self.skip_exist)
except ValueError:
logging.warning(f"Method 'exact_extract' failed for area {area.id}. Falling back to 'xarray' method.")
self.logger.warning(f"Method 'exact_extract' failed for area {area.id}. Falling back to 'xarray' method.")
return area.aggregate(clipped, self.variable, "xarray", self.operations, save_result=True, skip_exist=self.skip_exist)
else:
raise ValueError("Invalid method. Use 'exact_extract', 'xarray' or 'fallback_xarray'.")
Expand All @@ -91,40 +93,44 @@ def run(self) -> None:
Run the parallel processing of areas using Dask.
"""
logging.info("Starting processing with LocalDaskProcessor.")
self.logger.info("Starting processing with LocalDaskProcessor.")

with Client(LocalCluster(n_workers=self.n_workers, threads_per_worker=1)) as client:
# Log the Dask dashboard address
logging.info(f"Dask dashboard address: {client.dashboard_link}")

# Persist stgrid in memory to avoid repetitive scattering
self.stgrid = self.stgrid.persist()

# Process the areas in parallel and keep track of futures
tasks = [delayed(self.clip_and_aggregate)(area, dask_key_name=f"{area.id}") for area in self.areas]
futures = client.compute(tasks)

counter = 0
success = 0

# Wait for the tasks to complete
for future in as_completed(futures):
with LocalCluster(n_workers=self.n_workers, threads_per_worker=1) as cluster:
with Client(cluster) as client:
try:
# Get the result of the task
result = future.result()
if isinstance(result, pd.DataFrame):
counter += 1
success += 1
logging.info(f"[{counter} / {len(self.areas)}]: {future.key} --- Processing completed")
except Exception as e:
if logging.getLogger().level == logging.DEBUG:
counter += 1
logging.exception(f"[{counter} / {len(self.areas)}]: {future.key} --- An error occurred: {e}")
else:
counter += 1
logging.error(f"[{counter} / {len(self.areas)}]: {future.key} --- An error occurred: {e}")

logging.info(f"Processing completed and was successful for [{success} / {len(self.areas)}] areas")
# Log the Dask dashboard address
self.logger.info(f"Dask dashboard address: {client.dashboard_link}")

# Persist stgrid in memory to avoid repetitive scattering
self.stgrid = self.stgrid.persist()

# Process the areas in parallel and keep track of futures
tasks = [delayed(self.clip_and_aggregate)(area, dask_key_name=f"{area.id}") for area in self.areas]
futures = client.compute(tasks)

counter = 0
success = 0

# Wait for the tasks to complete
for future in as_completed(futures):
try:
# Get the result of the task
result = future.result()
if isinstance(result, pd.DataFrame):
counter += 1
success += 1
self.logger.info(f"[{counter} / {len(self.areas)}]: {future.key} --- Processing completed")
except Exception as e:
if self.logger.getLogger().level == self.logger.DEBUG:
counter += 1
self.logger.exception(f"[{counter} / {len(self.areas)}]: {future.key} --- An error occurred: {e}")
else:
counter += 1
self.logger.error(f"[{counter} / {len(self.areas)}]: {future.key} --- An error occurred: {e}")

self.logger.info(f"Processing completed and was successful for [{success} / {len(self.areas)}] areas")
finally:
self.logger.info("Shutting down Dask client and cluster.")



Expand Down

0 comments on commit 6706027

Please sign in to comment.