Skip to content

Commit

Permalink
Cloud Scheduler to replace Celery Beat (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
ofpiyush authored Dec 23, 2021
1 parent 8d927f3 commit 0631f0c
Show file tree
Hide file tree
Showing 20 changed files with 535 additions and 136 deletions.
5 changes: 5 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[flake8]
extend-ignore = E128, E203, E225, E266, E231, E501, E712, W503, C901, F403, F401, F841
max-line-length = 119
max-complexity = 18
select = B,C,E,F,W,T4,B9
7 changes: 7 additions & 0 deletions .isort.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[settings]
line_length=119
force_single_line=True
import_heading_stdlib=Standard Library Imports
import_heading_thirdparty=Third Party Imports
import_heading_firstparty=Imports from this repository
import_heading_localfolder=Imports from this module
16 changes: 16 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
repos:
- repo: https://github.com/timothycrosley/isort
rev: "5.10.1"
hooks:
- id: isort
args: [--force-single-line-imports]
- repo: https://github.com/psf/black
rev: 21.12b0
hooks:
- id: black
args: [--line-length=119]
- repo: https://gitlab.com/pycqa/flake8
rev: 4.0.1
hooks:
- id: flake8
additional_dependencies: [flake8-print==3.1.4]
79 changes: 55 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

GCP's Cloud Tasks + FastAPI = Replacement for celery's async delayed tasks.

GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.

FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.

## Concept
Expand All @@ -10,29 +12,43 @@ FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.

[FastAPI](https://fastapi.tiangolo.com/tutorial/body/) makes us define complete schema and params for an HTTP endpoint.

FastAPI Cloud Tasks works by putting the two together:

[`Cloud Scheduler`](https://cloud.google.com/scheduler) allows us to schedule recurring HTTP requests in the future.

FastAPI Cloud Tasks works by putting the three together:

- It adds a `.delay` method to existing routes on FastAPI.
- When this method is called, it schedules a request with Cloud Tasks.
- The task worker is a regular FastAPI server which gets called by Cloud Tasks.
- It adds a `.scheduler` method to existing routes on FastAPI.
- When this method is called, it schedules a recurring job with Cloud Scheduler.

If we host the task worker on Cloud Run, we get free autoscaling.
If we host the task worker on Cloud Run, we get autoscaling workers.

## Pseudocode

In practice, this is what it looks like:

```python
router = APIRouter(route_class=TaskRouteBuilder(...))
delayed_router = APIRouter(route_class=DelayedRouteBuilder(...))
scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...))

class Recipe(BaseModel):
ingredients: List[str]

@router.post("/{restaurant}/make_dinner")
async def make_dinner(restaurant: str, recipe: Recipe,):
@delayed_router.post("/{restaurant}/make_dinner")
async def make_dinner(restaurant: str, recipe: Recipe):
# Do a ton of work here.

app.include_router(router)
@scheduled_router.post("/home_cook")
async def home_cook(recipe: Recipe):
# Make my own food

app.include_router(delayed_router)
app.include_router(scheduled_router)

# If you wan to make your own breakfast every morning at 7AM IST.
home_cook.scheduler(name="test-home-cook-at-7AM-IST", schedule="0 7 * * *", time_zone="Asia/Kolkata").schedule(recipe=Recipe(ingredients=["Milk","Cereal"]))
```

Now we can trigger the task with
Expand Down Expand Up @@ -71,9 +87,9 @@ Forwarding http://feda-49-207-221-153.ngrok.io -> http://loca
```python
# complete file: examples/simple/main.py

# First we construct our TaskRoute class with all relevant settings
# First we construct our DelayedRoute class with all relevant settings
# This can be done once across the entire project
TaskRoute = TaskRouteBuilder(
DelayedRoute = DelayedRouteBuilder(
base_url="http://feda-49-207-221-153.ngrok.io",
queue_path=queue_path(
project="gcp-project-id",
Expand All @@ -82,13 +98,12 @@ TaskRoute = TaskRouteBuilder(
),
)

# Wherever we use
task_router = APIRouter(route_class=TaskRoute, prefix="/tasks")
delayed_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")

class Payload(BaseModel):
message: str

@task_router.post("/hello")
@delayed_router.post("/hello")
async def hello(p: Payload = Payload(message="Default")):
logger.warning(f"Hello task ran with payload: {p.message}")

Expand All @@ -102,7 +117,7 @@ async def trigger():
hello.delay(p=Payload(message="Triggered task"))
return {"message": "Hello task triggered"}

app.include_router(task_router)
app.include_router(delayed_router)

```

Expand Down Expand Up @@ -144,11 +159,11 @@ We'll only edit the parts from above that we need changed from above example.
# URL of the Cloud Run service
base_url = "https://hello-randomchars-el.a.run.app"

TaskRoute = TaskRouteBuilder(
DelayedRoute = DelayedRouteBuilder(
base_url=base_url,
# Task queue, same as above.
queue_path=queue_path(...),
pre_create_hook=oidc_hook(
pre_create_hook=oidc_task_hook(
token=tasks_v2.OidcToken(
# Service account that you created
service_account_email="[email protected]",
Expand All @@ -162,15 +177,15 @@ Check the fleshed out example at [`examples/full/tasks.py`](examples/full/tasks.

## Configuration

### TaskRouteBuilder
### DelayedRouteBuilder

Usage:

```python
TaskRoute = TaskRouteBuilder(...)
task_router = APIRouter(route_class=TaskRoute)
DelayedRoute = DelayedRouteBuilder(...)
delayed_router = APIRouter(route_class=DelayedRoute)

@task_router.get("/simple_task")
@delayed_router.get("/simple_task")
def mySimpleTask():
return {}
```
Expand All @@ -185,7 +200,7 @@ def mySimpleTask():

- `client` - If you need to override the Cloud Tasks client, pass the client here. (eg: changing credentials, transport etc)

### Task level default options
#### Task level default options

Usage:

Expand Down Expand Up @@ -213,15 +228,15 @@ def mySimpleTask():
return {}
```

### Delayer Options
#### Delayer Options

Usage:

```python
mySimpleTask.options(...).delay()
```

All options from above can be overriden per call (including TaskRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay.
All options from above can be overriden per call (including DelayedRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay.

Example:

Expand All @@ -230,20 +245,36 @@ Example:
mySimpleTask.options(countdown=120).delay()
```

### ScheduledRouteBuilder

Usage:

```python
ScheduledRoute = ScheduledRouteBuilder(...)
scheduled_router = APIRouter(route_class=ScheduledRoute)

@scheduled_router.get("/simple_scheduled_task")
def mySimpleScheduledTask():
return {}


mySimpleScheduledTask.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
```


## Hooks

We might need to override things in the task being sent to Cloud Tasks. The `pre_create_hook` allows us to do that.

Some hooks are included in the library.

- `oidc_hook` - Used to work with Cloud Run.
- `deadline_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
- `oidc_delayed_hook` / `oidc_scheduled_hook` - Used to pass OIDC token (for Cloud Run etc).
- `deadline_delayed_hook` / `deadline_scheduled_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
- `chained_hook` - If you need to chain multiple hooks together, you can do that with `chained_hook(hook1, hook2)`

## Future work

- Ensure queue exists.
- Integrate with [Cloud Scheduler](https://cloud.google.com/scheduler/) to replace celery beat.
- Make helper features for worker's side. Eg:
- Easier access to current retry count.
- API Exceptions to make GCP back-off.
10 changes: 8 additions & 2 deletions examples/full/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Standard Library Imports
from uuid import uuid4

# Third Party Imports
from fastapi import FastAPI
from fastapi import Response
from fastapi import status
from google.api_core.exceptions import AlreadyExists

# Imports from this repository
from examples.full.serializer import Payload
from examples.full.tasks import hello
from fastapi import FastAPI, Response, status
from google.api_core.exceptions import AlreadyExists

app = FastAPI()

Expand Down
1 change: 1 addition & 0 deletions examples/full/serializer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Third Party Imports
from pydantic import BaseModel


Expand Down
24 changes: 21 additions & 3 deletions examples/full/settings.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
# Standard Library Imports
import os

from fastapi_cloud_tasks.utils import queue_path
# Third Party Imports
from google.cloud import scheduler_v1
from google.cloud import tasks_v2

TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="http://example.com")
# Imports from this repository
from fastapi_cloud_tasks.utils import location_path
from fastapi_cloud_tasks.utils import queue_path

TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="https://645e-35-207-241-4.ngrok.io")
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="sample-project")
TASK_LOCATION = os.getenv("TASK_LOCATION", default="asia-south1")
SCHEDULED_LOCATION = os.getenv("SCHEDULED_LOCATION", default="us-central1")
TASK_QUEUE = os.getenv("TASK_QUEUE", default="test-queue")

TASK_SERVICE_ACCOUNT = os.getenv("TASK_SERVICE_ACCOUNT", default=f"fastapi-cloud-tasks@{TASK_PROJECT_ID}.iam.gserviceaccount.com")
TASK_SERVICE_ACCOUNT = os.getenv(
"TASK_SERVICE_ACCOUNT",
default=f"fastapi-cloud-tasks@{TASK_PROJECT_ID}.iam.gserviceaccount.com",
)

TASK_QUEUE_PATH = queue_path(
project=TASK_PROJECT_ID,
location=TASK_LOCATION,
queue=TASK_QUEUE,
)

SCHEDULED_LOCATION_PATH = location_path(
project=TASK_PROJECT_ID,
location=SCHEDULED_LOCATION,
)

TASK_OIDC_TOKEN = tasks_v2.OidcToken(service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL)
SCHEDULED_OIDC_TOKEN = scheduler_v1.OidcToken(
service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL
)
63 changes: 53 additions & 10 deletions examples/full/tasks.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,83 @@
# Standard Library Imports
import logging

from examples.full.serializer import Payload
from examples.full.settings import TASK_LISTENER_BASE_URL, TASK_OIDC_TOKEN, TASK_QUEUE_PATH
# Third Party Imports
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi_cloud_tasks.hooks import chained_hook, deadline_hook, oidc_hook
from fastapi_cloud_tasks.taskroute import TaskRouteBuilder
from google.protobuf import duration_pb2

# Imports from this repository
from examples.full.serializer import Payload
from examples.full.settings import SCHEDULED_LOCATION_PATH
from examples.full.settings import SCHEDULED_OIDC_TOKEN
from examples.full.settings import TASK_LISTENER_BASE_URL
from examples.full.settings import TASK_OIDC_TOKEN
from examples.full.settings import TASK_QUEUE_PATH
from fastapi_cloud_tasks.delayed_route import DelayedRouteBuilder
from fastapi_cloud_tasks.hooks import chained_hook
from fastapi_cloud_tasks.hooks import deadline_delayed_hook
from fastapi_cloud_tasks.hooks import deadline_scheduled_hook
from fastapi_cloud_tasks.hooks import oidc_delayed_hook
from fastapi_cloud_tasks.hooks import oidc_scheduled_hook
from fastapi_cloud_tasks.scheduled_route import ScheduledRouteBuilder

app = FastAPI()


logger = logging.getLogger("uvicorn")

TaskRoute = TaskRouteBuilder(
DelayedRoute = DelayedRouteBuilder(
base_url=TASK_LISTENER_BASE_URL,
queue_path=TASK_QUEUE_PATH,
# Chain multiple hooks together
pre_create_hook=chained_hook(
# Add service account for cloud run
oidc_hook(
oidc_delayed_hook(
token=TASK_OIDC_TOKEN,
),
# Wait for half an hour
deadline_hook(duration=duration_pb2.Duration(seconds=1800)),
deadline_delayed_hook(duration=duration_pb2.Duration(seconds=1800)),
),
)

ScheduledRoute = ScheduledRouteBuilder(
base_url=TASK_LISTENER_BASE_URL,
location_path=SCHEDULED_LOCATION_PATH,
pre_create_hook=chained_hook(
# Add service account for cloud run
oidc_scheduled_hook(
token=SCHEDULED_OIDC_TOKEN,
),
# Wait for half an hour
deadline_scheduled_hook(duration=duration_pb2.Duration(seconds=1800)),
),
)

router = APIRouter(route_class=TaskRoute, prefix="/tasks")
task_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")


@router.post("/hello")
@task_router.post("/hello")
async def hello(p: Payload = Payload(message="Default")):
message = f"Hello task ran with payload: {p.message}"
logger.warning(message)
return {"message": message}


app.include_router(router)
scheduled_router = APIRouter(route_class=ScheduledRoute, prefix="/scheduled")


@scheduled_router.post("/timed_hello")
async def scheduled_hello(p: Payload = Payload(message="Default")):
message = f"Scheduled hello task ran with payload: {p.message}"
logger.warning(message)
return {"message": message}


scheduled_hello.scheduler(
name="testing-examples-scheduled-hello",
schedule="*/5 * * * *",
time_zone="Asia/Kolkata",
).schedule(p=Payload(message="Scheduled"))

app.include_router(task_router)
app.include_router(scheduled_router)
Loading

0 comments on commit 0631f0c

Please sign in to comment.