From 84089d3277f3850169c226d735607f92259f34a8 Mon Sep 17 00:00:00 2001 From: "d.rudenko" Date: Tue, 17 Dec 2024 13:30:09 +0100 Subject: [PATCH 1/4] Removed async_grpc --- qdrant_client/qdrant_client.py | 34 -------- qdrant_client/qdrant_remote.py | 84 ------------------- tests/test_async_qdrant_client.py | 76 ----------------- tests/test_qdrant_client.py | 18 ---- .../client_generator.py | 3 - .../remote_generator.py | 9 -- 6 files changed, 224 deletions(-) diff --git a/qdrant_client/qdrant_client.py b/qdrant_client/qdrant_client.py index 0f0a2c45..7ea64764 100644 --- a/qdrant_client/qdrant_client.py +++ b/qdrant_client/qdrant_client.py @@ -192,40 +192,6 @@ def grpc_points(self) -> grpc.PointsStub: raise NotImplementedError(f"gRPC client is not supported for {type(self._client)}") - @property - def async_grpc_points(self) -> grpc.PointsStub: - """gRPC client for points methods - - Returns: - An instance of raw gRPC client, generated from Protobuf - """ - warnings.warn( - "async_grpc_points is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_points` instead.", - DeprecationWarning, - stacklevel=2, - ) - if isinstance(self._client, QdrantRemote): - return self._client.async_grpc_points - - raise NotImplementedError(f"gRPC client is not supported for {type(self._client)}") - - @property - def async_grpc_collections(self) -> grpc.CollectionsStub: - """gRPC client for collections methods - - Returns: - An instance of raw gRPC client, generated from Protobuf - """ - warnings.warn( - "async_grpc_collections is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_collections` instead.", - DeprecationWarning, - stacklevel=2, - ) - if isinstance(self._client, QdrantRemote): - return self._client.async_grpc_collections - - raise NotImplementedError(f"gRPC client is not supported for {type(self._client)}") - @property def rest(self) -> SyncApis[ApiClient]: """REST Client diff --git a/qdrant_client/qdrant_remote.py b/qdrant_client/qdrant_remote.py index c3b1cc46..7e7b39de 100644 --- a/qdrant_client/qdrant_remote.py +++ b/qdrant_client/qdrant_remote.py @@ -271,21 +271,6 @@ def _init_grpc_channel(self) -> None: auth_token_provider=self._auth_token_provider, # type: ignore ) - def _init_async_grpc_channel(self) -> None: - if self._closed: - raise RuntimeError("Client was closed. Please create a new QdrantClient instance.") - - if self._aio_grpc_channel is None: - self._aio_grpc_channel = get_async_channel( - host=self._host, - port=self._grpc_port, - ssl=self._https, - metadata=self._grpc_headers, - options=self._grpc_options, - compression=self._grpc_compression, - auth_token_provider=self._auth_token_provider, - ) - def _init_grpc_points_client(self) -> None: self._init_grpc_channel() self._grpc_points_client = grpc.PointsStub(self._grpc_channel) @@ -302,75 +287,6 @@ def _init_grpc_root_client(self) -> None: self._init_grpc_channel() self._grpc_root_client = grpc.QdrantStub(self._grpc_channel) - def _init_async_grpc_points_client(self) -> None: - self._init_async_grpc_channel() - self._aio_grpc_points_client = grpc.PointsStub(self._aio_grpc_channel) - - def _init_async_grpc_collections_client(self) -> None: - self._init_async_grpc_channel() - self._aio_grpc_collections_client = grpc.CollectionsStub(self._aio_grpc_channel) - - def _init_async_grpc_snapshots_client(self) -> None: - self._init_async_grpc_channel() - self._aio_grpc_snapshots_client = grpc.SnapshotsStub(self._aio_grpc_channel) - - def _init_async_grpc_root_client(self) -> None: - self._init_async_grpc_channel() - self._aio_grpc_root_client = grpc.QdrantStub(self._aio_grpc_channel) - - @property - def async_grpc_collections(self) -> grpc.CollectionsStub: - """gRPC client for collections methods - - Returns: - An instance of raw gRPC client, generated from Protobuf - """ - if self._aio_grpc_collections_client is None: - self._init_async_grpc_collections_client() - return self._aio_grpc_collections_client - - @property - def async_grpc_points(self) -> grpc.PointsStub: - """gRPC client for points methods - - Returns: - An instance of raw gRPC client, generated from Protobuf - """ - if self._aio_grpc_points_client is None: - self._init_async_grpc_points_client() - return self._aio_grpc_points_client - - @property - def async_grpc_snapshots(self) -> grpc.SnapshotsStub: - """gRPC client for snapshots methods - - Returns: - An instance of raw gRPC client, generated from Protobuf - """ - warnings.warn( - "async_grpc_snapshots is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_snapshots` instead.", - DeprecationWarning, - stacklevel=2, - ) - if self._aio_grpc_snapshots_client is None: - self._init_async_grpc_snapshots_client() - return self._aio_grpc_snapshots_client - - @property - def async_grpc_root(self) -> grpc.QdrantStub: - """gRPC client for info methods - - Returns: - An instance of raw gRPC client, generated from Protobuf - """ - warnings.warn( - "async_grpc_root is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_root` instead.", - DeprecationWarning, - stacklevel=2, - ) - if self._aio_grpc_root_client is None: - self._init_async_grpc_root_client() - return self._aio_grpc_root_client @property def grpc_collections(self) -> grpc.CollectionsStub: diff --git a/tests/test_async_qdrant_client.py b/tests/test_async_qdrant_client.py index 8847c8b6..29a39e9c 100644 --- a/tests/test_async_qdrant_client.py +++ b/tests/test_async_qdrant_client.py @@ -7,12 +7,8 @@ import pytest import qdrant_client.http.exceptions -from qdrant_client import QdrantClient -from qdrant_client import grpc as qdrant_grpc from qdrant_client import models from qdrant_client.async_qdrant_client import AsyncQdrantClient -from qdrant_client.conversions.conversion import payload_to_grpc -from tests.fixtures.payload import one_random_payload_please from tests.utils import read_version NUM_VECTORS = 100 @@ -21,78 +17,6 @@ COLLECTION_NAME = "async_test_collection" -@pytest.mark.asyncio -async def test_async_grpc(): - points = ( - qdrant_grpc.PointStruct( - id=qdrant_grpc.PointId(num=idx), - vectors=qdrant_grpc.Vectors( - vector=qdrant_grpc.Vector(data=np.random.rand(DIM).tolist()) - ), - payload=payload_to_grpc(one_random_payload_please(idx)), - ) - for idx in range(NUM_VECTORS) - ) - - client = QdrantClient(prefer_grpc=True, timeout=3) - - grpc_collections = client.async_grpc_collections - - res = await grpc_collections.List(qdrant_grpc.ListCollectionsRequest(), timeout=1.0) - - for collection in res.collections: - print(collection.name) - await grpc_collections.Delete( - qdrant_grpc.DeleteCollection(collection_name=collection.name) - ) - - await grpc_collections.Create( - qdrant_grpc.CreateCollection( - collection_name=COLLECTION_NAME, - vectors_config=qdrant_grpc.VectorsConfig( - params=qdrant_grpc.VectorParams(size=DIM, distance=qdrant_grpc.Distance.Cosine) - ), - ) - ) - - grpc_points = client.async_grpc_points - - upload_features = [] - - # Upload vectors in parallel - for point in points: - upload_features.append( - grpc_points.Upsert( - qdrant_grpc.UpsertPoints( - collection_name=COLLECTION_NAME, wait=True, points=[point] - ) - ) - ) - await asyncio.gather(*upload_features) - - queries = [np.random.rand(DIM).tolist() for _ in range(NUM_QUERIES)] - - # Make async queries - search_queries = [] - for query in queries: - search_query = grpc_points.Search( - qdrant_grpc.SearchPoints( - collection_name=COLLECTION_NAME, - vector=query, - limit=10, - ) - ) - search_queries.append(search_query) - results = await asyncio.gather(*search_queries) # All queries are running in parallel now - - assert len(results) == NUM_QUERIES - - for result in results: - assert len(result.result) == 10 - - client.close() - - @pytest.mark.asyncio @pytest.mark.parametrize("prefer_grpc", [True, False]) async def test_async_qdrant_client(prefer_grpc): diff --git a/tests/test_qdrant_client.py b/tests/test_qdrant_client.py index 883a5012..25eebdd1 100644 --- a/tests/test_qdrant_client.py +++ b/tests/test_qdrant_client.py @@ -1888,24 +1888,6 @@ def test_client_close(): RuntimeError ): # prevent initializing grpc connection, since http connection is closed _ = client_grpc_do_nothing.get_collection("test") - - client_aio_grpc = QdrantClient(prefer_grpc=True, timeout=TIMEOUT) - _ = client_aio_grpc.async_grpc_collections - client_aio_grpc.close() - - client_aio_grpc = QdrantClient(prefer_grpc=True, timeout=TIMEOUT) - _ = client_aio_grpc.async_grpc_collections - client_aio_grpc.close(grace=2.0) - with pytest.raises(RuntimeError): - client_aio_grpc._client._init_async_grpc_channel() # prevent reinitializing grpc connection, since - # http connection is closed - - client_aio_grpc_do_nothing = QdrantClient(prefer_grpc=True, timeout=TIMEOUT) - client_aio_grpc_do_nothing.close() - with pytest.raises( - RuntimeError - ): # prevent initializing grpc connection, since http connection is closed - _ = client_aio_grpc_do_nothing.async_grpc_collections # endregion grpc # region local diff --git a/tools/async_client_generator/client_generator.py b/tools/async_client_generator/client_generator.py index 833dea4f..6c1b0d1e 100644 --- a/tools/async_client_generator/client_generator.py +++ b/tools/async_client_generator/client_generator.py @@ -94,9 +94,6 @@ def get_async_methods(class_obj: type) -> list[str]: exclude_methods=[ "__del__", "migrate", - "async_grpc_collections", - "async_grpc_points", - "async_grpc_root", ], ) diff --git a/tools/async_client_generator/remote_generator.py b/tools/async_client_generator/remote_generator.py index 552b3e18..00782c13 100644 --- a/tools/async_client_generator/remote_generator.py +++ b/tools/async_client_generator/remote_generator.py @@ -137,15 +137,6 @@ def get_async_methods(class_obj: type) -> list[str]: exclude_methods=[ "__del__", "migrate", - "async_grpc_collections", - "async_grpc_points", - "async_grpc_snapshots", - "async_grpc_root", - "_init_async_grpc_points_client", - "_init_async_grpc_collections_client", - "_init_async_grpc_snapshots_client", - "_init_async_grpc_channel", - "_init_async_grpc_root_client", ], ) From 552a4fcfc46abe604b6c39b9cef606be2c28f8ca Mon Sep 17 00:00:00 2001 From: "d.rudenko" Date: Wed, 18 Dec 2024 16:28:17 +0100 Subject: [PATCH 2/4] Removal of asyncio --- qdrant_client/qdrant_remote.py | 15 +--------- tests/test_qdrant_client.py | 30 ------------------- .../remote/import_from_transformer.py | 7 +++-- 3 files changed, 5 insertions(+), 47 deletions(-) diff --git a/qdrant_client/qdrant_remote.py b/qdrant_client/qdrant_remote.py index 7e7b39de..3c3c151a 100644 --- a/qdrant_client/qdrant_remote.py +++ b/qdrant_client/qdrant_remote.py @@ -1,4 +1,3 @@ -import asyncio import importlib.metadata import logging import math @@ -28,7 +27,7 @@ from qdrant_client.auth import BearerAuth from qdrant_client.client_base import QdrantBase from qdrant_client.common.version_check import is_compatible, get_server_version -from qdrant_client.connection import get_async_channel, get_channel +from qdrant_client.connection import get_channel from qdrant_client.conversions import common_types as types from qdrant_client.conversions.common_types import get_args_subscribed from qdrant_client.conversions.conversion import ( @@ -188,7 +187,6 @@ def __init__( self._grpc_snapshots_client: Optional[grpc.SnapshotsStub] = None self._grpc_root_client: Optional[grpc.QdrantStub] = None - self._aio_grpc_channel = None self._aio_grpc_points_client: Optional[grpc.PointsStub] = None self._aio_grpc_collections_client: Optional[grpc.CollectionsStub] = None self._aio_grpc_snapshots_client: Optional[grpc.SnapshotsStub] = None @@ -223,17 +221,6 @@ def close(self, grpc_grace: Optional[float] = None, **kwargs: Any) -> None: "Unable to close grpc_channel. Connection was interrupted on the server side" ) - if hasattr(self, "_aio_grpc_channel") and self._aio_grpc_channel is not None: - try: - loop = asyncio.get_running_loop() - loop.create_task(self._aio_grpc_channel.close(grace=grpc_grace)) - except AttributeError: - logging.warning( - "Unable to close aio_grpc_channel. Connection was interrupted on the server side" - ) - except RuntimeError: - pass - try: self.openapi_client.close() except Exception: diff --git a/tests/test_qdrant_client.py b/tests/test_qdrant_client.py index 25eebdd1..95877b7e 100644 --- a/tests/test_qdrant_client.py +++ b/tests/test_qdrant_client.py @@ -2051,36 +2051,6 @@ def auth_token_provider(): client.unlock_storage() assert token == "token_2" - -def test_async_auth_token_provider(): - """Check that initialization fails if async auth_token_provider is provided to sync client.""" - token = "" - - async def auth_token_provider(): - nonlocal token - await asyncio.sleep(0.1) - token = "test_token" - return token - - client = QdrantClient(auth_token_provider=auth_token_provider) - - with pytest.raises( - qdrant_client.http.exceptions.ResponseHandlingException, - match="Synchronous token provider is not set.", - ): - client.get_collections() - - assert token == "" - - client = QdrantClient(auth_token_provider=auth_token_provider, prefer_grpc=True) - with pytest.raises( - ValueError, match="Synchronous channel requires synchronous auth token provider." - ): - client.get_collections() - - assert token == "" - - @pytest.mark.parametrize("prefer_grpc", [True, False]) def test_read_consistency(prefer_grpc): fixture_points = generate_fixtures(vectors_sizes=DIM, num=NUM_VECTORS) diff --git a/tools/async_client_generator/transformers/remote/import_from_transformer.py b/tools/async_client_generator/transformers/remote/import_from_transformer.py index b00dc052..1c8d6488 100644 --- a/tools/async_client_generator/transformers/remote/import_from_transformer.py +++ b/tools/async_client_generator/transformers/remote/import_from_transformer.py @@ -18,8 +18,9 @@ def visit_ImportFrom(self, node: ast.ImportFrom) -> ast.AST: if hasattr(alias, "name"): for old_value, new_value in self.import_replace_map.items(): alias.name = alias.name.replace(old_value, new_value) - if alias.name == "get_async_channel": - alias.asname = "get_channel" - node.names = [alias for alias in node.names if alias.name != "get_channel"] + if alias.name == "get_channel": + alias.name = "get_async_channel as get_channel" + # alias.asname = "get_channel" + # node.names = [alias for alias in node.names if alias.name != "get_channel"] return self.generic_visit(node) From 3b46c802c5ff1611caa13c93011512c3cdfff83e Mon Sep 17 00:00:00 2001 From: "d.rudenko" Date: Wed, 18 Dec 2024 21:32:51 +0100 Subject: [PATCH 3/4] Return needed test --- tests/test_qdrant_client.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/test_qdrant_client.py b/tests/test_qdrant_client.py index 95877b7e..098c8af2 100644 --- a/tests/test_qdrant_client.py +++ b/tests/test_qdrant_client.py @@ -2051,6 +2051,35 @@ def auth_token_provider(): client.unlock_storage() assert token == "token_2" + +def test_async_auth_token_provider(): + """Check that initialization fails if async auth_token_provider is provided to sync client.""" + token = "" + + async def auth_token_provider(): + nonlocal token + await asyncio.sleep(0.1) + token = "test_token" + return token + + client = QdrantClient(auth_token_provider=auth_token_provider) + + with pytest.raises( + qdrant_client.http.exceptions.ResponseHandlingException, + match="Synchronous token provider is not set.", + ): + client.get_collections() + + assert token == "" + + client = QdrantClient(auth_token_provider=auth_token_provider, prefer_grpc=True) + with pytest.raises( + ValueError, match="Synchronous channel requires synchronous auth token provider." + ): + client.get_collections() + + assert token == "" + @pytest.mark.parametrize("prefer_grpc", [True, False]) def test_read_consistency(prefer_grpc): fixture_points = generate_fixtures(vectors_sizes=DIM, num=NUM_VECTORS) From cedce3fed75f241605632014383a292b497fe999 Mon Sep 17 00:00:00 2001 From: "d.rudenko" Date: Wed, 18 Dec 2024 21:43:10 +0100 Subject: [PATCH 4/4] Proper fix of renaming --- .../transformers/remote/import_from_transformer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tools/async_client_generator/transformers/remote/import_from_transformer.py b/tools/async_client_generator/transformers/remote/import_from_transformer.py index 1c8d6488..14faee63 100644 --- a/tools/async_client_generator/transformers/remote/import_from_transformer.py +++ b/tools/async_client_generator/transformers/remote/import_from_transformer.py @@ -19,8 +19,7 @@ def visit_ImportFrom(self, node: ast.ImportFrom) -> ast.AST: for old_value, new_value in self.import_replace_map.items(): alias.name = alias.name.replace(old_value, new_value) if alias.name == "get_channel": - alias.name = "get_async_channel as get_channel" - # alias.asname = "get_channel" - # node.names = [alias for alias in node.names if alias.name != "get_channel"] + alias.name = "get_async_channel" + alias.asname = "get_channel" return self.generic_visit(node)