From e3a5361c1cbcf69742ab7da9fa50da6ea050dca5 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Thu, 11 Jul 2024 20:55:52 +0700 Subject: [PATCH 01/15] Feat: init ping method, add rabbit test ping --- faststream/broker/core/usecase.py | 7 ++++++- faststream/rabbit/broker/broker.py | 10 ++++++++++ tests/brokers/base/connection.py | 18 ++++++++++++++++++ tests/brokers/rabbit/test_connect.py | 16 ++++++++++++++++ 4 files changed, 50 insertions(+), 1 deletion(-) diff --git a/faststream/broker/core/usecase.py b/faststream/broker/core/usecase.py index c226850ace..28e63852f3 100644 --- a/faststream/broker/core/usecase.py +++ b/faststream/broker/core/usecase.py @@ -27,7 +27,7 @@ BrokerMiddleware, ConnectionType, CustomCallable, - MsgType, + MsgType ) from faststream.exceptions import NOT_CONNECTED_YET from faststream.log.logging import set_logger_fmt @@ -342,3 +342,8 @@ async def publish( publish = partial(m(None).publish_scope, publish) return await publish(msg, **kwargs) + + @abstractmethod + async def ping(self, timeout: int | None) -> bool: + """Check connection alive.""" + raise NotImplementedError() diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 6cb357fef7..aff4a477f8 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -655,3 +655,13 @@ async def declare_exchange( """Declares exchange object in **RabbitMQ**.""" assert self.declarer, NOT_CONNECTED_YET # nosec B101 return await self.declarer.declare_exchange(exchange) + + async def ping(self, timeout: int | None) -> bool: + await super().ping(timeout) + try: + channel = await self._connection.channel() + await channel.declare_queue(name='test_queue', durable=True) + await channel.close() + return True + except Exception: + return False diff --git a/tests/brokers/base/connection.py b/tests/brokers/base/connection.py index 9455048669..20bb42170c 100644 --- a/tests/brokers/base/connection.py +++ b/tests/brokers/base/connection.py @@ -46,3 +46,21 @@ async def test_connect_by_url_priority(self, settings): await broker.connect(**kwargs) assert await self.ping(broker) await broker.close() + + @pytest.mark.asyncio() + async def test_connection_alive(self, settings): + kwargs = self.get_broker_args(settings) + broker = self.broker() + await broker.connect(**kwargs) + alive = await broker.ping(timeout=5) + assert alive + await broker.close() + + @pytest.mark.asyncio() + async def test_connection_not_alive(self, settings): + kwargs = self.get_broker_args(settings) + broker = self.broker() + await broker.connect(**kwargs) + await broker.close() + alive = await broker.ping(timeout=5) + assert not alive diff --git a/tests/brokers/rabbit/test_connect.py b/tests/brokers/rabbit/test_connect.py index 968d2e2301..3a590a320d 100644 --- a/tests/brokers/rabbit/test_connect.py +++ b/tests/brokers/rabbit/test_connect.py @@ -58,3 +58,19 @@ async def test_connect_merge_args_and_kwargs_native(self, settings): broker = self.broker("fake-url") # will be ignored assert await broker.connect(url=settings.url) await broker.close() + + @pytest.mark.asyncio() + async def test_connection_alive(self, settings): + broker = self.broker("fake-url") # will be ignored + await broker.connect(url=settings.url) + alive = await broker.ping(timeout=5) + assert alive + await broker.close() + + @pytest.mark.asyncio() + async def test_connection_not_alive(self, settings): + broker = self.broker("fake-url") # will be ignored + await broker.connect(url=settings.url) + await broker.close() + alive = await broker.ping(timeout=5) + assert not alive From 8767bbc71313e51475205c4915681412d8706a16 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Fri, 12 Jul 2024 00:26:27 +0700 Subject: [PATCH 02/15] Fix: add timeout test, change logic ping --- faststream/broker/core/usecase.py | 4 ++-- faststream/rabbit/broker/broker.py | 20 ++++++++++---------- tests/brokers/base/connection.py | 19 +++++-------------- tests/brokers/rabbit/test_connect.py | 16 ---------------- 4 files changed, 17 insertions(+), 42 deletions(-) diff --git a/faststream/broker/core/usecase.py b/faststream/broker/core/usecase.py index 28e63852f3..5106268902 100644 --- a/faststream/broker/core/usecase.py +++ b/faststream/broker/core/usecase.py @@ -27,7 +27,7 @@ BrokerMiddleware, ConnectionType, CustomCallable, - MsgType + MsgType, ) from faststream.exceptions import NOT_CONNECTED_YET from faststream.log.logging import set_logger_fmt @@ -344,6 +344,6 @@ async def publish( return await publish(msg, **kwargs) @abstractmethod - async def ping(self, timeout: int | None) -> bool: + async def ping(self, timeout: Optional[float]) -> bool: """Check connection alive.""" raise NotImplementedError() diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index aff4a477f8..8dafaa1ed2 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -12,6 +12,7 @@ ) from urllib.parse import urlparse +import anyio from aio_pika import connect_robust from typing_extensions import Annotated, Doc, override @@ -655,13 +656,12 @@ async def declare_exchange( """Declares exchange object in **RabbitMQ**.""" assert self.declarer, NOT_CONNECTED_YET # nosec B101 return await self.declarer.declare_exchange(exchange) - - async def ping(self, timeout: int | None) -> bool: - await super().ping(timeout) - try: - channel = await self._connection.channel() - await channel.declare_queue(name='test_queue', durable=True) - await channel.close() - return True - except Exception: - return False + + async def ping(self, timeout: Optional[float]) -> bool: + with anyio.move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + if not self._connection or self._connection.is_closed or not self._connection.transport: + return False + return True + diff --git a/tests/brokers/base/connection.py b/tests/brokers/base/connection.py index 20bb42170c..3307698f61 100644 --- a/tests/brokers/base/connection.py +++ b/tests/brokers/base/connection.py @@ -11,8 +11,9 @@ class BrokerConnectionTestcase: def get_broker_args(self, settings): return {} + @pytest.mark.asyncio() async def ping(self, broker) -> bool: - return True + return await broker.ping(timeout=5.0) @pytest.mark.asyncio() async def test_close_before_start(self, async_mock): @@ -48,19 +49,9 @@ async def test_connect_by_url_priority(self, settings): await broker.close() @pytest.mark.asyncio() - async def test_connection_alive(self, settings): - kwargs = self.get_broker_args(settings) - broker = self.broker() - await broker.connect(**kwargs) - alive = await broker.ping(timeout=5) - assert alive - await broker.close() - - @pytest.mark.asyncio() - async def test_connection_not_alive(self, settings): + async def test_ping_timeout(self, settings): kwargs = self.get_broker_args(settings) - broker = self.broker() + broker = self.broker("wrong_url") await broker.connect(**kwargs) + assert not await broker.ping(timeout=0.00001) await broker.close() - alive = await broker.ping(timeout=5) - assert not alive diff --git a/tests/brokers/rabbit/test_connect.py b/tests/brokers/rabbit/test_connect.py index 3a590a320d..968d2e2301 100644 --- a/tests/brokers/rabbit/test_connect.py +++ b/tests/brokers/rabbit/test_connect.py @@ -58,19 +58,3 @@ async def test_connect_merge_args_and_kwargs_native(self, settings): broker = self.broker("fake-url") # will be ignored assert await broker.connect(url=settings.url) await broker.close() - - @pytest.mark.asyncio() - async def test_connection_alive(self, settings): - broker = self.broker("fake-url") # will be ignored - await broker.connect(url=settings.url) - alive = await broker.ping(timeout=5) - assert alive - await broker.close() - - @pytest.mark.asyncio() - async def test_connection_not_alive(self, settings): - broker = self.broker("fake-url") # will be ignored - await broker.connect(url=settings.url) - await broker.close() - alive = await broker.ping(timeout=5) - assert not alive From d065d60047582f7a92c9c7def5be5c9f00546d88 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Sat, 13 Jul 2024 00:42:56 +0700 Subject: [PATCH 03/15] Feat: add check redis connection --- faststream/broker/core/usecase.py | 2 +- faststream/rabbit/broker/broker.py | 1 + faststream/redis/broker/broker.py | 9 +++++++++ tests/brokers/redis/test_connect.py | 4 ---- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/faststream/broker/core/usecase.py b/faststream/broker/core/usecase.py index 5106268902..d8aae6064c 100644 --- a/faststream/broker/core/usecase.py +++ b/faststream/broker/core/usecase.py @@ -342,7 +342,7 @@ async def publish( publish = partial(m(None).publish_scope, publish) return await publish(msg, **kwargs) - + @abstractmethod async def ping(self, timeout: Optional[float]) -> bool: """Check connection alive.""" diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 8dafaa1ed2..b58af27cf6 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -657,6 +657,7 @@ async def declare_exchange( assert self.declarer, NOT_CONNECTED_YET # nosec B101 return await self.declarer.declare_exchange(exchange) + @override async def ping(self, timeout: Optional[float]) -> bool: with anyio.move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index 4f30e8adfb..2aa0f5d3d1 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -13,6 +13,7 @@ ) from urllib.parse import urlparse +import anyio from redis.asyncio.client import Redis from redis.asyncio.connection import ( Connection, @@ -477,3 +478,11 @@ async def publish_batch( list=list, correlation_id=correlation_id, ) + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with anyio.move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + else: + return await self._connection.ping() diff --git a/tests/brokers/redis/test_connect.py b/tests/brokers/redis/test_connect.py index 305fee821f..cd10fe8ca0 100644 --- a/tests/brokers/redis/test_connect.py +++ b/tests/brokers/redis/test_connect.py @@ -15,10 +15,6 @@ def get_broker_args(self, settings): "port": settings.port, } - async def ping(self, broker: RedisBroker) -> bool: - await broker._connection.ping() - return True - @pytest.mark.asyncio() async def test_init_connect_by_raw_data(self, settings): async with RedisBroker( From ddec8d4bcec1724e7f4f907dda362ee14caca6fa Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Sun, 14 Jul 2024 18:21:33 +0700 Subject: [PATCH 04/15] lint code, add ping for nats and kafka --- faststream/confluent/broker/broker.py | 4 ++++ faststream/kafka/broker/broker.py | 9 +++++++++ faststream/nats/broker/broker.py | 9 +++++++++ faststream/rabbit/broker/broker.py | 5 ++--- faststream/redis/broker/broker.py | 4 ++-- 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 45c3f8fc1a..ae0a1ac578 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -523,3 +523,7 @@ async def publish_batch( reply_to=reply_to, correlation_id=correlation_id, ) + + @override + async def ping(self, timeout: Optional[float]) -> bool: + pass diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index 0827e09060..ad47a3bdc4 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -19,6 +19,7 @@ import aiokafka from aiokafka.partitioner import DefaultPartitioner from aiokafka.producer.producer import _missing +from anyio import move_on_after from typing_extensions import Annotated, Doc, override from faststream.__about__ import SERVICE_NAME @@ -806,3 +807,11 @@ async def publish_batch( reply_to=reply_to, correlation_id=correlation_id, ) + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + else: + return not self._producer._producer._closed diff --git a/faststream/nats/broker/broker.py b/faststream/nats/broker/broker.py index fefcc069a8..e0c1546a15 100644 --- a/faststream/nats/broker/broker.py +++ b/faststream/nats/broker/broker.py @@ -14,6 +14,7 @@ ) import nats +from anyio import move_on_after from nats.aio.client import ( DEFAULT_CONNECT_TIMEOUT, DEFAULT_DRAIN_TIMEOUT, @@ -914,3 +915,11 @@ async def new_inbox(self) -> str: assert self._connection # nosec B101 return self._connection.new_inbox() + + @override + async def ping(self, timeout: Optional[float]) -> bool: + with move_on_after(timeout) as cancel_scope: + if cancel_scope.cancel_called: + return False + else: + return self._connection.is_connected diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index b58af27cf6..74ff59c352 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -12,8 +12,8 @@ ) from urllib.parse import urlparse -import anyio from aio_pika import connect_robust +from anyio import move_on_after from typing_extensions import Annotated, Doc, override from faststream.__about__ import SERVICE_NAME @@ -659,10 +659,9 @@ async def declare_exchange( @override async def ping(self, timeout: Optional[float]) -> bool: - with anyio.move_on_after(timeout) as cancel_scope: + with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False if not self._connection or self._connection.is_closed or not self._connection.transport: return False return True - diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index 2aa0f5d3d1..62e851b396 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -13,7 +13,7 @@ ) from urllib.parse import urlparse -import anyio +from anyio import move_on_after from redis.asyncio.client import Redis from redis.asyncio.connection import ( Connection, @@ -481,7 +481,7 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: - with anyio.move_on_after(timeout) as cancel_scope: + with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False else: From b395bd6196cb96b6bf4c63b5a8ba693d55919c4f Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Sun, 14 Jul 2024 20:20:25 +0700 Subject: [PATCH 05/15] Feat: add confluent ping --- faststream/confluent/broker/broker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index ae0a1ac578..1eb78fe8b0 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -17,6 +17,7 @@ Union, ) +from anyio import move_on_after from typing_extensions import Annotated, Doc, override from faststream.__about__ import SERVICE_NAME @@ -526,4 +527,8 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: - pass + with move_on_after(timeout) as cancel_scope: + await self.close() + if cancel_scope.cancel_called or not self._producer: + return False + return True From a70941835cfdb8764434b6a80de8a3aa7f8d93ee Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Sun, 14 Jul 2024 21:04:21 +0700 Subject: [PATCH 06/15] Fix: clear code --- faststream/kafka/broker/broker.py | 9 +++++---- faststream/rabbit/broker/broker.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index ad47a3bdc4..147d23c327 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -811,7 +811,8 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: - if cancel_scope.cancel_called: - return False - else: - return not self._producer._producer._closed + return not ( + cancel_scope.cancel_called or + self._producer is None or + self._producer._producer._closed + ) diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 74ff59c352..0374df0852 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -662,6 +662,6 @@ async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False - if not self._connection or self._connection.is_closed or not self._connection.transport: + if not self._connection or self._connection.is_closed: return False return True From 979f112a6c97eb424108518b56150c6f3724e44d Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Mon, 15 Jul 2024 16:43:39 +0700 Subject: [PATCH 07/15] Fix: lint --- faststream/confluent/broker/broker.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 1eb78fe8b0..4cb8d1c6de 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -528,7 +528,7 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: - await self.close() - if cancel_scope.cancel_called or not self._producer: - return False - return True + return not ( + cancel_scope.cancel_called or self._producer is None + ) + From e7ee4f59c923601c9938b665bfc186be95a979ba Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Mon, 15 Jul 2024 16:47:40 +0700 Subject: [PATCH 08/15] Fix: run lint.sh --- faststream/confluent/broker/broker.py | 5 +---- faststream/kafka/broker/broker.py | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 4cb8d1c6de..c41e3bafcf 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -528,7 +528,4 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: - return not ( - cancel_scope.cancel_called or self._producer is None - ) - + return not (cancel_scope.cancel_called or self._producer is None) diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index 147d23c327..d0f5b56945 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -812,7 +812,7 @@ async def publish_batch( async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: return not ( - cancel_scope.cancel_called or - self._producer is None or - self._producer._producer._closed + cancel_scope.cancel_called + or self._producer is None + or self._producer._producer._closed ) From d55cad257c3f57a6c07503c7bc7d44da3d711ef3 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Mon, 15 Jul 2024 22:14:31 +0300 Subject: [PATCH 09/15] refactor: unify ping method code --- faststream/kafka/broker/broker.py | 12 +++++++----- faststream/nats/broker/broker.py | 7 +++++-- faststream/rabbit/broker/broker.py | 6 ++++-- faststream/redis/broker/broker.py | 7 +++++-- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index d0f5b56945..9eb9d8bd1b 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -811,8 +811,10 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: - return not ( - cancel_scope.cancel_called - or self._producer is None - or self._producer._producer._closed - ) + if cancel_scope.cancel_called: + return False + + if self._producer is None: + return False + + return not self._producer._producer._closed diff --git a/faststream/nats/broker/broker.py b/faststream/nats/broker/broker.py index e0c1546a15..74fa1f0bdf 100644 --- a/faststream/nats/broker/broker.py +++ b/faststream/nats/broker/broker.py @@ -921,5 +921,8 @@ async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False - else: - return self._connection.is_connected + + if self._connection is None: + return False + + return self._connection.is_connected diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 0374df0852..c12ec28bae 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -662,6 +662,8 @@ async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False - if not self._connection or self._connection.is_closed: + + if self._connection is None: return False - return True + + return not self._connection.is_closed diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index 62e851b396..aebfc1add9 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -484,5 +484,8 @@ async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False - else: - return await self._connection.ping() + + if self._connection is None: + return False + + return await self._connection.ping() From 4506219b146e8a09f5e4bb555adde716fc18ca07 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Mon, 15 Jul 2024 22:19:47 +0300 Subject: [PATCH 10/15] refactor: correct FastKafkaProducer signature --- faststream/kafka/publisher/producer.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 53ab290f0a..398dba7e4a 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -4,7 +4,6 @@ from faststream.broker.message import encode_message from faststream.broker.publisher.proto import ProducerProto -from faststream.exceptions import NOT_CONNECTED_YET if TYPE_CHECKING: from aiokafka import AIOKafkaProducer @@ -15,8 +14,6 @@ class AioKafkaFastProducer(ProducerProto): """A class to represent Kafka producer.""" - _producer: Optional["AIOKafkaProducer"] - def __init__( self, producer: "AIOKafkaProducer", @@ -37,8 +34,6 @@ async def publish( # type: ignore[override] reply_to: str = "", ) -> None: """Publish a message to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - message, content_type = encode_message(message) headers_to_send = { @@ -63,8 +58,7 @@ async def publish( # type: ignore[override] ) async def stop(self) -> None: - if self._producer is not None: # pragma: no branch - await self._producer.stop() + await self._producer.stop() async def publish_batch( self, @@ -77,8 +71,6 @@ async def publish_batch( reply_to: str = "", ) -> None: """Publish a batch of messages to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - batch = self._producer.create_batch() headers_to_send = {"correlation_id": correlation_id, **(headers or {})} From 6ab41fa01248f3efb03d50e37506ba51cfdb6194 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Mon, 15 Jul 2024 22:23:43 +0300 Subject: [PATCH 11/15] refactor: unify Confluent ping method code --- faststream/confluent/broker/broker.py | 6 +++++- faststream/confluent/publisher/producer.py | 10 +--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index c41e3bafcf..e59a326292 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -528,4 +528,8 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: with move_on_after(timeout) as cancel_scope: - return not (cancel_scope.cancel_called or self._producer is None) + if cancel_scope.cancel_called: + return False + + if self._producer is None: + return False diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index 0989ba5644..99c75d32b7 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -4,7 +4,6 @@ from faststream.broker.message import encode_message from faststream.broker.publisher.proto import ProducerProto -from faststream.exceptions import NOT_CONNECTED_YET if TYPE_CHECKING: from faststream.confluent.client import AsyncConfluentProducer @@ -14,8 +13,6 @@ class AsyncConfluentFastProducer(ProducerProto): """A class to represent Kafka producer.""" - _producer: Optional["AsyncConfluentProducer"] - def __init__( self, producer: "AsyncConfluentProducer", @@ -36,8 +33,6 @@ async def publish( # type: ignore[override] reply_to: str = "", ) -> None: """Publish a message to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - message, content_type = encode_message(message) headers_to_send = { @@ -62,8 +57,7 @@ async def publish( # type: ignore[override] ) async def stop(self) -> None: - if self._producer is not None: # pragma: no branch - await self._producer.stop() + await self._producer.stop() async def publish_batch( self, @@ -76,8 +70,6 @@ async def publish_batch( correlation_id: str = "", ) -> None: """Publish a batch of messages to a topic.""" - assert self._producer, NOT_CONNECTED_YET # nosec B101 - batch = self._producer.create_batch() headers_to_send = {"correlation_id": correlation_id, **(headers or {})} From 415a3acb16533fea6fd7a64b71de343ca2da16ab Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Wed, 17 Jul 2024 19:11:36 +0530 Subject: [PATCH 12/15] Add ping method to async confluent producer and fix confluent ping method --- faststream/confluent/broker/broker.py | 2 ++ faststream/confluent/client.py | 47 ++++++++++++++++++--------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index e59a326292..8d83719740 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -533,3 +533,5 @@ async def ping(self, timeout: Optional[float]) -> bool: if self._producer is None: return False + + return await self._producer._producer.ping(timeout=timeout) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 74b8dddbe1..9bff0e6d52 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -26,6 +26,19 @@ _missing = object() +ADMINCLIENT_CONFIG_PARAMS = ( + "allow.auto.create.topics", + "bootstrap.servers", + "client.id", + "request.timeout.ms", + "metadata.max.age.ms", + "security.protocol", + "connections.max.idle.ms", + "sasl.mechanism", + "sasl.username", + "sasl.password", +) + class MsgToSend(BaseModel): """A Pydantic model representing a message to be sent to Kafka. @@ -214,6 +227,24 @@ async def send_batch( ] await asyncio.gather(*tasks) + async def ping(self, timeout: Optional[float] = 5.0) -> bool: + """Implement ping using AdminClient.""" + try: + admin_client = AdminClient( + { + x: self.config[x] + for x in ADMINCLIENT_CONFIG_PARAMS + if x in self.config + } + ) + cluster_metadata = await call_or_await( + admin_client.list_topics, timeout=timeout + ) + + return bool(cluster_metadata) + except Exception: + return False + class TopicPartition(NamedTuple): """A named tuple representing a Kafka topic and partition.""" @@ -228,22 +259,8 @@ def create_topics( logger: Union["LoggerProto", None, object] = logger, ) -> None: """Creates Kafka topics using the provided configuration.""" - required_config_params = ( - "allow.auto.create.topics", - "bootstrap.servers", - "client.id", - "request.timeout.ms", - "metadata.max.age.ms", - "security.protocol", - "connections.max.idle.ms", - "sasl.mechanism", - "sasl.username", - "sasl.password", - "sasl.kerberos.service.name", - ) - admin_client = AdminClient( - {x: config[x] for x in required_config_params if x in config} + {x: config[x] for x in ADMINCLIENT_CONFIG_PARAMS if x in config} ) fs = admin_client.create_topics( From 5a56111712ab8012bec36b949c22a1c24b784d3f Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Wed, 17 Jul 2024 19:20:49 +0300 Subject: [PATCH 13/15] refactor: use Confluent producer to ping --- faststream/confluent/broker/broker.py | 12 +++++++++++- faststream/confluent/client.py | 26 ++++++++++++++++---------- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 8d83719740..f0465205bd 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -527,6 +527,13 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: + random_topic: Optional[str] + + if sub := next(iter(self._subscribers.values()), None): + random_topic = next(iter(sub.topics), None) + else: + random_topic = None + with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False @@ -534,4 +541,7 @@ async def ping(self, timeout: Optional[float]) -> bool: if self._producer is None: return False - return await self._producer._producer.ping(timeout=timeout) + return await self._producer._producer.ping( + timeout=timeout, + topic=random_topic, + ) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 9bff0e6d52..b9d9740e88 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -227,21 +227,27 @@ async def send_batch( ] await asyncio.gather(*tasks) - async def ping(self, timeout: Optional[float] = 5.0) -> bool: - """Implement ping using AdminClient.""" + async def ping( + self, + timeout: Optional[float] = 5.0, + topic: Optional[str] = None, + ) -> bool: + """Implement ping using `list_topics` information request.""" + if timeout is None: + timeout = -1 + + kwargs = {"timeout": timeout} + if topic: + kwargs["topic"] = topic + try: - admin_client = AdminClient( - { - x: self.config[x] - for x in ADMINCLIENT_CONFIG_PARAMS - if x in self.config - } - ) cluster_metadata = await call_or_await( - admin_client.list_topics, timeout=timeout + self.producer.list_topics, + **kwargs, ) return bool(cluster_metadata) + except Exception: return False From 4a253aab7e8331c81ea713f2f5c50fafaaaec312 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Wed, 17 Jul 2024 19:24:03 +0300 Subject: [PATCH 14/15] lint: explicit kwargs type --- faststream/confluent/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index b9d9740e88..c36f4635d6 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -236,7 +236,7 @@ async def ping( if timeout is None: timeout = -1 - kwargs = {"timeout": timeout} + kwargs: Dict[str, Any] = {"timeout": timeout} if topic: kwargs["topic"] = topic From bdeed92fb9aca0c4a63955b1b738b19a1663c6a0 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Wed, 17 Jul 2024 19:33:52 +0300 Subject: [PATCH 15/15] refactor: remove concrete topic asking --- faststream/confluent/broker/broker.py | 12 +----------- faststream/confluent/client.py | 7 +------ 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index f0465205bd..8d83719740 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -527,13 +527,6 @@ async def publish_batch( @override async def ping(self, timeout: Optional[float]) -> bool: - random_topic: Optional[str] - - if sub := next(iter(self._subscribers.values()), None): - random_topic = next(iter(sub.topics), None) - else: - random_topic = None - with move_on_after(timeout) as cancel_scope: if cancel_scope.cancel_called: return False @@ -541,7 +534,4 @@ async def ping(self, timeout: Optional[float]) -> bool: if self._producer is None: return False - return await self._producer._producer.ping( - timeout=timeout, - topic=random_topic, - ) + return await self._producer._producer.ping(timeout=timeout) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index c36f4635d6..49d75b76de 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -230,20 +230,15 @@ async def send_batch( async def ping( self, timeout: Optional[float] = 5.0, - topic: Optional[str] = None, ) -> bool: """Implement ping using `list_topics` information request.""" if timeout is None: timeout = -1 - kwargs: Dict[str, Any] = {"timeout": timeout} - if topic: - kwargs["topic"] = topic - try: cluster_metadata = await call_or_await( self.producer.list_topics, - **kwargs, + timeout=timeout, ) return bool(cluster_metadata)