[Pipelines] πŸ”¨ Intra-stage result passing #2061

Merged

src/anomalib/pipelines/benchmark/generator.py (7 additions, 1 deletion)

```diff
@@ -9,6 +9,7 @@
 from anomalib.models import get_model
 from anomalib.pipelines.components import JobGenerator
 from anomalib.pipelines.components.utils import get_iterator_from_grid_dict
+from anomalib.pipelines.types import PREV_STAGE_RESULT
 from anomalib.utils.logging import hide_output

 from .job import BenchmarkJob
@@ -30,8 +31,13 @@ def job_class(self) -> type:
         return BenchmarkJob

     @hide_output
-    def generate_jobs(self, args: dict) -> Generator[BenchmarkJob, None, None]:
+    def generate_jobs(
+        self,
+        args: dict,
+        previous_stage_result: PREV_STAGE_RESULT,
+    ) -> Generator[BenchmarkJob, None, None]:
         """Return iterator based on the arguments."""
+        del previous_stage_result  # Not needed for this job
         for _container in get_iterator_from_grid_dict(args):
             yield BenchmarkJob(
                 accelerator=self.accelerator,
```

src/anomalib/pipelines/components/base/job.py (12 additions, 4 deletions)

```diff
@@ -6,7 +6,7 @@
 from abc import ABC, abstractmethod
 from collections.abc import Generator

-from anomalib.pipelines.types import GATHERED_RESULTS, RUN_RESULTS
+from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT, RUN_RESULTS


 class Job(ABC):
@@ -54,12 +54,20 @@ class JobGenerator(ABC):
     iterator of specific jobs.
     """

-    def __call__(self, args: dict | None = None) -> Generator[Job, None, None]:
+    def __call__(
+        self,
+        args: dict | None = None,
+        prev_stage_result: PREV_STAGE_RESULT = None,
+    ) -> Generator[Job, None, None]:
         """Calls the ``generate_jobs`` method."""
-        return self.generate_jobs(args)
+        return self.generate_jobs(args, prev_stage_result)

     @abstractmethod
-    def generate_jobs(self, args: dict | None = None) -> Generator[Job, None, None]:
+    def generate_jobs(
+        self,
+        args: dict | None = None,
+        prev_stage_result: PREV_STAGE_RESULT = None,
+    ) -> Generator[Job, None, None]:
         """Return an iterator based on the arguments.

         This can be used to generate the configurations that will be passed to run.
```

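The benchmark generator above is the first stage of its pipeline, so it simply discards the new argument; a later-stage generator would instead build its jobs from `prev_stage_result`. A minimal sketch of that pattern follows. `MetricSummaryJob` and `MetricSummaryJobGenerator` are invented names for illustration, and their `run`/`collect`/`save` members are assumed to mirror the `Job` interface that `BenchmarkJob` implements; none of this is part of the PR.

```python
from collections.abc import Generator

from anomalib.pipelines.components.base import Job, JobGenerator
from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT, RUN_RESULTS


class MetricSummaryJob(Job):
    """Toy job that carries one record produced by the previous stage."""

    name = "metric_summary"

    def __init__(self, record: dict) -> None:
        super().__init__()
        self.record = record

    def run(self, task_id: int | None = None) -> RUN_RESULTS:
        del task_id  # unused in this sketch
        return self.record

    @staticmethod
    def collect(results: list[RUN_RESULTS]) -> GATHERED_RESULTS:
        return results

    @staticmethod
    def save(results: GATHERED_RESULTS) -> None:
        print(results)


class MetricSummaryJobGenerator(JobGenerator):
    """Yields one job per entry of the previous stage's gathered results."""

    @property
    def job_class(self) -> type:
        return MetricSummaryJob

    def generate_jobs(
        self,
        args: dict | None = None,
        prev_stage_result: PREV_STAGE_RESULT = None,
    ) -> Generator[MetricSummaryJob, None, None]:
        del args  # this stage is driven purely by the previous stage's output
        for record in prev_stage_result or []:
            yield MetricSummaryJob(record)
```

Wrapped in a `SerialRunner` or `ParallelRunner`, this generator receives whatever the preceding runner's `run` returned, courtesy of the updated `Pipeline.run` loop below.
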
src/anomalib/pipelines/components/base/pipeline.py (5 additions, 1 deletion)

```diff
@@ -6,6 +6,7 @@
 import logging
 from abc import ABC, abstractmethod
 from pathlib import Path
+from typing import TYPE_CHECKING

 import yaml
 from jsonargparse import ArgumentParser, Namespace
@@ -15,6 +16,8 @@

 from .runner import Runner

+if TYPE_CHECKING:
+    from anomalib.pipelines.types import PREV_STAGE_RESULT
 traceback.install()

 log_file = "runs/pipeline.log"
@@ -54,11 +57,12 @@ def run(self, args: Namespace | None = None) -> None:
         args = self._get_args(args)
         runners = self._setup_runners(args)
         redirect_logs(log_file)
+        previous_results: PREV_STAGE_RESULT = None

         for runner in runners:
             try:
                 _args = args.get(runner.generator.job_class.name, None)
-                runner.run(_args)
+                previous_results = runner.run(_args, previous_results)
             except Exception:  # noqa: PERF203 catch all exception and allow try-catch in loop
                 logger.exception("An error occurred when running the runner.")
                 print(
```

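The effect of the new `previous_results` chaining is easiest to see in isolation. The snippet below is a self-contained toy, not anomalib code: `EchoRunner` is a stand-in that just reports what it received, mimicking the contract that every runner returns its gathered results and the pipeline hands them to the next runner.

```python
from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT


class EchoRunner:
    """Stand-in runner that records what it received from the previous stage."""

    def __init__(self, label: str) -> None:
        self.label = label

    def run(self, args: dict | None, prev_stage_results: PREV_STAGE_RESULT = None) -> GATHERED_RESULTS:
        # A real runner would run its jobs here; this one just echoes the chained input.
        return {"stage": self.label, "received": prev_stage_results}


runners = [EchoRunner("benchmark"), EchoRunner("postprocessing")]
previous_results: PREV_STAGE_RESULT = None
for runner in runners:
    # Same chaining as Pipeline.run above: each stage sees the previous stage's output.
    previous_results = runner.run(None, previous_results)

print(previous_results)
# {'stage': 'postprocessing', 'received': {'stage': 'benchmark', 'received': None}}
```
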
src/anomalib/pipelines/components/base/runner.py (3 additions, 1 deletion)

```diff
@@ -5,6 +5,8 @@

 from abc import ABC, abstractmethod

+from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT
+
 from .job import JobGenerator


@@ -15,5 +17,5 @@ def __init__(self, generator: JobGenerator) -> None:
         self.generator = generator

     @abstractmethod
-    def run(self, args: dict) -> None:
+    def run(self, args: dict, prev_stage_results: PREV_STAGE_RESULT = None) -> GATHERED_RESULTS:
         """Run the pipeline."""
```

src/anomalib/pipelines/components/runners/parallel.py (4 additions, 2 deletions)

```diff
@@ -12,6 +12,7 @@
 from rich.progress import Progress, TaskID

 from anomalib.pipelines.components.base import JobGenerator, Runner
+from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT

 if TYPE_CHECKING:
     from concurrent.futures import Future
@@ -55,14 +56,14 @@ def __init__(self, generator: JobGenerator, n_jobs: int) -> None:
         self.results: list[dict] = []
         self.failures = False

-    def run(self, args: dict) -> None:
+    def run(self, args: dict, prev_stage_results: PREV_STAGE_RESULT = None) -> GATHERED_RESULTS:
         """Run the job in parallel."""
         self.task_id = self.progress.add_task(self.generator.job_class.name, total=None)
         self.progress.start()
         self.processes = {i: None for i in range(self.n_jobs)}

         with ProcessPoolExecutor(max_workers=self.n_jobs, mp_context=multiprocessing.get_context("spawn")) as executor:
-            for job in self.generator.generate_jobs(args):
+            for job in self.generator(args, prev_stage_results):
                 while None not in self.processes.values():
                     self._await_cleanup_processes()
                 # get free index
@@ -80,6 +81,7 @@ def run(self, args: dict) -> None:
             logger.error(msg)
             raise ParallelExecutionError(msg)
         logger.info(f"Job {self.generator.job_class.name} completed successfully.")
+        return gathered_result

     def _await_cleanup_processes(self, blocking: bool = False) -> None:
         """Wait for any one process to finish.
```

src/anomalib/pipelines/components/runners/serial.py (4 additions, 2 deletions)

```diff
@@ -9,6 +9,7 @@
 from rich.progress import track

 from anomalib.pipelines.components.base import JobGenerator, Runner
+from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT

 logger = logging.getLogger(__name__)

@@ -23,12 +24,12 @@ class SerialRunner(Runner):
     def __init__(self, generator: JobGenerator) -> None:
         super().__init__(generator)

-    def run(self, args: dict) -> None:
+    def run(self, args: dict, prev_stage_results: PREV_STAGE_RESULT = None) -> GATHERED_RESULTS:
         """Run the job."""
         results = []
         failures = False
         logger.info(f"Running job {self.generator.job_class.name}")
-        for job in track(self.generator(args), description=self.generator.job_class.name):
+        for job in track(self.generator(args, prev_stage_results), description=self.generator.job_class.name):
             try:
                 results.append(job.run())
             except Exception:  # noqa: PERF203
@@ -42,3 +43,4 @@ def run(self, args: dict) -> None:
             logger.error(msg)
             raise SerialExecutionError(msg)
         logger.info(f"Job {self.generator.job_class.name} completed successfully.")
+        return gathered_result
```

src/anomalib/pipelines/types.py (1 addition, 0 deletions)

```diff
@@ -7,3 +7,4 @@

 RUN_RESULTS = Any
 GATHERED_RESULTS = Any
+PREV_STAGE_RESULT = GATHERED_RESULTS | None
```