Skip to content

Commit

Permalink
Refine Component Interface (#1106)
Browse files Browse the repository at this point in the history
* Refine component interface

Signed-off-by: lvliang-intel <[email protected]>

* update env

Signed-off-by: lvliang-intel <[email protected]>

* add health check

Signed-off-by: lvliang-intel <[email protected]>

* update mulimodal embedding

Signed-off-by: lvliang-intel <[email protected]>

* update import

Signed-off-by: lvliang-intel <[email protected]>

* refine other components

Signed-off-by: lvliang-intel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix dataprepissue

Signed-off-by: lvliang-intel <[email protected]>

* fix tts issue

Signed-off-by: lvliang-intel <[email protected]>

* fix ci issues

Signed-off-by: lvliang-intel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix tts response issue

Signed-off-by: lvliang-intel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix comments

Signed-off-by: lvliang-intel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: lvliang-intel <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: chen, suyue <[email protected]>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent cf90932 commit bf09739
Show file tree
Hide file tree
Showing 45 changed files with 380 additions and 474 deletions.
2 changes: 1 addition & 1 deletion comps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from comps.cores.telemetry.opea_telemetry import opea_telemetry

# Common
from comps.cores.common.component import OpeaComponent, OpeaComponentController
from comps.cores.common.component import OpeaComponent, OpeaComponentRegistry, OpeaComponentLoader

# Statistics
from comps.cores.mega.base_statistics import statistics_dict, register_statistics
Expand Down
6 changes: 5 additions & 1 deletion comps/animation/src/integration/opea.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@

import requests

from comps import CustomLogger, OpeaComponent, ServiceType
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType

logger = CustomLogger("opea_animation")
logflag = os.getenv("LOGFLAG", False)


@OpeaComponentRegistry.register("OPEA_ANIMATION")
class OpeaAnimation(OpeaComponent):
"""A specialized animation component derived from OpeaComponent."""

def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, ServiceType.ANIMATION.name.lower(), description, config)
self.base_url = os.getenv("WAV2LIP_ENDPOINT", "http://localhost:7860")
health_status = self.check_health()
if not health_status:
logger.error("OpeaAnimation health check failed.")

def invoke(self, input: str):
"""Invokes the animation service to generate embeddings for the animation input.
Expand Down
26 changes: 8 additions & 18 deletions comps/animation/src/opea_animation_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import time

# GenAIComps
from comps import CustomLogger, OpeaComponentController
from comps import CustomLogger, OpeaComponentLoader
from comps.animation.src.integration.opea import OpeaAnimation

logger = CustomLogger("opea_animation")
Expand All @@ -24,22 +24,12 @@
statistics_dict,
)

# Initialize OpeaComponentController
controller = OpeaComponentController()

# Register components
try:
# Instantiate Animation component and register it to controller
opea_animation = OpeaAnimation(
name="OpeaAnimation",
description="OPEA Animation Service",
)
controller.register(opea_animation)

# Discover and activate a healthy component
controller.discover_and_activate()
except Exception as e:
logger.error(f"Failed to initialize components: {e}")
animation_component_name = os.getenv("ANIMATION_COMPONENT_NAME", "OPEA_ANIMATION")
# Initialize OpeaComponentLoader
loader = OpeaComponentLoader(
animation_component_name,
description=f"OPEA ANIMATION Component: {animation_component_name}",
)


# Register the microservice
Expand All @@ -56,7 +46,7 @@
def animate(audio: Base64ByteStrDoc):
start = time.time()

outfile = opea_animation.invoke(audio.byte_str)
outfile = loader.invoke(audio.byte_str)
if logflag:
logger.info(f"Video generated successfully, check {outfile} for the result.")

Expand Down
6 changes: 5 additions & 1 deletion comps/asr/src/integrations/opea_whisper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import requests
from fastapi import File, Form, UploadFile

from comps import CustomLogger, OpeaComponent, ServiceType
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import AudioTranscriptionResponse

logger = CustomLogger("opea_whisper")
logflag = os.getenv("LOGFLAG", False)


@OpeaComponentRegistry.register("OPEA_WHISPER_ASR")
class OpeaWhisperAsr(OpeaComponent):
"""A specialized ASR (Automatic Speech Recognition) component derived from OpeaComponent for Whisper ASR services.
Expand All @@ -25,6 +26,9 @@ class OpeaWhisperAsr(OpeaComponent):
def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, ServiceType.ASR.name.lower(), description, config)
self.base_url = os.getenv("ASR_ENDPOINT", "http://localhost:7066")
health_status = self.check_health()
if not health_status:
logger.error("OpeaWhisperAsr health check failed.")

async def invoke(
self,
Expand Down
27 changes: 6 additions & 21 deletions comps/asr/src/opea_asr_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Base64ByteStrDoc,
CustomLogger,
LLMParamsDoc,
OpeaComponentController,
OpeaComponentLoader,
ServiceType,
opea_microservices,
register_microservice,
Expand All @@ -24,24 +24,9 @@
logger = CustomLogger("opea_asr_microservice")
logflag = os.getenv("LOGFLAG", False)

# Initialize OpeaComponentController
controller = OpeaComponentController()

# Register components
try:
# Instantiate ASR components
opea_whisper = OpeaWhisperAsr(
name="OpeaWhisperAsr",
description="OPEA Whisper ASR Service",
)

# Register components with the controller
controller.register(opea_whisper)

# Discover and activate a healthy component
controller.discover_and_activate()
except Exception as e:
logger.error(f"Failed to initialize components: {e}")
asr_component_name = os.getenv("ASR_COMPONENT_NAME", "OPEA_WHISPER_ASR")
# Initialize OpeaComponentLoader
loader = OpeaComponentLoader(asr_component_name, description=f"OPEA ASR Component: {asr_component_name}")


@register_microservice(
Expand Down Expand Up @@ -69,8 +54,8 @@ async def audio_to_text(
logger.info("ASR file uploaded.")

try:
# Use the controller to invoke the active component
asr_response = await controller.invoke(
# Use the loader to invoke the component
asr_response = await loader.invoke(
file=file,
model=model,
language=language,
Expand Down
107 changes: 55 additions & 52 deletions comps/cores/common/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,75 +86,78 @@ def __repr__(self):
return f"OpeaComponent(name={self.name}, type={self.type}, description={self.description})"


class OpeaComponentController(ABC):
"""The OpeaComponentController class serves as the base class for managing and orchestrating multiple
instances of components of the same type. It provides a unified interface for routing tasks,
registering components, and dynamically discovering available components.
class OpeaComponentRegistry:
"""Registry class to manage component instances.
Attributes:
components (dict): A dictionary to store registered components by their unique identifiers.
This registry allows storing, retrieving, and managing component instances by their names.
"""

def __init__(self):
"""Initializes the OpeaComponentController instance with an empty component registry."""
self.components = {}
self.active_component = None
_registry = {}

def register(self, component):
"""Registers an OpeaComponent instance to the controller.
@classmethod
def register(cls, name):
"""Decorator to register a component class with a specified name.
Args:
component (OpeaComponent): An instance of a subclass of OpeaComponent to be managed.
:param name: The name to associate with the component class
:return: Decorator function
"""

def decorator(component_class):
if name in cls._registry:
raise ValueError(f"A component with the name '{name}' is already registered.")
cls._registry[name] = component_class
return component_class

return decorator

Raises:
ValueError: If the component is already registered.
@classmethod
def get(cls, name):
"""Retrieve a component class by its name.
:param name: The name of the component class to retrieve
:return: The component class
"""
if component.name in self.components:
raise ValueError(f"Component '{component.name}' is already registered.")
logger.info(f"Registered component: {component.name}")
self.components[component.name] = component
if name not in cls._registry:
raise KeyError(f"No component found with the name '{name}'.")
return cls._registry[name]

def discover_and_activate(self):
"""Discovers healthy components and activates one.
@classmethod
def unregister(cls, name):
"""Remove a component class from the registry by its name.
If multiple components are healthy, it prioritizes the first registered component.
:param name: The name of the component class to remove
"""
for component in self.components.values():
if component.check_health():
self.active_component = component
logger.info(f"Activated component: {component.name}")
return
raise RuntimeError("No healthy components available.")
if name in cls._registry:
del cls._registry[name]

async def invoke(self, *args, **kwargs):
"""Invokes service accessing using the active component.

Args:
*args: Positional arguments.
**kwargs: Keyword arguments.
class OpeaComponentLoader:
"""Loader class to dynamically load and invoke components.
Returns:
Any: The result of the service accessing.
This loader retrieves components from the registry and invokes their functionality.
"""

Raises:
RuntimeError: If no active component is set.
def __init__(self, component_name, **kwargs):
"""Initialize the loader with a component retrieved from the registry and instantiate it.
:param component_name: The name of the component to load
:param kwargs: Additional parameters for the component's initialization
"""
if not self.active_component:
raise RuntimeError("No active component. Call 'discover_and_activate' first.")
return await self.active_component.invoke(*args, **kwargs)
kwargs["name"] = component_name

def list_components(self):
"""Lists all registered components.
# Retrieve the component class from the registry
component_class = OpeaComponentRegistry.get(component_name)

Returns:
list: A list of component names that are currently registered.
"""
return self.components.keys()
# Instantiate the component with the given arguments
self.component = component_class(**kwargs)

def __repr__(self):
"""Provides a string representation of the controller and its registered components.
async def invoke(self, *args, **kwargs):
"""Invoke the loaded component's execute method.
Returns:
str: A string representation of the OpeaComponentController instance.
:param args: Positional arguments for the invoke method
:param kwargs: Keyword arguments for the invoke method
:return: The result of the component's invoke method
"""
return f"OpeaComponentController(registered_components={self.list_components()})"
if not hasattr(self.component, "invoke"):
raise AttributeError(f"The component '{self.component}' does not have an 'invoke' method.")
return await self.component.invoke(*args, **kwargs)
6 changes: 5 additions & 1 deletion comps/dataprep/src/integrations/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from langchain_milvus.vectorstores import Milvus
from langchain_text_splitters import HTMLHeaderTextSplitter

from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -157,6 +157,7 @@ def delete_by_partition_field(my_milvus, partition_field):
logger.info(f"[ delete partition ] delete success: {res}")


@OpeaComponentRegistry.register("OPEA_DATAPREP_MILVUS")
class OpeaMilvusDataprep(OpeaComponent):
"""A specialized dataprep component derived from OpeaComponent for milvus dataprep services.
Expand All @@ -167,6 +168,9 @@ class OpeaMilvusDataprep(OpeaComponent):
def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, ServiceType.DATAPREP.name.lower(), description, config)
self.embedder = self._initialize_embedder()
health_status = self.check_health()
if not health_status:
logger.error("OpeaMilvusDataprep health check failed.")

def _initialize_embedder(self):
if logflag:
Expand Down
6 changes: 5 additions & 1 deletion comps/dataprep/src/integrations/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from redis.commands.search.field import TextField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -215,6 +215,7 @@ def ingest_data_to_redis(doc_path: DocPath):
return ingest_chunks_to_redis(file_name, chunks)


@OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS")
class OpeaRedisDataprep(OpeaComponent):
"""A specialized dataprep component derived from OpeaComponent for redis dataprep services.
Expand All @@ -227,6 +228,9 @@ def __init__(self, name: str, description: str, config: dict = None):
self.client = self._initialize_client()
self.data_index_client = self.client.ft(INDEX_NAME)
self.key_index_client = self.client.ft(KEY_INDEX_NAME)
health_status = self.check_health()
if not health_status:
logger.error("OpeaRedisDataprep health check failed.")

def _initialize_client(self) -> redis.Redis:
if logflag:
Expand Down
33 changes: 0 additions & 33 deletions comps/dataprep/src/opea_dataprep_controller.py

This file was deleted.

Loading

0 comments on commit bf09739

Please sign in to comment.