Fix the retriever issue of Milvus (opea-project#1286)
* Fix the retriever issue of Milvus DB where data cannot be retrieved
after being ingested using dataprep.

Signed-off-by: letonghan <[email protected]>

---------

Signed-off-by: letonghan <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
letonghan and pre-commit-ci[bot] authored Feb 14, 2025
1 parent 0e3f8ab commit 47f68a4
Showing 18 changed files with 179 additions and 59 deletions.
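The same embedder bootstrap recurs in every integration touched below: when `TEI_EMBEDDING_ENDPOINT` is set, the retriever now requires `HUGGINGFACEHUB_API_TOKEN`, probes the endpoint's `/info` route for the served `model_id`, and builds a `HuggingFaceInferenceAPIEmbeddings` client in place of the removed `HuggingFaceHubEmbeddings`. A minimal standalone sketch of that pattern (module-level constants stand in for the repo's `config.py`; the function name is illustrative):

```python
import os

import requests
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings

TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT", "")
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")


def build_tei_embedder() -> HuggingFaceInferenceAPIEmbeddings:
    # The commit makes the token mandatory whenever a TEI endpoint is used.
    if not HUGGINGFACEHUB_API_TOKEN:
        raise RuntimeError("HUGGINGFACEHUB_API_TOKEN is required with TEI_EMBEDDING_ENDPOINT")
    # TEI reports the served model on /info; the model_id is needed because
    # the embeddings client expects a model name, not just a URL.
    response = requests.get(TEI_EMBEDDING_ENDPOINT + "/info", timeout=5)
    response.raise_for_status()
    model_id = response.json()["model_id"]
    return HuggingFaceInferenceAPIEmbeddings(
        api_key=HUGGINGFACEHUB_API_TOKEN, model_name=model_id, api_url=TEI_EMBEDDING_ENDPOINT
    )
```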
2 changes: 0 additions & 2 deletions README.md
@@ -61,8 +61,6 @@ The initially supported `Microservices` are described in the below table. More `
A `Microservice` can be created by using the decorator `register_microservice`. Taking the `embedding microservice` as an example:

```python
from langchain_community.embeddings import HuggingFaceHubEmbeddings

from comps import register_microservice, EmbedDoc, ServiceType, TextDoc


7 changes: 7 additions & 0 deletions comps/retrievers/deployment/docker_compose/compose.yaml
@@ -3,6 +3,7 @@

include:
  - ../../../third_parties/elasticsearch/deployment/docker_compose/compose.yaml
  - ../../../third_parties/milvus/deployment/docker_compose/compose.yaml
  - ../../../third_parties/opensearch/deployment/docker_compose/compose.yaml
  - ../../../third_parties/neo4j/deployment/docker_compose/compose.yaml
  - ../../../third_parties/pgvector/deployment/docker_compose/compose.yaml
@@ -49,6 +50,12 @@ services:
    depends_on:
      tei-embedding-serving:
        condition: service_healthy
      standalone:
        condition: service_healthy
      etcd:
        condition: service_healthy
      minio:
        condition: service_healthy

  retriever-neo4j:
    extends: retriever
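The new `depends_on` conditions make `retriever-milvus` wait until the Milvus `standalone`, `etcd`, and `minio` containers report healthy before starting. Outside of compose, a rough equivalent readiness probe looks like the sketch below (assuming `pymilvus` is installed and Milvus listens on its default port; the helper name is illustrative):

```python
import time

from pymilvus import connections, utility


def wait_for_milvus(uri: str = "http://localhost:19530", retries: int = 30) -> None:
    """Poll Milvus until a trivial metadata call succeeds, mirroring the
    healthcheck gating this compose change adds."""
    for _ in range(retries):
        try:
            connections.connect(uri=uri)
            # Same call the retriever's check_health relies on below.
            utility.list_collections()
            return
        except Exception:
            time.sleep(2)
    raise TimeoutError(f"Milvus at {uri} did not become healthy in time")
```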
5 changes: 4 additions & 1 deletion comps/retrievers/src/README_elasticsearch.md
@@ -50,6 +50,7 @@ Please refer to this [readme](../../third_parties/elasticsearch/src/README.md).

```bash
export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:6060"
export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token}
export RETRIEVER_COMPONENT_NAME="OPEA_RETRIEVER_ELASTICSEARCH"
python opea_retrievers_microservice.py
```
@@ -64,6 +65,7 @@ export ES_CONNECTION_STRING="http://localhost:9200"
export INDEX_NAME=${your_index_name}
export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:6060"
export RETRIEVER_COMPONENT_NAME="OPEA_RETRIEVER_ELASTICSEARCH"
export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token}
```

### 2.2 Build Docker Image
@@ -83,7 +85,8 @@ You can choose one as needed.
### 2.3 Run Docker with CLI (Option A)

```bash
docker run -d --name="retriever-elasticsearch" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/retriever:latest
docker run -d --name="retriever-elasticsearch" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_EMBEDDING_ENDPOINT=${TEI_EMBEDDING_ENDPOINT}
-e HUGGINGFACEHUB_API_TOKEN=${HUGGINGFACEHUB_API_TOKEN} opea/retriever:latest
```
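Once the container is running, a quick request confirms the endpoint and token wiring before any data is ingested. A sketch, assuming the service exposes the standard OPEA retrieval route on port 7000 and a 768-dimensional embedding model such as `BAAI/bge-base-en-v1.5`:

```python
import requests

# Dummy query vector; the dimension must match the embedding model served by
# TEI (768 for BAAI/bge-base-en-v1.5; adjust for other models).
payload = {"text": "What is OPEA?", "embedding": [0.1] * 768}

resp = requests.post("http://localhost:7000/v1/retrieval", json=payload, timeout=10)
resp.raise_for_status()
print(resp.json())  # expect retrieved docs (empty until dataprep ingests data)
```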

### 2.4 Run Docker with Docker Compose (Option B)
5 changes: 3 additions & 2 deletions comps/retrievers/src/README_milvus.md
@@ -22,12 +22,13 @@ export MILVUS_HOST=${your_milvus_host_ip}
export MILVUS_PORT=19530
export COLLECTION_NAME=${your_collection_name}
export TEI_EMBEDDING_ENDPOINT=${your_embedding_endpoint}
export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token}
```

### Start Retriever Service

```bash
export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:6060"
export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:${your_embedding_port}"
export RETRIEVER_COMPONENT_NAME="OPEA_RETRIEVER_MILVUS"
python opea_retrievers_microservice.py
```
@@ -44,7 +45,7 @@ docker build -t opea/retriever:latest --build-arg https_proxy=$https_proxy --bui
### Run Docker with CLI (Option A)

```bash
docker run -d --name="retriever-milvus-server" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e TEI_EMBEDDING_ENDPOINT=${your_emdding_endpoint} -e MILVUS_HOST=${your_milvus_host_ip} -e RETRIEVER_COMPONENT_NAME=$RETRIEVER_COMPONENT_NAME opea/retriever:latest
docker run -d --name="retriever-milvus-server" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy -e TEI_EMBEDDING_ENDPOINT=${your_emdding_endpoint} -e MILVUS_HOST=${your_milvus_host_ip} -e HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token} -e RETRIEVER_COMPONENT_NAME=$RETRIEVER_COMPONENT_NAME opea/retriever:latest
```

### Run Docker with Docker Compose (Option B)
1 change: 1 addition & 0 deletions comps/retrievers/src/README_opensearch.md
@@ -44,6 +44,7 @@ Please refer to this [readme](../../third_parties/opensearch/src/README.md).
```bash
export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:6060"
export RETRIEVER_COMPONENT_NAME="OPEA_RETRIEVER_OPENSEARCH"
export HUGGINGFACEHUB_API_TOKEN=${your_hf_token}
python opea_retrievers_microservice.py
```

4 changes: 3 additions & 1 deletion comps/retrievers/src/README_vdms.md
@@ -65,6 +65,7 @@ Please refer to this [readme](../../third_parties/vdms/src/README.md).

```bash
export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:6060"
export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token}
export RETRIEVER_COMPONENT_NAME="OPEA_RETRIEVER_VDMS"
python opea_retrievers_microservice.py
```
@@ -77,6 +78,7 @@ python opea_retrievers_microservice.py
export RETRIEVE_MODEL_ID="BAAI/bge-base-en-v1.5"
export INDEX_NAME=${your_index_name or collection_name}
export TEI_EMBEDDING_ENDPOINT="http://${your_ip}:6060"
export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token}
export RETRIEVER_COMPONENT_NAME="OPEA_RETRIEVER_VDMS"
```

@@ -97,7 +99,7 @@ You can choose one as needed.
### 2.3 Run Docker with CLI (Option A)

```bash
docker run -d --name="retriever-vdms-server" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e INDEX_NAME=$INDEX_NAME -e TEI_EMBEDDING_ENDPOINT=$TEI_EMBEDDING_ENDPOINT -e RETRIEVER_COMPONENT_NAME=$RETRIEVER_COMPONENT_NAME opea/retriever:latest
docker run -d --name="retriever-vdms-server" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e INDEX_NAME=$INDEX_NAME -e TEI_EMBEDDING_ENDPOINT=$TEI_EMBEDDING_ENDPOINT -e HUGGINGFACEHUB_API_TOKEN=${HUGGINGFACEHUB_API_TOKEN} -e RETRIEVER_COMPONENT_NAME=$RETRIEVER_COMPONENT_NAME opea/retriever:latest
```

### 2.4 Run Docker with Docker Compose (Option B)
4 changes: 2 additions & 2 deletions comps/retrievers/src/integrations/config.py
@@ -44,8 +44,9 @@ def get_boolean_env_var(var_name, default_value=False):
# Embedding model
EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")
LOCAL_EMBEDDING_MODEL = os.getenv("LOCAL_EMBEDDING_MODEL", "maidalun1020/bce-embedding-base_v1")
TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT")
TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT", "")
BRIDGE_TOWER_EMBEDDING = os.getenv("BRIDGE_TOWER_EMBEDDING", False)
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")

# Directory paths
current_file_path = os.path.abspath(__file__)
@@ -122,7 +123,6 @@ def format_redis_conn_from_env():
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "rag_milvus")
# TEI configuration
TEI_EMBEDDING_MODEL = os.environ.get("TEI_EMBEDDING_MODEL", "/home/user/bce-embedding-base_v1")
TEI_EMBEDDING_ENDPOINT = os.environ.get("TEI_EMBEDDING_ENDPOINT", "")
os.environ["OPENAI_API_BASE"] = TEI_EMBEDDING_ENDPOINT
os.environ["OPENAI_API_KEY"] = "Dummy key"

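One caveat worth noting in this file: `os.getenv("BRIDGE_TOWER_EMBEDDING", False)` returns a string whenever the variable is set, so even `"false"` is truthy. That is what the `get_boolean_env_var` helper named in the hunk header is for; a typical implementation looks like the sketch below (the repo's actual body may differ):

```python
import os


def get_boolean_env_var(var_name: str, default_value: bool = False) -> bool:
    """Parse an environment variable as a boolean, accepting common
    true/false spellings case-insensitively."""
    value = os.getenv(var_name)
    if value is None:
        return default_value
    return value.strip().lower() in ("true", "1", "t", "yes", "y")
```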
22 changes: 19 additions & 3 deletions comps/retrievers/src/integrations/elasticsearch.py
@@ -5,12 +5,13 @@
import os

from elasticsearch import Elasticsearch
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
from fastapi import HTTPException
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings
from langchain_elasticsearch import ElasticsearchStore

from comps import CustomLogger, EmbedDoc, OpeaComponent, OpeaComponentRegistry, ServiceType

from .config import EMBED_MODEL, ES_CONNECTION_STRING, ES_INDEX_NAME, TEI_EMBEDDING_ENDPOINT
from .config import EMBED_MODEL, ES_CONNECTION_STRING, ES_INDEX_NAME, HUGGINGFACEHUB_API_TOKEN, TEI_EMBEDDING_ENDPOINT

logger = CustomLogger("es_retrievers")
logflag = os.getenv("LOGFLAG", False)
@@ -40,7 +41,22 @@ def _initialize_embedder(self):
            # create embeddings using TEI endpoint service
            if logflag:
                logger.info(f"[ init embedder ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
            embeddings = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
            if not HUGGINGFACEHUB_API_TOKEN:
                raise HTTPException(
                    status_code=400,
                    detail="You MUST offer the `HUGGINGFACEHUB_API_TOKEN` when using `TEI_EMBEDDING_ENDPOINT`.",
                )
            import requests

            response = requests.get(TEI_EMBEDDING_ENDPOINT + "/info")
            if response.status_code != 200:
                raise HTTPException(
                    status_code=400, detail=f"TEI embedding endpoint {TEI_EMBEDDING_ENDPOINT} is not available."
                )
            model_id = response.json()["model_id"]
            embeddings = HuggingFaceInferenceAPIEmbeddings(
                api_key=HUGGINGFACEHUB_API_TOKEN, model_name=model_id, api_url=TEI_EMBEDDING_ENDPOINT
            )
        else:
            # create embeddings using local embedding model
            if logflag:
56 changes: 46 additions & 10 deletions comps/retrievers/src/integrations/milvus.py
@@ -3,14 +3,21 @@


import os
from typing import List, Optional

from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
from fastapi import HTTPException
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings
from langchain_milvus.vectorstores import Milvus

from comps import CustomLogger, EmbedDoc, OpeaComponent, OpeaComponentRegistry, ServiceType

from .config import COLLECTION_NAME, INDEX_PARAMS, LOCAL_EMBEDDING_MODEL, MILVUS_URI, TEI_EMBEDDING_ENDPOINT
from .config import (
    COLLECTION_NAME,
    HUGGINGFACEHUB_API_TOKEN,
    INDEX_PARAMS,
    LOCAL_EMBEDDING_MODEL,
    MILVUS_URI,
    TEI_EMBEDDING_ENDPOINT,
)

logger = CustomLogger("milvus_retrievers")
logflag = os.getenv("LOGFLAG", False)
@@ -28,7 +35,6 @@ def __init__(self, name: str, description: str, config: dict = None):
        super().__init__(name, ServiceType.RETRIEVER.name.lower(), description, config)

        self.embedder = self._initialize_embedder()
        self.client = self._initialize_client()
        health_status = self.check_health()
        if not health_status:
            logger.error("OpeaMilvusRetriever health check failed.")
@@ -38,7 +44,22 @@ def _initialize_embedder(self):
            # create embeddings using TEI endpoint service
            if logflag:
                logger.info(f"[ init embedder ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
            embeddings = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
            if not HUGGINGFACEHUB_API_TOKEN:
                raise HTTPException(
                    status_code=400,
                    detail="You MUST offer the `HUGGINGFACEHUB_API_TOKEN` when using `TEI_EMBEDDING_ENDPOINT`.",
                )
            import requests

            response = requests.get(TEI_EMBEDDING_ENDPOINT + "/info")
            if response.status_code != 200:
                raise HTTPException(
                    status_code=400, detail=f"TEI embedding endpoint {TEI_EMBEDDING_ENDPOINT} is not available."
                )
            model_id = response.json()["model_id"]
            embeddings = HuggingFaceInferenceAPIEmbeddings(
                api_key=HUGGINGFACEHUB_API_TOKEN, model_name=model_id, api_url=TEI_EMBEDDING_ENDPOINT
            )
        else:
            # create embeddings using local embedding model
            if logflag:
@@ -70,7 +91,14 @@ def check_health(self) -> bool:
        if logflag:
            logger.info("[ check health ] start to check health of milvus")
        try:
            _ = self.client.client.list_collections()
            client = Milvus(
                embedding_function=self.embedder,
                collection_name=COLLECTION_NAME,
                connection_args={"uri": MILVUS_URI},
                index_params=INDEX_PARAMS,
                auto_id=True,
            )
            _ = client.client.list_collections()
            if logflag:
                logger.info("[ check health ] Successfully connected to Milvus!")
            return True
@@ -89,21 +117,29 @@ async def invoke(self, input: EmbedDoc) -> list:
        if logflag:
            logger.info(input)

        my_milvus = Milvus(
            embedding_function=self.embedder,
            collection_name=COLLECTION_NAME,
            connection_args={"uri": MILVUS_URI},
            index_params=INDEX_PARAMS,
            auto_id=True,
        )

        if input.search_type == "similarity":
            search_res = await self.client.asimilarity_search_by_vector(embedding=input.embedding, k=input.k)
            search_res = await my_milvus.asimilarity_search_by_vector(embedding=input.embedding, k=input.k)
        elif input.search_type == "similarity_distance_threshold":
            if input.distance_threshold is None:
                raise ValueError("distance_threshold must be provided for " + "similarity_distance_threshold retriever")
            search_res = await self.client.asimilarity_search_by_vector(
            search_res = await my_milvus.asimilarity_search_by_vector(
                embedding=input.embedding, k=input.k, distance_threshold=input.distance_threshold
            )
        elif input.search_type == "similarity_score_threshold":
            docs_and_similarities = await self.client.asimilarity_search_with_relevance_scores(
            docs_and_similarities = await my_milvus.asimilarity_search_with_relevance_scores(
                query=input.text, k=input.k, score_threshold=input.score_threshold
            )
            search_res = [doc for doc, _ in docs_and_similarities]
        elif input.search_type == "mmr":
            search_res = await self.client.amax_marginal_relevance_search(
            search_res = await my_milvus.amax_marginal_relevance_search(
                query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult
            )

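This file is the heart of the fix. The old code cached a `Milvus` handle at startup (`self.client = self._initialize_client()`); the apparent failure mode is that when dataprep dropped and re-created the collection after startup, the retriever kept querying a stale handle and found nothing. Both `check_health` and `invoke` now build the store per call. Since that construction is repeated, a small factory method would keep the two in sync; a hypothetical refactor, not part of the commit:

```python
def _new_store(self) -> Milvus:
    """Build a fresh Milvus handle per call so every request sees the
    collection's current state, even after dataprep re-creates it."""
    return Milvus(
        embedding_function=self.embedder,
        collection_name=COLLECTION_NAME,
        connection_args={"uri": MILVUS_URI},
        index_params=INDEX_PARAMS,
        auto_id=True,
    )
```

`check_health` and `invoke` would then each call `self._new_store()` instead of repeating the constructor arguments.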
21 changes: 19 additions & 2 deletions comps/retrievers/src/integrations/opensearch.py
@@ -6,7 +6,8 @@
from typing import Callable, List, Union

import numpy as np
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
from fastapi import HTTPException
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings
from langchain_community.vectorstores import OpenSearchVectorSearch
from pydantic import conlist

@@ -15,6 +16,7 @@

from .config import (
    EMBED_MODEL,
    HUGGINGFACEHUB_API_TOKEN,
    OPENSEARCH_INDEX_NAME,
    OPENSEARCH_INITIAL_ADMIN_PASSWORD,
    OPENSEARCH_URL,
@@ -49,7 +51,22 @@ def _initialize_embedder(self):
            # create embeddings using TEI endpoint service
            if logflag:
                logger.info(f"[ init embedder ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
            embeddings = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
            if not HUGGINGFACEHUB_API_TOKEN:
                raise HTTPException(
                    status_code=400,
                    detail="You MUST offer the `HUGGINGFACEHUB_API_TOKEN` when using `TEI_EMBEDDING_ENDPOINT`.",
                )
            import requests

            response = requests.get(TEI_EMBEDDING_ENDPOINT + "/info")
            if response.status_code != 200:
                raise HTTPException(
                    status_code=400, detail=f"TEI embedding endpoint {TEI_EMBEDDING_ENDPOINT} is not available."
                )
            model_id = response.json()["model_id"]
            embeddings = HuggingFaceInferenceAPIEmbeddings(
                api_key=HUGGINGFACEHUB_API_TOKEN, model_name=model_id, api_url=TEI_EMBEDDING_ENDPOINT
            )
        else:
            # create embeddings using local embedding model
            if logflag:
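With this hunk the same TEI bootstrap now appears verbatim in the elasticsearch, milvus, opensearch, and pgvector integrations. A shared helper would keep future fixes in one place; a hypothetical sketch (the module path and function name are invented):

```python
# comps/retrievers/src/integrations/tei_utils.py (hypothetical)
import requests
from fastapi import HTTPException
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings


def build_tei_embeddings(endpoint: str, token: str) -> HuggingFaceInferenceAPIEmbeddings:
    """Validate the token, probe TEI's /info for the served model_id, and
    return an embeddings client: the logic each integration currently repeats."""
    if not token:
        raise HTTPException(
            status_code=400,
            detail="You MUST provide `HUGGINGFACEHUB_API_TOKEN` when using `TEI_EMBEDDING_ENDPOINT`.",
        )
    response = requests.get(endpoint + "/info")
    if response.status_code != 200:
        raise HTTPException(status_code=400, detail=f"TEI embedding endpoint {endpoint} is not available.")
    return HuggingFaceInferenceAPIEmbeddings(
        api_key=token, model_name=response.json()["model_id"], api_url=endpoint
    )
```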
22 changes: 19 additions & 3 deletions comps/retrievers/src/integrations/pgvector.py
@@ -4,12 +4,13 @@

import os

from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
from fastapi import HTTPException
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings
from langchain_community.vectorstores import PGVector

from comps import CustomLogger, EmbedDoc, OpeaComponent, OpeaComponentRegistry, ServiceType

from .config import EMBED_MODEL, PG_CONNECTION_STRING, PG_INDEX_NAME, TEI_EMBEDDING_ENDPOINT
from .config import EMBED_MODEL, HUGGINGFACEHUB_API_TOKEN, PG_CONNECTION_STRING, PG_INDEX_NAME, TEI_EMBEDDING_ENDPOINT

logger = CustomLogger("pgvector_retrievers")
logflag = os.getenv("LOGFLAG", False)
@@ -39,7 +40,22 @@ def _initialize_embedder(self):
            # create embeddings using TEI endpoint service
            if logflag:
                logger.info(f"[ init embedder ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}")
            embeddings = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT)
            if not HUGGINGFACEHUB_API_TOKEN:
                raise HTTPException(
                    status_code=400,
                    detail="You MUST offer the `HUGGINGFACEHUB_API_TOKEN` when using `TEI_EMBEDDING_ENDPOINT`.",
                )
            import requests

            response = requests.get(TEI_EMBEDDING_ENDPOINT + "/info")
            if response.status_code != 200:
                raise HTTPException(
                    status_code=400, detail=f"TEI embedding endpoint {TEI_EMBEDDING_ENDPOINT} is not available."
                )
            model_id = response.json()["model_id"]
            embeddings = HuggingFaceInferenceAPIEmbeddings(
                api_key=HUGGINGFACEHUB_API_TOKEN, model_name=model_id, api_url=TEI_EMBEDDING_ENDPOINT
            )
        else:
            # create embeddings using local embedding model
            if logflag:
