From 1b656d7a343c0bf702da798a463b60e37a91297c Mon Sep 17 00:00:00 2001 From: Jonas Keeling Date: Wed, 22 Jan 2025 00:00:34 +0100 Subject: [PATCH 1/2] fix: delete references if schema is deleted --- src/karapace/in_memory_database.py | 28 +++++---- src/karapace/schema_reader.py | 13 +--- src/karapace/schema_references.py | 2 +- src/karapace/schema_registry.py | 4 -- tests/unit/test_in_memory_database.py | 86 ++++++++++++++++++++++++--- 5 files changed, 96 insertions(+), 37 deletions(-) diff --git a/src/karapace/in_memory_database.py b/src/karapace/in_memory_database.py index 6692cae33..6a1f18186 100644 --- a/src/karapace/in_memory_database.py +++ b/src/karapace/in_memory_database.py @@ -125,18 +125,10 @@ def num_subjects(self) -> int: def num_schema_versions(self) -> tuple[int, int]: pass - @abstractmethod - def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None: - pass - @abstractmethod def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None: pass - @abstractmethod - def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None: - pass - class InMemoryDatabase(KarapaceDatabase): def __init__(self) -> None: @@ -257,6 +249,9 @@ def insert_schema_version( schema=schema, schema_id=schema_id, ) + if references: + for ref in references: + self._insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id) else: self._delete_from_schema_id_on_subject( subject=subject, @@ -352,12 +347,19 @@ def delete_subject(self, *, subject: Subject, version: Version) -> None: def delete_subject_hard(self, *, subject: Subject) -> None: with self.schema_lock_thread: + for schema in self.subjects[subject].schemas.values(): + if schema.references: + self._remove_referenced_by(schema.schema_id, schema.references) del self.subjects[subject] self._delete_subject_from_schema_id_on_subject(subject=subject) def delete_subject_schema(self, *, subject: Subject, version: 
Version) -> None: with self.schema_lock_thread: - self.subjects[subject].schemas.pop(version, None) + schema = self.subjects[subject].schemas.pop(version, None) + if schema: + if schema.references: + self._remove_referenced_by(schema.schema_id, schema.references) + self._delete_from_schema_id_on_subject(subject=subject, schema=schema.schema) def num_schemas(self) -> int: return len(self.schemas) @@ -377,19 +379,19 @@ def num_schema_versions(self) -> tuple[int, int]: soft_deleted_versions += 1 return (live_versions, soft_deleted_versions) - def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None: + def _insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None: with self.schema_lock_thread: referents = self.referenced_by.get((subject, version), None) if referents: - referents.append(schema_id) + referents.add(schema_id) else: - self.referenced_by[(subject, version)] = Referents([schema_id]) + self.referenced_by[(subject, version)] = Referents({schema_id}) def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None: with self.schema_lock_thread: return self.referenced_by.get((subject, version), None) - def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None: + def _remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None: with self.schema_lock_thread: for ref in references: key = (ref.subject, ref.version) diff --git a/src/karapace/schema_reader.py b/src/karapace/schema_reader.py index d7c02ba03..0f2c3a1a8 100644 --- a/src/karapace/schema_reader.py +++ b/src/karapace/schema_reader.py @@ -43,7 +43,7 @@ from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient -from karapace.typing import 
JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version +from karapace.typing import JsonObject, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError, shutdown from threading import Event, Lock, Thread from typing import Final @@ -660,10 +660,6 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: references=resolved_references, ) - if resolved_references: - for ref in resolved_references: - self.database.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id) - def handle_msg(self, key: dict, value: dict | None) -> None: if "keytype" in key: try: @@ -687,13 +683,6 @@ def handle_msg(self, key: dict, value: dict | None) -> None: ) raise InvalidSchema("Message key doesn't contain the `keytype` attribute") - def remove_referenced_by( - self, - schema_id: SchemaId, - references: Sequence[Reference], - ) -> None: - self.database.remove_referenced_by(schema_id, references) - def get_referenced_by( self, subject: Subject, diff --git a/src/karapace/schema_references.py b/src/karapace/schema_references.py index 900568349..204998088 100644 --- a/src/karapace/schema_references.py +++ b/src/karapace/schema_references.py @@ -12,7 +12,7 @@ from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version from typing import cast, NewType, TypeVar -Referents = NewType("Referents", list[SchemaId]) +Referents = NewType("Referents", set[SchemaId]) T = TypeVar("T") diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 5c553b0db..6c5b27f5e 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -170,8 +170,6 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[ deleted=True, references=schema_version.references, ) - if schema_version.references and len(schema_version.references) > 0: - self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references) else: try: 
schema_versions_live = self.subject_get(subject, include_deleted=False) @@ -225,8 +223,6 @@ async def subject_version_delete_local(self, subject: Subject, version: Version, deleted=True, references=schema_version.references, ) - if schema_version.references and len(schema_version.references) > 0: - self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references) return resolved_version def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[Version, SchemaVersion]: diff --git a/tests/unit/test_in_memory_database.py b/tests/unit/test_in_memory_database.py index a3720940d..57504ca7b 100644 --- a/tests/unit/test_in_memory_database.py +++ b/tests/unit/test_in_memory_database.py @@ -5,7 +5,7 @@ from __future__ import annotations from collections import defaultdict -from collections.abc import Iterable, Sequence +from collections.abc import Sequence from confluent_kafka.cimpl import KafkaError from karapace.config import DEFAULTS from karapace.constants import DEFAULT_SCHEMA_TOPIC @@ -13,13 +13,17 @@ from karapace.kafka.types import Timestamp from karapace.key_format import KeyFormatter from karapace.offset_watcher import OffsetWatcher +from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import SchemaVersion, TypedSchema from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import Reference, Referents +from karapace.schema_type import SchemaType from karapace.typing import SchemaId, Version from pathlib import Path from typing import Final +import pytest + TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/") @@ -176,15 +180,9 @@ def num_subjects(self) -> int: def num_schema_versions(self) -> tuple[int, int]: return self.db.num_schema_versions() - def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None: - return self.db.insert_referenced_by(subject=subject, version=version, schema_id=schema_id) - def 
get_referenced_by(self, subject: Subject, version: Version) -> Referents | None: return self.db.get_referenced_by(subject=subject, version=version) - def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None: - return self.db.remove_referenced_by(schema_id=schema_id, references=references) - def duplicates(self) -> dict[SchemaId, list[tuple[Subject, TypedSchema]]]: duplicate_data = defaultdict(list) for schema_id, schemas in self._duplicates.items(): @@ -259,3 +257,77 @@ def test_can_ingest_schemas_from_log() -> None: schema_id_to_duplicated_subjects = compute_schema_id_to_subjects(duplicates, database.subject_to_subject_data()) assert schema_id_to_duplicated_subjects == {}, "there shouldn't be any duplicated schemas" assert duplicates == {}, "the schema database is broken. The id should be unique" + + +@pytest.fixture(name="db_with_schemas") +def fixture_in_memory_database_with_schemas() -> InMemoryDatabase: + db = InMemoryDatabase() + schema_str = "syntax = 'proto3'; message Test { string test = 1; }" + + subject_a = Subject("subject_a") + schema_a = TypedSchema( + schema_type=SchemaType.PROTOBUF, + schema_str=schema_str, + schema=ProtobufSchema(schema=schema_str), + ) + db.insert_subject(subject=subject_a) + schema_id_a = db.get_schema_id(schema_a) + db.insert_schema_version( + subject=subject_a, schema_id=schema_id_a, version=Version(1), schema=schema_a, deleted=False, references=None + ) + db.insert_schema_version( + subject=subject_a, schema_id=schema_id_a, version=Version(2), schema=schema_a, deleted=False, references=None + ) + + subject_b = Subject("subject_b") + references_b = [Reference(name="test", subject=subject_a, version=Version(1))] + schema_b = TypedSchema( + schema_type=SchemaType.PROTOBUF, + schema_str=schema_str, + schema=ProtobufSchema(schema=schema_str), + references=references_b, + ) + db.insert_subject(subject=subject_b) + schema_id_b = db.get_schema_id(schema_b) + db.insert_schema_version( + 
+        subject=subject_b,
+        schema_id=schema_id_b,
+        version=Version(1),
+        schema=schema_b,
+        deleted=False,
+        references=references_b,
+    )
+
+    return db
+
+
+def test_delete_schema_references(db_with_schemas: InMemoryDatabase) -> None:
+    # Check that subject_a v1 is referenced by subject_b; peek only, pop() would empty the stored set itself
+    referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
+    assert referents is not None
+    version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=next(iter(referents)), include_deleted=False)[0]
+    assert version.subject == Subject("subject_b")
+    assert version.version == Version(1)
+
+    # Delete the schema from subject_b
+    db_with_schemas.delete_subject_schema(subject=Subject("subject_b"), version=Version(1))
+
+    # The database itself must have dropped the referent (an empty set or a missing key both count)
+    referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
+    assert not referents, "referents should be gone after deleting the schema"
+
+
+def test_delete_subject(db_with_schemas: InMemoryDatabase) -> None:
+    # Check that subject_a v1 is referenced by subject_b; peek only, pop() would empty the stored set itself
+    referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
+    assert referents is not None
+    version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=next(iter(referents)), include_deleted=False)[0]
+    assert version.subject == Subject("subject_b")
+    assert version.version == Version(1)
+
+    # Hard delete subject_b
+    db_with_schemas.delete_subject_hard(subject=Subject("subject_b"))
+
+    # The database itself must have dropped the referent (an empty set or a missing key both count)
+    referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
+    assert not referents, "referents should be gone after hard deleting the subject"
From 485a95cfedd79e585cd816ada58827cdd04d4e37 Mon Sep 17 00:00:00 2001
From: Jonas Keeling
Date: Wed, 22 Jan 2025 00:11:10 +0100
Subject: [PATCH 2/2] chore: fix enums in stubs to address recent mypy
changes --- stubs/confluent_kafka/admin/_config.pyi | 5 +++-- stubs/confluent_kafka/admin/_resource.pyi | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/stubs/confluent_kafka/admin/_config.pyi b/stubs/confluent_kafka/admin/_config.pyi index 76c18ed5c..d811a4bfa 100644 --- a/stubs/confluent_kafka/admin/_config.pyi +++ b/stubs/confluent_kafka/admin/_config.pyi @@ -1,5 +1,6 @@ from ._resource import ResourceType from enum import Enum +from typing import cast class ConfigResource: Type = ResourceType @@ -12,7 +13,7 @@ class ConfigResource: ) -> None: ... class ConfigSource(Enum): - UNKNOWN_CONFIG: int - DYNAMIC_TOPIC_CONFIG: int + UNKNOWN_CONFIG = cast(int, ...) + DYNAMIC_TOPIC_CONFIG = cast(int, ...) class ConfigEntry: ... diff --git a/stubs/confluent_kafka/admin/_resource.pyi b/stubs/confluent_kafka/admin/_resource.pyi index db1cda471..67b02be22 100644 --- a/stubs/confluent_kafka/admin/_resource.pyi +++ b/stubs/confluent_kafka/admin/_resource.pyi @@ -1,4 +1,5 @@ from enum import Enum +from typing import cast class ResourceType(Enum): - TOPIC: int + TOPIC = cast(int, ...)