Skip to content

Commit

Permalink
only create if changed
Browse files Browse the repository at this point in the history
  • Loading branch information
ofpiyush committed Dec 16, 2021
1 parent 5216e71 commit f2757fc
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 45 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ repos:
rev: 21.12b0
hooks:
- id: black
args: [--line-length=119]
- repo: https://gitlab.com/pycqa/flake8
rev: 4.0.1
hooks:
Expand Down
10 changes: 3 additions & 7 deletions examples/full/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
# Imports from this repository
from fastapi_cloud_tasks.utils import queue_path

TASK_LISTENER_BASE_URL = os.getenv(
"TASK_LISTENER_BASE_URL", default="https://b22d-35-207-241-4.ngrok.io"
)
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="applied-honor-105708")
TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="https://example.com")
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="sample-project")
TASK_LOCATION = os.getenv("TASK_LOCATION", default="asia-south1")
TASK_QUEUE = os.getenv("TASK_QUEUE", default="test-queue")

Expand All @@ -26,9 +24,7 @@
queue=TASK_QUEUE,
)

TASK_OIDC_TOKEN = tasks_v2.OidcToken(
service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL
)
TASK_OIDC_TOKEN = tasks_v2.OidcToken(service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL)
SCHEDULER_OIDC_TOKEN = scheduler_v1.OidcToken(
service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL
)
4 changes: 1 addition & 3 deletions examples/simple/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

TaskRoute = TaskRouteBuilder(
# Base URL where the task server will get hosted
base_url=os.getenv(
"TASK_LISTENER_BASE_URL", default="https://d860-35-208-83-220.ngrok.io"
),
base_url=os.getenv("TASK_LISTENER_BASE_URL", default="https://d860-35-208-83-220.ngrok.io"),
# Full queue path to which we'll send tasks.
# Edit values below to match your project
queue_path=queue_path(
Expand Down
4 changes: 1 addition & 3 deletions fastapi_cloud_tasks/delayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def delay(self, **kwargs):

request = self.pre_create_hook(request)

return self.client.create_task(
request=request, timeout=self.task_create_timeout
)
return self.client.create_task(request=request, timeout=self.task_create_timeout)

def _schedule(self):
if self.countdown is None or self.countdown <= 0:
Expand Down
18 changes: 4 additions & 14 deletions fastapi_cloud_tasks/requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,18 @@ def __init__(
self.base_url = base_url.rstrip("/")

def _headers(self, *, values):
headers = err_val(
request_params_to_args(self.route.dependant.header_params, values)
)
cookies = err_val(
request_params_to_args(self.route.dependant.cookie_params, values)
)
headers = err_val(request_params_to_args(self.route.dependant.header_params, values))
cookies = err_val(request_params_to_args(self.route.dependant.cookie_params, values))
if len(cookies) > 0:
headers["Cookies"] = "; ".join([f"{k}={v}" for (k, v) in cookies.items()])
# We use json only.
headers["Content-Type"] = "application/json"
# Always send string headers and skip all headers which are supposed to be sent by cloudtasks
return {
str(k): str(v)
for (k, v) in headers.items()
if not str(k).startswith("x_cloudtasks_")
}
return {str(k): str(v) for (k, v) in headers.items() if not str(k).startswith("x_cloudtasks_")}

def _url(self, *, values):
route = self.route
path_values = err_val(
request_params_to_args(route.dependant.path_params, values)
)
path_values = err_val(request_params_to_args(route.dependant.path_params, values))
for (name, converter) in route.param_convertors.items():
if name in path_values:
continue
Expand Down
44 changes: 32 additions & 12 deletions fastapi_cloud_tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Third Party Imports
from fastapi.routing import APIRoute
from google.cloud import scheduler_v1
from google.protobuf import duration_pb2

# Imports from this repository
from fastapi_cloud_tasks.hooks import SchedulerHook
Expand All @@ -23,14 +24,21 @@ def __init__(
name: str = "",
schedule_create_timeout: float = 10.0,
retry_config: scheduler_v1.RetryConfig = None,
time_zone: str = None
time_zone: str = "UTC",
force: bool = False,
) -> None:
super().__init__(route=route, base_url=base_url)
if name == "":
name = route.unique_id

if retry_config is None:
retry_config = scheduler_v1.RetryConfig(retry_count=5)
retry_config = scheduler_v1.RetryConfig(
retry_count=5,
max_retry_duration=duration_pb2.Duration(seconds=0),
min_backoff_duration=duration_pb2.Duration(seconds=5),
max_backoff_duration=duration_pb2.Duration(seconds=120),
max_doublings=5,
)

self.retry_config = retry_config
location_parts = client.parse_common_location_path(location_path)
Expand All @@ -45,6 +53,7 @@ def __init__(
self.method = schedulerMethod(route.methods)
self.client = client
self.pre_scheduler_hook = pre_scheduler_hook
self.force = force

def schedule(self, **kwargs):
# Create http request
Expand All @@ -63,25 +72,36 @@ def schedule(self, **kwargs):
http_target=request,
schedule=self.cron_schedule,
retry_config=self.retry_config,
time_zone=self.time_zone,
)
if self.time_zone is not None:
job.time_zone = self.time_zone

request = scheduler_v1.CreateJobRequest(parent=self.location_path, job=job)

request = self.pre_scheduler_hook(request)

# Delete and create job
self.delete()
return self.client.create_job(
request=request, timeout=self.schedule_create_timeout
)
if self.force or self._has_changed(request=request):
# Delete and create job
self.delete()
self.client.create_job(request=request, timeout=self.schedule_create_timeout)

def _has_changed(self, request: scheduler_v1.CreateJobRequest):
try:
job = self.client.get_job(name=request.job.name)
# Remove things that are either output only or GCP adds by default
job.user_update_time = None
job.state = None
job.schedule_time = None
del job.http_target.headers["User-Agent"]
# Proto compare works directly with `__eq__`
return job != request.job
except Exception:
return True
return False

def delete(self):
# We return true or exception because you could have the delete code on multiple instances
try:
self.client.delete_job(
name=self.job_id, timeout=self.schedule_create_timeout
)
self.client.delete_job(name=self.job_id, timeout=self.schedule_create_timeout)
return True
except Exception as ex:
return ex
8 changes: 2 additions & 6 deletions fastapi_cloud_tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@


def location_path(*, project, location):
return scheduler_v1.CloudSchedulerClient.common_location_path(
project=project, location=location
)
return scheduler_v1.CloudSchedulerClient.common_location_path(project=project, location=location)


def queue_path(*, project, location, queue):
return tasks_v2.CloudTasksClient.queue_path(
project=project, location=location, queue=queue
)
return tasks_v2.CloudTasksClient.queue_path(project=project, location=location, queue=queue)


def err_val(resp: Tuple[Dict, List[ErrorWrapper]]):
Expand Down

0 comments on commit f2757fc

Please sign in to comment.