-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 CDK: Added support for efficient parent/child streams using cache #6057
Changes from 5 commits
4036d9d
550f757
0fa705f
8398985
66b5344
40e9a09
4376194
5708545
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# Initialize Streams Package | ||
from .exceptions import UserDefinedBackoffException | ||
from .http import HttpStream | ||
from .http import HttpStream, HttpSubStream | ||
|
||
__all__ = ["HttpStream", "UserDefinedBackoffException"] | ||
__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,10 +23,14 @@ | |
# | ||
|
||
|
||
import os | ||
from abc import ABC, abstractmethod | ||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union | ||
|
||
import requests | ||
import vcr | ||
import vcr.cassette as Cassette | ||
|
||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources.streams.core import Stream | ||
from requests.auth import AuthBase | ||
|
@@ -57,6 +61,39 @@ def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None): | |
elif authenticator: | ||
self._authenticator = authenticator | ||
|
||
if self.use_cache: | ||
self.cache_file = self.request_cache() | ||
# we need this attr to get metadata about cassettes, such as record play count, all records played, etc. | ||
self.cass = None | ||
|
||
@property | ||
def cache_filename(self): | ||
""" | ||
Override if needed. Return the name of cache file | ||
""" | ||
return f'{self.name}.yml' | ||
|
||
@property | ||
def use_cache(self): | ||
""" | ||
Override if needed. If True, all records will be cached. | ||
""" | ||
return False | ||
|
||
def request_cache(self) -> Cassette: | ||
""" | ||
Builds VCR instance. | ||
It deletes file everytime we create it, normally should be called only once. | ||
We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files. | ||
""" | ||
|
||
try: | ||
os.remove(self.cache_filename) | ||
except FileNotFoundError: | ||
pass | ||
|
||
return vcr.use_cassette(self.cache_filename, record_mode="new_episodes", serializer="yaml") | ||
|
||
@property | ||
@abstractmethod | ||
def url_base(self) -> str: | ||
|
@@ -322,7 +359,18 @@ def read_records( | |
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
) | ||
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) | ||
response = self._send_request(request, request_kwargs) | ||
|
||
if self.use_cache: | ||
# use context manager to handle and store cassette metadata | ||
with self.cache_file as cass: | ||
self.cass = cass | ||
# vcr tries to find records based on the request, if such records exist, return from cache file | ||
# else make a request and save record in cache file | ||
response = self._send_request(request, request_kwargs) | ||
|
||
else: | ||
response = self._send_request(request, request_kwargs) | ||
|
||
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) | ||
|
||
next_page_token = self.next_page_token(response) | ||
|
@@ -331,3 +379,35 @@ def read_records( | |
|
||
# Always return an empty generator just in case no records were ever yielded | ||
yield from [] | ||
|
||
|
||
class HttpSubStream(HttpStream, ABC): | ||
|
||
def __init__(self, parent: HttpStream, **kwargs): | ||
""" | ||
:param parent: should be the instance of HttpStream class | ||
""" | ||
super().__init__(**kwargs) | ||
self.parent = parent | ||
|
||
def stream_slices( | ||
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None | ||
) -> Iterable[Optional[Mapping[str, Any]]]: | ||
parent_stream_slices = self.parent.stream_slices( | ||
sync_mode=SyncMode.full_refresh, | ||
cursor_field=cursor_field, | ||
stream_state=stream_state | ||
) | ||
|
||
# iterate over all parent stream_slices | ||
for stream_slice in parent_stream_slices: | ||
parent_records = self.parent.read_records( | ||
sync_mode=sync_mode, | ||
cursor_field=cursor_field, | ||
stream_slice=stream_slice, | ||
stream_state=stream_state | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here we're making the read_records call on the parent stream using the substream's sync mode and state. Are there scenarios where this means we could miss relevant data if we're in incremental and have a recent state? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, just fixed it |
||
) | ||
|
||
# iterate over all parent records with current stream_slice | ||
for record in parent_records: | ||
yield {"parent": record} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't guarantee time-order of slices right? We can assume that the iteration There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's right There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok now that you've changed above to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYT?