Skip to content

Commit

Permalink
work on df-db thread
Browse files Browse the repository at this point in the history
  • Loading branch information
HansVRP committed Feb 7, 2025
1 parent f914a21 commit 87e8597
Showing 1 changed file with 15 additions and 28 deletions.
43 changes: 15 additions & 28 deletions openeo/extra/job_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,10 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
df.loc[i, "status"] = "skipped"
stats["start_job skipped"] += 1

# After updating the row, send the updated subset to the queue
updated_row = df.loc[i:i+1].copy()
self.job_output_queue.put(updated_row)

def on_job_done(self, job: BatchJob, row):
"""
Handles jobs that have finished. Can be overridden to provide custom behaviour.
Expand Down Expand Up @@ -715,44 +719,28 @@ def _cancel_prolonged_job(self, job: BatchJob, row):
except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

def _output_queue_worker(self,
df: Optional[pd.DataFrame] = None,
job_db: Union[str, Path, JobDatabaseInterface, None] = None,):
def _output_queue_worker(self, job_db: JobDatabaseInterface):
"""
Dedicated thread that:
- Constantly checks the output queue for job outcomes.
- For each result, it updates the DataFrame (optionally calling job.status())
and persists the updated row via the job database.
Dedicated thread that processes DataFrame updates from the queue and persists them.
"""
while True:
try:
item = self.job_output_queue.get(timeout=self.poll_sleep)
item = self.job_output_queue.get(timeout=1)
except Empty:
if self.stop_event.is_set():
break
continue

if item is None: # Sentinel to signal shutdown.
if item is None: # Sentinel to shutdown
break

index, outcome, job = item

# Update the DataFrame.
df.loc[index, "status"] = outcome
if outcome == "started":
try:
status = job.status()
except Exception as e:
_log.error(f"Error retrieving status for index {index}: {e}", exc_info=True)
status = "status_error"
df.loc[index, "status"] = status

# Persist the updated row.
try:
job_db.persist(self.df.loc[index : index + 1])
except Exception as persist_err:
_log.error(f"Error persisting update for index {index}: {persist_err}")
self.job_output_queue.task_done()
# Persist the DataFrame subset received from the queue
job_db.persist(item)
except Exception as e:
_log.error(f"Error persisting job updates: {e}")
finally:
self.job_output_queue.task_done()

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are saved."""
Expand Down Expand Up @@ -841,8 +829,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
stats["job tracking error"] += 1
_log.warning(f"Error while tracking status of job {job_id!r} on backend {backend_name}: {e!r}")

stats["job_db persist"] += 1
job_db.persist(active)
self.job_output_queue.put(active.copy())

return jobs_done, jobs_error, jobs_cancel

Expand Down

0 comments on commit 87e8597

Please sign in to comment.