diff --git a/examples/hello_milvus_delete.py b/examples/hello_milvus_delete.py
new file mode 100644
index 000000000..5dd296590
--- /dev/null
+++ b/examples/hello_milvus_delete.py
@@ -0,0 +1,76 @@
+"""Example: delete entities by primary key or by filter expression with MilvusClient."""
+
+import time
+import numpy as np
+from pymilvus import (
+    MilvusClient,
+    exceptions
+)
+
+fmt = "\n=== {:30} ===\n"
+dim = 8
+collection_name = "hello_milvus"
+milvus_client = MilvusClient("http://localhost:19530")
+milvus_client.drop_collection(collection_name)
+milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2")
+
+print("collections:", milvus_client.list_collections())
+print(f"{collection_name} :", milvus_client.describe_collection(collection_name))
+rng = np.random.default_rng(seed=19530)
+
+rows = [
+    {"id": 1, "vector": rng.random((1, dim))[0], "a": 1},
+    {"id": 2, "vector": rng.random((1, dim))[0], "b": 2},
+    {"id": 3, "vector": rng.random((1, dim))[0], "c": 3},
+    {"id": 4, "vector": rng.random((1, dim))[0], "d": 4},
+    {"id": 5, "vector": rng.random((1, dim))[0], "e": 5},
+    {"id": 6, "vector": rng.random((1, dim))[0], "f": 6},
+]
+
+print(fmt.format("Start inserting entities"))
+pks = milvus_client.insert(collection_name, rows, progress_bar=True)
+pks2 = milvus_client.insert(collection_name, {"id": 7, "vector": rng.random((1, dim))[0], "g": 1})
+pks.extend(pks2)
+
+
+def fetch_data_by_pk(pk):
+    """Look up one primary key and print the stored entity, if any."""
+    print(f"get primary key {pk} from {collection_name}")
+    pk_data = milvus_client.get(collection_name, pk)
+
+    if pk_data:
+        print(f"data of primary key {pk} is", pk_data[0])
+    else:
+        print(f"data of primary key {pk} is empty")
+
+fetch_data_by_pk(pks[2])
+
+print(f"start to delete primary key {pks[2]} in collection {collection_name}")
+milvus_client.delete(collection_name, pks=pks[2])
+
+fetch_data_by_pk(pks[2])
+
+
+fetch_data_by_pk(pks[4])
+filter_expr = "e == 5 or f == 6"
+print(f"start to delete by expr {filter_expr} in collection {collection_name}")
+milvus_client.delete(collection_name, filter=filter_expr)
+
+fetch_data_by_pk(pks[4])
+
+print(f"start to delete by expr '{filter_expr}' or by primary 4 in collection {collection_name}, expect get exception")
+try:
+    milvus_client.delete(collection_name, pks=4, filter=filter_expr)
+except exceptions.ParamError as e:
+    print("catch exception", e)
+
+print(f"start to delete without specify any expr '{filter_expr}' or any primary key in collection {collection_name}, expect get exception")
+try:
+    milvus_client.delete(collection_name)
+except Exception as e:
+    print("catch exception", e)
+
+result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
+print(f"final entities in {collection_name} is {result[0]['count(*)']}")
+
+milvus_client.drop_collection(collection_name)
diff --git a/pymilvus/exceptions.py b/pymilvus/exceptions.py
index e71df737c..5b1db4269 100644
--- a/pymilvus/exceptions.py
+++ b/pymilvus/exceptions.py
@@ -215,3 +215,6 @@ class ExceptionsMessage:
         "Attempt to insert an unexpected field to collection without enabling dynamic field"
     )
     UpsertAutoIDTrue = "Upsert don't support autoid == true"
+    AmbiguousDeleteFilterParam = (
+        "Ambiguous filter parameter, only one deletion condition can be specified."
+    )
diff --git a/pymilvus/milvus_client/milvus_client.py b/pymilvus/milvus_client/milvus_client.py
index 6e0d02680..e6225672d 100644
--- a/pymilvus/milvus_client/milvus_client.py
+++ b/pymilvus/milvus_client/milvus_client.py
@@ -8,6 +8,7 @@
 from pymilvus.exceptions import (
     DataTypeNotMatchException,
     MilvusException,
+    ParamError,
     PrimaryKeyException,
 )
 from pymilvus.orm import utility
@@ -58,9 +59,7 @@ def __init__(
         self._using = self._create_connection(
             uri, user, password, db_name, token, timeout=timeout, **kwargs
         )
-        self.is_self_hosted = bool(
-            utility.get_server_type(using=self._using) == "milvus",
-        )
+        self.is_self_hosted = bool(utility.get_server_type(using=self._using) == "milvus")
 
     def create_collection(
         self,
@@ -104,10 +103,7 @@ def create_collection(
         except Exception as ex:
             logger.error("Failed to create collection: %s", collection_name)
             raise ex from ex
-        index_params = {
-            "metric_type": metric_type,
-            "params": {},
-        }
+        index_params = {"metric_type": metric_type, "params": {}}
         self._create_index(collection_name, vector_field_name, index_params, timeout=timeout)
         self._load(collection_name, timeout=timeout)
 
@@ -121,21 +117,10 @@ def _create_index(
         """Create a index on the collection"""
         conn = self._get_connection()
         try:
-            conn.create_index(
-                collection_name,
-                vec_field_name,
-                index_params,
-                timeout=timeout,
-            )
-            logger.debug(
-                "Successfully created an index on collection: %s",
-                collection_name,
-            )
+            conn.create_index(collection_name, vec_field_name, index_params, timeout=timeout)
+            logger.debug("Successfully created an index on collection: %s", collection_name)
         except Exception as ex:
-            logger.error(
-                "Failed to create an index on collection: %s",
-                collection_name,
-            )
+            logger.error("Failed to create an index on collection: %s", collection_name)
             raise ex from ex
 
     def insert(
@@ -195,9 +180,7 @@ def insert(
                 pks.extend(res.primary_keys)
             except Exception as ex:
                 logger.error(
-                    "Failed to insert batch starting at entity: %s/%s",
-                    str(i),
-                    str(len(data)),
+                    "Failed to insert batch starting at entity: %s/%s", str(i), str(len(data))
                 )
                 raise ex from ex
 
@@ -370,8 +353,9 @@ def get(
     def delete(
         self,
         collection_name: str,
-        pks: Union[list, str, int],
+        pks: Optional[Union[list, str, int]] = None,
         timeout: Optional[float] = None,
+        filter: Optional[str] = "",
         **kwargs,
     ):
         """Delete entries in the collection by their pk.
@@ -390,7 +374,8 @@ def delete(
 
         Args:
             pks (list, str, int): The pk's to delete. Depending on pk_field type it can be int
-                or str or alist of either.
+                or str or a list of either. Defaults to None.
+            filter (str, optional): A filter to use for the deletion. Defaults to empty.
             timeout (int, optional): Timeout to use, overides the client level assigned at init.
                 Defaults to None.
         """
@@ -398,17 +383,26 @@ def delete(
         if isinstance(pks, (int, str)):
             pks = [pks]
 
-        if len(pks) == 0:
-            return []
-
+        if filter:
+            # pks and filter are mutually exclusive; reject the ambiguous call
+            # up front, before any round trip to the server.
+            if pks:
+                raise ParamError(message=ExceptionsMessage.AmbiguousDeleteFilterParam)
+
+            if not isinstance(filter, str):
+                raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))
+
+        expr = filter or ""
         conn = self._get_connection()
-        try:
-            schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to describe collection: %s", collection_name)
-            raise ex from ex
+        if pks:
+            try:
+                schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
+            except Exception as ex:
+                logger.error("Failed to describe collection: %s", collection_name)
+                raise ex from ex
+
+            expr = self._pack_pks_expr(schema_dict, pks)
 
-        expr = self._pack_pks_expr(schema_dict, pks)
         ret_pks = []
         try:
             res = conn.delete(collection_name, expr, timeout=timeout, **kwargs)
@@ -600,8 +594,5 @@ def _load(self, collection_name: str, timeout: Optional[float] = None):
         try:
             conn.load_collection(collection_name, timeout=timeout)
         except MilvusException as ex:
-            logger.error(
-                "Failed to load collection: %s",
-                collection_name,
-            )
+            logger.error("Failed to load collection: %s", collection_name)
             raise ex from ex