Skip to content

Commit

Permalink
Souce Amplitude Refactor: Use CheckpointMixin for state management (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Jun 6, 2024
1 parent 266169a commit c1de750
Show file tree
Hide file tree
Showing 6 changed files with 448 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
dockerImageTag: 0.3.11
dockerImageTag: 0.3.12
dockerRepository: airbyte/source-amplitude
documentationUrl: https://docs.airbyte.com/integrations/sources/amplitude
githubIssueLabel: source-amplitude
Expand Down
478 changes: 385 additions & 93 deletions airbyte-integrations/connectors/source-amplitude/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.3.11"
version = "0.3.12"
name = "source-amplitude"
description = "Source implementation for Amplitude."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -17,7 +17,7 @@ include = "source_amplitude"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
airbyte-cdk = "0.90.0"

[tool.poetry.scripts]
source-amplitude = "source_amplitude.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import CheckpointMixin
from airbyte_cdk.sources.streams.http import HttpStream

LOGGER = logging.getLogger("airbyte")
Expand Down Expand Up @@ -45,7 +46,7 @@ def error_msg_from_status(status: int = None):
LOGGER.error(f"Unknown error occured: code {status}")


class Events(HttpStream):
class Events(HttpStream, CheckpointMixin):
api_version = 2
base_params = {}
cursor_field = "server_upload_time"
Expand All @@ -61,6 +62,8 @@ def __init__(self, data_region: str, start_date: str, event_time_interval: dict
self.event_time_interval = event_time_interval
self._start_date = pendulum.parse(start_date) if isinstance(start_date, str) else start_date
self.date_time_fields = self._get_date_time_items_from_schema()
if not hasattr(self, "_state"):
self._state = {}
super().__init__(**kwargs)

@property
Expand All @@ -72,7 +75,15 @@ def url_base(self) -> str:
def time_interval(self) -> dict:
return {self.event_time_interval.get("size_unit"): self.event_time_interval.get("size")}

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
@property
def state(self) -> Mapping[str, Any]:
return self._state

@state.setter
def state(self, value: Mapping[str, Any]) -> Mapping[str, Any]:
self._state = value

def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
# save state value in source native format
if self.compare_date_template:
latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template)
Expand Down Expand Up @@ -172,8 +183,9 @@ def read_records(
# https://developers.amplitude.com/docs/export-api#status-codes
try:
self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}")
records = super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
yield from records
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
self.state = self._get_updated_state(self.state, record)
yield record
except requests.exceptions.HTTPError as error:
status = error.response.status_code
if status in HTTP_ERROR_CODES.keys():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,6 @@ def test_updated_state(self):

state = {"server_upload_time": "2023-01-01"}
for record in cursor_fields_smaple:
state = stream.get_updated_state(state, record)
state = stream._get_updated_state(state, record)

assert state["server_upload_time"] == "2023-08-31 00:00:00.000000"
85 changes: 43 additions & 42 deletions docs/integrations/sources/amplitude.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,48 +57,49 @@ The Amplitude connector ideally should gracefully handle Amplitude API limitatio

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------- |
| 0.3.11 | 2024-06-04 | [38988](https://github.com/airbytehq/airbyte/pull/38988) | [autopull] Upgrade base image to v1.2.1 |
| 0.3.10 | 2024-04-19 | [36631](https://github.com/airbytehq/airbyte/pull/36631) | Updating to 0.80.0 CDK |
| 0.3.9 | 2024-04-12 | [36631](https://github.com/airbytehq/airbyte/pull/36631) | schema descriptions |
| 0.3.8 | 2024-03-12 | [35987](https://github.com/airbytehq/airbyte/pull/35987) | Unpin CDK version |
| 0.3.7 | 2024-02-12 | [35162](https://github.com/airbytehq/airbyte/pull/35162) | Manage dependencies with Poetry. |
| 0.3.6 | 2023-10-23 | [31702](https://github.com/airbytehq/airbyte/pull/31702) | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.3.5 | 2023-09-28 | [30846](https://github.com/airbytehq/airbyte/pull/30846) | Add support of multiple cursor date formats |
| 0.3.4 | 2023-09-28 | [30831](https://github.com/airbytehq/airbyte/pull/30831) | Add user friendly error description on 403 error |
| 0.3.3 | 2023-09-21 | [30652](https://github.com/airbytehq/airbyte/pull/30652) | Update spec: declare `start_date` type as `date-time` |
| 0.3.2 | 2023-09-18 | [30525](https://github.com/airbytehq/airbyte/pull/30525) | Fix `KeyError` while getting `data_region` from config |
| 0.3.1 | 2023-09-15 | [30471](https://github.com/airbytehq/airbyte/pull/30471) | Fix `Event` stream: Use `start_time` instead of cursor in the case of more recent |
| 0.3.0 | 2023-09-13 | [30378](https://github.com/airbytehq/airbyte/pull/30378) | Switch to latest CDK version |
| 0.2.4 | 2023-05-05 | [25842](https://github.com/airbytehq/airbyte/pull/25842) | added missing attrs in events schema, enabled default availability strategy |
| 0.2.3 | 2023-04-20 | [25317](https://github.com/airbytehq/airbyte/pull/25317) | Refactor Events Stream, use pre-YAML version based on Python CDK |
| 0.2.2 | 2023-04-19 | [25315](https://github.com/airbytehq/airbyte/pull/25315) | Refactor to only fetch date_time_fields once per request |
| 0.2.1 | 2023-02-03 | [25281](https://github.com/airbytehq/airbyte/pull/25281) | Reduce request_time_range to 4 hours |
| 0.2.0 | 2023-02-03 | [22362](https://github.com/airbytehq/airbyte/pull/22362) | Migrate to YAML |
| 0.1.24 | 2023-03-28 | [21022](https://github.com/airbytehq/airbyte/pull/21022) | Enable event stream time interval selection |
| 0.1.23 | 2023-03-02 | [23087](https://github.com/airbytehq/airbyte/pull/23087) | Specified date formatting in specification |
| 0.1.22 | 2023-02-17 | [23192](https://github.com/airbytehq/airbyte/pull/23192) | Skip the stream if `start_date` is specified in the future. |
| 0.1.21 | 2023-02-01 | [21888](https://github.com/airbytehq/airbyte/pull/21888) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.20 | 2023-01-27 | [21957](https://github.com/airbytehq/airbyte/pull/21957) | Handle null values and empty strings in date-time fields |
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream |
| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam |
| 0.1.15 | 2022-10-03 | [17320](https://github.com/airbytehq/airbyte/pull/17320) | Add validation `start_date` filed if it's in the future |
| 0.1.14 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. |
| 0.1.13 | 2022-08-31 | [16185](https://github.com/airbytehq/airbyte/pull/16185) | Re-release on new `airbyte_cdk==0.1.81` |
| 0.1.12 | 2022-08-11 | [15506](https://github.com/airbytehq/airbyte/pull/15506) | Changed slice day window to 1, instead of 3 for Events stream |
| 0.1.11 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from spec |
| 0.1.10 | 2022-06-16 | [13846](https://github.com/airbytehq/airbyte/pull/13846) | Try-catch the BadZipFile error |
| 0.1.9 | 2022-06-10 | [13638](https://github.com/airbytehq/airbyte/pull/13638) | Fixed an infinite loop when fetching Amplitude data |
| 0.1.8 | 2022-06-01 | [13373](https://github.com/airbytehq/airbyte/pull/13373) | Fixed the issue when JSON Validator produces errors on `date-time` check |
| 0.1.7 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records |
| 0.1.6 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy |
| 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error |
| 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.3 | 2021-10-12 | [6375](https://github.com/airbytehq/airbyte/pull/6375) | Log Transient 404 Error in Events stream |
| 0.1.2 | 2021-09-21 | [6353](https://github.com/airbytehq/airbyte/pull/6353) | Correct output schemas on cohorts, events, active_users, and average_session_lengths streams |
| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add AIRBYTE_ENTRYPOINT for kubernetes support |
| 0.1.0 | 2021-06-08 | [3664](https://github.com/airbytehq/airbyte/pull/3664) | New Source: Amplitude |
| 0.3.12 | 2024-06-06 | [39103](https://github.com/airbytehq/airbyte/pull/39103) | Use `CheckpointMixin` for state management |
| 0.3.11 | 2024-06-04 | [38988](https://github.com/airbytehq/airbyte/pull/38988) | [autopull] Upgrade base image to v1.2.1 |
| 0.3.10 | 2024-04-19 | [36631](https://github.com/airbytehq/airbyte/pull/36631) | Updating to 0.80.0 CDK |
| 0.3.9 | 2024-04-12 | [36631](https://github.com/airbytehq/airbyte/pull/36631) | schema descriptions |
| 0.3.8 | 2024-03-12 | [35987](https://github.com/airbytehq/airbyte/pull/35987) | Unpin CDK version |
| 0.3.7 | 2024-02-12 | [35162](https://github.com/airbytehq/airbyte/pull/35162) | Manage dependencies with Poetry. |
| 0.3.6 | 2023-10-23 | [31702](https://github.com/airbytehq/airbyte/pull/31702) | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.3.5 | 2023-09-28 | [30846](https://github.com/airbytehq/airbyte/pull/30846) | Add support of multiple cursor date formats |
| 0.3.4 | 2023-09-28 | [30831](https://github.com/airbytehq/airbyte/pull/30831) | Add user friendly error description on 403 error |
| 0.3.3 | 2023-09-21 | [30652](https://github.com/airbytehq/airbyte/pull/30652) | Update spec: declare `start_date` type as `date-time` |
| 0.3.2 | 2023-09-18 | [30525](https://github.com/airbytehq/airbyte/pull/30525) | Fix `KeyError` while getting `data_region` from config |
| 0.3.1 | 2023-09-15 | [30471](https://github.com/airbytehq/airbyte/pull/30471) | Fix `Event` stream: Use `start_time` instead of cursor in the case of more recent |
| 0.3.0 | 2023-09-13 | [30378](https://github.com/airbytehq/airbyte/pull/30378) | Switch to latest CDK version |
| 0.2.4 | 2023-05-05 | [25842](https://github.com/airbytehq/airbyte/pull/25842) | added missing attrs in events schema, enabled default availability strategy |
| 0.2.3 | 2023-04-20 | [25317](https://github.com/airbytehq/airbyte/pull/25317) | Refactor Events Stream, use pre-YAML version based on Python CDK |
| 0.2.2 | 2023-04-19 | [25315](https://github.com/airbytehq/airbyte/pull/25315) | Refactor to only fetch date_time_fields once per request |
| 0.2.1 | 2023-02-03 | [25281](https://github.com/airbytehq/airbyte/pull/25281) | Reduce request_time_range to 4 hours |
| 0.2.0 | 2023-02-03 | [22362](https://github.com/airbytehq/airbyte/pull/22362) | Migrate to YAML |
| 0.1.24 | 2023-03-28 | [21022](https://github.com/airbytehq/airbyte/pull/21022) | Enable event stream time interval selection |
| 0.1.23 | 2023-03-02 | [23087](https://github.com/airbytehq/airbyte/pull/23087) | Specified date formatting in specification |
| 0.1.22 | 2023-02-17 | [23192](https://github.com/airbytehq/airbyte/pull/23192) | Skip the stream if `start_date` is specified in the future. |
| 0.1.21 | 2023-02-01 | [21888](https://github.com/airbytehq/airbyte/pull/21888) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.20 | 2023-01-27 | [21957](https://github.com/airbytehq/airbyte/pull/21957) | Handle null values and empty strings in date-time fields |
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream |
| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam |
| 0.1.15 | 2022-10-03 | [17320](https://github.com/airbytehq/airbyte/pull/17320) | Add validation `start_date` filed if it's in the future |
| 0.1.14 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. |
| 0.1.13 | 2022-08-31 | [16185](https://github.com/airbytehq/airbyte/pull/16185) | Re-release on new `airbyte_cdk==0.1.81` |
| 0.1.12 | 2022-08-11 | [15506](https://github.com/airbytehq/airbyte/pull/15506) | Changed slice day window to 1, instead of 3 for Events stream |
| 0.1.11 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from spec |
| 0.1.10 | 2022-06-16 | [13846](https://github.com/airbytehq/airbyte/pull/13846) | Try-catch the BadZipFile error |
| 0.1.9 | 2022-06-10 | [13638](https://github.com/airbytehq/airbyte/pull/13638) | Fixed an infinite loop when fetching Amplitude data |
| 0.1.8 | 2022-06-01 | [13373](https://github.com/airbytehq/airbyte/pull/13373) | Fixed the issue when JSON Validator produces errors on `date-time` check |
| 0.1.7 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records |
| 0.1.6 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy |
| 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error |
| 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.3 | 2021-10-12 | [6375](https://github.com/airbytehq/airbyte/pull/6375) | Log Transient 404 Error in Events stream |
| 0.1.2 | 2021-09-21 | [6353](https://github.com/airbytehq/airbyte/pull/6353) | Correct output schemas on cohorts, events, active_users, and average_session_lengths streams |
| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add AIRBYTE_ENTRYPOINT for kubernetes support |
| 0.1.0 | 2021-06-08 | [3664](https://github.com/airbytehq/airbyte/pull/3664) | New Source: Amplitude |

</details>

Expand Down

0 comments on commit c1de750

Please sign in to comment.