Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Stripe: Fix the Refunds stream missing data in Incremental sync mode #39138

Merged
merged 12 commits into from
Jun 6, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ acceptance_tests:
timeout_seconds: 3600
future_state:
future_state_path: "integration_tests/abnormal_state.json"
# The stream `setup_attempts` fails on the `test_read_sequential_slices` step,
# `Read 1 of 1 should produce at least one record.`, expecting some records to be set on the stream.
skip_comprehensive_incremental_tests: true
full_refresh:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.3.9
dockerImageTag: 5.4.0
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
githubIssueLabel: source-stripe
Expand All @@ -36,6 +36,12 @@ data:
5.0.0:
message: Version 5.0.0 introduces fixes for the `CheckoutSessions`, `CheckoutSessionsLineItems` and `Refunds` streams. The cursor field is changed for the `CheckoutSessionsLineItems` and `Refunds` streams. This will prevent data loss during incremental syncs. Also, the `Invoices`, `Subscriptions` and `SubscriptionSchedule` stream schemas have been updated.
upgradeDeadline: "2023-12-11"
5.4.0:
message: Version 5.4.0 introduces fixes for the `Refunds` streams. The `Refunds`, which previously was `incremental` on the `creation date`, now tracks updates as well. In order to do that, the cursor field needs to be updated and a `resetting` is required to get the updates.
upgradeDeadline: "2024-07-14"
scopedImpact:
- scopeType: stream
impactedScopes: ["refunds"]
suggestedStreams:
streams:
- customers
Expand Down
240 changes: 118 additions & 122 deletions airbyte-integrations/connectors/source-stripe/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "5.3.9"
version = "5.4.0"
name = "source-stripe"
description = "Source implementation for Stripe."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@
_MAX_CONCURRENCY = 20
_DEFAULT_CONCURRENCY = 10
_CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
_REFUND_STREAM_NAME = "refunds"
_INCREMENTAL_CONCURRENCY_EXCLUSION = {
_REFUND_STREAM_NAME, # excluded because of the upcoming changes in terms of cursor https://github.com/airbytehq/airbyte/issues/34332
}
USE_CACHE = not _CACHE_DISABLED
STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_"

Expand Down Expand Up @@ -293,10 +289,17 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **incremental_args),
CreatedCursorIncrementalStripeStream(name="files", path="files", **incremental_args),
CreatedCursorIncrementalStripeStream(name="file_links", path="file_links", **incremental_args),
# The Refunds stream does not utilize the Events API as it created issues with data loss during the incremental syncs.
# Therefore, we're using the regular API with the `created` cursor field. A bug has been filed with Stripe.
# See more at https://github.com/airbytehq/oncall/issues/3090, https://github.com/airbytehq/oncall/issues/3428
CreatedCursorIncrementalStripeStream(name=_REFUND_STREAM_NAME, path="refunds", **incremental_args),
IncrementalStripeStream(
name="refunds",
path="refunds",
event_types=[
"refund.created",
"refund.updated",
# this is the only event that could track the refund updates
"charge.refund.updated",
],
**args,
),
UpdatedCursorIncrementalStripeStream(
name="payment_methods",
path="payment_methods",
Expand Down Expand Up @@ -343,6 +346,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
"charge.failed",
"charge.pending",
"charge.refunded",
"charge.refund.updated",
"charge.succeeded",
"charge.updated",
],
Expand Down Expand Up @@ -543,7 +547,7 @@ def _to_concurrent(

state = state_manager.get_stream_state(stream.name, stream.namespace)
slice_boundary_fields = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream))
if slice_boundary_fields and stream.name not in _INCREMENTAL_CONCURRENCY_EXCLUSION:
if slice_boundary_fields:
cursor_field = CursorField(stream.cursor_field) if isinstance(stream.cursor_field, str) else CursorField(stream.cursor_field[0])
converter = EpochValueConcurrentStreamStateConverter()
cursor = ConcurrentCursor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def test_403_error_handling(stream_by_name, requests_mock):
(
"refunds",
{
"/v1/refunds": {"data": []}
"/v1/refunds": {"data": []}, "/v1/events": {"data": []}
},
2
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,55 @@ def test_lazy_substream_data_is_filtered(
"created": 1679568588,
"currency": "eur",
},
# Incremental `Events` endpoint response
{
"id": "evt_3NRL2GEcXtiJtvvh0kjreLyk",
"object": "event",
"api_version": "2020-08-27",
"created": 1666518588,
"data": {
"object": {
"id": "re_3NRL2GEcXtiJtvvh0ahgD9V8",
"object": "refund",
"amount": 15,
"balance_transaction": "txn_3NRL2GEcXtiJtvvh0uhS7L1l",
"charge": "ch_3NRL2GEcXtiJtvvh0XOSc8NL",
"created": 1666518588,
"currency": "usd",
"destination_details": {
"card": {
"reference": "7901352802291512",
"reference_status": "available",
"reference_type": "acquirer_reference_number",
"type": "refund"
},
"type": "card"
},
"metadata": {},
"payment_intent": "pi_3NRL2GEcXtiJtvvh0OiNTz0f",
"reason": None,
"receipt_number": None,
"source_transfer_reversal": None,
"status": "succeeded",
"transfer_reversal": None
},
"previous_attributes": {
"destination_details": {
"card": {
"reference": None,
"reference_status": "pending"
}
}
}
},
"livemode": False,
"pending_webhooks": 0,
"request": {
"id": None,
"idempotency_key": None
},
"type": "charge.refund.updated"
}
]


Expand Down Expand Up @@ -268,7 +317,7 @@ def test_lazy_substream_data_is_filtered(
},
{
"json": {
"data": [refunds_api_objects[-1]],
"data": [refunds_api_objects[1]],
"has_more": False,
}
},
Expand All @@ -282,6 +331,7 @@ def test_lazy_substream_data_is_filtered(
"charge": "ch_3NYB8LAHLf1oYfwN3P6BxdKj",
"created": 1653299388,
"currency": "usd",
"updated": 1653299388,
},
{
"id": "re_Lf1oYfwN3EZRDIfF3NYB8LAH",
Expand All @@ -290,35 +340,55 @@ def test_lazy_substream_data_is_filtered(
"charge": "ch_YfwN3P6BxdKj3NYB8LAHLf1o",
"created": 1679568588,
"currency": "eur",
"updated": 1679568588,
},
],
[{"created[gte]": 1631199615, "created[lte]": 1662735615}, {"created[gte]": 1662735616, "created[lte]": 1692802815}],
[{"created[gte]": 1632409215, "created[lte]": 1663945215}, {"created[gte]": 1663945216, "created[lte]": 1692802815}],
"refunds",
"full_refresh",
{},
),
(
{
"/v1/refunds": [
"/v1/events":
[
{
"json": {
"data": [refunds_api_objects[-1]],
"data": [refunds_api_objects[2]],
"has_more": False,
}
},
],
},
[
{
"id": "re_Lf1oYfwN3EZRDIfF3NYB8LAH",
"id": "re_3NRL2GEcXtiJtvvh0ahgD9V8",
"object": "refund",
"amount": 15,
"charge": "ch_YfwN3P6BxdKj3NYB8LAHLf1o",
"created": 1679568588,
"currency": "eur",
"balance_transaction": "txn_3NRL2GEcXtiJtvvh0uhS7L1l",
"charge": "ch_3NRL2GEcXtiJtvvh0XOSc8NL",
"created": 1666518588,
"currency": "usd",
"destination_details": {
"card": {
"reference": "7901352802291512",
"reference_status": "available",
"reference_type": "acquirer_reference_number",
"type": "refund"
},
"type": "card"
},
"metadata": {},
"payment_intent": "pi_3NRL2GEcXtiJtvvh0OiNTz0f",
"reason": None,
"receipt_number": None,
"source_transfer_reversal": None,
"status": "succeeded",
"transfer_reversal": None,
"updated": 1666518588
}
],
[{"created[gte]": 1665308989, "created[lte]": 1692802815}],
[{}],
"refunds",
"incremental",
{"created": 1666518588},
Expand All @@ -333,7 +403,6 @@ def test_created_cursor_incremental_stream(
stream = stream_by_name(stream_name, {"lookback_window_days": 14, **config})
for url, response in requests_mock_map.items():
requests_mock.get(url, response)

slices = list(stream.stream_slices(sync_mode=sync_mode, stream_state=state))
assert slices == expected_slices
records = read_from_stream(stream, sync_mode, state)
Expand Down
10 changes: 10 additions & 0 deletions docs/integrations/sources/stripe-migrations.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Stripe Migration Guide

## Upgrading to 5.4.0

This change fixes incremental sync issues with the `Refunds` stream:

- Stream cursor has changed from `created` to `updated`.

The `Reset` for the affected stream `Refunds` is required. It's safe to do, since before this update the `Refunds` stream didn't use the `events` endpoint that have `30 Days data retention` period.

Because of the changed cursor field of the `Refunds` stream, incremental syncs are now fixed and the stream receives the updates using the `events` endpoint.

## Upgrading to 5.0.0

This change fixes multiple incremental sync issues with the `Refunds`, `Checkout Sessions` and `Checkout Sessions Line Items` streams:
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/stripe.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :-------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 5.4.0 | 2024-06-05 | [39138](https://github.com/airbytehq/airbyte/pull/39138) | Fixed the `Refunds` stream missing data for the `incremental` sync |
| 5.3.9 | 2024-05-22 | [38550](https://github.com/airbytehq/airbyte/pull/38550) | Update authenticator package |
| 5.3.8 | 2024-05-15 | [38248](https://github.com/airbytehq/airbyte/pull/38248) | Replace AirbyteLogger with logging.Logger |
| 5.3.7 | 2024-04-24 | [36663](https://github.com/airbytehq/airbyte/pull/36663) | Schema descriptions |
Expand Down
Loading