Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Hubspot: Fix issue with getting 414 HTTP error for streams #6954

Merged
merged 14 commits into from
Oct 26, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ RUN pip install .

ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/source-hubspot
144 changes: 96 additions & 48 deletions airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, abstractmethod
from functools import lru_cache, partial
from http import HTTPStatus
from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

import backoff
import pendulum as pendulum
Expand All @@ -17,6 +17,10 @@
from base_python.entrypoint import logger
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout

# The value is obtained experimentally, Hubspot allows the URL length up to ~ 16300 symbols,
# so it was decided to limit the length of the `properties` parameter to 15000 characters.
PROPERTIES_PARAM_MAX_LENGTH = 15000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

were you able to confirm this is the max allowed length? is there a reference you can include for the next person looking at the code? (or rather, how did you get this number? can you leave a comment about it?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not found any information on this in the Hubspot documentation or support anywhere. The user has a similar problem as we do, he described it here, but he was also not answered correctly to his question.
I obtained this value experimentally, using various URLs for Hubspot, I found that somewhere on the border of 16400 characters in URL, the Hubspot starts returning us a 414 error.
I put 15000 as a constant for the length of the "property" value only, and leaving about 1000 for the rest of the URL (basic URL, pagination, additional fields such as associations and contacts).

I`ll leave a comment inside the code about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonderful. Very helpful comment, thank you!


# we got this when provided API Token has incorrect format
CLOUDFLARE_ORIGIN_DNS_ERROR = 530

Expand Down Expand Up @@ -49,6 +53,22 @@
CUSTOM_FIELD_VALUE_TO_TYPE = {v: k for k, v in CUSTOM_FIELD_TYPE_TO_VALUE.items()}


def split_properties(properties_list: List[str]) -> Iterator[Tuple[str]]:
summary_length = 0
local_properties = []
for property_ in properties_list:
if len(property_) + summary_length >= PROPERTIES_PARAM_MAX_LENGTH:
yield local_properties
local_properties = []
summary_length = 0

local_properties.append(property_)
summary_length += len(property_)

if local_properties:
yield local_properties


def retry_connection_handler(**kwargs):
"""Retry helper, log each attempt"""

Expand Down Expand Up @@ -177,6 +197,7 @@ class Stream(ABC):
page_field = "offset"
limit_field = "limit"
limit = 100
offset = 0

@property
@abstractmethod
Expand Down Expand Up @@ -302,58 +323,85 @@ def _filter_old_records(self, records: Iterable) -> Iterable:
yield record

def _read(self, getter: Callable, params: MutableMapping[str, Any] = None) -> Iterator:
next_page_token = None
while True:
response = getter(params=params)
if isinstance(response, Mapping):
if response.get("status", None) == "error":
"""
When the API Key doen't have the permissions to access the endpoint,
we break the read, skip this stream and log warning message for the user.

Example:

response.json() = {
'status': 'error',
'message': 'This hapikey (....) does not have proper permissions! (requires any of [automation-access])',
'correlationId': '111111-2222-3333-4444-55555555555'}
"""
logger.warn(f"Stream `{self.data_field}` cannot be procced. {response.get('message')}")
break

if response.get(self.data_field) is None:
"""
When the response doen't have the stream's data, raise an exception.
"""
raise RuntimeError("Unexpected API response: {} not in {}".format(self.data_field, response.keys()))

yield from response[self.data_field]

# pagination
if "paging" in response: # APIv3 pagination
if "next" in response["paging"]:
params["after"] = response["paging"]["next"]["after"]
else:
break
else:
if not response.get(self.more_key, False):
break
if self.page_field in response:
params[self.page_filter] = response[self.page_field]
if next_page_token:
params.update(next_page_token)

properties_list = list(self.properties.keys())
if properties_list:
# TODO: Additional processing was added due to the fact that users receive 414 errors while syncing their streams (issues #3977 and #5835).
# We will need to fix this code when the Hubspot developers add the ability to use a special parameter to get all properties for an entity.
# According to Hubspot Community (https://community.hubspot.com/t5/APIs-Integrations/Get-all-contact-properties-without-explicitly-listing-them/m-p/447950)
# and the official documentation, this does not exist at the moment.
stream_records = {}

for properties in split_properties(properties_list):
params.update({"properties": ",".join(properties)})
response = getter(params=params)
for record in self._transform(self.parse_response(response)):
if record["id"] not in stream_records:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonderful, this is really easy to follow now! also simpler than my earlier suggestion ;)

stream_records[record["id"]] = record
elif stream_records[record["id"]].get("properties"):
stream_records[record["id"]]["properties"].update(record.get("properties", {}))

yield from [value for key, value in stream_records.items()]
else:
response = list(response)
yield from response
response = getter(params=params)
yield from self._transform(self.parse_response(response))

# pagination
if len(response) < self.limit:
break
else:
params[self.page_filter] = params.get(self.page_filter, 0) + self.limit
next_page_token = self.next_page_token(response)
if not next_page_token:
break

def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
default_params = {self.limit_field: self.limit, "properties": ",".join(self.properties.keys())}
default_params = {self.limit_field: self.limit}
params = {**default_params, **params} if params else {**default_params}
yield from self._filter_dynamic_fields(self._filter_old_records(self._read(getter, params)))

def parse_response(self, response: Union[Mapping[str, Any], List[dict]]) -> Iterator:
if isinstance(response, Mapping):
if response.get("status", None) == "error":
"""
When the API Key doen't have the permissions to access the endpoint,
we break the read, skip this stream and log warning message for the user.

Example:

yield from self._filter_dynamic_fields(self._filter_old_records(self._transform(self._read(getter, params))))
response.json() = {
'status': 'error',
'message': 'This hapikey (....) does not have proper permissions! (requires any of [automation-access])',
'correlationId': '111111-2222-3333-4444-55555555555'}
"""
logger.warn(f"Stream `{self.entity}` cannot be procced. {response.get('message')}")
return

if response.get(self.data_field) is None:
"""
When the response doen't have the stream's data, raise an exception.
"""
raise RuntimeError("Unexpected API response: {} not in {}".format(self.data_field, response.keys()))

yield from response[self.data_field]

else:
response = list(response)
yield from response

def next_page_token(self, response: Union[Mapping[str, Any], List[dict]]) -> Optional[Mapping[str, Union[int, str]]]:
if isinstance(response, Mapping):
if "paging" in response: # APIv3 pagination
if "next" in response["paging"]:
return {"after": response["paging"]["next"]["after"]}
else:
if not response.get(self.more_key, False):
return
if self.page_field in response:
return {self.page_filter: response[self.page_field]}
else:
if len(response) >= self.limit:
self.offset += self.limit
return {self.page_filter: self.offset}

@staticmethod
def _get_field_props(field_type: str) -> Mapping[str, List[str]]:
Expand Down Expand Up @@ -639,13 +687,13 @@ def _transform(self, records: Iterable) -> Iterable:


class FormStream(Stream):
"""Marketing Forms, API v2
"""Marketing Forms, API v3
by default non-marketing forms are filtered out of this endpoint
Docs: https://developers.hubspot.com/docs/api/marketing/forms
"""

entity = "form"
url = "/forms/v2/forms"
url = "/marketing/v3/forms"
updated_at_field = "updatedAt"
created_at_field = "createdAt"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,25 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"properties": {
"portalId": {
Copy link
Contributor

@sherifnada sherifnada Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we changed so many schemas? were all of these changes schemas previously incorrect? Most importantly: were these fields not being populated before?

The thing I'm trying to verify here is whether a user who has built a workflow based on the output of this connector (e.g: SQL statements or dashboard) will have some breaking changes as a result of this change, or if this is just declaring the data that has always been output

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All but one of the changed schemas were previously incorrect and the schemas did not match the records.
The only moment where I changed the scheme, but it was correct, was Forms Stream. I changed it, since I also changed the URL to get data on it. Before that, we used an outdated URL with version 2 (/forms/v2/form), I updated it to the third version (/marketing/v3/forms), we get the same data, but the format for them is slightly changed. I did this because we refer to the Hubspot documentation, but there is no longer any description for getting forms using the API version 2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. thanks for the context!

"type": ["null", "integer"]
},
"companyId": {
"type": ["null", "integer"]
},
"isDeleted": {
"type": ["null", "boolean"]
"id": {
"type": ["null", "string"]
},
"stateChanges": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"]
}
"createdAt": {
"type": ["null", "string"],
"format": "date-time"
},
"additionalDomains": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
}
"updatedAt": {
"type": ["null", "string"],
"format": "date-time"
},
"mergeAudits": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"mergedCompanyId": {
"type": ["null", "integer"]
},
"canonicalCompanyId": {
"type": ["null", "integer"]
},
"sourceId": {
"type": ["null", "string"]
},
"entityId": {
"type": ["null", "string"]
},
"mergedCompanyName": {
"type": ["null", "string"]
},
"movedProperties": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
}
}
}
}
"archived": {
"type": ["null", "boolean"]
},
"contacts": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
"type": "string"
}
},
"createdAt": {
"type": ["null", "string"]
},
"updatedAt": {
"type": ["null", "string"]
}
}
}
Loading