From 0631f0ceaa73442ad270bbf6c108830a019e654e Mon Sep 17 00:00:00 2001 From: Piyush Date: Thu, 23 Dec 2021 12:10:23 +0530 Subject: [PATCH] Cloud Scheduler to replace Celery Beat (#2) --- .flake8 | 5 + .isort.cfg | 7 ++ .pre-commit-config.yaml | 16 +++ README.md | 79 +++++++++---- examples/full/main.py | 10 +- examples/full/serializer.py | 1 + examples/full/settings.py | 24 +++- examples/full/tasks.py | 63 ++++++++-- examples/simple/main.py | 19 ++- .../{taskroute.py => delayed_route.py} | 13 ++- fastapi_cloud_tasks/delayer.py | 75 ++---------- fastapi_cloud_tasks/exception.py | 8 +- fastapi_cloud_tasks/hooks.py | 50 ++++++-- fastapi_cloud_tasks/requester.py | 88 ++++++++++++++ fastapi_cloud_tasks/scheduled_route.py | 50 ++++++++ fastapi_cloud_tasks/scheduler.py | 109 ++++++++++++++++++ fastapi_cloud_tasks/utils.py | 36 +++++- local-requirements.txt | 6 +- requirements.txt | 1 + setup.py | 11 +- 20 files changed, 535 insertions(+), 136 deletions(-) create mode 100644 .flake8 create mode 100644 .isort.cfg create mode 100644 .pre-commit-config.yaml rename fastapi_cloud_tasks/{taskroute.py => delayed_route.py} (78%) create mode 100644 fastapi_cloud_tasks/requester.py create mode 100644 fastapi_cloud_tasks/scheduled_route.py create mode 100644 fastapi_cloud_tasks/scheduler.py diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..95640d2 --- /dev/null +++ b/.flake8 @@ -0,0 +1,5 @@ +[flake8] +extend-ignore = E128, E203, E225, E266, E231, E501, E712, W503, C901, F403, F401, F841 +max-line-length = 119 +max-complexity = 18 +select = B,C,E,F,W,T4,B9 diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000..830c91e --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,7 @@ +[settings] +line_length=119 +force_single_line=True +import_heading_stdlib=Standard Library Imports +import_heading_thirdparty=Third Party Imports +import_heading_firstparty=Imports from this repository +import_heading_localfolder=Imports from this module diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..045032b --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,16 @@ +repos: + - repo: https://github.com/timothycrosley/isort + rev: "5.10.1" + hooks: + - id: isort + args: [--force-single-line-imports] + - repo: https://github.com/psf/black + rev: 21.12b0 + hooks: + - id: black + args: [--line-length=119] + - repo: https://gitlab.com/pycqa/flake8 + rev: 4.0.1 + hooks: + - id: flake8 + additional_dependencies: [flake8-print==3.1.4] diff --git a/README.md b/README.md index dce6e55..b6ac65e 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ GCP's Cloud Tasks + FastAPI = Replacement for celery's async delayed tasks. +GCP's Cloud Scheduler + FastAPI = Replacement for celery beat. + FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks. ## Concept @@ -10,29 +12,43 @@ FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks. [FastAPI](https://fastapi.tiangolo.com/tutorial/body/) makes us define complete schema and params for an HTTP endpoint. -FastAPI Cloud Tasks works by putting the two together: + +[`Cloud Scheduler`](https://cloud.google.com/scheduler) allows us to schedule recurring HTTP requests in the future. + +FastAPI Cloud Tasks works by putting the three together: - It adds a `.delay` method to existing routes on FastAPI. - When this method is called, it schedules a request with Cloud Tasks. - The task worker is a regular FastAPI server which gets called by Cloud Tasks. +- It adds a `.scheduler` method to existing routes on FastAPI. +- When this method is called, it schedules a recurring job with Cloud Scheduler. -If we host the task worker on Cloud Run, we get free autoscaling. +If we host the task worker on Cloud Run, we get autoscaling workers. ## Pseudocode In practice, this is what it looks like: ```python -router = APIRouter(route_class=TaskRouteBuilder(...)) +delayed_router = APIRouter(route_class=DelayedRouteBuilder(...)) +scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...)) class Recipe(BaseModel): ingredients: List[str] -@router.post("/{restaurant}/make_dinner") -async def make_dinner(restaurant: str, recipe: Recipe,): +@delayed_router.post("/{restaurant}/make_dinner") +async def make_dinner(restaurant: str, recipe: Recipe): # Do a ton of work here. -app.include_router(router) +@scheduled_router.post("/home_cook") +async def home_cook(recipe: Recipe): + # Make my own food + +app.include_router(delayed_router) +app.include_router(scheduled_router) + +# If you wan to make your own breakfast every morning at 7AM IST. +home_cook.scheduler(name="test-home-cook-at-7AM-IST", schedule="0 7 * * *", time_zone="Asia/Kolkata").schedule(recipe=Recipe(ingredients=["Milk","Cereal"])) ``` Now we can trigger the task with @@ -71,9 +87,9 @@ Forwarding http://feda-49-207-221-153.ngrok.io -> http://loca ```python # complete file: examples/simple/main.py -# First we construct our TaskRoute class with all relevant settings +# First we construct our DelayedRoute class with all relevant settings # This can be done once across the entire project -TaskRoute = TaskRouteBuilder( +DelayedRoute = DelayedRouteBuilder( base_url="http://feda-49-207-221-153.ngrok.io", queue_path=queue_path( project="gcp-project-id", @@ -82,13 +98,12 @@ TaskRoute = TaskRouteBuilder( ), ) -# Wherever we use -task_router = APIRouter(route_class=TaskRoute, prefix="/tasks") +delayed_router = APIRouter(route_class=DelayedRoute, prefix="/tasks") class Payload(BaseModel): message: str -@task_router.post("/hello") +@delayed_router.post("/hello") async def hello(p: Payload = Payload(message="Default")): logger.warning(f"Hello task ran with payload: {p.message}") @@ -102,7 +117,7 @@ async def trigger(): hello.delay(p=Payload(message="Triggered task")) return {"message": "Hello task triggered"} -app.include_router(task_router) +app.include_router(delayed_router) ``` @@ -144,11 +159,11 @@ We'll only edit the parts from above that we need changed from above example. # URL of the Cloud Run service base_url = "https://hello-randomchars-el.a.run.app" -TaskRoute = TaskRouteBuilder( +DelayedRoute = DelayedRouteBuilder( base_url=base_url, # Task queue, same as above. queue_path=queue_path(...), - pre_create_hook=oidc_hook( + pre_create_hook=oidc_task_hook( token=tasks_v2.OidcToken( # Service account that you created service_account_email="fastapi-cloud-tasks@gcp-project-id.iam.gserviceaccount.com", @@ -162,15 +177,15 @@ Check the fleshed out example at [`examples/full/tasks.py`](examples/full/tasks. ## Configuration -### TaskRouteBuilder +### DelayedRouteBuilder Usage: ```python -TaskRoute = TaskRouteBuilder(...) -task_router = APIRouter(route_class=TaskRoute) +DelayedRoute = DelayedRouteBuilder(...) +delayed_router = APIRouter(route_class=DelayedRoute) -@task_router.get("/simple_task") +@delayed_router.get("/simple_task") def mySimpleTask(): return {} ``` @@ -185,7 +200,7 @@ def mySimpleTask(): - `client` - If you need to override the Cloud Tasks client, pass the client here. (eg: changing credentials, transport etc) -### Task level default options +#### Task level default options Usage: @@ -213,7 +228,7 @@ def mySimpleTask(): return {} ``` -### Delayer Options +#### Delayer Options Usage: @@ -221,7 +236,7 @@ Usage: mySimpleTask.options(...).delay() ``` -All options from above can be overriden per call (including TaskRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay. +All options from above can be overriden per call (including DelayedRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay. Example: @@ -230,20 +245,36 @@ Example: mySimpleTask.options(countdown=120).delay() ``` +### ScheduledRouteBuilder + +Usage: + +```python +ScheduledRoute = ScheduledRouteBuilder(...) +scheduled_router = APIRouter(route_class=ScheduledRoute) + +@scheduled_router.get("/simple_scheduled_task") +def mySimpleScheduledTask(): + return {} + + +mySimpleScheduledTask.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule() +``` + + ## Hooks We might need to override things in the task being sent to Cloud Tasks. The `pre_create_hook` allows us to do that. Some hooks are included in the library. -- `oidc_hook` - Used to work with Cloud Run. -- `deadline_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker) +- `oidc_delayed_hook` / `oidc_scheduled_hook` - Used to pass OIDC token (for Cloud Run etc). +- `deadline_delayed_hook` / `deadline_scheduled_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker) - `chained_hook` - If you need to chain multiple hooks together, you can do that with `chained_hook(hook1, hook2)` ## Future work - Ensure queue exists. -- Integrate with [Cloud Scheduler](https://cloud.google.com/scheduler/) to replace celery beat. - Make helper features for worker's side. Eg: - Easier access to current retry count. - API Exceptions to make GCP back-off. diff --git a/examples/full/main.py b/examples/full/main.py index 81b9bea..d2a2d3f 100644 --- a/examples/full/main.py +++ b/examples/full/main.py @@ -1,9 +1,15 @@ +# Standard Library Imports from uuid import uuid4 +# Third Party Imports +from fastapi import FastAPI +from fastapi import Response +from fastapi import status +from google.api_core.exceptions import AlreadyExists + +# Imports from this repository from examples.full.serializer import Payload from examples.full.tasks import hello -from fastapi import FastAPI, Response, status -from google.api_core.exceptions import AlreadyExists app = FastAPI() diff --git a/examples/full/serializer.py b/examples/full/serializer.py index 669c8a2..dec17ee 100644 --- a/examples/full/serializer.py +++ b/examples/full/serializer.py @@ -1,3 +1,4 @@ +# Third Party Imports from pydantic import BaseModel diff --git a/examples/full/settings.py b/examples/full/settings.py index c652ade..a758442 100644 --- a/examples/full/settings.py +++ b/examples/full/settings.py @@ -1,14 +1,24 @@ +# Standard Library Imports import os -from fastapi_cloud_tasks.utils import queue_path +# Third Party Imports +from google.cloud import scheduler_v1 from google.cloud import tasks_v2 -TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="http://example.com") +# Imports from this repository +from fastapi_cloud_tasks.utils import location_path +from fastapi_cloud_tasks.utils import queue_path + +TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="https://645e-35-207-241-4.ngrok.io") TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="sample-project") TASK_LOCATION = os.getenv("TASK_LOCATION", default="asia-south1") +SCHEDULED_LOCATION = os.getenv("SCHEDULED_LOCATION", default="us-central1") TASK_QUEUE = os.getenv("TASK_QUEUE", default="test-queue") -TASK_SERVICE_ACCOUNT = os.getenv("TASK_SERVICE_ACCOUNT", default=f"fastapi-cloud-tasks@{TASK_PROJECT_ID}.iam.gserviceaccount.com") +TASK_SERVICE_ACCOUNT = os.getenv( + "TASK_SERVICE_ACCOUNT", + default=f"fastapi-cloud-tasks@{TASK_PROJECT_ID}.iam.gserviceaccount.com", +) TASK_QUEUE_PATH = queue_path( project=TASK_PROJECT_ID, @@ -16,4 +26,12 @@ queue=TASK_QUEUE, ) +SCHEDULED_LOCATION_PATH = location_path( + project=TASK_PROJECT_ID, + location=SCHEDULED_LOCATION, +) + TASK_OIDC_TOKEN = tasks_v2.OidcToken(service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL) +SCHEDULED_OIDC_TOKEN = scheduler_v1.OidcToken( + service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL +) diff --git a/examples/full/tasks.py b/examples/full/tasks.py index 84cf8e8..e7e9442 100644 --- a/examples/full/tasks.py +++ b/examples/full/tasks.py @@ -1,40 +1,83 @@ +# Standard Library Imports import logging -from examples.full.serializer import Payload -from examples.full.settings import TASK_LISTENER_BASE_URL, TASK_OIDC_TOKEN, TASK_QUEUE_PATH +# Third Party Imports from fastapi import FastAPI from fastapi.routing import APIRouter -from fastapi_cloud_tasks.hooks import chained_hook, deadline_hook, oidc_hook -from fastapi_cloud_tasks.taskroute import TaskRouteBuilder from google.protobuf import duration_pb2 +# Imports from this repository +from examples.full.serializer import Payload +from examples.full.settings import SCHEDULED_LOCATION_PATH +from examples.full.settings import SCHEDULED_OIDC_TOKEN +from examples.full.settings import TASK_LISTENER_BASE_URL +from examples.full.settings import TASK_OIDC_TOKEN +from examples.full.settings import TASK_QUEUE_PATH +from fastapi_cloud_tasks.delayed_route import DelayedRouteBuilder +from fastapi_cloud_tasks.hooks import chained_hook +from fastapi_cloud_tasks.hooks import deadline_delayed_hook +from fastapi_cloud_tasks.hooks import deadline_scheduled_hook +from fastapi_cloud_tasks.hooks import oidc_delayed_hook +from fastapi_cloud_tasks.hooks import oidc_scheduled_hook +from fastapi_cloud_tasks.scheduled_route import ScheduledRouteBuilder + app = FastAPI() logger = logging.getLogger("uvicorn") -TaskRoute = TaskRouteBuilder( +DelayedRoute = DelayedRouteBuilder( base_url=TASK_LISTENER_BASE_URL, queue_path=TASK_QUEUE_PATH, # Chain multiple hooks together pre_create_hook=chained_hook( # Add service account for cloud run - oidc_hook( + oidc_delayed_hook( token=TASK_OIDC_TOKEN, ), # Wait for half an hour - deadline_hook(duration=duration_pb2.Duration(seconds=1800)), + deadline_delayed_hook(duration=duration_pb2.Duration(seconds=1800)), + ), +) + +ScheduledRoute = ScheduledRouteBuilder( + base_url=TASK_LISTENER_BASE_URL, + location_path=SCHEDULED_LOCATION_PATH, + pre_create_hook=chained_hook( + # Add service account for cloud run + oidc_scheduled_hook( + token=SCHEDULED_OIDC_TOKEN, + ), + # Wait for half an hour + deadline_scheduled_hook(duration=duration_pb2.Duration(seconds=1800)), ), ) -router = APIRouter(route_class=TaskRoute, prefix="/tasks") +task_router = APIRouter(route_class=DelayedRoute, prefix="/tasks") -@router.post("/hello") +@task_router.post("/hello") async def hello(p: Payload = Payload(message="Default")): message = f"Hello task ran with payload: {p.message}" logger.warning(message) return {"message": message} -app.include_router(router) +scheduled_router = APIRouter(route_class=ScheduledRoute, prefix="/scheduled") + + +@scheduled_router.post("/timed_hello") +async def scheduled_hello(p: Payload = Payload(message="Default")): + message = f"Scheduled hello task ran with payload: {p.message}" + logger.warning(message) + return {"message": message} + + +scheduled_hello.scheduler( + name="testing-examples-scheduled-hello", + schedule="*/5 * * * *", + time_zone="Asia/Kolkata", +).schedule(p=Payload(message="Scheduled")) + +app.include_router(task_router) +app.include_router(scheduled_router) diff --git a/examples/simple/main.py b/examples/simple/main.py index 4a0383d..39830a7 100644 --- a/examples/simple/main.py +++ b/examples/simple/main.py @@ -1,15 +1,19 @@ +# Standard Library Imports import logging import os import typing +# Third Party Imports from fastapi import FastAPI -from fastapi.routing import APIRouter from fastapi.params import Header -from fastapi_cloud_tasks.taskroute import TaskRouteBuilder -from fastapi_cloud_tasks.utils import queue_path +from fastapi.routing import APIRouter from pydantic import BaseModel -TaskRoute = TaskRouteBuilder( +# Imports from this repository +from fastapi_cloud_tasks.delayed_route import DelayedRouteBuilder +from fastapi_cloud_tasks.utils import queue_path + +DelayedRoute = DelayedRouteBuilder( # Base URL where the task server will get hosted base_url=os.getenv("TASK_LISTENER_BASE_URL", default="https://d860-35-208-83-220.ngrok.io"), # Full queue path to which we'll send tasks. @@ -21,7 +25,7 @@ ), ) -task_router = APIRouter(route_class=TaskRoute, prefix="/tasks") +task_router = APIRouter(route_class=DelayedRoute, prefix="/tasks") logger = logging.getLogger("uvicorn") @@ -31,7 +35,10 @@ class Payload(BaseModel): @task_router.post("/hello") -async def hello(p: Payload = Payload(message="Default"), x_cloudtasks_taskretrycount: typing.Optional[int] = Header(0)): +async def hello( + p: Payload = Payload(message="Default"), + x_cloudtasks_taskretrycount: typing.Optional[int] = Header(0), +): if x_cloudtasks_taskretrycount < 5: raise Exception("Noooo") logger.warning(f"Hello task ran with payload: {p.message}") diff --git a/fastapi_cloud_tasks/taskroute.py b/fastapi_cloud_tasks/delayed_route.py similarity index 78% rename from fastapi_cloud_tasks/taskroute.py rename to fastapi_cloud_tasks/delayed_route.py index 1ac01c3..d687759 100644 --- a/fastapi_cloud_tasks/taskroute.py +++ b/fastapi_cloud_tasks/delayed_route.py @@ -3,19 +3,24 @@ # Third Party Imports from fastapi.routing import APIRoute +from google.cloud import scheduler_v1 from google.cloud import tasks_v2 +# Imports from this repository from fastapi_cloud_tasks.delayer import Delayer -from fastapi_cloud_tasks.hooks import Hook, noop_hook +from fastapi_cloud_tasks.hooks import DelayedTaskHook +from fastapi_cloud_tasks.hooks import noop_hook +from fastapi_cloud_tasks.scheduler import Scheduler -def TaskRouteBuilder( +def DelayedRouteBuilder( *, base_url: str, queue_path: str, task_create_timeout: float = 10.0, - pre_create_hook: Hook = None, + pre_create_hook: DelayedTaskHook = None, client=None, + scheduler_client=None, ): if client is None: client = tasks_v2.CloudTasksClient() @@ -30,7 +35,7 @@ def get_route_handler(self) -> Callable: self.endpoint.delay = self.delay return original_route_handler - def delayOptions(self, **options): + def delayOptions(self, **options) -> Delayer: delayOpts = dict( base_url=base_url, queue_path=queue_path, diff --git a/fastapi_cloud_tasks/delayer.py b/fastapi_cloud_tasks/delayer.py index 86bc6b2..c800fc1 100644 --- a/fastapi_cloud_tasks/delayer.py +++ b/fastapi_cloud_tasks/delayer.py @@ -1,25 +1,18 @@ # Standard Library Imports import datetime -from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse # Third Party Imports -from fastapi.dependencies.utils import request_params_to_args -from fastapi.encoders import jsonable_encoder from fastapi.routing import APIRoute from google.cloud import tasks_v2 from google.protobuf import timestamp_pb2 -from fastapi_cloud_tasks.exception import MissingParamError, WrongTypeError -from fastapi_cloud_tasks.hooks import Hook -from fastapi_cloud_tasks.utils import err_val, taskMethod +# Imports from this repository +from fastapi_cloud_tasks.hooks import DelayedTaskHook +from fastapi_cloud_tasks.requester import Requester +from fastapi_cloud_tasks.utils import taskMethod -try: - import ujson as json -except Exception: - import json - -class Delayer: +class Delayer(Requester): def __init__( self, *, @@ -28,13 +21,12 @@ def __init__( queue_path: str, task_create_timeout: float = 10.0, client: tasks_v2.CloudTasksClient, - pre_create_hook: Hook, + pre_create_hook: DelayedTaskHook, countdown: int = 0, task_id: str = None, ) -> None: - self.route = route + super().__init__(route=route, base_url=base_url) self.queue_path = queue_path - self.base_url = base_url.rstrip("/") self.countdown = countdown self.task_create_timeout = task_create_timeout @@ -77,56 +69,3 @@ def _schedule(self): timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) return timestamp - - def _headers(self, *, values): - headers = err_val(request_params_to_args(self.route.dependant.header_params, values)) - cookies = err_val(request_params_to_args(self.route.dependant.cookie_params, values)) - if len(cookies) > 0: - headers["Cookies"] = "; ".join([f"{k}={v}" for (k, v) in cookies.items()]) - # Always send string headers and skip all headers which are supposed to be sent by cloudtasks - return {str(k): str(v) for (k,v) in headers.items() if not str(k).startswith("x_cloudtasks_")} - - def _url(self, *, values): - route = self.route - path_values = err_val(request_params_to_args(route.dependant.path_params, values)) - for (name, converter) in route.param_convertors.items(): - if name in path_values: - continue - if name not in values: - raise MissingParamError(param=name) - - # TODO: should we catch errors here and raise better errors? - path_values[name] = converter.convert(values[name]) - path = route.path_format.format(**path_values) - params = err_val(request_params_to_args(route.dependant.query_params, values)) - - # Make final URL - - # Split base url into parts - url_parts = list(urlparse(self.base_url)) - - # Add relative path - # Note: you might think urljoin is a better solution here, it is not. - url_parts[2] = url_parts[2].strip("/") + "/" + path.strip("/") - - # Make query dict and update our with our params - query = dict(parse_qsl(url_parts[4])) - query.update(params) - - # override query params - url_parts[4] = urlencode(query) - return urlunparse(url_parts) - - def _body(self, *, values): - body = None - body_field = self.route.body_field - if body_field and body_field.name: - got_body = values.get(body_field.name, None) - if got_body is None: - if body_field.required: - raise MissingParamError(name=body_field.name) - got_body = body_field.get_default() - if not isinstance(got_body, body_field.type_): - raise WrongTypeError(field=body_field.name, type=body_field.type_) - body = json.dumps(jsonable_encoder(got_body)).encode() - return body diff --git a/fastapi_cloud_tasks/exception.py b/fastapi_cloud_tasks/exception.py index f27ff54..365e380 100644 --- a/fastapi_cloud_tasks/exception.py +++ b/fastapi_cloud_tasks/exception.py @@ -1,4 +1,6 @@ -from pydantic.errors import MissingError, PydanticValueError +# Third Party Imports +from pydantic.errors import MissingError +from pydantic.errors import PydanticValueError class MissingParamError(MissingError): @@ -7,3 +9,7 @@ class MissingParamError(MissingError): class WrongTypeError(PydanticValueError): msg_template = "Expected {field} to be of type {type}" + + +class BadMethodException(Exception): + pass diff --git a/fastapi_cloud_tasks/hooks.py b/fastapi_cloud_tasks/hooks.py index 288c77f..37e7fbf 100644 --- a/fastapi_cloud_tasks/hooks.py +++ b/fastapi_cloud_tasks/hooks.py @@ -1,17 +1,21 @@ -from typing import Callable, List +# Standard Library Imports +from typing import Callable +# Third Party Imports +from google.cloud import scheduler_v1 from google.cloud import tasks_v2 from google.protobuf import duration_pb2 -Hook = Callable[[tasks_v2.CreateTaskRequest], tasks_v2.CreateTaskRequest] +DelayedTaskHook = Callable[[tasks_v2.CreateTaskRequest], tasks_v2.CreateTaskRequest] +ScheduledHook = Callable[[scheduler_v1.CreateJobRequest], scheduler_v1.CreateJobRequest] -def noop_hook(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest: +def noop_hook(request): return request -def chained_hook(*hooks: List[Hook]) -> Hook: - def chain(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest: +def chained_hook(*hooks): + def chain(request): for hook in hooks: request = hook(request) return request @@ -19,7 +23,17 @@ def chain(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest: return chain -def oidc_hook(token: tasks_v2.OidcToken) -> Hook: +def oidc_scheduled_hook(token: scheduler_v1.OidcToken) -> ScheduledHook: + def add_token( + request: scheduler_v1.CreateJobRequest, + ) -> scheduler_v1.CreateJobRequest: + request.job.http_target.oidc_token = token + return request + + return add_token + + +def oidc_delayed_hook(token: tasks_v2.OidcToken) -> DelayedTaskHook: def add_token(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest: request.task.http_request.oidc_token = token return request @@ -27,7 +41,17 @@ def add_token(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest return add_token -def oauth_hook(token: tasks_v2.OAuthToken) -> Hook: +def oauth_scheduled_hook(token: scheduler_v1.OAuthToken) -> ScheduledHook: + def add_token( + request: scheduler_v1.CreateJobRequest, + ) -> scheduler_v1.CreateJobRequest: + request.job.http_target.oauth_token = token + return request + + return add_token + + +def oauth_delayed_hook(token: tasks_v2.OAuthToken) -> DelayedTaskHook: def add_token(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest: request.task.http_request.oauth_token = token return request @@ -35,7 +59,17 @@ def add_token(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest return add_token -def deadline_hook(duration: duration_pb2.Duration) -> Hook: +def deadline_scheduled_hook(duration: duration_pb2.Duration) -> ScheduledHook: + def deadline( + request: scheduler_v1.CreateJobRequest, + ) -> scheduler_v1.CreateJobRequest: + request.job.attempt_deadline = duration + return request + + return deadline + + +def deadline_delayed_hook(duration: duration_pb2.Duration) -> DelayedTaskHook: def deadline(request: tasks_v2.CreateTaskRequest) -> tasks_v2.CreateTaskRequest: request.task.dispatch_deadline = duration return request diff --git a/fastapi_cloud_tasks/requester.py b/fastapi_cloud_tasks/requester.py new file mode 100644 index 0000000..5c925c4 --- /dev/null +++ b/fastapi_cloud_tasks/requester.py @@ -0,0 +1,88 @@ +# Standard Library Imports +from urllib.parse import parse_qsl +from urllib.parse import urlencode +from urllib.parse import urlparse +from urllib.parse import urlunparse + +# Third Party Imports +from fastapi.dependencies.utils import request_params_to_args +from fastapi.encoders import jsonable_encoder +from fastapi.routing import APIRoute + +# Imports from this repository +from fastapi_cloud_tasks.exception import MissingParamError +from fastapi_cloud_tasks.exception import WrongTypeError +from fastapi_cloud_tasks.utils import err_val + +try: + # Third Party Imports + import ujson as json +except Exception: + # Standard Library Imports + import json + + +class Requester: + def __init__( + self, + *, + route: APIRoute, + base_url: str, + ) -> None: + self.route = route + self.base_url = base_url.rstrip("/") + + def _headers(self, *, values): + headers = err_val(request_params_to_args(self.route.dependant.header_params, values)) + cookies = err_val(request_params_to_args(self.route.dependant.cookie_params, values)) + if len(cookies) > 0: + headers["Cookies"] = "; ".join([f"{k}={v}" for (k, v) in cookies.items()]) + # We use json only. + headers["Content-Type"] = "application/json" + # Always send string headers and skip all headers which are supposed to be sent by cloudtasks + return {str(k): str(v) for (k, v) in headers.items() if not str(k).startswith("x_cloudtasks_")} + + def _url(self, *, values): + route = self.route + path_values = err_val(request_params_to_args(route.dependant.path_params, values)) + for (name, converter) in route.param_convertors.items(): + if name in path_values: + continue + if name not in values: + raise MissingParamError(param=name) + + # TODO: should we catch errors here and raise better errors? + path_values[name] = converter.convert(values[name]) + path = route.path_format.format(**path_values) + params = err_val(request_params_to_args(route.dependant.query_params, values)) + + # Make final URL + + # Split base url into parts + url_parts = list(urlparse(self.base_url)) + + # Add relative path + # Note: you might think urljoin is a better solution here, it is not. + url_parts[2] = url_parts[2].strip("/") + "/" + path.strip("/") + + # Make query dict and update our with our params + query = dict(parse_qsl(url_parts[4])) + query.update(params) + + # override query params + url_parts[4] = urlencode(query) + return urlunparse(url_parts) + + def _body(self, *, values): + body = None + body_field = self.route.body_field + if body_field and body_field.name: + got_body = values.get(body_field.name, None) + if got_body is None: + if body_field.required: + raise MissingParamError(name=body_field.name) + got_body = body_field.get_default() + if not isinstance(got_body, body_field.type_): + raise WrongTypeError(field=body_field.name, type=body_field.type_) + body = json.dumps(jsonable_encoder(got_body)).encode() + return body diff --git a/fastapi_cloud_tasks/scheduled_route.py b/fastapi_cloud_tasks/scheduled_route.py new file mode 100644 index 0000000..958525a --- /dev/null +++ b/fastapi_cloud_tasks/scheduled_route.py @@ -0,0 +1,50 @@ +# Standard Library Imports +from typing import Callable + +# Third Party Imports +from fastapi.routing import APIRoute +from google.cloud import scheduler_v1 + +# Imports from this repository +from fastapi_cloud_tasks.hooks import ScheduledHook +from fastapi_cloud_tasks.hooks import noop_hook +from fastapi_cloud_tasks.scheduler import Scheduler + + +def ScheduledRouteBuilder( + *, + base_url: str, + location_path: str, + job_create_timeout: float = 10.0, + pre_create_hook: ScheduledHook = None, + client=None, +): + + if client is None: + client = scheduler_v1.CloudSchedulerClient() + + if pre_create_hook is None: + pre_create_hook = noop_hook + + class ScheduledRouteMixin(APIRoute): + def get_route_handler(self) -> Callable: + original_route_handler = super().get_route_handler() + self.endpoint.scheduler = self.schedulerOptions + return original_route_handler + + def schedulerOptions(self, *, name, schedule, **options) -> Scheduler: + schedulerOpts = dict( + base_url=base_url, + location_path=location_path, + client=client, + pre_create_hook=pre_create_hook, + job_create_timeout=job_create_timeout, + name=name, + schedule=schedule, + ) + + schedulerOpts.update(options) + + return Scheduler(route=self, **schedulerOpts) + + return ScheduledRouteMixin diff --git a/fastapi_cloud_tasks/scheduler.py b/fastapi_cloud_tasks/scheduler.py new file mode 100644 index 0000000..5bb697c --- /dev/null +++ b/fastapi_cloud_tasks/scheduler.py @@ -0,0 +1,109 @@ +# Standard Library Imports + +# Third Party Imports +from fastapi.routing import APIRoute +from google.cloud import scheduler_v1 +from google.protobuf import duration_pb2 + +# Imports from this repository +from fastapi_cloud_tasks.hooks import ScheduledHook +from fastapi_cloud_tasks.requester import Requester +from fastapi_cloud_tasks.utils import schedulerMethod + + +class Scheduler(Requester): + def __init__( + self, + *, + route: APIRoute, + base_url: str, + location_path: str, + schedule: str, + client: scheduler_v1.CloudSchedulerClient, + pre_create_hook: ScheduledHook, + name: str = "", + job_create_timeout: float = 10.0, + retry_config: scheduler_v1.RetryConfig = None, + time_zone: str = "UTC", + force: bool = False, + ) -> None: + super().__init__(route=route, base_url=base_url) + if name == "": + name = route.unique_id + + if retry_config is None: + retry_config = scheduler_v1.RetryConfig( + retry_count=5, + max_retry_duration=duration_pb2.Duration(seconds=0), + min_backoff_duration=duration_pb2.Duration(seconds=5), + max_backoff_duration=duration_pb2.Duration(seconds=120), + max_doublings=5, + ) + + self.retry_config = retry_config + location_parts = client.parse_common_location_path(location_path) + + self.job_id = client.job_path(job=name, **location_parts) + self.time_zone = time_zone + + self.location_path = location_path + self.cron_schedule = schedule + self.job_create_timeout = job_create_timeout + + self.method = schedulerMethod(route.methods) + self.client = client + self.pre_create_hook = pre_create_hook + self.force = force + + def schedule(self, **kwargs): + # Create http request + request = scheduler_v1.HttpTarget() + request.http_method = self.method + request.uri = self._url(values=kwargs) + request.headers = self._headers(values=kwargs) + + body = self._body(values=kwargs) + if body: + request.body = body + + # Scheduled the task + job = scheduler_v1.Job( + name=self.job_id, + http_target=request, + schedule=self.cron_schedule, + retry_config=self.retry_config, + time_zone=self.time_zone, + ) + + request = scheduler_v1.CreateJobRequest(parent=self.location_path, job=job) + + request = self.pre_create_hook(request) + + if self.force or self._has_changed(request=request): + # Delete and create job + self.delete() + self.client.create_job(request=request, timeout=self.job_create_timeout) + + def _has_changed(self, request: scheduler_v1.CreateJobRequest): + try: + job = self.client.get_job(name=request.job.name) + # Remove things that are either output only or GCP adds by default + job.user_update_time = None + job.state = None + job.status = None + job.last_attempt_time = None + job.schedule_time = None + del job.http_target.headers["User-Agent"] + # Proto compare works directly with `__eq__` + return job != request.job + except Exception: + return True + return False + + def delete(self): + # We return true or exception because you could have the delete code on multiple instances + try: + self.client.delete_job(name=self.job_id, timeout=self.job_create_timeout) + return True + except Exception as ex: + return ex diff --git a/fastapi_cloud_tasks/utils.py b/fastapi_cloud_tasks/utils.py index 104c93e..d5d653b 100644 --- a/fastapi_cloud_tasks/utils.py +++ b/fastapi_cloud_tasks/utils.py @@ -1,10 +1,20 @@ # Standard Library Imports -from typing import Dict, List, Tuple +from typing import Dict +from typing import List +from typing import Tuple # Third Party Imports +from google.cloud import scheduler_v1 from google.cloud import tasks_v2 from pydantic.error_wrappers import ErrorWrapper +# Imports from this repository +from fastapi_cloud_tasks.exception import BadMethodException + + +def location_path(*, project, location): + return scheduler_v1.CloudSchedulerClient.common_location_path(project=project, location=location) + def queue_path(*, project, location, queue): return tasks_v2.CloudTasksClient.queue_path(project=project, location=location, queue=queue) @@ -33,8 +43,28 @@ def taskMethod(methods): methods = list(methods) # Only crash if we're being bound if len(methods) > 1: - raise Exception("Can't trigger task with multiple methods") + raise BadMethodException("Can't trigger task with multiple methods") + method = methodMap.get(methods[0], None) + if method is None: + raise BadMethodException(f"Unknown method {methods[0]}") + return method + + +def schedulerMethod(methods): + methodMap = { + "POST": scheduler_v1.HttpMethod.POST, + "GET": scheduler_v1.HttpMethod.GET, + "HEAD": scheduler_v1.HttpMethod.HEAD, + "PUT": scheduler_v1.HttpMethod.PUT, + "DELETE": scheduler_v1.HttpMethod.DELETE, + "PATCH": scheduler_v1.HttpMethod.PATCH, + "OPTIONS": scheduler_v1.HttpMethod.OPTIONS, + } + methods = list(methods) + # Only crash if we're being bound + if len(methods) > 1: + raise BadMethodException("Can't schedule task with multiple methods") method = methodMap.get(methods[0], None) if method is None: - raise Exception(f"Unknown method {methods[0]}") + raise BadMethodException(f"Unknown method {methods[0]}") return method diff --git a/local-requirements.txt b/local-requirements.txt index 5738ba0..d59ceac 100644 --- a/local-requirements.txt +++ b/local-requirements.txt @@ -1,4 +1,6 @@ -r ./requirements.txt -black==21.10b0 +black==21.12b0 flake8==4.0.1 -uvicorn==0.15.0 \ No newline at end of file +uvicorn==0.15.0 +isort==5.10.1 +pre-commit==2.16.0 diff --git a/requirements.txt b/requirements.txt index 83a844d..1aac9e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ google-cloud-tasks==2.7.0 +google-cloud-scheduler==2.5.0 fastapi==0.70.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 6d3dd46..215f34b 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,11 @@ +# Third Party Imports from setuptools import setup -with open('version.txt') as f: +with open("version.txt") as f: version = f.read().strip() -with open('README.md', encoding='utf-8') as f: - long_description = f.read().strip() +with open("README.md", encoding="utf-8") as f: + long_description = f.read().strip() setup( @@ -12,10 +13,10 @@ version=version, description="Trigger delayed Cloud Tasks from FastAPI", long_description=long_description, - long_description_content_type='text/markdown', + long_description_content_type="text/markdown", licesnse="MIT", packages=["fastapi_cloud_tasks"], - install_requires=["google-cloud-tasks", "fastapi"], + install_requires=["google-cloud-tasks", "google-cloud-scheduler", "fastapi"], test_requires=[], zip_safe=False, )