diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index c186c714b6867..1233afe32facc 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -510,7 +510,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ee8d8c42dd01d..c564fb3a2551a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5339,7 +5339,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.6" +- dockerImage: "airbyte/source-salesforce:0.1.7" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 213098b0c05f8..88553489acd01 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml index 9bd68f9775c9f..f9e894e2189e2 100644 --- a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml @@ -17,8 +17,10 @@ tests: basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog_rest.json" + timeout_seconds: 600 - config_path: "secrets/config_bulk.json" configured_catalog_path: "integration_tests/configured_catalog_bulk.json" + timeout_seconds: 600 incremental: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog_rest.json" diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 8f542a755ffc5..61105920545ba 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -144,7 +144,10 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: except exceptions.HTTPError as error: if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]: error_data = error.response.json()[0] - if error_data.get("message", "") == "Selecting compound data not supported in Bulk Query": + if (error_data.get("message", "") == "Selecting compound data not supported in Bulk Query") or ( + error_data.get("errorCode", "") == "INVALIDENTITY" + and "is not supported by the Bulk API" in error_data.get("message", "") + ): self.logger.error( f"Cannot receive data for stream '{self.name}' using BULK API, error message: '{error_data.get('message')}'" ) @@ -184,7 +187,7 @@ def wait_for_job(self, url: str) -> str: self.logger.warning(f"Not wait the {self.name} data for {self._wait_timeout} minutes, data: {job_info}!!") return job_status - def execute_job(self, query: Mapping[str, Any], url: str) -> str: + def execute_job(self, query: str, url: str) -> str: job_status = "Failed" for i in range(0, self.MAX_RETRY_NUMBER): job_id = self.create_stream_job(query=query, url=url) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index d9b68d7c35963..5fb41c5b5fa83 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -89,6 +89,28 @@ def test_bulk_sync_creation_failed(stream_bulk_config, stream_bulk_api): assert err.value.response.json()[0]["message"] == "test_error" +def test_bulk_sync_unsupported_stream(stream_bulk_config, stream_bulk_api, caplog): + stream_name = "AcceptedEventRelation" + stream: BulkIncrementalSalesforceStream = _generate_stream(stream_name, stream_bulk_config, stream_bulk_api) + with requests_mock.Mocker() as m: + m.register_uri( + "POST", + stream.path(), + status_code=400, + json=[{"errorCode": "INVALIDENTITY", "message": f"Entity '{stream_name}' is not supported by the Bulk API."}], + ) + list(stream.read_records(sync_mode=SyncMode.full_refresh)) + + logs = caplog.records + + assert logs + assert logs[1].levelname == "ERROR" + assert ( + logs[1].msg + == f"Cannot receive data for stream '{stream_name}' using BULK API, error message: 'Entity '{stream_name}' is not supported by the Bulk API.'" + ) + + @pytest.mark.parametrize("item_number", [0, 15, 2000, 2324, 193434]) def test_bulk_sync_pagination(item_number, stream_bulk_config, stream_bulk_api): stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index ea0b908a1bbbf..b31cf1b967790 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -735,6 +735,7 @@ List of available streams: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.7 | 2021-11-24 | [8206](https://github.com/airbytehq/airbyte/pull/8206) | Handling 400 error when trying to create a job for sync using Bulk API. | | 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retring of BULK jobs | | 0.1.5 | 2021-11-15 | [7885](https://github.com/airbytehq/airbyte/pull/7885) | Add `Transform` for output records | | 0.1.4 | 2021-11-09 | [7778](https://github.com/airbytehq/airbyte/pull/7778) | Fix types for `anyType` fields |