vdk-core: data sources POC
vdk-core: data sources POC
This change implements POC for Data Sources API.

It is based on some of the requirements and research in
https://github.com/vmware/versatile-data-kit/wiki/Ingest-Source-Research

See the concepts page for an explanation of data source related concepts.

What's implemented:
- Data Source APIs handling sources, streams and state
- Data Source connection management (partially)
- A Data Source Ingester that reads from data sources and writes to the
existing IIngester
- An example data source AutoGeneratedDataSource
- An example job in the function test suite

Most likely this would be moved to a separate plugin, vdk-data-sources. For now
it doesn't appear there's a need for this to be in vdk-core.
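
For orientation, the intended flow is: a data source is connected with its configuration and last saved state, it exposes one or more streams, and each stream yields payloads that get ingested. A minimal sketch (illustrative only; ingest below is a hypothetical stand-in for the Data Source Ingester wiring):

    # Illustrative sketch of the intended flow, not actual vdk-core API calls.
    source.connect(config, state)
    for stream in source.streams():
        for payload in stream.read():
            ingest(payload)  # hypothetical sink standing in for the Data Source Ingester
    source.disconnect()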
antoniivanov committed Oct 23, 2023
1 parent b01891d commit ef872f7
Showing 13 changed files with 1,130 additions and 0 deletions.
@@ -0,0 +1,86 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime
from typing import Iterator
from typing import List

from vdk.internal.builtin_plugins.ingestion.source.config import config_class
from vdk.internal.builtin_plugins.ingestion.source.config import config_field
from vdk.internal.builtin_plugins.ingestion.source.data_source import DataSourcePayload
from vdk.internal.builtin_plugins.ingestion.source.data_source import IDataSource
from vdk.internal.builtin_plugins.ingestion.source.data_source import (
IDataSourceConfiguration,
)
from vdk.internal.builtin_plugins.ingestion.source.data_source import IDataSourceStream
from vdk.internal.builtin_plugins.ingestion.source.factory import register_data_source
from vdk.internal.builtin_plugins.ingestion.source.state import IDataSourceState


@config_class(
name="auto_generated", description="Configuration for Auto generated data source"
)
class AutoGeneratedDataSourceConfiguration(IDataSourceConfiguration):
num_records: int = config_field(
description="Number of records to return", default=2
)
include_metadata: bool = config_field(
description="If true autogenerated metadata is included in the response",
default=False,
)
num_streams: int = config_field(
description="The number of streams the data source would have", default=1
)


class AutoGeneratedDataSourceStream(IDataSourceStream):
def name(self) -> str:
return str(self._stream_number)

def __init__(
self, config: AutoGeneratedDataSourceConfiguration, stream_number: int
):
self._config = config
self._stream_number = stream_number
self._data = self._generate_test_data()

def _generate_test_data(self) -> List[DataSourcePayload]:
generated_data = []
for i in range(self._config.num_records):
data = {
"id": i,
"name": f"Stream_{self._stream_number}_Name_{i}",
"stream": self._stream_number,
}
metadata = (
{"timestamp": datetime.now()} if self._config.include_metadata else {}
)
generated_data.append(DataSourcePayload(data=data, metadata=metadata))
return generated_data

    def read(self) -> Iterator[DataSourcePayload]:
        yield from self._data


@register_data_source("auto-generated-data", AutoGeneratedDataSourceConfiguration)
class AutoGeneratedDataSource(IDataSource):
def __init__(self):
self._streams = []

def connect(self, config: IDataSourceConfiguration, state: IDataSourceState):
if not isinstance(config, AutoGeneratedDataSourceConfiguration):
raise RuntimeError(
f"config type must be {AutoGeneratedDataSourceConfiguration}"
)
self._streams = [
AutoGeneratedDataSourceStream(config, i) for i in range(config.num_streams)
]

def disconnect(self):
self._streams = []

def streams(self) -> List[IDataSourceStream]:
return self._streams
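
For illustration, here is how the example data source above could be exercised (a minimal sketch, not part of the commit; AutoGeneratedDataSource ignores the state argument, so None is passed):

    config = AutoGeneratedDataSourceConfiguration(
        num_records=3, include_metadata=True, num_streams=2
    )
    source = AutoGeneratedDataSource()
    source.connect(config, state=None)  # state is unused by this example source
    for stream in source.streams():
        for payload in stream.read():
            print(stream.name(), payload.data, payload.metadata)
    source.disconnect()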
@@ -0,0 +1,116 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
from dataclasses import field
from dataclasses import is_dataclass
from dataclasses import MISSING
from typing import Any
from typing import Optional
from typing import Type


def config_class(name: str, description: str, **kwargs):
"""
A decorator to mark a class as a configuration class. It extends dataclass decorator with config related options.
Each field in a decorator class must be defined with config_field
Example::
@config_class(name="group", description="Just example")
ExampleConfig:
option1: int = config_field(description="Option 1")
option2: bool = config_filed(description="Option 2")
:param name: The name of the configuration group representing by the class.
:param description: The description of the configuration group.
:param kwargs: Additional arguments passed to :func:`dataclass`.
"""

def decorator(cls):
cls = dataclass(cls, **kwargs)

setattr(cls, "config_group_name", name)
setattr(cls, "config_group_description", description)

for field_name, field_info in cls.__dataclass_fields__.items():
if "config_description" not in field_info.metadata:
raise TypeError(
f"The field '{field_name}' must be declared using config_field."
)

return cls

return decorator


def __check_config_class(cls):
if not hasattr(cls, "config_group_description"):
raise ValueError(f"cls {cls} is not decorated with @config_class.")


def get_config_class_group(cls):
__check_config_class(cls)
return getattr(cls, "config_group_name")


def get_config_class_description(cls):
__check_config_class(cls)
return getattr(cls, "config_group_description")


def config_field(
*,
description: str,
is_sensitive: bool = False,
default=MISSING,
init=True,
repr=True,
hash=None,
compare=True,
metadata=None,
**kwargs,
):
    if metadata is None:
        metadata = {}
    else:
        # Copy so the caller's metadata mapping is not mutated in place.
        metadata = dict(metadata)
    metadata.update(
        {"config_description": description, "config_is_sensitive": is_sensitive}
    )

return field(
**kwargs,
default=default,
init=init,
repr=repr,
hash=hash,
compare=compare,
metadata=metadata,
)


class ConfigFieldMetadata:
def __init__(self, field_class, field_name: str):
self._field_name = field_name
self._field_class = field_class

def __get_field_metadata_value(self, metadata_key: str) -> Any:
return self._field_class.__dataclass_fields__[self._field_name].metadata.get(
metadata_key, None
)

def name(self) -> str:
return self._field_name

def description(self) -> str:
return self.__get_field_metadata_value("config_description")

def is_sensitive(self) -> bool:
return self.__get_field_metadata_value("config_is_sensitive")

    def default(self) -> Optional[Any]:
        default_value = self._field_class.__dataclass_fields__[self._field_name].default
        # MISSING is a sentinel, so compare by identity rather than equality.
        return default_value if default_value is not MISSING else None


def get_config_field_metadata(cls, field_name: str):
__check_config_class(cls)
return ConfigFieldMetadata(cls, field_name)
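
For illustration, a config class declared with the decorator above and introspected with the metadata helpers (a minimal sketch, not part of the commit):

    @config_class(name="example", description="Example configuration group")
    class ExampleConfig:
        api_key: str = config_field(description="API key", is_sensitive=True, default="")

    print(get_config_class_group(ExampleConfig))  # prints: example
    meta = get_config_field_metadata(ExampleConfig, "api_key")
    print(meta.name(), meta.description(), meta.is_sensitive())  # prints: api_key API key True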
@@ -0,0 +1,190 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from abc import abstractmethod
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union

from vdk.internal.builtin_plugins.ingestion.source.state import IDataSourceState


class IDataSourceConfiguration:
"""Interface representing the configuration for a data source."""

pass


@dataclass
class DataSourcePayload:
"""
Encapsulates a single payload to be ingested coming from a data source.
:param data: The data to be ingested.
:param metadata: Additional metadata about the data.
    :param state: Optional state related to the data (for example, the timestamp or id of the payload), which will be persisted once the payload is successfully ingested
"""

data: Optional[Dict[str, Any]]
metadata: Optional[Dict[str, Union[int, str, bool, float, datetime]]]
state: Optional[Dict[str, Any]] = field(default_factory=dict)


class StopDataSourceStream(Exception):
    """Signals that a stream has ended and there is no more data."""


class RetryDataSourceStream(Exception):
    """Signals that the stream ingestion should be retried."""


class IDataSourceStream:
"""
Abstract class for a Data Source Stream, representing a channel or resource to read data.
    For example, each table in a database could be a stream,
    or each separate endpoint of an API could be a stream, and so on.
:Example::
class MyStream(IDataSourceStream):
def name(self):
return "My Stream"
def read(self):
yield DataSourcePayload(...)
"""

@abstractmethod
def name(self) -> str:
"""
        :return: unique (within the data source) name of the data source stream, which can be used as a key to look it up.
"""
pass

@abstractmethod
def read(self) -> Iterable[DataSourcePayload]:
"""
Generator method or Iterator for reading data from the stream.
:return: An iterable of DataSourcePayload objects.
"""
pass


class IDataSource:
"""
Abstract class for a Data Source, responsible for managing the connection and providing data streams.
:Example::
class MyDataSource(IDataSource):
def connect(self, config, state):
pass
def disconnect(self):
pass
def streams(self):
return [MyStream()]
"""

@abstractmethod
def connect(self, config: IDataSourceConfiguration, state: IDataSourceState):
"""
Establish a connection using provided configuration and last saved state.
:param config: Data source configuration object.
:param state: Data source state object.
"""

@abstractmethod
def disconnect(self):
"""
Disconnect and clean up resources if needed.
"""

@abstractmethod
def streams(self) -> List[IDataSourceStream]:
"""
Get the available streams for this data source.
:return: List of IDataSourceStream objects.
"""


class DataSourcesAggregatedException(Exception):
"""
Exception to aggregate multiple Data Sources failures.
:Example::
DataSourcesAggregatedException({"Stream1": Exception1, "Stream2": Exception2})
"""

def __init__(self, data_streams_exceptions: Dict[str, Exception]):
super().__init__(
f"Data Sources failed to ingest data: {data_streams_exceptions}"
)
self.data_streams_exceptions = data_streams_exceptions


@dataclass
class DataSourceError:
"""
Data class to encapsulate information about a Data Source ingestion error.
    :param data_stream: The data stream where the error occurred.
    :param exception: The exception that was raised.
:Example::
DataSourceError(data_stream=MyStream(), exception=SomeException())
"""

data_stream: IDataSourceStream
exception: BaseException


class IDataSourceErrorCallback:
"""
Callback interface to be implemented for handling data source ingestion errors.
Example::
# can be a function
def my_error_callback(error: DataSourceError):
print(f"Stream: {error.data_stream.name()}, Exception: {error.exception}")
# or class
        class MyErrorCallback(IDataSourceErrorCallback):
def __call__(self, error: DataSourceError):
print(f"Stream: {error.data_stream.name()}, Exception: {error.exception}")
"""

@abstractmethod
def __call__(self, error: DataSourceError):
"""
Invoked when an error occurs during data ingestion.
        :param error: DataSourceError object containing details of the error that occurred.
:raises:
StopDataSourceStream: Stops the current data stream without any errors
RetryDataSourceStream: Retries ingesting the current data stream later
"""

pass
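
As a sketch of how these pieces are meant to be used together, an error callback can raise the control-flow exceptions defined above to direct the ingester (illustrative, not part of the commit):

    def tolerant_error_callback(error: DataSourceError):
        # Retry the stream later on transient network failures ...
        if isinstance(error.exception, ConnectionError):
            raise RetryDataSourceStream()
        # ... otherwise stop this stream without failing the whole ingestion.
        raise StopDataSourceStream()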