Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine Component Interface #1106

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion comps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from comps.cores.telemetry.opea_telemetry import opea_telemetry

# Common
from comps.cores.common.component import OpeaComponent, OpeaComponentController
from comps.cores.common.component import OpeaComponent, OpeaComponentRegistry, OpeaComponentLoader

# Statistics
from comps.cores.mega.base_statistics import statistics_dict, register_statistics
Expand Down
6 changes: 5 additions & 1 deletion comps/animation/src/integration/opea.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@

import requests

from comps import CustomLogger, OpeaComponent, ServiceType
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType

logger = CustomLogger("opea_animation")
logflag = os.getenv("LOGFLAG", False)


@OpeaComponentRegistry.register("OPEA_ANIMATION")
class OpeaAnimation(OpeaComponent):
"""A specialized animation component derived from OpeaComponent."""

def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, ServiceType.ANIMATION.name.lower(), description, config)
self.base_url = os.getenv("WAV2LIP_ENDPOINT", "http://localhost:7860")
health_status = self.check_health()
if not health_status:
logger.error("OpeaAnimation health check failed.")

def invoke(self, input: str):
"""Invokes the animation service to generate embeddings for the animation input.
Expand Down
26 changes: 8 additions & 18 deletions comps/animation/src/opea_animation_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import time

# GenAIComps
from comps import CustomLogger, OpeaComponentController
from comps import CustomLogger, OpeaComponentLoader
from comps.animation.src.integration.opea import OpeaAnimation

logger = CustomLogger("opea_animation")
Expand All @@ -24,22 +24,12 @@
statistics_dict,
)

# Initialize OpeaComponentController
controller = OpeaComponentController()

# Register components
try:
# Instantiate Animation component and register it to controller
opea_animation = OpeaAnimation(
name="OpeaAnimation",
description="OPEA Animation Service",
)
controller.register(opea_animation)

# Discover and activate a healthy component
controller.discover_and_activate()
except Exception as e:
logger.error(f"Failed to initialize components: {e}")
animation_component_name = os.getenv("ANIMATION_COMPONENT_NAME", "OPEA_ANIMATION")
# Initialize OpeaComponentLoader
loader = OpeaComponentLoader(
animation_component_name,
description=f"OPEA ANIMATION Component: {animation_component_name}",
)


# Register the microservice
Expand All @@ -56,7 +46,7 @@
def animate(audio: Base64ByteStrDoc):
start = time.time()

outfile = opea_animation.invoke(audio.byte_str)
outfile = loader.invoke(audio.byte_str)
if logflag:
logger.info(f"Video generated successfully, check {outfile} for the result.")

Expand Down
6 changes: 5 additions & 1 deletion comps/asr/src/integrations/opea_whisper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import requests
from fastapi import File, Form, UploadFile

from comps import CustomLogger, OpeaComponent, ServiceType
from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import AudioTranscriptionResponse

logger = CustomLogger("opea_whisper")
logflag = os.getenv("LOGFLAG", False)


@OpeaComponentRegistry.register("OPEA_WHISPER_ASR")
class OpeaWhisperAsr(OpeaComponent):
"""A specialized ASR (Automatic Speech Recognition) component derived from OpeaComponent for Whisper ASR services.

Expand All @@ -25,6 +26,9 @@ class OpeaWhisperAsr(OpeaComponent):
def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, ServiceType.ASR.name.lower(), description, config)
self.base_url = os.getenv("ASR_ENDPOINT", "http://localhost:7066")
health_status = self.check_health()
if not health_status:
logger.error("OpeaWhisperAsr health check failed.")

async def invoke(
self,
Expand Down
27 changes: 6 additions & 21 deletions comps/asr/src/opea_asr_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Base64ByteStrDoc,
CustomLogger,
LLMParamsDoc,
OpeaComponentController,
OpeaComponentLoader,
ServiceType,
opea_microservices,
register_microservice,
Expand All @@ -24,24 +24,9 @@
logger = CustomLogger("opea_asr_microservice")
logflag = os.getenv("LOGFLAG", False)

# Initialize OpeaComponentController
controller = OpeaComponentController()

# Register components
try:
# Instantiate ASR components
opea_whisper = OpeaWhisperAsr(
name="OpeaWhisperAsr",
description="OPEA Whisper ASR Service",
)

# Register components with the controller
controller.register(opea_whisper)

# Discover and activate a healthy component
controller.discover_and_activate()
except Exception as e:
logger.error(f"Failed to initialize components: {e}")
asr_component_name = os.getenv("ASR_COMPONENT_NAME", "OPEA_WHISPER_ASR")
# Initialize OpeaComponentLoader
loader = OpeaComponentLoader(asr_component_name, description=f"OPEA ASR Component: {asr_component_name}")


@register_microservice(
Expand Down Expand Up @@ -69,8 +54,8 @@ async def audio_to_text(
logger.info("ASR file uploaded.")

try:
# Use the controller to invoke the active component
asr_response = await controller.invoke(
# Use the loader to invoke the component
asr_response = await loader.invoke(
file=file,
model=model,
language=language,
Expand Down
107 changes: 55 additions & 52 deletions comps/cores/common/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,75 +86,78 @@
return f"OpeaComponent(name={self.name}, type={self.type}, description={self.description})"


class OpeaComponentController(ABC):
"""The OpeaComponentController class serves as the base class for managing and orchestrating multiple
instances of components of the same type. It provides a unified interface for routing tasks,
registering components, and dynamically discovering available components.
class OpeaComponentRegistry:
"""Registry class to manage component instances.

Attributes:
components (dict): A dictionary to store registered components by their unique identifiers.
This registry allows storing, retrieving, and managing component instances by their names.
"""

def __init__(self):
"""Initializes the OpeaComponentController instance with an empty component registry."""
self.components = {}
self.active_component = None
_registry = {}

def register(self, component):
"""Registers an OpeaComponent instance to the controller.
@classmethod
def register(cls, name):
"""Decorator to register a component class with a specified name.

Args:
component (OpeaComponent): An instance of a subclass of OpeaComponent to be managed.
:param name: The name to associate with the component class
:return: Decorator function
"""

def decorator(component_class):
if name in cls._registry:
raise ValueError(f"A component with the name '{name}' is already registered.")

Check warning on line 107 in comps/cores/common/component.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/common/component.py#L107

Added line #L107 was not covered by tests
cls._registry[name] = component_class
return component_class

return decorator

Raises:
ValueError: If the component is already registered.
@classmethod
def get(cls, name):
"""Retrieve a component class by its name.

:param name: The name of the component class to retrieve
:return: The component class
"""
if component.name in self.components:
raise ValueError(f"Component '{component.name}' is already registered.")
logger.info(f"Registered component: {component.name}")
self.components[component.name] = component
if name not in cls._registry:
raise KeyError(f"No component found with the name '{name}'.")
return cls._registry[name]

def discover_and_activate(self):
"""Discovers healthy components and activates one.
@classmethod
def unregister(cls, name):
"""Remove a component class from the registry by its name.

If multiple components are healthy, it prioritizes the first registered component.
:param name: The name of the component class to remove
"""
for component in self.components.values():
if component.check_health():
self.active_component = component
logger.info(f"Activated component: {component.name}")
return
raise RuntimeError("No healthy components available.")
if name in cls._registry:
del cls._registry[name]

async def invoke(self, *args, **kwargs):
"""Invokes service accessing using the active component.

Args:
*args: Positional arguments.
**kwargs: Keyword arguments.
class OpeaComponentLoader:
"""Loader class to dynamically load and invoke components.

Returns:
Any: The result of the service accessing.
This loader retrieves components from the registry and invokes their functionality.
"""

Raises:
RuntimeError: If no active component is set.
def __init__(self, component_name, **kwargs):
"""Initialize the loader with a component retrieved from the registry and instantiate it.

:param component_name: The name of the component to load
:param kwargs: Additional parameters for the component's initialization
"""
if not self.active_component:
raise RuntimeError("No active component. Call 'discover_and_activate' first.")
return await self.active_component.invoke(*args, **kwargs)
kwargs["name"] = component_name

def list_components(self):
"""Lists all registered components.
# Retrieve the component class from the registry
component_class = OpeaComponentRegistry.get(component_name)

Returns:
list: A list of component names that are currently registered.
"""
return self.components.keys()
# Instantiate the component with the given arguments
self.component = component_class(**kwargs)

def __repr__(self):
"""Provides a string representation of the controller and its registered components.
async def invoke(self, *args, **kwargs):
"""Invoke the loaded component's execute method.

Returns:
str: A string representation of the OpeaComponentController instance.
:param args: Positional arguments for the invoke method
:param kwargs: Keyword arguments for the invoke method
:return: The result of the component's invoke method
"""
return f"OpeaComponentController(registered_components={self.list_components()})"
if not hasattr(self.component, "invoke"):
raise AttributeError(f"The component '{self.component}' does not have an 'invoke' method.")

Check warning on line 162 in comps/cores/common/component.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/common/component.py#L162

Added line #L162 was not covered by tests
return await self.component.invoke(*args, **kwargs)
6 changes: 5 additions & 1 deletion comps/dataprep/src/integrations/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from langchain_milvus.vectorstores import Milvus
from langchain_text_splitters import HTMLHeaderTextSplitter

from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -157,6 +157,7 @@ def delete_by_partition_field(my_milvus, partition_field):
logger.info(f"[ delete partition ] delete success: {res}")


@OpeaComponentRegistry.register("OPEA_DATAPREP_MILVUS")
class OpeaMilvusDataprep(OpeaComponent):
"""A specialized dataprep component derived from OpeaComponent for milvus dataprep services.

Expand All @@ -167,6 +168,9 @@ class OpeaMilvusDataprep(OpeaComponent):
def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, ServiceType.DATAPREP.name.lower(), description, config)
self.embedder = self._initialize_embedder()
health_status = self.check_health()
if not health_status:
logger.error("OpeaMilvusDataprep health check failed.")

def _initialize_embedder(self):
if logflag:
Expand Down
6 changes: 5 additions & 1 deletion comps/dataprep/src/integrations/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from redis.commands.search.field import TextField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

from comps import CustomLogger, DocPath, OpeaComponent, ServiceType
from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -215,6 +215,7 @@ def ingest_data_to_redis(doc_path: DocPath):
return ingest_chunks_to_redis(file_name, chunks)


@OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS")
class OpeaRedisDataprep(OpeaComponent):
"""A specialized dataprep component derived from OpeaComponent for redis dataprep services.

Expand All @@ -227,6 +228,9 @@ def __init__(self, name: str, description: str, config: dict = None):
self.client = self._initialize_client()
self.data_index_client = self.client.ft(INDEX_NAME)
self.key_index_client = self.client.ft(KEY_INDEX_NAME)
health_status = self.check_health()
if not health_status:
logger.error("OpeaRedisDataprep health check failed.")

def _initialize_client(self) -> redis.Redis:
if logflag:
Expand Down
33 changes: 0 additions & 33 deletions comps/dataprep/src/opea_dataprep_controller.py

This file was deleted.

Loading
Loading