🐛 Source salesforce: failed async bulk jobs #8009
Conversation
Force-pushed from 118a207 to f2f748c
/test connector=connectors/source-salesforces

/test connector=connectors/source-salesforce
Good refactor + nice tests. I have one comment on the versioning. Please fix that before merging.
…orce-failed-normalization
/test connector=connectors/source-salesforce

/test connector=connectors/source-salesforce

/test connector=connectors/source-salesforce

/publish connector=connectors/source-salesforce
blocking change on spec.json
@@ -1,7 +1,7 @@
 #!/usr/bin/env sh

 # Build latest connector image
-docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
+docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)
Should we also do this in the templates? (Can be a separate PR.)
I'm planning to aggregate more improvements into a separate PR)) For example, I want to fix an issue with running custom acceptance tests locally: these tests can currently be executed only by GitHub CI, and developers can't run them via the acceptance-test-docker.sh script.
         return resp.json()

-    def generate_schema(self, stream_name: str) -> Mapping[str, Any]:
+    def generate_schema(self, stream_name: str = None) -> Mapping[str, Any]:
         schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}}
why did the default value change? is it meaningful to call this without a stream name supplied?
@@ -43,6 +43,14 @@
       "type": "string",
       "enum": ["BULK", "REST"],
       "default": "BULK"
+    },
+    "wait_timeout": {
IMO this option should not be exposed - it is an implementation detail. See the UX handbook for more context behind why it's not a good idea to expose it. Maybe instead we can use a dynamically increasing wait time for each job.
That's why I forwarded this PR to the Airbyte team)
Your idea of auto-scaling the waiting timeout is nice, but we can reproduce this case with long responses locally. For more details, I added more informative log messages for troubleshooting of possible similar cases.
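The dynamically increasing wait time suggested above could be sketched roughly as follows. This is not the connector's actual code; `check_job_status` and the parameter names are hypothetical stand-ins:

```python
import time

# Sketch: poll a bulk job with an exponentially increasing delay, capped
# at a maximum interval, instead of exposing a user-facing wait_timeout.
# `check_job_status` is a hypothetical stand-in for the real status call.
def wait_for_job(check_job_status, max_interval_seconds=2.0, timeout_seconds=600.0):
    delay = 0.1
    waited = 0.0
    while waited < timeout_seconds:
        status = check_job_status()
        if status not in ("UploadComplete", "InProgress"):
            return status  # terminal: JobComplete, Failed or Aborted
        time.sleep(delay)
        waited += delay
        delay = min(delay * 2, max_interval_seconds)  # back off, but cap the delay
    return "InProgress"  # timed out while the job was still running
```

The cap keeps the polling responsive for short jobs while avoiding a flood of status requests on long-running ones.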
@@ -734,6 +734,8 @@ List of available streams:

 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retring of BULK jobs |
Suggested change:
-| 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retring of BULK jobs |
+| 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retrying of BULK jobs |
    MAX_CHECK_INTERVAL_SECONDS = 2.0
    MAX_RETRY_NUMBER = 3

    def __init__(self, wait_timeout: Optional[int], **kwargs):
every time we pass a time duration we should either use a data type which encodes the unit of time or we should embed the unit of time in the var name e.g: wait_timeout_in_minutes
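One way to encode the unit in the type, as suggested, is to accept a `datetime.timedelta` rather than a bare integer. A sketch under that assumption (`BulkStream` here is a hypothetical simplification of the connector's stream class, which actually takes an integer number of minutes):

```python
from datetime import timedelta

# Sketch: a timedelta makes the unit explicit at the call site, so a
# reader never has to guess whether `wait_timeout` means seconds or minutes.
class BulkStream:
    def __init__(self, wait_timeout: timedelta = timedelta(minutes=10)):
        self._wait_timeout_seconds = wait_timeout.total_seconds()

stream = BulkStream(wait_timeout=timedelta(minutes=15))
```

The alternative named in the comment, `wait_timeout_in_minutes`, achieves the same clarity without the extra type.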
        expiration_time: DateTime = pendulum.now().add(seconds=int(self._wait_timeout * 60.0))
        job_status = "InProgress"
        delay_timeout = 0
        delay_cnt = 0
can you use a more expressive variable name?
        self.logger.info(f"Sleeping {self.CHECK_INTERVAL_SECONDS} seconds while waiting for Job: {job_id} to complete")
        time.sleep(self.CHECK_INTERVAL_SECONDS)

    def execute_job(self, query: Mapping[str, Any], url: str) -> str:
        job_status = "Failed"
it's probably better to encode these job statuses using an enum
Yes, this was bug-fixing only, with minimal refactoring. Sure, we can do that in the next review.
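The enum suggested above might look like the sketch below. The status strings themselves are the ones already used in this PR; the class and member names are hypothetical:

```python
from enum import Enum

# Sketch: bulk job states as a str-mixin enum instead of bare strings
# scattered through the code. Mixing in str lets members compare equal
# to the raw strings returned by the API.
class JobStatus(str, Enum):
    UPLOAD_COMPLETE = "UploadComplete"
    IN_PROGRESS = "InProgress"
    JOB_COMPLETE = "JobComplete"
    ABORTED = "Aborted"
    FAILED = "Failed"

# Parsing an API response string yields the enum member:
assert JobStatus("Failed") is JobStatus.FAILED
# And members still compare equal to plain strings:
assert JobStatus.FAILED == "Failed"
```

Typos in status strings then fail loudly (`ValueError` from `JobStatus(...)`) instead of silently never matching.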
        self.logger.info(f"Sleeping {self.CHECK_INTERVAL_SECONDS} seconds while waiting for Job: {job_id} to complete")
        time.sleep(self.CHECK_INTERVAL_SECONDS)

    def execute_job(self, query: Mapping[str, Any], url: str) -> str:
signature should be -> Optional[str]
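With the suggested `Optional[str]` annotation, the type checker reminds every caller that no job id may come back. A sketch with a hypothetical `create_and_wait` helper, not the connector's actual method:

```python
from typing import Callable, Optional, Tuple

# Sketch: execute a bulk job with retries and return its id, or None
# when every attempt ended without the job completing. `create_and_wait`
# is a hypothetical stand-in returning (job_id, final_status).
def execute_job(create_and_wait: Callable[[], Tuple[str, str]],
                max_retries: int = 3) -> Optional[str]:
    for _ in range(max_retries):
        job_id, status = create_and_wait()
        if status == "JobComplete":
            return job_id
    return None  # explicit "no job succeeded" signal for the caller
```

Returning `None` through an `Optional[str]` signature is checkable by mypy, whereas a bare `-> str` hides the failure path.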
            job_status = self.wait_for_job(url=job_full_url)
            if job_status not in ["UploadComplete", "InProgress"]:
                break
            self.logger.error(f"Waiting error. Try to run this job again {i+1}/{self.MAX_RETRY_NUMBER}...")
Suggested change:
-self.logger.error(f"Waiting error. Try to run this job again {i+1}/{self.MAX_RETRY_NUMBER}...")
+self.logger.error(f"Waiting error. Running retry {i+1} out of {self.MAX_RETRY_NUMBER}...")
        if job_status in ["Aborted", "Failed"]:
            self.delete_job(url=job_full_url)
            raise Exception(f"Job for {self.name} stream using BULK API was failed.")
shouldn't we be retrying this job instead of failing?
I wasn't sure about the possible reasons for these failures. Unfortunately, I didn't manage to catch any real failed or aborted job. Maybe there are some internal job failures (timeouts or resource issues).
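Retrying instead of raising, as the reviewer asks, could be sketched like this. The helper names are hypothetical stand-ins for the connector's methods, and whether a retry actually helps depends on why Salesforce aborted the job:

```python
# Sketch: on a terminal Failed/Aborted status, delete the dead job and
# create a fresh one instead of raising immediately. `create_job`,
# `wait_for_status` and `delete_job` are hypothetical stand-ins.
def run_job_with_retries(create_job, wait_for_status, delete_job, max_retries=3):
    for attempt in range(1, max_retries + 1):
        job_id = create_job()
        status = wait_for_status(job_id)
        if status not in ("Aborted", "Failed"):
            return job_id, status
        delete_job(job_id)  # clean up the dead job before retrying
    raise Exception(f"BULK job still failing after {max_retries} attempts")
```

If the failures turn out to be deterministic (e.g. an unsupported query), retrying only delays the same error, so a retry cap like this one stays important.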
* update dockerfile
* update version
* update changelogs
* remove test config
* fix after flake8
* bump version to 0.1.6
* remove secrets from config
* update source specs
What

Periodically the `salesforce` connector doesn't wait for completion of `bulk` jobs. The default waiting timeout is 10 minutes.

How

Implementation of 2 new features:
Pre-merge Checklist

Expand the relevant checklist and delete the others.

Updating a connector

Community member or Airbyter

- Secret fields are annotated with `airbyte_secret`
- `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`
- `README.md`
- `bootstrap.md`. See description and examples
- `docs/integrations/<source or destination>/<name>.md` including changelog. See changelog example

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

- `/test connector=connectors/<name>` command is passing
- `/publish` command described here