Skip to content

Commit

Permalink
feature/mx-1708 convert sinks to connectors (#367)
Browse files Browse the repository at this point in the history
### PR Context

- sibling PR to
#366

### Added

- add a sink registry with `register_sink` and `get_sink` functions
- add a `MultiSink` implementation, akin to `mex.extractors.load`

### Changes

- BREAKING: convert post_to_backend_api to BackendApiSink
- BREAKING: convert write_ndjson to NdjsonSink

---------

Signed-off-by: Nicolas Drebenstedt <[email protected]>
Co-authored-by: rababerladuseladim <[email protected]>
  • Loading branch information
cutoffthetop and rababerladuseladim authored Jan 28, 2025
1 parent 4f66746 commit f702a11
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 61 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- port backend identity provider implementation from editor/extractors to common
- add a sink registry with `register_sink` and `get_sink` functions
- add a `MultiSink` implementation, akin to `mex.extractors.load`

### Changes

- BREAKING: convert post_to_backend_api to BackendApiSink
- BREAKING: convert write_ndjson to NdjsonSink
- backend and ndjson sinks log progress only in batches
- increase timeout and decrease chunk size for backend API sink
- port backend identity provider implementation from editor/extractors to common
- allow backend and graph as identity provider setting to simplify setting subclasses,
even though graph is not implemented in mex-common
- BREAKING: make backend api connector response models generic, to keep DRY
Expand Down
2 changes: 2 additions & 0 deletions mex/common/backend_api/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BackendApiConnector(HTTPConnector):
"""Connector class to handle interaction with the Backend API."""

API_VERSION = "v0"
INGEST_TIMEOUT = 30

def _check_availability(self) -> None:
"""Send a GET request to verify the API is available."""
Expand Down Expand Up @@ -58,6 +59,7 @@ def post_extracted_items(
method="POST",
endpoint="ingest",
payload=ItemsContainer[AnyExtractedModel](items=extracted_items),
timeout=self.INGEST_TIMEOUT,
)
return IdentifiersResponse.model_validate(response)

Expand Down
42 changes: 25 additions & 17 deletions mex/common/sinks/backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@
from typing import cast

from mex.common.backend_api.connector import BackendApiConnector
from mex.common.logging import watch
from mex.common.logging import logger
from mex.common.models import AnyExtractedModel
from mex.common.sinks.base import BaseSink
from mex.common.types import AnyExtractedIdentifier
from mex.common.utils import grouper


@watch
def post_to_backend_api(
models: Iterable[AnyExtractedModel], chunk_size: int = 100
) -> Generator[AnyExtractedIdentifier, None, None]:
"""Load models to the Backend API using bulk insertion.
class BackendApiSink(BaseSink):
"""Sink to load models to the Backend API."""

Args:
models: Iterable of extracted models
chunk_size: Optional size to chunks to post in one request
CHUNK_SIZE = 50

Returns:
Generator for identifiers of posted models
"""
connector = BackendApiConnector.get()
for chunk in grouper(chunk_size, models):
model_list = [model for model in chunk if model is not None]
response = connector.post_extracted_items(model_list)
yield from cast(list[AnyExtractedIdentifier], response.identifiers)
def load(
self,
models: Iterable[AnyExtractedModel],
) -> Generator[AnyExtractedIdentifier, None, None]:
"""Load models to the Backend API using bulk insertion.
Args:
models: Iterable of extracted models
Returns:
Generator for identifiers of posted models
"""
total_count = 0
connector = BackendApiConnector.get()
for chunk in grouper(self.CHUNK_SIZE, models):
model_list = [model for model in chunk if model is not None]
response = connector.post_extracted_items(model_list)
total_count += len(model_list)
yield from cast(list[AnyExtractedIdentifier], response.identifiers)
logger.info("%s - written %s models", type(self).__name__, total_count)
23 changes: 23 additions & 0 deletions mex/common/sinks/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from abc import abstractmethod
from collections.abc import Iterable

from mex.common.connector import BaseConnector
from mex.common.models import AnyExtractedModel
from mex.common.types import Identifier


class BaseSink(BaseConnector):
    """Base class to define the interface of sink instances.

    Concrete sinks (e.g. `BackendApiSink`, `NdjsonSink`) subclass this and
    implement `load` to write extracted models to their destination.
    """

    def __init__(self) -> None:
        """Create a new sink."""
        # no shared setup; subclasses acquire their own resources

    def close(self) -> None:
        """Close the sink."""
        # default is a no-op; subclasses override when they hold resources

    @abstractmethod
    def load(
        self, models: Iterable[AnyExtractedModel]
    ) -> Iterable[Identifier]:  # pragma: no cover
        """Iteratively load models to a destination and yield their identifiers."""
        ...
89 changes: 53 additions & 36 deletions mex/common/sinks/ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,62 @@
from pathlib import Path
from typing import IO, Any

from mex.common.logging import logger, watch
from mex.common.logging import logger
from mex.common.models import AnyExtractedModel
from mex.common.settings import BaseSettings
from mex.common.sinks.base import BaseSink
from mex.common.transform import MExEncoder
from mex.common.types import AnyExtractedIdentifier
from mex.common.utils import grouper


@watch
def write_ndjson(
models: Iterable[AnyExtractedModel],
) -> Generator[AnyExtractedIdentifier, None, None]:
"""Write the incoming models into a new-line delimited JSON file.
Args:
models: Iterable of extracted models to write
Settings:
work_dir: Path to store the NDJSON files in
Returns:
Generator for identifiers of written models
"""
file_handles: dict[str, IO[Any]] = {}
settings = BaseSettings.get()
with ExitStack() as stack:
for model in models:
class_name = model.__class__.__name__
try:
handle = file_handles[class_name]
except KeyError:
file_name = Path(settings.work_dir, f"{class_name}.ndjson")
writer = open(file_name, "a+", encoding="utf-8") # noqa: SIM115
file_handles[class_name] = handle = stack.enter_context(writer)
logger.info(
"write_ndjson - writing %s to file %s",
class_name,
file_name.as_posix(),
)

json.dump(model, handle, sort_keys=True, cls=MExEncoder)
handle.write("\n")
yield model.identifier
class NdjsonSink(BaseSink):
    """Sink to load models into new-line delimited JSON files."""

    CHUNK_SIZE = 100
    # directory the NDJSON files are written to, taken from settings
    _work_dir: Path

    def __init__(self) -> None:
        """Instantiate the NDJSON sink with the configured working directory.

        Settings:
            work_dir: Path to store the NDJSON files in
        """
        # NOTE: the original docstring said "multi sink singleton" — a
        # copy-paste from `_MultiSink`; this sink writes NDJSON files.
        settings = BaseSettings.get()
        self._work_dir = Path(settings.work_dir)

    def close(self) -> None:
        """Nothing to close, since `load` already closes all file handles."""

    def load(
        self,
        models: Iterable[AnyExtractedModel],
    ) -> Generator[AnyExtractedIdentifier, None, None]:
        """Write models into a new-line delimited JSON file.

        Args:
            models: Iterable of extracted models to write

        Returns:
            Generator for identifiers of written models
        """
        file_handles: dict[str, IO[Any]] = {}
        total_count = 0
        with ExitStack() as stack:
            for chunk in grouper(self.CHUNK_SIZE, models):
                for model in chunk:
                    if model is None:  # grouper pads the last chunk with None
                        continue
                    class_name = model.__class__.__name__
                    try:
                        fh = file_handles[class_name]
                    except KeyError:
                        # open one append-mode file per model class;
                        # the exit stack closes it when load finishes
                        file_name = self._work_dir / f"{class_name}.ndjson"
                        writer = open(file_name, "a+", encoding="utf-8")  # noqa: SIM115
                        file_handles[class_name] = fh = stack.enter_context(writer)
                        logger.info(
                            "%s - writing %s to file %s",
                            type(self).__name__,
                            class_name,
                            file_name.as_posix(),
                        )
                    fh.write(f"{json.dumps(model, sort_keys=True, cls=MExEncoder)}\n")
                    total_count += 1
                    yield model.identifier
        logger.info("%s - written %s models", type(self).__name__, total_count)
79 changes: 79 additions & 0 deletions mex/common/sinks/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from collections.abc import Generator, Iterable
from itertools import tee
from typing import Final

from mex.common.models import AnyExtractedModel
from mex.common.settings import BaseSettings
from mex.common.sinks.backend_api import BackendApiSink
from mex.common.sinks.base import BaseSink
from mex.common.sinks.ndjson import NdjsonSink
from mex.common.types import Identifier, Sink

_SINK_REGISTRY: Final[dict[Sink, type["BaseSink"]]] = {}


class _MultiSink(BaseSink):
    """Sink to load models to multiple sinks simultaneously."""

    # This class is private because it should only be acquired by calling `get_sink`.

    _sinks: list[BaseSink]

    def __init__(self) -> None:
        """Instantiate the multi sink singleton.

        Raises:
            RuntimeError: When a configured sink is not registered
        """
        # fix: use an instance attribute — the original class-level
        # `_sinks = []` was shared by all instances, so instantiating the
        # sink more than once accumulated duplicate sub-sinks
        self._sinks = []
        settings = BaseSettings.get()
        for sink in settings.sink:
            if sink in _SINK_REGISTRY:
                sink_cls = _SINK_REGISTRY[sink]
                self._sinks.append(sink_cls.get())
            else:
                msg = f"Sink function not implemented: {sink}"
                raise RuntimeError(msg)

    def close(self) -> None:
        """Close all underlying sinks."""
        for sink in self._sinks:
            sink.close()

    def load(
        self,
        models: Iterable[AnyExtractedModel],
    ) -> Generator[Identifier, None, None]:
        """Load models to multiple sinks simultaneously.

        Args:
            models: Iterable of extracted models to load

        Returns:
            Generator for identifiers of loaded models
        """
        # tee duplicates the stream so every sink consumes its own copy
        for sink, model_gen in zip(
            self._sinks, tee(models, len(self._sinks)), strict=True
        ):
            yield from sink.load(model_gen)


def register_sink(key: Sink, sink_cls: type["BaseSink"]) -> None:
    """Map a settings key to a sink implementation.

    Args:
        key: Possible value of `BaseSettings.sink`
        sink_cls: Implementation of the abstract sink class

    Raises:
        RuntimeError: When the `key` is already registered
    """
    if key not in _SINK_REGISTRY:
        _SINK_REGISTRY[key] = sink_cls
        return
    msg = f"Already registered sink function: {key}"
    raise RuntimeError(msg)


def get_sink() -> "BaseSink":
    """Return a sink that serves all configured `sink` destinations.

    Raises:
        RuntimeError: When a configured sink is not registered

    Returns:
        A multi-sink singleton that pours the models into all configured sinks
    """
    multi_sink = _MultiSink.get()
    return multi_sink


# register the default sinks shipped with mex-common
register_sink(Sink.BACKEND, BackendApiSink)
register_sink(Sink.NDJSON, NdjsonSink)
2 changes: 1 addition & 1 deletion tests/backend_api/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_post_extracted_items_mocked(
"Accept": "application/json",
"User-Agent": "rki/mex",
},
timeout=10,
timeout=BackendApiConnector.INGEST_TIMEOUT,
data=Joker(),
)
assert (
Expand Down
7 changes: 4 additions & 3 deletions tests/sinks/test_backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from mex.common.backend_api.connector import BackendApiConnector
from mex.common.backend_api.models import IdentifiersResponse
from mex.common.models import ExtractedPerson
from mex.common.sinks.backend_api import post_to_backend_api
from mex.common.sinks.backend_api import BackendApiSink


def test_post_to_backend_api_mocked(
def test_sink_load_mocked(
extracted_person: ExtractedPerson, monkeypatch: MonkeyPatch
) -> None:
def __init__(self: BackendApiConnector) -> None:
Expand All @@ -22,6 +22,7 @@ def __init__(self: BackendApiConnector) -> None:
BackendApiConnector, "post_extracted_items", post_extracted_items
)

model_ids = list(post_to_backend_api([extracted_person]))
sink = BackendApiSink.get()
model_ids = list(sink.load([extracted_person]))
assert model_ids == response.identifiers
post_extracted_items.assert_called_once_with([extracted_person])
7 changes: 4 additions & 3 deletions tests/sinks/test_ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from mex.common.models import ExtractedData
from mex.common.settings import BaseSettings
from mex.common.sinks.ndjson import write_ndjson
from mex.common.sinks.ndjson import NdjsonSink
from mex.common.types import Identifier, TemporalEntity


Expand All @@ -21,7 +21,7 @@ class ExtractedThing(ExtractedData):
ts_attr: TemporalEntity | None = None


def test_write_ndjson() -> None:
def test_sink_load() -> None:
settings = BaseSettings.get()

test_models = [
Expand All @@ -37,7 +37,8 @@ def test_write_ndjson() -> None:
),
]

ids = list(write_ndjson(test_models))
sink = NdjsonSink.get()
ids = list(sink.load(test_models))
assert len(ids)

with open(settings.work_dir / "ExtractedThing.ndjson") as handle:
Expand Down

0 comments on commit f702a11

Please sign in to comment.