Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/mx-1708 convert sinks to connectors #367

Merged
merged 11 commits into from
Jan 28, 2025
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- add a sink registry with `register_sink` and `get_sink` functions
- add a multi-sink implementation, akin to `mex.extractors.load`

### Changed

- BREAKING: convert post_to_backend_api to BackendApiSink
- BREAKING: convert write_ndjson to NdjsonSink
- backend and ndjson sinks log progress only in batches
- increase timeout and decrease chunk size for backend API sink

### Deprecated

### Removed
Expand Down
42 changes: 25 additions & 17 deletions mex/common/sinks/backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@
from typing import cast

from mex.common.backend_api.connector import BackendApiConnector
from mex.common.logging import watch
from mex.common.logging import logger
from mex.common.models import AnyExtractedModel
from mex.common.sinks.base import BaseSink
from mex.common.types import AnyExtractedIdentifier
from mex.common.utils import grouper


@watch
def post_to_backend_api(
models: Iterable[AnyExtractedModel], chunk_size: int = 100
) -> Generator[AnyExtractedIdentifier, None, None]:
"""Load models to the Backend API using bulk insertion.
class BackendApiSink(BaseSink, BackendApiConnector):
    """Sink to load models to the Backend API."""

    # Smaller chunks and a longer timeout than the connector defaults,
    # because bulk ingestion requests are comparatively heavy.
    CHUNK_SIZE = 50
    TIMEOUT = 30

    def load(
        self,
        models: Iterable[AnyExtractedModel],
    ) -> Generator[AnyExtractedIdentifier, None, None]:
        """Load models to the Backend API using bulk insertion.

        Args:
            models: Iterable of extracted models

        Returns:
            Generator for identifiers of posted models
        """
        total_count = 0
        for chunk in grouper(self.CHUNK_SIZE, models):
            # grouper pads the last chunk with None fill values; drop them
            model_list = [model for model in chunk if model is not None]
            response = self.post_extracted_items(model_list)
            total_count += len(model_list)
            yield from cast(list[AnyExtractedIdentifier], response.identifiers)
            # log progress once per batch rather than once per model
            logger.info("BackendApiSink - written %s models", total_count)
17 changes: 17 additions & 0 deletions mex/common/sinks/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from abc import abstractmethod
from collections.abc import Iterable

from mex.common.connector import BaseConnector
from mex.common.models import AnyExtractedModel
from mex.common.types import Identifier


class BaseSink(BaseConnector):
    """Base class to define the interface of sink instances."""

    @abstractmethod
    def load(
        self, models: Iterable[AnyExtractedModel]
    ) -> Iterable[Identifier]:  # pragma: no cover
        """Iteratively load models to a destination and yield their identifiers."""
        ...
88 changes: 52 additions & 36 deletions mex/common/sinks/ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,61 @@
from pathlib import Path
from typing import IO, Any

from mex.common.logging import logger, watch
from mex.common.logging import logger
from mex.common.models import AnyExtractedModel
from mex.common.settings import BaseSettings
from mex.common.sinks.base import BaseSink
from mex.common.transform import MExEncoder
from mex.common.types import AnyExtractedIdentifier
from mex.common.utils import grouper


@watch
def write_ndjson(
models: Iterable[AnyExtractedModel],
) -> Generator[AnyExtractedIdentifier, None, None]:
"""Write the incoming models into a new-line delimited JSON file.

Args:
models: Iterable of extracted models to write

Settings:
work_dir: Path to store the NDJSON files in

Returns:
Generator for identifiers of written models
"""
file_handles: dict[str, IO[Any]] = {}
settings = BaseSettings.get()
with ExitStack() as stack:
for model in models:
class_name = model.__class__.__name__
try:
handle = file_handles[class_name]
except KeyError:
file_name = Path(settings.work_dir, f"{class_name}.ndjson")
writer = open(file_name, "a+", encoding="utf-8") # noqa: SIM115
file_handles[class_name] = handle = stack.enter_context(writer)
logger.info(
"write_ndjson - writing %s to file %s",
class_name,
file_name.as_posix(),
)

json.dump(model, handle, sort_keys=True, cls=MExEncoder)
handle.write("\n")
yield model.identifier
class NdjsonSink(BaseSink):
    """Sink to load models into new-line delimited JSON files."""

    CHUNK_SIZE = 100
    # directory the NDJSON files are written to, resolved from settings
    _work_dir: Path

    def __init__(self) -> None:
        """Instantiate the NDJSON sink."""
        settings = BaseSettings.get()
        self._work_dir = Path(settings.work_dir)

    def close(self) -> None:
        """Nothing to close, since load already closes all file handles."""

    def load(
        self,
        models: Iterable[AnyExtractedModel],
    ) -> Generator[AnyExtractedIdentifier, None, None]:
        """Write the incoming models into a new-line delimited JSON file.

        One file per model class is opened (append mode) in the work
        directory; all handles are closed when the generator finishes.

        Args:
            models: Iterable of extracted models to write

        Returns:
            Generator for identifiers of written models
        """
        file_handles: dict[str, IO[Any]] = {}
        total_count = 0
        with ExitStack() as stack:
            for chunk in grouper(self.CHUNK_SIZE, models):
                for model in chunk:
                    # grouper pads the last chunk with None fill values
                    if model is None:
                        continue
                    class_name = model.__class__.__name__
                    try:
                        handle = file_handles[class_name]
                    except KeyError:
                        # lazily open one file per model class on first use
                        file_name = self._work_dir / f"{class_name}.ndjson"
                        writer = open(file_name, "a+", encoding="utf-8")  # noqa: SIM115
                        file_handles[class_name] = handle = stack.enter_context(writer)
                        logger.info(
                            "NdjsonSink - writing %s to file %s",
                            class_name,
                            file_name.as_posix(),
                        )
                    handle.write(f"{json.dumps(model, sort_keys=True, cls=MExEncoder)}\n")
                    total_count += 1
                    yield model.identifier
                # log progress once per batch rather than once per model
                logger.info("NdjsonSink - written %s models", total_count)
79 changes: 79 additions & 0 deletions mex/common/sinks/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from collections.abc import Generator, Iterable
from itertools import tee
from typing import Final

from mex.common.models import AnyExtractedModel
from mex.common.settings import BaseSettings
from mex.common.sinks.backend_api import BackendApiSink
from mex.common.sinks.base import BaseSink
from mex.common.sinks.ndjson import NdjsonSink
from mex.common.types import Identifier, Sink

_SINK_REGISTRY: Final[dict[Sink, type["BaseSink"]]] = {}


class _MultiSink(BaseSink):
    """Sink to load models to multiple sinks simultaneously."""

    # This class is private because it should only be acquired by calling `get_sink`.

    # instantiated sinks this multi-sink fans models out to
    _sinks: list[BaseSink]

    def __init__(self) -> None:
        """Instantiate the multi sink singleton.

        Raises:
            RuntimeError: When a configured sink is not registered
        """
        # instance attribute instead of a class-level mutable default:
        # a shared class-level list would accumulate sinks across instantiations
        self._sinks = []
        settings = BaseSettings.get()
        for sink in settings.sink:
            if sink in _SINK_REGISTRY:
                sink_cls = _SINK_REGISTRY[sink]
                self._sinks.append(sink_cls.get())
            else:
                msg = f"Sink function not implemented: {sink}"
                raise RuntimeError(msg)

    def close(self) -> None:
        """Close all underlying sinks."""
        for sink in self._sinks:
            sink.close()

    def load(
        self,
        models: Iterable[AnyExtractedModel],
    ) -> Generator[Identifier, None, None]:
        """Load models to multiple sinks simultaneously."""
        # `tee` duplicates the stream once per sink so each sink consumes
        # its own copy; both zip sides have exactly len(self._sinks) items,
        # so strict=True is safe and catches future mismatches
        for sink, model_gen in zip(
            self._sinks, tee(models, len(self._sinks)), strict=True
        ):
            yield from sink.load(model_gen)


def register_sink(key: Sink, sink_cls: type["BaseSink"]) -> None:
    """Register an implementation of a sink function to a settings key.

    Args:
        key: Possible value of `BaseSettings.sink`
        sink_cls: Implementation of the abstract sink class

    Raises:
        RuntimeError: When the `key` is already registered
    """
    if key not in _SINK_REGISTRY:
        _SINK_REGISTRY[key] = sink_cls
    else:
        raise RuntimeError(f"Already registered sink function: {key}")


def get_sink() -> "BaseSink":
    """Get a sink instance that serves all configured `sink` destinations.

    Raises:
        RuntimeError: When a configured sink is not registered

    Returns:
        A sink that pours the models into all configured sink destinations
    """
    return _MultiSink.get()


# register the default providers shipped with mex-common
register_sink(Sink.BACKEND, BackendApiSink)
register_sink(Sink.NDJSON, NdjsonSink)
16 changes: 7 additions & 9 deletions tests/sinks/test_backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@

from pytest import MonkeyPatch

from mex.common.backend_api.connector import BackendApiConnector
from mex.common.backend_api.models import IdentifiersResponse
from mex.common.models import ExtractedPerson
from mex.common.sinks.backend_api import post_to_backend_api
from mex.common.sinks.backend_api import BackendApiSink


def test_sink_load_mocked(
    extracted_person: ExtractedPerson, monkeypatch: MonkeyPatch
) -> None:
    """Load a model through a BackendApiSink with a mocked HTTP layer."""

    # stub out the connector __init__ so no real session is opened
    def __init__(self: BackendApiSink) -> None:
        self.session = MagicMock()

    monkeypatch.setattr(BackendApiSink, "__init__", __init__)

    response = IdentifiersResponse(identifiers=[extracted_person.identifier])
    post_extracted_items = Mock(return_value=response)
    monkeypatch.setattr(BackendApiSink, "post_extracted_items", post_extracted_items)

    sink = BackendApiSink.get()
    model_ids = list(sink.load([extracted_person]))
    assert model_ids == response.identifiers
    post_extracted_items.assert_called_once_with([extracted_person])
7 changes: 4 additions & 3 deletions tests/sinks/test_ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from mex.common.models import ExtractedData
from mex.common.settings import BaseSettings
from mex.common.sinks.ndjson import write_ndjson
from mex.common.sinks.ndjson import NdjsonSink
from mex.common.types import Identifier, TemporalEntity


Expand All @@ -21,7 +21,7 @@ class ExtractedThing(ExtractedData):
ts_attr: TemporalEntity | None = None


def test_write_ndjson() -> None:
def test_sink_load() -> None:
settings = BaseSettings.get()

test_models = [
Expand All @@ -37,7 +37,8 @@ def test_write_ndjson() -> None:
),
]

ids = list(write_ndjson(test_models))
sink = NdjsonSink.get()
ids = list(sink.load(test_models))
assert len(ids)

with open(settings.work_dir / "ExtractedThing.ndjson") as handle:
Expand Down
Loading