vdk-datasources: data sources POC #2805

Merged · 5 commits · Nov 2, 2023

22 changes: 22 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/.plugin-ci.yml
@@ -0,0 +1,22 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

image: "python:3.7"

.build-vdk-data-sources:
  variables:
    PLUGIN_NAME: vdk-data-sources
  extends: .build-plugin

build-py37-vdk-data-sources:
  extends: .build-vdk-data-sources
  image: "python:3.7"

build-py311-vdk-data-sources:
  extends: .build-vdk-data-sources
  image: "python:3.11"

release-vdk-data-sources:
  variables:
    PLUGIN_NAME: vdk-data-sources
  extends: .release-plugin
99 changes: 99 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/README.md
@@ -0,0 +1,99 @@
# data-sources

Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.

The data-sources project is a plugin for the Versatile Data Kit (VDK). It aims to simplify data ingestion from multiple sources by offering a single, unified API. Whether you're dealing with databases, REST APIs, or other forms of data, this project allows you to manage them all in a consistent manner. This is crucial for building scalable and maintainable data pipelines.


## Usage

```
pip install vdk-data-sources
```

### Concepts


#### Data Source
A Data Source is a central component responsible for establishing and managing a connection to a specific set of data. It interacts with a given configuration and maintains a stateful relationship with the data it accesses. This stateful relationship can include information such as authentication tokens, data markers, or any other form of metadata that helps manage the data connection. The Data Source exposes various data streams through which data can be read.

#### Data Source Stream
A Data Source Stream is an abstraction over a subset of data in the Data Source. It can be thought of as a channel through which data flows.
Each Data Source Stream has a unique name to identify it and includes methods to read data from the stream. Streams can be ingested in parallel.

Examples:
- In a database (like postgres), each table could be a separate stream.
- In a message broker like Apache Kafka, each topic within Kafka acts as a distinct Data Source Stream.
- In a REST API, the data source is the HTTP base URL (http://xxx.com) and each endpoint could be a separate data stream (http://xxx.com/users, http://xxx.com/admins).

Reading from a stream yields a sequence of Data Source Payloads.
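
For illustration, a minimal sketch of iterating a source's streams and their payloads (it assumes `source` is an already configured and connected data source instance; in a data job, `DataFlowInput` typically drives this loop for you):

```python
# Sketch only: `source` is assumed to be a configured and connected IDataSource.
for stream in source.streams():          # each stream is an IDataSourceStream
    print(f"Reading stream: {stream.name()}")
    for payload in stream.read():        # read() yields DataSourcePayload objects
        print(payload)
```
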
Collaborator commented:

what happens if the stream never ends, e.g. if it's a Kafka topic with constant influx of data?

@antoniivanov (Author) replied on Nov 2, 2023:

What happens depends on how the Kafka data source is implemented. You can implement it so that it fetches all data up to a start_timestamp, in which case the stream ends. Alternatively, one can reuse https://pypi.org/project/pipelinewise-tap-kafka/ together with vdk-singer; it handles this by using max_runtime_ms (the maximum time for the tap to collect new messages from a Kafka topic) to end the ingestion batch.
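
To make that concrete, here is a hypothetical sketch (not part of this plugin) of a topic read bounded by a `max_runtime_ms` budget, in the spirit of pipelinewise-tap-kafka; it assumes the `confluent-kafka` client library is installed:

```python
# Hypothetical sketch: bounding an otherwise endless Kafka stream by a time budget.
# Not part of vdk-data-sources; assumes the confluent-kafka package.
import time
from typing import Iterator

from confluent_kafka import Consumer


def read_topic_batch(
    topic: str, bootstrap_servers: str, max_runtime_ms: int = 60_000
) -> Iterator[dict]:
    consumer = Consumer(
        {
            "bootstrap.servers": bootstrap_servers,
            "group.id": "vdk-data-sources-example",  # placeholder consumer group
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe([topic])
    deadline = time.monotonic() + max_runtime_ms / 1000.0
    try:
        # Stop the ingestion batch once the time budget is exhausted,
        # even though the topic itself never "ends".
        while time.monotonic() < deadline:
            msg = consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                continue
            yield {"offset": msg.offset(), "value": msg.value()}
    finally:
        consumer.close()
```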


#### Data Source Payload
The Data Source Payload is a data structure that encapsulates the actual data along with its metadata. The payload consists of the following parts:

- Data: the core data that needs to be ingested (e.g. in a database, the table content)
- Metadata: a dictionary with additional contextual information about the data (for example timestamps, environment-specific metadata, etc.)
- State: the state of the data source stream as of this payload. For example, in case of incremental ingestion from a database table it would contain the value of an incremental key column (e.g. an updated_time column in the table), which can be used to restart/continue the ingestion later.

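As an illustration, this is how a single payload for an incremental database ingestion might look (the column name `updated_time` and the metadata keys are assumptions made for the example; the constructor arguments mirror how `DataSourcePayload` is used in this plugin):

```python
from datetime import datetime

from vdk.plugin.data_sources.data_source import DataSourcePayload

# A single record from a hypothetical "users" table, ingested incrementally.
payload = DataSourcePayload(
    data={"id": 42, "name": "Jane", "updated_time": "2023-11-02T10:00:00"},
    metadata={"ingested_at": datetime.now().isoformat(), "environment": "dev"},
    state={"last_updated_time": "2023-11-02T10:00:00"},  # where to resume from on the next run
)
```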

### Configuration

(`vdk config-help` is a useful command to browse all configuration options of your VDK installation.)

### Example

To build your own data source you can use [this data source](./src/vdk/plugin/data_sources/auto_generated.py) as an example or reference.

To register the source, use the [vdk_data_sources_register hook](./src/vdk/plugin/data_sources/hook_spec.py).
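
For orientation, below is a condensed, hypothetical sketch of what a custom data source can look like. It mirrors the structure of the auto-generated example linked above; names such as `MySource` are placeholders, and the actual registration happens from a plugin's `vdk_data_sources_register` hook implementation:

```python
from typing import Iterator
from typing import List

from vdk.plugin.data_sources.config import config_class
from vdk.plugin.data_sources.config import config_field
from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.data_source import IDataSourceConfiguration
from vdk.plugin.data_sources.data_source import IDataSourceStream
from vdk.plugin.data_sources.factory import data_source
from vdk.plugin.data_sources.state import IDataSourceState


@config_class(name="my-source", description="Configuration for my data source")
class MySourceConfiguration(IDataSourceConfiguration):
    num_records: int = config_field(
        description="Number of records to return", default=10
    )


class MySourceStream(IDataSourceStream):
    def __init__(self, config: MySourceConfiguration):
        self._config = config

    def name(self) -> str:
        return "my-stream"

    def read(self) -> Iterator[DataSourcePayload]:
        for i in range(self._config.num_records):
            yield DataSourcePayload(data={"id": i}, metadata={}, state={"last_id": i})


@data_source(name="my-source", config_class=MySourceConfiguration)
class MySource(IDataSource):
    def __init__(self):
        self._config = None
        self._streams: List[IDataSourceStream] = []

    def configure(self, config: MySourceConfiguration):
        self._config = config

    def connect(self, state: IDataSourceState):
        self._streams = [MySourceStream(self._config)]

    def disconnect(self):
        self._streams = []

    def streams(self) -> List[IDataSourceStream]:
        return self._streams
```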

Then you can use it in a data job like this:

```python
def run(job_input: IJobInput):
    source = SourceDefinition(id="auto", name="auto-generated-data", config={})
    destination = DestinationDefinition(id="auto-dest", method="memory")

    with DataFlowInput(job_input) as flow_input:
        flow_input.start(DataFlowMappingDefinition(source, destination))
```

or in a config.toml file:

Collaborator commented:

What is a TOML file? I could obviously google it - we configure data jobs with INI files, and now TOML files for something else?

@antoniivanov (Author) replied on Nov 2, 2023:

I've been considering replacing INI with TOML because:

  • it supports nested data structures
  • it supports arrays
  • it supports data types
  • it's somewhat similar to INI in syntax - existing INI files can be parsed by a TOML parser, so the change can be fairly backward compatible.

It would not have been feasible/easy to express the above data flow structure in the INI format, so I decided to use TOML now as an experiment to see whether it works for users.

We need to move away from INI for the above reasons, and we've had users who have requested support for a more "modern" format. The other alternative is YAML, but YAML is pretty ugly for highly nested configurations and would make the migration from INI more involved.

Collaborator replied:

I generally agree with everything you said, however it would be great to have a more structured approach, e.g. use TOML everywhere, rather than use it here and something else in a different place.

```toml
[sources.auto]
name = "auto-generated-data"
config = {}

[destinations.auto-dest]
method = "memory"

[[flows]]
from = "auto"
to = "auto-dest"
```

```python
def run(job_input: IJobInput):
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(toml_parser.load_config("config.toml"))
```

### Build and testing

```
pip install -r requirements.txt
pip install -e .
pytest
```

In the VDK repo, the [../build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can also be used.


#### Note about the CICD:

.plugin-ci.yml is needed only for plugins that are part of the [Versatile Data Kit Plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins).

The CI/CD is separated into two stages, a build stage and a release stage.
The build stage is made up of a few jobs, all of which inherit from the same
job configuration and only differ in the Python version they use (here, 3.7 and 3.11).
They run according to rules, which are ordered in a way such that changes to a
plugin's directory trigger that plugin's CI, but changes to a different plugin do not.
7 changes: 7 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/requirements.txt
@@ -0,0 +1,7 @@
# this file is used to provide testing requirements
# for requirements (dependencies) needed during and after installation of the plugin, see (and update) the install_requires section in setup.py


pytest
vdk-core
vdk-test-utils
42 changes: 42 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/setup.py
@@ -0,0 +1,42 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

import setuptools

"""
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"

setuptools.setup(
    name="vdk-data-sources",
    version=__version__,
    url="https://github.com/vmware/versatile-data-kit",
    description="Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.",
    long_description=pathlib.Path("README.md").read_text(),
    long_description_content_type="text/markdown",
    install_requires=["vdk-core", "toml"],
    package_dir={"": "src"},
    packages=setuptools.find_namespace_packages(where="src"),
    # This is the only vdk plugin specific part:
    # define an entry point called "vdk.plugin.run" with the name of the plugin and the module to act as entry point.
    entry_points={
        "vdk.plugin.run": ["vdk-data-sources = vdk.plugin.data_sources.plugin_entry"]
    },
    classifiers=[
        "Development Status :: 2 - Pre-Alpha",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
    ],
    project_urls={
        "Documentation": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-data-sources",
        "Source Code": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-data-sources",
        "Bug Tracker": "https://github.com/vmware/versatile-data-kit/issues/new/choose",
    },
)
107 changes: 107 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/src/vdk/plugin/data_sources/auto_generated.py
@@ -0,0 +1,107 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime
from typing import Iterator
from typing import List

from vdk.plugin.data_sources.config import config_class
from vdk.plugin.data_sources.config import config_field
from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.data_source import (
    IDataSourceConfiguration,
)
from vdk.plugin.data_sources.data_source import IDataSourceStream
from vdk.plugin.data_sources.factory import data_source
from vdk.plugin.data_sources.state import IDataSourceState


@config_class(
    name="auto_generated", description="Configuration for Auto generated data source"
)
class AutoGeneratedDataSourceConfiguration(IDataSourceConfiguration):
    num_records: int = config_field(
        description="Number of records to return", default=2
    )
    include_metadata: bool = config_field(
        description="If true autogenerated metadata is included in the response",
        default=False,
    )
    num_streams: int = config_field(
        description="The number of streams the data source would have", default=1
    )


class AutoGeneratedDataSourceStream(IDataSourceStream):
    """
    A single data stream in AutoGeneratedDataSource
    """

    def name(self) -> str:
        return f"stream_{self._stream_number}"

    def __init__(
        self,
        config: AutoGeneratedDataSourceConfiguration,
        stream_number: int,
        start_id: int = 0,
    ):
        self._config = config
        self._stream_number = stream_number
        self._data = self._generate_test_data(start_id)

    def _generate_test_data(self, start_id: int) -> List[DataSourcePayload]:
        generated_data = []
        for i in range(self._config.num_records):
            data = {
                "id": start_id + 1 + i,
                "name": f"Stream_{self._stream_number}_Name_{i}",
                "stream": self._stream_number,
            }
            metadata = (
                {"timestamp": datetime.now()} if self._config.include_metadata else {}
            )
            generated_data.append(
                DataSourcePayload(
                    data=data, metadata=metadata, state={"last_id": data["id"]}
                )
            )
        return generated_data

Review comment on the generated data dictionary:

Collaborator commented:

A data stream which generates data about data streams... Seems a bit tautological... what about a more common/well-known/relatable use case like employees or shapes... animals?

@antoniivanov (Author) replied:

Hmm, yeah, I probably could have come up with a more interesting example. I will leave it for now, though, since there are already lots of tests that expect this data, but I will change it later.

    def read(self) -> Iterator[DataSourcePayload]:
        for i in range(0, len(self._data), 1):
            yield self._data[i]


@data_source(
    name="auto-generated-data", config_class=AutoGeneratedDataSourceConfiguration
)
class AutoGeneratedDataSource(IDataSource):
    """
    Data source that only generates some dummy data for testing purposes.
    """

    def __init__(self):
        self._config = None
        self._streams = []

    def configure(self, config: AutoGeneratedDataSourceConfiguration):
        self._config = config

    def connect(self, state: IDataSourceState):
        if not isinstance(self._config, AutoGeneratedDataSourceConfiguration):
            raise RuntimeError(
                f"config type must be {AutoGeneratedDataSourceConfiguration}"
            )
        self._streams = [
            AutoGeneratedDataSourceStream(
                self._config, i, state.read_stream(f"stream_{i}").get("last_id", 0)
            )
            for i in range(self._config.num_streams)
        ]

    def disconnect(self):
        self._streams = []

    def streams(self) -> List[IDataSourceStream]:
        return self._streams