Skip to content

Commit

Permalink
vdk-data-sources: address review comments
Browse files Browse the repository at this point in the history
Addressing review comments from
#2805
  • Loading branch information
antoniivanov committed Nov 2, 2023
1 parent c245154 commit 8042cff
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 40 deletions.
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-data-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ The data-sources project is a plugin for the Versatile Data Kit (VDK). It aims t

## Usage

### Installation

```
pip install vdk-data-sources
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def __init__(
):
self._config = config
self._stream_number = stream_number
self._data = self._generate_test_data(start_id)
self._data = self._generate_data(start_id)

def _generate_test_data(self, start_id: int) -> List[DataSourcePayload]:
def _generate_data(self, start_id: int) -> List[DataSourcePayload]:
generated_data = []
for i in range(self._config.num_records):
data = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ class IDataSourceConfiguration:
Example::
@config_class(
name="example", description="Data source example for example purposes"
)
class ExampleDataSourceConfiguration(IDataSourceConfiguration):
num_records: int = config_field(
description="Number of records to return", default=2
# Let's say we are developing a data source to extract data from the Stack Overflow API
@config_class(name="stackoverflow-api", description="Extract data from StackOverflow API")
class StackOverflowDataSourceConfiguration(IDataSourceConfiguration):
api_url: str = config_field(description="StackOverflow API URL")
api_key: Optional[str] = config_field(description="API key", default="")
api_endpoints: List[str] = config_field(
description="List of API endpoints", default=['answers', 'questions', 'posts']
)
...
Expand Down Expand Up @@ -75,12 +77,15 @@ class IDataSourceStream:
:Example::
class MyStream(IDataSourceStream):
def name(self):
return "My Stream"
# Implement the actual data extraction from StackOverflow API
class StackOverflowDataSourceStream(IDataSourceStream):
def read(self):
yield DataSourcePayload(...)
def __init__(self, endpoint_url: str):
self._endpoint_url = endpoint_url
def read(self) -> Iterator[DataSourcePayload]:
for item in stackoverflow_client.get(self._endpoint_url):
yield DataSourcePayload(item)
"""

Expand Down Expand Up @@ -108,19 +113,24 @@ class IDataSource:
:Example::
@data_source(name="my", config_class=MyDataSourceConfig)
class MyDataSource(IDataSource):
def configure(self, config: MyDataSourceConfig):
pass
# Let's build a data source for the Stack Overflow API. We will split each API endpoint's data into a separate stream
def connect(self, state):
pass
@data_source(name="stackoverflow-api", config_class=StackOverflowDataSourceConfiguration)
class StackOverflowDataSource(IDataSource):
def disconnect(self):
pass
def configure(self, config: IDataSourceConfiguration):
self._config = config
def connect(self, state: IDataSourceState):
if not self._streams:
self._streams = [StackOverflowDataSourceStream(endpoint) for endpoint in self._config.api_endpoints]
def streams(self):
return [MyStream()]
def streams(self) -> List[IDataSourceStream]:
return self._streams
def disconnect(self):
close_all_streams(self._streams)
self._streams = []
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from threading import Thread
from typing import Callable
from typing import cast
from typing import Dict
from typing import List
from typing import Optional

Expand Down Expand Up @@ -61,7 +60,7 @@ def __init__(self, job_input: IJobInput):
self.__ingestion_queue = Queue()
self.__actual_ingester = cast(IIngester, job_input)
self.__worker_threads = self._start_workers(8)
self.__ingested_streams_set = set()
self.__being_ingested_streams = set()
self.__stored_exceptions = queue.SimpleQueue()
self.__state_factory = DataSourceStateFactory(
PropertiesBasedDataSourceStorage(job_input)
Expand Down Expand Up @@ -197,8 +196,8 @@ def start_ingestion(
destinations: List[IngestDestination] = None,
error_callback: Optional[IDataSourceErrorCallback] = None,
):
if data_source_id not in self.__ingested_streams_set:
self.__ingested_streams_set.add(data_source_id)
if data_source_id not in self.__being_ingested_streams:
self.__being_ingested_streams.add(data_source_id)
else:
raise ValueError(
f"Data source is already in ingestion queue. "
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional

from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.factory import SingletonDataSourceFactory
from vdk.plugin.data_sources.ingester import DataSourceIngester
from vdk.plugin.data_sources.ingester import IngestDestination
from vdk.plugin.data_sources.mapping.definitions import DataFlowMappingDefinition
from vdk.plugin.data_sources.mapping.definitions import Definitions
from vdk.plugin.data_sources.mapping.definitions import DestinationDefinition
from vdk.plugin.data_sources.mapping.definitions import SourceDefinition


class DataFlowInput:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from typing import Optional

from vdk.plugin.data_sources.data_source import DataSourcePayload
from vdk.plugin.data_sources.ingester import DataSourceIngester


@dataclass
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from dataclasses import fields
from dataclasses import is_dataclass
from typing import Any
from typing import Dict
from typing import Type
from typing import TypeVar

import toml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
SingletonDataSourceFactory,
)
from vdk.plugin.data_sources.ingester import DataSourceIngester
from vdk.plugin.data_sources.state import DataSourceStateFactory


def run(job_input: IJobInput):
Expand Down

0 comments on commit 8042cff

Please sign in to comment.