Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source-klaviyo): Add API Budget #53223

Merged
merged 10 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:3.0.0@sha256:1a0845ff2b30eafa793c6eee4e8f4283c2e52e1bbd44eed6cb9e9abd5d34d844
dockerImageTag: 2.11.11
dockerImageTag: 2.12.0
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
445 changes: 344 additions & 101 deletions airbyte-integrations/connectors/source-klaviyo/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.11.11"
version = "2.12.0"
name = "source-klaviyo"
description = "Source implementation for Klaviyo."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -18,6 +18,7 @@ include = "source_klaviyo"
[tool.poetry.dependencies]
python = "^3.10,<3.12"
airbyte_cdk = "^6"
pendulum = "<3.0.0"

[tool.poetry.scripts]
source-klaviyo = "source_klaviyo.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,6 @@ definitions:
requester: "#/definitions/requester"
paginator: "#/definitions/paginator"

semi_incremental_retriever:
$ref: "#/definitions/base_retriever"
record_selector:
$ref: "#/definitions/selector"
record_filter:
# Removing the record filter will migrate `lists_detailed` to concurrency,
# which will cause it to fail due to rate limits.
# Rate limiting for this stream should be implemented first.
type: RecordFilter
condition: |
{% set starting_point = stream_state.get('updated', config.get('start_date')) %}
{{ starting_point and record.get('attributes', {}).get('updated') > starting_point or not starting_point }}

profiles_retriever:
$ref: "#/definitions/base_retriever"
paginator:
Expand Down Expand Up @@ -106,12 +93,13 @@ definitions:

base_semi_incremental_stream:
$ref: "#/definitions/base_stream"
retriever: "#/definitions/semi_incremental_retriever"
retriever: "#/definitions/base_retriever"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: "updated"
datetime_format: "%Y-%m-%dT%H:%M:%S%z"
start_datetime: "{{ config.get('start_date', '2012-01-01T00:00:00Z') }}"
is_client_side_incremental: true

# Incremental streams
profiles_stream:
Expand Down Expand Up @@ -253,7 +241,7 @@ definitions:
type: InlineSchemaLoader
schema: "#/definitions/lists_detailed_schema"
retriever:
$ref: "#/definitions/semi_incremental_retriever"
$ref: "#/definitions/base_retriever"
requester:
$ref: "#/definitions/requester"
request_parameters:
Expand Down Expand Up @@ -1061,6 +1049,75 @@ metadata:
primaryKeysArePresent: true
primaryKeysAreUnique: true

api_budget:
type: HTTPAPIBudget
# Each policy here uses a MovingWindowCallRatePolicy with two rates:
# one for burst (per-second) and one for steady (per-minute).
policies:
# Profiles (the global_exclusions stream shares this endpoint)
- type: MovingWindowCallRatePolicy
rates:
- limit: 10 # burst: 10 calls per second
interval: PT1S
- limit: 150 # steady: 150 calls per minute
interval: PT1M
matchers:
- method: GET
url_path_pattern: "^/api/profiles($|/)" # matches '/api/profiles' exactly, or followed by '/' and any further path
# Events (the events_detailed stream shares this endpoint)
- type: MovingWindowCallRatePolicy
rates:
- limit: 350 # burst: 350 calls per second
interval: PT1S
- limit: 3500 # steady: 3500 calls per minute
interval: PT1M
matchers:
- method: GET
url_path_pattern: "^/api/events($|/)" # matches '/api/events' exactly, or followed by '/' and any further path; covers events_detailed because it queries the same endpoint
# Email Templates
- type: MovingWindowCallRatePolicy
rates:
- limit: 10 # burst: 10 calls per second
interval: PT1S
- limit: 150 # steady: 150 calls per minute
interval: PT1M
matchers:
- method: GET
url_path_pattern: "^/api/templates($|/)" # matches '/api/templates' exactly, or followed by '/' and any further path
# Metrics
- type: MovingWindowCallRatePolicy
rates:
- limit: 10 # burst: 10 calls per second
interval: PT1S
- limit: 150 # steady: 150 calls per minute
interval: PT1M
matchers:
- method: GET
url_path_pattern: "^/api/metrics($|/)"
# Lists (the parent endpoint for lists streams)
- type: MovingWindowCallRatePolicy
rates:
- limit: 75 # burst: 75 calls per second
interval: PT1S
- limit: 700 # steady: 700 calls per minute
interval: PT1M
matchers:
- method: GET
url_path_pattern: "^/api/lists$" # matches exactly '/api/lists'
# Lists Detailed (uses a different URL path – note the extra segment)
- type: MovingWindowCallRatePolicy
rates:
- limit: 1 # burst: 1 call per second
interval: PT1S
- limit: 15 # steady: 15 calls per minute
interval: PT1M
matchers:
- method: GET
url_path_pattern: "^/api/lists/" # matches any path beginning with '/api/lists/' (e.g. '/api/lists/123')
params:
"additional-fields[list]": "profile_count"
# Other API budget settings:
status_codes_for_ratelimit_hit: [429]

# Klaviyo's rate limiting is different by endpoints:
# - XS: 1/s burst; 15/m steady
# - S: 3/s burst; 60/m steady
Expand All @@ -1085,5 +1142,5 @@ metadata:
# Based on the above, the only streams that allow for slicing and hence might perform more concurrent HTTP requests are `events` and `events_detailed`. There is no slicing for the others, so their concurrency is limited by the number of streams querying the same endpoint. Given that the events endpoint is XL, we set the default concurrency to 25.
concurrency_level:
type: ConcurrencyLevel
default_concurrency: "{{ config.get('num_workers', 10) }}"
default_concurrency: "{{ config.get('num_workers', 25) }}"
max_concurrency: 50
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.state_builder import StateBuilder


_ANY_ATTEMPT_COUNT = 123
API_KEY = "some_key"
START_DATE = pendulum.datetime(2020, 10, 10)
CONFIG = {"api_key": API_KEY, "start_date": START_DATE}
CONFIG_NO_DATE = {"api_key": API_KEY}

EVENTS_STREAM_DEFAULT_START_DATE = "2012-01-01T00:00:00+00:00"
EVENTS_STREAM_CONFIG_START_DATE = "2021-11-08T00:00:00+00:00"
Expand All @@ -48,8 +50,24 @@ def get_step_diff(provided_date: str) -> int:
return (freeze_date - provided_date).days // 7


def get_stream_by_name(stream_name: str, config: Mapping[str, Any]) -> Stream:
source = SourceKlaviyo(CatalogBuilder().build(), KlaviyoConfigBuilder().build(), StateBuilder().build())
def read_records(
    stream_name: str,
    config: Mapping[str, Any],
    states: Mapping[str, Any] | None = None,
) -> List[Mapping[str, Any]]:
    """Read one stream through the CDK test entrypoint and return its record payloads.

    Args:
        stream_name: Name of the stream to put in the configured catalog.
        config: Connector configuration used both to build the source and to run the read.
        states: Optional mapping of stream name to that stream's state. Each entry is
            registered on the state builder before the source is created. ``None``
            (the default) means no prior state.

    Returns:
        The ``record.data`` payload of every record message in the read output, in order.
    """
    # A ``None`` default replaces the previous shared ``dict()`` default so that no
    # mutable object is reused across calls.
    state = StateBuilder()
    for name, stream_state in (states or {}).items():
        state.with_stream_state(name, stream_state)

    # The state is handed to the source at construction time; ``read`` itself only
    # receives the config and a catalog containing the requested stream.
    source = SourceKlaviyo(CatalogBuilder().build(), config, state.build())
    catalog = CatalogBuilder().with_stream(ConfiguredAirbyteStreamBuilder().with_name(stream_name)).build()
    output = read(source, config, catalog)
    return [message.record.data for message in output.records]


def get_stream_by_name(stream_name: str, config: Mapping[str, Any], states: Mapping[str, Any] = dict()) -> Stream:
state = StateBuilder()
for stream_name_key in states:
state.with_stream_state(stream_name_key, states[stream_name_key])
source = SourceKlaviyo(CatalogBuilder().build(), KlaviyoConfigBuilder().build(), state.build())
matches_by_name = [stream_config for stream_config in source.streams(config) if stream_config.name == stream_name]
if not matches_by_name:
raise ValueError("Please provide a valid stream name.")
Expand Down Expand Up @@ -336,7 +354,7 @@ class TestSemiIncrementalKlaviyoStream:
("start_date", "stream_state", "input_records", "expected_records"),
(
(
"2021-11-08T00:00:00+00:00",
"2021-11-08T00:00:00Z",
"2022-11-07T00:00:00+00:00",
[
{"attributes": {"updated": "2022-11-08T00:00:00+00:00"}},
Expand All @@ -349,7 +367,7 @@ class TestSemiIncrementalKlaviyoStream:
],
),
(
"2021-11-08T00:00:00+00:00",
"2021-11-09T00:00:00Z",
None,
[
{"attributes": {"updated": "2022-11-08T00:00:00+00:00"}},
Expand All @@ -361,14 +379,13 @@ class TestSemiIncrementalKlaviyoStream:
{"attributes": {"updated": "2023-11-08T00:00:00+00:00"}, "updated": "2023-11-08T00:00:00+00:00"},
],
),
("2021-11-08T00:00:00+00:00", "2022-11-07T00:00:00+00:00", [], []),
("2021-11-08T00:00:00Z", "2022-11-07T00:00:00+00:00", [], []),
),
)
def test_read_records(self, start_date, stream_state, input_records, expected_records, requests_mock):
stream = get_stream_by_name("metrics", CONFIG | {"start_date": start_date})
state = {"metrics": {"updated": stream_state}} if stream_state else {}
requests_mock.register_uri("GET", f"https://a.klaviyo.com/api/metrics", status_code=200, json={"data": input_records})
stream.stream_state = {stream.cursor_field: stream_state if stream_state else start_date}
records = get_records(stream=stream, sync_mode=SyncMode.incremental)
records = read_records("metrics", CONFIG_NO_DATE | {"start_date": start_date}, state)
assert records == expected_records


Expand Down
Loading
Loading