🎉 Source Salesforce: Add Transform for output records #7885

Merged
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.4
+LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -315,7 +315,11 @@ def field_to_property_schema(field_params: Mapping[str, Any]) -> Mapping[str, An
elif sf_type == "boolean":
property_schema["type"] = ["boolean", "null"]
elif sf_type in LOOSE_TYPES:
property_schema["type"] = ["string", "integer", "number", "boolean", "array", "object", "null"]
"""
LOOSE_TYPES can return data of completely different types (more than 99% of them are `strings`),
and in order to avoid conflicts in schemas and destinations, we cast this data to the `string` type.
"""
property_schema["type"] = ["string", "null"]
elif sf_type == "location":
property_schema = {
"type": ["object", "null"],
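Note on the `field_to_property_schema` change above: loose, anyType-like fields are now declared as nullable strings in the generated JSON schema instead of a union of every JSON type. A minimal sketch of the effect, with a simplified mapping and assumed `LOOSE_TYPES` members (the real connector handles many more Salesforce types):

```python
from typing import Any, Mapping

LOOSE_TYPES = {"anyType", "calculated", "combobox"}  # assumed members, for illustration only


def field_to_property_schema(field_params: Mapping[str, Any]) -> Mapping[str, Any]:
    sf_type = field_params["type"]
    property_schema: dict = {}
    if sf_type == "boolean":
        property_schema["type"] = ["boolean", "null"]
    elif sf_type in LOOSE_TYPES:
        # Loose fields can hold values of almost any type (>99% are strings),
        # so they are declared as nullable strings to keep schemas and destinations consistent.
        property_schema["type"] = ["string", "null"]
    else:
        # Fallback for this sketch; the real mapping covers many more Salesforce types.
        property_schema["type"] = ["string", "null"]
    return property_schema


print(field_to_property_schema({"type": "anyType"}))  # {'type': ['string', 'null']}
```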
@@ -3,7 +3,6 @@
#

import csv
-import json
import time
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
@@ -12,6 +11,7 @@
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
+from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from requests import codes, exceptions

from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce
@@ -22,6 +22,8 @@ class SalesforceStream(HttpStream, ABC):

page_size = 2000

+transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

def __init__(self, sf_api: Salesforce, pk: str, stream_name: str, schema: dict = None, **kwargs):
super().__init__(**kwargs)
self.sf_api = sf_api
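For reference, a minimal sketch of what the newly declared `transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)` is expected to do: the CDK casts each record value to the type declared in the stream's JSON schema as records are emitted. The schema and record below are made up for illustration:

```python
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

schema = {
    "type": "object",
    "properties": {
        "Id": {"type": ["string", "null"]},
        "Amount": {"type": ["number", "null"]},
        "IsDeleted": {"type": ["boolean", "null"]},
    },
}

record = {"Id": "001xx000003DGb2AAG", "Amount": "125.5", "IsDeleted": "false"}
transformer.transform(record, schema)  # mutates the record in place
print(record)  # expected: {'Id': '001xx000003DGb2AAG', 'Amount': 125.5, 'IsDeleted': False}
```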
@@ -63,7 +65,7 @@ def request_params(
selected_properties = {
key: value
for key, value in selected_properties.items()
if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3))
if not (("format" in value and value["format"] == "base64") or "object" in value["type"])
}

query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
@@ -103,6 +105,18 @@ class BulkSalesforceStream(SalesforceStream):
def path(self, **kwargs) -> str:
return f"/services/data/{self.sf_api.version}/jobs/query"

+transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)
+
+@transformer.registerCustomTransform
+def transform_empty_string_to_none(instance, schema):
+"""
+The BULK API returns a `csv` file, where all values initially arrive as strings.
+This custom transformer replaces empty strings with `None`.
+"""
+if isinstance(instance, str) and not instance.strip():
+instance = None
+return instance

@default_backoff_handler(max_tries=5, factor=15)
def _send_http_request(self, method: str, url: str, json: dict = None):
headers = self.authenticator.get_auth_header()
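For the BULK stream, the custom transform and the default schema normalization are expected to compose: empty CSV cells become `None`, and the remaining string values are cast to the types declared in the schema. A minimal sketch with made-up field names and values:

```python
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)


@transformer.registerCustomTransform
def transform_empty_string_to_none(instance, schema):
    # Empty CSV cells arrive as "" and should become null rather than fail numeric/boolean casting.
    if isinstance(instance, str) and not instance.strip():
        instance = None
    return instance


schema = {
    "type": "object",
    "properties": {
        "Amount": {"type": ["number", "null"]},
        "IsClosed": {"type": ["boolean", "null"]},
    },
}

csv_row = {"Amount": "", "IsClosed": "true"}
transformer.transform(csv_row, schema)  # mutates the row in place
print(csv_row)  # expected: {'Amount': None, 'IsClosed': True}
```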
@@ -168,46 +182,6 @@ def next_page_token(self, last_record: dict) -> str:
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "

-def transform(self, record: dict, schema: dict = None):
-"""
-BULK API always returns a CSV file, where all values are string. This function changes the data type according to the schema.
-"""
-if not schema:
-schema = self.get_json_schema().get("properties", {})
-
-def transform_types(field_types: list = None):
-"""
-Convert Jsonschema data types to Python data types.
-"""
-convert_types_map = {"boolean": bool, "string": str, "number": float, "integer": int, "object": dict, "array": list}
-return [convert_types_map[field_type] for field_type in field_types if field_type != "null"]
-
-for key, value in record.items():
-if key not in schema:
-continue
-
-if value is None or isinstance(value, str) and value.strip() == "":
-record[key] = None
-else:
-types = transform_types(schema[key]["type"])
-if len(types) != 1:
-continue
-
-if types[0] == bool:
-record[key] = True if isinstance(value, str) and value.lower() == "true" else False
-elif types[0] == dict:
-try:
-record[key] = json.loads(value)
-except Exception:
-record[key] = None
-continue
-else:
-record[key] = types[0](value)
-
-if isinstance(record[key], dict):
-self.transform(record[key], schema[key].get("properties", {}))
-return record
-
def read_records(
self,
sync_mode: SyncMode,
@@ -231,7 +205,7 @@ def read_records(
if job_status == "JobComplete":
count = 0
for count, record in self.download_data(url=job_full_url):
-yield self.transform(record)
+yield record

if count == self.page_size:
next_page_token = self.next_page_token(record)
@@ -273,7 +247,7 @@ def request_params(
selected_properties = {
key: value
for key, value in selected_properties.items()
if not (("format" in value and value["format"] == "base64") or ("object" in value["type"] and len(value["type"]) < 3))
if not (("format" in value and value["format"] == "base64") or "object" in value["type"])
}

stream_date = stream_state.get(self.cursor_field)
docs/integrations/sources/salesforce.md (1 addition, 0 deletions)
@@ -734,6 +734,7 @@ List of available streams:

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
+| 0.1.5 | 2021-11-?? | [????](https://github.com/airbytehq/airbyte/pull/????) | Add `Transform` for output records |
| 0.1.4 | 2021-11-09 | [7778](https://github.com/airbytehq/airbyte/pull/7778) | Fix types for `anyType` fields |
| 0.1.3 | 2021-11-06 | [7592](https://github.com/airbytehq/airbyte/pull/7592) | Fix getting `anyType` fields using BULK API |
| 0.1.2 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification |