Skip to content

Commit

Permalink
🎉 Source Klaviyo: improve performance and make Global Exclusions stream incremental (airbytehq#8592)
Browse files Browse the repository at this point in the history

* improve performance of global_exclusion stream
+ make it semi-incremental by introducing a reverse incremental stream
+ increase page_size from 100 to 5k
+ set cursor_field
+ remove validation of the records by pydantic models
* enable Metrics stream

Co-authored-by: Eugene Kulak <[email protected]>
  • Loading branch information
2 people authored and schlattk committed Jan 4, 2022
1 parent f26425b commit 3e28452
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@
- name: Klaviyo
sourceDefinitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
dockerRepository: airbyte/source-klaviyo
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/klaviyo
icon: klaviyo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3269,7 +3269,7 @@
supported_destination_sync_modes: []
supported_source_sync_modes:
- "append"
- dockerImage: "airbyte/source-klaviyo:0.1.2"
- dockerImage: "airbyte/source-klaviyo:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/klaviyo"
changelogUrl: "https://docs.airbyte.io/integrations/sources/klaviyo"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-klaviyo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-klaviyo
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,8 @@ tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
cursor_paths:
events: [ "timestamp" ]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"events": {
"timestamp": 9621295127
},
"global_exclusions": {
"timestamp": "2120-10-10T00:00:00Z"
}
}
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-klaviyo/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

TEST_REQUIREMENTS = [
"pytest~=6.1",
"pytest-mock",
"source-acceptance-test",
]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,6 @@
"""
MIT License
Copyright (c) 2020 Airbyte
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .source import SourceKlaviyo

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@


from datetime import datetime
from typing import Any, List, MutableMapping, Optional
from typing import Any, Dict, List, MutableMapping, Optional

from jsonschema import RefResolver
from pydantic import BaseModel, Extra


Expand All @@ -29,6 +30,36 @@ def schema_extra(schema: MutableMapping[str, Any], model) -> None:
ref = prop.pop("$ref")
prop["oneOf"] = [{"type": "null"}, {"$ref": ref}]

@classmethod
def _expand_refs(cls, schema: Any, ref_resolver: Optional[RefResolver] = None) -> None:
    """Iterate over schema and replace all occurrences of $ref with their definitions. Recursive.

    The schema is patched in place.

    :param schema: schema (or sub-schema) that will be patched
    :param ref_resolver: resolver to get a definition from a $ref; if None, one is built from ``schema``
    """
    ref_resolver = ref_resolver or RefResolver.from_schema(schema)

    if isinstance(schema, MutableMapping):
        if "$ref" in schema:
            ref_url = schema.pop("$ref")
            _, definition = ref_resolver.resolve(ref_url)
            # NOTE(review): a self-referencing $ref would recurse forever here —
            # assumed absent in these generated pydantic schemas; confirm if models change.
            cls._expand_refs(definition, ref_resolver=ref_resolver)  # expand refs in definitions as well
            schema.update(definition)
        else:
            for key, value in schema.items():
                cls._expand_refs(value, ref_resolver=ref_resolver)
    elif isinstance(schema, list):  # builtin `list`, not typing.List — isinstance on typing aliases is deprecated
        for value in schema:
            cls._expand_refs(value, ref_resolver=ref_resolver)

@classmethod
def schema(cls, **kwargs) -> Dict[str, Any]:
    """Generate the model's JSON schema, then post-process it for Airbyte.

    The UI and destinations don't support $ref, so every reference is
    inlined and the now-unused "definitions" section is dropped.
    """
    generated = super().schema(**kwargs)
    cls._expand_refs(generated)
    generated.pop("definitions", None)
    return generated

object: str


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ class Config:


class SourceKlaviyo(AbstractSource):
def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
"""Connection check to validate that the user-provided config can be used to connect to the underlying API
:param config: the user-input config object conforming to the connector's spec.json
:param logger: logger object
:return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
:return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
"""
ok = False
error_msg = None
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """Build the stream instances exposed by this connector.

    :param config: user-provided configuration; accessed via attributes
        (``config.api_key``), so it is presumably the parsed pydantic
        ConnectorConfig model rather than a plain mapping — TODO confirm.
    :return: instantiated stream objects. Note these are instances, not
        classes, hence ``List[Stream]`` — the previous ``List[Type[Stream]]``
        annotation was incorrect.
    """
    return [
        Campaigns(api_key=config.api_key),
        Events(api_key=config.api_key, start_date=config.start_date),
        GlobalExclusions(api_key=config.api_key, start_date=config.start_date),
        Lists(api_key=config.api_key),
        Metrics(api_key=config.api_key),
    ]

def spec(self, *args, **kwargs) -> ConnectorSpecification:
Expand Down
157 changes: 122 additions & 35 deletions airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
import requests
Expand Down Expand Up @@ -46,7 +45,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return None

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
"""Usually contains common params e.g. pagination size etc."""
next_page_token = next_page_token or {}
Expand All @@ -58,53 +57,23 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
"""
response_json = response.json()
for record in response_json.get("data", []): # API returns records in a container array "data"
self.schema.parse_obj(record) # validate with schema first
yield record

def get_json_schema(self) -> Mapping[str, Any]:
    """Use Pydantic schema: build this stream's JSON schema from its `schema` model class."""
    return self.schema.schema()


class Campaigns(KlaviyoStream):
schema = Campaign

def path(self, **kwargs) -> str:
return "campaigns"


class Lists(KlaviyoStream):
schema = PersonList

def path(self, **kwargs) -> str:
return "lists"


class GlobalExclusions(KlaviyoStream):
schema = GlobalExclusion
primary_key = "email"

def path(self, **kwargs) -> str:
return "people/exclusions"


class Metrics(KlaviyoStream):
schema = Metric

def path(self, **kwargs) -> str:
return "metrics"


class IncrementalKlaviyoStream(KlaviyoStream, ABC):
state_checkpoint_interval = 100
"""Base class for all incremental streams, requires cursor_field to be declared"""

def __init__(self, start_date: str, **kwargs):
    super().__init__(**kwargs)
    # The API filters on epoch seconds, so keep the start date as a unix timestamp.
    self._start_ts = int(pendulum.parse(start_date).timestamp())

@property
@abstractmethod
def cursor_field(self) -> str:
def cursor_field(self) -> Union[str, List[str]]:
"""
Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is
usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental.
Expand All @@ -113,6 +82,7 @@ def cursor_field(self) -> str:
"""

def request_params(self, stream_state=None, **kwargs):
"""Add incremental filters"""
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)

Expand Down Expand Up @@ -147,7 +117,124 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return None


class ReverseIncrementalKlaviyoStream(KlaviyoStream, ABC):
    """Base class for streams that are not natively incremental but support both desc & asc sort order.

    Protocol: on the first sync (no saved state) records are read in ascending
    order with periodic checkpointing. On later syncs (state present) records
    are read in *descending* order and pagination stops as soon as records
    older than the saved cursor appear — hence "reverse" incremental.
    """

    def __init__(self, start_date: str, **kwargs):
        super().__init__(**kwargs)
        self._start_datetime = pendulum.parse(start_date)
        self._reversed = False  # becomes True in request_params once saved state is seen
        self._reached_old_records = False  # set by parse_response; read by next_page_token to stop paging
        self._low_boundary = None  # max(saved cursor, start_date); only meaningful when self._reversed

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """How often to checkpoint state (i.e: emit a STATE message). By default return the same value as page_size.

        When reading in descending order, no intermediate checkpoint is safe
        (a mid-sync cursor would wrongly claim all older records were read),
        so None disables checkpointing in that mode.
        """
        return None if self._reversed else self.page_size

    @property
    @abstractmethod
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is
        usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental.

        :return str: The name of the cursor field.
        """

    def request_params(self, stream_state=None, **kwargs):
        """Add incremental filters"""
        stream_state = stream_state or {}
        if stream_state:
            # Saved state exists: switch to descending reads and remember the
            # oldest timestamp we still care about.
            self._reversed = True
            self._low_boundary = max(pendulum.parse(stream_state[self.cursor_field]), self._start_datetime)
        params = super().request_params(stream_state=stream_state, **kwargs)
        params["sort"] = "desc" if self._reversed else "asc"

        return params

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and
        the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
        """
        latest_cursor = pendulum.parse(latest_record[self.cursor_field])
        if current_stream_state:
            # Keep whichever cursor is newer: the latest record's or the saved one.
            latest_cursor = max(pendulum.parse(latest_record[self.cursor_field]), pendulum.parse(current_stream_state[self.cursor_field]))
        return {self.cursor_field: latest_cursor.isoformat()}

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
        to most other methods in this class to help you form headers, request bodies, query params, etc..

        :param response: the most recent response from the API
        :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
            If there are no more pages in the result, return None.
        """
        next_page_token = super().next_page_token(response)
        # In descending mode, once parse_response has seen a record older than the
        # low boundary every subsequent page is older still — stop paginating.
        if self._reversed and self._reached_old_records:
            return None

        return next_page_token

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        :return an iterable containing each record in the response
        """
        for record in super().parse_response(response=response, **kwargs):
            if self._reversed:
                # Descending read: a record older than the boundary means we've caught
                # up to the previous sync; flag it so next_page_token stops paging.
                if pendulum.parse(record[self.cursor_field]) < self._low_boundary:
                    self._reached_old_records = True
                    continue
            else:
                # Ascending first sync: drop anything before the configured start date.
                if pendulum.parse(record[self.cursor_field]) < self._start_datetime:
                    continue
            yield record


class Campaigns(KlaviyoStream):
    """Full-refresh stream over Klaviyo campaigns.

    Docs: https://developers.klaviyo.com/en/reference/get-campaigns
    """

    schema = Campaign

    def path(self, **kwargs) -> str:
        """Endpoint path for the campaigns listing."""
        return "campaigns"


class Lists(KlaviyoStream):
    """Full-refresh stream over Klaviyo lists.

    Docs: https://developers.klaviyo.com/en/reference/get-lists
    """

    schema = PersonList

    def path(self, **kwargs) -> str:
        """Endpoint path for the lists listing."""
        return "lists"


class GlobalExclusions(ReverseIncrementalKlaviyoStream):
    """Reverse-incremental stream over globally excluded (suppressed) profiles.

    Docs: https://developers.klaviyo.com/en/reference/get-global-exclusions
    """

    schema = GlobalExclusion
    primary_key = "email"
    cursor_field = "timestamp"
    page_size = 5000  # the maximum value allowed by API

    def path(self, **kwargs) -> str:
        """Endpoint path for exclusion records."""
        return "people/exclusions"


class Metrics(KlaviyoStream):
    """Full-refresh stream over Klaviyo metrics.

    Docs: https://developers.klaviyo.com/en/reference/get-metrics
    """

    schema = Metric

    def path(self, **kwargs) -> str:
        """Endpoint path for the metrics listing."""
        return "metrics"


class Events(IncrementalKlaviyoStream):
"""Docs: https://developers.klaviyo.com/en/reference/metrics-timeline"""

schema = Event
cursor_field = "timestamp"

Expand Down
Loading

0 comments on commit 3e28452

Please sign in to comment.