From c114ec85dc5e83e7846d45033cea891d1e1a0c8e Mon Sep 17 00:00:00 2001 From: Yevhenii <34103125+yevhenii-ldv@users.noreply.github.com> Date: Tue, 21 Sep 2021 14:48:42 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Salesforce:=20fix=20bug?= =?UTF-8?q?=20with=20pagination=20for=20BULK=20API=20(#6209)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Source Salesforce: fix bug with pagination for BULK API --- .../b117307c-14b6-41aa-9422-947e34922962.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../connectors/source-salesforce/Dockerfile | 2 +- .../configured_catalog_bulk.json | 36 +++++++++++++++++++ .../configured_catalog_rest.json | 36 +++++++++++++++++++ .../integration_tests/future_state.json | 9 +++++ .../integration_tests/sample_state.json | 9 +++++ .../source_salesforce/streams.py | 25 ++++++------- docs/integrations/sources/salesforce.md | 1 + 9 files changed, 107 insertions(+), 15 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json index 41fe84ae36ed7..bf4d411639e26 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962", "name": "Salesforce", "dockerRepository": "airbyte/source-salesforce", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce", "icon": "salesforce.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 0a25f6e5eedfd..e61e355ae3ca0 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -126,7 +126,7 @@ - sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 name: Salesforce dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 1bd04b48cd16b..aad0c3fbc2e68 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json index 7638681d8d3ab..69da9893a876b 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json @@ -79,6 +79,42 @@ }, "sync_mode": "incremental", "destination_sync_mode": "append" + }, + { + "stream": { + "name": "LoginGeo", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["SystemModstamp"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "LoginHistory", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["LoginTime"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "PermissionSetTabSetting", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["SystemModstamp"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" } ] } diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json index 08e80fbcecaf5..bdf4425c618f5 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json @@ -69,6 +69,42 @@ }, "sync_mode": "incremental", "destination_sync_mode": "append" + }, + { + "stream": { + "name": "LoginGeo", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["SystemModstamp"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "LoginHistory", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["LoginTime"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "PermissionSetTabSetting", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["SystemModstamp"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" } ] } diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/future_state.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/future_state.json index d2ba96d22dd52..da64ffbef27ef 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/future_state.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/future_state.json @@ -16,5 +16,14 @@ }, "ObjectPermissions": { "SystemModstamp": "2121-08-23T10:27:22.000Z" + }, + "LoginGeo": { + "SystemModstamp": "2121-08-23T10:27:22.000Z" + }, + "LoginHistory": { + "LoginTime": "2121-08-23T10:27:22.000Z" + }, + "PermissionSetTabSetting": { + "SystemModstamp": "2121-08-23T10:27:22.000Z" } } diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/sample_state.json index 0cd3961fc0b5c..a1e03b234bcab 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/sample_state.json @@ -13,5 +13,14 @@ }, "ObjectPermissions": { "SystemModstamp": "2021-08-23T10:27:22.000Z" + }, + "LoginGeo": { + "SystemModstamp": "2021-08-23T10:27:22.000Z" + }, + "LoginHistory": { + "LoginTime": "2021-08-23T10:27:22.000Z" + }, + "PermissionSetTabSetting": { + "SystemModstamp": "2021-08-23T10:27:22.000Z" } } diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 8616e625d4c39..3429fedf4baac 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -40,7 +40,7 @@ class SalesforceStream(HttpStream, ABC): - limit = 2000 + page_size = 2000 def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs): super().__init__(**kwargs) @@ -66,8 +66,8 @@ def path(self, **kwargs) -> str: def next_page_token(self, response: requests.Response) -> str: response_data = response.json() - if len(response_data["records"]) == self.limit and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - return f"WHERE {self.primary_key} > '{response_data['records'][-1][self.primary_key]}' " + if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: + return f"WHERE {self.primary_key} >= '{response_data['records'][-1][self.primary_key]}' " def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None @@ -91,7 +91,7 @@ def request_params( query += next_page_token if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - query += f"ORDER BY {self.primary_key} ASC LIMIT {self.limit}" + query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}" return {"q": query} @@ -116,7 +116,7 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: class BulkSalesforceStream(SalesforceStream): - limit = 10000 + page_size = 30000 JOB_WAIT_TIMEOUT_MINS = 10 CHECK_INTERVAL_SECONDS = 2 @@ -186,7 +186,7 @@ def delete_job(self, url: str): def next_page_token(self, last_record: dict) -> str: if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - return f"WHERE {self.primary_key} > '{last_record[self.primary_key]}' " + return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' " def transform(self, record: dict, schema: dict = None): """ @@ -259,7 +259,7 @@ def read_records( for count, record in self.download_data(url=job_full_url): yield self.transform(record) - if count == self.limit: + if count == self.page_size: next_page_token = self.next_page_token(record) if not next_page_token: pagination_complete = True @@ -272,11 +272,12 @@ def read_records( if job_status in ["JobComplete", "Aborted", "Failed"]: self.delete_job(url=job_full_url) - pagination_complete = True + if job_status in ["Aborted", "Failed"]: + raise Exception(f"Job for {self.name} stream using BULK API was failed") class IncrementalSalesforceStream(SalesforceStream, ABC): - state_checkpoint_interval = 100 + state_checkpoint_interval = 500 def __init__(self, replication_key: str, start_date: str, **kwargs): super().__init__(**kwargs) @@ -285,7 +286,7 @@ def __init__(self, replication_key: str, start_date: str, **kwargs): def next_page_token(self, response: requests.Response) -> str: response_data = response.json() - if len(response_data["records"]) == self.limit and self.name not in UNSUPPORTED_FILTERING_STREAMS: + if len(response_data["records"]) == self.page_size and self.name not in UNSUPPORTED_FILTERING_STREAMS: return response_data["records"][-1][self.cursor_field] def request_params( @@ -304,9 +305,9 @@ def request_params( stream_date = stream_state.get(self.cursor_field) start_date = next_page_token or stream_date or self.start_date - query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} WHERE {self.cursor_field} > {start_date} " + query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} WHERE {self.cursor_field} >= {start_date} " if self.name not in UNSUPPORTED_FILTERING_STREAMS: - query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.limit}" + query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}" return {"q": query} @property diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index b9dd034eb01a2..863ec805009f4 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -735,4 +735,5 @@ List of available streams: | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.1 | 2021-09-21 | [6209](https://github.com/airbytehq/airbyte/pull/6209) | Fix bug with pagination for BULK API | | 0.1.0 | 2021-09-08 | [5619](https://github.com/airbytehq/airbyte/pull/5619) | Salesforce Aitbyte-Native Connector |