diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json index 9cf14666e7b14..b0f7d72a5c777 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962", "name": "Salesforce", "dockerRepository": "airbyte/source-salesforce", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.3", "documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce", "icon": "salesforce.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index e3ab0f83124c1..db629e47be54d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -468,7 +468,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-integrations/connectors/source-salesforce/BOOTSTRAP.md b/airbyte-integrations/connectors/source-salesforce/BOOTSTRAP.md index d08b9bdb0f83d..943fb5c4e4f93 100644 --- a/airbyte-integrations/connectors/source-salesforce/BOOTSTRAP.md +++ b/airbyte-integrations/connectors/source-salesforce/BOOTSTRAP.md @@ -10,6 +10,19 @@ There are two types of objects: To query an object, one must use [SOQL](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_query.htm), Salesforce’s 
proprietary SQL language. An example might be `SELECT * FROM <sobject.name> WHERE SystemModstamp > 2122-01-18T21:18:20.000Z`. +Because the `Salesforce` connector pulls all objects from `Salesforce` dynamically, all streams are dynamically generated accordingly. +While creating the schema for each stream, we determine whether the stream is dynamic or not (if the stream has one of the +following fields: `SystemModstamp`, `LastModifiedDate`, `CreatedDate`, `LoginTime`, then it is dynamic). +Based on this, streams that carry information about record updates are filtered by `updated at`, and streams that carry +only the creation date of the record (as in the case of streams that have only the `CreatedDate` field) are filtered by `created at`. +The cursor is assigned as follows: +``` +@property +def cursor_field(self) -> str: + return self.replication_key +``` +`replication_key` is one of the following values: `SystemModstamp`, `LastModifiedDate`, `CreatedDate`, `LoginTime`. + In addition there are two types of APIs exposed by Salesforce: * **[REST API](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_queryall.htm)**: completely synchronous * **[BULK API](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/queries.htm)**: has larger rate limit allowance (150k objects per day on the standard plan) but is asynchronous and therefore follows a request-poll-wait pattern. diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 4779533f45117..47ba807177208 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . 
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json index 69da9893a876b..0088a9218122b 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_bulk.json @@ -80,30 +80,6 @@ "sync_mode": "incremental", "destination_sync_mode": "append" }, - { - "stream": { - "name": "LoginGeo", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "LoginHistory", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["LoginTime"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, { "stream": { "name": "PermissionSetTabSetting", diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json index bdf4425c618f5..c1d410e37bf9c 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog_rest.json @@ -70,30 +70,6 @@ "sync_mode": "incremental", "destination_sync_mode": "append" }, 
- { - "stream": { - "name": "LoginGeo", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "LoginHistory", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["LoginTime"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, { "stream": { "name": "PermissionSetTabSetting", diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json index a676b2a0674b6..a167a53fc2c4d 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json @@ -28,7 +28,7 @@ "airbyte_secret": true }, "start_date": { - "description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.", + "description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. 
This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream.", "type": "string", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", "examples": ["2021-07-25T00:00:00Z"] diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index b668bf0c965f4..3c5d44d7e115e 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -63,7 +63,7 @@ def request_params( selected_properties = { key: value for key, value in selected_properties.items() - if not (("format" in value and value["format"] == "base64") or "object" in value["type"]) + if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3)) } query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " @@ -179,13 +179,7 @@ def transform_types(field_types: list = None): """ Convert Jsonschema data types to Python data types. 
""" - convert_types_map = { - "boolean": bool, - "string": str, - "number": float, - "integer": int, - "object": dict, - } + convert_types_map = {"boolean": bool, "string": str, "number": float, "integer": int, "object": dict, "array": list} return [convert_types_map[field_type] for field_type in field_types if field_type != "null"] for key, value in record.items(): @@ -279,7 +273,7 @@ def request_params( selected_properties = { key: value for key, value in selected_properties.items() - if not (("format" in value and value["format"] == "base64") or "object" in value["type"]) + if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3)) } stream_date = stream_state.get(self.cursor_field) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 64b27d5c5bf9d..afe7cc996f951 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -734,6 +734,7 @@ List of available streams: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.3 | 2021-11-06 | [7592](https://github.com/airbytehq/airbyte/pull/7592) | Fix getting `anyType` fields using BULK API | | 0.1.2 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification | | 0.1.1 | 2021-09-21 | [6209](https://github.com/airbytehq/airbyte/pull/6209) | Fix bug with pagination for BULK API | | 0.1.0 | 2021-09-08 | [5619](https://github.com/airbytehq/airbyte/pull/5619) | Salesforce Aitbyte-Native Connector |