Skip to content

Commit

Permalink
rest-proxy: allow reading messages with avro-value and string-key
Browse files Browse the repository at this point in the history
It is not uncommon that Kafka users produce messages with String-key
and Avro-value. In this case Karapace REST typically fails to read
the message since it only supports one format for deserializing both
the key and the value.
For more convenience, I suggest to fallback to `binary` deserializer
for the key (only) when reading it using `avro` fails.
  • Loading branch information
jclarysse committed Feb 4, 2025
1 parent 6e61dfe commit b417e87
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,15 @@ async def fetch(self, internal_name: tuple[str, str], content_type: str, formats
try:
key = await self.deserialize(msg.key(), request_format) if msg.key() else None
except DeserializationError as e:
KarapaceBase.unprocessable_entity(
message=f"key deserialization error for format {request_format}: {e}",
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
content_type=content_type,
)
if request_format == "avro":
LOG.warning("Cannot process non-empty key using avro deserializer, falling back to binary.")
key = await self.deserialize(msg.key(), "binary")
else:
KarapaceBase.unprocessable_entity(
message=f"key deserialization error for format {request_format}: {e}",
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
content_type=content_type,
)
try:
value = await self.deserialize(msg.value(), request_format) if msg.value() else None
except DeserializationError as e:
Expand Down
38 changes: 38 additions & 0 deletions tests/integration/test_rest_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,44 @@ async def test_publish_consume_avro(rest_async_client, admin_client, trail, sche
assert expected == actual, f"Expecting {actual} to be {expected}"


@pytest.mark.parametrize("fmt", ["avro"])
async def test_consume_avro_key_deserialization_error_fallback(rest_async_client, admin_client, fmt):
topic_name = new_topic(admin_client, prefix=f"{fmt}_")

# Produce record with binary key and empty value
header = REST_HEADERS["binary"]
string_key = b"testkey"
publish_key = base64.b64encode(string_key)
publish_key_binary = publish_key.decode("utf-8")
await repeat_until_successful_request(
rest_async_client.post,
f"topics/{topic_name}",
json_data={"records": [{"key": f"{publish_key_binary}"}]},
headers=header,
error_msg="Unexpected response status for offset commit",
timeout=10,
sleep=1,
)

# Test consuming record using avro format
headers = REST_HEADERS[fmt]
group_name = f"e2e_group_{fmt}"
instance_id = await new_consumer(rest_async_client, group_name, fmt=fmt)
assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments"
assign_payload = {"partitions": [{"topic": topic_name, "partition": 0}]}
res = await rest_async_client.post(assign_path, json=assign_payload, headers=headers)
assert res.ok
consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=1000"
resp = await rest_async_client.get(consume_path, headers=headers)

# Key-deserialization error should automatically fallback to binary
assert resp.status_code == 200, f"Expected 200 response: {resp}"
data = resp.json()
data_keys = [x["key"] for x in data]
for expected, actual in zip(publish_key, data_keys):
assert expected == actual, f"Expecting {actual} to be {expected}"


@pytest.mark.parametrize("fmt", sorted(KNOWN_FORMATS))
async def test_consume_grafecul_deserialization_error_handling(rest_async_client, admin_client, fmt):
topic_name = new_topic(admin_client, prefix=f"{fmt}_")
Expand Down

0 comments on commit b417e87

Please sign in to comment.