Skip to content

Commit

Permalink
feat(store): add deleteJobOnCompletion to the queue worker
Browse files Browse the repository at this point in the history
This allows a higher throughput queue without keeping queue history in the table.
  • Loading branch information
dirkdev98 committed May 13, 2023
1 parent aab7d81 commit c82034d
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 32 deletions.
21 changes: 17 additions & 4 deletions docs/features/background-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,23 @@ docs on these functions for more information and their accepted arguments.

This function constructs a worker, applies the default options if no value is
provided and returns a `{ start, stop }` synchronously. `start` needs to be
called before any jobs are picked up. If you need to shutdown gracefully you can
use `await stop()`. This will finish all running jobs and prevent picking up new
jobs. See the `QueueWorkerOptions` as the second argument of this function for
all available options and their defaults.
called before any jobs are picked up. If you need to shut down gracefully you
can use `await stop()`. This will finish all running jobs and prevent picking up
new jobs. See the `QueueWorkerOptions` as the second argument of this function
for all available options and their defaults.

Some specific options include:

- `includeNames` / `excludeNames`: let this queue worker only pick up specific
jobs. This allows you to scale queue workers independently.
- `deleteJobOnCompletion`: by default, the queue keeps history of the processed
jobs. For high-volume queues, it is generally considered more efficient to
delete jobs on completion. If you want to keep a history of jobs for a few
days, you can set this option to `false` and instead use
[`jobQueueCleanup`](#jobqueuecleanup) to remove old jobs periodically.
- `unsafeIgnoreSorting`: Ignore priority- and scheduled-at-based sorting. This
is useful in combination with `includeNames` to create a higher throughput
queue, with no guarantees of the order in which jobs are picked up.

### queueWorkerAddJob

Expand Down
6 changes: 6 additions & 0 deletions packages/store/src/queue-worker.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ export type QueueWorkerOptions = {
* bound to any SemVer versioning of this package.
*/
unsafeIgnoreSorting?: boolean | undefined;
/**
* The default queue behavior is to keep jobs
* that have been processed and mark them complete. On high-volume queues it may be
* more efficient to automatically remove jobs after completion.
*/
deleteJobOnCompletion?: boolean | undefined;
};
export type QueueWorkerInternalOptions = Required<QueueWorkerOptions> & {
isQueueEnabled: boolean;
Expand Down
122 changes: 95 additions & 27 deletions packages/store/src/queue-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import {
newLogger,
} from "@compas/stdlib";
import cron from "cron-parser";
import { jobWhere, queryJob } from "./generated/database/job.js";
import { jobWhere } from "./generated/database/job.js";
import { validateStoreJob } from "./generated/store/validators.js";
import { queries } from "./generated.js";
import { query } from "./query.js";

Expand Down Expand Up @@ -53,6 +54,9 @@ import { query } from "./query.js";
* a lot of jobs are in the queue. This still only picks up jobs that are eligible to
* be picked up. However, it doesn't guarantee any order. This property is also not
* bound to any SemVer versioning of this package.
* @property {boolean} [deleteJobOnCompletion] The default queue behavior is to keep jobs
* that have been processed and mark them complete. On high-volume queues it may be
* more efficient to automatically remove jobs after completion.
*/

/**
Expand All @@ -79,7 +83,7 @@ const queryParts = {
* @param {import("../types/advanced-types").QueryPart<unknown>} [orderBy]
* @returns {import("../types/advanced-types").QueryPart<any>}
*/
getJobToDo(where, orderBy) {
getJobAndUpdate(where, orderBy) {
return query`
UPDATE "job"
SET
Expand All @@ -90,7 +94,7 @@ const queryParts = {
SELECT "id"
FROM "job" j
WHERE
${jobWhere(where, { skipValidator: true, shortName: "j." })}
${jobWhere(where, { skipValidator: true, shortName: "j." })}
AND NOT "isComplete"
AND "scheduledAt" < now() ${
orderBy
Expand All @@ -100,7 +104,35 @@ const queryParts = {
} FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id
RETURNING *
`;
},

/**
 * Atomically claim and remove a single eligible job in one statement.
 *
 * Picks one job that is not complete and is due (`scheduledAt` in the past),
 * locking the row with `FOR UPDATE SKIP LOCKED` so concurrent workers skip
 * rows already claimed by another transaction, then deletes it and returns
 * the full row via `RETURNING *`.
 *
 * Counterpart of `getJobAndUpdate`, selected when `deleteJobOnCompletion` is
 * enabled. NOTE(review): the delete happens before the handler runs —
 * presumably the enclosing transaction rolls back on handler failure, or the
 * caller re-inserts the job for a retry; confirm against the worker loop.
 *
 * @param {import("./generated/common/types").StoreJobWhere} where
 * @param {import("../types/advanced-types").QueryPart<unknown>} [orderBy]
 * @returns {import("../types/advanced-types").QueryPart<any>}
 */
getJobAndDelete(where, orderBy) {
return query`
DELETE
FROM "job"
WHERE
id = (
SELECT "id"
FROM "job" j
WHERE
${jobWhere(where, { skipValidator: true, shortName: "j." })}
AND NOT "isComplete"
AND "scheduledAt" < now() ${
orderBy
? query`ORDER BY
${orderBy}`
: query``
} FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *
`;
},
};
Expand Down Expand Up @@ -248,6 +280,7 @@ export function queueWorkerCreate(sql, options) {
opts.maxRetryCount = options.maxRetryCount ?? 2;
opts.handlerTimeout = options.handlerTimeout ?? 30 * 1000;
opts.unsafeIgnoreSorting = options.unsafeIgnoreSorting ?? false;
opts.deleteJobOnCompletion = options.deleteJobOnCompletion ?? false;

const logger = newLogger({
ctx: {
Expand All @@ -265,6 +298,9 @@ export function queueWorkerCreate(sql, options) {
const orderBy = opts.unsafeIgnoreSorting
? undefined
: query`"priority", "scheduledAt"`;
const jobTodoQuery = opts.deleteJobOnCompletion
? queryParts.getJobAndDelete
: queryParts.getJobAndUpdate;

const workers = Array.from({ length: opts.parallelCount }).map(() => ({
currentPromise: Promise.resolve(),
Expand All @@ -281,7 +317,7 @@ export function queueWorkerCreate(sql, options) {
opts.isQueueEnabled = true;

workers.map((it) =>
queueWorkerRun(logger, sql, opts, where, orderBy, it),
queueWorkerRun(logger, sql, opts, jobTodoQuery, where, orderBy, it),
);
},
async stop() {
Expand Down Expand Up @@ -368,35 +404,47 @@ async function queueWorkerUpsertCronJob(sql, job) {
* @param {import("@compas/stdlib").Logger} logger
* @param {import("postgres").Sql<{}>} sql
* @param {QueueWorkerInternalOptions} options
* @param {(where: import("./generated/common/types").StoreJobWhere, orderBy:
* import("../types/advanced-types").QueryPart|undefined) =>
* import("../types/advanced-types").QueryPart} jobTodoQuery
* @param {import("./generated/common/types").StoreJobWhere} where
* @param {import("../types/advanced-types").QueryPart|undefined} orderBy
* @param {{currentPromise: Promise<void>}} worker
*/
function queueWorkerRun(logger, sql, options, where, orderBy, worker) {
function queueWorkerRun(
logger,
sql,
options,
jobTodoQuery,
where,
orderBy,
worker,
) {
if (!options.isQueueEnabled) {
return;
}

Promise.resolve(worker.currentPromise).then(() => {
worker.currentPromise = sql
.begin(async (sql) => {
const [partialJob] = await queryParts
.getJobToDo(where, orderBy)
.exec(sql);
const [job] = await jobTodoQuery(where, orderBy).exec(sql);

if (!partialJob?.id) {
if (!job?.id) {
return {
didHandleJob: false,
};
}

const [job] = await queryJob({
where: {
id: partialJob.id,
},
}).exec(sql);
const { value, error } = validateStoreJob(job);
if (error) {
throw AppError.serverError({
message: "Job is invalid",
job,
error,
});
}

await queueWorkerExecuteJob(logger, sql, options, job);
await queueWorkerExecuteJob(logger, sql, options, value);

return {
didHandleJob: true,
Expand All @@ -407,7 +455,15 @@ function queueWorkerRun(logger, sql, options, where, orderBy, worker) {
worker.currentPromise = setTimeout(options.pollInterval);
}

return queueWorkerRun(logger, sql, options, where, orderBy, worker);
return queueWorkerRun(
logger,
sql,
options,
jobTodoQuery,
where,
orderBy,
worker,
);
});
});
}
Expand Down Expand Up @@ -450,13 +506,15 @@ async function queueWorkerExecuteJob(logger, sql, options, job) {
ctx: {
type: "queue_handler",
id: job.id,
name: job.name,
priority: job.priority,
},
}),
AbortSignal.timeout(timeout),
);

event.log.info({
job,
});

try {
// @ts-expect-error
await sql.savepoint(async (sql) => {
Expand All @@ -467,22 +525,32 @@ async function queueWorkerExecuteJob(logger, sql, options, job) {
} catch (e) {
event.log.error({
type: "job_error",
name: job.name,
scheduledAt: job.scheduledAt,
retryCount: job.retryCount,
error: AppError.format(e),
});

isJobComplete = job.retryCount + 1 >= options.maxRetryCount;

await queries.jobUpdate(sql, {
update: {
isComplete: isJobComplete,
if (options.deleteJobOnCompletion && !isJobComplete) {
// Re-insert the job for a retry, since this transaction already deleted it.
await queries.jobInsert(sql, {
...job,
isComplete: false,
retryCount: job.retryCount + 1,
},
where: {
id: job.id,
},
});
});
} else {
await queries.jobUpdate(sql, {
update: {
isComplete: isJobComplete,
retryCount: job.retryCount + 1,
},
where: {
id: job.id,
},
});
}
}

if (isCronJob && isJobComplete) {
Expand Down
Loading

0 comments on commit c82034d

Please sign in to comment.