-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 Source stripe - enhanced performance for streams which run substreams #10359
🎉 Source stripe - enhanced performance for streams which run substreams #10359
Conversation
Codecov Report
@@ Coverage Diff @@
## master #10359 +/- ##
=========================================
Coverage ? 70.15%
=========================================
Files ? 3
Lines ? 258
Branches ? 0
=========================================
Hits ? 181
Misses ? 77
Partials ? 0 Continue to review full report at Codecov.
|
/test connector=connectors/source-stripe
|
/test connector=connectors/source-stripe
|
airbyte-integrations/connectors/source-stripe/source_stripe/schemas/invoice_line_items.json
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not the big deal, but I think we can optimise the code even more here, by reusing some parts, please read the comments bellow.
airbyte-integrations/connectors/source-stripe/source_stripe/streams.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Loading of the substreams was implemented without the normal iterable logic
airbyte-integrations/connectors/source-stripe/source_stripe/streams.py
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/source-stripe/source_stripe/streams.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a simple integration test for this new logic? For example we can compare records for 2 flows:
- your new logic
- mock the
lines
property by an empty array.
Both lists should be same
/test connector=connectors/source-stripe
|
…pe-performance-improvement
/test connector=connectors/source-stripe
|
|
|
||
# filter out 'bank_account' source items only | ||
if self.filter: | ||
items = [i for i in items if i.get(self.filter["attr"]) == self.filter["value"]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this bank_account
filter? where does it come from?
It seems like maybe the comment is not in sync with code, right?
(only used in customers.bank_account?)
can we add comments on what the generic filter is for? i am guessing bank_account is one example of such usage?
@ChristopheDuong I saw you approved that review, but there were a few comments from you. I have replied to these comments. Could you please let me know if I can merge this PR into main. |
/publish connector=connectors/source-stripe
|
Enhanced performance for stripe source
What
Tested on sync stripe -> bigquery (with airbyte creds):
Succeeded
40.75 MB | 18,385 records | 57m 31s | Sync
The problem was with a few streams:
invoice_line_items: 31m / 4985 items
2022-02-10 16: 23:40 �[44msource�[0m > Syncing stream: invoice_line_items
2022-02-10 16:30:19 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 5000
2022-02-10 16:37:07 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 6000
2022-02-10 16:43:50 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 7000
2022-02-10 16:49:47 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 8000
2022-02-10 16:54:20 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 9000
2022-02-10 16:54:23 �[44msource�[0m > Read 4985 records from invoice_line_items stream
2022-02-10 16:54:23 �[44msource�[0m > Finished syncing SourceStripe
2022-02-10 16:54:23 �[44msource�[0m > SourceStripe runtimes:
subscription_items: 10m / 1995 items
2022-02-10 16:58:30 �[44msource�[0m > Syncing stream: subscription_items
2022-02-10 17:00:22 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 15000
2022-02-10 17:05:57 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 16000
2022-02-10 17:08:44 �[44msource�[0m > Read 1995 records from subscription_items stream
2022-02-10 17:08:44 �[44msource�[0m > Finished syncing SourceStripe
2022-02-10 17:08:44 �[44msource�[0m > SourceStripe runtimes:
Reason:
invoice_line_items - stream runs 1 request for each of 4372 invoices (main stream)
subscription_items - stream runs 1 request for each of 1686 subscriptions (main stream)
How
Research shows that records from main streams already contain 1st page of needed items (invoice_line_items and subscription_items).
But In major cases, pagination requests are not performed because line items are fully reported in main streams' streams
invoice_line_items: 2:20m / 4988 items
2022-02-15 17:52:30 �[44msource�[0m > Syncing stream: invoice_line_items
2022-02-15 17:52:34 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 4000
2022-02-15 17:53:05 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 5000
2022-02-15 17:53:35 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 6000
2022-02-15 17:54:03 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 7000
2022-02-15 17:54:29 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 8000
2022-02-15 17:54:50 �[44msource�[0m > Read 4988 records from invoice_line_items stream
2022-02-15 17:54:50 �[44msource�[0m > Finished syncing SourceStripe
subscription_items: 50s / 1995 items
2022-02-15 17:58:53 �[44msource�[0m > Syncing stream: subscription_items
2022-02-15 17:59:03 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 15000
2022-02-15 17:59:31 �[32mINFO�[m i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):281 - Records read: 16000
2022-02-15 17:59:40 �[44msource�[0m > Read 1995 records from subscription_items stream
2022-02-15 17:59:40 �[44msource�[0m > Finished syncing SourceStripe
Results of manual tests:
![image](https://user-images.githubusercontent.com/3889748/155319832-a9bace8a-b270-4840-b2b6-4c9d291aae84.png)
Last: new version
Previous: Old version
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/SUMMARY.md
docs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changes