Skip to content

Commit

Permalink
vdk-data-sources: registration
Browse files Browse the repository at this point in the history
Register a data source and its associated configuration class
1. First decorate the class with @data_source decorator
2. Then implement vdk_data_sources_register to register the class as
below

```
@hookimpl
def vdk_data_sources_register(self,
            data_source_factory: IDataSourceFactory):

data_source_factory.register_data_source_class(AutoGeneratedDataSource)
```
  • Loading branch information
antoniivanov committed Nov 2, 2023
1 parent bee768d commit 9a32f82
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
IDataSourceConfiguration,
)
from vdk.plugin.data_sources.data_source import IDataSourceStream
from vdk.plugin.data_sources.factory import register_data_source
from vdk.plugin.data_sources.factory import data_source
from vdk.plugin.data_sources.state import IDataSourceState


Expand Down Expand Up @@ -73,7 +73,7 @@ def read(self) -> Iterator[DataSourcePayload]:
yield self._data[i]


@register_data_source(
@data_source(
name="auto-generated-data", config_class=AutoGeneratedDataSourceConfiguration
)
class AutoGeneratedDataSource(IDataSource):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@
from typing import List
from typing import Type

from vdk.api.job_input import IProperties
from vdk.plugin.data_sources import config
from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.data_source import (
IDataSourceConfiguration,
)
from vdk.plugin.data_sources.state import IDataSourceState


def data_source(name: str, config_class: Type[IDataSourceConfiguration]):
    """
    Class decorator that tags a data source class with its registration metadata.

    The decorator only stores the metadata on the class. It does not register
    the class anywhere; the stored attributes are what
    ``register_data_source_class`` later reads.

    :param name: the data source name, stored on the class as ``__data_source_name__``
    :param config_class: the configuration class, stored on the class as
        ``__data_source__config__class__``
    :return: a decorator that sets both attributes and returns the very same class object

    Example::

        @data_source(name="csv", config_class=CsvDataSourceConfiguration)
        class CsvDataSource(IDataSource):
            ...
    """

    def inner_decorator(data_source_class: Type[IDataSource]):
        # The class is mutated in place and returned unchanged otherwise,
        # so the decorated name still refers to the original class.
        data_source_class.__data_source_name__ = name
        data_source_class.__data_source__config__class__ = config_class
        return data_source_class

    return inner_decorator


@dataclass
Expand Down Expand Up @@ -44,18 +51,20 @@ def __int__(self, data_source_name: str, existing_data_source_names: List[str]):

class IDataSourceFactory:
@abstractmethod
def register_data_source(
def register_data_source_class(
self,
name: str,
data_source_class: Type[IDataSource],
data_source_config_class: Type[IDataSourceConfiguration],
):
"""
Register a data source and its associated configuration class.
:param name: The name identifier for the data source
:param data_source_class: The data source class that implements `IDataSource`
:param data_source_config_class: The configuration class that implements `IDataSourceConfiguration`
:param data_source_class: The data source class that implements `IDataSource` and must be decorated with @data_source
Example::
@data_source(name="csv", config_class=CsvDataSourceConfiguration)
class CsvDataSource(IDataSource):
...
"""

@abstractmethod
Expand Down Expand Up @@ -93,12 +102,40 @@ def __new__(cls):
cls._instance.__data_source_registry = {}
return cls._instance

def register_data_source(
def register_data_source_class(self, data_source_class: Type[IDataSource]):
    """
    Register a data source class that was decorated with ``@data_source``.

    The name and the configuration class are read from the attributes the
    decorator stores on the class (``__data_source_name__`` and
    ``__data_source__config__class__``) and passed on to the private
    registration method together with the class itself.

    :param data_source_class: the data source class (the type, not an instance)
    :raises ValueError: if either decorator attribute is missing on the class
    """
    try:
        name = data_source_class.__data_source_name__
        config_class = data_source_class.__data_source__config__class__
    except AttributeError:
        # "from None": the AttributeError adds nothing the message does not already say.
        raise ValueError(
            "Invalid data_source_class. "
            "data_source_class must extend IDataSource, and be decorated with @data_source."
        ) from None

    self.__register_data_source(name, data_source_class, config_class)

def __register_data_source(
    self,
    name: str,
    data_source_class: Type[IDataSource],
    data_source_config_class: Type[IDataSourceConfiguration],
):
    """
    Validate a data source / configuration class pair and store it in the registry.

    :param name: key under which the entry is stored; an entry already stored
        under the same name is replaced
    :param data_source_class: a class (not an instance) inheriting ``IDataSource``
    :param data_source_config_class: a class (not an instance) inheriting
        ``IDataSourceConfiguration``
    :raises ValueError: if either argument is not a class, or does not inherit
        the expected interface
    """
    # isinstance(..., type) is checked first because issubclass() raises
    # TypeError, not a friendly error, when it is handed a non-class.
    if not isinstance(data_source_class, type):
        raise ValueError(
            "data_source_class must be a class definition, not a class instance."
        )
    if not issubclass(data_source_class, IDataSource):
        raise ValueError("data_source_class must be a class inheriting IDataSource")

    if not isinstance(data_source_config_class, type):
        raise ValueError(
            "data_source_config_class must be a class definition, not a class instance."
        )
    if not issubclass(data_source_config_class, IDataSourceConfiguration):
        raise ValueError(
            "data_source_config_class must be a class inheriting IDataSourceConfiguration"
        )

    self.__data_source_registry[name] = DataSourceRegistryItem(
        name, data_source_class, data_source_config_class
    )
Expand Down Expand Up @@ -126,13 +163,3 @@ def __get_source_item(self, name) -> DataSourceRegistryItem:
name, list(self.__data_source_registry.keys())
)
return registry_item


def register_data_source(name: str, config_class: Type[IDataSourceConfiguration]):
def inner_decorator(data_source_class: Type[IDataSource]):
SingletonDataSourceFactory().register_data_source(
name, data_source_class, config_class
)
return data_source_class

return inner_decorator
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.plugin.hook_markers import hookspec
from vdk.plugin.data_sources.factory import IDataSourceFactory


class DataSourcesHookSpec:
    """Hook specifications that plugins implement to contribute data sources."""

    @hookspec(historic=True)
    def vdk_data_sources_register(self, data_source_factory: IDataSourceFactory):
        """
        Register a data source and its associated configuration class.

        :param data_source_factory: the factory where the data sources should be registered.
            Call ``register_data_source_class`` on it with the data source class
            that implements `IDataSource`. Pass only the type, not an instance of it!
            The name and the configuration class are not passed here; they are
            taken from the ``@data_source`` decorator on the class.

        Example::

            data_source_factory.register_data_source_class(data_source_class=CsvDataSource)

            # where CsvDataSource is declared like
            @data_source(name="csv", config_class=CsvDataSourceConfig)
            class CsvDataSource(IDataSource):
                ...
        """
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,44 @@
from vdk.api.plugin.hook_markers import hookimpl
from vdk.api.plugin.plugin_registry import IPluginRegistry
from vdk.internal.core.config import ConfigurationBuilder
from vdk.plugin.data_sources.auto_generated import AutoGeneratedDataSource
from vdk.plugin.data_sources.config import ConfigClassMetadata
from vdk.plugin.data_sources.factory import IDataSourceFactory
from vdk.plugin.data_sources.factory import SingletonDataSourceFactory
from vdk.plugin.data_sources.hook_spec import DataSourcesHookSpec

"""
Include the plugins implementation. For example:
"""


class DummyPlugin:
class DataSourcesPlugin:
@hookimpl
def vdk_data_sources_register(self, data_source_factory: IDataSourceFactory):
data_source_factory.register_data_source_class(AutoGeneratedDataSource)

@hookimpl(tryfirst=True)
def vdk_configure(self, config_builder: ConfigurationBuilder):
config_builder.add(
key="dummy_config_key",
default_value="dummy",
description="""
Dummy configuration
""",
)
def vdk_configure(self, config_builder: ConfigurationBuilder) -> None:
# define the config options so they show up in the help.
for ds in SingletonDataSourceFactory().list():
config_class_meta = ConfigClassMetadata(ds.config_class)
for field in config_class_meta.get_config_fields():
group_name = config_class_meta.get_group_name()
config_builder.add(
key=f"a data source {group_name} config's option {field.name()}",
default_value=field.default(),
description=field.description(),
is_sensitive=field.is_sensitive(),
)


@hookimpl
def vdk_start(plugin_registry: IPluginRegistry, command_line_args: List):
plugin_registry.load_plugin_with_hooks_impl(DummyPlugin(), "DummyPlugin")
plugin_registry.add_hook_specs(DataSourcesHookSpec)
plugin_registry.load_plugin_with_hooks_impl(
DataSourcesPlugin(), "DataSourcesPlugin"
)

plugin_registry.hook().vdk_data_sources_register.call_historic(
kwargs=dict(data_source_factory=SingletonDataSourceFactory())
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from click.testing import Result
from vdk.plugin.data_sources import plugin_entry
from vdk.plugin.data_sources.auto_generated import AutoGeneratedDataSource
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory
Expand All @@ -9,7 +11,7 @@

def test_run_ingest_sources():
ingest_plugin = IngestIntoMemoryPlugin()
runner = CliEntryBasedTestRunner(ingest_plugin)
runner = CliEntryBasedTestRunner(ingest_plugin, plugin_entry)

result: Result = runner.invoke(
["run", jobs_path_from_caller_directory("ingest-sources-job")]
Expand All @@ -20,8 +22,22 @@ def test_run_ingest_sources():
assert len(ingest_plugin.payloads) > 0


def test_run_ingest_data_flow_sources():
    """Run the ``ingest-data-flow-job`` sample job and expect ingested payloads."""
    ingest_plugin = IngestIntoMemoryPlugin()
    # plugin_entry is passed explicitly so the data sources plugin is loaded for the run.
    runner = CliEntryBasedTestRunner(ingest_plugin, plugin_entry)

    result: Result = runner.invoke(
        ["run", jobs_path_from_caller_directory("ingest-data-flow-job")]
    )

    cli_assert_equal(0, result)

    assert len(ingest_plugin.payloads) > 0


def test_run_ingest_sources_error_no_such_method():
runner = CliEntryBasedTestRunner()
runner = CliEntryBasedTestRunner(plugin_entry)

result: Result = runner.invoke(
["run", jobs_path_from_caller_directory("ingest-sources-job")]
Expand Down
22 changes: 16 additions & 6 deletions projects/vdk-plugins/vdk-data-sources/tests/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
from unittest.mock import patch

import pytest
from vdk.plugin.data_sources.factory import (
DataSourceNotFoundException,
)
from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.data_source import IDataSourceConfiguration
from vdk.plugin.data_sources.factory import data_source
from vdk.plugin.data_sources.factory import DataSourceNotFoundException
from vdk.plugin.data_sources.factory import (
SingletonDataSourceFactory,
)


# Test double for a configuration class: a Mock that also inherits
# IDataSourceConfiguration, so it is a real subclass of that interface.
class MockDataSourceConfig(Mock, IDataSourceConfiguration):
    pass


# Test double for a data source: a Mock that also inherits IDataSource,
# decorated so it carries the name "mock" and its configuration class.
@data_source(name="mock", config_class=MockDataSourceConfig)
class MockDataSource(Mock, IDataSource):
    pass


def test_register_and_list_data_source():
with patch(
SingletonDataSourceFactory.__module__
Expand All @@ -22,7 +32,7 @@ def test_register_and_list_data_source():
):
factory = SingletonDataSourceFactory()

factory.register_data_source("test_ds", Mock(), Mock())
factory.register_data_source_class(MockDataSource)

data_sources = factory.list()
assert len(data_sources) == 1
Expand All @@ -41,9 +51,9 @@ def test_create_data_source():
):
factory = SingletonDataSourceFactory()

factory.register_data_source("test_ds", Mock(), Mock())
factory.register_data_source_class(MockDataSource)

data_source_instance = factory.create_data_source("test_ds")
data_source_instance = factory.create_data_source("mock")
assert data_source_instance is not None


Expand Down

0 comments on commit 9a32f82

Please sign in to comment.