Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[source-chargebee] Replace IncrementalSingleSliceCursor with semi-incremental DatetimeBasedCursor #53220

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 2 additions & 235 deletions airbyte-integrations/connectors/source-chargebee/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional, Union
from dataclasses import dataclass
from typing import Optional

from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.transformations.transformation import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState

Expand Down Expand Up @@ -49,233 +46,3 @@ def transform(
"""
record["custom_fields"] = [{"name": k, "value": record.pop(k)} for k in record.copy() if k.startswith("cf_")]
return record


class PriorStateHandler:
"""
PriorStateHandler is a class responsible for managing the state of a stream by tracking and updating the prior state values.

Args:
cursor_field (str): The field used to track the cursor position in the stream.
stream_state (Optional[StreamState]): The current state of the stream.
value_type (Optional[Any]): The default value type for the state.
key (str): The key used to store the prior state in the stream state.

Methods:
Private:
property: _exists: Checks if the prior state key exists in the stream state.
propetry: _prior_state_value: Retrieves the prior state value for a specific stream.
property: _stream_state_value: Retrieves the state value of the stream.
func: _update(): Updates the stream state if the current stream state value is greater than the prior state value.
func: _init(): Sets the initial state for the stream by copying the current state.

Public:
func: set(): Sets the state of the component. If the component does not exist, it initializes it. Otherwise, it updates the existing component.
"""

def __init__(
self,
cursor_field: str,
stream_state: Optional[StreamState] = None,
value_type: Optional[Any] = None,
key: Optional[str] = None,
) -> None:
self._cursor_field = cursor_field
self._stream_state = stream_state if stream_state is not None else {}
self._default_value: Any = value_type() if value_type is not None else str()
self._state_key: str = key if key else "prior_state"

@property
def _exists(self) -> bool:
"""
Check if the prior state key exists in the stream state.

Returns:
bool: True if the state key exists in the stream state, False otherwise.
"""

return self._state_key in self._stream_state

@property
def _prior_state_value(self) -> Any:
"""
Property that retrieves the prior state value for a specific stream.

Returns:
int: The prior state value for the stream, or the default value type if not found.
"""

return self._stream_state.get(self._state_key, {}).get(self._cursor_field, self._default_value)

@property
def _stream_state_value(self) -> Any:
"""
Property that retrieves the state value of the stream.

This method accesses the `stream_state` dictionary and returns the value
associated with the `cursor_field` key. If the key is not found, it returns
the default value specified by `self._default_value`.

Returns:
int: The state value of the stream.
"""

return self._stream_state.get(self._cursor_field, self._default_value)

def _update(self) -> None:
"""
Updates the stream state if the current stream state value is greater than the prior state value.

This method compares the current stream state value with the prior state value.
If the current stream state value is greater, it updates the stream state with the new value
using the state key and cursor field.
"""

if self._stream_state_value > self._prior_state_value:
self._stream_state[self._state_key] = {self._cursor_field: self._stream_state_value}

def _init(self) -> None:
"""
Sets the initial state for the stream by copying the current state.

This method initializes the stream state by creating a copy of the current state
and assigning it to the state key specific to this stream.
"""

self._stream_state[self._state_key] = self._stream_state.copy()

def set(self) -> None:
"""
Sets the state of the component. If the component does not exist, it initializes it.
Otherwise, it updates the existing component.
"""

self._init() if not self._exists else self._update()


@dataclass
class IncrementalSingleSliceCursor(DeclarativeCursor):
cursor_field: Union[InterpolatedString, str]
config: Config
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]):
self._state = {}
self._cursor = None
self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
self._prior_state = PriorStateHandler(
cursor_field=self.cursor_field.eval(self.config),
stream_state=self._state,
value_type=int,
)

def get_request_params(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
# Current implementation does not provide any options to update request params.
# Returns empty dict
return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

def get_request_headers(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
# Current implementation does not provide any options to update request headers.
# Returns empty dict
return self._get_request_option(RequestOptionType.header, stream_slice)

def get_request_body_data(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
# Current implementation does not provide any options to update body data.
# Returns empty dict
return self._get_request_option(RequestOptionType.body_data, stream_slice)

def get_request_body_json(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
# Current implementation does not provide any options to update body json.
# Returns empty dict
return self._get_request_option(RequestOptionType.body_json, stream_slice)

def _get_request_option(self, option_type: RequestOptionType, stream_slice: StreamSlice):
return {}

def get_stream_state(self) -> StreamState:
return {**self._state}

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
return self.get_stream_state()

def set_initial_state(self, stream_state: StreamState):
"""
Sets the initial state of the stream based on the provided stream state.

This method evaluates the cursor field using the configuration, retrieves the cursor value from the
provided stream state, and updates the internal state and cursor if the cursor value is present.
Additionally, it sets or updates the existing prior state with the cursor value.

Args:
stream_state (StreamState): The state of the stream to initialize from.
"""

cursor_field = self.cursor_field.eval(self.config)
cursor_value = stream_state.get(cursor_field)
if cursor_value:
self._state[cursor_field] = cursor_value
self._cursor = cursor_value
self._prior_state.set()

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

:param stream_slice: The current slice, which may or may not contain the most recently observed record
:param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
"""
record_cursor_value = record.get(self.cursor_field.eval(self.config))
if not record_cursor_value:
return

if self.is_greater_than_or_equal(record, self._state):
self._cursor = record_cursor_value

def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
cursor_field = self.cursor_field.eval(self.config)
self._state[cursor_field] = self._cursor

def stream_slices(self) -> Iterable[Mapping[str, Any]]:
yield StreamSlice(partition={}, cursor_slice={})

def should_be_synced(self, record: Record) -> bool:
"""
Evaluating if a record should be synced allows for filtering and stop condition on pagination
"""
record_cursor_value = record.get(self.cursor_field.eval(self.config))
return bool(record_cursor_value)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
"""
Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
"""
cursor_field = self.cursor_field.eval(self.config)
first_cursor_value = first.get(cursor_field) if first else None
second_cursor_value = second.get(cursor_field) if second else None
if first_cursor_value and second_cursor_value:
return first_cursor_value > second_cursor_value
elif first_cursor_value:
return True
else:
return False
33 changes: 27 additions & 6 deletions airbyte-integrations/connectors/source-chargebee/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,16 @@ definitions:
cursor_value: "{{ response.get('next_offset')|tojson() }}"
stop_condition: "{{ not response.next_offset }}"
incremental_sync:
type: CustomIncrementalSync
class_name: source_declarative_manifest.components.IncrementalSingleSliceCursor
type: DatetimeBasedCursor
cursor_field: updated_at
start_datetime:
type: MinMaxDatetime
datetime: '{{ format_datetime(config[''start_date''], ''%s'') }}'
datetime_format: '%s'
datetime_format: '%s'
cursor_datetime_formats:
- '%s'
is_client_side_incremental: true
transformations:
- type: CustomTransformation
class_name: source_declarative_manifest.components.CustomFieldTransformation
Expand Down Expand Up @@ -1461,9 +1468,16 @@ definitions:
cursor_value: "{{ response.get('next_offset')|tojson() }}"
stop_condition: "{{ not response.next_offset }}"
incremental_sync:
type: CustomIncrementalSync
class_name: source_declarative_manifest.components.IncrementalSingleSliceCursor
type: DatetimeBasedCursor
cursor_field: updated_at
start_datetime:
type: MinMaxDatetime
datetime: '{{ format_datetime(config[''start_date''], ''%s'') }}'
datetime_format: '%s'
datetime_format: '%s'
cursor_datetime_formats:
- '%s'
is_client_side_incremental: true
transformations:
- type: CustomTransformation
class_name: source_declarative_manifest.components.CustomFieldTransformation
Expand Down Expand Up @@ -1742,9 +1756,16 @@ definitions:
cursor_value: "{{ response.get('next_offset')|tojson() }}"
stop_condition: "{{ not response.next_offset }}"
incremental_sync:
type: CustomIncrementalSync
class_name: source_declarative_manifest.components.IncrementalSingleSliceCursor
type: DatetimeBasedCursor
cursor_field: migrated_at
start_datetime:
type: MinMaxDatetime
datetime: '{{ format_datetime(config[''start_date''], ''%s'') }}'
datetime_format: '%s'
datetime_format: '%s'
cursor_datetime_formats:
- '%s'
is_client_side_incremental: true
transformations:
- type: CustomTransformation
class_name: source_declarative_manifest.components.CustomFieldTransformation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ data:
hosts:
- "*.chargebee.com"
connectorBuildOptions:
baseImage: docker.io/airbyte/source-declarative-manifest:6.10.0@sha256:58722e84dbd06bb2af9250e37d24d1c448e247fc3a84d75ee4407d52771b6f03
baseImage: docker.io/airbyte/source-declarative-manifest:6.33.1@sha256:06468f2b0acdb0126a29757f67025f8f837014f70e3f079e10e304b0e1a6be4b
connectorSubtype: api
topefolorunso marked this conversation as resolved.
Show resolved Hide resolved
connectorType: source
definitionId: 686473f1-76d9-4994-9cc7-9b13da46147c
dockerImageTag: 0.9.0
dockerImageTag: 0.10.0
dockerRepository: airbyte/source-chargebee
documentationUrl: https://docs.airbyte.com/integrations/sources/chargebee
githubIssueLabel: source-chargebee
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,3 @@ def test_field_transformation(components_module, record, expected_record):
transformer = components_module.CustomFieldTransformation()
transformed_record = transformer.transform(record)
assert transformed_record == expected_record


@pytest.mark.parametrize(
"record_data, expected",
[
({"pk": 1, "name": "example", "updated_at": 1662459011}, True),
],
)
def test_slicer(components_module, record_data, expected):
date_time_dict = {"updated_at": 1662459010}
new_state = {"updated_at": 1662459011}
slicer = components_module.IncrementalSingleSliceCursor(cursor_field="updated_at", config={}, parameters={})
stream_slice = StreamSlice(partition={}, cursor_slice=date_time_dict)
record = Record(stream_name="", data=record_data, associated_slice=stream_slice)
slicer.observe(StreamSlice(partition={}, cursor_slice=date_time_dict), record)
slicer.close_slice(stream_slice)
assert slicer.get_stream_state() == new_state
assert slicer.get_request_headers() == {}
assert slicer.get_request_body_data() == {}
assert slicer.get_request_params() == {}
assert slicer.get_request_body_json() == {}


@pytest.mark.parametrize(
"first_record, second_record, expected",
[
({"pk": 1, "name": "example", "updated_at": 1662459010}, {"pk": 2, "name": "example2", "updated_at": 1662460000}, True),
({"pk": 1, "name": "example", "updated_at": 1662459010}, {"pk": 2, "name": "example2", "updated_at": 1662440000}, False),
({"pk": 1, "name": "example", "updated_at": 1662459010}, {"pk": 2, "name": "example2"}, False),
({"pk": 1, "name": "example"}, {"pk": 2, "name": "example2", "updated_at": 1662459010}, True),
],
)
def test_is_greater_than_or_equal(components_module, first_record, second_record, expected):
slicer = components_module.IncrementalSingleSliceCursor(config={}, parameters={}, cursor_field="updated_at")
assert slicer.is_greater_than_or_equal(second_record, first_record) == expected


def test_set_initial_state(components_module):
cursor_field = "updated_at"
cursor_value = 999999999
slicer = components_module.IncrementalSingleSliceCursor(config={}, parameters={}, cursor_field=cursor_field)
slicer.set_initial_state(stream_state={cursor_field: cursor_value})
assert slicer._state[cursor_field] == cursor_value


@pytest.mark.parametrize(
"record, expected",
[
({"pk": 1, "name": "example", "updated_at": 1662459010}, True),
],
)
def test_should_be_synced(components_module, record, expected):
cursor_field = "updated_at"
slicer = components_module.IncrementalSingleSliceCursor(config={}, parameters={}, cursor_field=cursor_field)
assert slicer.should_be_synced(record) == expected


def test_stream_slices(components_module):
slicer = components_module.IncrementalSingleSliceCursor(config={}, parameters={}, cursor_field="updated_at")
stream_slices_instance = slicer.stream_slices()
actual = next(stream_slices_instance)
assert actual == {}
3 changes: 2 additions & 1 deletion docs/integrations/sources/chargebee.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ The Chargebee connector should not run into [Chargebee API](https://apidocs.char

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.9.0 | 2024-10-22 | [47243](https://github.com/airbytehq/airbyte/pull/47243) | Migrate to Manifest-only |
| 0.10.0 | 2025-07-02 | [53220](https://github.com/airbytehq/airbyte/pull/53220) | Replace custom `IncrementalSingleSliceCursor` component with low-code semi-incremental `DatetimeBasedCursor` |
| 0.9.0 | 2024-10-22 | [47243](https://github.com/airbytehq/airbyte/pull/47243) | Migrate to Manifest-only |
| 0.8.0 | 2025-01-31 | [52685](https://github.com/airbytehq/airbyte/pull/52685) | Update to latest CDK version |
| 0.7.3 | 2025-01-11 | [49038](https://github.com/airbytehq/airbyte/pull/49038) | Starting with this version, the Docker image is now rootless. Please note that this and future versions will not be compatible with Airbyte versions earlier than 0.64 |
| 0.7.2 | 2024-11-20 | [48510](https://github.com/airbytehq/airbyte/pull/48510) | Ensure no pagination issues on concurrent syncs |
Expand Down
Loading