Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Klaviyo: use CheckpointMixin for handling state updates #38879

Merged
merged 9 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.2.2@sha256:57703de3b4c4204bd68a7b13c9300f8e03c0189bffddaffc796f1da25d2dbea0
dockerImageTag: 2.6.3
dockerImageTag: 2.6.4
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
378 changes: 336 additions & 42 deletions airbyte-integrations/connectors/source-klaviyo/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-klaviyo/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.6.3"
version = "2.6.4"
name = "source-klaviyo"
description = "Source implementation for Klaviyo."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_klaviyo"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "^0"
airbyte-cdk = "=0.90.0"

[tool.poetry.scripts]
source-klaviyo = "source_klaviyo.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,36 @@
import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.core import CheckpointMixin, StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from requests import Response

from .availability_strategy import KlaviyoAvailabilityStrategy
from .exceptions import KlaviyoBackoffError


class KlaviyoStream(HttpStream, ABC):
class KlaviyoStream(HttpStream, CheckpointMixin, ABC):
"""Base stream for api version v2023-10-15"""

url_base = "https://a.klaviyo.com/api/"
primary_key = "id"
page_size = None
api_revision = "2023-10-15"

@property
def state(self):
return self._state

@state.setter
def state(self, value):
self._state = value

def __init__(self, api_key: str, start_date: Optional[str] = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._api_key = api_key
self._start_ts = start_date
if not hasattr(self, "_state"):
self._state = {}

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
Expand Down Expand Up @@ -86,7 +96,7 @@ def map_record(self, record: MutableMapping[str, Any]) -> MutableMapping[str, An
record[self.cursor_field] = record["attributes"][self.cursor_field]
return record

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Override to determine the latest state after reading the latest record.
This typically compares the cursor_field from the latest record and the current state and picks
Expand Down Expand Up @@ -118,8 +128,13 @@ def read_records(
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:

current_state = self.state or {}
try:
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
for record in super().read_records(sync_mode, cursor_field, stream_slice, current_state):
self.state = self._get_updated_state(current_state, record)
yield record

except KlaviyoBackoffError as e:
self.logger.warning(repr(e))

Expand Down Expand Up @@ -167,7 +182,7 @@ def request_params(
class IncrementalKlaviyoStreamWithArchivedRecords(IncrementalKlaviyoStream, ABC):
"""A base class which should be used when archived records need to be read"""

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Extend the stream state with `archived` property to store such records' state separately from the stream state
"""
Expand All @@ -180,7 +195,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
current_stream_state["archived"] = {self.cursor_field: latest_archived_cursor.isoformat()}
return current_stream_state
else:
return super().get_updated_state(current_stream_state, latest_record)
return super()._get_updated_state(current_stream_state, latest_record)

def stream_slices(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def test_request_params(self, config_start_date, stream_state_date, next_page_to
)
def test_get_updated_state(self, config_start_date, current_cursor, latest_cursor, expected_cursor):
stream = SomeIncrementalStream(api_key=API_KEY, start_date=config_start_date)
assert stream.get_updated_state(
assert stream._get_updated_state(
current_stream_state={stream.cursor_field: current_cursor} if current_cursor else {},
latest_record={stream.cursor_field: latest_cursor},
) == {stream.cursor_field: expected_cursor}
Expand Down Expand Up @@ -446,7 +446,7 @@ def test_read_records(self, requests_mock):
)
def test_get_updated_state(self, latest_record, current_stream_state, expected_state):
stream = Campaigns(api_key=API_KEY)
assert stream.get_updated_state(current_stream_state, latest_record) == expected_state
assert stream._get_updated_state(current_stream_state, latest_record) == expected_state

def test_stream_slices(self):
stream = Campaigns(api_key=API_KEY)
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ Stream `Lists Detailed` contains field `profile_count` in addition to info from

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|
| 2.6.3 | 2024-06-04 | [38935](https://github.com/airbytehq/airbyte/pull/38935) | [autopull] Upgrade base image to v1.2.1 |
| `2.6.4` | 2024-06-06 | [38879](https://github.com/airbytehq/airbyte/pull/38879) | Implement `CheckpointMixin` for handling state in Python streams |
| `2.6.3` | 2024-06-04 | [38935](https://github.com/airbytehq/airbyte/pull/38935) | [autopull] Upgrade base image to v1.2.1 |
| `2.6.2` | 2024-05-08 | [37789](https://github.com/airbytehq/airbyte/pull/37789) | Move stream schemas and spec to manifest |
| `2.6.1` | 2024-05-07 | [38010](https://github.com/airbytehq/airbyte/pull/38010) | Add error handler for `5XX` status codes |
| `2.6.0` | 2024-04-19 | [37370](https://github.com/airbytehq/airbyte/pull/37370) | Add streams `campaigns_detailed` and `lists_detailed` |
Expand Down Expand Up @@ -107,4 +108,4 @@ Stream `Lists Detailed` contains field `profile_count` in addition to info from
| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. |
| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT |

</details>
</details>
Loading