diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json index 02a97894c3e22..e7f417554cd8c 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "6acf6b55-4f1e-4fca-944e-1a3caef8aba8", "name": "Instagram", "dockerRepository": "airbyte/source-instagram", - "dockerImageTag": "0.1.5", + "dockerImageTag": "0.1.6", "documentationUrl": "https://hub.docker.com/r/airbyte/source-instagram" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7b82d423a2888..b58dcf504b673 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -250,7 +250,7 @@ - sourceDefinitionId: 6acf6b55-4f1e-4fca-944e-1a3caef8aba8 name: Instagram dockerRepository: airbyte/source-instagram - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://hub.docker.com/r/airbyte/source-instagram - sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80 name: Gitlab diff --git a/airbyte-integrations/connectors/source-instagram/.dockerignore b/airbyte-integrations/connectors/source-instagram/.dockerignore index 3c59cc698580f..4955f474ced39 100644 --- a/airbyte-integrations/connectors/source-instagram/.dockerignore +++ b/airbyte-integrations/connectors/source-instagram/.dockerignore @@ -1,6 +1,6 @@ * !Dockerfile -!Dockerfile.test +!main.py !source_instagram !setup.py !secrets diff --git a/airbyte-integrations/connectors/source-instagram/Dockerfile b/airbyte-integrations/connectors/source-instagram/Dockerfile index 04bf542fcc19e..567da436c8000 100644 --- a/airbyte-integrations/connectors/source-instagram/Dockerfile +++ b/airbyte-integrations/connectors/source-instagram/Dockerfile @@ -1,18 +1,16 @@ -FROM airbyte/integration-base-python:0.1.1 +FROM python:3.7-slim # Bash is installed for more convenient debugging. RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* -ENV CODE_PATH="source_instagram" -ENV AIRBYTE_IMPL_MODULE="source_instagram" -ENV AIRBYTE_IMPL_PATH="SourceInstagram" - WORKDIR /airbyte/integration_code -COPY $CODE_PATH ./$CODE_PATH +COPY source_instagram ./source_instagram +COPY main.py ./ COPY setup.py ./ -RUN pip install ".[main]" +RUN pip install . -ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh" +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-instagram diff --git a/airbyte-integrations/connectors/source-instagram/acceptance-test-config.yml b/airbyte-integrations/connectors/source-instagram/acceptance-test-config.yml new file mode 100644 index 0000000000000..556a3e72a47a5 --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/acceptance-test-config.yml @@ -0,0 +1,25 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md) +# for more information about how to configure these tests +connector_image: airbyte/source-instagram:dev +tests: + spec: + - spec_path: "integration_tests/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog_without_stories.json" + validate_output_from_all_streams: yes +# disabled because the only incremental stream is user_insights and its state is nested +# incremental: +# - config_path: "secrets/config.json" +# configured_catalog_path: "integration_tests/configured_catalog.json" +# future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-instagram/build.gradle b/airbyte-integrations/connectors/source-instagram/build.gradle index 14c0dbd6618ac..a1146b8d99065 100644 --- a/airbyte-integrations/connectors/source-instagram/build.gradle +++ b/airbyte-integrations/connectors/source-instagram/build.gradle @@ -1,27 +1,13 @@ plugins { id 'airbyte-python' id 'airbyte-docker' - id 'airbyte-standard-source-test-file' + id 'airbyte-source-acceptance-test' } airbytePython { moduleDirectory 'source_instagram' } -airbyteStandardSourceTestFile { - specPath = "source_instagram/spec.json" - configPath = "secrets/config.json" - configuredCatalogPath = "sample_files/configured_catalog.json" -} - -task("pythonIntegrationTests", type: PythonTask, dependsOn: installTestReqs) { - module = "pytest" - command = "-s integration_tests" -} - -integrationTest.dependsOn("pythonIntegrationTests") - dependencies { - implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs) - implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) } diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/__init__.py b/airbyte-integrations/connectors/source-instagram/integration_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-instagram/integration_tests/acceptance.py new file mode 100644 index 0000000000000..badb815c45c4a --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/acceptance.py @@ -0,0 +1,34 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-instagram/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..d3d681ddbf480 --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/configured_catalog.json @@ -0,0 +1,109 @@ +{ + "streams": [ + { + "stream": { + "name": "media", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "media_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "stories", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "story_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "users", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "user_lifetime_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": null, + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "user_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date"], + "source_defined_primary_key": null, + "namespace": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + } + ] +} diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/configured_catalog_without_stories.json b/airbyte-integrations/connectors/source-instagram/integration_tests/configured_catalog_without_stories.json new file mode 100644 index 0000000000000..3df5fddbed01c --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/configured_catalog_without_stories.json @@ -0,0 +1,79 @@ +{ + "streams": [ + { + "stream": { + "name": "media", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "media_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "users", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "user_lifetime_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": null, + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "user_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date"], + "source_defined_primary_key": null, + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + } + ] +} diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/conftest.py b/airbyte-integrations/connectors/source-instagram/integration_tests/conftest.py new file mode 100644 index 0000000000000..9acadc87a2509 --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/conftest.py @@ -0,0 +1,40 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import json + +import pytest +from airbyte_cdk.models import ConfiguredAirbyteCatalog + + +@pytest.fixture(scope="session", name="config") +def config_fixture(): + with open("secrets/config.json", "r") as config_file: + return json.load(config_file) + + +@pytest.fixture(scope="session", name="configured_catalog") +def configured_catalog_fixture(): + return ConfiguredAirbyteCatalog.parse_file("integration_tests/configured_catalog.json") diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-instagram/integration_tests/integration_test.py deleted file mode 100644 index a385f9443eda7..0000000000000 --- a/airbyte-integrations/connectors/source-instagram/integration_tests/integration_test.py +++ /dev/null @@ -1,61 +0,0 @@ -# -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -# - - -import json -from pathlib import Path - -from airbyte_protocol import ConfiguredAirbyteCatalog, Type -from base_python import AirbyteLogger -from source_instagram.source import SourceInstagram - -BASE_DIRECTORY = Path(__file__).resolve().parent.parent -config = json.loads(open(f"{BASE_DIRECTORY}/secrets/config.json", "r").read()) - - -class TestInstagramSource: - # Using standard tests is unreliable for Inside streams, as the information for them may be updated. - # It takes time to collect data on Insights, and during CI tests, a test for testIdenticalFullRefreshes is performed, - # and during the execution of tests, a change in Insights may occur and we will not pass the tests. - # Therefore, we use this test to test Insight streams. - def test_insights_streams_outputs_records(self): - catalog = self._read_catalog(f"{BASE_DIRECTORY}/sample_files/configured_catalog_insights.json") - self._run_sync_test(config, catalog) - - @staticmethod - def _read_catalog(path): - return ConfiguredAirbyteCatalog.parse_raw(open(path, "r").read()) - - @staticmethod - def _run_sync_test(conf, catalog): - records = [] - state = [] - for message in SourceInstagram().read(AirbyteLogger(), conf, catalog): - if message.type == Type.RECORD: - records.append(message) - elif message.type == Type.STATE: - state.append(message) - - assert len(records) > 0 - assert len(state) > 0 diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-instagram/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..6df8c11bb3bfb --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/invalid_config.json @@ -0,0 +1,4 @@ +{ + "start_date": "2021-04-01T00:00:00Z", + "access_token": "wrong_token" +} diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/spec.json b/airbyte-integrations/connectors/source-instagram/integration_tests/spec.json similarity index 62% rename from airbyte-integrations/connectors/source-instagram/source_instagram/spec.json rename to airbyte-integrations/connectors/source-instagram/integration_tests/spec.json index 7521ae31b38e8..745b195031ffc 100644 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/spec.json +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/spec.json @@ -1,23 +1,27 @@ { "documentationUrl": "https://docs.airbyte.io/integrations/sources/instagram", + "changelogUrl": "https://docs.airbyte.io/integrations/sources/instagram", "connectionSpecification": { - "$schema": "http://json-schema.org/draft-07/schema#", "title": "Source Instagram", "type": "object", - "required": ["start_date", "access_token"], - "additionalProperties": false, "properties": { "start_date": { - "type": "string", + "title": "Start Date", "description": "The date from which you'd like to replicate data for User Insights, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.", - "examples": ["2020-09-25T00:00:00Z"], - "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + "examples": ["2017-01-25T00:00:00Z"], + "type": "string", + "format": "date-time" }, "access_token": { - "type": "string", + "title": "Access Token", "description": "The value of the access token generated. See the docs for more information", - "airbyte_secret": true + "airbyte_secret": true, + "type": "string" } - } - } + }, + "required": ["start_date", "access_token"] + }, + "supportsIncremental": true, + "supported_destination_sync_modes": ["append"] } diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/test_streams.py b/airbyte-integrations/connectors/source-instagram/integration_tests/test_streams.py new file mode 100644 index 0000000000000..024732b445d88 --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/test_streams.py @@ -0,0 +1,80 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +from typing import Any, Callable, List, MutableMapping, Tuple + +import pendulum +import pytest +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type +from source_instagram.source import SourceInstagram + + +@pytest.fixture(name="state") +def state_fixture() -> MutableMapping[str, Any]: + today = pendulum.today() + return { + "user_insights": { + "17841408147298757": {"date": (today - pendulum.duration(days=10)).to_datetime_string()}, + "17841403112736866": {"date": (today - pendulum.duration(days=5)).to_datetime_string()}, + } + } + + +class TestInstagramSource: + """Custom integration tests should test incremental with nested state""" + + def test_incremental_streams(self, configured_catalog, config, state): + catalog = self.slice_catalog(configured_catalog, lambda name: name == "user_insights") + records, states = self._read_records(config, catalog) + assert len(records) == 60, "UserInsights for two accounts over last 30 day should return 60 records when empty STATE provided" + + records, states = self._read_records(config, catalog, state) + assert len(records) == 60 - 10 - 5, "UserInsights should have less records returned when non empty STATE provided" + + assert states, "insights should produce states" + for state in states: + assert "user_insights" in state.state.data + assert isinstance(state.state.data["user_insights"], dict) + assert len(state.state.data["user_insights"].keys()) == 2 + + @staticmethod + def slice_catalog(catalog: ConfiguredAirbyteCatalog, predicate: Callable[[str], bool]) -> ConfiguredAirbyteCatalog: + sliced_catalog = ConfiguredAirbyteCatalog(streams=[]) + for stream in catalog.streams: + if predicate(stream.stream.name): + sliced_catalog.streams.append(stream) + return sliced_catalog + + @staticmethod + def _read_records(conf, catalog, state=None) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]: + records = [] + states = [] + for message in SourceInstagram().read(AirbyteLogger(), conf, catalog, state=state): + if message.type == Type.RECORD: + records.append(message) + elif message.type == Type.STATE: + states.append(message) + + return records, states diff --git a/airbyte-integrations/connectors/source-instagram/main_dev.py b/airbyte-integrations/connectors/source-instagram/main.py similarity index 96% rename from airbyte-integrations/connectors/source-instagram/main_dev.py rename to airbyte-integrations/connectors/source-instagram/main.py index 4d601a921d866..4c0d07313e0bd 100644 --- a/airbyte-integrations/connectors/source-instagram/main_dev.py +++ b/airbyte-integrations/connectors/source-instagram/main.py @@ -25,7 +25,7 @@ import sys -from base_python.entrypoint import launch +from airbyte_cdk.entrypoint import launch from source_instagram import SourceInstagram if __name__ == "__main__": diff --git a/airbyte-integrations/connectors/source-instagram/requirements.txt b/airbyte-integrations/connectors/source-instagram/requirements.txt index 76af767f3755a..7be17a56d745d 100644 --- a/airbyte-integrations/connectors/source-instagram/requirements.txt +++ b/airbyte-integrations/connectors/source-instagram/requirements.txt @@ -1,4 +1,3 @@ --e ../../bases/airbyte-protocol --e ../../bases/base-python --e ../../bases/base-python-test +# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies. +-e ../../bases/source-acceptance-test -e . diff --git a/airbyte-integrations/connectors/source-instagram/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-instagram/sample_files/configured_catalog.json deleted file mode 100644 index 7d71c242a1d4b..0000000000000 --- a/airbyte-integrations/connectors/source-instagram/sample_files/configured_catalog.json +++ /dev/null @@ -1,225 +0,0 @@ -{ - "streams": [ - { - "stream": { - "name": "media", - "json_schema": { - "type": "object", - "properties": { - "business_account_id": { - "type": ["null", "string"] - }, - "page_id": { - "type": ["null", "string"] - }, - "caption": { - "type": ["null", "string"] - }, - "comments_count": { - "type": ["null", "integer"] - }, - "id": { - "type": ["null", "string"] - }, - "ig_id": { - "type": ["null", "string"] - }, - "is_comment_enabled": { - "type": ["null", "boolean"] - }, - "like_count": { - "type": ["null", "integer"] - }, - "media_type": { - "type": ["null", "string"] - }, - "media_url": { - "type": ["null", "string"] - }, - "owner": { - "type": ["null", "object"], - "properties": { - "id": { - "type": ["null", "string"] - } - } - }, - "permalink": { - "type": ["null", "string"] - }, - "shortcode": { - "type": ["null", "string"] - }, - "thumbnail_url": { - "type": ["null", "string"] - }, - "timestamp": { - "type": ["null", "string"], - "format": "date-time" - }, - "username": { - "type": ["null", "string"] - }, - "children": { - "type": ["null", "array"], - "items": { - "type": "object", - "properties": { - "id": { - "type": ["null", "string"] - }, - "ig_id": { - "type": ["null", "string"] - }, - "media_type": { - "type": ["null", "string"] - }, - "media_url": { - "type": ["null", "string"] - }, - "owner": { - "type": ["null", "object"], - "properties": { - "id": { - "type": ["null", "string"] - } - } - }, - "permalink": { - "type": ["null", "string"] - }, - "shortcode": { - "type": ["null", "string"] - }, - "thumbnail_url": { - "type": ["null", "string"] - }, - "timestamp": { - "type": ["null", "string"], - "format": "date-time" - }, - "username": { - "type": ["null", "string"] - } - } - } - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "stories", - "json_schema": { - "type": "object", - "properties": { - "business_account_id": { - "type": ["null", "string"] - }, - "page_id": { - "type": ["null", "string"] - }, - "caption": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "string"] - }, - "ig_id": { - "type": ["null", "string"] - }, - "like_count": { - "type": ["null", "integer"] - }, - "media_type": { - "type": ["null", "string"] - }, - "media_url": { - "type": ["null", "string"] - }, - "owner": { - "type": ["null", "object"], - "properties": { - "id": { - "type": ["null", "string"] - } - } - }, - "permalink": { - "type": ["null", "string"] - }, - "shortcode": { - "type": ["null", "string"] - }, - "thumbnail_url": { - "type": ["null", "string"] - }, - "timestamp": { - "type": ["null", "string"], - "format": "date-time" - }, - "username": { - "type": ["null", "string"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "users", - "json_schema": { - "type": "object", - "properties": { - "page_id": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "string"] - }, - "biography": { - "type": ["null", "string"] - }, - "ig_id": { - "type": ["null", "integer"] - }, - "followers_count": { - "type": ["null", "integer"] - }, - "follows_count": { - "type": ["null", "integer"] - }, - "media_count": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - }, - "profile_picture_url": { - "type": ["null", "string"] - }, - "username": { - "type": ["null", "string"] - }, - "website": { - "type": ["null", "string"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - } - ] -} diff --git a/airbyte-integrations/connectors/source-instagram/sample_files/configured_catalog_insights.json b/airbyte-integrations/connectors/source-instagram/sample_files/configured_catalog_insights.json deleted file mode 100644 index 0a1e65a42852c..0000000000000 --- a/airbyte-integrations/connectors/source-instagram/sample_files/configured_catalog_insights.json +++ /dev/null @@ -1,188 +0,0 @@ -{ - "streams": [ - { - "stream": { - "name": "user_lifetime_insights", - "json_schema": { - "type": "object", - "properties": { - "page_id": { - "type": ["null", "string"] - }, - "business_account_id": { - "type": ["null", "string"] - }, - "date": { - "type": ["null", "string"], - "format": "date-time" - }, - "metric": { - "type": ["null", "string"] - }, - "value": { - "type": ["integer", "object"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "user_insights", - "json_schema": { - "type": "object", - "properties": { - "page_id": { - "type": ["null", "string"] - }, - "business_account_id": { - "type": ["null", "string"] - }, - "date": { - "type": ["null", "string"], - "format": "date-time" - }, - "follower_count": { - "type": ["null", "integer"] - }, - "get_directions_clicks": { - "type": ["null", "integer"] - }, - "impressions": { - "type": ["null", "integer"] - }, - "phone_call_clicks": { - "type": ["null", "integer"] - }, - "profile_views": { - "type": ["null", "integer"] - }, - "reach": { - "type": ["null", "integer"] - }, - "text_message_clicks": { - "type": ["null", "integer"] - }, - "website_clicks": { - "type": ["null", "integer"] - }, - "impressions_week": { - "type": ["null", "integer"] - }, - "reach_week": { - "type": ["null", "integer"] - }, - "impressions_days_28": { - "type": ["null", "integer"] - }, - "reach_days_28": { - "type": ["null", "integer"] - }, - "online_followers": { - "type": ["null", "object"] - } - } - }, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": true - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "media_insights", - "json_schema": { - "type": "object", - "properties": { - "business_account_id": { - "type": ["null", "string"] - }, - "page_id": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "string"] - }, - "engagement": { - "type": ["null", "integer"] - }, - "impressions": { - "type": ["null", "integer"] - }, - "reach": { - "type": ["null", "integer"] - }, - "saved": { - "type": ["null", "integer"] - }, - "video_views": { - "type": ["null", "integer"] - }, - "carousel_album_engagement": { - "type": ["null", "integer"] - }, - "carousel_album_impressions": { - "type": ["null", "integer"] - }, - "carousel_album_reach": { - "type": ["null", "integer"] - }, - "carousel_album_saved": { - "type": ["null", "integer"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "story_insights", - "json_schema": { - "type": "object", - "properties": { - "business_account_id": { - "type": ["null", "string"] - }, - "page_id": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "string"] - }, - "exits": { - "type": ["null", "integer"] - }, - "impressions": { - "type": ["null", "integer"] - }, - "reach": { - "type": ["null", "integer"] - }, - "replies": { - "type": ["null", "integer"] - }, - "taps_forward": { - "type": ["null", "integer"] - }, - "taps_back": { - "type": ["null", "integer"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - } - ] -} diff --git a/airbyte-integrations/connectors/source-instagram/setup.py b/airbyte-integrations/connectors/source-instagram/setup.py index 8e14e0fbd4702..9686356c7cfaa 100644 --- a/airbyte-integrations/connectors/source-instagram/setup.py +++ b/airbyte-integrations/connectors/source-instagram/setup.py @@ -25,24 +25,28 @@ from setuptools import find_packages, setup +MAIN_REQUIREMENTS = [ + "airbyte-cdk~=0.1", + "cached_property~=1.5", + "facebook_business~=11.0", + "pendulum>=2,<3", + "backoff", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "requests_mock==1.8.0", +] + setup( name="source_instagram", description="Source implementation for Instagram.", author="Airbyte", author_email="contact@airbyte.io", packages=find_packages(), - install_requires=[ - "airbyte-protocol", - "base-python", - "facebook_business==10.0.0", - "pendulum==1.2.0", - "cached_property==1.5.2", - "backoff==1.10.0", - ], + install_requires=MAIN_REQUIREMENTS, package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, - setup_requires=["pytest-runner"], - tests_require=["pytest==6.1.2"], extras_require={ - "tests": ["airbyte_python_test==0.0.0", "pytest==6.1.2"], + "tests": TEST_REQUIREMENTS, }, ) diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/api.py b/airbyte-integrations/connectors/source-instagram/source_instagram/api.py new file mode 100644 index 0000000000000..798ec9e02ec28 --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/source_instagram/api.py @@ -0,0 +1,116 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +import json +from time import sleep +from typing import Any, List, Mapping + +import backoff +import pendulum +from airbyte_cdk.entrypoint import logger +from cached_property import cached_property +from facebook_business import FacebookAdsApi +from facebook_business.adobjects import user as fb_user +from facebook_business.adobjects.iguser import IGUser +from facebook_business.adobjects.page import Page +from facebook_business.exceptions import FacebookRequestError +from source_instagram.common import InstagramAPIException, retry_pattern + +backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=7, factor=5) + + +class MyFacebookAdsApi(FacebookAdsApi): + """Custom Facebook API class to intercept all API calls and handle call rate limits""" + + call_rate_threshold = 90 # maximum percentage of call limit utilization + pause_interval = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit + + @staticmethod + def parse_call_rate_header(headers): + call_count = 0 + pause_interval = pendulum.duration() + + usage_header = headers.get("x-business-use-case-usage") or headers.get("x-app-usage") or headers.get("x-ad-account-usage") + if usage_header: + usage_header = json.loads(usage_header) + call_count = usage_header.get("call_count") or usage_header.get("acc_id_util_pct") or 0 + pause_interval = pendulum.duration(minutes=usage_header.get("estimated_time_to_regain_access", 0)) + + return call_count, pause_interval + + def handle_call_rate_limit(self, response, params): + headers = response.headers() + call_count, pause_interval = self.parse_call_rate_header(headers) + if call_count > self.call_rate_threshold or pause_interval: + logger.warn(f"Utilization is too high ({call_count})%, pausing for {pause_interval}") + sleep(pause_interval.total_seconds()) + + @backoff_policy + def call( + self, + method, + path, + params=None, + headers=None, + files=None, + url_override=None, + api_version=None, + ): + """Makes an API call, delegate actual work to parent class and handles call rates""" + response = super().call(method, path, params, headers, files, url_override, api_version) + self.handle_call_rate_limit(response, params) + return response + + +class InstagramAPI: + def __init__(self, access_token: str): + self._api = FacebookAdsApi.init(access_token=access_token) + # design flaw in MyFacebookAdsApi requires such strange set of new default api instance + self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False) + FacebookAdsApi.set_default_api(self.api) + + @cached_property + def accounts(self) -> List[Mapping[str, Any]]: + return self._find_accounts() + + def _find_accounts(self) -> List[Mapping[str, Any]]: + try: + instagram_business_accounts = [] + accounts = fb_user.User(fbid="me").get_accounts() + for account in accounts: + page = Page(account.get_id()).api_get(fields=["instagram_business_account"]) + if page.get("instagram_business_account"): + instagram_business_accounts.append( + { + "page_id": account.get_id(), + "instagram_business_account": IGUser(page.get("instagram_business_account").get("id")), + } + ) + except FacebookRequestError as exc: + raise InstagramAPIException(f"Error: {exc.api_error_code()}, {exc.api_error_message()}") from exc + + if not instagram_business_accounts: + raise InstagramAPIException("Couldn't find an Instagram business account for current Access Token") + + return instagram_business_accounts diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/client/__init__.py b/airbyte-integrations/connectors/source-instagram/source_instagram/client/__init__.py deleted file mode 100644 index 1d6af8de8179a..0000000000000 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/client/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .client import Client - -__all__ = ["Client"] diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/client/api.py b/airbyte-integrations/connectors/source-instagram/source_instagram/client/api.py deleted file mode 100644 index 68e5d04ad1b33..0000000000000 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/client/api.py +++ /dev/null @@ -1,402 +0,0 @@ -# -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -# - - -import urllib.parse as urlparse -from abc import ABC, abstractmethod -from typing import Any, Dict, Iterator, List, Optional, Sequence - -import backoff -import pendulum -from base_python.entrypoint import logger -from facebook_business.adobjects.igmedia import IGMedia -from facebook_business.adobjects.iguser import IGUser -from facebook_business.api import Cursor -from facebook_business.exceptions import FacebookRequestError - -from .common import retry_pattern - -backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=7, factor=5) - - -def clear_url(record_data: dict = None): - """ - This function removes the _nc_rid parameter from the video url and ccb from profile_picture_url for users. - _nc_rid is generated every time a new one and ccb can change its value, and tests fail when checking for identity. - This does not spoil the link, it remains correct and by clicking on it you can view the video or see picture. - """ - - def clear_query_params(url): - parsed_url = urlparse.urlparse(url) - res_query = [] - for q in parsed_url.query.split("&"): - key, value = q.split("=") - if key not in ["_nc_rid", "ccb"]: - res_query.append(f"{key}={value}") - - parse_result = parsed_url._replace(query="&".join(res_query)) - return urlparse.urlunparse(parse_result) - - if record_data.get("media_type") == "VIDEO" and record_data.get("media_url"): - record_data["media_url"] = clear_query_params(record_data["media_url"]) - elif record_data.get("profile_picture_url"): - record_data["profile_picture_url"] = clear_query_params(record_data["profile_picture_url"]) - return record_data - - -class StreamAPI(ABC): - result_return_limit = 100 - non_object_fields = ["page_id", "business_account_id"] - - def __init__(self, api, *args, **kwargs): - super().__init__(*args, **kwargs) - self._api = api - - @abstractmethod - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - - def filter_input_fields(self, fields: Sequence[str] = None): - return list(set(fields) - set(self.non_object_fields)) - - @backoff_policy - def load_next_page(self, instance: Cursor): - instance.load_next_page() - - @backoff_policy - def get_instance_cursor(self, ig_user: IGUser, method_name: str, params: dict = None, fields: Sequence[str] = None) -> Cursor: - return getattr(ig_user, method_name)(params=params, fields=fields) - - def pagination(self, ig_user: IGUser, method_name: str, params: dict = None, fields: Sequence[str] = None) -> Iterator[Any]: - """ - To implement pagination, we use private variables of the Cursor class. - - todo: Should be careful when updating the library version. - """ - instance = self.get_instance_cursor(ig_user, method_name, params, fields) - yield from instance._queue - next_page = not instance._finished_iteration - while next_page: - self.load_next_page(instance) - yield from instance._queue - next_page = not instance._finished_iteration - - -class IncrementalStreamAPI(StreamAPI, ABC): - @property - @abstractmethod - def state_pk(self): - """Name of the field associated with the state""" - - @property - @abstractmethod - def cursor_field(self): - """Name of the field associated with the account_id""" - - @property - def state(self) -> Dict[str, str]: - """ - State is a dictionary of the format {"account_id" : "cursor_value"} - """ - - return {account_id: str(account_state) for account_id, account_state in self._state.items()} - - @state.setter - def state(self, value): - # Convert State for each account from string to pendulum(datetime format) - self._state = {account_id: pendulum.parse(account_state) for account_id, account_state in value.items()} - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._state = None - - def state_filter(self, record: dict) -> Optional[dict]: - """Apply state filter to record, update cursor(state)""" - - cursor = pendulum.parse(record[self.state_pk]) - if self._state[record[self.cursor_field]] >= cursor: - return - - stream_name = self.__class__.__name__ - if stream_name.endswith("API"): - stream_name = stream_name[:-3] - logger.info( - f"Advancing bookmark for {stream_name} stream for {self.cursor_field} {record[self.cursor_field]} from {self._state[record[self.cursor_field]]} to {cursor}" - ) - self._state.update({record[self.cursor_field]: max(cursor, self._state[record[self.cursor_field]])}) - return record - - -class UsersAPI(StreamAPI): - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - for account in self._api.accounts: - yield { - **{"page_id": account["page_id"]}, - **clear_url(self._extend_record(account["instagram_business_account"], fields=self.filter_input_fields(fields))), - } - - @backoff_policy - def _extend_record(self, ig_user: IGUser, fields: Sequence[str] = None) -> Dict: - return ig_user.api_get(fields=fields).export_all_data() - - -class UserLifetimeInsightsAPI(StreamAPI): - LIFETIME_METRICS = ["audience_city", "audience_country", "audience_gender_age", "audience_locale"] - period = "lifetime" - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - for account in self._api.accounts: - for insight in self._get_insight_records(account["instagram_business_account"], params=self._params()): - yield { - "page_id": account["page_id"], - "business_account_id": account["instagram_business_account"].get("id"), - "metric": insight.get("name"), - "date": insight.get("values")[0]["end_time"], - "value": insight.get("values")[0]["value"], - } - - def _params(self) -> Dict: - return {"metric": self.LIFETIME_METRICS, "period": self.period} - - @backoff_policy - def _get_insight_records(self, instagram_user: IGUser, params: dict = None) -> Iterator[Any]: - return instagram_user.get_insights(params=params) - - -class UserInsightsAPI(IncrementalStreamAPI): - METRICS_BY_PERIOD = { - "day": [ - "email_contacts", - "follower_count", - "get_directions_clicks", - "impressions", - "phone_call_clicks", - "profile_views", - "reach", - "text_message_clicks", - "website_clicks", - ], - "week": ["impressions", "reach"], - "days_28": ["impressions", "reach"], - "lifetime": ["online_followers"], - } - - state_pk = "date" - cursor_field = "business_account_id" - - # We can only get User Insights data for today and the previous 29 days. - # This is Facebook policy - buffer_days = 29 - days_increment = 1 - - def __init__(self, api): - super().__init__(api=api) - self._state = {} - self._end_date = pendulum.now() - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - for account in self._api.accounts: - account_id = account["instagram_business_account"].get("id") - self._set_state(account_id) - for params_per_day in self._params(account_id): - insight_list = [] - for params in params_per_day: - insight_list += self._get_insight_records(account["instagram_business_account"], params=params) - if not insight_list: - continue - - insight_record = {"page_id": account["page_id"], "business_account_id": account_id} - for insight in insight_list: - key = ( - f"{insight.get('name')}_{insight.get('period')}" - if insight.get("period") in ["week", "days_28"] - else insight.get("name") - ) - insight_record[key] = insight.get("values")[0]["value"] - if not insight_record.get("date"): - insight_record["date"] = insight.get("values")[0]["end_time"] - - record = self.state_filter(insight_record) - if record: - yield record - - def _params(self, account_id: str) -> Iterator[List]: - buffered_start_date = self._state[account_id] - - while buffered_start_date <= self._end_date: - params_list = [] - for period, metrics in self.METRICS_BY_PERIOD.items(): - params_list.append( - { - "metric": metrics, - "period": [period], - "since": buffered_start_date.to_datetime_string(), - "until": buffered_start_date.add(days=self.days_increment).to_datetime_string(), - } - ) - yield params_list - buffered_start_date = buffered_start_date.add(days=self.days_increment) - - def _set_state(self, account_id: str): - start_date = self._state[account_id] if self._state.get(account_id) else self._api._start_date - self._state[account_id] = max(start_date, pendulum.now().subtract(days=self.buffer_days)) - - @backoff_policy - def _get_insight_records(self, instagram_user: IGUser, params: dict = None) -> List: - return instagram_user.get_insights(params=params)._queue - - -class MediaAPI(StreamAPI): - # Children objects can only be of the media_type == "CAROUSEL_ALBUM". - # And children object does not support INVALID_CHILDREN_FIELDS fields, so they are excluded when trying to get child objects to avoid the error. - INVALID_CHILDREN_FIELDS = ["caption", "comments_count", "is_comment_enabled", "like_count", "children"] - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - children_fields = self.filter_input_fields(list(set(fields) - set(self.INVALID_CHILDREN_FIELDS))) - for account in self._api.accounts: - media = self._get_media( - account["instagram_business_account"], {"limit": self.result_return_limit}, self.filter_input_fields(fields) - ) - for record in media: - record_data = record.export_all_data() - if record_data.get("children"): - record_data["children"] = [ - clear_url(self._get_single_record(child_record["id"], children_fields).export_all_data()) - for child_record in record.get("children")["data"] - ] - record_data.update( - { - "page_id": account["page_id"], - "business_account_id": account["instagram_business_account"].get("id"), - } - ) - yield clear_url(record_data) - - def _get_media(self, instagram_user: IGUser, params: dict = None, fields: Sequence[str] = None) -> Iterator[Any]: - yield from self.pagination(instagram_user, "get_media", params=params, fields=fields) - - @backoff_policy - def _get_single_record(self, media_id: str, fields: Sequence[str] = None) -> IGMedia: - return IGMedia(media_id).api_get(fields=fields) - - -class StoriesAPI(StreamAPI): - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - for account in self._api.accounts: - stories = self._get_stories( - account["instagram_business_account"], {"limit": self.result_return_limit}, self.filter_input_fields(fields) - ) - for record in stories: - record_data = record.export_all_data() - record_data.update( - { - "page_id": account["page_id"], - "business_account_id": account["instagram_business_account"].get("id"), - } - ) - yield clear_url(record_data) - - def _get_stories(self, instagram_user: IGUser, params: dict, fields: Sequence[str] = None) -> Iterator[Any]: - yield from self.pagination(instagram_user, "get_stories", params=params, fields=fields) - - -class MediaInsightsAPI(MediaAPI): - MEDIA_METRICS = ["engagement", "impressions", "reach", "saved"] - CAROUSEL_ALBUM_METRICS = ["carousel_album_engagement", "carousel_album_impressions", "carousel_album_reach", "carousel_album_saved"] - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - for account in self._api.accounts: - ig_account = account["instagram_business_account"] - media = self._get_media(ig_account, {"limit": self.result_return_limit}, ["media_type"]) - for ig_media in media: - try: - yield { - **{ - "id": ig_media.get("id"), - "page_id": account["page_id"], - "business_account_id": ig_account.get("id"), - }, - **{record.get("name"): record.get("values")[0]["value"] for record in self._get_insights(ig_media)}, - } - except FacebookRequestError as error: - # An error might occur if the media was posted before the most recent time that - # the user's account was converted to a business account from a personal account - if error.api_error_subcode() == 2108006: - logger.error(f"Insights error for business_account_id {ig_account.get('id')}: {error.body()}") - - # We receive all Media starting from the last one, and if on the next Media we get an Insight error, - # then no reason to make inquiries for each Media further, since they were published even earlier. - break - raise error - - @backoff_policy - def _get_insights(self, item) -> Iterator[Any]: - """ - This is necessary because the functions that call this endpoint return - a generator, whose calls need decorated with a backoff. - """ - if item.get("media_type") == "VIDEO": - metrics = self.MEDIA_METRICS + ["video_views"] - elif item.get("media_type") == "CAROUSEL_ALBUM": - metrics = self.CAROUSEL_ALBUM_METRICS - else: - metrics = self.MEDIA_METRICS - - return item.get_insights(params={"metric": metrics}) - - -class StoriesInsightsAPI(StoriesAPI): - STORY_METRICS = ["exits", "impressions", "reach", "replies", "taps_forward", "taps_back"] - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - for account in self._api.accounts: - stories = self._get_stories(account["instagram_business_account"], {"limit": self.result_return_limit}, fields=[]) - for ig_story in stories: - insights = self._get_insights(ig_story) - if insights: - yield { - **{ - "id": ig_story.get("id"), - "page_id": account["page_id"], - "business_account_id": account["instagram_business_account"].get("id"), - }, - **{record.get("name"): record.get("values")[0]["value"] for record in insights}, - } - - @backoff_policy - def _get_insights(self, item) -> Iterator[Any]: - """ - This is necessary because the functions that call this endpoint return - a generator, whose calls need decorated with a backoff. - """ - - # Story IG Media object metrics with values less than 5 will return an error code 10 with the message (#10) - # Not enough viewers for the media to show insights. - try: - return item.get_insights(params={"metric": self.STORY_METRICS}) - except FacebookRequestError as error: - logger.error(f"Insights error: {error.api_error_message()}") - if error.api_error_code() == 10: - return [] - raise error diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/client/client.py b/airbyte-integrations/connectors/source-instagram/source_instagram/client/client.py deleted file mode 100644 index 6359562e4de21..0000000000000 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/client/client.py +++ /dev/null @@ -1,120 +0,0 @@ -# -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -# - - -from typing import Any, List, Mapping, Tuple - -import backoff -import pendulum -from base_python import BaseClient -from base_python.entrypoint import logger -from cached_property import cached_property -from facebook_business import FacebookAdsApi -from facebook_business.adobjects import user as fb_user -from facebook_business.adobjects.iguser import IGUser -from facebook_business.adobjects.page import Page -from facebook_business.api import Cursor -from facebook_business.exceptions import FacebookRequestError - -from .api import MediaAPI, MediaInsightsAPI, StoriesAPI, StoriesInsightsAPI, UserInsightsAPI, UserLifetimeInsightsAPI, UsersAPI -from .common import InstagramAPIException, retry_pattern - -backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) - - -class Client(BaseClient): - def __init__(self, access_token: str, start_date: str): - self._start_date = pendulum.parse(start_date) - - self._api = FacebookAdsApi.init(access_token=access_token) - self._apis = { - "media": MediaAPI(self), - "media_insights": MediaInsightsAPI(self), - "stories": StoriesAPI(self), - "story_insights": StoriesInsightsAPI(self), - "users": UsersAPI(self), - "user_lifetime_insights": UserLifetimeInsightsAPI(self), - "user_insights": UserInsightsAPI(self), - } - super().__init__() - - def _enumerate_methods(self) -> Mapping[str, callable]: - """Detect available streams and return mapping""" - return {name: api.list for name, api in self._apis.items()} - - def stream_has_state(self, name: str) -> bool: - """Tell if stream supports incremental sync""" - return hasattr(self._apis[name], "state") - - def get_stream_state(self, name: str) -> Any: - """Get state of stream with corresponding name""" - return self._apis[name].state - - def set_stream_state(self, name: str, state: Any): - """Set state of stream with corresponding name""" - self._apis[name].state = state - - @cached_property - def accounts(self): - return self._find_accounts() - - def _find_accounts(self) -> List: - try: - instagram_business_accounts = [] - accounts = self._get_accounts() - for account in accounts: - page = Page(account.get_id()).api_get(fields=["instagram_business_account"]) - if page.get("instagram_business_account"): - instagram_business_accounts.append( - { - "page_id": account.get_id(), - "instagram_business_account": self._get_instagram_user(page), - } - ) - except FacebookRequestError as exc: - raise InstagramAPIException(f"Error: {exc.api_error_code()}, {exc.api_error_message()}") from exc - - if instagram_business_accounts: - return instagram_business_accounts - raise InstagramAPIException("Couldn't find an Instagram business account for current Access Token") - - @backoff_policy - def _get_accounts(self) -> Cursor: - return fb_user.User(fbid="me").get_accounts() - - @backoff_policy - def _get_instagram_user(self, page: Page) -> IGUser: - return IGUser(page.get("instagram_business_account").get("id")) - - def health_check(self) -> Tuple[bool, str]: - alive = True - error_message = None - try: - self._find_accounts() - except InstagramAPIException as exc: - logger.error(str(exc)) - alive = False - error_message = str(exc) - - return alive, error_message diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/client/common.py b/airbyte-integrations/connectors/source-instagram/source_instagram/common.py similarity index 51% rename from airbyte-integrations/connectors/source-instagram/source_instagram/client/common.py rename to airbyte-integrations/connectors/source-instagram/source_instagram/common.py index bbe049c5485fc..5b220e91605aa 100644 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/client/common.py +++ b/airbyte-integrations/connectors/source-instagram/source_instagram/common.py @@ -24,9 +24,11 @@ import sys +import urllib.parse as urlparse import backoff -from base_python.entrypoint import logger +from airbyte_cdk.entrypoint import logger # FIXME (Eugene K): register logger as standard python logger +from facebook_business.exceptions import FacebookRequestError from requests.status_codes import codes as status_codes @@ -34,19 +36,43 @@ class InstagramAPIException(Exception): """General class for all API errors""" +class InstagramExpectedError(InstagramAPIException): + """Error that we expect to happen, we should continue reading without retrying failed query""" + + def retry_pattern(backoff_type, exception, **wait_gen_kwargs): def log_retry_attempt(details): _, exc, _ = sys.exc_info() logger.info(str(exc)) logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} more seconds then retrying...") - def should_retry_api_error(exc): - if ( - exc.http_status() == status_codes.TOO_MANY_REQUESTS - or (exc.http_status() == status_codes.FORBIDDEN and exc.api_error_message() == "(#4) Application request limit reached") - or exc.api_transient_error() - ): + def should_retry_api_error(exc: FacebookRequestError): + # Retryable OAuth Error Codes + if exc.api_error_type() == "OAuthException" and exc.api_error_code() in (1, 2, 4, 17, 341, 368): + return True + + # Rate Limiting Error Codes + if exc.api_error_code() in (4, 17, 32, 613): + return True + + if exc.http_status() == status_codes.TOO_MANY_REQUESTS: + return True + + # FIXME: add type and http_status + if exc.api_error_code() == 10 and exc.api_error_message() == "(#10) Not enough viewers for the media to show insights": + return False # expected error + + # Issue 4028, Sometimes an error about the Rate Limit is returned with a 400 HTTP code + if exc.http_status() == status_codes.BAD_REQUEST and exc.api_error_code() == 100 and exc.api_error_subcode() == 33: + return True + + if exc.api_transient_error(): return True + + # FIXME: add type, code and http_status + if exc.api_error_subcode() == 2108006: + return False + return False return backoff.on_exception( @@ -57,3 +83,15 @@ def should_retry_api_error(exc): giveup=lambda exc: not should_retry_api_error(exc), **wait_gen_kwargs, ) + + +def remove_params_from_url(url, params): + parsed_url = urlparse.urlparse(url) + res_query = [] + for q in parsed_url.query.split("&"): + key, value = q.split("=") + if key not in params: + res_query.append(f"{key}={value}") + + parse_result = parsed_url._replace(query="&".join(res_query)) + return urlparse.urlunparse(parse_result) diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/source.py b/airbyte-integrations/connectors/source-instagram/source_instagram/source.py index 8d02c40328bb7..87883faf1d60e 100644 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/source.py +++ b/airbyte-integrations/connectors/source-instagram/source_instagram/source.py @@ -22,11 +22,81 @@ # SOFTWARE. # +from datetime import datetime +from typing import Any, List, Mapping, Tuple, Type -from base_python import BaseSource +from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from pydantic import BaseModel, Field +from source_instagram.api import InstagramAPI +from source_instagram.streams import Media, MediaInsights, Stories, StoryInsights, UserInsights, UserLifetimeInsights, Users -from .client import Client +class ConnectorConfig(BaseModel): + class Config: + title = "Source Instagram" -class SourceInstagram(BaseSource): - client_class = Client + start_date: datetime = Field( + description="The date from which you'd like to replicate data for User Insights, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.", + pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + examples=["2017-01-25T00:00:00Z"], + ) + + access_token: str = Field( + description='The value of the access token generated. See the docs for more information', + airbyte_secret=True, + ) + + +class SourceInstagram(AbstractSource): + def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: + """Connection check to validate that the user-provided config can be used to connect to the underlying API + + :param config: the user-input config object conforming to the connector's spec.json + :param logger: logger object + :return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. + """ + ok = False + error_msg = None + + try: + config = ConnectorConfig.parse_obj(config) # FIXME: this will be not need after we fix CDK + api = InstagramAPI(access_token=config.access_token) + logger.info(f"Available accounts: {api.accounts}") + ok = True + except Exception as exc: + error_msg = repr(exc) + + return ok, error_msg + + def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: + """Discovery method, returns available streams + + :param config: A Mapping of the user input configuration as defined in the connector spec. + """ + config: ConnectorConfig = ConnectorConfig.parse_obj(config) # FIXME: this will be not need after we fix CDK + api = InstagramAPI(access_token=config.access_token) + + return [ + Media(api=api), + MediaInsights(api=api), + Stories(api=api), + StoryInsights(api=api), + Users(api=api), + UserLifetimeInsights(api=api), + UserInsights(api=api, start_date=config.start_date), + ] + + def spec(self, *args, **kwargs) -> ConnectorSpecification: + """ + Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) + required to run this integration. + """ + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.io/integrations/sources/instagram", + changelogUrl="https://docs.airbyte.io/integrations/sources/instagram", + supportsIncremental=True, + supported_destination_sync_modes=[DestinationSyncMode.append], + connectionSpecification=ConnectorConfig.schema(), + ) diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/streams.py b/airbyte-integrations/connectors/source-instagram/source_instagram/streams.py new file mode 100644 index 0000000000000..8a402a0353202 --- /dev/null +++ b/airbyte-integrations/connectors/source-instagram/source_instagram/streams.py @@ -0,0 +1,424 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +from abc import ABC +from datetime import datetime +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional + +import pendulum +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import Stream +from cached_property import cached_property +from facebook_business.adobjects.igmedia import IGMedia +from facebook_business.exceptions import FacebookRequestError +from source_instagram.api import InstagramAPI + +from .common import remove_params_from_url + + +class InstagramStream(Stream, ABC): + """Base stream class""" + + page_size = 100 + primary_key = "id" + + def __init__(self, api: InstagramAPI, **kwargs): + super().__init__(**kwargs) + self._api = api + + @cached_property + def fields(self) -> List[str]: + """List of fields that we want to query, for now just all properties from stream's schema""" + non_object_fields = ["page_id", "business_account_id"] + fields = list(self.get_json_schema().get("properties", {}).keys()) + return list(set(fields) - set(non_object_fields)) + + def request_params( + self, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + """Parameters that should be passed to query_records method""" + return {"limit": self.page_size} + + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + """ + Override to define the slices for this stream. See the stream slicing section of the docs for more information. + + :param sync_mode: + :param cursor_field: + :param stream_state: + :return: + """ + for account in self._api.accounts: + yield {"account": account} + + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + return self._clear_url(record) + + @staticmethod + def _clear_url(record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + """ + This function removes the _nc_rid parameter from the video url and ccb from profile_picture_url for users. + _nc_rid is generated every time a new one and ccb can change its value, and tests fail when checking for identity. + This does not spoil the link, it remains correct and by clicking on it you can view the video or see picture. + """ + if record.get("media_url"): + record["media_url"] = remove_params_from_url(record["media_url"], params=["_nc_rid"]) + if record.get("profile_picture_url"): + record["profile_picture_url"] = remove_params_from_url(record["profile_picture_url"], params=["ccb"]) + + return record + + +class InstagramIncrementalStream(InstagramStream, ABC): + """Base class for incremental streams""" + + def __init__(self, start_date: datetime, **kwargs): + super().__init__(**kwargs) + self._start_date = pendulum.instance(start_date) + + +class Users(InstagramStream): + """Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-user""" + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + account = stream_slice["account"] + ig_account = account["instagram_business_account"] + record = ig_account.api_get(fields=self.fields).export_all_data() + record["page_id"] = account["page_id"] + yield self.transform(record) + + +class UserLifetimeInsights(InstagramStream): + """Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-user/insights""" + + primary_key = None + LIFETIME_METRICS = ["audience_city", "audience_country", "audience_gender_age", "audience_locale"] + period = "lifetime" + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + account = stream_slice["account"] + ig_account = account["instagram_business_account"] + for insight in ig_account.get_insights(params=self.request_params()): + yield { + "page_id": account["page_id"], + "business_account_id": ig_account.get("id"), + "metric": insight["name"], + "date": insight["values"][0]["end_time"], + "value": insight["values"][0]["value"], + } + + def request_params( + self, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_slice=stream_slice, stream_state=stream_state) + params.update({"metric": self.LIFETIME_METRICS, "period": self.period}) + return params + + +class UserInsights(InstagramIncrementalStream): + """Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-user/insights""" + + METRICS_BY_PERIOD = { + "day": [ + "email_contacts", + "follower_count", + "get_directions_clicks", + "impressions", + "phone_call_clicks", + "profile_views", + "reach", + "text_message_clicks", + "website_clicks", + ], + "week": ["impressions", "reach"], + "days_28": ["impressions", "reach"], + "lifetime": ["online_followers"], + } + + primary_key = None + cursor_field = "date" + + # For some metrics we can only get insights not older than 30 days, it is Facebook policy + buffer_days = 30 + days_increment = 1 + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._end_date = pendulum.now() + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + account = stream_slice["account"] + ig_account = account["instagram_business_account"] + account_id = ig_account.get("id") + + base_params = self.request_params(stream_state=stream_state, stream_slice=stream_slice) + insight_list = [] + # iterate over each period, query insights + for period, metrics in self.METRICS_BY_PERIOD.items(): + params = { + **base_params, + "metric": metrics, + "period": [period], + } + + # we get only first record, because cursor will try to fetch next date interval + cursor = ig_account.get_insights(params=params) + if len(cursor): + insight_list += [insights.export_all_data() for insights in cursor[: len(cursor)]] + + # end then merge all periods in one record + insight_record = {"page_id": account["page_id"], "business_account_id": account_id} + for insight in insight_list: + key = insight["name"] + if insight["period"] in ["week", "days_28"]: + key += f"_{insight['period']}" + + insight_record[key] = insight.get("values")[0]["value"] # this depends on days_increment value + if not insight_record.get(self.cursor_field): + insight_record[self.cursor_field] = insight.get("values")[0]["end_time"] + + yield insight_record + + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + """Extend default slicing based on accounts with slices based on date intervals""" + stream_state = stream_state or {} + stream_slices = super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) + for stream_slice in stream_slices: + account = stream_slice["account"] + account_id = account["instagram_business_account"]["id"] + + state_value = stream_state.get(account_id, {}).get(self.cursor_field) + start_date = pendulum.parse(state_value) if state_value else self._start_date + start_date = max(start_date, self._start_date, pendulum.now().subtract(days=self.buffer_days)) + for since in pendulum.period(start_date, self._end_date).range("days", self.days_increment): + until = min(since.add(days=self.days_increment), self._end_date) + self.logger.info(f"Reading insights between {since.date()} and {until.date()}") + yield { + **stream_slice, + "since": since.to_datetime_string(), + "until": until.to_datetime_string(), # excluding + } + + def request_params( + self, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + """Append datetime range params""" + params = super().request_params(stream_state=stream_state, stream_slice=stream_slice) + return { + **params, + "since": stream_slice["since"], + "until": stream_slice["until"], + } + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): + """Update stream state from latest record""" + record_value = latest_record[self.cursor_field] + state_value = current_stream_state.get("business_account_id", {}).get(self.cursor_field) or record_value + max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value)) + + current_stream_state[latest_record["business_account_id"]] = { + self.cursor_field: str(max_cursor), + } + + return current_stream_state + + +class Media(InstagramStream): + """Children objects can only be of the media_type == "CAROUSEL_ALBUM". + And children object does not support INVALID_CHILDREN_FIELDS fields, + so they are excluded when trying to get child objects to avoid the error + """ + + INVALID_CHILDREN_FIELDS = ["caption", "comments_count", "is_comment_enabled", "like_count", "children"] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """ + This method should be overridden by subclasses to read records based on the inputs + """ + account = stream_slice["account"] + ig_account = account["instagram_business_account"] + media = ig_account.get_media(params=self.request_params(), fields=self.fields) + for record in media: + record_data = record.export_all_data() + if record_data.get("children"): + ids = [child["id"] for child in record["children"]["data"]] + record_data["children"] = list(self._get_children(ids)) + + record_data.update( + { + "page_id": account["page_id"], + "business_account_id": ig_account.get("id"), + } + ) + yield self.transform(record_data) + + def _get_children(self, ids: List): + children_fields = list(set(self.fields) - set(self.INVALID_CHILDREN_FIELDS)) + for pk in ids: + yield self.transform(IGMedia(pk).api_get(fields=children_fields).export_all_data()) + + +class MediaInsights(Media): + """Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-media/insights""" + + MEDIA_METRICS = ["engagement", "impressions", "reach", "saved"] + CAROUSEL_ALBUM_METRICS = ["carousel_album_engagement", "carousel_album_impressions", "carousel_album_reach", "carousel_album_saved"] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + account = stream_slice["account"] + ig_account = account["instagram_business_account"] + media = ig_account.get_media(params=self.request_params(), fields=["media_type"]) + for ig_media in media: + account_id = ig_account.get("id") + insights = self._get_insights(ig_media, account_id) + if insights is None: + break + + insights["id"] = ig_media["id"] + insights["page_id"] = account["page_id"] + insights["business_account_id"] = ig_account["id"] + yield self.transform(insights) + + def _get_insights(self, item, account_id) -> Optional[MutableMapping[str, Any]]: + """Get insights for specific media""" + if item.get("media_type") == "VIDEO": + metrics = self.MEDIA_METRICS + ["video_views"] + elif item.get("media_type") == "CAROUSEL_ALBUM": + metrics = self.CAROUSEL_ALBUM_METRICS + else: + metrics = self.MEDIA_METRICS + + try: + insights = item.get_insights(params={"metric": metrics}) + return {record.get("name"): record.get("values")[0]["value"] for record in insights} + except FacebookRequestError as error: + # An error might occur if the media was posted before the most recent time that + # the user's account was converted to a business account from a personal account + if error.api_error_subcode() == 2108006: + self.logger.error(f"Insights error for business_account_id {account_id}: {error.api_error_message()}") + + # We receive all Media starting from the last one, and if on the next Media we get an Insight error, + # then no reason to make inquiries for each Media further, since they were published even earlier. + return None + raise error + + +class Stories(InstagramStream): + """Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-user/stories""" + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + account = stream_slice["account"] + ig_account = account["instagram_business_account"] + stories = ig_account.get_stories(params=self.request_params(), fields=self.fields) + for record in stories: + record_data = record.export_all_data() + record_data["page_id"] = account["page_id"] + record_data["business_account_id"] = ig_account.get("id") + yield self.transform(record_data) + + +class StoryInsights(Stories): + """Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-media/insights""" + + metrics = ["exits", "impressions", "reach", "replies", "taps_forward", "taps_back"] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + account = stream_slice["account"] + ig_account = account["instagram_business_account"] + stories = ig_account.get_stories(params=self.request_params(), fields=[]) + for ig_story in stories: + insights = self._get_insights(IGMedia(ig_story["id"])) + if not insights: + continue + + insights["id"] = ig_story["id"] + insights["page_id"] = account["page_id"] + insights["business_account_id"] = ig_account["id"] + yield self.transform(insights) + + def _get_insights(self, story: IGMedia) -> MutableMapping[str, Any]: + """Get insights for specific story""" + + # Story IG Media object metrics with values less than 5 will return an error code 10 with the message (#10) + # Not enough viewers for the media to show insights. + try: + insights = story.get_insights(params={"metric": self.metrics}) + return {record["name"]: record["values"][0]["value"] for record in insights} + except FacebookRequestError as error: + if error.api_error_code() == 10: + self.logger.error(f"Insights error: {error.api_error_message()}") + return {} + raise error diff --git a/docs/integrations/sources/instagram.md b/docs/integrations/sources/instagram.md index b9d12511f7709..ff08ee3f9a2a7 100644 --- a/docs/integrations/sources/instagram.md +++ b/docs/integrations/sources/instagram.md @@ -79,3 +79,8 @@ More details how to get a User's Access Token you can find in the following docs With the Instagram Account ID and API access token, you should be ready to start pulling data from the Facebook Instagram API. Head to the Airbyte UI to setup your source connector! +## Changelog + +| Version | Date | Pull Request | Subject | +| :------ | :-------- | :----- | :------ | +| 0.1.6 | 2021-07-07 | [4210](https://github.com/airbytehq/airbyte/pull/4210) | Refactor connector to use CDK:
- improve error handling.
- fix sync fail with HTTP status 400.
- integrate SAT.|