diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 325770495c90..854e0b9cb7e6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -486,7 +486,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 0e2738736d7b..41e13b8119b2 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5028,7 +5028,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.4" +- dockerImage: "airbyte/source-salesforce:0.1.5" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 9fd1d06507f9..f4d0c7c3c9d8 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index d56b16307c2f..81293589be4e 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -315,7 +315,11 @@ def field_to_property_schema(field_params: Mapping[str, Any]) -> Mapping[str, An elif sf_type == "boolean": property_schema["type"] = ["boolean", "null"] elif sf_type in LOOSE_TYPES: - property_schema["type"] = ["string", "integer", "number", "boolean", "array", "object", "null"] + """ + LOOSE_TYPES can return data of completely different types (more than 99% of them are `strings`), + and in order to avoid conflicts in schemas and destinations, we cast this data to the `string` type. + """ + property_schema["type"] = ["string", "null"] elif sf_type == "location": property_schema = { "type": ["object", "null"], diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 3c5d44d7e115..0dbc677909c6 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -3,7 +3,6 @@ # import csv -import json import time from abc import ABC from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -12,6 +11,7 @@ import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from requests import codes, exceptions from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce @@ -22,6 +22,8 @@ class SalesforceStream(HttpStream, ABC): page_size = 2000 + transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) + def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs): super().__init__(**kwargs) self.sf_api = sf_api @@ -63,7 +65,7 @@ def request_params( selected_properties = { key: value for key, value in selected_properties.items() - if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3)) + if value.get("format") != "base64" and "object" not in value["type"] } query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " @@ -103,6 +105,18 @@ class BulkSalesforceStream(SalesforceStream): def path(self, **kwargs) -> str: return f"/services/data/{self.sf_api.version}/jobs/query" + transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization) + + @transformer.registerCustomTransform + def transform_empty_string_to_none(instance, schema): + """ + BULK API returns a `csv` file, where all values are initially as string type. + This custom transformer replaces empty lines with `None` value. + """ + if isinstance(instance, str) and not instance.strip(): + instance = None + return instance + @default_backoff_handler(max_tries=5, factor=15) def _send_http_request(self, method: str, url: str, json: dict = None): headers = self.authenticator.get_auth_header() @@ -168,46 +182,6 @@ def next_page_token(self, last_record: dict) -> str: if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' " - def transform(self, record: dict, schema: dict = None): - """ - BULK API always returns a CSV file, where all values are string. This function changes the data type according to the schema. - """ - if not schema: - schema = self.get_json_schema().get("properties", {}) - - def transform_types(field_types: list = None): - """ - Convert Jsonschema data types to Python data types. - """ - convert_types_map = {"boolean": bool, "string": str, "number": float, "integer": int, "object": dict, "array": list} - return [convert_types_map[field_type] for field_type in field_types if field_type != "null"] - - for key, value in record.items(): - if key not in schema: - continue - - if value is None or isinstance(value, str) and value.strip() == "": - record[key] = None - else: - types = transform_types(schema[key]["type"]) - if len(types) != 1: - continue - - if types[0] == bool: - record[key] = True if isinstance(value, str) and value.lower() == "true" else False - elif types[0] == dict: - try: - record[key] = json.loads(value) - except Exception: - record[key] = None - continue - else: - record[key] = types[0](value) - - if isinstance(record[key], dict): - self.transform(record[key], schema[key].get("properties", {})) - return record - def read_records( self, sync_mode: SyncMode, @@ -231,7 +205,7 @@ def read_records( if job_status == "JobComplete": count = 0 for count, record in self.download_data(url=job_full_url): - yield self.transform(record) + yield record if count == self.page_size: next_page_token = self.next_page_token(record) @@ -273,7 +247,7 @@ def request_params( selected_properties = { key: value for key, value in selected_properties.items() - if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3)) + if value.get("format") != "base64" and "object" not in value["type"] } stream_date = stream_state.get(self.cursor_field) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 4a95bcd8ba9a..9a9a86711ad1 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -734,6 +734,7 @@ List of available streams: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.5 | 2021-11-15 | [7885](https://github.com/airbytehq/airbyte/pull/7885) | Add `Transform` for output records | | 0.1.4 | 2021-11-09 | [7778](https://github.com/airbytehq/airbyte/pull/7778) | Fix types for `anyType` fields | | 0.1.3 | 2021-11-06 | [7592](https://github.com/airbytehq/airbyte/pull/7592) | Fix getting `anyType` fields using BULK API | | 0.1.2 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification |