🎉 Source Salesforce: Add Transform for output records (airbytehq#7885)
yevhenii-ldv authored and schlattk committed Jan 4, 2022
1 parent 80a0937 commit 217b6ea
Showing 6 changed files with 27 additions and 48 deletions.
@@ -486,7 +486,7 @@
 - name: Salesforce
   sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
   dockerRepository: airbyte/source-salesforce
-  dockerImageTag: 0.1.4
+  dockerImageTag: 0.1.5
   documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
   icon: salesforce.svg
   sourceType: api
@@ -5028,7 +5028,7 @@
     supportsNormalization: false
     supportsDBT: false
     supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:0.1.4"
+- dockerImage: "airbyte/source-salesforce:0.1.5"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
     connectionSpecification:
@@ -12,5 +12,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.4
+LABEL io.airbyte.version=0.1.5
 LABEL io.airbyte.name=airbyte/source-salesforce
@@ -315,7 +315,11 @@ def field_to_property_schema(field_params: Mapping[str, Any]) -> Mapping[str, Any]:
     elif sf_type == "boolean":
         property_schema["type"] = ["boolean", "null"]
     elif sf_type in LOOSE_TYPES:
-        property_schema["type"] = ["string", "integer", "number", "boolean", "array", "object", "null"]
+        """
+        LOOSE_TYPES can return data of completely different types (more than 99% of them are `strings`),
+        and in order to avoid conflicts in schemas and destinations, we cast this data to the `string` type.
+        """
+        property_schema["type"] = ["string", "null"]
     elif sf_type == "location":
         property_schema = {
             "type": ["object", "null"],
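Under the old union type, a loose-typed (`anyType`) field could surface as a string in one record and an object in the next, producing schema conflicts in destinations. Here is a minimal sketch (not part of this commit) of how the narrowed schema combines with the CDK transformer introduced in `streams.py` below; the field name `Payload__c` is made up:

```python
# Illustrative only: the CDK's default normalization coerces whatever a
# loose-typed field returns into the (now single) declared type.
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

schema = {
    "type": "object",
    "properties": {"Payload__c": {"type": ["string", "null"]}},  # hypothetical field
}

transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

record = {"Payload__c": 42}  # the API may hand back a number for an anyType field
transformer.transform(record, schema)  # mutates the record in place
assert record == {"Payload__c": "42"}  # cast to string, so destinations see one type
```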
@@ -3,7 +3,6 @@
 #
 
 import csv
-import json
 import time
 from abc import ABC
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
@@ -12,6 +11,7 @@
 import requests
 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.streams.http import HttpStream
+from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from requests import codes, exceptions
 
 from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce
@@ -22,6 +22,8 @@ class SalesforceStream(HttpStream, ABC):
 
     page_size = 2000
 
+    transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
+
     def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs):
         super().__init__(**kwargs)
         self.sf_api = sf_api
@@ -63,7 +65,7 @@ def request_params(
         selected_properties = {
             key: value
             for key, value in selected_properties.items()
-            if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3))
+            if value.get("format") != "base64" and "object" not in value["type"]
         }
 
         query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
@@ -103,6 +105,18 @@ class BulkSalesforceStream(SalesforceStream):
     def path(self, **kwargs) -> str:
         return f"/services/data/{self.sf_api.version}/jobs/query"
 
+    transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)
+
+    @transformer.registerCustomTransform
+    def transform_empty_string_to_none(instance, schema):
+        """
+        The BULK API returns a `csv` file, where all values initially arrive as strings.
+        This custom transform replaces empty strings with `None`.
+        """
+        if isinstance(instance, str) and not instance.strip():
+            instance = None
+        return instance
+
     @default_backoff_handler(max_tries=5, factor=15)
     def _send_http_request(self, method: str, url: str, json: dict = None):
         headers = self.authenticator.get_auth_header()
@@ -168,46 +182,6 @@ def next_page_token(self, last_record: dict) -> str:
         if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
             return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "
 
-    def transform(self, record: dict, schema: dict = None):
-        """
-        BULK API always returns a CSV file, where all values are string. This function changes the data type according to the schema.
-        """
-        if not schema:
-            schema = self.get_json_schema().get("properties", {})
-
-        def transform_types(field_types: list = None):
-            """
-            Convert Jsonschema data types to Python data types.
-            """
-            convert_types_map = {"boolean": bool, "string": str, "number": float, "integer": int, "object": dict, "array": list}
-            return [convert_types_map[field_type] for field_type in field_types if field_type != "null"]
-
-        for key, value in record.items():
-            if key not in schema:
-                continue
-
-            if value is None or isinstance(value, str) and value.strip() == "":
-                record[key] = None
-            else:
-                types = transform_types(schema[key]["type"])
-                if len(types) != 1:
-                    continue
-
-                if types[0] == bool:
-                    record[key] = True if isinstance(value, str) and value.lower() == "true" else False
-                elif types[0] == dict:
-                    try:
-                        record[key] = json.loads(value)
-                    except Exception:
-                        record[key] = None
-                    continue
-                else:
-                    record[key] = types[0](value)
-
-            if isinstance(record[key], dict):
-                self.transform(record[key], schema[key].get("properties", {}))
-        return record
-
     def read_records(
         self,
         sync_mode: SyncMode,
@@ -231,7 +205,7 @@ def read_records(
         if job_status == "JobComplete":
             count = 0
             for count, record in self.download_data(url=job_full_url):
-                yield self.transform(record)
+                yield record
 
             if count == self.page_size:
                 next_page_token = self.next_page_token(record)
@@ -273,7 +247,7 @@ def request_params(
         selected_properties = {
             key: value
             for key, value in selected_properties.items()
-            if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3))
+            if value.get("format") != "base64" and "object" not in value["type"]
         }
 
         stream_date = stream_state.get(self.cursor_field)
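With the hand-rolled `transform` method deleted, `read_records` yields raw CSV rows; the CDK applies the stream's `transformer` against the stream's JSON schema when it packages each record into an `AirbyteRecordMessage`. A self-contained sketch (assumed CDK behavior, with a made-up schema and row) of the normalization pipeline the bulk stream now relies on:

```python
# Illustrative only: combined custom + default normalization, mirroring
# BulkSalesforceStream. The schema and row values below are invented.
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)

@transformer.registerCustomTransform
def transform_empty_string_to_none(instance, schema):
    # CSV has no notion of null: blank cells arrive as "" and become None.
    if isinstance(instance, str) and not instance.strip():
        return None
    return instance

schema = {
    "type": "object",
    "properties": {
        "Id": {"type": ["string", "null"]},
        "Amount": {"type": ["number", "null"]},
        "IsDeleted": {"type": ["boolean", "null"]},
    },
}

row = {"Id": "0015e000001abcd", "Amount": "", "IsDeleted": "false"}  # all strings, as in a BULK CSV
transformer.transform(row, schema)  # mutates the row in place
assert row == {"Id": "0015e000001abcd", "Amount": None, "IsDeleted": False}
```

Whichever order the two passes run in, a blank cell ends up as `None`: default conversion cannot parse `float("")` and leaves the string untouched, while the custom hook maps it to `None`.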
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
@@ -734,6 +734,7 @@ List of available streams:
 
 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.5 | 2021-11-15 | [7885](https://github.com/airbytehq/airbyte/pull/7885) | Add `Transform` for output records |
 | 0.1.4 | 2021-11-09 | [7778](https://github.com/airbytehq/airbyte/pull/7778) | Fix types for `anyType` fields |
 | 0.1.3 | 2021-11-06 | [7592](https://github.com/airbytehq/airbyte/pull/7592) | Fix getting `anyType` fields using BULK API |
 | 0.1.2 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification |
