Skip to content

Commit

Permalink
🐛 Source Salesforce: fix bug with pagination for BULK API (#6209)
Browse files Browse the repository at this point in the history
* Source Salesforce: fix bug with pagination for BULK API
  • Loading branch information
yevhenii-ldv authored Sep 21, 2021
1 parent 45f6598 commit c114ec8
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
"icon": "salesforce.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
- sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
name: Salesforce
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,42 @@
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginGeo",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginHistory",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["LoginTime"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "PermissionSetTabSetting",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,42 @@
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginGeo",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LoginHistory",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["LoginTime"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "PermissionSetTabSetting",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,14 @@
},
"ObjectPermissions": {
"SystemModstamp": "2121-08-23T10:27:22.000Z"
},
"LoginGeo": {
"SystemModstamp": "2121-08-23T10:27:22.000Z"
},
"LoginHistory": {
"LoginTime": "2121-08-23T10:27:22.000Z"
},
"PermissionSetTabSetting": {
"SystemModstamp": "2121-08-23T10:27:22.000Z"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,14 @@
},
"ObjectPermissions": {
"SystemModstamp": "2021-08-23T10:27:22.000Z"
},
"LoginGeo": {
"SystemModstamp": "2021-08-23T10:27:22.000Z"
},
"LoginHistory": {
"LoginTime": "2021-08-23T10:27:22.000Z"
},
"PermissionSetTabSetting": {
"SystemModstamp": "2021-08-23T10:27:22.000Z"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

class SalesforceStream(HttpStream, ABC):

limit = 2000
page_size = 2000

def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs):
super().__init__(**kwargs)
Expand All @@ -66,8 +66,8 @@ def path(self, **kwargs) -> str:

def next_page_token(self, response: requests.Response) -> str:
response_data = response.json()
if len(response_data["records"]) == self.limit and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return f"WHERE {self.primary_key} > '{response_data['records'][-1][self.primary_key]}' "
if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return f"WHERE {self.primary_key} >= '{response_data['records'][-1][self.primary_key]}' "

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
Expand All @@ -91,7 +91,7 @@ def request_params(
query += next_page_token

if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.limit}"
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}"

return {"q": query}

Expand All @@ -116,7 +116,7 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:

class BulkSalesforceStream(SalesforceStream):

limit = 10000
page_size = 30000
JOB_WAIT_TIMEOUT_MINS = 10
CHECK_INTERVAL_SECONDS = 2

Expand Down Expand Up @@ -186,7 +186,7 @@ def delete_job(self, url: str):

def next_page_token(self, last_record: dict) -> str:
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return f"WHERE {self.primary_key} > '{last_record[self.primary_key]}' "
return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "

def transform(self, record: dict, schema: dict = None):
"""
Expand Down Expand Up @@ -259,7 +259,7 @@ def read_records(
for count, record in self.download_data(url=job_full_url):
yield self.transform(record)

if count == self.limit:
if count == self.page_size:
next_page_token = self.next_page_token(record)
if not next_page_token:
pagination_complete = True
Expand All @@ -272,11 +272,12 @@ def read_records(

if job_status in ["JobComplete", "Aborted", "Failed"]:
self.delete_job(url=job_full_url)
pagination_complete = True
if job_status in ["Aborted", "Failed"]:
raise Exception(f"Job for {self.name} stream using BULK API was failed")


class IncrementalSalesforceStream(SalesforceStream, ABC):
state_checkpoint_interval = 100
state_checkpoint_interval = 500

def __init__(self, replication_key: str, start_date: str, **kwargs):
super().__init__(**kwargs)
Expand All @@ -285,7 +286,7 @@ def __init__(self, replication_key: str, start_date: str, **kwargs):

def next_page_token(self, response: requests.Response) -> str:
response_data = response.json()
if len(response_data["records"]) == self.limit and self.name not in UNSUPPORTED_FILTERING_STREAMS:
if len(response_data["records"]) == self.page_size and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return response_data["records"][-1][self.cursor_field]

def request_params(
Expand All @@ -304,9 +305,9 @@ def request_params(
stream_date = stream_state.get(self.cursor_field)
start_date = next_page_token or stream_date or self.start_date

query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} WHERE {self.cursor_field} > {start_date} "
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} WHERE {self.cursor_field} >= {start_date} "
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.limit}"
query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}"
return {"q": query}

@property
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -735,4 +735,5 @@ List of available streams:

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.1 | 2021-09-21 | [6209](https://github.com/airbytehq/airbyte/pull/6209) | Fix bug with pagination for BULK API |
| 0.1.0 | 2021-09-08 | [5619](https://github.com/airbytehq/airbyte/pull/5619) | Salesforce Aitbyte-Native Connector |

0 comments on commit c114ec8

Please sign in to comment.