Skip to content

Commit

Permalink
[Pipelines] 🔨 Inter-stage result passing (#2061)
Browse files Browse the repository at this point in the history
* Add initial design

Signed-off-by: Ashwin Vaidya <[email protected]>

* Refactor + add to CLI

Signed-off-by: Ashwin Vaidya <[email protected]>

* Support grid search on class path

Signed-off-by: Ashwin Vaidya <[email protected]>

* redirect outputs

Signed-off-by: Ashwin Vaidya <[email protected]>

* design v2

Signed-off-by: Ashwin Vaidya <[email protected]>

* remove commented code

Signed-off-by: Ashwin Vaidya <[email protected]>

* add dummy experiment

Signed-off-by: Ashwin Vaidya <[email protected]>

* add config

Signed-off-by: Ashwin Vaidya <[email protected]>

* Refactor

Signed-off-by: Ashwin Vaidya <[email protected]>

* Add tests

Signed-off-by: Ashwin Vaidya <[email protected]>

* Apply suggestions from code review

Co-authored-by: Samet Akcay <[email protected]>

* address pr comments

Signed-off-by: Ashwin Vaidya <[email protected]>

* Apply suggestions from code review

Co-authored-by: Samet Akcay <[email protected]>

* refactor

Signed-off-by: Ashwin Vaidya <[email protected]>

* Simplify argparse

Signed-off-by: Ashwin Vaidya <[email protected]>

* modify logger redirect

Signed-off-by: Ashwin Vaidya <[email protected]>

* update docstrings

Signed-off-by: Ashwin Vaidya <[email protected]>

* Add proposal

Signed-off-by: Ashwin Vaidya <[email protected]>

---------

Signed-off-by: Ashwin Vaidya <[email protected]>
Co-authored-by: Samet Akcay <[email protected]>
  • Loading branch information
ashwinvaidya17 and samet-akcay authored May 15, 2024
1 parent 849de79 commit 3de51a4
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 11 deletions.
8 changes: 7 additions & 1 deletion src/anomalib/pipelines/benchmark/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from anomalib.models import get_model
from anomalib.pipelines.components import JobGenerator
from anomalib.pipelines.components.utils import get_iterator_from_grid_dict
from anomalib.pipelines.types import PREV_STAGE_RESULT
from anomalib.utils.logging import hide_output

from .job import BenchmarkJob
Expand All @@ -30,8 +31,13 @@ def job_class(self) -> type:
return BenchmarkJob

@hide_output
def generate_jobs(self, args: dict) -> Generator[BenchmarkJob, None, None]:
def generate_jobs(
self,
args: dict,
previous_stage_result: PREV_STAGE_RESULT,
) -> Generator[BenchmarkJob, None, None]:
"""Return iterator based on the arguments."""
del previous_stage_result # Not needed for this job
for _container in get_iterator_from_grid_dict(args):
yield BenchmarkJob(
accelerator=self.accelerator,
Expand Down
16 changes: 12 additions & 4 deletions src/anomalib/pipelines/components/base/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from collections.abc import Generator

from anomalib.pipelines.types import GATHERED_RESULTS, RUN_RESULTS
from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT, RUN_RESULTS


class Job(ABC):
Expand Down Expand Up @@ -54,12 +54,20 @@ class JobGenerator(ABC):
iterator of specific jobs.
"""

def __call__(self, args: dict | None = None) -> Generator[Job, None, None]:
def __call__(
self,
args: dict | None = None,
prev_stage_result: PREV_STAGE_RESULT = None,
) -> Generator[Job, None, None]:
"""Calls the ``generate_jobs`` method."""
return self.generate_jobs(args)
return self.generate_jobs(args, prev_stage_result)

@abstractmethod
def generate_jobs(self, args: dict | None = None) -> Generator[Job, None, None]:
def generate_jobs(
self,
args: dict | None = None,
prev_stage_result: PREV_STAGE_RESULT = None,
) -> Generator[Job, None, None]:
"""Return an iterator based on the arguments.
This can be used to generate the configurations that will be passed to run.
Expand Down
6 changes: 5 additions & 1 deletion src/anomalib/pipelines/components/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING

import yaml
from jsonargparse import ArgumentParser, Namespace
Expand All @@ -15,6 +16,8 @@

from .runner import Runner

if TYPE_CHECKING:
from anomalib.pipelines.types import PREV_STAGE_RESULT

Check warning on line 20 in src/anomalib/pipelines/components/base/pipeline.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/base/pipeline.py#L20

Added line #L20 was not covered by tests
traceback.install()

log_file = "runs/pipeline.log"
Expand Down Expand Up @@ -54,11 +57,12 @@ def run(self, args: Namespace | None = None) -> None:
args = self._get_args(args)
runners = self._setup_runners(args)
redirect_logs(log_file)
previous_results: PREV_STAGE_RESULT = None

for runner in runners:
try:
_args = args.get(runner.generator.job_class.name, None)
runner.run(_args)
previous_results = runner.run(_args, previous_results)
except Exception: # noqa: PERF203 catch all exception and allow try-catch in loop
logger.exception("An error occurred when running the runner.")
print(

Check warning on line 68 in src/anomalib/pipelines/components/base/pipeline.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/base/pipeline.py#L66-L68

Added lines #L66 - L68 were not covered by tests
Expand Down
4 changes: 3 additions & 1 deletion src/anomalib/pipelines/components/base/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from abc import ABC, abstractmethod

from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT

from .job import JobGenerator


Expand All @@ -15,5 +17,5 @@ def __init__(self, generator: JobGenerator) -> None:
self.generator = generator

@abstractmethod
def run(self, args: dict) -> None:
def run(self, args: dict, prev_stage_results: PREV_STAGE_RESULT = None) -> GATHERED_RESULTS:
"""Run the pipeline."""
6 changes: 4 additions & 2 deletions src/anomalib/pipelines/components/runners/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from rich.progress import Progress, TaskID

from anomalib.pipelines.components.base import JobGenerator, Runner
from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT

if TYPE_CHECKING:
from concurrent.futures import Future

Check warning on line 18 in src/anomalib/pipelines/components/runners/parallel.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/runners/parallel.py#L18

Added line #L18 was not covered by tests
Expand Down Expand Up @@ -55,14 +56,14 @@ def __init__(self, generator: JobGenerator, n_jobs: int) -> None:
self.results: list[dict] = []
self.failures = False

Check warning on line 57 in src/anomalib/pipelines/components/runners/parallel.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/runners/parallel.py#L51-L57

Added lines #L51 - L57 were not covered by tests

def run(self, args: dict) -> None:
def run(self, args: dict, prev_stage_results: PREV_STAGE_RESULT = None) -> GATHERED_RESULTS:
"""Run the job in parallel."""
self.task_id = self.progress.add_task(self.generator.job_class.name, total=None)
self.progress.start()
self.processes = {i: None for i in range(self.n_jobs)}

Check warning on line 63 in src/anomalib/pipelines/components/runners/parallel.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/runners/parallel.py#L61-L63

Added lines #L61 - L63 were not covered by tests

with ProcessPoolExecutor(max_workers=self.n_jobs, mp_context=multiprocessing.get_context("spawn")) as executor:
for job in self.generator.generate_jobs(args):
for job in self.generator(args, prev_stage_results):
while None not in self.processes.values():
self._await_cleanup_processes()

Check warning on line 68 in src/anomalib/pipelines/components/runners/parallel.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/runners/parallel.py#L65-L68

Added lines #L65 - L68 were not covered by tests
# get free index
Expand All @@ -80,6 +81,7 @@ def run(self, args: dict) -> None:
logger.error(msg)
raise ParallelExecutionError(msg)
logger.info(f"Job {self.generator.job_class.name} completed successfully.")
return gathered_result

Check warning on line 84 in src/anomalib/pipelines/components/runners/parallel.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/runners/parallel.py#L74-L84

Added lines #L74 - L84 were not covered by tests

def _await_cleanup_processes(self, blocking: bool = False) -> None:
"""Wait for any one process to finish.
Expand Down
6 changes: 4 additions & 2 deletions src/anomalib/pipelines/components/runners/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from rich.progress import track

from anomalib.pipelines.components.base import JobGenerator, Runner
from anomalib.pipelines.types import GATHERED_RESULTS, PREV_STAGE_RESULT

logger = logging.getLogger(__name__)

Expand All @@ -23,12 +24,12 @@ class SerialRunner(Runner):
def __init__(self, generator: JobGenerator) -> None:
super().__init__(generator)

def run(self, args: dict) -> None:
def run(self, args: dict, prev_stage_results: PREV_STAGE_RESULT = None) -> GATHERED_RESULTS:
"""Run the job."""
results = []
failures = False
logger.info(f"Running job {self.generator.job_class.name}")
for job in track(self.generator(args), description=self.generator.job_class.name):
for job in track(self.generator(args, prev_stage_results), description=self.generator.job_class.name):
try:
results.append(job.run())
except Exception: # noqa: PERF203
Expand All @@ -42,3 +43,4 @@ def run(self, args: dict) -> None:
logger.error(msg)
raise SerialExecutionError(msg)

Check warning on line 44 in src/anomalib/pipelines/components/runners/serial.py

View check run for this annotation

Codecov / codecov/patch

src/anomalib/pipelines/components/runners/serial.py#L41-L44

Added lines #L41 - L44 were not covered by tests
logger.info(f"Job {self.generator.job_class.name} completed successfully.")
return gathered_result
1 change: 1 addition & 0 deletions src/anomalib/pipelines/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@

# Result of a single job run.
# NOTE(review): meaning inferred from the name only; confirm against ``Job``.
RUN_RESULTS = Any
# Value a ``Runner.run`` call returns for a finished stage.
GATHERED_RESULTS = Any
# Gathered result of the preceding stage, forwarded by the pipeline to the next
# runner and on to its job generator. ``None`` when no stage has run yet (the
# pipeline initialises it to ``None`` before the first runner).
PREV_STAGE_RESULT = GATHERED_RESULTS | None

0 comments on commit 3de51a4

Please sign in to comment.