From f702a11923341c195d536d2610d1267c1bacace5 Mon Sep 17 00:00:00 2001 From: Nicolas Drebenstedt <897972+cutoffthetop@users.noreply.github.com> Date: Tue, 28 Jan 2025 09:34:02 +0100 Subject: [PATCH] feature/mx-1708 convert sinks to connectors (#367) ### PR Context - sibling PR to https://github.com/robert-koch-institut/mex-common/pull/366 ### Added - add a sink registry with `register_sink` and `get_sink` functions - add a `MultiSink` implementation, akin to `mex.extractors.load` ### Changes - BREAKING: convert post_to_backend_api to BackendApiSink - BREAKING: convert write_ndjson to NdjsonSink --------- Signed-off-by: Nicolas Drebenstedt <897972+cutoffthetop@users.noreply.github.com> Co-authored-by: rababerladuseladim --- CHANGELOG.md | 8 ++- mex/common/backend_api/connector.py | 2 + mex/common/sinks/backend_api.py | 42 ++++++++------ mex/common/sinks/base.py | 23 ++++++++ mex/common/sinks/ndjson.py | 89 +++++++++++++++++------------ mex/common/sinks/registry.py | 79 +++++++++++++++++++++++++ tests/backend_api/test_connector.py | 2 +- tests/sinks/test_backend_api.py | 7 ++- tests/sinks/test_ndjson.py | 7 ++- 9 files changed, 198 insertions(+), 61 deletions(-) create mode 100644 mex/common/sinks/base.py create mode 100644 mex/common/sinks/registry.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 0569bb65..28f787c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,10 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- port backend identity provider implementation from editor/extractors to common +- add a sink registry with `register_sink` and `get_sink` functions +- add a multi-sink implementation, akin to `mex.extractors.load` ### Changes +- BREAKING: convert post_to_backend_api to BackendApiSink +- BREAKING: convert write_ndjson to NdjsonSink +- backend and ndjson sinks log progress only in batches +- increase timeout and decrease chunk size for backend API sink +- port backend identity provider implementation 
from editor/extractors to common - allow backend and graph as identity provider setting to simplify setting subclasses, even though graph is not implemented in mex-common - BREAKING: make backend api connector response models generic, to keep DRY diff --git a/mex/common/backend_api/connector.py b/mex/common/backend_api/connector.py index 351eeeaf..8f5bb32a 100644 --- a/mex/common/backend_api/connector.py +++ b/mex/common/backend_api/connector.py @@ -24,6 +24,7 @@ class BackendApiConnector(HTTPConnector): """Connector class to handle interaction with the Backend API.""" API_VERSION = "v0" + INGEST_TIMEOUT = 30 def _check_availability(self) -> None: """Send a GET request to verify the API is available.""" @@ -58,6 +59,7 @@ def post_extracted_items( method="POST", endpoint="ingest", payload=ItemsContainer[AnyExtractedModel](items=extracted_items), + timeout=self.INGEST_TIMEOUT, ) return IdentifiersResponse.model_validate(response) diff --git a/mex/common/sinks/backend_api.py b/mex/common/sinks/backend_api.py index 69550d6a..b5fa2e34 100644 --- a/mex/common/sinks/backend_api.py +++ b/mex/common/sinks/backend_api.py @@ -2,27 +2,35 @@ from typing import cast from mex.common.backend_api.connector import BackendApiConnector -from mex.common.logging import watch +from mex.common.logging import logger from mex.common.models import AnyExtractedModel +from mex.common.sinks.base import BaseSink from mex.common.types import AnyExtractedIdentifier from mex.common.utils import grouper -@watch -def post_to_backend_api( - models: Iterable[AnyExtractedModel], chunk_size: int = 100 -) -> Generator[AnyExtractedIdentifier, None, None]: - """Load models to the Backend API using bulk insertion. 
+class BackendApiSink(BaseSink): + """Sink to load models to the Backend API.""" - Args: - models: Iterable of extracted models - chunk_size: Optional size to chunks to post in one request + CHUNK_SIZE = 50 - Returns: - Generator for identifiers of posted models - """ - connector = BackendApiConnector.get() - for chunk in grouper(chunk_size, models): - model_list = [model for model in chunk if model is not None] - response = connector.post_extracted_items(model_list) - yield from cast(list[AnyExtractedIdentifier], response.identifiers) + def load( + self, + models: Iterable[AnyExtractedModel], + ) -> Generator[AnyExtractedIdentifier, None, None]: + """Load models to the Backend API using bulk insertion. + + Args: + models: Iterable of extracted models + + Returns: + Generator for identifiers of posted models + """ + total_count = 0 + connector = BackendApiConnector.get() + for chunk in grouper(self.CHUNK_SIZE, models): + model_list = [model for model in chunk if model is not None] + response = connector.post_extracted_items(model_list) + total_count += len(model_list) + yield from cast(list[AnyExtractedIdentifier], response.identifiers) + logger.info("%s - written %s models", type(self).__name__, total_count) diff --git a/mex/common/sinks/base.py b/mex/common/sinks/base.py new file mode 100644 index 00000000..673002a8 --- /dev/null +++ b/mex/common/sinks/base.py @@ -0,0 +1,23 @@ +from abc import abstractmethod +from collections.abc import Iterable + +from mex.common.connector import BaseConnector +from mex.common.models import AnyExtractedModel +from mex.common.types import Identifier + + +class BaseSink(BaseConnector): + """Base class to define the interface of sink instances.""" + + def __init__(self) -> None: + """Create a new sink.""" + + def close(self) -> None: + """Close the sink.""" + + @abstractmethod + def load( + self, models: Iterable[AnyExtractedModel] + ) -> Iterable[Identifier]: # pragma: no cover + """Iteratively load models to a destination and 
yield their identifiers.""" + ... diff --git a/mex/common/sinks/ndjson.py b/mex/common/sinks/ndjson.py index 619b5b63..1bb4cf58 100644 --- a/mex/common/sinks/ndjson.py +++ b/mex/common/sinks/ndjson.py @@ -4,45 +4,62 @@ from pathlib import Path from typing import IO, Any -from mex.common.logging import logger, watch +from mex.common.logging import logger from mex.common.models import AnyExtractedModel from mex.common.settings import BaseSettings +from mex.common.sinks.base import BaseSink from mex.common.transform import MExEncoder from mex.common.types import AnyExtractedIdentifier +from mex.common.utils import grouper -@watch -def write_ndjson( - models: Iterable[AnyExtractedModel], -) -> Generator[AnyExtractedIdentifier, None, None]: - """Write the incoming models into a new-line delimited JSON file. - - Args: - models: Iterable of extracted models to write - - Settings: - work_dir: Path to store the NDJSON files in - - Returns: - Generator for identifiers of written models - """ - file_handles: dict[str, IO[Any]] = {} - settings = BaseSettings.get() - with ExitStack() as stack: - for model in models: - class_name = model.__class__.__name__ - try: - handle = file_handles[class_name] - except KeyError: - file_name = Path(settings.work_dir, f"{class_name}.ndjson") - writer = open(file_name, "a+", encoding="utf-8") # noqa: SIM115 - file_handles[class_name] = handle = stack.enter_context(writer) - logger.info( - "write_ndjson - writing %s to file %s", - class_name, - file_name.as_posix(), - ) - - json.dump(model, handle, sort_keys=True, cls=MExEncoder) - handle.write("\n") - yield model.identifier +class NdjsonSink(BaseSink): + """Sink to load models into new-line delimited JSON files.""" + + CHUNK_SIZE = 100 + _work_dir: Path + + def __init__(self) -> None: + """Instantiate the NDJSON sink.""" + settings = BaseSettings.get() + self._work_dir = Path(settings.work_dir) + + def close(self) -> None: + """Nothing to close, since load already closes all file 
handles.""" + + def load( + self, + models: Iterable[AnyExtractedModel], + ) -> Generator[AnyExtractedIdentifier, None, None]: + """Write models into a new-line delimited JSON file. + + Args: + models: Iterable of extracted models to write + + Returns: + Generator for identifiers of written models + """ + file_handles: dict[str, IO[Any]] = {} + total_count = 0 + with ExitStack() as stack: + for chunk in grouper(self.CHUNK_SIZE, models): + for model in chunk: + if model is None: + continue + class_name = model.__class__.__name__ + try: + fh = file_handles[class_name] + except KeyError: + file_name = self._work_dir / f"{class_name}.ndjson" + writer = open(file_name, "a+", encoding="utf-8") # noqa: SIM115 + file_handles[class_name] = fh = stack.enter_context(writer) + logger.info( + "%s - writing %s to file %s", + type(self).__name__, + class_name, + file_name.as_posix(), + ) + fh.write(f"{json.dumps(model, sort_keys=True, cls=MExEncoder)}\n") + total_count += 1 + yield model.identifier + logger.info("%s - written %s models", type(self).__name__, total_count) diff --git a/mex/common/sinks/registry.py b/mex/common/sinks/registry.py new file mode 100644 index 00000000..d61d35cd --- /dev/null +++ b/mex/common/sinks/registry.py @@ -0,0 +1,79 @@ +from collections.abc import Generator, Iterable +from itertools import tee +from typing import Final + +from mex.common.models import AnyExtractedModel +from mex.common.settings import BaseSettings +from mex.common.sinks.backend_api import BackendApiSink +from mex.common.sinks.base import BaseSink +from mex.common.sinks.ndjson import NdjsonSink +from mex.common.types import Identifier, Sink + +_SINK_REGISTRY: Final[dict[Sink, type["BaseSink"]]] = {} + + +class _MultiSink(BaseSink): + """Sink to load models to multiple sinks simultaneously.""" + + # This class is private because it should only be acquired by calling `get_sink`. 
+ + _sinks: list[BaseSink] = [] + + def __init__(self) -> None: + """Instantiate the multi sink singleton.""" + settings = BaseSettings.get() + for sink in settings.sink: + if sink in _SINK_REGISTRY: + sink_cls = _SINK_REGISTRY[sink] + self._sinks.append(sink_cls.get()) + else: + msg = f"Sink function not implemented: {sink}" + raise RuntimeError(msg) + + def close(self) -> None: + """Close all underlying sinks.""" + for sink in self._sinks: + sink.close() + + def load( + self, + models: Iterable[AnyExtractedModel], + ) -> Generator[Identifier, None, None]: + """Load models to multiple sinks simultaneously.""" + for sink, model_gen in zip( + self._sinks, tee(models, len(self._sinks)), strict=True + ): + yield from sink.load(model_gen) + + +def register_sink(key: Sink, sink_cls: type["BaseSink"]) -> None: + """Register an implementation of a sink function to a settings key. + + Args: + key: Possible value of `BaseSettings.sink` + sink_cls: Implementation of the abstract sink class + + Raises: + RuntimeError: When the `key` is already registered + """ + if key in _SINK_REGISTRY: + msg = f"Already registered sink function: {key}" + raise RuntimeError(msg) + _SINK_REGISTRY[key] = sink_cls + + +def get_sink() -> "BaseSink": + """Get a sink function that serves all configured `sink` destinations. 
+ + Raises: + RuntimeError: When the configured sink is not registered + + Returns: + A sink instance that pours the models into all configured sinks + """ + return _MultiSink.get() + + +# register the default providers shipped with mex-common +register_sink(Sink.BACKEND, BackendApiSink) +register_sink(Sink.NDJSON, NdjsonSink) diff --git a/tests/backend_api/test_connector.py b/tests/backend_api/test_connector.py index 296f4284..ae11aad7 100644 --- a/tests/backend_api/test_connector.py +++ b/tests/backend_api/test_connector.py @@ -42,7 +42,7 @@ def test_post_extracted_items_mocked( "Accept": "application/json", "User-Agent": "rki/mex", }, - timeout=10, + timeout=BackendApiConnector.INGEST_TIMEOUT, data=Joker(), ) assert ( diff --git a/tests/sinks/test_backend_api.py b/tests/sinks/test_backend_api.py index 52ab29f1..d8015184 100644 --- a/tests/sinks/test_backend_api.py +++ b/tests/sinks/test_backend_api.py @@ -5,10 +5,10 @@ from mex.common.backend_api.connector import BackendApiConnector from mex.common.backend_api.models import IdentifiersResponse from mex.common.models import ExtractedPerson -from mex.common.sinks.backend_api import post_to_backend_api +from mex.common.sinks.backend_api import BackendApiSink -def test_post_to_backend_api_mocked( +def test_sink_load_mocked( extracted_person: ExtractedPerson, monkeypatch: MonkeyPatch ) -> None: def __init__(self: BackendApiConnector) -> None: @@ -22,6 +22,7 @@ def __init__(self: BackendApiConnector) -> None: BackendApiConnector, "post_extracted_items", post_extracted_items ) - model_ids = list(post_to_backend_api([extracted_person])) + sink = BackendApiSink.get() + model_ids = list(sink.load([extracted_person])) assert model_ids == response.identifiers post_extracted_items.assert_called_once_with([extracted_person]) diff --git a/tests/sinks/test_ndjson.py b/tests/sinks/test_ndjson.py index ad9072b9..07ec4f6e 100644 --- a/tests/sinks/test_ndjson.py +++ b/tests/sinks/test_ndjson.py @@ -5,7 +5,7 @@ from mex.common.models 
import ExtractedData from mex.common.settings import BaseSettings -from mex.common.sinks.ndjson import write_ndjson +from mex.common.sinks.ndjson import NdjsonSink from mex.common.types import Identifier, TemporalEntity @@ -21,7 +21,7 @@ class ExtractedThing(ExtractedData): ts_attr: TemporalEntity | None = None -def test_write_ndjson() -> None: +def test_sink_load() -> None: settings = BaseSettings.get() test_models = [ @@ -37,7 +37,8 @@ def test_write_ndjson() -> None: ), ] - ids = list(write_ndjson(test_models)) + sink = NdjsonSink.get() + ids = list(sink.load(test_models)) assert len(ids) with open(settings.work_dir / "ExtractedThing.ndjson") as handle: