Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Facebook Marketing: implement retry logic for async reports #7904

Merged
merged 12 commits into from
Nov 12, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.24
dockerImageTag: 0.2.25
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.24"
- dockerImage: "airbyte/source-facebook-marketing:0.2.25"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.24
LABEL io.airbyte.version=0.2.25
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
@@ -1,23 +1,3 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.33",
"airbyte-cdk~=0.1.35",
"cached_property~=1.5",
"facebook_business~=12.0",
"pendulum>=2,<3",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,5 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from .source import SourceFacebookMarketing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.common import FacebookAPIException

logger = logging.getLogger(__name__)
logger = logging.getLogger("airbyte")


class MyFacebookAdsApi(FacebookAdsApi):
Expand Down Expand Up @@ -68,14 +68,14 @@ def handle_call_rate_limit(self, response, params):

if max_usage > self.call_rate_threshold:
max_pause_interval = max(max_pause_interval, self.pause_interval_minimum)
logger.warn(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}")
logger.warning(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}")
sleep(max_pause_interval.total_seconds())
else:
headers = response.headers()
usage, pause_interval = self.parse_call_rate_header(headers)
if usage > self.call_rate_threshold or pause_interval:
pause_interval = max(pause_interval, self.pause_interval_minimum)
logger.warn(f"Utilization is too high ({usage})%, pausing for {pause_interval}")
logger.warning(f"Utilization is too high ({usage})%, pausing for {pause_interval}")
sleep(pause_interval.total_seconds())

def call(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from enum import Enum
from typing import Any, Mapping

import backoff
import pendulum
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.api import API

from .common import JobException, JobTimeoutException, retry_pattern

# Decorator applied to the methods below that call the Facebook API. It is built by
# retry_pattern (imported from .common) from backoff.expo, FacebookRequestError,
# max_tries=5 and factor=5.
# NOTE(review): presumably this retries the wrapped call on FacebookRequestError with
# exponentially growing waits, up to 5 attempts - confirm against retry_pattern in common.py.
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
# Logger registered under the fixed name "airbyte" (not the module's __name__).
logger = logging.getLogger("airbyte")


class Status(Enum):
    """Values of the ``async_status`` field that the async job wrapper compares against.

    Each member's value is the exact status string to match, so members can be
    compared with ``job["async_status"] == Status.X.value``.
    """

    # The report finished successfully.
    COMPLETED = "Job Completed"
    # The report failed.
    FAILED = "Job Failed"
    # The report was skipped.
    SKIPPED = "Job Skipped"
    # The report has been started.
    STARTED = "Job Started"


class AsyncJob:
    """AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job"""

    # A job that is still at 0% completion after this long is treated as timed out.
    MAX_WAIT_TO_START = pendulum.duration(minutes=5)
    # A job that has not completed after this long is treated as timed out.
    MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)

    def __init__(self, api: API, params: Mapping[str, Any]):
        """Initialize

        :param api: Facebook Api wrapper
        :param params: job params, required to start/restart job
        """
        self._params = params
        self._api = api
        # Remote report handle; None until start() succeeds.
        self._job = None
        # Timestamps used to compute elapsed_time.
        self._start_time = None
        self._finish_time = None
        # Set by `completed` when a status check raises JobException.
        self._failed = False

    @backoff_policy
    def start(self):
        """Start remote job

        :raises: RuntimeError if the job was already started (use restart instead)
        """
        if self._job:
            raise RuntimeError(f"{self}: Incorrect usage of start - the job already started, use restart instead")

        self._job = self._api.account.get_insights(params=self._params, is_async=True)
        self._start_time = pendulum.now()
        job_id = self._job["report_run_id"]
        # .get() so that a params mapping without these keys cannot raise after the
        # remote job has already been created - they are only used for logging here.
        time_range = self._params.get("time_range")
        breakdowns = self._params.get("breakdowns")
        logger.info(f"Created AdReportRun: {job_id} to sync insights {time_range} with breakdown {breakdowns}")

    def restart(self):
        """Restart failed job

        :raises: RuntimeError if the job was never started or is not marked as failed
        """
        if not self._job or not self.failed:
            raise RuntimeError(f"{self}: Incorrect usage of restart - only failed jobs can be restarted")

        # Reset all per-run state so that start() accepts the call.
        self._job = None
        self._failed = False
        self._start_time = None
        self._finish_time = None
        self.start()
        logger.info(f"{self}: restarted")

    @property
    def elapsed_time(self):
        """Elapsed time since the job start

        :return: None if the job has not been started; otherwise the difference between
            the finish time (or the current time, if not finished) and the start time
        """
        if not self._start_time:
            return None

        end_time = self._finish_time or pendulum.now()
        return end_time - self._start_time

    @property
    def completed(self) -> bool:
        """Check job status and return True if it is completed successfully

        :return: True if completed successfully, False - if task still running
        :raises: JobException in case job failed to start, failed or timed out
        """
        try:
            return self._check_status()
        except JobException:
            # Remember the failure so that restart() is allowed afterwards.
            self._failed = True
            raise

    @property
    def failed(self) -> bool:
        """Tell if the job previously failed"""
        return self._failed

    @backoff_policy
    def _update_job(self):
        """Refresh the remote job object via its api_get() call

        :raises: RuntimeError if the job is not started
        """
        if not self._job:
            raise RuntimeError(f"{self}: Incorrect usage of the method - the job is not started")
        self._job = self._job.api_get()

    def _check_status(self) -> bool:
        """Perform status check

        :return: True if the job is completed, False - if the job is still running
        :raises: errors if job failed or timed out
        """
        self._update_job()
        job_progress_pct = self._job["async_percent_completion"]
        logger.info(f"{self} is {job_progress_pct}% complete ({self._job['async_status']})")
        runtime = self.elapsed_time

        if self._job["async_status"] == Status.COMPLETED.value:
            self._finish_time = pendulum.now()
            return True
        elif self._job["async_status"] == Status.FAILED.value:
            raise JobException(f"{self._job} failed after {runtime.in_seconds()} seconds.")
        elif self._job["async_status"] == Status.SKIPPED.value:
            raise JobException(f"{self._job} skipped after {runtime.in_seconds()} seconds.")

        # Still running: enforce the two time limits.
        if runtime > self.MAX_WAIT_TO_START and self._job["async_percent_completion"] == 0:
            raise JobTimeoutException(
                f"{self._job} did not start after {runtime.in_seconds()} seconds."
                f" This is an intermittent error which may be fixed by retrying the job. Aborting."
            )
        elif runtime > self.MAX_WAIT_TO_FINISH:
            raise JobTimeoutException(
                f"{self._job} did not finish after {runtime.in_seconds()} seconds."
                f" This is an intermittent error which may be fixed by retrying the job. Aborting."
            )
        return False

    @backoff_policy
    def get_result(self) -> Any:
        """Retrieve result of the finished job.

        :return: whatever the underlying job object's get_result() returns
        :raises: RuntimeError if the job is not started or is marked as failed
        """
        if not self._job or self.failed:
            raise RuntimeError(f"{self}: Incorrect usage of get_result - the job is not started or failed")
        return self._job.get_result()

    def __str__(self) -> str:
        """String representation of the job wrapper."""
        job_id = self._job["report_run_id"] if self._job else "<None>"
        # .get() keeps __str__ from raising while an error/log message is being built.
        time_range = self._params.get("time_range")
        breakdowns = self._params.get("breakdowns")
        return f"AdReportRun(id={job_id}, time_range={time_range}, breakdowns={breakdowns})"
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
FACEBOOK_UNKNOWN_ERROR_CODE = 99
DEFAULT_SLEEP_INTERVAL = pendulum.duration(minutes=1)

logger = logging.getLogger(__name__)
logger = logging.getLogger("airbyte")


class FacebookAPIException(Exception):
    """Generic error type covering all API errors."""


class JobException(Exception):
"""Job failed after FB exception"""
"""Scheduled job failed"""


class JobTimeoutException(Exception):
class JobTimeoutException(JobException):
"""Scheduled job timed out"""


Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from datetime import datetime
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Type
Expand Down Expand Up @@ -37,7 +36,7 @@
Campaigns,
)

logger = logging.getLogger(__name__)
logger = logging.getLogger("airbyte")


class InsightConfig(BaseModel):
Expand Down
Loading