🎉 Source stripe - enhanced performance for streams which run substreams (#10359)

* enhanced performance for streams which run 1 request for each main item.

* removed unused types

* moved common code to StripeSubStream

* updated docs, updated docker version

* updated connector version in source_specs.yaml
midavadim authored Mar 10, 2022
1 parent d19754c commit a1a4bbc
Showing 7 changed files with 158 additions and 49 deletions.
@@ -745,7 +745,7 @@
- name: Stripe
sourceDefinitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerRepository: airbyte/source-stripe
dockerImageTag: 0.1.28
dockerImageTag: 0.1.29
documentationUrl: https://docs.airbyte.io/integrations/sources/stripe
icon: stripe.svg
sourceType: api
@@ -8037,7 +8037,7 @@
type: "string"
path_in_connector_config:
- "client_secret"
- dockerImage: "airbyte/source-stripe:0.1.28"
- dockerImage: "airbyte/source-stripe:0.1.29"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/stripe"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.version=0.1.29
LABEL io.airbyte.name=airbyte/source-stripe
@@ -0,0 +1,2 @@
-e ../../bases/source-acceptance-test
-e .
@@ -7,6 +7,9 @@
"invoice": {
"type": ["null", "string"]
},
"invoice_id": {
"type": ["null", "string"]
},
"subscription_item": {
"type": ["null", "string"]
},
149 changes: 126 additions & 23 deletions airbyte-integrations/connectors/source-stripe/source_stripe/streams.py
@@ -4,6 +4,7 @@

import math
from abc import ABC, abstractmethod
from itertools import chain
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import pendulum
@@ -183,6 +184,110 @@ def path(self, **kwargs):
return "events"


class StripeSubStream(StripeStream, ABC):
"""
Records for a sub-stream can be extracted from the parent stream, whose records already
contain the first page of the needed sub-items. This significantly decreases the number of requests needed
to get all sub-items, since the parent stream returns 100 items per request.
Note: in most cases no pagination requests are performed, because sub-items are fully reported in the parent records.
For example:
Line items are part of each 'invoice' record, so use the Invoices stream because
it allows bulk extraction:
0.1.28 and below - 1 request extracts line items for 1 invoice (+ pagination reqs)
0.1.29 and above - 1 request extracts line items for 100 invoices (+ pagination reqs)
If the line items object indicates that there are more pages ('has_more' attr),
the current stream is used to extract the next pages. In most cases pagination requests
are not performed because line items are fully reported in the 'invoice' record.
Example for InvoiceLineItems and parent Invoice streams, record from Invoice stream:
{
"created": 1641038947, <--- 'Invoice' record
"customer": "cus_HezytZRkaQJC8W",
"id": "in_1KD6OVIEn5WyEQxn9xuASHsD", <---- value for 'parent_id' attribute
"object": "invoice",
"total": 0,
...
"lines": { <---- sub_items_attr
"data": [
{
"id": "il_1KD6OVIEn5WyEQxnm5bzJzuA", <---- 'Invoice' line item record
"object": "line_item",
...
},
{...}
],
"has_more": false, <---- next pages from 'InvoiceLineItemsPaginated' stream
"object": "list",
"total_count": 2,
"url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines"
}
}
"""

filter: Optional[Mapping[str, Any]] = None
add_parent_id: bool = False

@property
@abstractmethod
def parent(self) -> StripeStream:
"""
:return: parent stream which contains needed records in <sub_items_attr>
"""

@property
@abstractmethod
def parent_id(self) -> str:
"""
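:return: name of the attribute that carries the parent record's id, e.g. 'invoice_id'
(used as the stream slice key and, if add_parent_id is set, added to each sub-item)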
"""

@property
@abstractmethod
def sub_items_attr(self) -> str:
"""
:return: name of the parent record attribute that holds the list object with sub-items, e.g. 'lines' for invoices
"""

def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs):
params = super().request_params(stream_slice=stream_slice, **kwargs)

# add 'starting_after' param
if not params.get("starting_after") and stream_slice and stream_slice.get("starting_after"):
params["starting_after"] = stream_slice["starting_after"]

return params

def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:

parent_stream = self.parent(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date)
for record in parent_stream.read_records(sync_mode=SyncMode.full_refresh):

items_obj = record.get(self.sub_items_attr, {})
if not items_obj:
continue

items = items_obj.get("data", [])

# non-generic filter, mainly for BankAccounts stream only
if self.filter:
items = [i for i in items if i.get(self.filter["attr"]) == self.filter["value"]]

# get next pages
items_next_pages = []
if items_obj.get("has_more") and items:
stream_slice = {self.parent_id: record["id"], "starting_after": items[-1]["id"]}
# 'sync_mode' is expected to arrive via **kwargs; passing it explicitly as well would raise a duplicate keyword error
items_next_pages = super().read_records(stream_slice=stream_slice, **kwargs)

for item in chain(items, items_next_pages):
if self.add_parent_id:
# add reference to parent object when item doesn't have it already
item[self.parent_id] = record["id"]
yield item
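
Reviewer's sketch of the extraction logic in `StripeSubStream.read_records` above, written without the Airbyte CDK so it can be run in isolation. The function name `iter_sub_items`, the `fetch_next_pages` callback, and the sample records are hypothetical and only illustrate the idea; they are not part of the connector.

```python
from itertools import chain
from typing import Any, Callable, Dict, Iterable, Iterator, List


def iter_sub_items(
    parent_records: Iterable[Dict[str, Any]],
    sub_items_attr: str,
    parent_id: str,
    fetch_next_pages: Callable[[str, str], Iterable[Dict[str, Any]]],
    add_parent_id: bool = False,
) -> Iterator[Dict[str, Any]]:
    """Yield sub-items embedded in parent records, paginating only when needed."""
    for record in parent_records:
        items_obj = record.get(sub_items_attr) or {}
        items: List[Dict[str, Any]] = items_obj.get("data", [])

        # Extra requests are issued only when the embedded list is incomplete.
        next_pages: Iterable[Dict[str, Any]] = []
        if items_obj.get("has_more") and items:
            next_pages = fetch_next_pages(record["id"], items[-1]["id"])

        for item in chain(items, next_pages):
            if add_parent_id:
                # Attach a reference to the parent record.
                item[parent_id] = record["id"]
            yield item


# Hypothetical data: two invoices, the second with a truncated 'lines' list.
invoices = [
    {"id": "in_1", "lines": {"data": [{"id": "il_1"}, {"id": "il_2"}], "has_more": False}},
    {"id": "in_2", "lines": {"data": [{"id": "il_3"}], "has_more": True}},
]
calls = []


def fake_fetch(parent: str, starting_after: str) -> List[Dict[str, Any]]:
    # Stands in for the paginated HTTP request; records how it was called.
    calls.append((parent, starting_after))
    return [{"id": "il_4"}]


result = list(iter_sub_items(invoices, "lines", "invoice_id", fake_fetch, add_parent_id=True))
```

In this sketch only the second invoice triggers a follow-up call, and that call starts after the last embedded item, which mirrors the `starting_after` slice built in the diff.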


class Invoices(IncrementalStripeStream):
"""
API docs: https://stripe.com/docs/api/invoices/list
@@ -194,20 +299,20 @@ def path(self, **kwargs):
return "invoices"


class InvoiceLineItems(StripeStream):
class InvoiceLineItems(StripeSubStream):
"""
API docs: https://stripe.com/docs/api/invoices/invoice_lines
"""

name = "invoice_line_items"

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
return f"invoices/{stream_slice['invoice_id']}/lines"
parent = Invoices
parent_id: str = "invoice_id"
sub_items_attr = "lines"
add_parent_id = True

def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
invoices_stream = Invoices(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date)
for invoice in invoices_stream.read_records(sync_mode=SyncMode.full_refresh):
yield from super().read_records(stream_slice={"invoice_id": invoice["id"]}, **kwargs)
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
return f"invoices/{stream_slice[self.parent_id]}/lines"


class InvoiceItems(IncrementalStripeStream):
@@ -273,26 +378,25 @@ def request_params(self, stream_state=None, **kwargs):
return params


class SubscriptionItems(StripeStream):
class SubscriptionItems(StripeSubStream):
"""
API docs: https://stripe.com/docs/api/subscription_items/list
"""

name = "subscription_items"

parent: StripeStream = Subscriptions
parent_id: str = "subscription_id"
sub_items_attr: str = "items"

def path(self, **kwargs):
return "subscription_items"

def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs):
params = super().request_params(stream_slice=stream_slice, **kwargs)
params["subscription"] = stream_slice["subscription_id"]
params["subscription"] = stream_slice[self.parent_id]
return params

def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
subscriptions_stream = Subscriptions(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date)
for subscriptions in subscriptions_stream.read_records(sync_mode=SyncMode.full_refresh):
yield from super().read_records(stream_slice={"subscription_id": subscriptions["id"]}, **kwargs)


class Transfers(IncrementalStripeStream):
"""
@@ -327,27 +431,26 @@ def path(self, **kwargs):
return "payment_intents"


class BankAccounts(StripeStream):
class BankAccounts(StripeSubStream):
"""
API docs: https://stripe.com/docs/api/customer_bank_accounts/list
"""

name = "bank_accounts"

parent = Customers
parent_id = "customer_id"
sub_items_attr = "sources"
filter = {"attr": "object", "value": "bank_account"}

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs):
customer_id = stream_slice["customer_id"]
return f"customers/{customer_id}/sources"
return f"customers/{stream_slice[self.parent_id]}/sources"

def request_params(self, **kwargs) -> MutableMapping[str, Any]:
def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
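# forward stream_slice so that StripeSubStream.request_params can apply 'starting_after'
params = super().request_params(stream_slice=stream_slice, **kwargs)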
params["object"] = "bank_account"
return params

def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
customers_stream = Customers(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date)
for customer in customers_stream.read_records(sync_mode=SyncMode.full_refresh):
yield from super().read_records(stream_slice={"customer_id": customer["id"]}, **kwargs)
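
A small standalone illustration of how the `filter` attribute on `BankAccounts` is applied to the items embedded in a customer's `sources` list. The helper name `apply_filter` and the sample data are hypothetical; the list comprehension follows the one in `StripeSubStream.read_records`.

```python
from typing import Any, Dict, List, Mapping, Optional


def apply_filter(items: List[Dict[str, Any]], flt: Optional[Mapping[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only items whose attribute flt['attr'] equals flt['value']; no filter keeps everything."""
    if not flt:
        return list(items)
    return [i for i in items if i.get(flt["attr"]) == flt["value"]]


# Hypothetical 'sources' payload mixing bank accounts and cards.
sources = [
    {"id": "ba_1", "object": "bank_account"},
    {"id": "card_1", "object": "card"},
    {"id": "ba_2", "object": "bank_account"},
]
bank_accounts = apply_filter(sources, {"attr": "object", "value": "bank_account"})
```

The filter is needed because the embedded `sources` list is not restricted by the `object` request parameter the way the paginated `customers/{id}/sources` request is.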


class CheckoutSessions(StripeStream):
"""
47 changes: 24 additions & 23 deletions docs/integrations/sources/stripe.md
@@ -72,27 +72,28 @@ If you would like to test Airbyte using test data on Stripe, `sk_test_` and `rk_

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.28 | 2022-02-08 | [10165](https://github.com/airbytehq/airbyte/pull/10165) | Improve 404 handling for `CheckoutSessionsLineItems` stream |
| 0.1.27 | 2021-12-28 | [9148](https://github.com/airbytehq/airbyte/pull/9148) | Fix `date`, `arrival\_date` fields |
| 0.1.26 | 2021-12-21 | [8992](https://github.com/airbytehq/airbyte/pull/8992) | Fix type `events.request` in schema |
| 0.1.25 | 2021-11-25 | [8250](https://github.com/airbytehq/airbyte/pull/8250) | Rearrange setup fields |
| 0.1.24 | 2021-11-08 | [7729](https://github.com/airbytehq/airbyte/pull/7729) | Include tax data in `checkout_sessions_line_items` stream |
| 0.1.23 | 2021-11-08 | [7729](https://github.com/airbytehq/airbyte/pull/7729) | Correct `payment_intents` schema |
| 0.1.22 | 2021-11-05 | [7345](https://github.com/airbytehq/airbyte/pull/7345) | Add 3 new streams |
| 0.1.21 | 2021-10-07 | [6841](https://github.com/airbytehq/airbyte/pull/6841) | Fix missing `start_date` argument + update json files for SAT |
| 0.1.20 | 2021-09-30 | [6017](https://github.com/airbytehq/airbyte/pull/6017) | Add lookback\_window\_days parameter |
| 0.1.19 | 2021-09-27 | [6466](https://github.com/airbytehq/airbyte/pull/6466) | Use `start_date` parameter in incremental streams |
| 0.1.18 | 2021-09-14 | [6004](https://github.com/airbytehq/airbyte/pull/6004) | Fix coupons and subscriptions stream schemas by removing incorrect timestamp formatting |
| 0.1.17 | 2021-09-14 | [6004](https://github.com/airbytehq/airbyte/pull/6004) | Add `PaymentIntents` stream |
| 0.1.16 | 2021-07-28 | [4980](https://github.com/airbytehq/airbyte/pull/4980) | Remove Updated field from schemas |
| 0.1.15 | 2021-07-21 | [4878](https://github.com/airbytehq/airbyte/pull/4878) | Fix incorrect percent\_off and discounts data field types |
| 0.1.14 | 2021-07-09 | [4669](https://github.com/airbytehq/airbyte/pull/4669) | Subscriptions Stream now returns all kinds of subscriptions \(including expired and canceled\) |
| 0.1.13 | 2021-07-03 | [4528](https://github.com/airbytehq/airbyte/pull/4528) | Remove regex for acc validation |
| 0.1.12 | 2021-06-08 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
| 0.1.11 | 2021-05-30 | [3744](https://github.com/airbytehq/airbyte/pull/3744) | Fix types in schema |
| 0.1.10 | 2021-05-28 | [3728](https://github.com/airbytehq/airbyte/pull/3728) | Update data types to be number instead of int |
| 0.1.9 | 2021-05-13 | [3367](https://github.com/airbytehq/airbyte/pull/3367) | Add acceptance tests for connected accounts |
| 0.1.8 | 2021-05-11 | [3566](https://github.com/airbytehq/airbyte/pull/3368) | Bump CDK connectors |
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:--------------------------------------------------------------------------------------------------------|
| 0.1.29 | 2022-03-08 | [10359](https://github.com/airbytehq/airbyte/pull/10359) | Improved performance for streams with substreams: invoice_line_items, subscription_items, bank_accounts |
| 0.1.28 | 2022-02-08 | [10165](https://github.com/airbytehq/airbyte/pull/10165) | Improve 404 handling for `CheckoutSessionsLineItems` stream |
| 0.1.27 | 2021-12-28 | [9148](https://github.com/airbytehq/airbyte/pull/9148) | Fix `date`, `arrival\_date` fields |
| 0.1.26 | 2021-12-21 | [8992](https://github.com/airbytehq/airbyte/pull/8992) | Fix type `events.request` in schema |
| 0.1.25 | 2021-11-25 | [8250](https://github.com/airbytehq/airbyte/pull/8250) | Rearrange setup fields |
| 0.1.24 | 2021-11-08 | [7729](https://github.com/airbytehq/airbyte/pull/7729) | Include tax data in `checkout_sessions_line_items` stream |
| 0.1.23 | 2021-11-08 | [7729](https://github.com/airbytehq/airbyte/pull/7729) | Correct `payment_intents` schema |
| 0.1.22 | 2021-11-05 | [7345](https://github.com/airbytehq/airbyte/pull/7345) | Add 3 new streams |
| 0.1.21 | 2021-10-07 | [6841](https://github.com/airbytehq/airbyte/pull/6841) | Fix missing `start_date` argument + update json files for SAT |
| 0.1.20 | 2021-09-30 | [6017](https://github.com/airbytehq/airbyte/pull/6017) | Add lookback\_window\_days parameter |
| 0.1.19 | 2021-09-27 | [6466](https://github.com/airbytehq/airbyte/pull/6466) | Use `start_date` parameter in incremental streams |
| 0.1.18 | 2021-09-14 | [6004](https://github.com/airbytehq/airbyte/pull/6004) | Fix coupons and subscriptions stream schemas by removing incorrect timestamp formatting |
| 0.1.17 | 2021-09-14 | [6004](https://github.com/airbytehq/airbyte/pull/6004) | Add `PaymentIntents` stream |
| 0.1.16 | 2021-07-28 | [4980](https://github.com/airbytehq/airbyte/pull/4980) | Remove Updated field from schemas |
| 0.1.15 | 2021-07-21 | [4878](https://github.com/airbytehq/airbyte/pull/4878) | Fix incorrect percent\_off and discounts data field types |
| 0.1.14 | 2021-07-09 | [4669](https://github.com/airbytehq/airbyte/pull/4669) | Subscriptions Stream now returns all kinds of subscriptions \(including expired and canceled\) |
| 0.1.13 | 2021-07-03 | [4528](https://github.com/airbytehq/airbyte/pull/4528) | Remove regex for acc validation |
| 0.1.12 | 2021-06-08 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
| 0.1.11 | 2021-05-30 | [3744](https://github.com/airbytehq/airbyte/pull/3744) | Fix types in schema |
| 0.1.10 | 2021-05-28 | [3728](https://github.com/airbytehq/airbyte/pull/3728) | Update data types to be number instead of int |
| 0.1.9 | 2021-05-13 | [3367](https://github.com/airbytehq/airbyte/pull/3367) | Add acceptance tests for connected accounts |
| 0.1.8 | 2021-05-11 | [3566](https://github.com/airbytehq/airbyte/pull/3368) | Bump CDK connectors |
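
A rough request-count estimate for the 0.1.29 entry, based on the figures in the `StripeSubStream` docstring (100 parent items per request; one request per parent in 0.1.28 and below). The function names are hypothetical, and the sketch assumes each parent whose embedded list reports `has_more` needs exactly one extra request, which is a lower bound.

```python
import math


def requests_before(parent_count: int, page_size: int = 100) -> int:
    """0.1.28 and below: page through the parent stream, then one request per parent record."""
    return math.ceil(parent_count / page_size) + parent_count


def requests_after(parent_count: int, parents_with_more: int = 0, page_size: int = 100) -> int:
    """0.1.29 and above: page through the parent stream; extra requests only where 'has_more' is set."""
    return math.ceil(parent_count / page_size) + parents_with_more


# Hypothetical account with 10,000 invoices, 25 of which have truncated 'lines'.
before = requests_before(10_000)
after = requests_after(10_000, parents_with_more=25)
```

Under these assumptions the sync drops from 10,100 requests to 125.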
