
vdk-datasources: data sources POC #2805

Merged: 5 commits into main from person/aivanov/data-source, Nov 2, 2023
Conversation

@antoniivanov (Collaborator) commented Oct 17, 2023

This change implements a POC for the Data Sources API.

It is based on some of the requirements and research in https://github.com/vmware/versatile-data-kit/wiki/Ingest-Source-Research

See the concepts page for an explanation of data source related concepts.

So what's implemented is:

  • Data Source APIs handling sources, streams, and state
  • A new data source is implemented by implementing IDataSource, IDataSourceConfiguration, and IDataSourceStream (see the sketch after this list)
  • Partial data source connection management
  • A Data Source Ingester that reads from data sources and writes to the existing IIngester
  • An example data source, AutoGeneratedDataSource
  • An example job in the functional test suite
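
As a rough sketch of what that looks like for an implementer (the interface names are from this PR, but the method names and signatures below are assumptions for illustration):

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class HelloConfig:
    # Plays the role of the IDataSourceConfiguration for this source.
    greeting: str = "hello"


class HelloStream:
    # Plays the role of an IDataSourceStream: a named channel that can be read.
    def name(self) -> str:
        return "hello-stream"

    def read(self) -> Iterator[dict]:
        # Each yielded item becomes a payload handed to the ingester.
        for i in range(3):
            yield {"id": i, "message": "hello"}


class HelloDataSource:
    # Plays the role of an IDataSource: connects, exposes streams, disconnects.
    def connect(self, state: Optional[dict]) -> None:
        # `state` would carry markers persisted from previous runs.
        self._streams: List[HelloStream] = [HelloStream()]

    def streams(self) -> List[HelloStream]:
        return self._streams

    def disconnect(self) -> None:
        self._streams = []
```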

@antoniivanov force-pushed the person/aivanov/data-source branch 3 times, most recently from 569a0a4 to ef872f7, October 23, 2023 10:42
@antoniivanov force-pushed the person/aivanov/data-source branch from ef872f7 to 8d39b7d, October 27, 2023 10:02
@antoniivanov changed the title from "vdk-core: data sources POC" to "vdk-datasources: data sources POC", Oct 27, 2023
@antoniivanov force-pushed the person/aivanov/data-source branch 3 times, most recently from 29e80c8 to 3169ca2, October 30, 2023 10:56
#### Data Source
A Data Source is a central component responsible for establishing and managing a connection to a specific set of data. It interacts with a given configuration and maintains a stateful relationship with the data it accesses. This stateful relationship can include information such as authentication tokens, data markers, or any other form of metadata that helps manage the data connection. The Data Source exposes various data streams through which data can be read.

#### Data Source Stream
A Data Source Stream is an abstraction over a subset of data in the Data Source. It can be thought of as a channel through which data flows. Each Data Source Stream has a unique name to identify it and includes methods to read data from the stream. For example, in a database-backed data source, each table could be a separate stream. Streams can potentially be ingested in parallel.
Collaborator
I think a diagram here would be good.

@murphp15 (Collaborator)

I mostly understand the difference between a data source and a data source stream.
However, I would really like a few concrete examples.

Can you give an example if your source was a relational database and also an example if it was blob storage?

```python
data_source_ingester.ingest_data_source("auto", auto_generated, method="memory")

data_source_ingester.terminate_and_wait_to_finish()
data_source_ingester.raise_on_error()
```
Collaborator
Can we not make this the default?



```python
@dataclass
class DataSourceError:
```
Collaborator
Why did you not call it an exception? I would think that an error is more serious, like an out of memory error.

```python
def __init__(self, storage: IDataSourceStateStorage):
    self._storage = storage

def get_data_source_state(self, source: str) -> IDataSourceState:
```
Collaborator
There really needs to be a comment here as users are expected to use this function.

Collaborator Author
Users in the sense of other VDK developers.

But end users are not supposed to use it directly. I will add a comment, though.
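
For illustration, the added comment might look something like this (a sketch; the final wording in the code may differ):

```python
def get_data_source_state(self, source: str) -> IDataSourceState:
    """Return the persisted state for the given data source.

    Intended for VDK plugin developers implementing data sources;
    end users are not expected to call this directly.
    """
```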

@antoniivanov (Collaborator Author)
> Can you give an example if your source was a relational database and also an example if it was blob storage?

I will add examples. But would these work?

In a relational database like MySQL, the Data Source would be the database server itself, with which you establish a connection. Here, each table could serve as a Data Source Stream.

In the context of Amazon S3, your Data Source would be an S3 bucket. Each object (or a group of objects under a common prefix) within that bucket could be considered a Data Source Stream.

In a REST API, the Data Source is the HTTP base URL (http://xxx.com). The Data Source Streams could be the different endpoints (http://xxx.com/users, http://xxx.com/admins).

The concepts are not something new. Singer.io and Airbyte use similar concepts.
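
To make the relational example concrete, a table-per-stream source might read roughly like this (an illustrative sketch using sqlite3; the class and method names are not part of the POC):

```python
import sqlite3
from typing import Iterator


class TableStream:
    # One stream per database table (illustrative only).
    def __init__(self, connection: sqlite3.Connection, table: str):
        self._connection = connection
        self._table = table

    def name(self) -> str:
        return self._table

    def read(self) -> Iterator[dict]:
        # Yield each row as a dict keyed by column name.
        cursor = self._connection.execute(f"SELECT * FROM {self._table}")
        columns = [col[0] for col in cursor.description]
        for row in cursor:
            yield dict(zip(columns, row))
```

A blob storage source could follow the same shape, with one stream per object or key prefix.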

@antoniivanov force-pushed the person/aivanov/data-source branch from 3169ca2 to 7f05c82, November 1, 2023 15:06
This change implements a POC for the Data Sources API.

It is based on some of the requirements and research in https://github.com/vmware/versatile-data-kit/wiki/Ingest-Source-Research

See the concepts page for an explanation of data source related concepts.

So what's implemented is:
- Data Source APIs handling sources, streams, and state
- Partial data source connection management
- A Data Source Ingester that reads from data sources and writes to the existing IIngester
- An example data source AutoGeneratedDataSource
- An example job in the functional test suite

Most likely this would be moved to a plugin, vdk-data-sources. For now there doesn't appear to be a need for this to be in vdk-core.
Add support for passing and keeping state in data sources.

The way it works: at the beginning, a data source is called with `data_source.connect(previous_state)` to initialize. Then, if the Payload.state field is not None, that state will be persisted for that stream.

A data source can keep per-stream state and an "others" state for non-stream-specific stateful information if needed.
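
As a sketch of how a stream might use this (the Payload.state behavior is from the commit message; the payload fields and constructor below are assumptions):

```python
from dataclasses import dataclass
from typing import Iterator, Optional


# DataSourcePayload is part of this PR's API; this stub only mirrors the
# two fields used below and is an assumption for the sketch.
@dataclass
class DataSourcePayload:
    data: dict
    state: Optional[dict] = None


class IncrementalStream:
    # Illustrative stream that resumes from the last persisted marker.
    def __init__(self, previous_state: dict):
        self._last_id = previous_state.get("last_id", 0)

    def read(self) -> Iterator[DataSourcePayload]:
        for record_id in range(self._last_id + 1, self._last_id + 4):
            # A payload whose state field is not None causes that state
            # to be persisted for this stream.
            yield DataSourcePayload(
                data={"id": record_id},
                state={"last_id": record_id},
            )
```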

auto
Register a data source and its associated configuration class:
1. First decorate the class with the @data_source decorator
2. Then implement vdk_data_sources_register to register the class as below

```python
@hookimpl
def vdk_data_sources_register(self, data_source_factory: IDataSourceFactory):
    data_source_factory.register_data_source_class(AutoGeneratedDataSource)
```
A data source can be used in this way:

```python
def run(job_input: IJobInput):
    source = SourceDefinition(id="auto", name="auto-generated-data", config={})
    destination = DestinationDefinition(id="auto-dest", method="memory")

    with DataFlowInput(job_input) as flow_input:
        flow_input.start(source, destination)
```

or in a config.toml file:

```toml
[sources.auto]
name = "auto-generated-data"
config = {}

[destinations.auto-dest]
method = "memory"

[[flows]]
from = "auto"
to = "auto-dest"
```

```python
def run(job_input: IJobInput):
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flow(toml_parser.load_config("config.toml"))
```

flow

flow comments
When defining a data flow mapping, we need to be able to map source to target appropriately (e.g. rename columns, map streams to tables).
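
For example, a mapping section in the flow configuration might eventually look something like this (a purely hypothetical shape, not implemented in this PR):

```toml
[[flows]]
from = "auto"
to = "auto-dest"

# Hypothetical: route a stream to a target table...
[flows.mapping.streams]
stream_0 = "target_table"

# ...and rename columns within that stream.
[flows.mapping.columns.stream_0]
old_column = "new_column"
```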
@antoniivanov force-pushed the person/aivanov/data-source branch from 04c3cba to 9a32f82, November 2, 2023 10:34
@antoniivanov antoniivanov merged commit 0bd3c49 into main Nov 2, 2023
@antoniivanov antoniivanov deleted the person/aivanov/data-source branch November 2, 2023 10:50
#### Data Source Stream
A Data Source Stream is an abstraction over a subset of data in the Data Source. It can be thought of as a channel through which data flows. Each Data Source Stream has a unique name to identify it and includes methods to read data from the stream. For example, in a database-backed data source, each table could be a separate stream. Streams can potentially be ingested in parallel.

Reading from the stream yields a sequence of Data Source Payloads.
Collaborator
what happens if the stream never ends, e.g. if it's a Kafka topic with constant influx of data?

@antoniivanov (Collaborator Author) commented Nov 2, 2023
What could happen depends on how the Kafka data source is implemented. You can implement it so that it fetches all data until start_timestamp; in that case it should end.

But one can reuse https://pypi.org/project/pipelinewise-tap-kafka/ with vdk-singer.

The way they have handled it is to use max_runtime_ms (the maximum time for the tap to collect new messages from a Kafka topic) to end the ingestion batch.

```python
flow_input.start(DataFlowMappingDefinition(source, destination))
```

or in config.toml file
Collaborator
What is a toml file? I could obviously google it - we configure data jobs with INI files, and now TOML files for something else?

@antoniivanov (Collaborator Author) commented Nov 2, 2023
I've been considering replacing INI with TOML because:

  • it supports nested data structures
  • it supports arrays
  • it supports data types!
  • it's somewhat similar to INI in syntax: existing INI files can often be parsed by a TOML parser, so the change can be pretty backward compatible.

It would not have been feasible/easy to express the above data flow structure in INI format. So I decided to use TOML now as an experiment to see if it's going to work for users.

We need to move away from INI for the above reasons, and we've had users who have requested support for a more "modern" format.
The other alternative is YAML, but YAML is pretty ugly for highly nested configurations and would make the migration from INI more involved.
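
For contrast, here is the kind of structure the flow configuration needs (typed values, inline tables, arrays of tables), which plain INI has no syntax for; the config keys here are illustrative:

```toml
[sources.auto]
name = "auto-generated-data"
# Inline table with typed values: an integer and a boolean.
config = { num_records = 10, include_metadata = true }

[[flows]]  # array of tables: multiple flows can be declared
from = "auto"
to = "auto-dest"
```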

Collaborator
I generally agree with everything you said; however, it would be great to have a more structured approach, e.g. use TOML everywhere, rather than use it here and something else in a different place.

```python
def _generate_test_data(self, start_id: int) -> List[DataSourcePayload]:
    generated_data = []
    for i in range(self._config.num_records):
        data = {
```
Collaborator
A data stream, which generates data about data streams... Seems a bit tautological... what about a more common/well known/relatable use case like Employees or shapes ... animals?

Collaborator Author
Hmm, yeah, I probably could have come up with a more interesting example. I will leave it for now, though, since there are already lots of tests that expect this data. But I will change it later.

It's part of the definition of a data source - what configuration it requires.
You need to implement a class decorated with the @config_class decorator like this:

Example::
Collaborator
An example full of "examples" is not very useful. Try to make it more relatable please.



```python
@dataclass
class DataSourceRegistryItem:
```
Collaborator
"Registry Item" - isn't this just a "data source"?

Collaborator Author
It keeps the classes that are needed to create the data source. It's not the data source itself. I am not sure of a better name.

```python
class DataSourceIngester:
    def __init__(self, job_input: IJobInput):
        self.__ingestion_queue = Queue()
        self.__actual_ingester = cast(IIngester, job_input)
```
Collaborator
"actual ingester"? is there another one?

Collaborator Author
Well, the class is called "DataSourceIngester", so the actual ingester that is going to send the data to the target is the __actual_ingester.

Feel free to suggest a better name and I will rename it.

```python
    return queue_item is None

def _ingest_stream(self, ingest_entry: IngestQueueEntry):
    for payload in ingest_entry.stream.read():
```
Collaborator
when do you stop reading?

@antoniivanov (Collaborator Author) commented Nov 2, 2023
When the stream decides to stop, or when the data job times out.

```python
    destinations: List[IngestDestination] = None,
    error_callback: Optional[IDataSourceErrorCallback] = None,
):
    if data_source_id not in self.__ingested_streams_set:
```
Collaborator
"ingested" streams? What if the ingestion fails? I would rename it to something something like "being_ingested" or "ingesting" streams?

Collaborator Author
Sounds good

antoniivanov added a commit that referenced this pull request Nov 2, 2023
This is adding a data source plugin for singer.io.

So now users can specify singer taps as data sources.

They can also list all singer taps that can be found with `vdk singer --list-taps`.

The change depends on #2805
antoniivanov added a commit that referenced this pull request Nov 2, 2023
Addressing review comments from #2805
@antoniivanov (Collaborator Author)
Thanks for the review @dakodakov. I have answered the comments above and addressed the changes in PR #2865.

antoniivanov added a commit that referenced this pull request Nov 2, 2023
Addressing review comments from #2805
antoniivanov added a commit that referenced this pull request Nov 13, 2023