diff --git a/docs/reference/online-stores/overview.md b/docs/reference/online-stores/overview.md index 04d24447058..b54329ad613 100644 --- a/docs/reference/online-stores/overview.md +++ b/docs/reference/online-stores/overview.md @@ -34,21 +34,21 @@ Details for each specific online store, such as how to configure it in a `featur Below is a matrix indicating which online stores support what functionality. -| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) | -| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | -| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | -| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| readable by Java | no | yes | no | no | no | no | no | no | no | -| readable by Go | yes | yes | no | no | no | no | no | no | no | -| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | -| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | -| support for deleting expired data | no | yes | no | no | no | no | no | no | no | -| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | -| collocated by feature service | no | no | no | no | no | no | no | no | no | -| collocated by entity key | no | yes | no | no | no | no | no | no | yes | +| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) | Milvus | +| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:-------| +| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | no | +| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| readable by Java | no | yes | no | no | no | no | no | no | no | no | +| readable by Go | yes | yes | no | no | no | no | no | no | no | no | +| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | no | +| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | no | +| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | no | +| support for deleting expired data | no | yes | no | no | no | no | no | no | no | no | +| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | no | +| collocated by feature service | no | no | no | no | no | no | no | no | no | no | +| collocated by entity key | no | yes | no | no | no | no | no | no | yes | no | diff --git a/examples/rag/feature_repo/test_workflow.py b/examples/rag/feature_repo/test_workflow.py index df7e20f2123..05cd554d823 100644 --- a/examples/rag/feature_repo/test_workflow.py +++ b/examples/rag/feature_repo/test_workflow.py @@ -55,8 +55,7 @@ def run_demo(): query = query_embedding.detach().cpu().numpy().tolist()[0] # Retrieve top k documents - features = store.retrieve_online_documents( - feature=None, + features = store.retrieve_online_documents_v2( features=[ "city_embeddings:vector", "city_embeddings:item_id", diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index e39db6d3a34..cbe5f6c7dcc 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -127,7 +127,6 @@ def _connect(self, config: RepoConfig) -> MilvusClient: if not self.client: if config.provider == "local": db_path = self._get_db_path(config) - print(f"Connecting to Milvus in local mode using {db_path}") self.client = MilvusClient(db_path) else: self.client = MilvusClient( @@ -138,8 +137,11 @@ def _connect(self, config: RepoConfig) -> MilvusClient: ) return self.client - def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, Any]: + def _get_or_create_collection( + self, config: RepoConfig, table: FeatureView + ) -> Dict[str, Any]: self.client = self._connect(config) + vector_field_dict = {k.name: k for k in table.schema if k.vector_index} collection_name = _table_id(config.project, table) if collection_name not in self._collections: # Create a composite key by combining entity fields @@ -200,10 +202,13 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, A DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR, ]: + metric = vector_field_dict[ + vector_field.name + ].vector_search_metric index_params.add_index( collection_name=collection_name, field_name=vector_field.name, - metric_type=config.online_store.metric_type, + metric_type=metric or config.online_store.metric_type, index_type=config.online_store.index_type, index_name=f"vector_index_{vector_field.name}", params={"nlist": config.online_store.nlist}, @@ -234,7 +239,7 @@ def online_write_batch( progress: Optional[Callable[[int], Any]], ) -> None: self.client = self._connect(config) - collection = self._get_collection(config, table) + collection = self._get_or_create_collection(config, table) vector_cols = [f.name for f in table.features if f.vector_index] entity_batch_to_insert = [] for entity_key, values_dict, timestamp, created_ts in data: @@ -301,7 +306,7 @@ def update( ): self.client = self._connect(config) for table in tables_to_keep: - self._collections = self._get_collection(config, table) + self._collections = self._get_or_create_collection(config, table) for table in tables_to_delete: collection_name = _table_id(config.project, table) @@ -347,7 +352,7 @@ def retrieve_online_documents_v2( } self.client = self._connect(config) collection_name = _table_id(config.project, table) - collection = self._get_collection(config, table) + collection = self._get_or_create_collection(config, table) if not config.online_store.vector_enabled: raise ValueError("Vector search is not enabled in the online store config") @@ -408,11 +413,10 @@ def retrieve_online_documents_v2( ) for field in output_fields: val = ValueProto() + field_value = hit.get("entity", {}).get(field, None) # entity_key_proto = None if field in ["created_ts", "event_ts"]: - res_ts = datetime.fromtimestamp( - hit.get("entity", {}).get(field) / 1e6 - ) + res_ts = datetime.fromtimestamp(field_value / 1e6) elif field == ann_search_field: serialized_embedding = _serialize_vector_to_float_list( embedding @@ -426,15 +430,14 @@ def retrieve_online_documents_v2( PrimitiveFeastType.INT32, PrimitiveFeastType.BYTES, ]: - res[field] = ValueProto( - string_val=hit.get("entity", {}).get(field, "") - ) + res[field] = ValueProto(string_val=field_value) elif field == composite_key_name: pass + elif isinstance(field_value, bytes): + val.ParseFromString(field_value) + res[field] = val else: - val.ParseFromString( - bytes(hit.get("entity", {}).get(field, b"").encode()) - ) + val.string_val = field_value res[field] = val distance = hit.get("distance", None) res["distance"] = ( @@ -471,7 +474,7 @@ def _extract_proto_values_to_dict( else: vector_values = getattr(feature_values, proto_val_type).val else: - if serialize_to_string: + if serialize_to_string and proto_val_type != "string_val": vector_values = feature_values.SerializeToString().decode() else: vector_values = getattr(feature_values, proto_val_type)