Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: return 422 for Avro/JSONSchema when payload does not match schema #853

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,13 @@ async def publish(self, topic: str, partition_id: Optional[str], content_type: s
content_type=content_type,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)
except InvalidPayload as e:
cause = str(e.__cause__)
KafkaRest.r(
body={"error_code": RESTErrorCodes.INVALID_DATA.value, "message": cause},
content_type=content_type,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)
except SchemaRetrievalError as e:
KafkaRest.r(
body={"error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value, "message": str(e)},
Expand Down
111 changes: 111 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
"""
from __future__ import annotations

from dataclasses import dataclass
from karapace.client import Client
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX
from karapace.schema_type import SchemaType
from karapace.version import __version__
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
Expand All @@ -21,10 +23,12 @@
test_objects_avro_evolution,
wait_for_topics,
)
from typing import Any, Mapping

import asyncio
import base64
import json
import pytest
import time

NEW_TOPIC_TIMEOUT = 10
Expand Down Expand Up @@ -675,3 +679,110 @@ async def test_partitions(
res = await rest_async_client.get(f"/topics/{topic_name}/partitions/foo/offsets", headers=header)
assert res.status_code == 404
assert res.json()["error_code"] == 404


@dataclass
class IncompatibleDataForSchemaTestCase:
    """One parametrized scenario for publishing data that violates its schema.

    ``str()`` of an instance is its ``name``, which lets the object be used
    directly as a pytest id through ``ids=str``.
    """

    # Human-readable label for the scenario; returned by ``__str__``.
    name: str
    # Schema flavour to register; its ``.value`` is sent as ``schemaType``.
    schema_type: SchemaType
    # Schema definition; JSON-encoded before being sent to the registry.
    schema: Mapping[str, Any]
    # Exact ``message`` expected in the error response body.
    expected_error_message: str

    def __str__(self) -> str:
        """Return the scenario name."""
        return self.name


@pytest.mark.parametrize(
    "testcase",
    [
        IncompatibleDataForSchemaTestCase(
            name="Avro schema, incompatible data",
            schema_type=SchemaType.AVRO,
            schema={
                "type": "record",
                "namespace": "karapace.test",
                "name": "IncompatibleDataTest",
                "fields": [
                    {"name": "validField", "type": "string"},
                    {"name": "invalidField", "type": "int"},
                ],
            },
            expected_error_message="Object does not fit to stored schema",
        ),
        IncompatibleDataForSchemaTestCase(
            name="JSONSchema, incompatible data",
            schema_type=SchemaType.JSONSCHEMA,
            schema={
                "$schema": "https://json.schema.org/draft/2020-12/schema",
                "$id": "https://example.com/json.schema.test",
                "title": "JSON Schema Test",
                "description": "a description",
                "type": "object",
                "properties": {
                    "validField": {"type": "string"},
                    "invalidField": {"type": "integer"},
                },
            },
            expected_error_message=(
                "'not an integer' is not of type 'integer'\n\n"
                "Failed validating 'type' in schema['properties']['invalidField']:\n"
                " {'type': 'integer'}\n\nOn instance['invalidField']:\n 'not an integer'"
            ),
        ),
    ],
    ids=str,
)
async def test_publish_invalid_data_for_schema(
    testcase: IncompatibleDataForSchemaTestCase,
    rest_async_client: Client,
    registry_async_client: Client,
    admin_client: KafkaAdminClient,
) -> None:
    """Check that publishing a record which does not match its schema is rejected.

    The test creates a topic, registers the case's schema under the topic's
    value subject, then posts a record whose ``invalidField`` holds a string
    where the schema requires an integer. The REST proxy must answer with
    HTTP 422, error code 42205 and the exact message expected by the case.
    """
    topic_name = new_topic(admin_client)
    await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)

    # Register the schema first; the publish request refers to it by id.
    registration = await registry_async_client.post(
        f"subjects/{topic_name}-value/versions",
        json={"schema": json.dumps(testcase.schema), "schemaType": testcase.schema_type.value},
    )
    assert registration.status_code == 200

    publish_request = {
        "value_schema_id": registration.json()["id"],
        "records": [
            {
                "value": {
                    "validField": "valid value",
                    "invalidField": "not an integer",
                },
            },
        ],
    }

    # Kludge: the JSONSCHEMA schema type's value is "JSON", but the header set
    # needed here is keyed "jsonschema". The plain JSON serialization format
    # does not use a schema to validate the data.
    headers_key = (
        "jsonschema" if testcase.schema_type == SchemaType.JSONSCHEMA else testcase.schema_type.value.lower()
    )

    response = await rest_async_client.post(
        f"/topics/{topic_name}",
        json=publish_request,
        headers=REST_HEADERS[headers_key],
    )
    assert response.status_code == 422
    error_body = response.json()
    assert error_body["error_code"] == 42205
    assert "message" in error_body
    assert error_body["message"] == testcase.expected_error_message
Loading