Skip to content

Commit

Permalink
vdk-datasources: POC plugin
Browse files Browse the repository at this point in the history
This change implements POC for Data Sources API.

    It is based on some of teh requirements and reserach in
https://github.com/vmware/versatile-data-kit/wiki/Ingest-Source-Research

    See concepts page for explanation of data source related concepts

    So what's implemented is
    - Data Source APIs handling sources, streams and state
    - Data Source connection management partialy
    - Data Source Ingester that reads from data sources and writes to
    existing IIngeser
    - An example data source AutoGeneratedDataSource
    - An example job in the function test suite

Most likely this would be moved to plugin vdk-data-sources. For now
it
    doesn't appear ther's need for this to be in vdk-core.
  • Loading branch information
antoniivanov committed Oct 27, 2023
1 parent 85bffc0 commit 8d39b7d
Show file tree
Hide file tree
Showing 19 changed files with 1,428 additions and 0 deletions.
22 changes: 22 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/.plugin-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

image: "python:3.7"

.build-vdk-data-sources:
variables:
PLUGIN_NAME: vdk-data-sources
extends: .build-plugin

build-py37-vdk-data-sources:
extends: .build-vdk-data-sources
image: "python:3.7"

build-py311-vdk-data-sources:
extends: .build-vdk-data-sources
image: "python:3.11"

release-vdk-data-sources:
variables:
PLUGIN_NAME: vdk-data-sources
extends: .release-plugin
60 changes: 60 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# data-sources

Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.

The data-sources project is a plugin for the Versatile Data Kit (VDK). It aims to simplify data ingestion from multiple sources by offering a single, unified API. Whether you're dealing with databases, REST APIs, or other forms of data, this project allows you to manage them all in a consistent manner. This is crucial for building scalable and maintainable data pipelines.


## Usage

```
pip install vdk-data-sources
```

### Concepts


#### Data Source
A Data Source is a central component responsible for establishing and managing a connection to a specific set of data. It interacts with a given configuration and maintains a stateful relationship with the data it accesses. This stateful relationship can include information such as authentication tokens, data markers, or any other form of metadata that helps manage the data connection. The Data Source exposes various data streams through which data can be read.

#### Data Source Stream
A Data Source Stream is an abstraction over a subset of data in the Data Source. It can be thought of as a channel through which data flows. Each Data Source Stream has a unique name to identify it and includes methods to read data from the stream. For example for Database based data source , each table could be a separate stream. Streams can be ingested in parallel potentially.

Reading from the stream yields a sequence of Data Source Payloads

#### Data Source Payload
The Data Source Payload is a data structure that encapsulates the actual data along with its metadata. The payload consists of four main parts:

Data: containing the core data that needs to be ingested (e.g in database the table content)
Metadata: A dictionary containing additional contextual information about the data (for example timestamps, environment specific metadata, etc.)
State: Contains the state of the data soruce stream as of this payload. For example in case of incremental ingestion from a database table it would contain the value of a incremental key columns (le.g updated_time column in teh table) which can be used to restart/continue the ingestion later.


### Configuration

(`vdk config-help` is useful command to browse all config options of your installation of vdk)

### Example

To build your own data source you can use [this data source](./src/vdk/plugin/data_sources/auto_generated.py) as an example or reference

### Build and testing

```
pip install -r requirements.txt
pip install -e .
pytest
```

In VDK repo [../build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can be used also.


#### Note about the CICD:

.plugin-ci.yaml is needed only for plugins part of [Versatile Data Kit Plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins).

The CI/CD is separated in two stages, a build stage and a release stage.
The build stage is made up of a few jobs, all which inherit from the same
job configuration and only differ in the Python version they use (3.7, 3.8, 3.9 and 3.10).
They run according to rules, which are ordered in a way such that changes to a
plugin's directory trigger the plugin CI, but changes to a different plugin does not.
7 changes: 7 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# this file is used to provide testing requirements
# for requirements (dependencies) needed during and after installation of the plugin see (and update) setup.py install_requires section


pytest
vdk-core
vdk-test-utils
42 changes: 42 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

import setuptools

"""
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"

setuptools.setup(
name="vdk-data-sources",
version=__version__,
url="https://github.com/vmware/versatile-data-kit",
description="Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=["vdk-core"],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
# This is the only vdk plugin specifc part
# Define entry point called "vdk.plugin.run" with name of plugin and module to act as entry point.
entry_points={
"vdk.plugin.run": ["vdk-data-sources = vdk.plugin.data_sources.plugin_entry"]
},
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
project_urls={
"Documentation": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-data-sources",
"Source Code": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-data-sources",
"Bug Tracker": "https://github.com/vmware/versatile-data-kit/issues/new/choose",
},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from dataclasses import fields
from datetime import datetime
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List

from vdk.plugin.data_sources.config import config_class
from vdk.plugin.data_sources.config import config_field
from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.data_source import (
IDataSourceConfiguration,
)
from vdk.plugin.data_sources.data_source import IDataSourceStream
from vdk.plugin.data_sources.factory import register_data_source
from vdk.plugin.data_sources.state import IDataSourceState


@config_class(
name="auto_generated", description="Configuration for Auto generated data source"
)
class AutoGeneratedDataSourceConfiguration(IDataSourceConfiguration):
num_records: int = config_field(
description="Number of records to return", default=2
)
include_metadata: bool = config_field(
description="If true autogenerated metadata is included in the response",
default=False,
)
num_streams: int = config_field(
description="The number of streams the data source would have", default=1
)


class AutoGeneratedDataSourceStream(IDataSourceStream):
def name(self) -> str:
return str(self._stream_number)

def __init__(
self, config: AutoGeneratedDataSourceConfiguration, stream_number: int
):
self._config = config
self._stream_number = stream_number
self._data = self._generate_test_data()

def _generate_test_data(self) -> List[DataSourcePayload]:
generated_data = []
for i in range(self._config.num_records):
data = {
"id": i,
"name": f"Stream_{self._stream_number}_Name_{i}",
"stream": self._stream_number,
}
metadata = (
{"timestamp": datetime.now()} if self._config.include_metadata else {}
)
generated_data.append(DataSourcePayload(data=data, metadata=metadata))
return generated_data

def read(self) -> Iterator[List[DataSourcePayload]]:
for i in range(0, len(self._data), 1):
yield self._data[i]


@register_data_source(
name="auto-generated-data", config_class=AutoGeneratedDataSourceConfiguration
)
class AutoGeneratedDataSource(IDataSource):
def __init__(self):
self._streams = []

def connect(self, config: IDataSourceConfiguration, state: IDataSourceState):
if not isinstance(config, AutoGeneratedDataSourceConfiguration):
raise RuntimeError(
f"config type must be {AutoGeneratedDataSourceConfiguration}"
)
self._streams = [
AutoGeneratedDataSourceStream(config, i) for i in range(config.num_streams)
]

def disconnect(self):
self._streams = []

def streams(self) -> List[IDataSourceStream]:
return self._streams
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
from dataclasses import field
from dataclasses import is_dataclass
from dataclasses import MISSING
from typing import Any
from typing import Optional
from typing import Type


def config_class(name: str, description: str, **kwargs):
"""
A decorator to mark a class as a configuration class. It extends dataclass decorator with config related options.
Each field in a decorator class must be defined with config_field
Example::
@config_class(name="group", description="Just example")
ExampleConfig:
option1: int = config_field(description="Option 1")
option2: bool = config_filed(description="Option 2")
:param name: The name of the configuration group representing by the class.
:param description: The description of the configuration group.
:param kwargs: Additional arguments passed to :func:`dataclass`.
"""

def decorator(cls):
cls = dataclass(cls, **kwargs)

setattr(cls, "config_group_name", name)
setattr(cls, "config_group_description", description)

for field_name, field_info in cls.__dataclass_fields__.items():
if "config_description" not in field_info.metadata:
raise TypeError(
f"The field '{field_name}' must be declared using config_field."
)

return cls

return decorator


def __check_config_class(cls):
if not hasattr(cls, "config_group_description"):
raise ValueError(f"cls {cls} is not decorated with @config_class.")


def get_config_class_group(cls):
__check_config_class(cls)
return getattr(cls, "config_group_name")


def get_config_class_description(cls):
__check_config_class(cls)
return getattr(cls, "config_group_description")


def config_field(
*,
description: str,
is_sensitive: bool = False,
default=MISSING,
init=True,
repr=True,
hash=None,
compare=True,
metadata=None,
**kwargs,
):
if metadata is None:
metadata = {}
metadata.update(
{"config_description": description, "config_is_sensitive": is_sensitive}
)

return field(
**kwargs,
default=default,
init=init,
repr=repr,
hash=hash,
compare=compare,
metadata=metadata,
)


class ConfigFieldMetadata:
def __init__(self, field_class, field_name: str):
self._field_name = field_name
self._field_class = field_class

def __get_field_metadata_value(self, metadata_key: str) -> Any:
return self._field_class.__dataclass_fields__[self._field_name].metadata.get(
metadata_key, None
)

def name(self) -> str:
return self._field_name

def description(self) -> str:
return self.__get_field_metadata_value("config_description")

def is_sensitive(self) -> bool:
return self.__get_field_metadata_value("config_is_sensitive")

def default(self) -> Optional[Any]:
default_value = self._field_class.__dataclass_fields__[self._field_name].default
return default_value if default_value != MISSING else None


def get_config_field_metadata(cls, field_name: str):
__check_config_class(cls)
return ConfigFieldMetadata(cls, field_name)
Loading

0 comments on commit 8d39b7d

Please sign in to comment.