diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 9acd1184..cbbd81fd 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,6 +19,11 @@ to make sure the system as a whole remains consistent. Read more: [Worker Pro Migration](https://worker.graphile.org/docs/pro/migration). +## Pending + +- Add `cleanup` function to remove unused queues, stale task identifiers, and + permanently failed jobs. + ## v0.16.1 - Fixes issue importing task files that were written in TypeScript ESM format diff --git a/__tests__/workerUtils.addJob.test.ts b/__tests__/workerUtils.addJob.test.ts index d753dcc2..9d5d5137 100644 --- a/__tests__/workerUtils.addJob.test.ts +++ b/__tests__/workerUtils.addJob.test.ts @@ -4,6 +4,7 @@ import { runTaskListOnce, Task, WorkerSharedOptions, + WorkerUtils, } from "../src/index"; import { getJobs, @@ -19,16 +20,23 @@ const REFERENCE_TIMESTAMP = 1609459200000; /* 1st January 2021, 00:00:00 UTC */ const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("runs a job added through the worker utils", () => withPgClient(async (pgClient) => { await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); await utils.addJob("job3", { a: 1 }); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); @@ -44,7 +52,7 @@ test("supports the jobKey API", () => await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); await utils.addJob("job3", { a: 1 }, { jobKey: "UNIQUE" }); @@ -52,6 +60,7 @@ test("supports the jobKey API", () => await utils.addJob("job3", { a: 3 }, { jobKey: "UNIQUE" }); await utils.addJob("job3", { a: 4 }, { jobKey: "UNIQUE" }); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); @@ -69,7 +78,7 @@ test("supports the jobKey API with jobKeyMode", () => await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); const runAt1 = new Date("2200-01-01T00:00:00Z"); @@ -119,6 +128,7 @@ test("supports the jobKey API with jobKeyMode", () => expect(job.run_at.toISOString()).toBe(runAt4.toISOString()); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); @@ -156,7 +166,7 @@ test("adding job respects useNodeTime", () => await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, useNodeTime: true, }); @@ -164,6 +174,7 @@ test("adding job respects useNodeTime", () => await setTime(timeOfAddJob); await utils.addJob("job3", { a: 1 }); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); diff --git a/__tests__/workerUtils.cleanup.test.ts b/__tests__/workerUtils.cleanup.test.ts new file mode 100644 index 00000000..61268e9e --- /dev/null +++ b/__tests__/workerUtils.cleanup.test.ts @@ -0,0 +1,175 @@ +import { + DbJob, + Job, + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; +import { + ESCAPED_GRAPHILE_WORKER_SCHEMA, + 
makeSelectionOfJobs,
+  reset,
+  TEST_CONNECTION_STRING,
+  withPgClient,
+} from "./helpers";
+
+/** For sorting arrays of numbers or numeric strings */
+function numerically(a: string | number, b: string | number) {
+  return parseFloat(String(a)) - parseFloat(String(b));
+}
+
+const options: WorkerSharedOptions = {};
+
+let utils: WorkerUtils | null = null;
+afterEach(async () => {
+  await utils?.release();
+  utils = null;
+});
+
+// Test DELETE_PERMAFAILED_JOBS
+test("cleanup with DELETE_PERMAFAILED_JOBS", () =>
+  withPgClient(async (pgClient) => {
+    await reset(pgClient, options);
+
+    utils = await makeWorkerUtils({
+      connectionString: TEST_CONNECTION_STRING,
+    });
+
+    const jobs = await makeSelectionOfJobs(utils, pgClient);
+    const { failedJob, regularJob1, regularJob2 } = jobs;
+    const permafailJobIds = [failedJob.id, regularJob1.id, regularJob2.id].sort(
+      numerically,
+    );
+    const remainingJobIds = Object.values(jobs)
+      .filter((r) => !permafailJobIds.includes(r.id))
+      .map((r) => r.id)
+      .sort(numerically);
+
+    const failedJobs = await utils.permanentlyFailJobs(
+      permafailJobIds,
+      "TESTING!",
+    );
+    expect(failedJobs.length).toEqual(permafailJobIds.length);
+
+    await utils.cleanup({ tasks: ["DELETE_PERMAFAILED_JOBS"] });
+    const { rows } = await pgClient.query<DbJob>(
+      `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs`,
+    );
+    const jobIds = rows
+      .map((r) => r.id)
+      .filter((id) => !permafailJobIds.includes(id))
+      .sort(numerically);
+    expect(jobIds).toEqual(remainingJobIds);
+  }));
+
+test("cleanup with GC_JOB_QUEUES", () =>
+  withPgClient(async (pgClient) => {
+    await reset(pgClient, options);
+
+    utils = await makeWorkerUtils({
+      connectionString: TEST_CONNECTION_STRING,
+    });
+
+    const jobs: Job[] = [];
+    const WORKER_ID_1 = "worker1";
+    const WORKER_ID_2 = "worker2";
+    const WORKER_ID_3 = "worker3";
+    let a = 0;
+    const date = new Date();
+    const specs = [
+      [WORKER_ID_1, "test", "test_job1"],
+      [WORKER_ID_2, "test2", "test_job2"],
+      [WORKER_ID_3, "test3", "test_job3"],
+    ] as const;
+    for (const [workerId, queueName, taskIdentifier] of specs) {
+      date.setMinutes(date.getMinutes() - 1);
+      const job = await utils.addJob(
+        taskIdentifier,
+        { a: ++a },
+        { queueName: queueName ??
undefined },
+      );
+      jobs.push(job);
+      if (workerId) {
+        await pgClient.query(
+          `\
+with j as (
+  update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
+    set locked_at = $1, locked_by = $2
+    where id = $3
+    returning *
+), q as (
+  update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues as job_queues
+    set
+      locked_by = $2::text,
+      locked_at = $1
+    from j
+    where job_queues.id = j.job_queue_id
+)
+select * from j`,
+          [date.toISOString(), workerId, job.id],
+        );
+      }
+    }
+
+    const { rows: queuesBefore } = await pgClient.query<{ queue_name: string }>(
+      `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`,
+    );
+    expect(queuesBefore.map((q) => q.queue_name).sort()).toEqual([
+      "test",
+      "test2",
+      "test3",
+    ]);
+
+    await utils.forceUnlockWorkers(["worker3"]);
+    const lastJob = jobs[jobs.length - 1]; // Belongs to queueName 'test3'
+    await utils.completeJobs([lastJob.id]);
+    await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] });
+    const { rows: queuesAfter } = await pgClient.query<{ queue_name: string }>(
+      `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`,
+    );
+    expect(queuesAfter.map((q) => q.queue_name).sort()).toEqual([
+      "test",
+      "test2",
+    ]);
+  }));
+
+test("cleanup with GC_TASK_IDENTIFIERS", () =>
+  withPgClient(async (pgClient) => {
+    await reset(pgClient, options);
+
+    utils = await makeWorkerUtils({
+      connectionString: TEST_CONNECTION_STRING,
+    });
+
+    for (const taskIdentifier of [
+      "job3",
+      "test_job1",
+      "test_job2",
+      "test_job3",
+    ]) {
+      const job = await utils.addJob(taskIdentifier, {});
+      if (taskIdentifier === "test_job2") {
+        await utils.completeJobs([job.id]);
+      }
+    }
+
+    const { rows: tasksBefore } = await pgClient.query<{ identifier: string }>(
+      `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`,
+    );
+    expect(tasksBefore.map((q) => q.identifier).sort()).toEqual([
+      "job3",
+      "test_job1",
+      "test_job2",
+      "test_job3",
+    ]);
+
+    await utils.cleanup({ tasks: ["GC_TASK_IDENTIFIERS"] });
+    const { rows: tasksAfter } = await pgClient.query<{ identifier: string }>(
+      `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`,
+    );
+    expect(tasksAfter.map((q) => q.identifier).sort()).toEqual([
+      "job3",
+      "test_job1",
+      "test_job3",
+    ]);
+  }));
diff --git a/__tests__/workerUtils.completeJobs.test.ts b/__tests__/workerUtils.completeJobs.test.ts
index acb4ebaa..ac7a872b 100644
--- a/__tests__/workerUtils.completeJobs.test.ts
+++ b/__tests__/workerUtils.completeJobs.test.ts
@@ -1,4 +1,8 @@
-import { makeWorkerUtils, WorkerSharedOptions } from "../src/index";
+import {
+  makeWorkerUtils,
+  WorkerSharedOptions,
+  WorkerUtils,
+} from "../src/index";
 import {
   getJobs,
   makeSelectionOfJobs,
@@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) {
 
 const options: WorkerSharedOptions = {};
 
+let utils: WorkerUtils | null = null;
+afterEach(async () => {
+  await utils?.release();
+  utils = null;
+});
+
 test("completes the jobs, leaves others unaffected", () =>
   withPgClient(async (pgClient) => {
     await reset(pgClient, options);
 
-    const utils = await makeWorkerUtils({
+    utils = await makeWorkerUtils({
       connectionString: TEST_CONNECTION_STRING,
     });
 
@@ -40,4 +50,5 @@ test("completes the jobs, leaves others unaffected", () =>
     expect(remaining[1]).toMatchObject(untouchedJob);
 
     await utils.release();
+    utils = null;
   }));
diff --git a/__tests__/workerUtils.forceUnlockWorkers.test.ts b/__tests__/workerUtils.forceUnlockWorkers.test.ts
index 
5d057fe7..792e77a0 100644 --- a/__tests__/workerUtils.forceUnlockWorkers.test.ts +++ b/__tests__/workerUtils.forceUnlockWorkers.test.ts @@ -1,4 +1,9 @@ -import { Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + Job, + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; import { ESCAPED_GRAPHILE_WORKER_SCHEMA, getJobs, @@ -9,11 +14,17 @@ import { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("unlocks jobs for the given workers, leaves others unaffected", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -89,6 +100,4 @@ where jobs.job_queue_id = job_queues.id;`, locked_by: WORKER_ID_1, }), ]); - - await utils.release(); })); diff --git a/__tests__/workerUtils.permanentlyFailJobs.test.ts b/__tests__/workerUtils.permanentlyFailJobs.test.ts index 518aafb3..babd0acd 100644 --- a/__tests__/workerUtils.permanentlyFailJobs.test.ts +++ b/__tests__/workerUtils.permanentlyFailJobs.test.ts @@ -1,4 +1,8 @@ -import { makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; import { getJobs, makeSelectionOfJobs, @@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("completes the jobs, leaves others unaffected", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -46,6 +56,4 @@ test("completes the jobs, leaves others unaffected", () => expect(remaining).toHaveLength(2); expect(remaining[0]).toMatchObject(lockedJob); expect(remaining[1]).toMatchObject(untouchedJob); - - await utils.release(); })); diff --git a/__tests__/workerUtils.rescheduleJobs.test.ts b/__tests__/workerUtils.rescheduleJobs.test.ts index 0b3e653d..c54dd097 100644 --- a/__tests__/workerUtils.rescheduleJobs.test.ts +++ b/__tests__/workerUtils.rescheduleJobs.test.ts @@ -1,4 +1,8 @@ -import { makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; import { getJobs, makeSelectionOfJobs, @@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("completes the jobs, leaves others unaffected", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -54,6 +64,4 @@ test("completes the jobs, leaves others unaffected", () => expect(remaining).toHaveLength(2); expect(remaining[0]).toMatchObject(lockedJob); expect(remaining[1]).toMatchObject(untouchedJob); - - await utils.release(); })); diff --git a/src/cleanup.ts b/src/cleanup.ts new file mode 100644 index 00000000..06b8cf19 --- /dev/null +++ b/src/cleanup.ts @@ -0,0 +1,60 @@ +import { CleanupTask } from "./interfaces"; +import { CompiledOptions } from "./lib"; + 
+const ALL_CLEANUP_TASKS: CleanupTask[] = [
+  "GC_TASK_IDENTIFIERS",
+  "GC_JOB_QUEUES",
+  "DELETE_PERMAFAILED_JOBS",
+];
+
+export function assertCleanupTasks(
+  tasks: string[],
+): asserts tasks is CleanupTask[] {
+  const invalid = tasks.filter(
+    (t) => !(ALL_CLEANUP_TASKS as string[]).includes(t),
+  );
+  if (invalid.length > 0) {
+    throw new Error(
+      `Invalid cleanup tasks; allowed values: '${ALL_CLEANUP_TASKS.join(
+        "', '",
+      )}'; you provided: '${tasks.join("', '")}'`,
+    );
+  }
+}
+
+export async function cleanup(
+  compiledOptions: CompiledOptions,
+  tasks: CleanupTask[] = ["GC_JOB_QUEUES", "GC_TASK_IDENTIFIERS"],
+) {
+  const { withPgClient, escapedWorkerSchema } = compiledOptions;
+  await withPgClient(async (client) => {
+    if (tasks.includes("DELETE_PERMAFAILED_JOBS")) {
+      await client.query(
+        `\
+delete from ${escapedWorkerSchema}._private_jobs jobs
+where attempts = max_attempts
+and locked_at is null;`,
+      );
+    }
+    if (tasks.includes("GC_TASK_IDENTIFIERS")) {
+      await client.query(
+        `\
+delete from ${escapedWorkerSchema}._private_tasks tasks
+where tasks.id not in (
+  select jobs.task_id
+  from ${escapedWorkerSchema}._private_jobs jobs
+);`,
+      );
+    }
+    if (tasks.includes("GC_JOB_QUEUES")) {
+      await client.query(
+        `\
+delete from ${escapedWorkerSchema}._private_job_queues job_queues
+where locked_at is null and not exists (
+  select 1 from ${escapedWorkerSchema}._private_jobs jobs
+  where jobs.job_queue_id = job_queues.id
+);`,
+      );
+    }
+  });
+}
diff --git a/src/cli.ts b/src/cli.ts
index c18b6aff..489b7dc2 100644
--- a/src/cli.ts
+++ b/src/cli.ts
@@ -2,6 +2,7 @@ import { loadConfig } from "graphile-config/load";
 import * as yargs from "yargs";
 
+import { assertCleanupTasks, cleanup } from "./cleanup";
 import { getCronItemsInternal } from "./getCronItems";
 import { getTasksInternal } from "./getTasks";
 import { getUtilsAndReleasersFromOptions } from "./lib";
@@ -71,6 +72,11 @@ const argv = yargs
     normalize: true,
   })
   .string("config")
+  .option("cleanup", {
+    description:
+      "Clean the database, then exit. Accepts a comma-separated list of cleanup tasks: GC_TASK_IDENTIFIERS, GC_JOB_QUEUES, DELETE_PERMAFAILED_JOBS",
+  })
+  .string("cleanup")
   .strict(true).argv;
 
 const integerOrUndefined = (n: number | undefined): number | undefined => {
@@ -105,6 +111,7 @@ async function main() {
   const userPreset = await loadConfig(argv.config);
   const ONCE = argv.once;
   const SCHEMA_ONLY = argv["schema-only"];
+  const CLEANUP = argv.cleanup as string | string[] | undefined;
 
   if (SCHEMA_ONLY && ONCE) {
     throw new Error("Cannot specify both --once and --schema-only");
@@ -130,6 +137,14 @@ async function main() {
     return;
   }
 
+  if (CLEANUP != null) {
+    const cleanups = Array.isArray(CLEANUP) ? CLEANUP : [CLEANUP];
+    const tasks = cleanups.flatMap((t) => t.split(",")).map((t) => t.trim());
+    assertCleanupTasks(tasks);
+    await cleanup(compiledOptions, tasks);
+    return;
+  }
+
   const watchedTasks = await getTasksInternal(
     compiledOptions,
     compiledOptions.resolvedPreset.worker.taskDirectory,
diff --git a/src/interfaces.ts b/src/interfaces.ts
index e115cc6b..9a858ad1 100644
--- a/src/interfaces.ts
+++ b/src/interfaces.ts
@@ -106,6 +106,11 @@ export interface JobHelpers extends Helpers {
   abortSignal?: AbortSignal;
 }
 
+export type CleanupTask =
+  | "GC_TASK_IDENTIFIERS"
+  | "GC_JOB_QUEUES"
+  | "DELETE_PERMAFAILED_JOBS";
+
 /**
  * Utilities for working with Graphile Worker. Primarily useful for migrating
  * the jobs database and queueing jobs.
@@ -170,6 +175,19 @@ export interface WorkerUtils extends Helpers {
    * were terminated, are permanently unreachable, etc).
    */
   forceUnlockWorkers: (workerIds: string[]) => Promise<void>;
+
+  /**
+   * **Experimental**
+   *
+   * Database cleanup function. Supported tasks:
+   *
+   * - GC_TASK_IDENTIFIERS: delete task identifiers that are no longer referenced by any jobs
+   * - GC_JOB_QUEUES: delete job queues that are no longer referenced by any jobs
+   * - DELETE_PERMAFAILED_JOBS: delete permanently failed jobs if they are not locked
+   *
+   * Default: ["GC_JOB_QUEUES"]
+   */
+  cleanup(options: { tasks?: CleanupTask[] }): Promise<void>;
 }
 
 export type PromiseOrDirect<T> = Promise<T> | T;
diff --git a/src/workerUtils.ts b/src/workerUtils.ts
index f61b2981..95d480c6 100644
--- a/src/workerUtils.ts
+++ b/src/workerUtils.ts
@@ -1,5 +1,12 @@
 /* eslint-disable @typescript-eslint/ban-types */
-import { DbJob, TaskSpec, WorkerUtils, WorkerUtilsOptions } from "./interfaces";
+import { cleanup } from "./cleanup";
+import {
+  CleanupTask,
+  DbJob,
+  TaskSpec,
+  WorkerUtils,
+  WorkerUtilsOptions,
+} from "./interfaces";
 import { getUtilsAndReleasersFromOptions } from "./lib";
 import { migrate } from "./migrate";
 
@@ -9,14 +16,15 @@ import { migrate } from "./migrate";
 export async function makeWorkerUtils(
   options: WorkerUtilsOptions,
 ): Promise<WorkerUtils> {
-  const [compiledSharedOptions, release] =
-    await getUtilsAndReleasersFromOptions(options, {
+  const [compiledOptions, release] = await getUtilsAndReleasersFromOptions(
+    options,
+    {
       scope: {
         label: "WorkerUtils",
       },
-    });
-  const { logger, escapedWorkerSchema, withPgClient, addJob } =
-    compiledSharedOptions;
+    },
+  );
+  const { logger, escapedWorkerSchema, withPgClient, addJob } = compiledOptions;
 
   return {
     withPgClient,
@@ -24,7 +32,7 @@ export async function makeWorkerUtils(
     release,
     addJob,
     migrate: () =>
-      withPgClient((pgClient) => migrate(compiledSharedOptions, pgClient)),
+      withPgClient((pgClient) => migrate(compiledOptions, pgClient)),
 
     async completeJobs(ids) {
       const { rows } = await withPgClient((client) =>
@@ -76,6 +84,14 @@ export async function makeWorkerUtils(
         ),
       );
     },
+
+    async cleanup(
+      options: { tasks?: CleanupTask[] } = {
+        tasks: ["GC_JOB_QUEUES"],
+      },
+    ): Promise<void> {
+      return cleanup(compiledOptions, options.tasks);
+    },
   };
 }
 
diff --git a/website/docs/admin-functions.md b/website/docs/admin-functions.md
index ee2ce77f..c8b3f7bb 100644
--- a/website/docs/admin-functions.md
+++ b/website/docs/admin-functions.md
@@ -123,3 +123,68 @@ their IDs/keys).
 Do **NOT** pass any alive worker ids to this method or Bad Things may happen.
 
 :::
+
+## Database cleanup
+
+Over time it's likely that the `graphile_worker` schema's tables will
+accumulate stale data: old job queue names, unused task identifiers, and
+permanently failed jobs. You can remove this stale data with the `cleanup`
+function, indicating which cleanup operations you would like to perform.
+
+:::tip
+
+If you find yourself calling this quite often or on a schedule, it's likely that
+you are doing something wrong (e.g. allowing jobs to permafail, using random
+values for job queue names, etc.).
+
+:::
+
+### GC_JOB_QUEUES
+
+Deletes job queues that no longer contain any jobs. Safe.
+
+### GC_TASK_IDENTIFIERS
+
+Deletes task identifiers that are no longer referenced by any jobs. Unsafe to
+execute whilst any Worker is running.
+
+:::warning
+
+It is currently unsafe to run this whilst any Graphile Worker instance is
+running: any task identifier for which there are no jobs queued will be
+deleted, and when another job with that identifier is later queued a _new_
+internal identifier will be generated, which won't match the identifier that
+the running workers have cached.
+
+:::
+
+### DELETE_PERMAFAILED_JOBS
+
+Deletes any unlocked jobs that will never be retried because `attempts` has
+reached `max_attempts`. This permanently deletes the failed jobs' data, but is
+otherwise safe.
+
+:::tip
+
+You should write your tasks such that no job ever permafails; for example,
+after 20 attempts you might have the job do some cleanup and then exit
+successfully.
+
+:::
+
+### Example
+
+In the CLI:
+
+```bash title="CLI"
+graphile-worker --cleanup DELETE_PERMAFAILED_JOBS,GC_TASK_IDENTIFIERS,GC_JOB_QUEUES
+```
+
+Or in the library using [WorkerUtils](/docs/library/queue#workerutils):
+
+```ts title="JS API"
+await workerUtils.cleanup({
+  tasks: ["DELETE_PERMAFAILED_JOBS", "GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"],
+});
+```
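+
+If you are managing the `WorkerUtils` instance yourself, a fuller sketch of the
+library flow — create the utils, run the cleanup, then release — might look
+like the following (the `DATABASE_URL` environment variable here is just an
+example; use whatever connection settings you use elsewhere in your
+application):
+
+```ts title="JS API (standalone sketch)"
+import { makeWorkerUtils } from "graphile-worker";
+
+async function main() {
+  // Point WorkerUtils at the same database your workers use
+  const workerUtils = await makeWorkerUtils({
+    connectionString: process.env.DATABASE_URL,
+  });
+  try {
+    // Only delete permanently failed jobs; queues and task identifiers are
+    // left alone unless their cleanup tasks are listed explicitly
+    await workerUtils.cleanup({ tasks: ["DELETE_PERMAFAILED_JOBS"] });
+  } finally {
+    // Always release the utils so the pool closes and Node can exit
+    await workerUtils.release();
+  }
+}
+
+main().catch((e) => {
+  console.error(e);
+  process.exit(1);
+});
+```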