🐛 Source Salesforce: Fix getting anyType fields using BULK API (#7592)
* Source Salesforce: Fix getting anyType fields using BULK API
yevhenii-ldv authored Nov 5, 2021
1 parent 9f750c2 commit 2efef3c
Showing 9 changed files with 21 additions and 61 deletions.
@@ -2,7 +2,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
"icon": "salesforce.svg"
}
@@ -468,7 +468,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
-   dockerImageTag: 0.1.2
+   dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
13 changes: 13 additions & 0 deletions airbyte-integrations/connectors/source-salesforce/BOOTSTRAP.md
@@ -10,6 +10,19 @@ There are two types of objects:
To query an object, one must use [SOQL](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_query.htm), Salesforce’s proprietary SQL language.
An example might be `SELECT * FROM <sobject.name> WHERE SystemModstamp > 2122-01-18T21:18:20.000Z`.

+ Because the Salesforce connector pulls all objects from Salesforce dynamically, all of its streams are generated dynamically as well.
+ While building the schema for each stream, we determine whether the stream is dynamic: if it has one of the
+ fields `SystemModstamp`, `LastModifiedDate`, `CreatedDate`, or `LoginTime`, it is dynamic.
+ Based on this, streams that carry information about record updates are filtered by "updated at", while streams that only know
+ a record's creation date (for example, streams whose only such field is `CreatedDate`) are filtered by "created at".
+ The cursor is assigned as follows:
+ ```python
+ @property
+ def cursor_field(self) -> str:
+     return self.replication_key
+ ```
+ `replication_key` is one of the following values: `SystemModstamp`, `LastModifiedDate`, `CreatedDate`, or `LoginTime`.
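For illustration, here is a minimal sketch of how a replication key could be derived from a stream's field names, in an assumed priority order. The helper `pick_replication_key` is hypothetical, not the connector's actual code:

```python
from typing import List, Optional

# Cursor candidates in an assumed priority order (hypothetical, for illustration only).
REPLICATION_KEY_CANDIDATES = ["SystemModstamp", "LastModifiedDate", "CreatedDate", "LoginTime"]

def pick_replication_key(field_names: List[str]) -> Optional[str]:
    """Return the first cursor candidate present, or None for streams without one."""
    for candidate in REPLICATION_KEY_CANDIDATES:
        if candidate in field_names:
            return candidate
    return None

assert pick_replication_key(["Id", "Name", "SystemModstamp"]) == "SystemModstamp"
assert pick_replication_key(["Id", "Name"]) is None
```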

In addition, there are two types of APIs exposed by Salesforce:
* **[REST API](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_queryall.htm)**: completely synchronous.
* **[BULK API](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/queries.htm)**: has a larger rate-limit allowance (150k objects per day on the standard plan) but is asynchronous and therefore follows a request-poll-wait pattern, sketched below.
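A rough sketch of that request-poll-wait pattern against Salesforce's Bulk API 2.0 query endpoints (the API version `v52.0` is an assumption; retries, result paging, and error handling are omitted):

```python
import time
import requests

def bulk_query(instance_url: str, token: str, soql: str) -> str:
    """Submit a Bulk API 2.0 query job, poll until it reaches a terminal state,
    and return the results as CSV text. Simplified illustration only."""
    headers = {"Authorization": f"Bearer {token}"}
    base = f"{instance_url}/services/data/v52.0/jobs/query"  # assumed API version

    # Request: create the asynchronous query job.
    job_id = requests.post(base, json={"operation": "query", "query": soql}, headers=headers).json()["id"]

    # Poll: wait until Salesforce reports that the job finished.
    while True:
        state = requests.get(f"{base}/{job_id}", headers=headers).json()["state"]
        if state in ("JobComplete", "Failed", "Aborted"):
            break
        time.sleep(5)

    # Fetch: download the CSV result set.
    return requests.get(f"{base}/{job_id}/results", headers=headers).text
```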
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

- LABEL io.airbyte.version=0.1.2
+ LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -80,30 +80,6 @@
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
-     {
-       "stream": {
-         "name": "LoginGeo",
-         "json_schema": {},
-         "supported_sync_modes": ["full_refresh", "incremental"],
-         "source_defined_cursor": true,
-         "default_cursor_field": ["SystemModstamp"],
-         "source_defined_primary_key": [["Id"]]
-       },
-       "sync_mode": "incremental",
-       "destination_sync_mode": "append"
-     },
-     {
-       "stream": {
-         "name": "LoginHistory",
-         "json_schema": {},
-         "supported_sync_modes": ["full_refresh", "incremental"],
-         "source_defined_cursor": true,
-         "default_cursor_field": ["LoginTime"],
-         "source_defined_primary_key": [["Id"]]
-       },
-       "sync_mode": "incremental",
-       "destination_sync_mode": "append"
-     },
{
"stream": {
"name": "PermissionSetTabSetting",
@@ -70,30 +70,6 @@
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
-     {
-       "stream": {
-         "name": "LoginGeo",
-         "json_schema": {},
-         "supported_sync_modes": ["full_refresh", "incremental"],
-         "source_defined_cursor": true,
-         "default_cursor_field": ["SystemModstamp"],
-         "source_defined_primary_key": [["Id"]]
-       },
-       "sync_mode": "incremental",
-       "destination_sync_mode": "append"
-     },
-     {
-       "stream": {
-         "name": "LoginHistory",
-         "json_schema": {},
-         "supported_sync_modes": ["full_refresh", "incremental"],
-         "source_defined_cursor": true,
-         "default_cursor_field": ["LoginTime"],
-         "source_defined_primary_key": [["Id"]]
-       },
-       "sync_mode": "incremental",
-       "destination_sync_mode": "append"
-     },
{
"stream": {
"name": "PermissionSetTabSetting",
@@ -28,7 +28,7 @@
"airbyte_secret": true
},
"start_date": {
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream.",
"type": "string",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2021-07-25T00:00:00Z"]
@@ -63,7 +63,7 @@ def request_params(
selected_properties = {
key: value
for key, value in selected_properties.items()
if not (("format" in value and value["format"] == "base64") or "object" in value["type"])
if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3))
}

query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
@@ -179,13 +179,7 @@ def transform_types(field_types: list = None):
"""
Convert Jsonschema data types to Python data types.
"""
-         convert_types_map = {
-             "boolean": bool,
-             "string": str,
-             "number": float,
-             "integer": int,
-             "object": dict,
-         }
+         convert_types_map = {"boolean": bool, "string": str, "number": float, "integer": int, "object": dict, "array": list}
return [convert_types_map[field_type] for field_type in field_types if field_type != "null"]

for key, value in record.items():
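Adding `"array": list` to the map is the other half of the fix: an `anyType` field's JSON-schema type list may include `"array"`, and the old map raised a `KeyError` on it. A minimal before/after check (the sample `field_types` value is hypothetical):

```python
old_map = {"boolean": bool, "string": str, "number": float, "integer": int, "object": dict}
new_map = {**old_map, "array": list}

field_types = ["array", "string", "null"]  # hypothetical anyType field

try:
    [old_map[t] for t in field_types if t != "null"]
except KeyError as err:
    print(f"old map fails on {err}")  # old map fails on 'array'

assert [new_map[t] for t in field_types if t != "null"] == [list, str]
```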
@@ -279,7 +273,7 @@ def request_params(
selected_properties = {
key: value
for key, value in selected_properties.items()
if not (("format" in value and value["format"] == "base64") or "object" in value["type"])
if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3))
}

stream_date = stream_state.get(self.cursor_field)
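Downstream of this, the stream state is typically folded into the query's WHERE clause so that only new or updated records are pulled. A hedged sketch of such query assembly (function and parameter names are illustrative, not the connector's actual code):

```python
from typing import List, Optional

def build_incremental_query(stream_name: str, fields: List[str], cursor_field: str,
                            stream_date: Optional[str] = None) -> str:
    """Assemble a SOQL query, filtering by the cursor field when state exists."""
    query = f"SELECT {','.join(fields)} FROM {stream_name} "
    if stream_date:
        query += f"WHERE {cursor_field} >= {stream_date} "
    return query + f"ORDER BY {cursor_field} ASC"

print(build_incremental_query("Account", ["Id", "Name"], "SystemModstamp", "2021-07-25T00:00:00Z"))
# SELECT Id,Name FROM Account WHERE SystemModstamp >= 2021-07-25T00:00:00Z ORDER BY SystemModstamp ASC
```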
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
@@ -734,6 +734,7 @@ List of available streams:

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
+ | 0.1.3 | 2021-11-06 | [7592](https://github.com/airbytehq/airbyte/pull/7592) | Fix getting `anyType` fields using BULK API |
| 0.1.2 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate OAuth2 flow initialization parameters in connector specification |
| 0.1.1 | 2021-09-21 | [6209](https://github.com/airbytehq/airbyte/pull/6209) | Fix bug with pagination for BULK API |
| 0.1.0 | 2021-09-08 | [5619](https://github.com/airbytehq/airbyte/pull/5619) | Salesforce Airbyte-Native Connector |