From 79752dc85252a267a27573e5f49d7769f618c57e Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 19 Oct 2023 22:27:32 -0500 Subject: [PATCH 01/16] Updated Milvus vector DB files --- morpheus/service/{ => vdb}/milvus_client.py | 0 .../{ => vdb}/milvus_vector_db_service.py | 627 ++++++++++++------ .../vdb/utils.py} | 6 +- .../service/{ => vdb}/vector_db_service.py | 194 +++++- ...ctor_db.py => write_to_vector_db_stage.py} | 6 +- tests/test_milvus_vector_db_service.py | 124 +++- ...st_milvus_write_to_vector_db_stage_pipe.py | 4 +- 7 files changed, 733 insertions(+), 228 deletions(-) rename morpheus/service/{ => vdb}/milvus_client.py (100%) rename morpheus/service/{ => vdb}/milvus_vector_db_service.py (64%) rename morpheus/{utils/vector_db_service_utils.py => service/vdb/utils.py} (94%) rename morpheus/service/{ => vdb}/vector_db_service.py (65%) rename morpheus/stages/output/{write_to_vector_db.py => write_to_vector_db_stage.py} (96%) diff --git a/morpheus/service/milvus_client.py b/morpheus/service/vdb/milvus_client.py similarity index 100% rename from morpheus/service/milvus_client.py rename to morpheus/service/vdb/milvus_client.py diff --git a/morpheus/service/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py similarity index 64% rename from morpheus/service/milvus_vector_db_service.py rename to morpheus/service/vdb/milvus_vector_db_service.py index a51e85cd7a..de86e5a83d 100644 --- a/morpheus/service/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -23,49 +23,145 @@ import pandas as pd import pymilvus from pymilvus.orm.mutation import MutationResult -from pymilvus.orm.types import infer_dtype_bydata import cudf -from morpheus.service.milvus_client import MILVUS_DATA_TYPE_MAP -from morpheus.service.milvus_client import MilvusClient -from morpheus.service.vector_db_service import VectorDBResourceService -from morpheus.service.vector_db_service import VectorDBService +from morpheus.service.vdb.milvus_client import MilvusClient +from morpheus.service.vdb.vector_db_service import VectorDBResourceService +from morpheus.service.vdb.vector_db_service import VectorDBService logger = logging.getLogger(__name__) class FieldSchemaEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, pymilvus.DataType): - return str(obj) - return json.JSONEncoder.default(self, obj) + def default(self, o: typing.Any) -> str: + """ + Serialize objects to a JSON-compatible string format. + + Parameters + ---------- + o : typing.Any + Object to be serialized. + + Returns + ------- + str + JSON-compatible string format of the object. + """ + + if isinstance(o, pymilvus.DataType): + return str(o) + return json.JSONEncoder.default(self, o) @staticmethod - def object_hook(o): - if "DataType." in o: - return getattr(pymilvus.DataType, o.split(".")[1]) + def object_hook(o: dict) -> dict: + """ + Updated dictionary with pymilvus datatype. + + Parameters + ---------- + o : dict + Dictionary to be converted. + + Returns + ------- + dict + Dictionary with changes to its original format. + """ + + if "type" in o and "DataType." in o["type"]: + o["type"] = getattr(pymilvus.DataType, o["type"].split(".")[1]) return o @staticmethod - def dump(field: pymilvus.FieldSchema, f: typing.IO): + def dump(field: pymilvus.FieldSchema, f: typing.IO) -> str: + """ + Serialize a FieldSchema object to a JSON file. + + Parameters + ---------- + field : pymilvus.FieldSchema + FieldSchema object to be serialized. 
+ f : typing.IO + File-like object to which the data is serialized. + + Returns + ------- + str + JSON string. + """ return json.dump(field, f, cls=FieldSchemaEncoder) @staticmethod - def dumps(field: pymilvus.FieldSchema): + def dumps(field: pymilvus.FieldSchema) -> str: + """ + Serialize a FieldSchema object to a JSON-compatible string format. + + Parameters + ---------- + field : pymilvus.FieldSchema + FieldSchema object to be serialized. + + Returns + ------- + str + JSON-compatible string format of the FieldSchema object. + """ + return json.dumps(field, cls=FieldSchemaEncoder) @staticmethod - def load(f: typing.IO): + def load(f: typing.IO) -> pymilvus.FieldSchema: + """ + Deserialize a JSON file to a FieldSchema object. + + Parameters + ---------- + f : typing.IO + File-like object from which the data is deserialized. + + Returns + ------- + pymilvus.FieldSchema + Deserialized FieldSchema object. + """ return pymilvus.FieldSchema.construct_from_dict(json.load(f, object_hook=FieldSchemaEncoder.object_hook)) @staticmethod - def loads(field: str): + def loads(field: str) -> pymilvus.FieldSchema: + """ + Deserialize a JSON-compatible string to a FieldSchema object. + + Parameters + ---------- + field : str + JSON-compatible string to be deserialized. + + Returns + ------- + pymilvus.FieldSchema + Deserialized FieldSchema object. + """ + return pymilvus.FieldSchema.construct_from_dict(json.loads(field, object_hook=FieldSchemaEncoder.object_hook)) @staticmethod - def from_dict(field: dict): + def from_dict(field: dict) -> pymilvus.FieldSchema: + """ + Convert a dictionary to a FieldSchema object. + + Parameters + ---------- + field : dict + Dictionary to be converted to a FieldSchema object. + + Returns + ------- + pymilvus.FieldSchema + Converted FieldSchema object. + """ + # FieldSchema converts dtype -> type when serialized. We need to convert any dtype to type before deserilaizing # First convert any dtype to type @@ -75,7 +171,7 @@ def from_dict(field: dict): # Convert string type to DataType if ("type" in field and isinstance(field["type"], str)): - field["type"] = FieldSchemaEncoder.object_hook(field["type"]) + field = FieldSchemaEncoder.object_hook(field) # Now use the normal from dict function return pymilvus.FieldSchema.construct_from_dict(field) @@ -109,6 +205,16 @@ def wrapper(self, name, *args, **kwargs): class MilvusVectorDBResourceService(VectorDBResourceService): + """ + Represents a service for managing resources in a Milvus Vector Database. + + Parameters + ---------- + name : str + Name of the resource. + client : MilvusClient + An instance of the MilvusClient for interaction with the Milvus Vector Database. + """ def __init__(self, name: str, client: MilvusClient) -> None: super().__init__() @@ -129,38 +235,48 @@ def __init__(self, name: str, client: MilvusClient) -> None: self._collection.load() def _set_up_collection(self): - + """ + Set up the collection fields. + """ self._fields = self._collection.schema.fields def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) -> dict: - # collection: pymilvus.Collection = None - # try: - # collection_conf = kwargs.get("collection_conf", {}) - # partition_name = collection_conf.get("partition_name", "_default") + """ + Insert data into the vector database. 
- # collection = self._client.get_collection(collection_name=self._name, **collection_conf) - # result = collection.insert(data, partition_name=partition_name) - # collection.flush() - # finally: - # collection.release() + Parameters + ---------- + data : list[list] | list[dict] + Data to be inserted into the collection. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + Returns + ------- + dict + Returns response content as a dictionary. + """ result = self._collection.insert(data, **kwargs) + self._collection.flush() - result_dict = { - "primary_keys": result.primary_keys, - "insert_count": result.insert_count, - "delete_count": result.delete_count, - "upsert_count": result.upsert_count, - "timestamp": result.timestamp, - "succ_count": result.succ_count, - "err_count": result.err_count, - "succ_index": result.succ_index, - "err_index": result.err_index - } - - return result_dict + return self._insert_result_to_dict(result=result) def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict: + """ + Insert a dataframe entires into the vector database. + + Parameters + ---------- + df : typing.Union[cudf.DataFrame, pd.DataFrame] + Dataframe to be inserted into the collection. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ # From the schema, this is the list of columns we need, excluding any auto_id columns column_names = [field.name for field in self._fields if not field.auto_id] @@ -173,28 +289,79 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa final_df = final_df.to_pandas() result = self._collection.insert(data=final_df, **kwargs) + self._collection.flush() - result_dict = { - "primary_keys": result.primary_keys, - "insert_count": result.insert_count, - "delete_count": result.delete_count, - "upsert_count": result.upsert_count, - "timestamp": result.timestamp, - "succ_count": result.succ_count, - "err_count": result.err_count, - "succ_index": result.succ_index, - "err_index": result.err_index - } - - return result_dict + return self._insert_result_to_dict(result=result) def describe(self, **kwargs: dict[str, typing.Any]) -> dict: - raise NotImplementedError() + """ + Provides a description of the collection. + + Parameters + ---------- + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ + return self._client.describe_collection(collection_name=self._name, **kwargs) + + def query(self, query: str, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Query data in a collection in the Milvus vector database. + + This method performs a search operation in the specified collection/partition in the Milvus vector database. + + Parameters + ---------- + query : str, optional + The search query, which can be a filter expression, by default None. + **kwargs : dict + Additional keyword arguments for the search operation. + + Returns + ------- + typing.Any + The search result, which can vary depending on the query and options. + + Raises + ------ + RuntimeError + If an error occurs during the search operation. + If query argument is `None` and `data` keyword argument doesn't exist. + If `data` keyword arguement is `None`. 
+ """ + + logger.debug("Searching in collection: %s, query=%s, kwargs=%s", self._name, query, kwargs) + + return self._client.query(collection_name=self._name, filter=query, **kwargs) async def similarity_search(self, embeddings: list[list[float]], k: int = 4, **kwargs: dict[str, typing.Any]) -> list[dict]: + """ + Perform a similarity search within the collection. + + Parameters + ---------- + embeddings : list[list[float]] + Embeddings for which to perform the similarity search. + k : int, optional + The number of nearest neighbors to return, by default 4. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + list[dict] + Returns a list of dictionaries representing the results of the similarity search. + """ + + self._collection.load() assert self._vector_field is not None, "Cannot perform similarity search on a collection without a vector field" @@ -217,6 +384,158 @@ async def similarity_search(self, return outputs + def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Update data in the collection. + + Parameters + ---------- + data : list[typing.Any] + Data to be updated in the collection. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to upsert operation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the updated operation stats. + """ + + if not isinstance(data, list): + raise RuntimeError("Data is not of type list.") + + result = self._client.upsert(collection_name=self._name, entities=data, **kwargs) + + self._collection.flush() + + return self._update_delete_result_to_dict(result=result) + + def delete_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Delete vectors by keys from the collection. + + Parameters + ---------- + keys : int | str | list + Primary keys to delete vectors. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + typing.Any + Returns result of the given keys that are deleted from the collection. + """ + + result = self._client.delete(collection_name=self._name, pks=keys, **kwargs) + + self._collection.load() + + return result + + def delete(self, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Delete vectors from the collection using expressions. + + Parameters + ---------- + expr : str + Delete expression. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the given keys that are deleted from the collection. + """ + + result = self._client.delete_by_expr(collection_name=self._name, expression=expr, **kwargs) + + return self._update_delete_result_to_dict(result=result) + + def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: + """ + Retrieve the inserted vectors using their primary keys. + + Parameters + ---------- + keys : int | str | list + Primary keys to get vectors for. Depending on pk_field type it can be int or str + or a list of either. + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the retrieval operation. + + Returns + ------- + list[typing.Any] + Returns result rows of the given keys from the collection. 
+ """ + + result = None + + try: + result = self._client.get(collection_name=self._name, ids=keys, **kwargs) + except pymilvus.exceptions.MilvusException as exec_info: + raise RuntimeError(f"Unable to perform search: {exec_info}") from exec_info + + return result + + def count(self, **kwargs: dict[str, typing.Any]) -> int: + """ + Returns number of rows/entities. + + Parameters + ---------- + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the count operation. + + Returns + ------- + int + Returns number of entities in the collection. + """ + return self._collection.num_entities + + def drop(self, **kwargs: dict[str, typing.Any]) -> None: + """ + Drop a collection, index, or partition in the Milvus vector database. + + This function allows you to drop a collection. + + Parameters + ---------- + **kwargs : dict + Additional keyword arguments for specifying the type and partition name (if applicable). + """ + + self._collection.drop(**kwargs) + + def _insert_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]: + result_dict = { + "primary_keys": result.primary_keys, + "insert_count": result.insert_count, + "delete_count": result.delete_count, + "upsert_count": result.upsert_count, + "timestamp": result.timestamp, + "succ_count": result.succ_count, + "err_count": result.err_count, + "succ_index": result.succ_index, + "err_index": result.err_index + } + return result_dict + + def _update_delete_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]: + result_dict = { + "insert_count": result.insert_count, + "delete_count": result.delete_count, + "upsert_count": result.upsert_count, + "timestamp": result.timestamp, + "succ_count": result.succ_count, + "err_count": result.err_count + } + return result_dict + class MilvusVectorDBService(VectorDBService): """ @@ -347,41 +666,33 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing. 
for part in partition_conf["partitions"]: self._client.create_partition(collection_name=name, partition_name=part["name"], timeout=timeout) - def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs) -> list[dict]: - from pandas.api.types import is_array_like - from pandas.api.types import is_list_like - from pandas.api.types import pandas_dtype - from pymilvus import Collection - from pymilvus import CollectionSchema - from pymilvus import DataType - from pymilvus import FieldSchema - from pymilvus import MilvusException - from pymilvus.orm.types import infer_dtype_bydata - from pymilvus.orm.types import is_numeric_datatype - from pymilvus.orm.types import map_numpy_dtype_to_datatype - + def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) -> list[dict]: fields = [] # Always add a primary key - fields.append({"name": "pk", "dtype": DataType.INT64, "is_primary": True, "auto_id": True}) + fields.append({"name": "pk", "dtype": pymilvus.DataType.INT64, "is_primary": True, "auto_id": True}) + + if isinstance(df, cudf.DataFrame): + df = df.to_pandas() # Loop over all of the columns of the first row and build the schema for col_name, col_val in df.iloc[0].iteritems(): field_dict = { "name": col_name, - "dtype": infer_dtype_bydata(col_val), + "dtype": pymilvus.orm.types.infer_dtype_bydata(col_val), # "is_primary": col_name == kwargs.get("primary_key", None), # "auto_id": col_name == kwargs.get("primary_key", None) } - if (field_dict["dtype"] == DataType.VARCHAR): + if (field_dict["dtype"] == pymilvus.DataType.VARCHAR): field_dict["max_length"] = 65_535 - if (field_dict["dtype"] == DataType.FLOAT_VECTOR or field_dict["dtype"] == DataType.BINARY_VECTOR): - field_dict["dim"] = len(col_val) + if (field_dict["dtype"] == pymilvus.DataType.FLOAT_VECTOR + or field_dict["dtype"] == pymilvus.DataType.BINARY_VECTOR): + field_dict["params"] = {"dim": len(col_val)} - if (field_dict["dtype"] == DataType.UNKNOWN): + if (field_dict["dtype"] == pymilvus.DataType.UNKNOWN): logger.warning("Could not infer data type for column '%s', with value: %s. Skipping column in schema.", col_name, col_val) @@ -397,21 +708,21 @@ def create_from_dataframe(self, overwrite: bool = False, **kwargs: dict[str, typing.Any]) -> None: """ - Create resources in the vector database. + Create collections in the vector database. Parameters ---------- name : str - Name of the resource. + Name of the collection. df : Union[cudf.DataFrame, pd.DataFrame] - The dataframe to create the resource from. + The dataframe to create the collection from. overwrite : bool, optional - Whether to overwrite the resource if it already exists. Default is False. + Whether to overwrite the collection if it already exists. Default is False. **kwargs : dict[str, typing.Any] Extra keyword arguments specific to the vector database implementation. """ - fields = self._build_schema_conf(df=df, **kwargs) + fields = self._build_schema_conf(df=df) create_kwargs = { "collection_conf": { @@ -462,38 +773,8 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, If the collection not exists exists. 
""" - return self._collection_insert(name, data, **kwargs) - - def _collection_insert(self, name: str, data: list[list] | list[dict], - **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: - - if not self.has_store_object(name): - raise RuntimeError(f"Collection {name} doesn't exist.") - - collection = None - try: - collection_conf = kwargs.get("collection_conf", {}) - partition_name = collection_conf.get("partition_name", "_default") - - collection = self._client.get_collection(collection_name=name, **collection_conf) - result = collection.insert(data, partition_name=partition_name) - collection.flush() - finally: - collection.release() - - result_dict = { - "primary_keys": result.primary_keys, - "insert_count": result.insert_count, - "delete_count": result.delete_count, - "upsert_count": result.upsert_count, - "timestamp": result.timestamp, - "succ_count": result.succ_count, - "err_count": result.err_count, - "succ_index": result.succ_index, - "err_index": result.err_index - } - - return result_dict + resource = self.load_resource(name) + return resource.insert(data, **kwargs) @with_collection_lock def insert_dataframe(self, @@ -522,20 +803,14 @@ def insert_dataframe(self, RuntimeError If the collection not exists exists. """ - if not self.has_store_object(name): - raise RuntimeError(f"Collection {name} doesn't exist.") + resource = self.load_resource(name) - if isinstance(df, cudf.DataFrame): - df = df.to_pandas() - - dict_of_rows = df.to_dict(orient='records') - - return self._collection_insert(name, dict_of_rows, **kwargs) + return resource.insert_dataframe(df=df, **kwargs) @with_collection_lock - def search(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any: + def query(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any: """ - Search for data in a collection in the Milvus vector database. + Query data in a collection in the Milvus vector database. This method performs a search operation in the specified collection/partition in the Milvus vector database. @@ -543,8 +818,8 @@ def search(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) ---------- name : str Name of the collection to search within. - query : str, optional - The search query, which can be a filter expression, by default None. + query : str + The search query, which can be a filter expression. **kwargs : dict Additional keyword arguments for the search operation. @@ -552,38 +827,32 @@ def search(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) ------- typing.Any The search result, which can vary depending on the query and options. + """ - Raises - ------ - RuntimeError - If an error occurs during the search operation. - If query argument is `None` and `data` keyword argument doesn't exist. - If `data` keyword arguement is `None`. + resource = self.load_resource(name) + + return resource.query(query, **kwargs) + + async def similarity_search(self, name: str, **kwargs: dict[str, typing.Any]) -> list[dict]: """ + Perform a similarity search within the collection. - logger.debug("Searching in collection: %s, query=%s, kwargs=%s", name, query, kwargs) + Parameters + ---------- + name : str + Name of the collection. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. 
- try: - self._client.load_collection(collection_name=name) - if query is not None: - result = self._client.query(collection_name=name, filter=query, **kwargs) - else: - if "data" not in kwargs: - raise RuntimeError("The search operation requires that search vectors be " + - "provided as a keyword argument 'data'") - if kwargs["data"] is None: - raise RuntimeError("Argument 'data' cannot be None") - - data = kwargs.pop("data") - - result = self._client.search(collection_name=name, data=data, **kwargs) - return result + Returns + ------- + list[dict] + Returns a list of dictionaries representing the results of the similarity search. + """ - except pymilvus.exceptions.MilvusException as exec_info: - raise RuntimeError(f"Unable to perform serach: {exec_info}") from exec_info + resource = self.load_resource(name) - finally: - self._client.release_collection(collection_name=name) + return resource.similarity_search(**kwargs) @with_collection_lock def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: @@ -593,7 +862,7 @@ def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.A Parameters ---------- name : str - Name of the resource. + Name of the collection. data : list[typing.Any] Data to be updated in the collection. **kwargs : dict[str, typing.Any] @@ -608,19 +877,19 @@ def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.A if not isinstance(data, list): raise RuntimeError("Data is not of type list.") - result = self._client.upsert(collection_name=name, entities=data, **kwargs) + resource = self.load_resource(name) - return self._convert_mutation_result_to_dict(result=result) + return resource.update(data=data, **kwargs) @with_collection_lock def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: """ - Delete vectors by keys from the resource. + Delete vectors by keys from the collection. Parameters ---------- name : str - Name of the resource. + Name of the collection. keys : int | str | list Primary keys to delete vectors. **kwargs : dict[str, typing.Any] @@ -632,19 +901,19 @@ def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, Returns result of the given keys that are delete from the collection. """ - result = self._client.delete(collection_name=name, pks=keys, **kwargs) + resource = self.load_resource(name) - return result + return resource.delete_by_keys(keys=keys, **kwargs) @with_collection_lock def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: """ - Delete vectors from the resource using expressions. + Delete vectors from the collection using expressions. Parameters ---------- name : str - Name of the resource. + Name of the collection. expr : str Delete expression. **kwargs : dict[str, typing.Any] @@ -656,9 +925,10 @@ def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[ Returns result of the given keys that are delete from the collection. 
""" - result = self._client.delete_by_expr(collection_name=name, expression=expr, **kwargs) + resource = self.load_resource(name) + result = resource.delete(expr=expr, **kwargs) - return self._convert_mutation_result_to_dict(result=result) + return result @with_collection_lock def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: @@ -681,16 +951,9 @@ def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str Returns result rows of the given keys from the collection. """ - result = None - - try: - self._client.load_collection(collection_name=name) - result = self._client.get(collection_name=name, ids=keys, **kwargs) - except pymilvus.exceptions.MilvusException as exec_info: - raise RuntimeError(f"Unable to perform serach: {exec_info}") from exec_info + resource = self.load_resource(name) - finally: - self._client.release_collection(collection_name=name) + result = resource.retrieve_by_keys(keys=keys, **kwargs) return result @@ -710,8 +973,9 @@ def count(self, name: str, **kwargs: dict[str, typing.Any]) -> int: int Returns number of entities in the collection. """ + resource = self.load_resource(name) - return self._client.num_entities(collection_name=name, **kwargs) + return resource.count(**kwargs) def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: """ @@ -729,8 +993,8 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: Notes on Expected Keyword Arguments: ------------------------------------ - - 'resource' (str, optional): - Specifies the type of resource to drop. Possible values: 'collection' (default), 'index', 'partition'. + - 'collection' (str, optional): + Specifies the type of collection to drop. Possible values: 'collection' (default), 'index', 'partition'. - 'partition_name' (str, optional): Required when dropping a specific partition within a collection. Specifies the partition name to be dropped. @@ -744,7 +1008,7 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: Raises ------ ValueError - If mandatory arguments are missing or if the provided 'resource' value is invalid. + If mandatory arguments are missing or if the provided 'collection' value is invalid. """ logger.debug("Dropping collection: %s, kwargs=%s", name, kwargs) @@ -758,6 +1022,8 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: raise ValueError("Mandatory argument 'partition_name' is required when resource='partition'") partition_name = kwargs["partition_name"] if self._client.has_partition(collection_name=name, partition_name=partition_name): + # Collection need to be released before dropping the partition. + self._client.release_collection(collection_name=name) self._client.drop_partition(collection_name=name, partition_name=partition_name) elif resource == "index": if "field_name" in kwargs and "index_name" in kwargs: @@ -785,7 +1051,9 @@ def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict: Returns collection information. 
""" - return self._client.describe_collection(collection_name=name, **kwargs) + resource = self.load_resource(name) + + return resource.describe(**kwargs) def close(self) -> None: """ @@ -796,17 +1064,6 @@ def close(self) -> None: """ self._client.close() - def _convert_mutation_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]: - result_dict = { - "insert_count": result.insert_count, - "delete_count": result.delete_count, - "upsert_count": result.upsert_count, - "timestamp": result.timestamp, - "succ_count": result.succ_count, - "err_count": result.err_count - } - return result_dict - @classmethod def get_collection_lock(cls, name: str) -> threading.Lock: """ diff --git a/morpheus/utils/vector_db_service_utils.py b/morpheus/service/vdb/utils.py similarity index 94% rename from morpheus/utils/vector_db_service_utils.py rename to morpheus/service/vdb/utils.py index a8039b1860..aa40bd312c 100644 --- a/morpheus/utils/vector_db_service_utils.py +++ b/morpheus/service/vdb/utils.py @@ -15,8 +15,6 @@ import importlib import typing -import morpheus.service - class VectorDBServiceFactory: @@ -24,7 +22,7 @@ class VectorDBServiceFactory: @classmethod def create_instance( cls, service_name: typing.Literal["milvus"], *args: typing.Any, - **kwargs: dict[str, typing.Any]) -> "morpheus.service.milvus_vector_db_service.MilvusVectorDBService": + **kwargs: dict[str, typing.Any]) -> "morpheus.service.vdb.milvus_vector_db_service.MilvusVectorDBService": pass @classmethod @@ -53,7 +51,7 @@ def create_instance(cls, service_name: str, *args: typing.Any, **kwargs: dict[st If the specified service name is not found or does not correspond to a valid service class. """ try: - module_name = f"morpheus.service.{service_name}_vector_db_service" + module_name = f"morpheus.service.vdb.{service_name}_vector_db_service" module = importlib.import_module(module_name) class_name = f"{service_name.capitalize()}VectorDBService" class_ = getattr(module, class_name) diff --git a/morpheus/service/vector_db_service.py b/morpheus/service/vdb/vector_db_service.py similarity index 65% rename from morpheus/service/vector_db_service.py rename to morpheus/service/vdb/vector_db_service.py index 844d7c366c..3ecc6a65aa 100644 --- a/morpheus/service/vector_db_service.py +++ b/morpheus/service/vdb/vector_db_service.py @@ -25,17 +25,161 @@ class VectorDBResourceService(ABC): + """ + Abstract base class for a Vector Database Resource Service. + """ @abstractmethod def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) -> dict: + """ + Insert data into the vector database. + + Parameters + ---------- + data : list[list] | list[dict] + Data to be inserted into the resource. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ + pass @abstractmethod def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict: + """ + Insert a dataframe into the vector database. + + Parameters + ---------- + df : typing.Union[cudf.DataFrame, pd.DataFrame] + Dataframe to be inserted into the resource. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ + pass @abstractmethod def describe(self, **kwargs: dict[str, typing.Any]) -> dict: + """ + Provide a description of the vector database. 
+ + Parameters + ---------- + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ + + pass + + @abstractmethod + def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Update data in the vector database. + + Parameters + ---------- + data : list[typing.Any] + Data to be updated in the resource. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the updated operation stats. + """ + + pass + + @abstractmethod + def delete(self, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Delete data in the vector database. + + Parameters + ---------- + expr : typing.Any + Delete expression. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the delete operation stats. + """ + + pass + + @abstractmethod + def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: + """ + Retrieve the inserted vectors using keys from the resource. + + Parameters + ---------- + keys : typing.Any + Primary keys to get vectors. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + list[typing.Any] + Returns rows of the given keys that exists in the resource. + """ + pass + + @abstractmethod + def delete_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Delete vectors by keys from the resource. + + Parameters + ---------- + keys : int | str | list + Primary keys to delete vectors. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + typing.Any + Returns vectors of the given keys that are delete from the resource. + """ + pass + + @abstractmethod + def count(self, **kwargs: dict[str, typing.Any]) -> int: + """ + Returns number of rows/entities in the given resource. + + Parameters + ---------- + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + int + Returns number of rows/entities in the given resource. + """ pass @abstractmethod @@ -43,6 +187,25 @@ async def similarity_search(self, embeddings: list[list[float]], k: int = 4, **kwargs: dict[str, typing.Any]) -> list[list[dict]]: + """ + Perform a similarity search within the vector database. + + Parameters + ---------- + embeddings : list[list[float]] + Embeddings for which to perform the similarity search. + k : int, optional + The number of nearest neighbors to return, by default 4. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + list[list[dict]] + Returns a list of lists, where each inner list contains dictionaries representing the results of the + similarity search. + """ + pass @@ -107,15 +270,15 @@ def insert_dataframe(self, pass @abstractmethod - def search(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any: + def query(self, name: str, query: str, **kwargs: dict[str, typing.Any]) -> typing.Any: """ - Search for content in the vector database. 
+ Query a resource in the vector database. Parameters ---------- name : str Name of the resource. - query : str, default None + query : str Query to execute on the given resource. **kwargs : dict[str, typing.Any] Extra keyword arguments specific to the vector database implementation. @@ -128,6 +291,27 @@ def search(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) pass + @abstractmethod + async def similarity_search(self, name: str, **kwargs: dict[str, typing.Any]) -> list[list[dict]]: + """ + Perform a similarity search within the vector database. + + Parameters + ---------- + name : str + Name of the resource. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + list[list[dict]] + Returns a list of lists, where each inner list contains dictionaries representing the results of the + similarity search. + """ + + pass + @abstractmethod def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: """ @@ -223,10 +407,6 @@ def create_from_dataframe(self, Whether to overwrite the resource if it already exists. Default is False. **kwargs : dict[str, typing.Any] Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - None """ pass diff --git a/morpheus/stages/output/write_to_vector_db.py b/morpheus/stages/output/write_to_vector_db_stage.py similarity index 96% rename from morpheus/stages/output/write_to_vector_db.py rename to morpheus/stages/output/write_to_vector_db_stage.py index 8e1d12eb16..f18366be19 100644 --- a/morpheus/stages/output/write_to_vector_db.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -23,8 +23,8 @@ from morpheus.messages import MultiResponseMessage from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair -from morpheus.service.vector_db_service import VectorDBService -from morpheus.utils.vector_db_service_utils import VectorDBServiceFactory +from morpheus.service.vdb.utils import VectorDBServiceFactory +from morpheus.service.vdb.vector_db_service import VectorDBService logger = logging.getLogger(__name__) @@ -156,7 +156,7 @@ def on_data_multi_message(msg: MultiResponseMessage) -> MultiResponseMessage: # self._service.create_from_dataframe(name=self._resource_name, df=metadata, index_field="embedding") # Insert entries in the dataframe to vector database. - result = self._resource_service.insert_dataframe(df=metadata, **self._resource_kwargs) + self._resource_service.insert_dataframe(df=metadata, **self._resource_kwargs) return msg diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 1e7b835471..2a433bd454 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -15,19 +15,24 @@ # limitations under the License. 
import concurrent.futures +import json +import random import numpy as np +import pymilvus import pytest -from morpheus.service.milvus_client import MILVUS_DATA_TYPE_MAP -from morpheus.service.milvus_vector_db_service import MilvusVectorDBService +import cudf + +from morpheus.service.vdb.milvus_client import MILVUS_DATA_TYPE_MAP +from morpheus.service.vdb.milvus_vector_db_service import FieldSchemaEncoder +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService @pytest.fixture(scope="module", name="milvus_service") def milvus_service_fixture(milvus_server_uri: str): # This fixture is scoped to the function level since the WriteToVectorDBStage will close the connection on' # pipeline completion - from morpheus.service.milvus_vector_db_service import MilvusVectorDBService service = MilvusVectorDBService(uri=milvus_server_uri) yield service @@ -46,6 +51,11 @@ def test_has_store_object(milvus_service: MilvusVectorDBService): assert not milvus_service.has_store_object(collection_name) +@pytest.fixture +def sample_field(): + return pymilvus.FieldSchema(name="test_field", dtype=pymilvus.DataType.INT64) + + @pytest.mark.milvus def test_create_and_drop_collection(idx_part_collection_config: dict, milvus_service: MilvusVectorDBService): # Create a collection and check if it exists. @@ -80,7 +90,7 @@ def test_insert_and_retrieve_by_keys(milvus_service: MilvusVectorDBService, @pytest.mark.milvus -def test_search(milvus_service: MilvusVectorDBService, idx_part_collection_config: dict, milvus_data: list[dict]): +def test_query(milvus_service: MilvusVectorDBService, idx_part_collection_config: dict, milvus_data: list[dict]): # Create a collection. collection_name = "test_search_collection" milvus_service.create(collection_name, **idx_part_collection_config) @@ -92,7 +102,7 @@ def test_search(milvus_service: MilvusVectorDBService, idx_part_collection_confi query = "age==26 or age==27" # Perform a search in the collection. - search_result = milvus_service.search(collection_name, query) + search_result = milvus_service.query(collection_name, query) assert len(search_result) == 2 assert search_result[0]["age"] in [26, 27] assert search_result[1]["age"] in [26, 27] @@ -102,9 +112,10 @@ def test_search(milvus_service: MilvusVectorDBService, idx_part_collection_confi @pytest.mark.milvus -def test_search_with_data(milvus_service: MilvusVectorDBService, - idx_part_collection_config: dict, - milvus_data: list[dict]): +@pytest.mark.asyncio +async def test_similarity_search_with_data(milvus_service: MilvusVectorDBService, + idx_part_collection_config: dict, + milvus_data: list[dict]): # Create a collection. collection_name = "test_search_with_data_collection" milvus_service.create(collection_name, **idx_part_collection_config) @@ -116,16 +127,19 @@ def test_search_with_data(milvus_service: MilvusVectorDBService, search_vec = rng.random((1, 10)) # Define a search filter. - fltr = "age==26 or age==27" + expr = "age==26 or age==27" # Perform a search in the collection. 
- search_result = milvus_service.search(collection_name, data=search_vec, filter=fltr, output_fields=["id", "age"]) + similarity_search_coroutine = await milvus_service.similarity_search(collection_name, + embeddings=search_vec, + expr=expr) + search_result = await similarity_search_coroutine assert len(search_result[0]) == 2 - assert search_result[0][0]["entity"]["age"] in [26, 27] - assert search_result[0][1]["entity"]["age"] in [26, 27] - assert len(search_result[0][0]["entity"].keys()) == 2 - assert sorted(list(search_result[0][0]["entity"].keys())) == ["age", "id"] + assert search_result[0][0]["age"] in [26, 27] + assert search_result[0][1]["age"] in [26, 27] + assert len(search_result[0][0].keys()) == 2 + assert sorted(list(search_result[0][0].keys())) == ["age", "id"] # Clean up the collection. milvus_service.drop(collection_name) @@ -184,11 +198,12 @@ def test_insert_into_partition(milvus_service: MilvusVectorDBService, milvus_data: list[dict]): # Create a collection with a partition. collection_name = "test_partition_collection" + partition_name = idx_part_collection_config["collection_conf"]["partition_conf"]["partitions"][0]["name"] milvus_service.create(collection_name, **idx_part_collection_config) # Insert data into the specified partition. - response = milvus_service.insert(collection_name, milvus_data, collection_conf={"partition_name": partition_name}) + response = milvus_service.insert(collection_name, milvus_data, partition_name=partition_name) assert response["insert_count"] == len(milvus_data) # Retrieve inserted data by primary keys. @@ -228,7 +243,7 @@ def test_insert_into_partition(milvus_service: MilvusVectorDBService, @pytest.mark.milvus def test_update(milvus_service: MilvusVectorDBService, simple_collection_config: dict, milvus_data: list[dict]): collection_name = "test_update_collection" - + milvus_service.drop(collection_name) # Create a collection with the specified schema configuration. 
milvus_service.create(collection_name, **simple_collection_config) @@ -294,7 +309,7 @@ def test_delete(milvus_service: MilvusVectorDBService, idx_part_collection_confi delete_response = milvus_service.delete(collection_name, delete_expr) assert delete_response["delete_count"] == 2 - response = milvus_service.search(collection_name, query="id > 0") + response = milvus_service.query(collection_name, query="id > 0") assert len(response) == len(milvus_data) - 2 for item in response: @@ -314,7 +329,6 @@ def test_single_instance_with_collection_lock(milvus_service: MilvusVectorDBServ milvus_service.create(collection_name, **idx_part_collection_config) filter_query = "age == 26 or age == 27" - search_vec = np.random.random((1, 10)) execution_order = [] def insert_data(): @@ -322,8 +336,8 @@ def insert_data(): assert result['insert_count'] == len(milvus_data) execution_order.append("Insert Executed") - def search_data(): - result = milvus_service.search(collection_name, data=search_vec, filter=filter_query) + def query_data(): + result = milvus_service.query(collection_name, query=filter_query) execution_order.append("Search Executed") assert isinstance(result, list) @@ -333,7 +347,7 @@ def count_entities(): with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: executor.submit(insert_data) - executor.submit(search_data) + executor.submit(query_data) executor.submit(count_entities) # Assert the execution order @@ -350,7 +364,6 @@ def test_multi_instance_with_collection_lock(milvus_service: MilvusVectorDBServi collection_name = "test_insert_and_search_order_with_collection_lock" filter_query = "age == 26 or age == 27" - search_vec = np.random.random((1, 10)) execution_order = [] @@ -363,18 +376,18 @@ def insert_data(): assert result['insert_count'] == len(milvus_data) execution_order.append("Insert Executed") - def search_data(): - result = milvus_service.search(collection_name, data=search_vec, filter=filter_query) - execution_order.append("Search Executed") + def query_data(): + result = milvus_service.query(collection_name, query=filter_query) + execution_order.append("Query Executed") assert isinstance(result, list) with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: executor.submit(create_collection) executor.submit(insert_data) - executor.submit(search_data) + executor.submit(query_data) # Assert the execution order - assert execution_order == ["Create Executed", "Insert Executed", "Search Executed"] + assert execution_order == ["Create Executed", "Insert Executed", "Query Executed"] def test_get_collection_lock(): @@ -385,3 +398,60 @@ def test_get_collection_lock(): lock = MilvusVectorDBService.get_collection_lock(collection_name) assert "lock" == type(lock).__name__ assert collection_name in MilvusVectorDBService._collection_locks + + +@pytest.mark.milvus +def test_create_from_dataframe(milvus_service: MilvusVectorDBService): + + df = cudf.DataFrame({ + "id": list(range(10)), + "age": [random.randint(20, 40) for i in range(10)], + "embedding": [[random.random() for _ in range(10)] for _ in range(10)] + }) + + collection_name = "test_create_from_dataframe_collection" + + # Create a collection using dataframe schema. + milvus_service.create_from_dataframe(collection_name, df=df, index_field="embedding") + + assert milvus_service.has_store_object(collection_name) + + # Clean up the collection. 
+ milvus_service.drop(collection_name) + + +def test_fse_default(): + encoder = FieldSchemaEncoder() + result = encoder.default(pymilvus.DataType.INT32) + assert result == "DataType.INT32" + + +def test_fse_object_hook(): + data = {"name": "test_field", "type": "DataType.INT64"} + result = FieldSchemaEncoder.object_hook(data) + assert result["type"] == pymilvus.DataType.INT64 + + +def test_fse_load(tmp_path): + data = {"name": "test_field", "type": "DataType.INT64"} + file_path = tmp_path / "test.json" + with open(file_path, "w", encoding="utf-8") as f: + json.dump(data, f) + with open(file_path, "r", encoding="utf-8") as f: + result = FieldSchemaEncoder.load(f) + assert result.name == "test_field" + assert result.dtype == pymilvus.DataType.INT64 + + +def test_fse_loads(): + data = '{"name": "test_field", "type": "DataType.INT64"}' + result = FieldSchemaEncoder.loads(data) + assert result.name == "test_field" + assert result.dtype == pymilvus.DataType.INT64 + + +def test_fse_from_dict(): + data = {"name": "test_field", "dtype": "DataType.INT64"} + result = FieldSchemaEncoder.from_dict(data) + assert result.name == "test_field" + assert result.dtype == pymilvus.DataType.INT64 diff --git a/tests/test_milvus_write_to_vector_db_stage_pipe.py b/tests/test_milvus_write_to_vector_db_stage_pipe.py index ef84b5d7c4..f24229fef8 100755 --- a/tests/test_milvus_write_to_vector_db_stage_pipe.py +++ b/tests/test_milvus_write_to_vector_db_stage_pipe.py @@ -24,11 +24,11 @@ from morpheus.messages import ControlMessage from morpheus.modules import to_control_message # noqa: F401 # pylint: disable=unused-import from morpheus.pipeline import LinearPipeline -from morpheus.service.milvus_vector_db_service import MilvusVectorDBService +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService from morpheus.stages.general.linear_modules_stage import LinearModulesStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage -from morpheus.stages.output.write_to_vector_db import WriteToVectorDBStage +from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_ids import TO_CONTROL_MESSAGE From 6ca9c82c22838ee87b4de69386687645bdc64765 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 19 Oct 2023 23:58:24 -0500 Subject: [PATCH 02/16] Updated Milvus vector DB files --- .../stages/output/write_to_vector_db_stage.py | 32 +++++++++++++------ ...st_milvus_write_to_vector_db_stage_pipe.py | 26 +++++++++++---- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/morpheus/stages/output/write_to_vector_db_stage.py b/morpheus/stages/output/write_to_vector_db_stage.py index f18366be19..6b4512ce8d 100644 --- a/morpheus/stages/output/write_to_vector_db_stage.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -21,6 +21,7 @@ from morpheus.config import Config from morpheus.messages import ControlMessage from morpheus.messages import MultiResponseMessage +from morpheus.messages.multi_message import MultiMessage from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair from morpheus.service.vdb.utils import VectorDBServiceFactory @@ -35,16 +36,20 @@ class WriteToVectorDBStage(SinglePortStage): Parameters ---------- - config : `morpheus.config.Config` + config : Config Pipeline configuration instance. 
- resource_name : str - The name of the resource managed by this instance. - resource_conf : dict - Additional resource configuration when performing vector database writes. service : typing.Union[str, VectorDBService] Either the name of the vector database service to use or an instance of VectorDBService for managing the resource. - **service_kwargs : dict[str, typing.Any] + resource_name : str + The identifier of the resource on which operations are to be performed in the vector database. + embedding_column_name : str, optional + Name of the embedding column, by default "embedding". + recreate : bool, optional + Specifies whether to recreate the resource if it already exists, by default False. + resource_kwargs : dict, optional + Additional keyword arguments to pass when performing vector database writes on a given resource. + **service_kwargs : dict Additional keyword arguments to pass when creating a VectorDBService instance. Raises @@ -55,12 +60,11 @@ class WriteToVectorDBStage(SinglePortStage): def __init__(self, config: Config, - *, + service: typing.Union[str, VectorDBService], resource_name: str, embedding_column_name: str = "embedding", recreate: bool = False, resource_kwargs: dict = None, - service: typing.Union[str, VectorDBService], **service_kwargs): super().__init__(config) @@ -89,7 +93,7 @@ def __init__(self, # Ensure that the resource exists if (not has_object): - self._service.create(name=self._resource_name, collection_conf=self._resource_kwargs) + self._service.create(name=self._resource_name, **self._resource_kwargs) # Get the service for just the resource we are interested in self._resource_service = self._service.load_resource(name=self._resource_name) @@ -132,7 +136,7 @@ def on_data_control_message(ctrl_msg: ControlMessage) -> ControlMessage: return ctrl_msg - def on_data_multi_message(msg: MultiResponseMessage) -> MultiResponseMessage: + def on_data_multi_response_message(msg: MultiResponseMessage) -> MultiResponseMessage: # Probs tensor contains all of the embeddings embeddings = msg.get_probs_tensor() embeddings_list = embeddings.tolist() @@ -160,9 +164,17 @@ def on_data_multi_message(msg: MultiResponseMessage) -> MultiResponseMessage: return msg + def on_data_multi_message(msg: MultiMessage): + # Insert entries in the dataframe to vector database. 
+ self._service.insert_dataframe(name=self._resource_name, df=msg.get_meta(), **self._resource_kwargs) + + return msg + if (issubclass(input_stream[1], ControlMessage)): on_data = ops.map(on_data_control_message) elif (issubclass(input_stream[1], MultiResponseMessage)): + on_data = ops.map(on_data_multi_response_message) + elif (issubclass(input_stream[1], MultiMessage)): on_data = ops.map(on_data_multi_message) else: raise RuntimeError(f"Unexpected input type {input_stream[1]}") diff --git a/tests/test_milvus_write_to_vector_db_stage_pipe.py b/tests/test_milvus_write_to_vector_db_stage_pipe.py index f24229fef8..d2f4de3848 100755 --- a/tests/test_milvus_write_to_vector_db_stage_pipe.py +++ b/tests/test_milvus_write_to_vector_db_stage_pipe.py @@ -46,13 +46,18 @@ def get_test_df(num_input_rows): @pytest.mark.milvus @pytest.mark.use_cpp -@pytest.mark.parametrize("use_instance, num_input_rows, expected_num_output_rows", [(True, 5, 5), (False, 5, 5)]) +@pytest.mark.parametrize("use_instance, num_input_rows, expected_num_output_rows, resource_kwargs, recreate", + [(True, 5, 5, { + "partition_name": "age_partition" + }, True), (False, 5, 5, {}, False), (False, 5, 5, {}, True)]) def test_write_to_vector_db_stage_pipe(milvus_server_uri: str, idx_part_collection_config: dict, use_instance: bool, config: Config, num_input_rows: int, - expected_num_output_rows: int): + expected_num_output_rows: int, + resource_kwargs: dict, + recreate: bool): collection_name = "test_stage_insert_collection" @@ -60,8 +65,13 @@ def test_write_to_vector_db_stage_pipe(milvus_server_uri: str, df = get_test_df(num_input_rows) milvus_service = MilvusVectorDBService(uri=milvus_server_uri) + milvus_service.create(name=collection_name, overwrite=True, **idx_part_collection_config) + if recreate: + # Update resource kwargs with collection configuration if recreate is True + resource_kwargs.update(idx_part_collection_config) + to_cm_module_config = { "module_id": TO_CONTROL_MESSAGE, "module_name": "to_control_message", "namespace": MORPHEUS_MODULE_NAMESPACE } @@ -75,22 +85,24 @@ def test_write_to_vector_db_stage_pipe(milvus_server_uri: str, output_port_name="output", output_type=ControlMessage)) - # Provide partition name to insert data into the partition otherwise goes to '_default' partition. - resource_kwargs = {"collection_conf": {"partition_name": "age_partition"}} - + # Provide partition name in the resource_kwargs to insert data into the partition + # otherwise goes to '_default' partition. if use_instance: # Instantiate stage with service instance and insert options. write_to_vdb_stage = WriteToVectorDBStage(config, resource_name=collection_name, service=milvus_service, + recreate=recreate, resource_kwargs=resource_kwargs) else: + service_kwargs = {"uri": milvus_server_uri} # Instantiate stage with service name, uri and insert options. 
write_to_vdb_stage = WriteToVectorDBStage(config, resource_name=collection_name, service="milvus", - uri=milvus_server_uri, - resource_kwargs=resource_kwargs) + recreate=recreate, + resource_kwargs=resource_kwargs, + **service_kwargs) pipe.add_stage(write_to_vdb_stage) sink_stage = pipe.add_stage(InMemorySinkStage(config)) From fbf2f4b15608f7a80a7aa4908db87f334b99dee0 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Fri, 20 Oct 2023 10:48:32 -0500 Subject: [PATCH 03/16] Updated Milvus vector DB files --- tests/test_milvus_write_to_vector_db_stage_pipe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_milvus_write_to_vector_db_stage_pipe.py b/tests/test_milvus_write_to_vector_db_stage_pipe.py index 10bae2c36a..c80f935353 100755 --- a/tests/test_milvus_write_to_vector_db_stage_pipe.py +++ b/tests/test_milvus_write_to_vector_db_stage_pipe.py @@ -31,7 +31,7 @@ from morpheus.stages.general.linear_modules_stage import LinearModulesStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage -from morpheus.stages.output.write_to_vector_db_stage_stage import WriteToVectorDBStage +from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_ids import TO_CONTROL_MESSAGE From b67882a6f62c304a6eb6260fa1d228e7e5ada2dc Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 23 Oct 2023 13:02:46 -0500 Subject: [PATCH 04/16] collection_conf key --- morpheus/service/vdb/milvus_vector_db_service.py | 8 +++----- tests/test_milvus_vector_db_service.py | 2 +- .../service/milvus_idx_part_collection_conf.json | 4 ++-- .../tests_data/service/milvus_simple_collection_conf.json | 4 ++-- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index de86e5a83d..887a276c45 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -627,10 +627,10 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing. If the provided schema fields configuration is empty. """ logger.debug("Creating collection: %s, overwrite=%s, kwargs=%s", name, overwrite, kwargs) + # Preserve original configuration. 
- kwargs = copy.deepcopy(kwargs) + collection_conf = copy.deepcopy(kwargs) - collection_conf = kwargs.get("collection_conf") auto_id = collection_conf.get("auto_id", False) index_conf = collection_conf.get("index_conf", None) partition_conf = collection_conf.get("partition_conf", None) @@ -725,17 +725,15 @@ def create_from_dataframe(self, fields = self._build_schema_conf(df=df) create_kwargs = { - "collection_conf": { "schema_conf": { "description": "Auto generated schema from DataFrame in Morpheus", "schema_fields": fields, } - } } if (kwargs.get("index_field", None) is not None): # Check to make sure the column name exists in the fields - create_kwargs["collection_conf"]["index_conf"] = { + create_kwargs["index_conf"] = { "field_name": kwargs.get("index_field"), # Default index type "metric_type": "L2", "index_type": "HNSW", diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 216a8bc1c4..3c96124536 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -226,7 +226,7 @@ def test_insert_into_partition(milvus_service: MilvusVectorDBService, # Make sure to drop any existing collection from previous runs. milvus_service.drop(collection_name) - partition_name = idx_part_collection_config["collection_conf"]["partition_conf"]["partitions"][0]["name"] + partition_name = idx_part_collection_config["partition_conf"]["partitions"][0]["name"] # Create a collection with a partition. milvus_service.create(collection_name, **idx_part_collection_config) diff --git a/tests/tests_data/service/milvus_idx_part_collection_conf.json b/tests/tests_data/service/milvus_idx_part_collection_conf.json index 9fff0ac839..a5ffb57a91 100644 --- a/tests/tests_data/service/milvus_idx_part_collection_conf.json +++ b/tests/tests_data/service/milvus_idx_part_collection_conf.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:23acfc38f7307a36956cde751b767dce503c3ff0cb91404ffc8ab97e428d6ae6 -size 1116 +oid sha256:d96d5419902b9727cc8c63960720a3cb4cad9f198b6579e06843a3dc5877e25d +size 1089 diff --git a/tests/tests_data/service/milvus_simple_collection_conf.json b/tests/tests_data/service/milvus_simple_collection_conf.json index e848d08754..6e69101be8 100644 --- a/tests/tests_data/service/milvus_simple_collection_conf.json +++ b/tests/tests_data/service/milvus_simple_collection_conf.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:d1673eed40c5bdb6270c133c1a192170336b668a05210f1a6dd14269ed87661b -size 825 +oid sha256:e9d3b30e5a1e0abe9bb6f673e9bef607b7668f1af4a56e32b3678092810e0bdc +size 798 From 13720cf5d6fb215e2c54aef697b4b00697644c98 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 24 Oct 2023 12:06:48 -0500 Subject: [PATCH 05/16] Updated rss feed processing files --- morpheus/controllers/rss_controller.py | 105 ++++++++++++---------- morpheus/stages/input/rss_source_stage.py | 52 +++++------ tests/controllers/test_rss_controller.py | 71 +++++++++++---- tests/test_rss_source_stage_pipe.py | 20 ++--- 4 files changed, 137 insertions(+), 111 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index dd8274eefe..9b261c830e 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -14,7 +14,9 @@ import logging import os -import typing +import time +from dataclasses import asdict +from dataclasses import dataclass from urllib.parse import urlparse import feedparser @@ -27,6 +29,17 @@ 
logger = logging.getLogger(__name__) +@dataclass +class FeedStats: + """Data class to hold error feed stats""" + + failure_count: int + success_count: int + last_failure: float + last_success: float + last_try_result: str + + class RSSController: """ RSSController handles fetching and processing of RSS feed entries. @@ -46,6 +59,8 @@ class RSSController: Enable caching of RSS feed request data. cache_dir : str, optional, default = "./.cache/http" Cache directory for storing RSS feed request data. + cooldown_interval : int, optional, default = 600 + Cooldown interval in seconds if there is a failure in fetching or parsing the feed. """ def __init__(self, @@ -53,7 +68,8 @@ def __init__(self, batch_size: int = 128, run_indefinitely: bool = None, enable_cache: bool = False, - cache_dir: str = "./.cache/http"): + cache_dir: str = "./.cache/http", + cooldown_interval: int = 600): if (isinstance(feed_input, str)): feed_input = [feed_input] @@ -62,6 +78,12 @@ def __init__(self, self._feed_input = set(feed_input) self._batch_size = batch_size self._previous_entries = set() # Stores the IDs of previous entries to prevent the processing of duplicates. + self._cooldown_interval = cooldown_interval + + # Validate feed_input + for f in self._feed_input: + if not RSSController.is_url(f) and not os.path.exists(f): + raise ValueError(f"Invalid URL or file path: {f}") if (run_indefinitely is None): # If feed_input is URL. Runs indefinitely @@ -74,7 +96,11 @@ def __init__(self, self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"), backend="sqlite") - self._errored_feeds = [] # Feeds that have thrown an error and wont be retried + self._feed_stats_dict = {input: FeedStats(failure_count=0, + success_count=0, + last_failure=-1, + last_success=-1, + last_try_result="Unknown") for input in self._feed_input} @property def run_indefinitely(self): @@ -94,6 +120,7 @@ def _read_file_content(self, file_path: str) -> str: return file.read() def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> feedparser.FeedParserDict: + feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input) soup = BeautifulSoup(feed_input, 'xml') @@ -116,7 +143,7 @@ def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> if child.name == "link": link_value = child.get_text() if not link_value: - feed_item[child.name] = child.get('href') + feed_item[child.name] = child.get('href', 'Unknown value') else: feed_item[child.name] = link_value # To be consistant with feedparser entries, rename guid to id @@ -153,20 +180,17 @@ def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict: if is_url_with_session: fallback = True - try: - logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url) - feed = feedparser.parse(url) - except Exception as ex: - raise RuntimeError(f"Failed to parse feed using fallback: {url}: {ex}") from ex + logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url) + feed = feedparser.parse(url) if feed["bozo"]: try: - logger.warning("Failed to parse feed: %s, %s. Trying with other source", - url, - feed['bozo_exception']) + logger.info("Failed to parse feed: %s, %s. Try parsing the feed manually", + url, + feed['bozo_exception']) feed = self._try_parse_feed_with_beautiful_soup(url, is_url) - except Exception as exec_info: - raise RuntimeError(f"Invalid feed input: {url}. 
Error: {exec_info}") from exec_info + except Exception: + raise logger.debug("Parsed feed: %s. Cache hit: %s. Fallback: %s", url, cache_hit, fallback) @@ -182,17 +206,25 @@ def parse_feeds(self): The parsed feed content. """ for url in self._feed_input: + feed_stats: FeedStats = self._feed_stats_dict[url] + current_time = time.time() try: - if (url in self._errored_feeds): - continue + if ((current_time - feed_stats.last_failure) >= self._cooldown_interval): + feed = self._try_parse_feed(url) - feed = self._try_parse_feed(url) + feed_stats.last_success = current_time + feed_stats.success_count += 1 + feed_stats.last_try_result = "Success" - yield feed + yield feed except Exception as ex: - logger.warning("Failed to parse feed: %s: %s. The feed will be not be retried.", url, ex) - self._errored_feeds.append(url) + logger.warning("Failed to parse feed: %s: %s.", url, ex) + feed_stats.last_failure = current_time + feed_stats.failure_count += 1 + feed_stats.last_try_result = "Failure" + + logger.debug("Feed stats: %s", asdict(feed_stats)) def fetch_dataframes(self): """ @@ -222,45 +254,22 @@ def fetch_dataframes(self): entry_accumulator.append(entry) if self._batch_size > 0 and len(entry_accumulator) >= self._batch_size: - yield self.create_dataframe(entry_accumulator) + yield cudf.DataFrame(entry_accumulator) entry_accumulator.clear() self._previous_entries = current_entries # Yield any remaining entries. if entry_accumulator: - df = self.create_dataframe(entry_accumulator) + # TODO (Bhargav): Debug : cannot mix list and non-list, non-null values error + df = cudf.DataFrame(entry_accumulator) yield df else: logger.debug("No new entries found.") except Exception as exc: - raise RuntimeError(f"Error fetching or processing feed entries: {exc}") from exc - - def create_dataframe(self, entries: typing.List[typing.Tuple]) -> cudf.DataFrame: - """ - Create a DataFrame from accumulated entry data. - - Parameters - ---------- - entries : typing.List[typing.Tuple] - List of accumulated feed entries. - - Returns - ------- - cudf.DataFrame - A DataFrame containing feed entry data. - - Raises - ------ - RuntimeError - Error creating DataFrame. - """ - try: - return cudf.DataFrame(entries) - except Exception as exc: - logger.error("Error creating DataFrame: %s", exc) - raise RuntimeError(f"Error creating DataFrame: {exc}") from exc + logger.error(f"Error fetching or processing feed entries: {exc}") + raise @classmethod def is_url(cls, feed_input: str) -> bool: diff --git a/morpheus/stages/input/rss_source_stage.py b/morpheus/stages/input/rss_source_stage.py index eb84100775..2d7d60b3a9 100644 --- a/morpheus/stages/input/rss_source_stage.py +++ b/morpheus/stages/input/rss_source_stage.py @@ -43,14 +43,14 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource): Interval in seconds between fetching new feed items. stop_after: int, default = 0 Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0` - max_retries : int, optional, default = 3 - Maximum number of retries for fetching entries on exception. batch_size : int, optional, default = None Number of feed items to accumulate before creating a DataFrame. enable_cache : bool, optional, default = False Enable caching of RSS feed request data. cache_dir : str, optional, default = "./.cache/http" Cache directory for storing RSS feed request data. + cooldown_interval : int, optional, default = 600 + Cooldown interval in seconds if there is a failure in fetching or parsing the feed. 
""" def __init__(self, @@ -58,16 +58,15 @@ def __init__(self, feed_input: list[str], interval_secs: float = 600, stop_after: int = 0, - max_retries: int = 5, run_indefinitely: bool = None, batch_size: int = None, enable_cache: bool = False, - cache_dir: str = "./.cache/http"): + cache_dir: str = "./.cache/http", + cooldown_interval: int = 600): super().__init__(c) self._stop_requested = False self._stop_after = stop_after self._interval_secs = interval_secs - self._max_retries = max_retries if (batch_size is None): batch_size = c.pipeline_batch_size @@ -83,7 +82,8 @@ def __init__(self, batch_size=batch_size, run_indefinitely=run_indefinitely, enable_cache=enable_cache, - cache_dir=cache_dir) + cache_dir=cache_dir, + cooldown_interval=cooldown_interval) @property def name(self) -> str: @@ -103,13 +103,11 @@ def _fetch_feeds(self) -> MessageMeta: """ Fetch RSS feed entries and yield as MessageMeta object. """ - retries = 0 - while (not self._stop_requested) and (retries < self._max_retries): + while (not self._stop_requested): try: for df in self._controller.fetch_dataframes(): df_size = len(df) - self._records_emitted += df_size if logger.isEnabledFor(logging.DEBUG): logger.debug("Received %d new entries...", df_size) @@ -117,32 +115,24 @@ def _fetch_feeds(self) -> MessageMeta: yield MessageMeta(df=df) - if (self._stop_after > 0 and self._records_emitted >= self._stop_after): - self._stop_requested = True - logger.debug("Stop limit reached...preparing to halt the source.") - break + self._records_emitted += df_size + except Exception as exc: if not self._controller.run_indefinitely: - self._stop_requested = True - continue + logger.error("Failed either in the process of fetching or processing entries: %d.", exc) + raise - logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs) - time.sleep(self._interval_secs) + if (self._stop_after > 0 and self._records_emitted >= self._stop_after): + self._stop_requested = True + logger.debug("Stop limit reached... preparing to halt the source.") + break - except Exception as exc: - if not self._controller.run_indefinitely: - logger.error("The input provided is not a URL or a valid path, therefore, the maximum " + - "retries are being overridden, and early exiting is triggered.") - raise RuntimeError(f"Failed to fetch feed entries : {exc}") from exc - - retries += 1 - logger.warning("Error fetching feed entries. Retrying (%d/%d)...", retries, self._max_retries) - logger.debug("Waiting for 5 secs before retrying...") - time.sleep(5) # Wait before retrying - - if retries == self._max_retries: # Check if retries exceeded the limit - logger.error("Max retries reached. Unable to fetch feed entries.") - raise RuntimeError(f"Failed to fetch feed entries after max retries: {exc}") from exc + if not self._controller.run_indefinitely: + self._stop_requested = True + continue + + logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs) + time.sleep(self._interval_secs) logger.debug("Source stopped.") diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index 4b84d73f2c..12b49a5d3a 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import time from os import path +from unittest.mock import patch import feedparser import pytest @@ -21,6 +23,7 @@ import cudf from _utils import TEST_DIRS +from morpheus.controllers.rss_controller import FeedStats from morpheus.controllers.rss_controller import RSSController test_urls = ["https://realpython.com/atom.xml", "https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml"] @@ -45,7 +48,7 @@ def test_run_indefinitely_true(feed_input: str, expected_output: bool): assert controller.run_indefinitely == expected_output -@pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths + test_file_paths) +@pytest.mark.parametrize("feed_input", test_file_paths) def test_run_indefinitely_false(feed_input: str): controller = RSSController(feed_input=feed_input) assert controller.run_indefinitely is False @@ -60,9 +63,8 @@ def test_parse_feed_valid_url(feed_input: str): @pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths) def test_parse_feed_invalid_input(feed_input: str): - controller = RSSController(feed_input=feed_input) - list(controller.parse_feeds()) - assert controller._errored_feeds == [feed_input] + with pytest.raises(ValueError, match=f"Invalid URL or file path: {feed_input}"): + RSSController(feed_input=feed_input) @pytest.mark.parametrize("feed_input", [(test_urls + test_file_paths), test_urls, test_urls[0], test_file_paths[0]]) @@ -84,14 +86,6 @@ def test_skip_duplicates_feed_inputs(feed_input: str, expected_count: int): assert len(dataframe) == expected_count -@pytest.mark.parametrize("feed_input", test_file_paths) -def test_create_dataframe(feed_input: str): - controller = RSSController(feed_input=feed_input) - entries = [{"id": "1", "title": "Entry 1"}, {"id": "2", "title": "Entry 2"}] - df = controller.create_dataframe(entries) - assert len(df) == len(entries) - - @pytest.mark.parametrize("feed_input", test_urls) def test_is_url_true(feed_input: str): assert RSSController.is_url(feed_input) @@ -115,9 +109,9 @@ def test_batch_size(feed_input: str | list[str], batch_size: int): ("https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml", True, True), ("https://www.mandiant.com/resources/blog/rss.xml", True, False)]) def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool): - rss_controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) + controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) - feed_data = rss_controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) + feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) assert isinstance(feed_data, feedparser.FeedParserDict) @@ -142,9 +136,52 @@ def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enabl @pytest.mark.parametrize("enable_cache", [True, False]) def test_enable_disable_cache(enable_cache): feed_input = "https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml" - rss_controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) + controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) if enable_cache: - assert rss_controller._session + assert controller._session else: - assert not rss_controller._session + assert not controller._session + + +def test_parse_feeds(): + feed_input = "https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml" + controller = RSSController(feed_input=feed_input, enable_cache=False) + + with patch.object(controller, '_try_parse_feed') as mock_try_parse_feed: + 
dataframes_generator = controller.parse_feeds() + next(dataframes_generator, None) + feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + assert feed_stats.last_try_result == "Success" + assert feed_stats.failure_count == 0 + assert feed_stats.success_count == 1 + + # Raise exception to test failure scenario + mock_try_parse_feed.side_effect = Exception("SampleException") + dataframes_generator = controller.parse_feeds() + next(dataframes_generator, None) + + feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + assert feed_stats.last_try_result == "Failure" + assert feed_stats.failure_count == 1 + assert feed_stats.success_count == 1 + + # Skip trying until cooldown period is met. + dataframes_generator = controller.parse_feeds() + next(dataframes_generator, None) + + feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + assert feed_stats.last_try_result == "Failure" + assert feed_stats.failure_count == 1 + assert feed_stats.success_count == 1 + + # Resume trying after cooldown period + with patch("time.time", return_value=time.time() + controller._cooldown_interval): + + dataframes_generator = controller.parse_feeds() + next(dataframes_generator, None) + + feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + assert feed_stats.last_try_result == "Failure" + assert feed_stats.failure_count == 2 + assert feed_stats.success_count == 1 diff --git a/tests/test_rss_source_stage_pipe.py b/tests/test_rss_source_stage_pipe.py index dcb4451fc8..4bc32391ca 100644 --- a/tests/test_rss_source_stage_pipe.py +++ b/tests/test_rss_source_stage_pipe.py @@ -47,7 +47,7 @@ def test_constructor_with_feed_file(config): feed_input=file_feed_input, interval_secs=5, stop_after=10, - max_retries=2, + cooldown_interval=100, batch_size=256, enable_cache=True, cache_dir="./.cache/http_cache") @@ -59,7 +59,7 @@ def test_constructor_with_feed_file(config): assert ctlr._batch_size == 256 assert rss_source_stage._interval_secs == 5 assert rss_source_stage._stop_after == 10 - assert rss_source_stage._max_retries == 2 + assert rss_source_stage._controller._cooldown_interval == 100 assert rss_source_stage._controller._session is not None assert rss_source_stage._controller._session.cache.cache_name == "./.cache/http_cache/RSSController.sqlite" @@ -99,17 +99,7 @@ def test_rss_source_stage_pipe(config: Config, @pytest.mark.use_python -def test_invalid_input_rss_source_stage_pipe(config: Config): +def test_invalid_input_rss_source_stage(config: Config): - pipe = Pipeline(config) - - rss_source_stage = pipe.add_stage( - RSSSourceStage(config, feed_input=[invalid_feed_input], interval_secs=1, max_retries=1)) - sink_stage = pipe.add_stage(InMemorySinkStage(config)) - - pipe.add_edge(rss_source_stage, sink_stage) - - pipe.run() - - assert len(sink_stage.get_messages()) == 0 - assert rss_source_stage._controller._errored_feeds == [invalid_feed_input] + with pytest.raises(ValueError, match=f"Passed invalid filepath: {invalid_feed_input}"): + RSSSourceStage(config, feed_input=[invalid_feed_input], interval_secs=1, cooldown_interval=500) From 3165b79f5bfd94299634b4c0aa75cc0881c301e1 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 24 Oct 2023 12:09:20 -0500 Subject: [PATCH 06/16] Updated tests --- tests/test_rss_source_stage_pipe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_rss_source_stage_pipe.py b/tests/test_rss_source_stage_pipe.py index 4bc32391ca..2b2367dc17 100644 --- a/tests/test_rss_source_stage_pipe.py +++ 
b/tests/test_rss_source_stage_pipe.py @@ -101,5 +101,5 @@ def test_rss_source_stage_pipe(config: Config, @pytest.mark.use_python def test_invalid_input_rss_source_stage(config: Config): - with pytest.raises(ValueError, match=f"Passed invalid filepath: {invalid_feed_input}"): + with pytest.raises(ValueError, match=f"Invalid URL or file path: {invalid_feed_input}"): RSSSourceStage(config, feed_input=[invalid_feed_input], interval_secs=1, cooldown_interval=500) From 4a54ad189cb72cb9878808dcdbca5e8dd3bbb588 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 24 Oct 2023 14:23:23 -0500 Subject: [PATCH 07/16] Removed external url and replaced with mock objects --- tests/controllers/test_rss_controller.py | 163 +++++++++++++++-------- 1 file changed, 107 insertions(+), 56 deletions(-) diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index 12b49a5d3a..82f74aafa9 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -15,6 +15,7 @@ import time from os import path +from unittest.mock import Mock from unittest.mock import patch import feedparser @@ -26,7 +27,7 @@ from morpheus.controllers.rss_controller import FeedStats from morpheus.controllers.rss_controller import RSSController -test_urls = ["https://realpython.com/atom.xml", "https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml"] +test_urls = ["https://fake.nvidia.com/rss/HomePage.xml"] test_invalid_urls = [ "invalid_url", @@ -42,6 +43,29 @@ ] +@pytest.fixture(scope="module", name="mock_feed") +def mock_feed_fixture() -> feedparser.FeedParserDict: + feed_items = [{"link": "https://nvidia.com", "id": "12345"}, + {"link": "https://fake.nvidia.com", "id": "22345"} + ] + feed = feedparser.FeedParserDict() + feed.update({"entries": feed_items, "bozo": 0}) + + return feed + +@pytest.fixture(scope="module", name="mock_get_response") +def mock_get_response() -> Mock: + # Open and read the content of the file + with open(test_file_paths[0], 'rb') as file: + file_content = file.read() + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.content = file_content + mock_response.text = file_content + + return mock_response + @pytest.mark.parametrize("feed_input, expected_output", [(url, True) for url in test_urls]) def test_run_indefinitely_true(feed_input: str, expected_output: bool): controller = RSSController(feed_input=feed_input) @@ -49,35 +73,27 @@ def test_run_indefinitely_true(feed_input: str, expected_output: bool): @pytest.mark.parametrize("feed_input", test_file_paths) -def test_run_indefinitely_false(feed_input: str): +def test_run_indefinitely_false(feed_input: list[str]): controller = RSSController(feed_input=feed_input) assert controller.run_indefinitely is False @pytest.mark.parametrize("feed_input", test_urls) -def test_parse_feed_valid_url(feed_input: str): +def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict): controller = RSSController(feed_input=feed_input) - feed = list(controller.parse_feeds())[0] - assert feed.entries + with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse: + mock_feedparser_parse.return_value = mock_feed + feed = list(controller.parse_feeds())[0] + assert feed.entries @pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths) -def test_parse_feed_invalid_input(feed_input: str): +def test_parse_feed_invalid_input(feed_input: list[str]): with pytest.raises(ValueError, match=f"Invalid URL 
or file path: {feed_input}"): RSSController(feed_input=feed_input) -@pytest.mark.parametrize("feed_input", [(test_urls + test_file_paths), test_urls, test_urls[0], test_file_paths[0]]) -def test_fetch_dataframes(feed_input: str | list[str]): - controller = RSSController(feed_input=feed_input) - dataframes_generator = controller.fetch_dataframes() - dataframe = next(dataframes_generator, None) - assert isinstance(dataframe, cudf.DataFrame) - assert "link" in dataframe.columns - assert len(dataframe) > 0 - - -@pytest.mark.parametrize("feed_input, expected_count", [(path.join(TEST_DIRS.tests_data_dir, "rss_feed_atom.xml"), 30)]) +@pytest.mark.parametrize("feed_input, expected_count", [(test_file_paths[0], 30)]) def test_skip_duplicates_feed_inputs(feed_input: str, expected_count: int): controller = RSSController(feed_input=[feed_input, feed_input]) # Pass duplicate feed inputs dataframes_generator = controller.fetch_dataframes() @@ -87,17 +103,38 @@ def test_skip_duplicates_feed_inputs(feed_input: str, expected_count: int): @pytest.mark.parametrize("feed_input", test_urls) -def test_is_url_true(feed_input: str): +def test_is_url_true(feed_input: list[str]): assert RSSController.is_url(feed_input) @pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths + test_file_paths) -def test_is_url_false(feed_input: str): +def test_is_url_false(feed_input: list[str]): assert not RSSController.is_url(feed_input) -@pytest.mark.parametrize("feed_input, batch_size", [(test_urls + test_file_paths, 5)]) -def test_batch_size(feed_input: str | list[str], batch_size: int): +@pytest.mark.parametrize("feed_input", [test_urls, test_urls[0]]) +def test_fetch_dataframes_url(feed_input: str | list[str], mock_feed: feedparser.FeedParserDict): + controller = RSSController(feed_input=feed_input) + + with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse: + mock_feedparser_parse.return_value = mock_feed + dataframes_generator = controller.fetch_dataframes() + dataframe = next(dataframes_generator, None) + assert isinstance(dataframe, cudf.DataFrame) + assert "link" in dataframe.columns + assert len(dataframe) > 0 + +@pytest.mark.parametrize("feed_input", [test_file_paths, test_file_paths[0]]) +def test_fetch_dataframes_filepath(feed_input: str | list[str]): + controller = RSSController(feed_input=feed_input) + dataframes_generator = controller.fetch_dataframes() + dataframe = next(dataframes_generator, None) + assert isinstance(dataframe, cudf.DataFrame) + assert "link" in dataframe.columns + assert len(dataframe) > 0 + +@pytest.mark.parametrize("feed_input, batch_size", [(test_file_paths, 5)]) +def test_batch_size(feed_input: list[str], batch_size: int): controller = RSSController(feed_input=feed_input, batch_size=batch_size) for df in controller.fetch_dataframes(): assert isinstance(df, cudf.DataFrame) @@ -105,13 +142,24 @@ def test_batch_size(feed_input: str | list[str], batch_size: int): @pytest.mark.parametrize("feed_input, is_url, enable_cache", - [(path.join(TEST_DIRS.tests_data_dir, "rss_feed_atom.xml"), False, False), - ("https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml", True, True), - ("https://www.mandiant.com/resources/blog/rss.xml", True, False)]) -def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool): + [(test_file_paths[0], False, False), + (test_urls[0], True, True), + (test_urls[0], True, False)]) +def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, 
mock_get_response: Mock): controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) - feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) + if is_url: + if enable_cache: + with patch("morpheus.controllers.rss_controller.requests_cache.CachedSession.get") as mock_get: + mock_get.return_value = mock_get_response + feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) + else: + with patch("morpheus.controllers.rss_controller.requests.get") as mock_get: + mock_get.return_value = mock_get_response + feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) + + else: + feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) assert isinstance(feed_data, feedparser.FeedParserDict) @@ -135,8 +183,7 @@ def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enabl @pytest.mark.parametrize("enable_cache", [True, False]) def test_enable_disable_cache(enable_cache): - feed_input = "https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml" - controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) + controller = RSSController(feed_input=test_urls, enable_cache=enable_cache) if enable_cache: assert controller._session @@ -144,44 +191,48 @@ def test_enable_disable_cache(enable_cache): assert not controller._session -def test_parse_feeds(): - feed_input = "https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml" +def test_parse_feeds(mock_feed: feedparser.FeedParserDict): + feed_input = test_urls[0] controller = RSSController(feed_input=feed_input, enable_cache=False) - with patch.object(controller, '_try_parse_feed') as mock_try_parse_feed: - dataframes_generator = controller.parse_feeds() - next(dataframes_generator, None) - feed_stats: FeedStats = controller._feed_stats_dict[feed_input] - assert feed_stats.last_try_result == "Success" - assert feed_stats.failure_count == 0 - assert feed_stats.success_count == 1 - - # Raise exception to test failure scenario - mock_try_parse_feed.side_effect = Exception("SampleException") - dataframes_generator = controller.parse_feeds() - next(dataframes_generator, None) + with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse: - feed_stats: FeedStats = controller._feed_stats_dict[feed_input] - assert feed_stats.last_try_result == "Failure" - assert feed_stats.failure_count == 1 - assert feed_stats.success_count == 1 + mock_feedparser_parse.return_value = mock_feed - # Skip trying until cooldown period is met. 
- dataframes_generator = controller.parse_feeds() - next(dataframes_generator, None) + with patch.object(controller, '_try_parse_feed') as mock_try_parse_feed: + dataframes_generator = controller.parse_feeds() + next(dataframes_generator, None) + feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + assert feed_stats.last_try_result == "Success" + assert feed_stats.failure_count == 0 + assert feed_stats.success_count == 1 - feed_stats: FeedStats = controller._feed_stats_dict[feed_input] - assert feed_stats.last_try_result == "Failure" - assert feed_stats.failure_count == 1 - assert feed_stats.success_count == 1 + # Raise exception to test failure scenario + mock_try_parse_feed.side_effect = Exception("SampleException") + dataframes_generator = controller.parse_feeds() + next(dataframes_generator, None) - # Resume trying after cooldown period - with patch("time.time", return_value=time.time() + controller._cooldown_interval): + feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + assert feed_stats.last_try_result == "Failure" + assert feed_stats.failure_count == 1 + assert feed_stats.success_count == 1 + # Skip trying until cooldown period is met. dataframes_generator = controller.parse_feeds() next(dataframes_generator, None) feed_stats: FeedStats = controller._feed_stats_dict[feed_input] assert feed_stats.last_try_result == "Failure" - assert feed_stats.failure_count == 2 + assert feed_stats.failure_count == 1 assert feed_stats.success_count == 1 + + # Resume trying after cooldown period + with patch("time.time", return_value=time.time() + controller._cooldown_interval): + + dataframes_generator = controller.parse_feeds() + next(dataframes_generator, None) + + feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + assert feed_stats.last_try_result == "Failure" + assert feed_stats.failure_count == 2 + assert feed_stats.success_count == 1 From cefcb124e995ac8ecb13d1a8c2c90d2fe372cc16 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 25 Oct 2023 09:32:18 -0500 Subject: [PATCH 08/16] Fixed cannot mix list and non-list bug --- examples/llm/agents/README.md | 17 ++++++ examples/llm/common/web_scraper_stage.py | 24 +++++--- examples/llm/vdb_upload/langchain.py | 2 +- examples/llm/vdb_upload/pipeline.py | 37 +++++++----- examples/llm/vdb_upload/run.py | 42 ++++++++++++++ morpheus/controllers/rss_controller.py | 14 ++--- .../service/vdb/milvus_vector_db_service.py | 8 +-- .../stages/output/write_to_vector_db_stage.py | 58 +++++++++++-------- tests/test_rss_source_stage_pipe.py | 6 +- 9 files changed, 145 insertions(+), 63 deletions(-) diff --git a/examples/llm/agents/README.md b/examples/llm/agents/README.md index 872061f0a6..2ecaf36dcb 100644 --- a/examples/llm/agents/README.md +++ b/examples/llm/agents/README.md @@ -1,3 +1,20 @@ + + # Morpheus LLM Agents Example diff --git a/examples/llm/common/web_scraper_stage.py b/examples/llm/common/web_scraper_stage.py index c79c9922bc..b65a1994a6 100644 --- a/examples/llm/common/web_scraper_stage.py +++ b/examples/llm/common/web_scraper_stage.py @@ -21,7 +21,6 @@ import pandas as pd import requests_cache from bs4 import BeautifulSoup -from langchain.schema import Document from langchain.text_splitter import RecursiveCharacterTextSplitter import cudf @@ -82,7 +81,9 @@ def supports_cpp_node(self): def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: - node = builder.make_node(self.unique_name, ops.map(self._download_and_split)) + node = builder.make_node(self.unique_name, + 
ops.map(self._download_and_split), + ops.filter(lambda x: x is not None)) node.launch_options.pe_count = self._config.num_threads builder.make_edge(input_stream[0], node) @@ -91,15 +92,21 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea def _download_and_split(self, msg: MessageMeta) -> MessageMeta: - # Convert the dataframe into a list of dictionaries - df_pd: pd.DataFrame = msg.df.to_pandas() - df_dicts = df_pd.to_dict(orient="records") + if self._link_column not in msg.get_column_names(): + return None + + df = msg.df - splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100, length_function=len) + if isinstance(df, cudf.DataFrame): + df: pd.DataFrame = df.to_pandas() + + # Convert the dataframe into a list of dictionaries + df_dicts = df.to_dict(orient="records") final_rows: list[dict] = [] for row in df_dicts: + url = row[self._link_column] try: @@ -124,7 +131,7 @@ def _download_and_split(self, msg: MessageMeta) -> MessageMeta: # print(article.text) # text = article.text - split_text = splitter.split_text(text) + split_text = self._text_splitter.split_text(text) for text in split_text: r = row.copy() @@ -137,4 +144,5 @@ def _download_and_split(self, msg: MessageMeta) -> MessageMeta: logger.error(f"Error parsing document: {e}") continue - return MessageMeta(cudf.from_pandas(pd.DataFrame(final_rows))) + # Not using cudf to avoid error: pyarrow.lib.ArrowInvalid: cannot mix list and non-list, non-null values + return MessageMeta(pd.DataFrame(final_rows)) diff --git a/examples/llm/vdb_upload/langchain.py b/examples/llm/vdb_upload/langchain.py index 78e7e8c2e0..f66d75d9d3 100644 --- a/examples/llm/vdb_upload/langchain.py +++ b/examples/llm/vdb_upload/langchain.py @@ -14,12 +14,12 @@ import logging import pickle -from examples.llm.vdb_upload.common import build_rss_urls from langchain.document_loaders.rss import RSSFeedLoader from langchain.embeddings.huggingface import HuggingFaceEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores.milvus import Milvus +from llm.vdb_upload.common import build_rss_urls from morpheus.utils.logging_timer import log_time logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py index e51ed9cf5c..fe2f563917 100644 --- a/examples/llm/vdb_upload/pipeline.py +++ b/examples/llm/vdb_upload/pipeline.py @@ -32,14 +32,21 @@ logger = logging.getLogger(__name__) -def pipeline(num_threads, - pipeline_batch_size, - model_max_batch_size, - model_fea_length, - embedding_size, - model_name, - isolate_embeddings, - stop_after: int): +def pipeline(num_threads: int, + pipeline_batch_size: int, + model_max_batch_size: int, + model_fea_length: int, + embedding_size: int, + model_name: str, + isolate_embeddings: bool, + stop_after: int, + enable_cache: bool, + interval_secs: int, + run_indefinitely: bool, + vector_db_uri: str, + vector_db_service: str, + vector_db_resource_name: str, + triton_server_url: str): config = Config() config.mode = PipelineModes.NLP @@ -56,13 +63,15 @@ def pipeline(num_threads, pipe = LinearPipeline(config) - # add doca source stage + # add rss source stage pipe.set_source( RSSSourceStage(config, feed_input=build_rss_urls(), batch_size=128, stop_after=stop_after, - run_indefinitely=False)) + run_indefinitely=run_indefinitely, + enable_cache=enable_cache, + interval_secs=interval_secs)) pipe.add_stage(MonitorStage(config, description="Source rate", unit='pages')) @@ -90,18 +99,18 @@ 
def pipeline(num_threads, pipe.add_stage( TritonInferenceStage(config, model_name=model_name, - server_url="localhost:8001", + server_url=triton_server_url, force_convert_inputs=True, use_shared_memory=True)) pipe.add_stage(MonitorStage(config, description="Inference rate", unit="events", delayed_start=True)) pipe.add_stage( WriteToVectorDBStage(config, - resource_name="RSS", + resource_name=vector_db_resource_name, resource_kwargs=build_milvus_config(embedding_size=embedding_size), recreate=True, - service="milvus", - uri="http://localhost:19530")) + service=vector_db_service, + uri=vector_db_uri)) pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True)) diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 752f470b5a..7bacc5fc0d 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -72,6 +72,48 @@ def run(): type=click.IntRange(min=0), help="Stop after emitting this many records from the RSS source stage. Useful for testing. Disabled if `0`", ) +@click.option( + "--enable_cache", + is_flag=True, + default=False, + help="Enable caching of RSS feed request data.", +) +@click.option( + "--interval_secs", + default=600, + type=click.IntRange(min=1), + help="Interval in seconds between fetching new feed items.", +) +@click.option( + "--run_indefinitely", + is_flag=True, + default=False, + help=" Indicates whether the process should run continuously.", +) +@click.option( + "--vector_db_uri", + type=str, + default="http://localhost:19530", + help="URI for connecting to Vector Database server.", +) +@click.option( + "--vector_db_service", + type=str, + default="milvus", + help="Name of the vector database service to use.", +) +@click.option( + "--vector_db_resource_name", + type=str, + default="RSS", + help="The identifier of the resource on which operations are to be performed in the vector database.", +) +@click.option( + "--triton_server_url", + type=str, + default="localhost:8001", + help="Triton server URL.", +) def pipeline(**kwargs): from .pipeline import pipeline as _pipeline diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 9b261c830e..24abd20d79 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -14,18 +14,18 @@ import logging import os +import pickle import time from dataclasses import asdict from dataclasses import dataclass from urllib.parse import urlparse import feedparser +import pandas as pd import requests import requests_cache from bs4 import BeautifulSoup -import cudf - logger = logging.getLogger(__name__) @@ -111,7 +111,7 @@ def _get_response_text(self, url: str) -> str: if self._session: response = self._session.get(url) else: - response = requests.get(url, timeout=1.0) + response = requests.get(url, timeout=2.0) return response.text @@ -185,7 +185,7 @@ def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict: if feed["bozo"]: try: - logger.info("Failed to parse feed: %s, %s. Try parsing the feed manually", + logger.info("Failed to parse feed: %s, %s. 
Try parsing feed manually", url, feed['bozo_exception']) feed = self._try_parse_feed_with_beautiful_soup(url, is_url) @@ -254,16 +254,14 @@ def fetch_dataframes(self): entry_accumulator.append(entry) if self._batch_size > 0 and len(entry_accumulator) >= self._batch_size: - yield cudf.DataFrame(entry_accumulator) + yield pd.DataFrame(entry_accumulator) entry_accumulator.clear() self._previous_entries = current_entries # Yield any remaining entries. if entry_accumulator: - # TODO (Bhargav): Debug : cannot mix list and non-list, non-null values error - df = cudf.DataFrame(entry_accumulator) - yield df + yield pd.DataFrame(entry_accumulator) else: logger.debug("No new entries found.") diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 887a276c45..767f3ecabe 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -725,10 +725,10 @@ def create_from_dataframe(self, fields = self._build_schema_conf(df=df) create_kwargs = { - "schema_conf": { - "description": "Auto generated schema from DataFrame in Morpheus", - "schema_fields": fields, - } + "schema_conf": { + "description": "Auto generated schema from DataFrame in Morpheus", + "schema_fields": fields, + } } if (kwargs.get("index_field", None) is not None): diff --git a/morpheus/stages/output/write_to_vector_db_stage.py b/morpheus/stages/output/write_to_vector_db_stage.py index ea656bc9b9..81f9042f81 100644 --- a/morpheus/stages/output/write_to_vector_db_stage.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -127,46 +127,54 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea stream = input_stream[0] def on_data_control_message(ctrl_msg: ControlMessage) -> ControlMessage: - # Insert entries in the dataframe to vector database. - result = self._service.insert_dataframe(name=self._resource_name, - df=ctrl_msg.payload().df, - **self._resource_kwargs) - ctrl_msg.set_metadata("insert_response", result) + df = ctrl_msg.payload().df + + if not df.empty: + # Insert entries in the dataframe to vector database. 
+ result = self._service.insert_dataframe(name=self._resource_name, + df=df, + **self._resource_kwargs) + + ctrl_msg.set_metadata("insert_response", result) return ctrl_msg def on_data_multi_response_message(msg: MultiResponseMessage) -> MultiResponseMessage: - # Probs tensor contains all of the embeddings - embeddings = msg.get_probs_tensor() - embeddings_list = embeddings.tolist() - # # Figure out which columns we need - # available_columns = set(msg.get_meta_column_names()) + metadata = msg.get_meta() - # if (self._include_columns is not None): - # available_columns = available_columns.intersection(self._include_columns) - # if (self._exclude_columns is not None): - # available_columns = available_columns.difference(self._exclude_columns) + if not metadata.empty: + # Probs tensor contains all of the embeddings + embeddings = msg.get_probs_tensor() + embeddings_list = embeddings.tolist() - # Build up the metadata from the message - metadata = msg.get_meta().to_pandas() + # # Figure out which columns we need + # available_columns = set(msg.get_meta_column_names()) - # Add in the embedding results to the dataframe - metadata[self._embedding_column_name] = embeddings_list + # if (self._include_columns is not None): + # available_columns = available_columns.intersection(self._include_columns) + # if (self._exclude_columns is not None): + # available_columns = available_columns.difference(self._exclude_columns) - # if (not self._service.has_store_object(name=self._resource_name)): - # # Create the vector database resource - # self._service.create_from_dataframe(name=self._resource_name, df=metadata, index_field="embedding") + # Add in the embedding results to the dataframe + metadata[self._embedding_column_name] = embeddings_list - # Insert entries in the dataframe to vector database. - self._resource_service.insert_dataframe(df=metadata, **self._resource_kwargs) + # if (not self._service.has_store_object(name=self._resource_name)): + # # Create the vector database resource + # self._service.create_from_dataframe(name=self._resource_name, df=metadata, index_field="embedding") + + # Insert entries in the dataframe to vector database. + self._resource_service.insert_dataframe(df=metadata, **self._resource_kwargs) return msg def on_data_multi_message(msg: MultiMessage): - # Insert entries in the dataframe to vector database. - self._service.insert_dataframe(name=self._resource_name, df=msg.get_meta(), **self._resource_kwargs) + metadata = msg.get_meta() + + if not metadata.empty: + # Insert entries in the dataframe to vector database. 
+ self._service.insert_dataframe(name=self._resource_name, df=metadata, **self._resource_kwargs) return msg diff --git a/tests/test_rss_source_stage_pipe.py b/tests/test_rss_source_stage_pipe.py index 2b2367dc17..ecb2402de0 100644 --- a/tests/test_rss_source_stage_pipe.py +++ b/tests/test_rss_source_stage_pipe.py @@ -31,12 +31,12 @@ @pytest.mark.use_python def test_constructor_with_feed_url(config): - url_feed_input = "https://realpython.com/atom.xml" + url_feed_input = "https://fake.nvidia.com/rss/HomePage.xml" rss_source_stage = RSSSourceStage(config, feed_input=url_feed_input) ctlr = rss_source_stage._controller - assert ctlr._feed_input == {"https://realpython.com/atom.xml"} + assert ctlr._feed_input == {"https://fake.nvidia.com/rss/HomePage.xml"} assert ctlr._run_indefinitely is True @@ -66,7 +66,7 @@ def test_constructor_with_feed_file(config): @pytest.mark.use_python def test_support_cpp_node(config): - url_feed_input = "https://realpython.com/atom.xml" + url_feed_input = "https://fake.nvidia.com/rss/HomePage.xml" rss_source_stage = RSSSourceStage(config, feed_input=url_feed_input) assert rss_source_stage.supports_cpp_node() is False From 27c3f6fe9fa05d0ced5c08c168b705bbceeecefb Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 25 Oct 2023 10:31:28 -0500 Subject: [PATCH 09/16] Merge remote-tracking branch 'upstream/fea-sherlock' into 1272-fea-improve-the-vectordbservice --- morpheus/controllers/rss_controller.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 403aa20f8a..8d004ae4f3 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -189,6 +189,7 @@ def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict: feed['bozo_exception']) feed = self._try_parse_feed_with_beautiful_soup(url, is_url) except Exception: + logger.error("Failed to parse the feed manually: %s", url) raise logger.debug("Parsed feed: %s. Cache hit: %s. 
Fallback: %s", url, cache_hit, fallback) @@ -218,7 +219,7 @@ def parse_feeds(self): yield feed except Exception as ex: - logger.warning("Failed to parse feed: %s: %s.", url, ex) + logger.info("Failed to parse feed: %s: %s.", url, ex) feed_stats.last_failure = current_time feed_stats.failure_count += 1 feed_stats.last_try_result = "Failure" From ce4f6bcd606fe7cb562a504fda6ffda398eec05f Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 25 Oct 2023 12:38:18 -0500 Subject: [PATCH 10/16] Fixed datatype mismatch error --- .../service/vdb/milvus_vector_db_service.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 619ed24a85..10afffde1a 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -226,11 +226,14 @@ def __init__(self, name: str, client: MilvusClient) -> None: self._fields: list[pymilvus.FieldSchema] = self._collection.schema.fields self._vector_field = None + self._fillna_fields_dict = {} for field in self._fields: if field.dtype == pymilvus.DataType.FLOAT_VECTOR: self._vector_field = field.name - break + else: + if not field.auto_id: + self._fillna_fields_dict[field.name] = field.dtype self._collection.load() @@ -288,6 +291,22 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa if isinstance(final_df, cudf.DataFrame): final_df = final_df.to_pandas() + # Ensure that there are no None values in the DataFrame entries. + for field_name, dtype in self._fillna_fields_dict.items(): + if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): + final_df[field_name] = final_df[field_name].fillna("") + elif dtype in (pymilvus.DataType.INT8, + pymilvus.DataType.INT16, + pymilvus.DataType.INT32, + pymilvus.DataType.INT64): + final_df[field_name] = final_df[field_name].fillna(0) + elif dtype in (pymilvus.DataType.FLOAT, pymilvus.DataType.DOUBLE): + final_df[field_name] = final_df[field_name].fillna(0.0) + elif dtype == pymilvus.DataType.BOOL: + final_df[field_name] = final_df[field_name].fillna(False) + else: + logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) + result = self._collection.insert(data=final_df, **kwargs) self._collection.flush() From 74730a7049d70cf2b491bb95b49aba4ec8a3d0d2 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 25 Oct 2023 14:32:46 -0500 Subject: [PATCH 11/16] replaced cudf with pandas --- tests/controllers/test_rss_controller.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index 82f74aafa9..e8b3f8df2d 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -19,10 +19,9 @@ from unittest.mock import patch import feedparser +import pandas as pd import pytest -import cudf - from _utils import TEST_DIRS from morpheus.controllers.rss_controller import FeedStats from morpheus.controllers.rss_controller import RSSController @@ -98,7 +97,7 @@ def test_skip_duplicates_feed_inputs(feed_input: str, expected_count: int): controller = RSSController(feed_input=[feed_input, feed_input]) # Pass duplicate feed inputs dataframes_generator = controller.fetch_dataframes() dataframe = next(dataframes_generator, None) - assert isinstance(dataframe, cudf.DataFrame) + assert isinstance(dataframe, pd.DataFrame) assert 
len(dataframe) == expected_count @@ -120,7 +119,7 @@ def test_fetch_dataframes_url(feed_input: str | list[str], mock_feed: feedparser mock_feedparser_parse.return_value = mock_feed dataframes_generator = controller.fetch_dataframes() dataframe = next(dataframes_generator, None) - assert isinstance(dataframe, cudf.DataFrame) + assert isinstance(dataframe, pd.DataFrame) assert "link" in dataframe.columns assert len(dataframe) > 0 @@ -129,7 +128,7 @@ def test_fetch_dataframes_filepath(feed_input: str | list[str]): controller = RSSController(feed_input=feed_input) dataframes_generator = controller.fetch_dataframes() dataframe = next(dataframes_generator, None) - assert isinstance(dataframe, cudf.DataFrame) + assert isinstance(dataframe, pd.DataFrame) assert "link" in dataframe.columns assert len(dataframe) > 0 @@ -137,7 +136,7 @@ def test_fetch_dataframes_filepath(feed_input: str | list[str]): def test_batch_size(feed_input: list[str], batch_size: int): controller = RSSController(feed_input=feed_input, batch_size=batch_size) for df in controller.fetch_dataframes(): - assert isinstance(df, cudf.DataFrame) + assert isinstance(df, pd.DataFrame) assert len(df) <= batch_size From 509aa025691abdc72fba580a77fac2f4d42509cc Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 26 Oct 2023 10:21:54 -0500 Subject: [PATCH 12/16] Added callback to service argument and updated write to vdb stage --- examples/llm/common/web_scraper_stage.py | 2 +- examples/llm/vdb_upload/run.py | 5 ++ .../stages/output/write_to_vector_db_stage.py | 81 +++++++------------ 3 files changed, 34 insertions(+), 54 deletions(-) diff --git a/examples/llm/common/web_scraper_stage.py b/examples/llm/common/web_scraper_stage.py index b65a1994a6..00e6e18bb5 100644 --- a/examples/llm/common/web_scraper_stage.py +++ b/examples/llm/common/web_scraper_stage.py @@ -30,7 +30,7 @@ from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair -logger = logging.getLogger(__name__) +logger = logging.getLogger(f"morpheus.{__name__}") class WebScraperStage(SinglePortStage): diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 7bacc5fc0d..a2958bc2dc 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -18,6 +18,10 @@ logger = logging.getLogger(__name__) +def is_valid_service(ctx, param, value): # pylint: disable=unused-argument + from morpheus.service.vdb.utils import validate_service + value = value.lower() + return validate_service(service_name=value) @click.group(name=__name__) def run(): @@ -100,6 +104,7 @@ def run(): "--vector_db_service", type=str, default="milvus", + callback=is_valid_service, help="Name of the vector database service to use.", ) @click.option( diff --git a/morpheus/stages/output/write_to_vector_db_stage.py b/morpheus/stages/output/write_to_vector_db_stage.py index 81f9042f81..5d9521ad97 100644 --- a/morpheus/stages/output/write_to_vector_db_stage.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -126,68 +126,43 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea stream = input_stream[0] - def on_data_control_message(ctrl_msg: ControlMessage) -> ControlMessage: + def extract_df(msg): + df = None - df = ctrl_msg.payload().df + if isinstance(msg, ControlMessage): + df = msg.payload().df + elif isinstance(msg, MultiResponseMessage): + df = msg.get_meta() + if df is not None and not df.empty: + embeddings = msg.get_probs_tensor() + df[self._embedding_column_name] = 
embeddings.tolist() + elif isinstance(msg, MultiMessage): + df = msg.get_meta() + else: + logger.error(f"Unexpected message type '{type(msg)}' was encountered. Skipping insertion.") - if not df.empty: - # Insert entries in the dataframe to vector database. - result = self._service.insert_dataframe(name=self._resource_name, - df=df, - **self._resource_kwargs) + return df - ctrl_msg.set_metadata("insert_response", result) + def on_data(msg): - return ctrl_msg + df = extract_df(msg) - def on_data_multi_response_message(msg: MultiResponseMessage) -> MultiResponseMessage: + if df is not None and not df.empty: + try: + result = self._service.insert_dataframe(name=self._resource_name, df=df, **self._resource_kwargs) - metadata = msg.get_meta() + if isinstance(msg, ControlMessage): + msg.set_metadata("insert_response", result) - if not metadata.empty: - # Probs tensor contains all of the embeddings - embeddings = msg.get_probs_tensor() - embeddings_list = embeddings.tolist() + except Exception as exc: + logger.error(f"Unable to insert into collection: {self._resource_name} due to {exc}") + return None - # # Figure out which columns we need - # available_columns = set(msg.get_meta_column_names()) + return msg if df is not None and not df.empty else None - # if (self._include_columns is not None): - # available_columns = available_columns.intersection(self._include_columns) - # if (self._exclude_columns is not None): - # available_columns = available_columns.difference(self._exclude_columns) - - # Add in the embedding results to the dataframe - metadata[self._embedding_column_name] = embeddings_list - - # if (not self._service.has_store_object(name=self._resource_name)): - # # Create the vector database resource - # self._service.create_from_dataframe(name=self._resource_name, df=metadata, index_field="embedding") - - # Insert entries in the dataframe to vector database. - self._resource_service.insert_dataframe(df=metadata, **self._resource_kwargs) - - return msg - - def on_data_multi_message(msg: MultiMessage): - metadata = msg.get_meta() - - if not metadata.empty: - # Insert entries in the dataframe to vector database. 
- self._service.insert_dataframe(name=self._resource_name, df=metadata, **self._resource_kwargs) - - return msg - - if (issubclass(input_stream[1], ControlMessage)): - on_data = ops.map(on_data_control_message) - elif (issubclass(input_stream[1], MultiResponseMessage)): - on_data = ops.map(on_data_multi_response_message) - elif (issubclass(input_stream[1], MultiMessage)): - on_data = ops.map(on_data_multi_message) - else: - raise RuntimeError(f"Unexpected input type {input_stream[1]}") - - to_vector_db = builder.make_node(self.unique_name, on_data, ops.on_completed(self.on_completed)) + to_vector_db = builder.make_node(self.unique_name, ops.map(on_data), + ops.filter(lambda x: x is not None), + ops.on_completed(self.on_completed)) builder.make_edge(stream, to_vector_db) stream = to_vector_db From 0c253c35c8970ae8e6b80da31f422b9685092736 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 26 Oct 2023 11:03:31 -0500 Subject: [PATCH 13/16] Added callback to service argument and updated write to vdb stage --- morpheus/service/vdb/utils.py | 71 ++++++++++++++++--- .../stages/output/write_to_vector_db_stage.py | 18 ++--- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/morpheus/service/vdb/utils.py b/morpheus/service/vdb/utils.py index aa40bd312c..795b9ca351 100644 --- a/morpheus/service/vdb/utils.py +++ b/morpheus/service/vdb/utils.py @@ -16,6 +16,60 @@ import typing +def handle_service_exceptions(func): + """ + A decorator that handles common exceptions for the given service. + + Parameters + ---------- + func : function + The function to be decorated. + + Returns + ------- + function + A wrapped function with exception handling. + + Raises + ------ + ValueError + If the specified service name is not found or does not correspond to a valid service class. + """ + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except (ModuleNotFoundError, AttributeError) as exc: + service_name = args[0] if args else kwargs.get('service_name', 'Unknown') + module_name = f"morpheus.service.vdb.{service_name}_vector_db_service" + raise ValueError(f"Service {service_name} not found. Ensure that the corresponding service class, " + + f"such as {module_name}, has been implemented.") from exc + return wrapper + + +@handle_service_exceptions +def validate_service(service_name: str): + """ + Parameters + ---------- + service_name : str + The name of the vector database service to create. + + Returns + ------- + Returns `service_name` if implementation exist. + + Raises + ------ + ValueError + If the specified service name is not found or does not correspond to a valid service class. + """ + + module_name = f"morpheus.service.vdb.{service_name}_vector_db_service" + importlib.import_module(module_name) + + return service_name + + class VectorDBServiceFactory: @typing.overload @@ -26,6 +80,7 @@ def create_instance( pass @classmethod + @handle_service_exceptions def create_instance(cls, service_name: str, *args: typing.Any, **kwargs: dict[str, typing.Any]): """ Factory for creating instances of vector database service classes. This factory allows dynamically @@ -50,13 +105,9 @@ def create_instance(cls, service_name: str, *args: typing.Any, **kwargs: dict[st ValueError If the specified service name is not found or does not correspond to a valid service class. 
""" - try: - module_name = f"morpheus.service.vdb.{service_name}_vector_db_service" - module = importlib.import_module(module_name) - class_name = f"{service_name.capitalize()}VectorDBService" - class_ = getattr(module, class_name) - instance = class_(*args, **kwargs) - return instance - except (ModuleNotFoundError, AttributeError) as exc: - raise ValueError(f"Service {service_name} not found. Ensure that the corresponding service class," + - f"such as {module_name}, has been implemented.") from exc + module_name = f"morpheus.service.vdb.{service_name}_vector_db_service" + module = importlib.import_module(module_name) + class_name = f"{service_name.capitalize()}VectorDBService" + class_ = getattr(module, class_name) + instance = class_(*args, **kwargs) + return instance diff --git a/morpheus/stages/output/write_to_vector_db_stage.py b/morpheus/stages/output/write_to_vector_db_stage.py index 5d9521ad97..76d84c960e 100644 --- a/morpheus/stages/output/write_to_vector_db_stage.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -139,26 +139,26 @@ def extract_df(msg): elif isinstance(msg, MultiMessage): df = msg.get_meta() else: - logger.error(f"Unexpected message type '{type(msg)}' was encountered. Skipping insertion.") + raise RuntimeError(f"Unexpected message type '{type(msg)}' was encountered.") return df def on_data(msg): + try: + df = extract_df(msg) - df = extract_df(msg) - - if df is not None and not df.empty: - try: + if df is not None and not df.empty: result = self._service.insert_dataframe(name=self._resource_name, df=df, **self._resource_kwargs) if isinstance(msg, ControlMessage): msg.set_metadata("insert_response", result) - except Exception as exc: - logger.error(f"Unable to insert into collection: {self._resource_name} due to {exc}") - return None + return msg + + except Exception as exc: + logger.error("Unable to insert into collection: %s due to %s", self._resource_name, exc) - return msg if df is not None and not df.empty else None + return None to_vector_db = builder.make_node(self.unique_name, ops.map(on_data), ops.filter(lambda x: x is not None), From a09c7204848a83d39d76b628723dfa070745dbeb Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 26 Oct 2023 13:49:17 -0500 Subject: [PATCH 14/16] Removed private variables from tests --- morpheus/controllers/rss_controller.py | 43 ++++++++++++++++++++--- morpheus/stages/input/rss_source_stage.py | 8 +++-- tests/controllers/test_rss_controller.py | 20 ++++++----- tests/test_rss_source_stage_pipe.py | 24 ------------- 4 files changed, 56 insertions(+), 39 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 8d004ae4f3..5266321aa9 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -60,6 +60,8 @@ class RSSController: Cache directory for storing RSS feed request data. cooldown_interval : int, optional, default = 600 Cooldown interval in seconds if there is a failure in fetching or parsing the feed. + request_timeout : float, optional, default = 2.0 + Request timeout in secs to fetch the feed. 
""" def __init__(self, @@ -68,7 +70,8 @@ def __init__(self, run_indefinitely: bool = None, enable_cache: bool = False, cache_dir: str = "./.cache/http", - cooldown_interval: int = 600): + cooldown_interval: int = 600, + request_timeout: float = 2.0): if (isinstance(feed_input, str)): feed_input = [feed_input] @@ -78,6 +81,7 @@ def __init__(self, self._batch_size = batch_size self._previous_entries = set() # Stores the IDs of previous entries to prevent the processing of duplicates. self._cooldown_interval = cooldown_interval + self._request_timeout = request_timeout # Validate feed_input for f in self._feed_input: @@ -106,11 +110,40 @@ def run_indefinitely(self): """Property that determines to run the source indefinitely""" return self._run_indefinitely + @property + def session_exist(self) -> bool: + """Property that indicates the existence of a session.""" + return True if self._session is not None else False + + def get_feed_stats(self, feed_url: str) -> FeedStats: + """ + Get feed input stats. + + Parameters + ---------- + feed_url : str + Feed URL that is part of feed_input passed to the constructor. + + Returns + ------- + FeedStats + FeedStats instance for the given feed URL if it exists. + + Raises + ------ + ValueError + If the feed URL is not found in the feed input provided to the constructor. + """ + if feed_url not in self._feed_stats_dict: + raise ValueError("The feed URL is not part of the feed input provided to the constructor.") + + return self._feed_stats_dict[feed_url] + def _get_response_text(self, url: str) -> str: - if self._session: + if self.session_exist: response = self._session.get(url) else: - response = requests.get(url, timeout=2.0) + response = requests.get(url, timeout=self._request_timeout) return response.text @@ -163,7 +196,7 @@ def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict: fallback = False cache_hit = False - is_url_with_session = is_url and self._session + is_url_with_session = is_url and self.session_exist if is_url_with_session: response = self._session.get(url) @@ -219,7 +252,7 @@ def parse_feeds(self): yield feed except Exception as ex: - logger.info("Failed to parse feed: %s: %s.", url, ex) + logger.warning("Failed to parse feed: %s Feed stats: %s\n%s.", url, asdict(feed_stats), ex) feed_stats.last_failure = current_time feed_stats.failure_count += 1 feed_stats.last_try_result = "Failure" diff --git a/morpheus/stages/input/rss_source_stage.py b/morpheus/stages/input/rss_source_stage.py index 2d7d60b3a9..9b3fad8146 100644 --- a/morpheus/stages/input/rss_source_stage.py +++ b/morpheus/stages/input/rss_source_stage.py @@ -51,6 +51,8 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource): Cache directory for storing RSS feed request data. cooldown_interval : int, optional, default = 600 Cooldown interval in seconds if there is a failure in fetching or parsing the feed. + request_timeout : float, optional, default = 2.0 + Request timeout in secs to fetch the feed. 
""" def __init__(self, @@ -62,7 +64,8 @@ def __init__(self, batch_size: int = None, enable_cache: bool = False, cache_dir: str = "./.cache/http", - cooldown_interval: int = 600): + cooldown_interval: int = 600, + request_timeout: float = 2.0): super().__init__(c) self._stop_requested = False self._stop_after = stop_after @@ -83,7 +86,8 @@ def __init__(self, run_indefinitely=run_indefinitely, enable_cache=enable_cache, cache_dir=cache_dir, - cooldown_interval=cooldown_interval) + cooldown_interval=cooldown_interval, + request_timeout=request_timeout) @property def name(self) -> str: diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index e8b3f8df2d..46ff944005 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -185,14 +185,15 @@ def test_enable_disable_cache(enable_cache): controller = RSSController(feed_input=test_urls, enable_cache=enable_cache) if enable_cache: - assert controller._session + assert controller.session_exist else: - assert not controller._session + assert not controller.session_exist def test_parse_feeds(mock_feed: feedparser.FeedParserDict): feed_input = test_urls[0] - controller = RSSController(feed_input=feed_input, enable_cache=False) + cooldown_interval = 620 + controller = RSSController(feed_input=feed_input, enable_cache=False, cooldown_interval=cooldown_interval) with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse: @@ -201,7 +202,7 @@ def test_parse_feeds(mock_feed: feedparser.FeedParserDict): with patch.object(controller, '_try_parse_feed') as mock_try_parse_feed: dataframes_generator = controller.parse_feeds() next(dataframes_generator, None) - feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + feed_stats: FeedStats = controller.get_feed_stats(feed_input) assert feed_stats.last_try_result == "Success" assert feed_stats.failure_count == 0 assert feed_stats.success_count == 1 @@ -211,7 +212,7 @@ def test_parse_feeds(mock_feed: feedparser.FeedParserDict): dataframes_generator = controller.parse_feeds() next(dataframes_generator, None) - feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + feed_stats: FeedStats = controller.get_feed_stats(feed_input) assert feed_stats.last_try_result == "Failure" assert feed_stats.failure_count == 1 assert feed_stats.success_count == 1 @@ -220,18 +221,21 @@ def test_parse_feeds(mock_feed: feedparser.FeedParserDict): dataframes_generator = controller.parse_feeds() next(dataframes_generator, None) - feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + feed_stats: FeedStats = controller.get_feed_stats(feed_input) assert feed_stats.last_try_result == "Failure" assert feed_stats.failure_count == 1 assert feed_stats.success_count == 1 # Resume trying after cooldown period - with patch("time.time", return_value=time.time() + controller._cooldown_interval): + with patch("time.time", return_value=time.time() + cooldown_interval): dataframes_generator = controller.parse_feeds() next(dataframes_generator, None) - feed_stats: FeedStats = controller._feed_stats_dict[feed_input] + feed_stats: FeedStats = controller.get_feed_stats(feed_input) assert feed_stats.last_try_result == "Failure" assert feed_stats.failure_count == 2 assert feed_stats.success_count == 1 + + with pytest.raises(ValueError): + controller.get_feed_stats("http://testfeed.com") diff --git a/tests/test_rss_source_stage_pipe.py b/tests/test_rss_source_stage_pipe.py index ecb2402de0..1b408af8d4 100644 --- 
a/tests/test_rss_source_stage_pipe.py +++ b/tests/test_rss_source_stage_pipe.py @@ -40,30 +40,6 @@ def test_constructor_with_feed_url(config): assert ctlr._run_indefinitely is True -@pytest.mark.use_python -def test_constructor_with_feed_file(config): - file_feed_input = os.path.join(TEST_DIRS.tests_data_dir, "rss_feed_atom.xml") - rss_source_stage = RSSSourceStage(config, - feed_input=file_feed_input, - interval_secs=5, - stop_after=10, - cooldown_interval=100, - batch_size=256, - enable_cache=True, - cache_dir="./.cache/http_cache") - - ctlr = rss_source_stage._controller - - assert ctlr._feed_input == {file_feed_input} - assert ctlr._run_indefinitely is False - assert ctlr._batch_size == 256 - assert rss_source_stage._interval_secs == 5 - assert rss_source_stage._stop_after == 10 - assert rss_source_stage._controller._cooldown_interval == 100 - assert rss_source_stage._controller._session is not None - assert rss_source_stage._controller._session.cache.cache_name == "./.cache/http_cache/RSSController.sqlite" - - @pytest.mark.use_python def test_support_cpp_node(config): url_feed_input = "https://fake.nvidia.com/rss/HomePage.xml" From 1dc7114a8a9a1ede061ab8574811efb5eb61060b Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Fri, 27 Oct 2023 09:35:30 -0500 Subject: [PATCH 15/16] Fixed pylint errors --- examples/llm/common/web_scraper_stage.py | 18 +++++++++-------- examples/llm/rag/standalone_pipeline.py | 3 ++- examples/llm/vdb_upload/run.py | 2 ++ morpheus/controllers/rss_controller.py | 20 +++++++++---------- morpheus/service/vdb/utils.py | 2 ++ .../stages/output/write_to_vector_db_stage.py | 3 ++- tests/controllers/test_rss_controller.py | 16 +++++++-------- .../milvus_idx_part_collection_conf.json | 2 +- 8 files changed, 36 insertions(+), 30 deletions(-) diff --git a/examples/llm/common/web_scraper_stage.py b/examples/llm/common/web_scraper_stage.py index 00e6e18bb5..53c8719faa 100644 --- a/examples/llm/common/web_scraper_stage.py +++ b/examples/llm/common/web_scraper_stage.py @@ -115,8 +115,10 @@ def _download_and_split(self, msg: MessageMeta) -> MessageMeta: if (not response.ok): logger.warning( - f"Error downloading document from URL '{url}'. Returned code: {response.status_code}. With reason: '{response.reason}'" - ) + "Error downloading document from URL '%s'. " + "Returned code: %s. With reason: '%s'", + url, + response.status_code, + response.reason) continue raw_html = response.text @@ -134,14 +136,14 @@ def _download_and_split(self, msg: MessageMeta) -> MessageMeta: split_text = self._text_splitter.split_text(text) for text in split_text: - r = row.copy() - r.update({"page_content": text}) - final_rows.append(r) + row_cp = row.copy() + row_cp.update({"page_content": text}) + final_rows.append(row_cp) - logger.debug(f"Processed page: '{url}'. Cache hit: {response.from_cache}") + logger.debug("Processed page: '%s'. 
Cache hit: %s", url, response.from_cache) - except ValueError as e: - logger.error(f"Error parsing document: {e}") + except ValueError as exc: + logger.error("Error parsing document: %s", exc) continue # Not using cudf to avoid error: pyarrow.lib.ArrowInvalid: cannot mix list and non-list, non-null values diff --git a/examples/llm/rag/standalone_pipeline.py b/examples/llm/rag/standalone_pipeline.py index 47d9956562..b87e34722e 100644 --- a/examples/llm/rag/standalone_pipeline.py +++ b/examples/llm/rag/standalone_pipeline.py @@ -181,7 +181,8 @@ def standalone( pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions')) - pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name, vdb_resource_name=vdb_resource_name))) + pipe.add_stage( + LLMEngineStage(config, engine=_build_engine(model_name=model_name, vdb_resource_name=vdb_resource_name))) sink = pipe.add_stage(InMemorySinkStage(config)) diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index a2958bc2dc..fb127f4fac 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -18,11 +18,13 @@ logger = logging.getLogger(__name__) + def is_valid_service(ctx, param, value): # pylint: disable=unused-argument from morpheus.service.vdb.utils import validate_service value = value.lower() return validate_service(service_name=value) + @click.group(name=__name__) def run(): pass diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 5266321aa9..a1b64a926f 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -99,11 +99,11 @@ def __init__(self, self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"), backend="sqlite") - self._feed_stats_dict = {input: FeedStats(failure_count=0, - success_count=0, - last_failure=-1, - last_success=-1, - last_try_result="Unknown") for input in self._feed_input} + self._feed_stats_dict = { + input: + FeedStats(failure_count=0, success_count=0, last_failure=-1, last_success=-1, last_try_result="Unknown") + for input in self._feed_input + } @property def run_indefinitely(self): @@ -113,7 +113,7 @@ def run_indefinitely(self): @property def session_exist(self) -> bool: """Property that indicates the existence of a session.""" - return True if self._session is not None else False + return bool(self._session) def get_feed_stats(self, feed_url: str) -> FeedStats: """ @@ -217,9 +217,7 @@ def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict: if feed["bozo"]: try: - logger.info("Failed to parse feed: %s, %s. Try parsing feed manually", - url, - feed['bozo_exception']) + logger.info("Failed to parse feed: %s, %s. Try parsing feed manually", url, feed['bozo_exception']) feed = self._try_parse_feed_with_beautiful_soup(url, is_url) except Exception: logger.error("Failed to parse the feed manually: %s", url) @@ -270,7 +268,7 @@ def fetch_dataframes(self): Raises ------ - RuntimeError + Exception If there is error fetching or processing feed entries. 
""" entry_accumulator = [] @@ -299,7 +297,7 @@ def fetch_dataframes(self): logger.debug("No new entries found.") except Exception as exc: - logger.error(f"Error fetching or processing feed entries: {exc}") + logger.error("Error fetching or processing feed entries: %s", exc) raise @classmethod diff --git a/morpheus/service/vdb/utils.py b/morpheus/service/vdb/utils.py index 795b9ca351..662c57c7c8 100644 --- a/morpheus/service/vdb/utils.py +++ b/morpheus/service/vdb/utils.py @@ -35,6 +35,7 @@ def handle_service_exceptions(func): ValueError If the specified service name is not found or does not correspond to a valid service class. """ + def wrapper(*args, **kwargs): try: return func(*args, **kwargs) @@ -43,6 +44,7 @@ def wrapper(*args, **kwargs): module_name = f"morpheus.service.vdb.{service_name}_vector_db_service" raise ValueError(f"Service {service_name} not found. Ensure that the corresponding service class, " + f"such as {module_name}, has been implemented.") from exc + return wrapper diff --git a/morpheus/stages/output/write_to_vector_db_stage.py b/morpheus/stages/output/write_to_vector_db_stage.py index 76d84c960e..8275e4724b 100644 --- a/morpheus/stages/output/write_to_vector_db_stage.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -160,7 +160,8 @@ def on_data(msg): return None - to_vector_db = builder.make_node(self.unique_name, ops.map(on_data), + to_vector_db = builder.make_node(self.unique_name, + ops.map(on_data), ops.filter(lambda x: x is not None), ops.on_completed(self.on_completed)) diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index 46ff944005..e388783fbe 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -44,16 +44,15 @@ @pytest.fixture(scope="module", name="mock_feed") def mock_feed_fixture() -> feedparser.FeedParserDict: - feed_items = [{"link": "https://nvidia.com", "id": "12345"}, - {"link": "https://fake.nvidia.com", "id": "22345"} - ] + feed_items = [{"link": "https://nvidia.com", "id": "12345"}, {"link": "https://fake.nvidia.com", "id": "22345"}] feed = feedparser.FeedParserDict() feed.update({"entries": feed_items, "bozo": 0}) return feed + @pytest.fixture(scope="module", name="mock_get_response") -def mock_get_response() -> Mock: +def mock_get_response_fixture() -> Mock: # Open and read the content of the file with open(test_file_paths[0], 'rb') as file: file_content = file.read() @@ -65,6 +64,7 @@ def mock_get_response() -> Mock: return mock_response + @pytest.mark.parametrize("feed_input, expected_output", [(url, True) for url in test_urls]) def test_run_indefinitely_true(feed_input: str, expected_output: bool): controller = RSSController(feed_input=feed_input) @@ -123,6 +123,7 @@ def test_fetch_dataframes_url(feed_input: str | list[str], mock_feed: feedparser assert "link" in dataframe.columns assert len(dataframe) > 0 + @pytest.mark.parametrize("feed_input", [test_file_paths, test_file_paths[0]]) def test_fetch_dataframes_filepath(feed_input: str | list[str]): controller = RSSController(feed_input=feed_input) @@ -132,6 +133,7 @@ def test_fetch_dataframes_filepath(feed_input: str | list[str]): assert "link" in dataframe.columns assert len(dataframe) > 0 + @pytest.mark.parametrize("feed_input, batch_size", [(test_file_paths, 5)]) def test_batch_size(feed_input: list[str], batch_size: int): controller = RSSController(feed_input=feed_input, batch_size=batch_size) @@ -140,10 +142,8 @@ def test_batch_size(feed_input: list[str], batch_size: int): 
assert len(df) <= batch_size -@pytest.mark.parametrize("feed_input, is_url, enable_cache", - [(test_file_paths[0], False, False), - (test_urls[0], True, True), - (test_urls[0], True, False)]) +@pytest.mark.parametrize("feed_input, is_url, enable_cache", [(test_file_paths[0], False, False), + (test_urls[0], True, True), (test_urls[0], True, False)]) def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock): controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) diff --git a/tests/tests_data/service/milvus_idx_part_collection_conf.json b/tests/tests_data/service/milvus_idx_part_collection_conf.json index a5ffb57a91..9e442f21f6 100644 --- a/tests/tests_data/service/milvus_idx_part_collection_conf.json +++ b/tests/tests_data/service/milvus_idx_part_collection_conf.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:d96d5419902b9727cc8c63960720a3cb4cad9f198b6579e06843a3dc5877e25d +oid sha256:30142c5f6ca06929ebfdbddd2e214ad43f26465df9c92924ab86305fe1c43816 size 1089 From 19361150198ec888aceaca69654e848766b046d3 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Sun, 29 Oct 2023 23:12:48 -0500 Subject: [PATCH 16/16] Updated pymilvus to 2.3.2 --- docker/conda/environments/cuda11.8_dev.yml | 2 +- examples/llm/agents/kafka_pipeline.py | 26 ++-------------- examples/llm/agents/simple_pipeline.py | 10 +------ examples/llm/vdb_upload/README.md | 10 +++---- examples/llm/vdb_upload/export_model.py | 5 ++-- examples/llm/vdb_upload/requirements.yaml | 2 +- morpheus/llm/nodes/langchain_agent_node.py | 4 ++- .../service/vdb/milvus_vector_db_service.py | 30 ++++++++----------- morpheus/utils/logging_timer.py | 2 +- morpheus/utils/type_utils.py | 22 +++++++------- tests/conftest.py | 6 ++-- 11 files changed, 45 insertions(+), 74 deletions(-) diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml index 9d25275e13..f1e6eed6d6 100644 --- a/docker/conda/environments/cuda11.8_dev.yml +++ b/docker/conda/environments/cuda11.8_dev.yml @@ -121,5 +121,5 @@ dependencies: # Add additional dev dependencies here - databricks-connect - pytest-kafka==0.6.0 - - pymilvus==2.3.1 + - pymilvus==2.3.2 - milvus diff --git a/examples/llm/agents/kafka_pipeline.py b/examples/llm/agents/kafka_pipeline.py index 3c37558717..fd78eabe8b 100644 --- a/examples/llm/agents/kafka_pipeline.py +++ b/examples/llm/agents/kafka_pipeline.py @@ -11,20 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ import logging -import os import time -import pymilvus from langchain.agents import AgentType from langchain.agents import initialize_agent from langchain.agents import load_tools -from langchain.embeddings import HuggingFaceEmbeddings -from langchain.llms.openai import OpenAI from langchain.llms.openai import OpenAIChat -from requests_cache import SQLiteCache - -import cudf from morpheus.config import Config from morpheus.config import PipelineModes @@ -32,20 +26,12 @@ from morpheus.llm.llm_engine_stage import LLMEngineStage from morpheus.llm.nodes.extracter_node import ExtracterNode from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode -from morpheus.llm.nodes.prompt_template_node import PromptTemplateNode -from morpheus.llm.nodes.rag_node import RAGNode -from morpheus.llm.services.nemo_llm_service import NeMoLLMService from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler from morpheus.messages import ControlMessage from morpheus.pipeline.linear_pipeline import LinearPipeline -from morpheus.service.milvus_vector_db_service import MilvusVectorDBService -from morpheus.stages.general.monitor_stage import MonitorStage -from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.input.kafka_source_stage import KafkaSourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage -from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils.vector_db_service_utils import VectorDBServiceFactory logger = logging.getLogger(__name__) @@ -76,13 +62,7 @@ def _build_engine(model_name: str): return engine -def pipeline( - num_threads, - pipeline_batch_size, - model_max_batch_size, - model_name, - repeat_count, -): +def pipeline(num_threads, pipeline_batch_size, model_max_batch_size, model_name): config = Config() config.mode = PipelineModes.OTHER @@ -106,7 +86,7 @@ def pipeline( pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name))) - sink = pipe.add_stage(InMemorySinkStage(config)) + pipe.add_stage(InMemorySinkStage(config)) # pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True)) diff --git a/examples/llm/agents/simple_pipeline.py b/examples/llm/agents/simple_pipeline.py index 56b3a53148..8e474323ec 100644 --- a/examples/llm/agents/simple_pipeline.py +++ b/examples/llm/agents/simple_pipeline.py @@ -11,17 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ import logging -import os import time -import pymilvus from langchain import OpenAI from langchain.agents import AgentType from langchain.agents import initialize_agent from langchain.agents import load_tools -from langchain.embeddings import HuggingFaceEmbeddings -from requests_cache import SQLiteCache import cudf @@ -31,18 +28,13 @@ from morpheus.llm.llm_engine_stage import LLMEngineStage from morpheus.llm.nodes.extracter_node import ExtracterNode from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode -from morpheus.llm.nodes.prompt_template_node import PromptTemplateNode -from morpheus.llm.nodes.rag_node import RAGNode -from morpheus.llm.services.nemo_llm_service import NeMoLLMService from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler from morpheus.messages import ControlMessage from morpheus.pipeline.linear_pipeline import LinearPipeline -from morpheus.service.milvus_vector_db_service import MilvusVectorDBService from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils.vector_db_service_utils import VectorDBServiceFactory logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/README.md b/examples/llm/vdb_upload/README.md index 80e16c3873..8cb5cfd408 100644 --- a/examples/llm/vdb_upload/README.md +++ b/examples/llm/vdb_upload/README.md @@ -195,7 +195,7 @@ using `sentence-transformers/paraphrase-multilingual-mpnet-base-v2` as an exampl 2. **Run the Pipeline Call with the Chosen Model**: - Execute the following command with the model name you've identified: ```bash - python examples/llm/main.py vdb_upload export-triton-model --model_name + python examples/llm/main.py vdb_upload export-triton-model --model_name \ sentence-transformers/paraphrase-multilingual-mpnet-base-v2 --triton_repo ./models/triton-model-repo ``` @@ -220,9 +220,9 @@ using `sentence-transformers/paraphrase-multilingual-mpnet-base-v2` as an exampl 5. **Deploy the Model**: - Reload the docker container, specifying that we also need to load paraphrase-multilingual-mpnet-base-v2 ```bash - docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 - -v $PWD/models:/models nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver - --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model + docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 \ + -v $PWD/models:/models nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver \ + --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model \ all-MiniLM-L6-v2 --load-model sentence-transformers/paraphrase-multilingual-mpnet-base-v2 ``` @@ -252,7 +252,7 @@ using `sentence-transformers/paraphrase-multilingual-mpnet-base-v2` as an exampl 6. 
**Update the Pipeline Call**: - Now that the model has been exported and deployed, we can update the pipeline call to use the new model: ```bash - python examples/llm/main.py vdb_upload pipeline --model_name + python examples/llm/main.py vdb_upload pipeline --model_name \ sentence-transformers/paraphrase-multilingual-mpnet-base-v2 ``` diff --git a/examples/llm/vdb_upload/export_model.py b/examples/llm/vdb_upload/export_model.py index bfb69fd76b..87da3d6a01 100644 --- a/examples/llm/vdb_upload/export_model.py +++ b/examples/llm/vdb_upload/export_model.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import functools import inspect import logging @@ -39,12 +40,11 @@ class CustomTokenizer(torch.nn.Module): - + # pylint: disable=abstract-method def __init__(self, model_name: str): super().__init__() self.inner_model = AutoModel.from_pretrained(model_name) - # self.inner_model = SentenceTransformer(model_name) if (isinstance(self.inner_model, SentenceTransformer)): self._output_dim = self.inner_model.get_sentence_embedding_dimension() @@ -199,6 +199,7 @@ def build_triton_model(model_name, model_seq_length, max_batch_size, triton_repo config.platform = "onnxruntime_onnx" config.max_batch_size = max_batch_size + # pylint: disable=no-member for input_name, input_data in sample_input.data.items(): config.input.append( diff --git a/examples/llm/vdb_upload/requirements.yaml b/examples/llm/vdb_upload/requirements.yaml index be2cd41b2f..e9786a69ce 100644 --- a/examples/llm/vdb_upload/requirements.yaml +++ b/examples/llm/vdb_upload/requirements.yaml @@ -36,4 +36,4 @@ dependencies: - pip: - grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus - langchain==0.0.310 - - pymilvus # The conda package is woefully out of date and incorrect + - pymilvus==2.3.2 # The conda package is woefully out of date and incorrect diff --git a/morpheus/llm/nodes/langchain_agent_node.py b/morpheus/llm/nodes/langchain_agent_node.py index 96aef0d5c5..12b2b239c8 100644 --- a/morpheus/llm/nodes/langchain_agent_node.py +++ b/morpheus/llm/nodes/langchain_agent_node.py @@ -37,8 +37,10 @@ def get_input_names(self): async def _run_single(self, **kwargs): + all_lists = all(isinstance(v, list) for v in kwargs.values()) + # Check if all values are a list - if (all([isinstance(v, list) for v in kwargs.values()])): + if all_lists: # Transform from dict[str, list[Any]] to list[dict[str, Any]] input_list = [dict(zip(kwargs, t)) for t in zip(*kwargs.values())] diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 6d3732936d..ea1bd3b4ff 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -281,33 +281,30 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa Returns response content as a dictionary. """ - # From the schema, this is the list of columns we need, excluding any auto_id columns - column_names = [field.name for field in self._fields if not field.auto_id] - - final_column_names = df.columns.intersection(column_names) - - final_df = df[final_column_names] - - if isinstance(final_df, cudf.DataFrame): - final_df = final_df.to_pandas() + if isinstance(df, cudf.DataFrame): + df = df.to_pandas() # Ensure that there are no None values in the DataFrame entries. 
for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): - final_df[field_name] = final_df[field_name].fillna("") + df[field_name] = df[field_name].fillna("") elif dtype in (pymilvus.DataType.INT8, pymilvus.DataType.INT16, pymilvus.DataType.INT32, pymilvus.DataType.INT64): - final_df[field_name] = final_df[field_name].fillna(0) + df[field_name] = df[field_name].fillna(0) elif dtype in (pymilvus.DataType.FLOAT, pymilvus.DataType.DOUBLE): - final_df[field_name] = final_df[field_name].fillna(0.0) + df[field_name] = df[field_name].fillna(0.0) elif dtype == pymilvus.DataType.BOOL: - final_df[field_name] = final_df[field_name].fillna(False) + df[field_name] = df[field_name].fillna(False) else: logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) - result = self._collection.insert(data=final_df, **kwargs) + # From the schema, this is the list of columns we need, excluding any auto_id columns + column_names = [field.name for field in self._fields if not field.auto_id] + + # Note: dataframe columns has to be in the order of collection schema fields.s + result = self._collection.insert(data=df[column_names], **kwargs) self._collection.flush() return self._insert_result_to_dict(result=result) @@ -666,12 +663,9 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing. schema = pymilvus.CollectionSchema(fields=schema_fields, **schema_conf) - # if index_conf: - # index_param = self._client.prepare_index_params(**index_conf) - self._client.create_collection_with_schema(collection_name=name, schema=schema, - index_param=index_conf, + index_params=index_conf, auto_id=auto_id, shards_num=collection_conf.get("shards", 2), consistency_level=collection_conf.get( diff --git a/morpheus/utils/logging_timer.py b/morpheus/utils/logging_timer.py index 250a44985b..4be1f3a31d 100644 --- a/morpheus/utils/logging_timer.py +++ b/morpheus/utils/logging_timer.py @@ -56,7 +56,7 @@ def _do_log_message(self, duration_ms: float): @contextmanager -def log_time(log_fn, msg: str = None, count: int = None, *args, **kwargs): +def log_time(log_fn, *args, msg: str = None, count: int = None, **kwargs): # Create an info object to allow users to set the message in the context block info = LogTimeInfo(log_fn=log_fn, msg=msg, count=count, args=args, kwargs=kwargs) diff --git a/morpheus/utils/type_utils.py b/morpheus/utils/type_utils.py index e88b807dba..bab1711967 100644 --- a/morpheus/utils/type_utils.py +++ b/morpheus/utils/type_utils.py @@ -18,6 +18,8 @@ from collections import defaultdict T_co = typing.TypeVar("T_co", covariant=True) + +# pylint: disable=invalid-name T = typing.TypeVar('T') T1 = typing.TypeVar('T1') T2 = typing.TypeVar('T2') @@ -75,18 +77,18 @@ def unpack_union(*cls_list: typing.Type) -> typing.Union: assert len(cls_list) > 0, "Union class list must have at least 1 element." 
+ out_union = None + if (len(cls_list) == 1): - return typing.Union[cls_list[0]] - # elif (len(cls_list) == 2): - # return typing.Union[cls_list[0], cls_list[1]] + out_union = typing.Union[cls_list[0]] else: out_union = unpack_union(cls_list[0:2]) # Since typing.Union[typing.Union[A, B], C] == typing.Union[A, B, C], we build the union up manually - for t in cls_list[2:]: - out_union = typing.Union[out_union, t] + for typ in cls_list[2:]: + out_union = typing.Union[out_union, typ] - return out_union + return out_union @typing.overload @@ -109,10 +111,10 @@ def unpack_tuple(*cls_list: typing.Type) -> typing.Tuple: assert len(cls_list) > 0, "Union class list must have at least 1 element." + out_tuple = None + if (len(cls_list) == 1): - return typing.Tuple[cls_list[0]] - # elif (len(cls_list) == 2): - # return typing.Union[cls_list[0], cls_list[1]] + out_tuple = typing.Tuple[cls_list[0]] else: out_tuple = unpack_tuple(cls_list[0:2]) @@ -120,7 +122,7 @@ def unpack_tuple(*cls_list: typing.Type) -> typing.Tuple: for t in cls_list[2:]: out_tuple = typing.Tuple[out_tuple, t] - return out_tuple + return out_tuple def pretty_print_type_name(t: typing.Type) -> str: diff --git a/tests/conftest.py b/tests/conftest.py index dc1acddced..90371fc690 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -952,9 +952,9 @@ def filter_probs_df(dataset, use_cpp: bool): def _get_random_port(): import socket - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sckt: + sckt.bind(('', 0)) + return sckt.getsockname()[1] @pytest.fixture(scope="session")
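
---

For reference, below is a minimal usage sketch (not part of the patch series) showing how the `morpheus.service.vdb.utils` helpers introduced in these commits are intended to fit together with `insert_dataframe`, which is the same call `WriteToVectorDBStage.on_data()` makes per message. The Milvus URI, the collection name `test_collection`, and the column layout are illustrative assumptions, not values taken from the patches; the constructor keyword arguments accepted by `MilvusVectorDBService` are likewise assumed.

```python
# Sketch only: assumes a reachable Milvus instance and an existing collection
# named "test_collection" whose schema matches the columns below.
import pandas as pd

from morpheus.service.vdb.utils import VectorDBServiceFactory
from morpheus.service.vdb.utils import validate_service

# validate_service() raises ValueError (via handle_service_exceptions) when no
# <name>_vector_db_service module can be imported; "milvus" is the one shipped here.
service_name = validate_service(service_name="milvus")

# The factory imports morpheus.service.vdb.milvus_vector_db_service and
# instantiates MilvusVectorDBService. The `uri` kwarg is an assumption about
# that constructor, used here purely for illustration.
vdb_service = VectorDBServiceFactory.create_instance(service_name, uri="http://localhost:19530")

# Insert a small DataFrame, mirroring what the write-to-VDB stage does with the
# payload extracted from each message.
df = pd.DataFrame({"id": [1, 2], "embedding": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]})
response = vdb_service.insert_dataframe(name="test_collection", df=df)
print(response)
```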