Source Amplitude Refactor: Use CheckpointMixin for state management #39103

Merged: 4 commits, Jun 6, 2024
@@ -11,7 +11,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
-  dockerImageTag: 0.3.11
+  dockerImageTag: 0.3.12
   dockerRepository: airbyte/source-amplitude
   documentationUrl: https://docs.airbyte.com/integrations/sources/amplitude
   githubIssueLabel: source-amplitude
478 changes: 385 additions & 93 deletions airbyte-integrations/connectors/source-amplitude/poetry.lock

Large diffs are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "0.3.11"
+version = "0.3.12"
 name = "source-amplitude"
 description = "Source implementation for Amplitude."
 authors = [ "Airbyte <[email protected]>",]
@@ -17,7 +17,7 @@ include = "source_amplitude"

 [tool.poetry.dependencies]
 python = "^3.9,<3.12"
-airbyte-cdk = "0.80.0"
+airbyte-cdk = "0.90.0"

 [tool.poetry.scripts]
 source-amplitude = "source_amplitude.run:run"
@@ -13,6 +13,7 @@
 import pendulum
 import requests
 from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.streams.core import CheckpointMixin
 from airbyte_cdk.sources.streams.http import HttpStream

 LOGGER = logging.getLogger("airbyte")
@@ -45,7 +46,7 @@ def error_msg_from_status(status: int = None):
             LOGGER.error(f"Unknown error occured: code {status}")


-class Events(HttpStream):
+class Events(HttpStream, CheckpointMixin):
     api_version = 2
     base_params = {}
     cursor_field = "server_upload_time"
@@ -61,6 +62,8 @@ def __init__(self, data_region: str, start_date: str, event_time_interval: dict
         self.event_time_interval = event_time_interval
         self._start_date = pendulum.parse(start_date) if isinstance(start_date, str) else start_date
         self.date_time_fields = self._get_date_time_items_from_schema()
+        if not hasattr(self, "_state"):
+            self._state = {}
         super().__init__(**kwargs)

     @property
@@ -72,7 +75,15 @@ def url_base(self) -> str:
     def time_interval(self) -> dict:
         return {self.event_time_interval.get("size_unit"): self.event_time_interval.get("size")}

-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
+    @property
+    def state(self) -> Mapping[str, Any]:
+        return self._state
+
+    @state.setter
+    def state(self, value: Mapping[str, Any]) -> Mapping[str, Any]:
+        self._state = value
+
+    def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
         # save state value in source native format
         if self.compare_date_template:
             latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template)
@@ -172,8 +183,9 @@ def read_records(
         # https://developers.amplitude.com/docs/export-api#status-codes
         try:
             self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}")
-            records = super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
-            yield from records
+            for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
+                self.state = self._get_updated_state(self.state, record)
+                yield record
         except requests.exceptions.HTTPError as error:
             status = error.response.status_code
             if status in HTTP_ERROR_CODES.keys():
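The pattern this change introduces can be illustrated in isolation. The sketch below is not the connector's code and does not import `airbyte_cdk`: `BaseStream` and `StatefulMixin` are hypothetical stand-ins for `HttpStream` and `CheckpointMixin`, and the cursor comparison is simplified to a plain string `max`, which only works when every timestamp shares one sortable format. What it shows is the shape of the refactor: the stream owns a `state` property and updates it once per record inside `read_records`, rather than relying on a framework callback to `get_updated_state`.

```python
from typing import Any, Iterable, Iterator, Mapping


class BaseStream:
    """Hypothetical stand-in for HttpStream: yields pre-canned records."""

    def __init__(self, records: Iterable[Mapping[str, Any]]) -> None:
        self._records = list(records)

    def read_records(self) -> Iterator[Mapping[str, Any]]:
        yield from self._records


class StatefulMixin:
    """Hypothetical stand-in for CheckpointMixin: exposes a `state` property."""

    @property
    def state(self) -> Mapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        self._state = value


class Events(BaseStream, StatefulMixin):
    cursor_field = "server_upload_time"

    def __init__(self, records: Iterable[Mapping[str, Any]]) -> None:
        # Initialise the state only if nothing has set it already.
        if not hasattr(self, "_state"):
            self._state = {}
        super().__init__(records)

    def _get_updated_state(
        self, current: Mapping[str, Any], latest_record: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        # Simplification: same-format timestamps compare correctly as strings.
        latest = latest_record[self.cursor_field]
        previous = current.get(self.cursor_field, "")
        return {self.cursor_field: max(latest, previous)}

    def read_records(self) -> Iterator[Mapping[str, Any]]:
        for record in super().read_records():
            # Advance the cursor before handing the record to the caller.
            self.state = self._get_updated_state(self.state, record)
            yield record
```

Because the state is advanced before each `yield`, whatever reads `stream.state` between records sees a cursor that already covers every record emitted so far.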
@@ -267,6 +267,6 @@ def test_updated_state(self):

         state = {"server_upload_time": "2023-01-01"}
         for record in cursor_fields_smaple:
-            state = stream.get_updated_state(state, record)
+            state = stream._get_updated_state(state, record)

         assert state["server_upload_time"] == "2023-08-31 00:00:00.000000"
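The test above starts from a date-only state value and ends with a full timestamp, which suggests the update step parses both sides, keeps the later one, and writes it back in a single format. A minimal sketch of that idea follows, using only the standard library. It is an assumption-laden illustration, not the connector's `_get_updated_state`: the real method uses `pendulum` and a `compare_date_template` attribute, whereas `STATE_FORMAT` and `updated_state` here are hypothetical names.

```python
from datetime import datetime
from typing import Any, Dict, Mapping

CURSOR_FIELD = "server_upload_time"
# Assumed output template; chosen to match the value asserted in the test.
STATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def updated_state(
    current_state: Mapping[str, Any], latest_record: Mapping[str, Any]
) -> Dict[str, str]:
    """Return a state holding the later of the stored and incoming cursor values."""
    latest = datetime.fromisoformat(latest_record[CURSOR_FIELD])
    previous_raw = current_state.get(CURSOR_FIELD)
    if previous_raw is not None:
        # Compare parsed datetimes, not raw strings, so mixed formats are safe.
        latest = max(latest, datetime.fromisoformat(previous_raw))
    return {CURSOR_FIELD: latest.strftime(STATE_FORMAT)}
```

Comparing parsed values rather than strings matters here because the stored state (`"2023-01-01"`) and the record timestamps do not share a format.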
85 changes: 43 additions & 42 deletions docs/integrations/sources/amplitude.md
@@ -57,48 +57,49 @@ The Amplitude connector ideally should gracefully handle Amplitude API limitatio

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------- |
| 0.3.12 | 2024-06-06 | [39103](https://github.com/airbytehq/airbyte/pull/39103) | Use `CheckpointMixin` for state management |
| 0.3.11 | 2024-06-04 | [38988](https://github.com/airbytehq/airbyte/pull/38988) | [autopull] Upgrade base image to v1.2.1 |
| 0.3.10 | 2024-04-19 | [36631](https://github.com/airbytehq/airbyte/pull/36631) | Updating to 0.80.0 CDK |
| 0.3.9 | 2024-04-12 | [36631](https://github.com/airbytehq/airbyte/pull/36631) | schema descriptions |
| 0.3.8 | 2024-03-12 | [35987](https://github.com/airbytehq/airbyte/pull/35987) | Unpin CDK version |
| 0.3.7 | 2024-02-12 | [35162](https://github.com/airbytehq/airbyte/pull/35162) | Manage dependencies with Poetry. |
| 0.3.6 | 2023-10-23 | [31702](https://github.com/airbytehq/airbyte/pull/31702) | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.3.5 | 2023-09-28 | [30846](https://github.com/airbytehq/airbyte/pull/30846) | Add support of multiple cursor date formats |
| 0.3.4 | 2023-09-28 | [30831](https://github.com/airbytehq/airbyte/pull/30831) | Add user friendly error description on 403 error |
| 0.3.3 | 2023-09-21 | [30652](https://github.com/airbytehq/airbyte/pull/30652) | Update spec: declare `start_date` type as `date-time` |
| 0.3.2 | 2023-09-18 | [30525](https://github.com/airbytehq/airbyte/pull/30525) | Fix `KeyError` while getting `data_region` from config |
| 0.3.1 | 2023-09-15 | [30471](https://github.com/airbytehq/airbyte/pull/30471) | Fix `Event` stream: Use `start_time` instead of cursor in the case of more recent |
| 0.3.0 | 2023-09-13 | [30378](https://github.com/airbytehq/airbyte/pull/30378) | Switch to latest CDK version |
| 0.2.4 | 2023-05-05 | [25842](https://github.com/airbytehq/airbyte/pull/25842) | added missing attrs in events schema, enabled default availability strategy |
| 0.2.3 | 2023-04-20 | [25317](https://github.com/airbytehq/airbyte/pull/25317) | Refactor Events Stream, use pre-YAML version based on Python CDK |
| 0.2.2 | 2023-04-19 | [25315](https://github.com/airbytehq/airbyte/pull/25315) | Refactor to only fetch date_time_fields once per request |
| 0.2.1 | 2023-02-03 | [25281](https://github.com/airbytehq/airbyte/pull/25281) | Reduce request_time_range to 4 hours |
| 0.2.0 | 2023-02-03 | [22362](https://github.com/airbytehq/airbyte/pull/22362) | Migrate to YAML |
| 0.1.24 | 2023-03-28 | [21022](https://github.com/airbytehq/airbyte/pull/21022) | Enable event stream time interval selection |
| 0.1.23 | 2023-03-02 | [23087](https://github.com/airbytehq/airbyte/pull/23087) | Specified date formatting in specification |
| 0.1.22 | 2023-02-17 | [23192](https://github.com/airbytehq/airbyte/pull/23192) | Skip the stream if `start_date` is specified in the future. |
| 0.1.21 | 2023-02-01 | [21888](https://github.com/airbytehq/airbyte/pull/21888) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.20 | 2023-01-27 | [21957](https://github.com/airbytehq/airbyte/pull/21957) | Handle null values and empty strings in date-time fields |
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream |
| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActiveUsers` stream |
| 0.1.15 | 2022-10-03 | [17320](https://github.com/airbytehq/airbyte/pull/17320) | Add validation `start_date` field if it's in the future |
| 0.1.14 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. |
| 0.1.13 | 2022-08-31 | [16185](https://github.com/airbytehq/airbyte/pull/16185) | Re-release on new `airbyte_cdk==0.1.81` |
| 0.1.12 | 2022-08-11 | [15506](https://github.com/airbytehq/airbyte/pull/15506) | Changed slice day window to 1, instead of 3 for Events stream |
| 0.1.11 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from spec |
| 0.1.10 | 2022-06-16 | [13846](https://github.com/airbytehq/airbyte/pull/13846) | Try-catch the BadZipFile error |
| 0.1.9 | 2022-06-10 | [13638](https://github.com/airbytehq/airbyte/pull/13638) | Fixed an infinite loop when fetching Amplitude data |
| 0.1.8 | 2022-06-01 | [13373](https://github.com/airbytehq/airbyte/pull/13373) | Fixed the issue when JSON Validator produces errors on `date-time` check |
| 0.1.7 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records |
| 0.1.6 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy |
| 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error |
| 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.3 | 2021-10-12 | [6375](https://github.com/airbytehq/airbyte/pull/6375) | Log Transient 404 Error in Events stream |
| 0.1.2 | 2021-09-21 | [6353](https://github.com/airbytehq/airbyte/pull/6353) | Correct output schemas on cohorts, events, active_users, and average_session_lengths streams |
| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add AIRBYTE_ENTRYPOINT for kubernetes support |
| 0.1.0 | 2021-06-08 | [3664](https://github.com/airbytehq/airbyte/pull/3664) | New Source: Amplitude |

</details>
