🐛 Source Intercom: backoff for companies' scrolling (#8395)
* backoff for companies scroll

* remove an unused companies stream property

* fix tests

* bump version

* update source_specs
antixar authored Dec 2, 2021
1 parent 5ce8339 commit 64bd0a6
Showing 12 changed files with 167 additions and 32 deletions.
@@ -2,7 +2,7 @@
"sourceDefinitionId": "d8313939-3782-41b0-be29-b3ca20d8dd3a",
"name": "Intercom",
"dockerRepository": "airbyte/source-intercom",
"dockerImageTag": "0.1.8",
"dockerImageTag": "0.1.9",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/intercom",
"icon": "intercom.svg"
}
@@ -302,7 +302,7 @@
- name: Intercom
sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerRepository: airbyte/source-intercom
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
documentationUrl: https://docs.airbyte.io/integrations/sources/intercom
icon: intercom.svg
sourceType: api
@@ -2831,7 +2831,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-intercom:0.1.8"
- dockerImage: "airbyte/source-intercom:0.1.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom"
connectionSpecification:
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
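
The `-f2` to `-f2-` change in the build line matters when the configured image name carries a tag, because the tag sits after a second colon. The sketch below mimics both `cut` invocations in Python for illustration only; the sample config line and both helper names are assumptions, not part of the commit.

```python
def cut_field_2(line: str) -> str:
    """Mimic `cut -d: -f2`: keep only the second colon-delimited field."""
    fields = line.split(":")
    # cut prints the whole line when the delimiter is absent
    return fields[1] if len(fields) > 1 else line


def cut_from_field_2(line: str) -> str:
    """Mimic `cut -d: -f2-`: keep the second field and everything after it."""
    fields = line.split(":")
    return ":".join(fields[1:]) if len(fields) > 1 else line


# Hypothetical line from acceptance-test-config.yml (assumed for this example)
line = "connector_image: airbyte/source-intercom:dev"

# The unquoted $(...) in the shell script drops surrounding whitespace;
# strip() plays that role here.
old_tag = cut_field_2(line).strip()
new_tag = cut_from_field_2(line).strip()

print(old_tag)
print(new_tag)
```

Under that assumption, the old command would build an image without the `:dev` tag, while the new one keeps it.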
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-intercom/Dockerfile
@@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-intercom
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
@@ -0,0 +1,97 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
import time
from copy import deepcopy
from pathlib import Path
from typing import Mapping

import pytest
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from requests.exceptions import HTTPError
from source_intercom.source import Companies, SourceIntercom, VersionApiAuthenticator

LOGGER = AirbyteLogger()
# from unittest.mock import Mock

HERE = Path(__file__).resolve().parent


@pytest.fixture(scope="module")
def stream_attributes() -> Mapping[str, str]:
filename = HERE.parent / "secrets/config.json"
with open(filename) as json_file:
return json.load(json_file)


@pytest.mark.parametrize(
"version,not_supported_streams,custom_companies_data_field",
(
(1.0, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
(1.1, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
(1.2, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
(1.3, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
(1.4, ["company_segments"], "companies"),
(2.0, [], "data"),
(2.1, [], "data"),
(2.2, [], "data"),
(2.3, [], "data"),
),
)
def test_supported_versions(stream_attributes, version, not_supported_streams, custom_companies_data_field):
class CustomVersionApiAuthenticator(VersionApiAuthenticator):
relevant_supported_version = str(version)

authenticator = CustomVersionApiAuthenticator(token=stream_attributes["access_token"])
for stream in SourceIntercom().streams(deepcopy(stream_attributes)):
stream._authenticator = authenticator

if stream.name == "companies":
stream.data_fields = [custom_companies_data_field]
elif hasattr(stream, "parent_stream_class") and stream.parent_stream_class == Companies:
stream.parent_stream_class.data_fields = [custom_companies_data_field]

slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh))
if stream.name in not_supported_streams:
LOGGER.info(f"version {version} shouldn't support the stream '{stream.name}'")
with pytest.raises(HTTPError) as err:
next(stream.read_records(sync_mode=None, stream_slice=slices[0]), None)
# example of response errors:
# {"type": "error.list", "request_id": "000hjqhpf95ef3b8f8v0",
# "errors": [{"code": "intercom_version_invalid", "message": "The requested version could not be found"}]}
assert len(err.value.response.json()["errors"]) > 0
err_data = err.value.response.json()["errors"][0]
LOGGER.info(f"version {version} doesn't support the stream '{stream.name}', error: {err_data}")
else:
LOGGER.info(f"version {version} should support the stream '{stream.name}'")
records = stream.read_records(sync_mode=None, stream_slice=slices[0])
if stream.name == "companies":
# need to read all records for scroll resetting
list(records)
else:
next(records, None)


def test_companies_scroll(stream_attributes):
authenticator = VersionApiAuthenticator(token=stream_attributes["access_token"])
stream1 = Companies(authenticator=authenticator)
stream2 = Companies(authenticator=authenticator)
stream3 = Companies(authenticator=authenticator)

# read the first stream and stop
next(stream1.read_records(sync_mode=SyncMode.full_refresh))

start_time = time.time()
# read all records
records = list(stream2.read_records(sync_mode=SyncMode.full_refresh))
assert len(records) == 3
assert (time.time() - start_time) > 60.0

start_time = time.time()
# read all records again
records = list(stream3.read_records(sync_mode=SyncMode.full_refresh))
assert len(records) == 3
assert (time.time() - start_time) < 5.0
@@ -64,25 +64,23 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
self.logger.error(f"Stream {self.name}: {e.response.status_code} " f"{e.response.reason} - {error_message}")
raise e

def get_data(self, response: requests.Response) -> List:
data = response.json()

for data_field in self.data_fields:
if data and isinstance(data, dict):
data = data.get(data_field, [])
# def get_data(self, response: requests.Response) -> List:

if isinstance(data, list):
data = data
elif isinstance(data, dict):
data = [data]
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:

return data
data = response.json()

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
data = self.get_data(response)
for data_field in self.data_fields:
if data_field not in data:
continue
data = data[data_field]
if data and isinstance(data, list):
break

for record in data:
yield record
if isinstance(data, dict):
yield data
else:
yield from data

# This is probably overkill because the request itself likely took more
# than the rate limit, but keep it just to be safe.
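
The rewritten `parse_response` walks `data_fields` in order, descends into each key that is present, and stops at the first non-empty list. The standalone sketch below mirrors that logic without `requests` or the CDK; `extract_records` and the sample payloads are hypothetical names and data chosen for illustration.

```python
from typing import Any, Iterable, List, Mapping


def extract_records(payload: Any, data_fields: List[str]) -> Iterable[Mapping[str, Any]]:
    """Descend through data_fields and yield the records found there."""
    data = payload
    for data_field in data_fields:
        if data_field not in data:
            # Field missing at this level: keep what we have and try the next one
            continue
        data = data[data_field]
        if data and isinstance(data, list):
            # A non-empty list is the record collection; stop descending
            break

    if isinstance(data, dict):
        # A single object is emitted as one record
        yield data
    else:
        yield from data


# Illustrative payloads (assumed shapes, not captured API responses)
listing = {"type": "list", "data": [{"id": "1"}, {"id": "2"}]}
single = {"type": "company", "id": "7"}
nested = {"parts": {"parts": [{"id": "a"}]}}

print(list(extract_records(listing, ["data"])))
print(list(extract_records(single, ["data"])))
print(list(extract_records(nested, ["parts", "parts"])))
```

A payload without the expected field falls through as a single record, which matches the `isinstance(data, dict)` branch in the diff.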
@@ -133,8 +131,6 @@ def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, a
):
yield {"id": item["id"]}

yield from []


class Admins(IntercomStream):
"""Return list of all admins.
@@ -158,14 +154,40 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
"""To reset the scroll, iterate through the pages until the last one.
Otherwise, wait 1 minute for the scroll to expire before requesting a new list for company segments."""

data = response.json().get("data")
data = response.json()
scroll_param = data.get("scroll_param")

if data:
return {"scroll_param": response.json().get("scroll_param")}
# this stream always has only one data field
data_field = self.data_fields[0]
if scroll_param and data.get(data_field):
return {"scroll_param": scroll_param}

def path(self, **kwargs) -> str:
return "companies/scroll"
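
The new `next_page_token` only continues scrolling while the response carries both a `scroll_param` and a non-empty data field. A minimal sketch of that decision on plain dictionaries follows; `next_scroll_token` is a hypothetical helper and the payloads are made up for illustration.

```python
from typing import Any, Mapping, Optional


def next_scroll_token(payload: Mapping[str, Any], data_field: str = "data") -> Optional[Mapping[str, Any]]:
    """Return the next scroll token, or None once the scroll is exhausted."""
    scroll_param = payload.get("scroll_param")
    # Continue only if the API returned a cursor AND the page had records
    if scroll_param and payload.get(data_field):
        return {"scroll_param": scroll_param}
    return None


# Illustrative pages (assumed shapes)
page_with_records = {"data": [{"id": "1"}], "scroll_param": "abc"}
last_empty_page = {"data": [], "scroll_param": "abc"}
old_api_page = {"companies": [{"id": "1"}], "scroll_param": "xyz"}

print(next_scroll_token(page_with_records))
print(next_scroll_token(last_empty_page))
print(next_scroll_token(old_api_page, data_field="companies"))
```

Reading through to the empty page is what lets the scroll close on the server side, which is why the test in this commit consumes all `companies` records.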

@classmethod
def check_exists_scroll(cls, response: requests.Response) -> bool:
if response.status_code == 400:
# example response:
# {..., "errors": [{'code': 'scroll_exists', 'message': 'scroll already exists for this workspace'}]}
err_body = response.json()["errors"][0]
if err_body["code"] == "scroll_exists":
return True

return False

def should_retry(self, response: requests.Response) -> bool:
if self.check_exists_scroll(response):
return True
return super().should_retry(response)

def backoff_time(self, response: requests.Response) -> Optional[float]:
if self.check_exists_scroll(response):
self.logger.warning("A previous scroll request exists. " "It should be deleted automatically within a minute")
# retry about 3 times: 3 waits of 20.5 s cover the 1-minute scroll lifetime
return 20.5
return super().backoff_time(response)
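
The `should_retry` and `backoff_time` overrides above can be read as a retry loop that waits 20.5 seconds whenever Intercom answers with `scroll_exists`. The sketch below simulates that loop without the network or the CDK. `FakeResponse`, `fetch_with_backoff`, and the `max_tries` default are assumptions made for this example; the real retry count is governed by the CDK's own settings.

```python
from typing import Callable

SCROLL_BACKOFF_SECONDS = 20.5  # value taken from the diff above


class FakeResponse:
    """Stand-in for requests.Response with only what the check needs."""

    def __init__(self, status_code: int, body: dict):
        self.status_code = status_code
        self._body = body

    def json(self) -> dict:
        return self._body


def scroll_exists(response: FakeResponse) -> bool:
    """True when the API reports that another scroll is still open."""
    if response.status_code != 400:
        return False
    errors = response.json().get("errors") or [{}]
    return errors[0].get("code") == "scroll_exists"


def fetch_with_backoff(
    send: Callable[[], FakeResponse],
    sleep: Callable[[float], None],
    max_tries: int = 5,  # hypothetical limit for this sketch
) -> FakeResponse:
    """Resend the request while a previous scroll is still open."""
    response = send()
    for _ in range(max_tries - 1):
        if not scroll_exists(response):
            break
        sleep(SCROLL_BACKOFF_SECONDS)
        response = send()
    return response


busy = FakeResponse(400, {"errors": [{"code": "scroll_exists", "message": "scroll already exists for this workspace"}]})
ok = FakeResponse(200, {"data": []})

queue = [busy, busy, busy, ok]
waits = []  # records requested sleeps instead of actually sleeping
result = fetch_with_backoff(lambda: queue.pop(0), waits.append)

print(result.status_code, waits, sum(waits))
```

Three waits add up to 61.5 seconds, which is consistent with the `> 60.0` assertion in `test_companies_scroll`.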


class CompanySegments(ChildStreamMixin, IncrementalIntercomStream):
"""Return list of all company segments.
@@ -292,13 +314,28 @@ def path(self, **kwargs) -> str:
return "teams"


class VersionApiAuthenticator(TokenAuthenticator):
"""The Intercom API supports switching between API versions dynamically.
This connector pins a single version for every account by sending
the additional request header 'Intercom-Version'.
Docs: https://developers.intercom.com/building-apps/docs/update-your-api-version#section-selecting-the-version-via-the-developer-hub
"""

relevant_supported_version = "2.2"

def get_auth_header(self) -> Mapping[str, Any]:
headers = super().get_auth_header()
headers["Intercom-Version"] = self.relevant_supported_version
return headers
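
To see how the version pin travels with every request, the sketch below reproduces the header composition with a small stand-in for the CDK's `TokenAuthenticator`. `StubTokenAuthenticator` is an assumption for this example (it only emits a Bearer `Authorization` header), and `PinnedTo14` is a hypothetical subclass in the style of the test's `CustomVersionApiAuthenticator`.

```python
from typing import Any, Dict


class StubTokenAuthenticator:
    """Stand-in for the CDK's TokenAuthenticator (assumed Bearer scheme)."""

    def __init__(self, token: str):
        self._token = token

    def get_auth_header(self) -> Dict[str, Any]:
        return {"Authorization": f"Bearer {self._token}"}


class VersionedAuthenticator(StubTokenAuthenticator):
    """Adds the Intercom-Version header on top of the auth header."""

    relevant_supported_version = "2.2"

    def get_auth_header(self) -> Dict[str, Any]:
        headers = super().get_auth_header()
        headers["Intercom-Version"] = self.relevant_supported_version
        return headers


class PinnedTo14(VersionedAuthenticator):
    """Hypothetical override that targets an older API version."""

    relevant_supported_version = "1.4"


default_headers = VersionedAuthenticator(token="dummy-token").get_auth_header()
legacy_headers = PinnedTo14(token="dummy-token").get_auth_header()

print(default_headers)
print(legacy_headers)
```

Keeping the version as a class attribute is what lets the integration test swap versions by subclassing rather than by changing connector configuration.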


class SourceIntercom(AbstractSource):
"""
Source Intercom fetches data from the messaging platform.
"""

def check_connection(self, logger, config) -> Tuple[bool, any]:
authenticator = TokenAuthenticator(token=config["access_token"])
authenticator = VersionApiAuthenticator(token=config["access_token"])
try:
url = f"{IntercomStream.url_base}/tags"
auth_headers = {"Accept": "application/json", **authenticator.get_auth_header()}
@@ -312,7 +349,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config["start_date"] = datetime.strptime(config["start_date"], "%Y-%m-%dT%H:%M:%SZ").timestamp()
AirbyteLogger().log("INFO", f"Using start_date: {config['start_date']}")

auth = TokenAuthenticator(token=config["access_token"])
auth = VersionApiAuthenticator(token=config["access_token"])
return [
Admins(authenticator=auth, **config),
Companies(authenticator=auth, **config),
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
3 changes: 2 additions & 1 deletion docs/integrations/sources/intercom.md
@@ -55,7 +55,8 @@ Please read [How to get your Access Token](https://developers.intercom.com/build

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.8 | 2021-09-28 | [7060](https://github.com/airbytehq/airbyte/pull/7060) | Added oauth support |
| 0.1.9 | 2021-12-03 | [8395](https://github.com/airbytehq/airbyte/pull/8395) | Fix backoff of 'companies' stream |
| 0.1.8 | 2021-11-09 | [7060](https://github.com/airbytehq/airbyte/pull/7060) | Added oauth support |
| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
| 0.1.6 | 2021-10-07 | [6879](https://github.com/airbytehq/airbyte/pull/6879) | Corrected pagination for contacts |
| 0.1.5 | 2021-09-28 | [6082](https://github.com/airbytehq/airbyte/pull/6082) | Corrected android\_last\_seen\_at field data type in schemas |