Skip to content

Commit

Permalink
updated and fixed bug
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Arceo <[email protected]>
  • Loading branch information
franciscojavierarceo committed Jan 29, 2025
1 parent 9d2d0f3 commit 8360bbb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 36 deletions.
36 changes: 18 additions & 18 deletions docs/reference/online-stores/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ Details for each specific online store, such as how to configure it in a `featur

Below is a matrix indicating which functionality each online store supports.

| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) |
| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |
| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no |
| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Java | no | yes | no | no | no | no | no | no | no |
| readable by Go | yes | yes | no | no | no | no | no | no | no |
| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes |
| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no |
| support for deleting expired data | no | yes | no | no | no | no | no | no | no |
| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no |
| collocated by feature service | no | no | no | no | no | no | no | no | no |
| collocated by entity key | no | yes | no | no | no | no | no | no | yes |
| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) | Milvus |
| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:-------|
| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | no |
| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Java | no | yes | no | no | no | no | no | no | no | no |
| readable by Go | yes | yes | no | no | no | no | no | no | no | no |
| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | no |
| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | no |
| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | no |
| support for deleting expired data | no | yes | no | no | no | no | no | no | no | no |
| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | no |
| collocated by feature service | no | no | no | no | no | no | no | no | no | no |
| collocated by entity key | no | yes | no | no | no | no | no | no | yes | no |
3 changes: 1 addition & 2 deletions examples/rag/feature_repo/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ def run_demo():
query = query_embedding.detach().cpu().numpy().tolist()[0]

# Retrieve top k documents
features = store.retrieve_online_documents(
feature=None,
features = store.retrieve_online_documents_v2(
features=[
"city_embeddings:vector",
"city_embeddings:item_id",
Expand Down
35 changes: 19 additions & 16 deletions sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ def _connect(self, config: RepoConfig) -> MilvusClient:
if not self.client:
if config.provider == "local":
db_path = self._get_db_path(config)
print(f"Connecting to Milvus in local mode using {db_path}")
self.client = MilvusClient(db_path)
else:
self.client = MilvusClient(
Expand All @@ -138,8 +137,11 @@ def _connect(self, config: RepoConfig) -> MilvusClient:
)
return self.client

def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, Any]:
def _get_or_create_collection(
self, config: RepoConfig, table: FeatureView
) -> Dict[str, Any]:
self.client = self._connect(config)
vector_field_dict = {k.name: k for k in table.schema if k.vector_index}
collection_name = _table_id(config.project, table)
if collection_name not in self._collections:
# Create a composite key by combining entity fields
Expand Down Expand Up @@ -200,10 +202,13 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, A
DataType.FLOAT_VECTOR,
DataType.BINARY_VECTOR,
]:
metric = vector_field_dict[
vector_field.name
].vector_search_metric
index_params.add_index(
collection_name=collection_name,
field_name=vector_field.name,
metric_type=config.online_store.metric_type,
metric_type=metric or config.online_store.metric_type,
index_type=config.online_store.index_type,
index_name=f"vector_index_{vector_field.name}",
params={"nlist": config.online_store.nlist},
Expand Down Expand Up @@ -234,7 +239,7 @@ def online_write_batch(
progress: Optional[Callable[[int], Any]],
) -> None:
self.client = self._connect(config)
collection = self._get_collection(config, table)
collection = self._get_or_create_collection(config, table)
vector_cols = [f.name for f in table.features if f.vector_index]
entity_batch_to_insert = []
for entity_key, values_dict, timestamp, created_ts in data:
Expand Down Expand Up @@ -301,7 +306,7 @@ def update(
):
self.client = self._connect(config)
for table in tables_to_keep:
self._collections = self._get_collection(config, table)
self._collections = self._get_or_create_collection(config, table)

for table in tables_to_delete:
collection_name = _table_id(config.project, table)
Expand Down Expand Up @@ -347,7 +352,7 @@ def retrieve_online_documents_v2(
}
self.client = self._connect(config)
collection_name = _table_id(config.project, table)
collection = self._get_collection(config, table)
collection = self._get_or_create_collection(config, table)
if not config.online_store.vector_enabled:
raise ValueError("Vector search is not enabled in the online store config")

Expand Down Expand Up @@ -408,11 +413,10 @@ def retrieve_online_documents_v2(
)
for field in output_fields:
val = ValueProto()
field_value = hit.get("entity", {}).get(field, None)
# entity_key_proto = None
if field in ["created_ts", "event_ts"]:
res_ts = datetime.fromtimestamp(
hit.get("entity", {}).get(field) / 1e6
)
res_ts = datetime.fromtimestamp(field_value / 1e6)
elif field == ann_search_field:
serialized_embedding = _serialize_vector_to_float_list(
embedding
Expand All @@ -426,15 +430,14 @@ def retrieve_online_documents_v2(
PrimitiveFeastType.INT32,
PrimitiveFeastType.BYTES,
]:
res[field] = ValueProto(
string_val=hit.get("entity", {}).get(field, "")
)
res[field] = ValueProto(string_val=field_value)
elif field == composite_key_name:
pass
elif isinstance(field_value, bytes):
val.ParseFromString(field_value)
res[field] = val
else:
val.ParseFromString(
bytes(hit.get("entity", {}).get(field, b"").encode())
)
val.string_val = field_value
res[field] = val
distance = hit.get("distance", None)
res["distance"] = (
Expand Down Expand Up @@ -471,7 +474,7 @@ def _extract_proto_values_to_dict(
else:
vector_values = getattr(feature_values, proto_val_type).val
else:
if serialize_to_string:
if serialize_to_string and proto_val_type != "string_val":
vector_values = feature_values.SerializeToString().decode()
else:
vector_values = getattr(feature_values, proto_val_type)
Expand Down

0 comments on commit 8360bbb

Please sign in to comment.