vdk-datasources: data sources POC (#2805)
This change implements a POC for the Data Sources API. It is based on some of the requirements and research in https://github.com/vmware/versatile-data-kit/wiki/Ingest-Source-Research (see the concepts page for an explanation of data source related concepts). What's implemented:
- Data Source APIs handling sources, streams and state
- A new Data Source is created by implementing IDataSource, IDataSourceConfiguration and IDataSourceStream
- Data Source connection management (partially)
- A Data Source Ingester that reads from data sources and writes to the existing IIngester
- An example data source: AutoGeneratedDataSource
- An example job in the functional test suite
1 parent a7271b8 · commit 0bd3c49 · Showing 27 changed files with 2,187 additions and 0 deletions.
`.plugin-ci.yaml` (new file):

```yaml
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

image: "python:3.7"

.build-vdk-data-sources:
  variables:
    PLUGIN_NAME: vdk-data-sources
  extends: .build-plugin

build-py37-vdk-data-sources:
  extends: .build-vdk-data-sources
  image: "python:3.7"

build-py311-vdk-data-sources:
  extends: .build-vdk-data-sources
  image: "python:3.11"

release-vdk-data-sources:
  variables:
    PLUGIN_NAME: vdk-data-sources
  extends: .release-plugin
```
`README.md` (new file):
# data-sources

Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.

The data-sources project is a plugin for the Versatile Data Kit (VDK). It aims to simplify data ingestion from multiple sources by offering a single, unified API. Whether you're dealing with databases, REST APIs, or other forms of data, this project allows you to manage them all in a consistent manner. This is crucial for building scalable and maintainable data pipelines.

## Usage

```
pip install vdk-data-sources
```

### Concepts

#### Data Source
A Data Source is a central component responsible for establishing and managing a connection to a specific set of data. It interacts with a given configuration and maintains a stateful relationship with the data it accesses. This stateful relationship can include information such as authentication tokens, data markers, or any other form of metadata that helps manage the data connection. The Data Source exposes various data streams through which data can be read.
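As a minimal sketch of that lifecycle (the method names match the example source at the bottom of this diff; the HTTP-flavored class itself is hypothetical):

```python
from typing import List

from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.data_source import IDataSourceStream
from vdk.plugin.data_sources.state import IDataSourceState


class MyHttpDataSource(IDataSource):
    """Hypothetical source wrapping an HTTP API base URL."""

    def __init__(self):
        self._config = None
        self._streams: List[IDataSourceStream] = []

    def configure(self, config):
        # Keep the validated configuration (base URL, credentials, ...)
        self._config = config

    def connect(self, state: IDataSourceState):
        # Establish the connection and build one stream per endpoint,
        # restoring each stream's marker from previously persisted state.
        self._streams = []

    def disconnect(self):
        self._streams = []

    def streams(self) -> List[IDataSourceStream]:
        return self._streams
```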
#### Data Source Stream
A Data Source Stream is an abstraction over a subset of the data in the Data Source. It can be thought of as a channel through which data flows.
Each Data Source Stream has a unique name to identify it and includes methods to read data from the stream. Streams can be ingested in parallel.

Examples:
- In a database (like PostgreSQL), each table could be a separate stream.
- In a message broker like Apache Kafka, each topic acts as a distinct Data Source Stream.
- In a REST API, the data source is the HTTP base URL (http://xxx.com); each endpoint could be a different data stream (http://xxx.com/users, http://xxx.com/admins).

Reading from a stream yields a sequence of Data Source Payloads.
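A minimal stream, as a hedged sketch (the interface and the DataSourcePayload constructor match the example source at the bottom of this diff; the endpoint and record contents are hypothetical):

```python
from typing import Iterator

from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.data_source import IDataSourceStream


class UsersEndpointStream(IDataSourceStream):
    """Hypothetical stream over the /users endpoint of a REST API."""

    def name(self) -> str:
        return "users"

    def read(self) -> Iterator[DataSourcePayload]:
        # A real implementation would page through the endpoint; one static
        # record is enough to illustrate the shape of a payload.
        yield DataSourcePayload(
            data={"id": 1, "name": "example"},
            metadata={"fetched_at": "2023-09-01T00:00:00Z"},
            state={"last_id": 1},
        )
```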
#### Data Source Payload
The Data Source Payload is a data structure that encapsulates the actual data along with its metadata. The payload consists of the following parts:

- Data: the core data that needs to be ingested (e.g., in a database, the table content)
- Metadata: a dictionary containing additional contextual information about the data (for example timestamps, environment-specific metadata, etc.)
- State: the state of the data source stream as of this payload.
  For example, in the case of incremental ingestion from a database table, it would contain the value of an incremental key column
  (e.g., the updated_time column in the table), which can be used to restart or continue the ingestion later.
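For illustration, here is how such a payload might be constructed during incremental ingestion from a database table (the column and table values are hypothetical; the constructor matches the example source at the bottom of this diff):

```python
from vdk.plugin.data_sources.data_source import DataSourcePayload

payload = DataSourcePayload(
    data={"id": 42, "email": "user@example.com"},    # the actual row content
    metadata={"table": "users"},                     # extra context about the data
    state={"updated_time": "2023-08-31T23:59:59Z"},  # marker to resume ingestion from
)
```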
### Configuration

(`vdk config-help` is a useful command to browse all config options of your installation of vdk.)

### Example

To build your own data source, you can use [this data source](./src/vdk/plugin/data_sources/auto_generated.py) as an example or reference.

To register the source, use the [vdk_data_sources_register hook](./src/vdk/plugin/data_sources/hook_spec.py).
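For instance, the bundled example source (shown in full at the bottom of this diff) is associated with its name and configuration class via the `data_source` decorator; this excerpt is taken from that file:

```python
from vdk.plugin.data_sources.factory import data_source


@data_source(
    name="auto-generated-data", config_class=AutoGeneratedDataSourceConfiguration
)
class AutoGeneratedDataSource(IDataSource):
    ...
```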
Then you can use it in a data job like this:

```python
# (imports from the vdk-data-sources plugin modules are omitted in this example)
def run(job_input: IJobInput):
    source = SourceDefinition(id="auto", name="auto-generated-data", config={})
    destination = DestinationDefinition(id="auto-dest", method="memory")

    with DataFlowInput(job_input) as flow_input:
        flow_input.start(DataFlowMappingDefinition(source, destination))
```
Or via a config.toml file:
```toml
[sources.auto]
name="auto-generated-data"
config={}
[destinations.auto-dest]
method="memory"
[[flows]]
from="auto"
to="auto-dest"
```

```python
def run(job_input: IJobInput):
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(toml_parser.load_config("config.toml"))
```
### Build and testing

```
pip install -r requirements.txt
pip install -e .
pytest
```

In the VDK repo, the [build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can also be used.

#### Note about the CI/CD

.plugin-ci.yaml is needed only for plugins that are part of the [Versatile Data Kit plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins).

The CI/CD is separated into two stages, a build stage and a release stage.
The build stage is made up of a few jobs, all of which inherit from the same
job configuration and differ only in the Python version they use (3.7, 3.8, 3.9 and 3.10).
They run according to rules, which are ordered in a way such that changes to a
plugin's directory trigger that plugin's CI, but changes to a different plugin do not.
`requirements.txt` (new file):
```
# this file is used to provide testing requirements
# for requirements (dependencies) needed during and after installation of the plugin,
# see (and update) the setup.py install_requires section

pytest
vdk-core
vdk-test-utils
```
`setup.py` (new file):
```python
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

import setuptools

"""
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"

setuptools.setup(
    name="vdk-data-sources",
    version=__version__,
    url="https://github.com/vmware/versatile-data-kit",
    description="Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.",
    long_description=pathlib.Path("README.md").read_text(),
    long_description_content_type="text/markdown",
    install_requires=["vdk-core", "toml"],
    package_dir={"": "src"},
    packages=setuptools.find_namespace_packages(where="src"),
    # This is the only VDK plugin specific part:
    # define an entry point called "vdk.plugin.run" with the name of the plugin
    # and the module that acts as the entry point.
    entry_points={
        "vdk.plugin.run": ["vdk-data-sources = vdk.plugin.data_sources.plugin_entry"]
    },
    classifiers=[
        "Development Status :: 2 - Pre-Alpha",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
    ],
    project_urls={
        "Documentation": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-data-sources",
        "Source Code": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-data-sources",
        "Bug Tracker": "https://github.com/vmware/versatile-data-kit/issues/new/choose",
    },
)
```
107 additions & 0 deletions: `projects/vdk-plugins/vdk-data-sources/src/vdk/plugin/data_sources/auto_generated.py` (new file):
```python
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime
from typing import Iterator
from typing import List

from vdk.plugin.data_sources.config import config_class
from vdk.plugin.data_sources.config import config_field
from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.data_source import IDataSourceConfiguration
from vdk.plugin.data_sources.data_source import IDataSourceStream
from vdk.plugin.data_sources.factory import data_source
from vdk.plugin.data_sources.state import IDataSourceState


@config_class(
    name="auto_generated", description="Configuration for the auto-generated data source"
)
class AutoGeneratedDataSourceConfiguration(IDataSourceConfiguration):
    num_records: int = config_field(description="Number of records to return", default=2)
    include_metadata: bool = config_field(
        description="If true, auto-generated metadata is included in the response",
        default=False,
    )
    num_streams: int = config_field(
        description="The number of streams the data source would have", default=1
    )


class AutoGeneratedDataSourceStream(IDataSourceStream):
    """
    A single data stream in AutoGeneratedDataSource.
    """

    def __init__(
        self,
        config: AutoGeneratedDataSourceConfiguration,
        stream_number: int,
        start_id: int = 0,
    ):
        self._config = config
        self._stream_number = stream_number
        self._data = self._generate_test_data(start_id)

    def name(self) -> str:
        return f"stream_{self._stream_number}"

    def _generate_test_data(self, start_id: int) -> List[DataSourcePayload]:
        generated_data = []
        for i in range(self._config.num_records):
            data = {
                "id": start_id + 1 + i,
                "name": f"Stream_{self._stream_number}_Name_{i}",
                "stream": self._stream_number,
            }
            metadata = (
                {"timestamp": datetime.now()} if self._config.include_metadata else {}
            )
            # Each payload carries the stream state ("last_id") so that
            # ingestion can be resumed after the last processed record.
            generated_data.append(
                DataSourcePayload(
                    data=data, metadata=metadata, state={"last_id": data["id"]}
                )
            )
        return generated_data

    def read(self) -> Iterator[DataSourcePayload]:
        yield from self._data


@data_source(
    name="auto-generated-data", config_class=AutoGeneratedDataSourceConfiguration
)
class AutoGeneratedDataSource(IDataSource):
    """
    Data source that only generates some dummy data for testing purposes.
    """

    def __init__(self):
        self._config = None
        self._streams = []

    def configure(self, config: AutoGeneratedDataSourceConfiguration):
        self._config = config

    def connect(self, state: IDataSourceState):
        if not isinstance(self._config, AutoGeneratedDataSourceConfiguration):
            raise RuntimeError(
                f"config type must be {AutoGeneratedDataSourceConfiguration}"
            )
        # Restore each stream from its previously persisted state (if any),
        # continuing after the last ingested record id.
        self._streams = [
            AutoGeneratedDataSourceStream(
                self._config, i, state.read_stream(f"stream_{i}").get("last_id", 0)
            )
            for i in range(self._config.num_streams)
        ]

    def disconnect(self):
        self._streams = []

    def streams(self) -> List[IDataSourceStream]:
        return self._streams
```