From e7b0722e995475f55383abfd6a6080512d9cf79f Mon Sep 17 00:00:00 2001 From: Vitalii Vdovenko Date: Thu, 13 May 2021 15:44:49 +0300 Subject: [PATCH 1/5] FB Marketing source #1390 - returning buffered record while incremental sync --- .../source_facebook_marketing/client/api.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py index 72a601cdddb26..4e74bebc2969e 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py @@ -156,8 +156,10 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: latest_cursor = None for record in super().read(getter, params): cursor = pendulum.parse(record[self.state_pk]) - if self._state and self._state >= cursor: - continue + if self._state: + buffer_days = getattr(self, 'buffer_days') + 1 if hasattr(self, 'buffer_days') else 0 + if self._state.subtract(days=buffer_days) >= cursor: + continue latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor yield record From 0ce8b8beb71b17e5b8781ba8a41b8685ac04b5c5 Mon Sep 17 00:00:00 2001 From: Vitalii Vdovenko Date: Thu, 13 May 2021 17:14:47 +0300 Subject: [PATCH 2/5] FB Marketing source #1390 - improving checking while syncing buffered record --- .../source_facebook_marketing/client/api.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py index 4e74bebc2969e..bd0b3d1a7e3b8 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py @@ -154,12 +154,11 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: """Apply state filter to set of records, update cursor(state) if necessary in the end""" params = params or {} latest_cursor = None + buffer_days = getattr(self, 'buffer_days') + 1 if hasattr(self, 'buffer_days') else 0 for record in super().read(getter, params): cursor = pendulum.parse(record[self.state_pk]) - if self._state: - buffer_days = getattr(self, 'buffer_days') + 1 if hasattr(self, 'buffer_days') else 0 - if self._state.subtract(days=buffer_days) >= cursor: - continue + if self._state and self._state.subtract(days=buffer_days) >= cursor: + continue latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor yield record From e1df74fd7d8f06bff89ac9cf73e7a8365add4b4c Mon Sep 17 00:00:00 2001 From: Vitalii Vdovenko Date: Thu, 13 May 2021 20:02:20 +0300 Subject: [PATCH 3/5] FB Marketing source #1390 - adding loop_back to IncrementalStreamAPI --- .../source_facebook_marketing/client/api.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py index bd0b3d1a7e3b8..4a6a469c6c1c1 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py @@ -106,6 +106,8 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]: class IncrementalStreamAPI(StreamAPI, ABC): + loop_back = -1 + @property @abstractmethod def state_pk(self): @@ -154,10 +156,9 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: """Apply state filter to set of records, update cursor(state) if necessary in the end""" params = params or {} latest_cursor = None - buffer_days = getattr(self, 'buffer_days') + 1 if hasattr(self, 'buffer_days') else 0 for record in super().read(getter, params): cursor = pendulum.parse(record[self.state_pk]) - if self._state and self._state.subtract(days=buffer_days) >= cursor: + if self._state and self._state.subtract(days=self.loop_back + 1) >= cursor: continue latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor yield record @@ -327,7 +328,7 @@ class AdsInsightAPI(IncrementalStreamAPI): level = "ad" action_attribution_windows = ALL_ACTION_ATTRIBUTION_WINDOWS time_increment = 1 - buffer_days = 28 + loop_back = 28 def __init__(self, api, start_date, breakdowns=None): super().__init__(api=api) @@ -377,7 +378,7 @@ def _run_job_until_completion(self, params) -> AdReportRun: def _params(self, fields: Sequence[str] = None) -> Iterator[dict]: # Facebook freezes insight data 28 days after it was generated, which means that all data # from the past 28 days may have changed since we last emitted it, so we retrieve it again. - buffered_start_date = self._state.subtract(days=self.buffer_days) + buffered_start_date = self._state.subtract(days=self.loop_back) end_date = pendulum.now() fields = list(set(fields) - set(self.INVALID_INSIGHT_FIELDS)) From c296a677894c5489cdbf38c833519b00122b9293 Mon Sep 17 00:00:00 2001 From: Vitalii Vdovenko Date: Thu, 13 May 2021 21:52:15 +0300 Subject: [PATCH 4/5] FB Marketing source #1390 - bump version --- .../e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- .../connectors/source-facebook-marketing/Dockerfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json index c4cd01b4aaa89..ef5f7d248a4ad 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c", "name": "Facebook Marketing", "dockerRepository": "airbyte/source-facebook-marketing", - "dockerImageTag": "0.2.3", + "dockerImageTag": "0.2.4", "documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing", "icon": "facebook.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index d3b1d2c89e028..64febb2e45cb9 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -91,7 +91,7 @@ - sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c name: Facebook Marketing dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.3 + dockerImageTag: 0.2.4 documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing icon: facebook.svg - sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index de0d5b7f47c24..64449692669ac 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install . -LABEL io.airbyte.version=0.2.3 +LABEL io.airbyte.version=0.2.4 LABEL io.airbyte.name=airbyte/source-facebook-marketing From 215b0e30a9703d6466aa489c1174d7db767881a7 Mon Sep 17 00:00:00 2001 From: Vitalii Vdovenko Date: Thu, 13 May 2021 22:30:02 +0300 Subject: [PATCH 5/5] FB Marketing source #1390 - add CHANGELOG.md --- .../connectors/source-facebook-marketing/CHANGELOG.md | 5 +++++ .../source_facebook_marketing/client/api.py | 8 ++++---- 2 files changed, 9 insertions(+), 4 deletions(-) create mode 100644 airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md diff --git a/airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md b/airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md new file mode 100644 index 0000000000000..fae3029ad94e6 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## 0.2.4 +Fix an issue that caused losing Insights data from the past 28 days while incremental sync: https://github.com/airbytehq/airbyte/pull/3395 + diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py index 4a6a469c6c1c1..0fdaaaf2da567 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py @@ -106,7 +106,7 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]: class IncrementalStreamAPI(StreamAPI, ABC): - loop_back = -1 + buffer_days = -1 @property @abstractmethod @@ -158,7 +158,7 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: latest_cursor = None for record in super().read(getter, params): cursor = pendulum.parse(record[self.state_pk]) - if self._state and self._state.subtract(days=self.loop_back + 1) >= cursor: + if self._state and self._state.subtract(days=self.buffer_days + 1) >= cursor: continue latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor yield record @@ -328,7 +328,7 @@ class AdsInsightAPI(IncrementalStreamAPI): level = "ad" action_attribution_windows = ALL_ACTION_ATTRIBUTION_WINDOWS time_increment = 1 - loop_back = 28 + buffer_days = 28 def __init__(self, api, start_date, breakdowns=None): super().__init__(api=api) @@ -378,7 +378,7 @@ def _run_job_until_completion(self, params) -> AdReportRun: def _params(self, fields: Sequence[str] = None) -> Iterator[dict]: # Facebook freezes insight data 28 days after it was generated, which means that all data # from the past 28 days may have changed since we last emitted it, so we retrieve it again. - buffered_start_date = self._state.subtract(days=self.loop_back) + buffered_start_date = self._state.subtract(days=self.buffer_days) end_date = pendulum.now() fields = list(set(fields) - set(self.INVALID_INSIGHT_FIELDS))