From 26b66a9b96b477f8a3277a5e9b0c8528ec00e166 Mon Sep 17 00:00:00 2001 From: spataphore1337 Date: Mon, 16 Dec 2024 12:37:28 +0300 Subject: [PATCH 1/6] feat: add BatchBufferOverflowException --- faststream/kafka/exceptions.py | 14 ++++++++++++++ faststream/kafka/publisher/producer.py | 7 +++++-- tests/brokers/kafka/test_publish.py | 17 +++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 faststream/kafka/exceptions.py diff --git a/faststream/kafka/exceptions.py b/faststream/kafka/exceptions.py new file mode 100644 index 0000000000..bf51f6a401 --- /dev/null +++ b/faststream/kafka/exceptions.py @@ -0,0 +1,14 @@ +from faststream.exceptions import FastStreamException + + +class BatchBufferOverflowException(FastStreamException): + """Exception raised when a buffer overflow occurs when adding a new message to the batches.""" + + def __init__(self, message_position: int) -> None: + self.message_position = message_position + + def __str__(self) -> str: + return ( + f"The batch buffer is full. The position of the message" + f" in the transferred collection at which the overflow occurred: {self.message_position}" + ) diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 2d5d3c92ad..b6f0022912 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -6,6 +6,7 @@ from faststream.broker.publisher.proto import ProducerProto from faststream.broker.utils import resolve_custom_func from faststream.exceptions import OperationForbiddenError +from faststream.kafka.exceptions import BatchBufferOverflowException from faststream.kafka.message import KafkaMessage from faststream.kafka.parser import AioKafkaParser @@ -100,7 +101,7 @@ async def publish_batch( reply_to, ) - for msg in msgs: + for message_position, msg in msgs: message, content_type = encode_message(msg) if content_type: @@ -111,12 +112,14 @@ async def publish_batch( else: final_headers = headers_to_send.copy() - batch.append( + metadata = batch.append( key=None, value=message, timestamp=timestamp_ms, headers=[(i, j.encode()) for i, j in final_headers.items()], ) + if metadata is None: + raise BatchBufferOverflowException(message_position=message_position) send_future = await self._producer.send_batch(batch, topic, partition=partition) if not no_confirm: diff --git a/tests/brokers/kafka/test_publish.py b/tests/brokers/kafka/test_publish.py index 5040a8662f..a75e6f1ce8 100644 --- a/tests/brokers/kafka/test_publish.py +++ b/tests/brokers/kafka/test_publish.py @@ -5,6 +5,7 @@ from faststream import Context from faststream.kafka import KafkaBroker, KafkaResponse +from faststream.kafka.exceptions import BatchBufferOverflowException from tests.brokers.base.publish import BrokerPublishTestcase @@ -133,3 +134,19 @@ async def handle_next(msg=Context("message")): body=b"1", key=b"1", ) + + @pytest.mark.asyncio + async def test_raise_buffer_overflow_exception( + self, queue: str, mock: Mock + ) -> None: + pub_broker = self.get_broker(max_batch_size=16) + + @pub_broker.subscriber(queue) + async def handler(m) -> None: + pass + + async with self.patch_broker(pub_broker) as br: + await br.start() + with pytest.raises(BatchBufferOverflowException) as e: + await br.publish_batch(1, "Hello, world!", topic=queue, no_confirm=True) + assert e.value.message_position == 1 From 2374bd201cd64f49d6d3257c9a87473a66f22bfd Mon Sep 17 00:00:00 2001 From: spataphore1337 Date: Mon, 16 Dec 2024 09:42:01 +0000 Subject: [PATCH 2/6] docs: generate API References --- docs/docs/SUMMARY.md | 2 ++ .../kafka/exceptions/BatchBufferOverflowException.md | 11 +++++++++++ 2 files changed, 13 insertions(+) create mode 100644 docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 95f4245e0f..a8a2947235 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -618,6 +618,8 @@ search: - [KafkaLoggingBroker](api/faststream/kafka/broker/logging/KafkaLoggingBroker.md) - registrator - [KafkaRegistrator](api/faststream/kafka/broker/registrator/KafkaRegistrator.md) + - exceptions + - [BatchBufferOverflowException](api/faststream/kafka/exceptions/BatchBufferOverflowException.md) - fastapi - [Context](api/faststream/kafka/fastapi/Context.md) - [KafkaRouter](api/faststream/kafka/fastapi/KafkaRouter.md) diff --git a/docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md b/docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md new file mode 100644 index 0000000000..824f6dc2d1 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.exceptions.BatchBufferOverflowException From 855a1595a3290ea01d680d59e885a039359e6397 Mon Sep 17 00:00:00 2001 From: spataphore1337 Date: Mon, 16 Dec 2024 12:46:49 +0300 Subject: [PATCH 3/6] fix: use pre-commit --- docs/docs/en/public_api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/en/public_api b/docs/docs/en/public_api index b14a93fe9e..2c417d1ba5 120000 --- a/docs/docs/en/public_api +++ b/docs/docs/en/public_api @@ -1 +1 @@ -./api/ \ No newline at end of file +./api/ From e4713dfa9c8ddbb689a7750e1142e52bcca4b9db Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Mon, 16 Dec 2024 12:51:32 +0300 Subject: [PATCH 4/6] Update producer.py --- faststream/kafka/publisher/producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index b6f0022912..78493dbf2d 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -101,7 +101,7 @@ async def publish_batch( reply_to, ) - for message_position, msg in msgs: + for message_position, msg in enumerate(msgs): message, content_type = encode_message(msg) if content_type: From 6e607611bd475fd4292e34bde7e6be50c3f247b4 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Mon, 16 Dec 2024 13:56:14 +0300 Subject: [PATCH 5/6] chore: fix CI --- docs/docs/en/public_api | 2 +- tests/brokers/kafka/test_publish.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/docs/en/public_api b/docs/docs/en/public_api index 2c417d1ba5..9e5bfb42d2 120000 --- a/docs/docs/en/public_api +++ b/docs/docs/en/public_api @@ -1 +1 @@ -./api/ +api \ No newline at end of file diff --git a/tests/brokers/kafka/test_publish.py b/tests/brokers/kafka/test_publish.py index a75e6f1ce8..ac2c866362 100644 --- a/tests/brokers/kafka/test_publish.py +++ b/tests/brokers/kafka/test_publish.py @@ -1,4 +1,5 @@ import asyncio +from typing import Any from unittest.mock import Mock import pytest @@ -11,8 +12,8 @@ @pytest.mark.kafka class TestPublish(BrokerPublishTestcase): - def get_broker(self, apply_types: bool = False): - return KafkaBroker(apply_types=apply_types) + def get_broker(self, apply_types: bool = False, **kwargs: Any) -> KafkaBroker: + return KafkaBroker(apply_types=apply_types, **kwargs) @pytest.mark.asyncio async def test_publish_batch(self, queue: str): From ab47f76742f64bd611f96b6b72c5e793ec5b0586 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Mon, 16 Dec 2024 14:11:34 +0300 Subject: [PATCH 6/6] chore: fix CI --- tests/opentelemetry/kafka/test_kafka.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/opentelemetry/kafka/test_kafka.py b/tests/opentelemetry/kafka/test_kafka.py index 0967069ade..cc38a66281 100644 --- a/tests/opentelemetry/kafka/test_kafka.py +++ b/tests/opentelemetry/kafka/test_kafka.py @@ -261,17 +261,19 @@ async def handler(m, baggage: CurrentBaggage): @pytest.mark.kafka class TestPublishWithTelemetry(TestPublish): - def get_broker(self, apply_types: bool = False): + def get_broker(self, apply_types: bool = False, **kwargs): return KafkaBroker( middlewares=(KafkaTelemetryMiddleware(),), apply_types=apply_types, + **kwargs, ) @pytest.mark.kafka class TestConsumeWithTelemetry(TestConsume): - def get_broker(self, apply_types: bool = False): + def get_broker(self, apply_types: bool = False, **kwargs): return KafkaBroker( middlewares=(KafkaTelemetryMiddleware(),), apply_types=apply_types, + **kwargs, )