Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed async_grpc #866

Merged
merged 4 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions qdrant_client/qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,40 +192,6 @@ def grpc_points(self) -> grpc.PointsStub:

raise NotImplementedError(f"gRPC client is not supported for {type(self._client)}")

@property
def async_grpc_points(self) -> grpc.PointsStub:
"""gRPC client for points methods

Returns:
An instance of raw gRPC client, generated from Protobuf
"""
warnings.warn(
"async_grpc_points is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_points` instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(self._client, QdrantRemote):
return self._client.async_grpc_points

raise NotImplementedError(f"gRPC client is not supported for {type(self._client)}")

@property
def async_grpc_collections(self) -> grpc.CollectionsStub:
"""gRPC client for collections methods

Returns:
An instance of raw gRPC client, generated from Protobuf
"""
warnings.warn(
"async_grpc_collections is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_collections` instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(self._client, QdrantRemote):
return self._client.async_grpc_collections

raise NotImplementedError(f"gRPC client is not supported for {type(self._client)}")

@property
def rest(self) -> SyncApis[ApiClient]:
"""REST Client
Expand Down
84 changes: 0 additions & 84 deletions qdrant_client/qdrant_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,21 +271,6 @@ def _init_grpc_channel(self) -> None:
auth_token_provider=self._auth_token_provider, # type: ignore
)

def _init_async_grpc_channel(self) -> None:
if self._closed:
raise RuntimeError("Client was closed. Please create a new QdrantClient instance.")

if self._aio_grpc_channel is None:
self._aio_grpc_channel = get_async_channel(
host=self._host,
port=self._grpc_port,
ssl=self._https,
metadata=self._grpc_headers,
options=self._grpc_options,
compression=self._grpc_compression,
auth_token_provider=self._auth_token_provider,
)

def _init_grpc_points_client(self) -> None:
self._init_grpc_channel()
self._grpc_points_client = grpc.PointsStub(self._grpc_channel)
Expand All @@ -302,75 +287,6 @@ def _init_grpc_root_client(self) -> None:
self._init_grpc_channel()
self._grpc_root_client = grpc.QdrantStub(self._grpc_channel)

def _init_async_grpc_points_client(self) -> None:
self._init_async_grpc_channel()
self._aio_grpc_points_client = grpc.PointsStub(self._aio_grpc_channel)

def _init_async_grpc_collections_client(self) -> None:
self._init_async_grpc_channel()
self._aio_grpc_collections_client = grpc.CollectionsStub(self._aio_grpc_channel)

def _init_async_grpc_snapshots_client(self) -> None:
self._init_async_grpc_channel()
self._aio_grpc_snapshots_client = grpc.SnapshotsStub(self._aio_grpc_channel)

def _init_async_grpc_root_client(self) -> None:
self._init_async_grpc_channel()
self._aio_grpc_root_client = grpc.QdrantStub(self._aio_grpc_channel)

@property
def async_grpc_collections(self) -> grpc.CollectionsStub:
"""gRPC client for collections methods

Returns:
An instance of raw gRPC client, generated from Protobuf
"""
if self._aio_grpc_collections_client is None:
self._init_async_grpc_collections_client()
return self._aio_grpc_collections_client

@property
def async_grpc_points(self) -> grpc.PointsStub:
"""gRPC client for points methods

Returns:
An instance of raw gRPC client, generated from Protobuf
"""
if self._aio_grpc_points_client is None:
self._init_async_grpc_points_client()
return self._aio_grpc_points_client

@property
def async_grpc_snapshots(self) -> grpc.SnapshotsStub:
"""gRPC client for snapshots methods

Returns:
An instance of raw gRPC client, generated from Protobuf
"""
warnings.warn(
"async_grpc_snapshots is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_snapshots` instead.",
DeprecationWarning,
stacklevel=2,
)
if self._aio_grpc_snapshots_client is None:
self._init_async_grpc_snapshots_client()
return self._aio_grpc_snapshots_client

@property
def async_grpc_root(self) -> grpc.QdrantStub:
"""gRPC client for info methods

Returns:
An instance of raw gRPC client, generated from Protobuf
"""
warnings.warn(
"async_grpc_root is deprecated and will be removed in a future release. Use `AsyncQdrantRemote.grpc_root` instead.",
DeprecationWarning,
stacklevel=2,
)
if self._aio_grpc_root_client is None:
self._init_async_grpc_root_client()
return self._aio_grpc_root_client

@property
def grpc_collections(self) -> grpc.CollectionsStub:
Expand Down
76 changes: 0 additions & 76 deletions tests/test_async_qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
import pytest

import qdrant_client.http.exceptions
from qdrant_client import QdrantClient
from qdrant_client import grpc as qdrant_grpc
from qdrant_client import models
from qdrant_client.async_qdrant_client import AsyncQdrantClient
from qdrant_client.conversions.conversion import payload_to_grpc
from tests.fixtures.payload import one_random_payload_please
from tests.utils import read_version

NUM_VECTORS = 100
Expand All @@ -21,78 +17,6 @@
COLLECTION_NAME = "async_test_collection"


@pytest.mark.asyncio
async def test_async_grpc():
points = (
qdrant_grpc.PointStruct(
id=qdrant_grpc.PointId(num=idx),
vectors=qdrant_grpc.Vectors(
vector=qdrant_grpc.Vector(data=np.random.rand(DIM).tolist())
),
payload=payload_to_grpc(one_random_payload_please(idx)),
)
for idx in range(NUM_VECTORS)
)

client = QdrantClient(prefer_grpc=True, timeout=3)

grpc_collections = client.async_grpc_collections

res = await grpc_collections.List(qdrant_grpc.ListCollectionsRequest(), timeout=1.0)

for collection in res.collections:
print(collection.name)
await grpc_collections.Delete(
qdrant_grpc.DeleteCollection(collection_name=collection.name)
)

await grpc_collections.Create(
qdrant_grpc.CreateCollection(
collection_name=COLLECTION_NAME,
vectors_config=qdrant_grpc.VectorsConfig(
params=qdrant_grpc.VectorParams(size=DIM, distance=qdrant_grpc.Distance.Cosine)
),
)
)

grpc_points = client.async_grpc_points

upload_features = []

# Upload vectors in parallel
for point in points:
upload_features.append(
grpc_points.Upsert(
qdrant_grpc.UpsertPoints(
collection_name=COLLECTION_NAME, wait=True, points=[point]
)
)
)
await asyncio.gather(*upload_features)

queries = [np.random.rand(DIM).tolist() for _ in range(NUM_QUERIES)]

# Make async queries
search_queries = []
for query in queries:
search_query = grpc_points.Search(
qdrant_grpc.SearchPoints(
collection_name=COLLECTION_NAME,
vector=query,
limit=10,
)
)
search_queries.append(search_query)
results = await asyncio.gather(*search_queries) # All queries are running in parallel now

assert len(results) == NUM_QUERIES

for result in results:
assert len(result.result) == 10

client.close()


@pytest.mark.asyncio
@pytest.mark.parametrize("prefer_grpc", [True, False])
async def test_async_qdrant_client(prefer_grpc):
Expand Down
18 changes: 0 additions & 18 deletions tests/test_qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1888,24 +1888,6 @@ def test_client_close():
RuntimeError
): # prevent initializing grpc connection, since http connection is closed
_ = client_grpc_do_nothing.get_collection("test")

client_aio_grpc = QdrantClient(prefer_grpc=True, timeout=TIMEOUT)
_ = client_aio_grpc.async_grpc_collections
client_aio_grpc.close()

client_aio_grpc = QdrantClient(prefer_grpc=True, timeout=TIMEOUT)
_ = client_aio_grpc.async_grpc_collections
client_aio_grpc.close(grace=2.0)
with pytest.raises(RuntimeError):
client_aio_grpc._client._init_async_grpc_channel() # prevent reinitializing grpc connection, since
# http connection is closed

client_aio_grpc_do_nothing = QdrantClient(prefer_grpc=True, timeout=TIMEOUT)
client_aio_grpc_do_nothing.close()
with pytest.raises(
RuntimeError
): # prevent initializing grpc connection, since http connection is closed
_ = client_aio_grpc_do_nothing.async_grpc_collections
# endregion grpc

# region local
Expand Down
3 changes: 0 additions & 3 deletions tools/async_client_generator/client_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ def get_async_methods(class_obj: type) -> list[str]:
exclude_methods=[
"__del__",
"migrate",
"async_grpc_collections",
"async_grpc_points",
"async_grpc_root",
],
)

Expand Down
9 changes: 0 additions & 9 deletions tools/async_client_generator/remote_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,6 @@ def get_async_methods(class_obj: type) -> list[str]:
exclude_methods=[
"__del__",
"migrate",
"async_grpc_collections",
"async_grpc_points",
"async_grpc_snapshots",
"async_grpc_root",
"_init_async_grpc_points_client",
"_init_async_grpc_collections_client",
"_init_async_grpc_snapshots_client",
"_init_async_grpc_channel",
"_init_async_grpc_root_client",
],
)

Expand Down
Loading