From b417e878f49f53257358c7e6c6533734089be877 Mon Sep 17 00:00:00 2001 From: Julien Clarysse Date: Tue, 4 Feb 2025 11:59:34 +0100 Subject: [PATCH] rest-proxy: allow reading messages with avro-value and string-key It is not uncommon that Kafka users produce messages with String-key and Avro-value. In this case Karapace REST typically fails to read the message since it only supports one format for deserializing both the key and the value. For more convenience, I suggest to fallback to `binary` deserializer for the key (only) when reading it using `avro` fails. --- .../kafka_rest_apis/consumer_manager.py | 14 ++++--- tests/integration/test_rest_consumer.py | 38 +++++++++++++++++++ 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/karapace/kafka_rest_apis/consumer_manager.py b/src/karapace/kafka_rest_apis/consumer_manager.py index 809478f4c..6451d417e 100644 --- a/src/karapace/kafka_rest_apis/consumer_manager.py +++ b/src/karapace/kafka_rest_apis/consumer_manager.py @@ -551,11 +551,15 @@ async def fetch(self, internal_name: tuple[str, str], content_type: str, formats try: key = await self.deserialize(msg.key(), request_format) if msg.key() else None except DeserializationError as e: - KarapaceBase.unprocessable_entity( - message=f"key deserialization error for format {request_format}: {e}", - sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - content_type=content_type, - ) + if request_format == "avro": + LOG.warning("Cannot process non-empty key using avro deserializer, falling back to binary.") + key = await self.deserialize(msg.key(), "binary") + else: + KarapaceBase.unprocessable_entity( + message=f"key deserialization error for format {request_format}: {e}", + sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, + content_type=content_type, + ) try: value = await self.deserialize(msg.value(), request_format) if msg.value() else None except DeserializationError as e: diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index f0003dbdd..8e6deafb5 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -463,6 +463,44 @@ async def test_publish_consume_avro(rest_async_client, admin_client, trail, sche assert expected == actual, f"Expecting {actual} to be {expected}" +@pytest.mark.parametrize("fmt", ["avro"]) +async def test_consume_avro_key_deserialization_error_fallback(rest_async_client, admin_client, fmt): + topic_name = new_topic(admin_client, prefix=f"{fmt}_") + + # Produce record with binary key and empty value + header = REST_HEADERS["binary"] + string_key = b"testkey" + publish_key = base64.b64encode(string_key) + publish_key_binary = publish_key.decode("utf-8") + await repeat_until_successful_request( + rest_async_client.post, + f"topics/{topic_name}", + json_data={"records": [{"key": f"{publish_key_binary}"}]}, + headers=header, + error_msg="Unexpected response status for offset commit", + timeout=10, + sleep=1, + ) + + # Test consuming record using avro format + headers = REST_HEADERS[fmt] + group_name = f"e2e_group_{fmt}" + instance_id = await new_consumer(rest_async_client, group_name, fmt=fmt) + assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments" + assign_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} + res = await rest_async_client.post(assign_path, json=assign_payload, headers=headers) + assert res.ok + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=1000" + resp = await rest_async_client.get(consume_path, headers=headers) + + # Key-deserialization error should automatically fallback to binary + assert resp.status_code == 200, f"Expected 200 response: {resp}" + data = resp.json() + data_keys = [x["key"] for x in data] + for expected, actual in zip(publish_key, data_keys): + assert expected == actual, f"Expecting {actual} to be {expected}" + + @pytest.mark.parametrize("fmt", sorted(KNOWN_FORMATS)) async def test_consume_grafecul_deserialization_error_handling(rest_async_client, admin_client, fmt): topic_name = new_topic(admin_client, prefix=f"{fmt}_")