From 9aa5a5a52d1b9d34ff14cc597ebc1ea31f382e10 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Wed, 22 Sep 2021 20:23:27 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20CDK:=20Added=20support=20for=20e?= =?UTF-8?q?fficient=20parent/child=20streams=20using=20cache=20(#6057)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add caching * Upd cache file handling * Upd slices, sync mode, docs * Bump version * Use SyncMode.full_refresh for parent stream_slices * Refactor --- airbyte-cdk/python/CHANGELOG.md | 3 + .../sources/streams/http/__init__.py | 4 +- .../airbyte_cdk/sources/streams/http/http.py | 82 ++++++++++++++++++- .../python/docs/concepts/http-streams.md | 5 ++ airbyte-cdk/python/setup.py | 3 +- .../sources/streams/http/test_http.py | 80 +++++++++++++++++- 6 files changed, 172 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 29074f4a791d..e83176b4172a 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.23 +Added the ability to use caching for efficient synchronization of nested streams. 
+ ## 0.1.22 Allow passing custom headers to request in `OAuth2Authenticator.refresh_access_token()`: https://github.com/airbytehq/airbyte/pull/6219 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/__init__.py index 583e0cc66c92..044328d4008b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/__init__.py @@ -1,5 +1,5 @@ # Initialize Streams Package from .exceptions import UserDefinedBackoffException -from .http import HttpStream +from .http import HttpStream, HttpSubStream -__all__ = ["HttpStream", "UserDefinedBackoffException"] +__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException"] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 9d7575bd8154..5e32ae49497a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -23,10 +23,14 @@ # +import os from abc import ABC, abstractmethod from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import requests +import vcr +import vcr.cassette as Cassette + from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.core import Stream from requests.auth import AuthBase @@ -57,6 +61,39 @@ def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None): elif authenticator: self._authenticator = authenticator + if self.use_cache: + self.cache_file = self.request_cache() + # we need this attr to get metadata about cassettes, such as record play count, all records played, etc. + self.cassete = None + + @property + def cache_filename(self): + """ + Override if needed. Return the name of cache file + """ + return f'{self.name}.yml' + + @property + def use_cache(self): + """ + Override if needed. 
If True, all records will be cached. + """ + return False + + def request_cache(self) -> Cassette: + """ + Builds VCR instance. + It deletes file everytime we create it, normally should be called only once. + We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files. + """ + + try: + os.remove(self.cache_filename) + except FileNotFoundError: + pass + + return vcr.use_cassette(self.cache_filename, record_mode="new_episodes", serializer="yaml") + @property @abstractmethod def url_base(self) -> str: @@ -322,7 +359,18 @@ def read_records( data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), ) request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - response = self._send_request(request, request_kwargs) + + if self.use_cache: + # use context manager to handle and store cassette metadata + with self.cache_file as cass: + self.cassete = cass + # vcr tries to find records based on the request, if such records exist, return from cache file + # else make a request and save record in cache file + response = self._send_request(request, request_kwargs) + + else: + response = self._send_request(request, request_kwargs) + yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) next_page_token = self.next_page_token(response) @@ -331,3 +379,35 @@ def read_records( # Always return an empty generator just in case no records were ever yielded yield from [] + + +class HttpSubStream(HttpStream, ABC): + + def __init__(self, parent: HttpStream, **kwargs): + """ + :param parent: should be the instance of HttpStream class + """ + super().__init__(**kwargs) + self.parent = parent + + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + parent_stream_slices = 
self.parent.stream_slices( + sync_mode=SyncMode.full_refresh, + cursor_field=cursor_field, + stream_state=stream_state + ) + + # iterate over all parent stream_slices + for stream_slice in parent_stream_slices: + parent_records = self.parent.read_records( + sync_mode=SyncMode.full_refresh, + cursor_field=cursor_field, + stream_slice=stream_slice, + stream_state=stream_state + ) + + # iterate over all parent records with current stream_slice + for record in parent_records: + yield {"parent": record} diff --git a/airbyte-cdk/python/docs/concepts/http-streams.md b/airbyte-cdk/python/docs/concepts/http-streams.md index 12fda0eca2cb..5bedb787c747 100644 --- a/airbyte-cdk/python/docs/concepts/http-streams.md +++ b/airbyte-cdk/python/docs/concepts/http-streams.md @@ -72,7 +72,12 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HTTPStream` each Slice is equivalent to a HTTP request; the stream will make one request per element returned by the `stream_slices` function. The current slice being read is passed into every other method in `HttpStream` e.g: `request_params`, `request_headers`, `path`, etc.. to be injected into a request. This allows you to dynamically determine the output of the `request_params`, `path`, and other functions to read the input slice and return the appropriate value. +### Caching + +When we are dealing with streams that depend on the results of another stream, we can use caching to write the data of the parent stream to a file in order to use this data when the child stream synchronizes, rather than performing a full HTTP request again. To turn on caching, override the `use_cache` property of the parent stream so that it returns `True`, and use the `HttpSubStream` class as the base class of the child stream. + ### Network Adapter Keyword arguments + If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc.. 
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can be returned as a keyword argument. diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 104c67d4b94b..47b58281d845 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.22", + version="0.1.23", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -72,6 +72,7 @@ "pydantic~=1.6", "PyYAML~=5.4", "requests", + "vcrpy", "Deprecated~=1.2", ], python_requires=">=3.7.0", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index 591e2cc8003e..d6758b6ce457 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -31,7 +31,7 @@ import pytest import requests from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from airbyte_cdk.sources.streams.http.auth import NoAuth from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator as HttpTokenAuthenticator from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException @@ -371,3 +371,81 @@ def test_body_for_all_methods(self, mocker, requests_mock): assert response["body"] == self.data_body else: assert response["body"] is None + + +class CacheHttpStream(StubBasicReadHttpStream): + use_cache = True + + +class CacheHttpSubStream(HttpSubStream): + url_base = "https://example.com" + primary_key = "" + + def __init__(self, parent): + super().__init__(parent=parent) + + def parse_response(self, **kwargs) -> 
Iterable[Mapping]: + return [] + + def next_page_token(self, **kwargs) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + +def test_caching_filename(): + stream = CacheHttpStream() + assert stream.cache_filename == f"{stream.name}.yml" + + +def test_caching_cassettes_are_different(): + stream_1 = CacheHttpStream() + stream_2 = CacheHttpStream() + + assert stream_1.cache_file != stream_2.cache_file + + +def test_parent_attribute_exist(): + parent_stream = CacheHttpStream() + child_stream = CacheHttpSubStream(parent=parent_stream) + + assert child_stream.parent == parent_stream + + +def test_cache_response(mocker): + stream = CacheHttpStream() + mocker.patch.object(stream, "url_base", "https://google.com/") + list(stream.read_records(sync_mode=SyncMode.full_refresh)) + + with open(stream.cache_filename, 'r') as f: + assert f.read() + + +class CacheHttpStreamWithSlices(CacheHttpStream): + paths = ['', 'search'] + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + return f'{stream_slice.get("path")}' + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + for path in self.paths: + yield {"path": path} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + yield response + + +def test_using_cache(mocker): + parent_stream = CacheHttpStreamWithSlices() + mocker.patch.object(parent_stream, "url_base", "https://google.com/") + + for _slice in parent_stream.stream_slices(): + list(parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice)) + + child_stream = CacheHttpSubStream(parent=parent_stream) + + for _slice in child_stream.stream_slices(sync_mode=SyncMode.full_refresh): + pass + + assert parent_stream.cassete.play_count != 0