Using Task Queues when endpoints take too long to complete
Google App Engine has a hard 1-minute deadline for normal endpoints and a hard 10-minute deadline for Cron endpoints. This is not long enough for certain calls, but there's an alternative: we can use Task Queues to queue them for processing outside of the endpoint call, optionally splitting them into sub-tasks.
As of 11 Jan 2023 we have 7 Task Queue endpoints:
- 2 RDR Export endpoints (users and workspaces) used by the related Cron job
- 2 User access status checks (project audit and access module sync) used by related Cron jobs
- Process an egress event
- 2 Asynchronous workspace calls (create and duplicate)
Callers and handlers of task queues need to share the following information:
- The name of the task queue
- The shape of the task queue elements
- The task queue handling endpoint
Also consider adding monitoring (wiki) (Example PR)
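One way to keep the three shared items in sync is to define them in a single place that both the caller and the handler import. The sketch below is illustrative only: the class name SynchronizeAccessContract and the Element shape (a batch of user IDs) are assumptions, not the project's actual code. The queue name and handler path are the ones used in the synchronizeUserAccess walkthrough on this page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical holder for the facts a task queue caller and handler must agree on. */
final class SynchronizeAccessContract {
  // The name of the task queue; must match the entry in queue.yaml.
  static final String QUEUE_NAME = "synchronizeAccessQueue";
  // The task queue handling endpoint; must match the path in workbench-api.yaml.
  static final String HANDLER_PATH = "/v1/cloudTask/synchronizeUserAccess";

  /** Assumed shape of one queue element: an immutable batch of user IDs. */
  static final class Element {
    private final List<Long> userIds;

    Element(List<Long> userIds) {
      // Defensive copy so later changes to the caller's list do not leak in.
      this.userIds = Collections.unmodifiableList(new ArrayList<>(userIds));
    }

    List<Long> getUserIds() {
      return userIds;
    }
  }

  private SynchronizeAccessContract() {}

  public static void main(String[] args) {
    Element element = new Element(Arrays.asList(101L, 102L));
    System.out.println(QUEUE_NAME + " -> " + HANDLER_PATH + " with " + element.getUserIds());
  }
}
```

In the real codebase the element shape comes from the generated API model, so a hand-written class like Element would not be needed there.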
The overall flow looks like this: Google Cloud Scheduler -> AoU cron endpoint -> create and add task to queue -> AoU Task Queue endpoint -> process task
Let's follow along for synchronizeUserAccess, which is a daily check of every user's access module statuses to ensure they are assigned to the correct tier(s). It checks for expiration of the modules that can expire, and acts as a stopgap for edge cases such as a user manually disabling 2FA.
The cron endpoint is declared like this in workbench-api.yaml:
"/v1/cron/synchronizeUserAccess":
  get:
    security: []
    tags:
      - offlineUser
      - cron
    description: Check each user to ensure they still meet compliance guidelines. If they do not, remove them from access tiers.
    operationId: synchronizeUserAccess
Swagger API generation parses the tags and operationId to create an interface that we need to implement as OfflineUserController.synchronizeUserAccess(). This calls TaskQueueService.groupAndPushSynchronizeAccessTasks(), which adds elements to a Task Queue. In this case, we're creating SynchronizeUserAccessRequest objects and pushing them to the synchronizeAccessQueue, where they will be handled by the task queue endpoint synchronizeUserAccess.
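The group-and-push step can be sketched as follows. This is a minimal illustration, assuming each queue element carries a batch of user IDs. TaskBatcher, QueuePusher, and the batch size are hypothetical names and values chosen for the example; the real TaskQueueService.groupAndPushSynchronizeAccessTasks() may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of splitting work into sub-tasks and queuing each one. */
final class TaskBatcher {
  /** Stand-in for the real queue client (an assumption for this example). */
  interface QueuePusher {
    void push(String queueName, String handlerPath, List<Long> userIds);
  }

  /** Splits ids into consecutive batches of at most batchSize elements. */
  static List<List<Long>> partition(List<Long> ids, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<Long>> batches = new ArrayList<>();
    for (int start = 0; start < ids.size(); start += batchSize) {
      int end = Math.min(start + batchSize, ids.size());
      batches.add(new ArrayList<>(ids.subList(start, end)));
    }
    return batches;
  }

  /** Pushes one task per batch and returns the number of tasks pushed. */
  static int groupAndPush(List<Long> userIds, int batchSize, QueuePusher pusher) {
    List<List<Long>> batches = partition(userIds, batchSize);
    for (List<Long> batch : batches) {
      pusher.push("synchronizeAccessQueue", "/v1/cloudTask/synchronizeUserAccess", batch);
    }
    return batches.size();
  }

  public static void main(String[] args) {
    List<Long> ids = new ArrayList<>();
    for (long i = 1; i <= 7; i++) {
      ids.add(i);
    }
    // A pusher that only prints; a real one would call the queue service.
    int pushed = groupAndPush(ids, 3, (queue, path, batch) ->
        System.out.println(queue + " <- " + batch));
    System.out.println(pushed + " tasks pushed");
  }
}
```

Keeping each batch small means a single task stays well inside the request deadline, and a failed task only has to redo its own batch.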
The queue is defined in queue.yaml (syntax here):
- name: synchronizeAccessQueue
  target: api
  # rate parameters
  bucket_size: 500
  rate: 1/s
  max_concurrent_requests: 10
  retry_parameters:
    task_retry_limit: 1
    task_age_limit: 5m
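To get a feel for the rate parameters: App Engine dispatches tasks using a token bucket, where bucket_size caps the burst and rate refills tokens over time. The sketch below estimates a best-case dispatch time under that model. QueueRateEstimate and its methods are hypothetical helpers written for this page, and the estimate assumes a full bucket at the start and ignores max_concurrent_requests and how long each task runs, so real throughput will be lower.

```java
/** Hypothetical best-case estimate for a token-bucket-limited queue. */
final class QueueRateEstimate {
  /** Upper bound on tasks dispatched within elapsedSeconds, starting from a full bucket. */
  static long maxDispatched(long queued, long bucketSize, double ratePerSecond, long elapsedSeconds) {
    // Tokens available in total: the initial burst plus whatever was refilled since.
    long tokens = bucketSize + (long) Math.floor(ratePerSecond * elapsedSeconds);
    return Math.min(queued, tokens);
  }

  /** Minimum seconds before every queued task could have been dispatched. */
  static long secondsToDispatchAll(long queued, long bucketSize, double ratePerSecond) {
    if (ratePerSecond <= 0) {
      throw new IllegalArgumentException("ratePerSecond must be positive");
    }
    if (queued <= bucketSize) {
      return 0; // the initial burst covers everything
    }
    return (long) Math.ceil((queued - bucketSize) / ratePerSecond);
  }

  public static void main(String[] args) {
    // Values from the queue definition above: bucket_size 500, rate 1/s.
    System.out.println(secondsToDispatchAll(800, 500, 1.0) + " seconds for 800 tasks");
  }
}
```

With bucket_size 500 and rate 1/s, a burst of up to 500 tasks can start immediately in this model, while 800 queued tasks would need at least 300 seconds for the remaining 300.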
The task queue handler endpoint is declared like this in workbench-api.yaml:
"/v1/cloudTask/synchronizeUserAccess":
  post:
    tags:
      - cloudTaskUser
      - cloudTask
    security: []
    description: >
      Check each user to ensure they still meet compliance guidelines.
      If they do not, remove them from access tiers.
    operationId: synchronizeUserAccess
Swagger API generation parses the tags and operationId to create an interface that we need to implement as CloudTaskUserController.synchronizeUserAccess(). Finally, this is the code that actually synchronizes the user access, a batch at a time.