From 64bd0a6cd8ff82cd2fef89537a67f01541113b7c Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Fri, 3 Dec 2021 01:17:54 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=20Source=20Intercom:=20backoff?= =?UTF-8?q?=20for=20companies'=20scrolling=20=20(#8395)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * backoff for companies scroll * remove a unused companies stream property * fix tests * bump version * update source_specs --- .../d8313939-3782-41b0-be29-b3ca20d8dd3a.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../acceptance-test-docker.sh | 2 +- .../source-python/acceptance-test-docker.sh | 2 +- .../connectors/source-intercom/Dockerfile | 2 +- .../source-intercom/acceptance-test-docker.sh | 2 +- .../integration_tests/integration_test.py | 97 +++++++++++++++++++ .../source-intercom/source_intercom/source.py | 81 +++++++++++----- .../acceptance-test-docker.sh | 2 +- .../acceptance-test-docker.sh | 2 +- docs/integrations/sources/intercom.md | 3 +- 12 files changed, 167 insertions(+), 32 deletions(-) create mode 100644 airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json index a828cf65c10ac..0b068362b03f1 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "d8313939-3782-41b0-be29-b3ca20d8dd3a", "name": "Intercom", "dockerRepository": "airbyte/source-intercom", - "dockerImageTag": "0.1.8", + "dockerImageTag": "0.1.9", "documentationUrl": "https://docs.airbyte.io/integrations/sources/intercom", "icon": "intercom.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 3eddb0dc56855..aa83629111f71 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -302,7 +302,7 @@ - name: Intercom sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a dockerRepository: airbyte/source-intercom - dockerImageTag: 0.1.8 + dockerImageTag: 0.1.9 documentationUrl: https://docs.airbyte.io/integrations/sources/intercom icon: intercom.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ea09f72a9dec7..9cc90584080c8 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2831,7 +2831,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-intercom:0.1.8" +- dockerImage: "airbyte/source-intercom:0.1.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom" connectionSpecification: diff --git a/airbyte-integrations/connector-templates/source-python-http-api/acceptance-test-docker.sh b/airbyte-integrations/connector-templates/source-python-http-api/acceptance-test-docker.sh index e4d8b1cef8961..c51577d10690c 100644 --- a/airbyte-integrations/connector-templates/source-python-http-api/acceptance-test-docker.sh +++ b/airbyte-integrations/connector-templates/source-python-http-api/acceptance-test-docker.sh @@ -1,7 +1,7 @@ #!/usr/bin/env sh # Build latest connector image -docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) # Pull latest acctest image docker pull airbyte/source-acceptance-test:latest diff --git a/airbyte-integrations/connector-templates/source-python/acceptance-test-docker.sh b/airbyte-integrations/connector-templates/source-python/acceptance-test-docker.sh index e4d8b1cef8961..c51577d10690c 100644 --- a/airbyte-integrations/connector-templates/source-python/acceptance-test-docker.sh +++ b/airbyte-integrations/connector-templates/source-python/acceptance-test-docker.sh @@ -1,7 +1,7 @@ #!/usr/bin/env sh # Build latest connector image -docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) # Pull latest acctest image docker pull airbyte/source-acceptance-test:latest diff --git a/airbyte-integrations/connectors/source-intercom/Dockerfile b/airbyte-integrations/connectors/source-intercom/Dockerfile index 5a7317b571aec..dd2aed35b040e 100644 --- a/airbyte-integrations/connectors/source-intercom/Dockerfile +++ b/airbyte-integrations/connectors/source-intercom/Dockerfile @@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.8 +LABEL io.airbyte.version=0.1.9 LABEL io.airbyte.name=airbyte/source-intercom diff --git a/airbyte-integrations/connectors/source-intercom/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-intercom/acceptance-test-docker.sh index e4d8b1cef8961..c51577d10690c 100755 --- a/airbyte-integrations/connectors/source-intercom/acceptance-test-docker.sh +++ b/airbyte-integrations/connectors/source-intercom/acceptance-test-docker.sh @@ -1,7 +1,7 @@ #!/usr/bin/env sh # Build latest connector image -docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) # Pull latest acctest image docker pull airbyte/source-acceptance-test:latest diff --git a/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py new file mode 100644 index 0000000000000..38ea36041d8d5 --- /dev/null +++ b/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py @@ -0,0 +1,97 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import json +import time +from copy import deepcopy +from pathlib import Path +from typing import Mapping + +import pytest +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import SyncMode +from requests.exceptions import HTTPError +from source_intercom.source import Companies, SourceIntercom, VersionApiAuthenticator + +LOGGER = AirbyteLogger() +# from unittest.mock import Mock + +HERE = Path(__file__).resolve().parent + + +@pytest.fixture(scope="module") +def stream_attributes() -> Mapping[str, str]: + filename = HERE.parent / "secrets/config.json" + with open(filename) as json_file: + return json.load(json_file) + + +@pytest.mark.parametrize( + "version,not_supported_streams,custom_companies_data_field", + ( + (1.0, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.1, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.2, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.3, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.4, ["company_segments"], "companies"), + (2.0, [], "data"), + (2.1, [], "data"), + (2.2, [], "data"), + (2.3, [], "data"), + ), +) +def test_supported_versions(stream_attributes, version, not_supported_streams, custom_companies_data_field): + class CustomVersionApiAuthenticator(VersionApiAuthenticator): + relevant_supported_version = str(version) + + authenticator = CustomVersionApiAuthenticator(token=stream_attributes["access_token"]) + for stream in SourceIntercom().streams(deepcopy(stream_attributes)): + stream._authenticator = authenticator + + if stream.name == "companies": + stream.data_fields = [custom_companies_data_field] + elif hasattr(stream, "parent_stream_class") and stream.parent_stream_class == Companies: + stream.parent_stream_class.data_fields = [custom_companies_data_field] + + slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) + if stream.name in not_supported_streams: + LOGGER.info(f"version {version} shouldn't be supported the stream '{stream.name}'") + with pytest.raises(HTTPError) as err: + next(stream.read_records(sync_mode=None, stream_slice=slices[0]), None) + # example of response errors: + # {"type": "error.list", "request_id": "000hjqhpf95ef3b8f8v0", + # "errors": [{"code": "intercom_version_invalid", "message": "The requested version could not be found"}]} + assert len(err.value.response.json()["errors"]) > 0 + err_data = err.value.response.json()["errors"][0] + LOGGER.info(f"version {version} doesn't support the stream '{stream.name}', error: {err_data}") + else: + LOGGER.info(f"version {version} should be supported the stream '{stream.name}'") + records = stream.read_records(sync_mode=None, stream_slice=slices[0]) + if stream.name == "companies": + # need to read all records for scroll resetting + list(records) + else: + next(records, None) + + +def test_companies_scroll(stream_attributes): + authenticator = VersionApiAuthenticator(token=stream_attributes["access_token"]) + stream1 = Companies(authenticator=authenticator) + stream2 = Companies(authenticator=authenticator) + stream3 = Companies(authenticator=authenticator) + + # read the first stream and stop + next(stream1.read_records(sync_mode=SyncMode.full_refresh)) + + start_time = time.time() + # read all records + records = list(stream2.read_records(sync_mode=SyncMode.full_refresh)) + assert len(records) == 3 + assert (time.time() - start_time) > 60.0 + + start_time = time.time() + # read all records again + records = list(stream3.read_records(sync_mode=SyncMode.full_refresh)) + assert len(records) == 3 + assert (time.time() - start_time) < 5.0 diff --git a/airbyte-integrations/connectors/source-intercom/source_intercom/source.py b/airbyte-integrations/connectors/source-intercom/source_intercom/source.py index c8f0769e64b71..099f0c4739785 100755 --- a/airbyte-integrations/connectors/source-intercom/source_intercom/source.py +++ b/airbyte-integrations/connectors/source-intercom/source_intercom/source.py @@ -64,25 +64,23 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: self.logger.error(f"Stream {self.name}: {e.response.status_code} " f"{e.response.reason} - {error_message}") raise e - def get_data(self, response: requests.Response) -> List: - data = response.json() - - for data_field in self.data_fields: - if data and isinstance(data, dict): - data = data.get(data_field, []) + # def get_data(self, response: requests.Response) -> List: - if isinstance(data, list): - data = data - elif isinstance(data, dict): - data = [data] + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: - return data + data = response.json() - def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: - data = self.get_data(response) + for data_field in self.data_fields: + if data_field not in data: + continue + data = data[data_field] + if data and isinstance(data, list): + break - for record in data: - yield record + if isinstance(data, dict): + yield data + else: + yield from data # This is probably overkill because the request itself likely took more # than the rate limit, but keep it just to be safe. @@ -133,8 +131,6 @@ def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, a ): yield {"id": item["id"]} - yield from [] - class Admins(IntercomStream): """Return list of all admins. @@ -158,14 +154,40 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, """For reset scroll needs to iterate pages untill the last. Another way need wait 1 min for the scroll to expire to get a new list for companies segments.""" - data = response.json().get("data") + data = response.json() + scroll_param = data.get("scroll_param") - if data: - return {"scroll_param": response.json().get("scroll_param")} + # this stream always has only one data field + data_field = self.data_fields[0] + if scroll_param and data.get(data_field): + return {"scroll_param": scroll_param} def path(self, **kwargs) -> str: return "companies/scroll" + @classmethod + def check_exists_scroll(cls, response: requests.Response) -> bool: + if response.status_code == 400: + # example response: + # {..., "errors": [{'code': 'scroll_exists', 'message': 'scroll already exists for this workspace'}]} + err_body = response.json()["errors"][0] + if err_body["code"] == "scroll_exists": + return True + + return False + + def should_retry(self, response: requests.Response) -> bool: + if self.check_exists_scroll(response): + return True + return super().should_retry(response) + + def backoff_time(self, response: requests.Response) -> Optional[float]: + if self.check_exists_scroll(response): + self.logger.warning("A previous scroll request is exists. " "It must be deleted within an minute automatically") + # try to check 3 times + return 20.5 + return super().backoff_time(response) + class CompanySegments(ChildStreamMixin, IncrementalIntercomStream): """Return list of all company segments. @@ -292,13 +314,28 @@ def path(self, **kwargs) -> str: return "teams" +class VersionApiAuthenticator(TokenAuthenticator): + """Intercom API support its dynamic versions' switching. + But this connector should support only one for any resource account and + it is realised by the additional request header 'Intercom-Version' + Docs: https://developers.intercom.com/building-apps/docs/update-your-api-version#section-selecting-the-version-via-the-developer-hub + """ + + relevant_supported_version = "2.2" + + def get_auth_header(self) -> Mapping[str, Any]: + headers = super().get_auth_header() + headers["Intercom-Version"] = self.relevant_supported_version + return headers + + class SourceIntercom(AbstractSource): """ Source Intercom fetch data from messaging platform. """ def check_connection(self, logger, config) -> Tuple[bool, any]: - authenticator = TokenAuthenticator(token=config["access_token"]) + authenticator = VersionApiAuthenticator(token=config["access_token"]) try: url = f"{IntercomStream.url_base}/tags" auth_headers = {"Accept": "application/json", **authenticator.get_auth_header()} @@ -312,7 +349,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: config["start_date"] = datetime.strptime(config["start_date"], "%Y-%m-%dT%H:%M:%SZ").timestamp() AirbyteLogger().log("INFO", f"Using start_date: {config['start_date']}") - auth = TokenAuthenticator(token=config["access_token"]) + auth = VersionApiAuthenticator(token=config["access_token"]) return [ Admins(authenticator=auth, **config), Companies(authenticator=auth, **config), diff --git a/airbyte-integrations/connectors/source-scaffold-source-http/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-scaffold-source-http/acceptance-test-docker.sh index e4d8b1cef8961..c51577d10690c 100644 --- a/airbyte-integrations/connectors/source-scaffold-source-http/acceptance-test-docker.sh +++ b/airbyte-integrations/connectors/source-scaffold-source-http/acceptance-test-docker.sh @@ -1,7 +1,7 @@ #!/usr/bin/env sh # Build latest connector image -docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) # Pull latest acctest image docker pull airbyte/source-acceptance-test:latest diff --git a/airbyte-integrations/connectors/source-scaffold-source-python/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-scaffold-source-python/acceptance-test-docker.sh index e4d8b1cef8961..c51577d10690c 100644 --- a/airbyte-integrations/connectors/source-scaffold-source-python/acceptance-test-docker.sh +++ b/airbyte-integrations/connectors/source-scaffold-source-python/acceptance-test-docker.sh @@ -1,7 +1,7 @@ #!/usr/bin/env sh # Build latest connector image -docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) # Pull latest acctest image docker pull airbyte/source-acceptance-test:latest diff --git a/docs/integrations/sources/intercom.md b/docs/integrations/sources/intercom.md index e90e0247835f7..4fc188228466e 100644 --- a/docs/integrations/sources/intercom.md +++ b/docs/integrations/sources/intercom.md @@ -55,7 +55,8 @@ Please read [How to get your Access Token](https://developers.intercom.com/build | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.1.8 | 2021-09-28 | [7060](https://github.com/airbytehq/airbyte/pull/7060) | Added oauth support | +| 0.1.9 | 2021-12-03 | [8395](https://github.com/airbytehq/airbyte/pull/8395) | Fix backoff of 'companies' stream | +| 0.1.8 | 2021-11-09 | [7060](https://github.com/airbytehq/airbyte/pull/7060) | Added oauth support | | 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | | 0.1.6 | 2021-10-07 | [6879](https://github.com/airbytehq/airbyte/pull/6879) | Corrected pagination for contacts | | 0.1.5 | 2021-09-28 | [6082](https://github.com/airbytehq/airbyte/pull/6082) | Corrected android\_last\_seen\_at field data type in schemas |