🎉 CDK: Added support for efficient parent/child streams using cache (#6057)

* Add caching

* Upd cache file handling

* Upd slices, sync mode, docs

* Bump version

* Use SyncMode.full_refresh for parent stream_slices

* Refactor
gaart authored Sep 22, 2021
1 parent 548a3a3 commit 9aa5a5a
Showing 6 changed files with 172 additions and 5 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.1.23
Added the ability to use caching for efficient synchronization of nested streams.

## 0.1.22
Allow passing custom headers to request in `OAuth2Authenticator.refresh_access_token()`: https://github.com/airbytehq/airbyte/pull/6219

4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/__init__.py
@@ -1,5 +1,5 @@
# Initialize Streams Package
from .exceptions import UserDefinedBackoffException
from .http import HttpStream
from .http import HttpStream, HttpSubStream

__all__ = ["HttpStream", "UserDefinedBackoffException"]
__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
82 changes: 81 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -23,10 +23,14 @@
#


import os
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import requests
import vcr
import vcr.cassette as Cassette

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from requests.auth import AuthBase
@@ -57,6 +61,39 @@ def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None):
elif authenticator:
self._authenticator = authenticator

if self.use_cache:
self.cache_file = self.request_cache()
# we need this attribute to access cassette metadata, such as the record play count, whether all records were played, etc.
self.cassette = None

@property
def cache_filename(self):
"""
Override if needed. Return the name of the cache file.
"""
return f'{self.name}.yml'

@property
def use_cache(self):
"""
Override if needed. If True, all records will be cached.
"""
return False

def request_cache(self) -> Cassette:
"""
Builds the VCR cassette used as this stream's cache.
It deletes the cache file every time it is called; normally it should be called only once.
We can't use NamedTemporaryFile here because the YAML serializer doesn't work well with empty files.
"""

try:
os.remove(self.cache_filename)
except FileNotFoundError:
pass

return vcr.use_cassette(self.cache_filename, record_mode="new_episodes", serializer="yaml")

@property
@abstractmethod
def url_base(self) -> str:
@@ -322,7 +359,18 @@ def read_records(
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
response = self._send_request(request, request_kwargs)

if self.use_cache:
# use context manager to handle and store cassette metadata
with self.cache_file as cass:
self.cassette = cass
# VCR looks for a recorded interaction matching the request: if one exists, the response is replayed from the cache file;
# otherwise the request is sent and the new interaction is recorded into the cache file
response = self._send_request(request, request_kwargs)

else:
response = self._send_request(request, request_kwargs)

yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

next_page_token = self.next_page_token(response)
@@ -331,3 +379,35 @@

# Always return an empty generator just in case no records were ever yielded
yield from []


class HttpSubStream(HttpStream, ABC):

def __init__(self, parent: HttpStream, **kwargs):
"""
:param parent: the parent HttpStream instance whose records are used to build this stream's slices
"""
super().__init__(**kwargs)
self.parent = parent

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
parent_stream_slices = self.parent.stream_slices(
sync_mode=SyncMode.full_refresh,
cursor_field=cursor_field,
stream_state=stream_state
)

# iterate over all parent stream_slices
for stream_slice in parent_stream_slices:
parent_records = self.parent.read_records(
sync_mode=SyncMode.full_refresh,
cursor_field=cursor_field,
stream_slice=stream_slice,
stream_state=stream_state
)

# iterate over all parent records with current stream_slice
for record in parent_records:
yield {"parent": record}
5 changes: 5 additions & 0 deletions airbyte-cdk/python/docs/concepts/http-streams.md
@@ -72,7 +72,12 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe

When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HTTPStream`, each slice is equivalent to an HTTP request; the stream makes one request per element returned by the `stream_slices` function. The current slice being read is passed into every other method of `HttpStream`, e.g. `request_params`, `request_headers`, `path`, etc., so it can be injected into the request. This allows `request_params`, `path`, and the other methods to read the input slice and return the appropriate value dynamically.
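For illustration (the stream name, dates, and endpoint below are hypothetical, not from the CDK), a stream might emit one slice per day and read the current slice back in `request_params`:

```python
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class DailyReports(HttpStream):
    url_base = "https://api.example.com/"
    primary_key = "id"

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        # one slice (and therefore one HTTP request) per day
        for day in ["2021-09-20", "2021-09-21", "2021-09-22"]:
            yield {"date": day}

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        return "reports"

    def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
        # the slice currently being read is injected into the request
        return {"date": stream_slice["date"]}

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        # assumes the endpoint returns a JSON list of records
        yield from response.json()
```

Each yielded dictionary results in exactly one request to `reports` with the corresponding `date` parameter.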

### Caching

When a stream depends on the results of another stream, caching can be used to write the parent stream's data to a file and reuse that data when the child stream synchronizes, instead of performing the same HTTP requests again. Enable caching by overriding the `use_cache` property on the parent stream and using the `HttpSubStream` class as the base class of the child stream.
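As a hedged sketch (the `Repositories`/`Issues` stream names, endpoints, and base URL below are hypothetical, not part of the CDK), a connector pairing a cached parent stream with an `HttpSubStream` child might look like this:

```python
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream


class Repositories(HttpStream):
    url_base = "https://api.example.com/"
    primary_key = "id"
    use_cache = True  # parent responses are written to repositories.yml and reused

    def path(self, **kwargs) -> str:
        return "repositories"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json()


class Issues(HttpSubStream):
    url_base = "https://api.example.com/"
    primary_key = "id"

    def __init__(self, **kwargs):
        # the cached parent stream whose records drive this stream's slices
        super().__init__(parent=Repositories(), **kwargs)

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        # HttpSubStream.stream_slices() yields {"parent": <repository record>}
        return f"repositories/{stream_slice['parent']['id']}/issues"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json()
```

On the first sync the parent's responses are recorded to `repositories.yml`; when `Issues.stream_slices()` later re-reads the parent's records, matching requests are replayed from that file instead of triggering new HTTP requests.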

### Network Adapter Keyword arguments

If you need to set any network-adapter keyword arguments on the outgoing HTTP requests, such as `allow_redirects`, `stream`, `verify`, `cert`, etc.,
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
be returned as a keyword argument.
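For example, a minimal sketch of such an override (the class name and values are illustrative):

```python
from typing import Any, Mapping

from airbyte_cdk.sources.streams.http import HttpStream


class MyStream(HttpStream):
    # ... url_base, path, parse_response, next_page_token defined as usual ...

    def request_kwargs(self, **kwargs) -> Mapping[str, Any]:
        # forwarded to the underlying requests session for every request this stream makes
        return {"allow_redirects": True, "timeout": 60}
```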
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.22",
version="0.1.23",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
@@ -72,6 +72,7 @@
"pydantic~=1.6",
"PyYAML~=5.4",
"requests",
"vcrpy",
"Deprecated~=1.2",
],
python_requires=">=3.7.0",
80 changes: 79 additions & 1 deletion airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
@@ -31,7 +31,7 @@
import pytest
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth import NoAuth
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator as HttpTokenAuthenticator
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
@@ -371,3 +371,81 @@ def test_body_for_all_methods(self, mocker, requests_mock):
assert response["body"] == self.data_body
else:
assert response["body"] is None


class CacheHttpStream(StubBasicReadHttpStream):
use_cache = True


class CacheHttpSubStream(HttpSubStream):
url_base = "https://example.com"
primary_key = ""

def __init__(self, parent):
super().__init__(parent=parent)

def parse_response(self, **kwargs) -> Iterable[Mapping]:
return []

def next_page_token(self, **kwargs) -> Optional[Mapping[str, Any]]:
return None

def path(self, **kwargs) -> str:
return ""


def test_caching_filename():
stream = CacheHttpStream()
assert stream.cache_filename == f"{stream.name}.yml"


def test_caching_cassettes_are_different():
stream_1 = CacheHttpStream()
stream_2 = CacheHttpStream()

assert stream_1.cache_file != stream_2.cache_file


def test_parent_attribute_exist():
parent_stream = CacheHttpStream()
child_stream = CacheHttpSubStream(parent=parent_stream)

assert child_stream.parent == parent_stream


def test_cache_response(mocker):
stream = CacheHttpStream()
mocker.patch.object(stream, "url_base", "https://google.com/")
list(stream.read_records(sync_mode=SyncMode.full_refresh))

with open(stream.cache_filename, 'r') as f:
assert f.read()


class CacheHttpStreamWithSlices(CacheHttpStream):
paths = ['', 'search']

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f'{stream_slice.get("path")}'

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for path in self.paths:
yield {"path": path}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield response


def test_using_cache(mocker):
parent_stream = CacheHttpStreamWithSlices()
mocker.patch.object(parent_stream, "url_base", "https://google.com/")

for _slice in parent_stream.stream_slices():
list(parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice))

child_stream = CacheHttpSubStream(parent=parent_stream)

for _slice in child_stream.stream_slices(sync_mode=SyncMode.full_refresh):
pass

assert parent_stream.cassette.play_count != 0
