From 30ef01934e3e572b07fea6ecf5fa59d39bb76961 Mon Sep 17 00:00:00 2001 From: Christophe Macabiau Date: Tue, 19 Dec 2023 18:38:32 +0100 Subject: [PATCH 01/10] feat(database): database cleanup --- __tests__/workerUtils.cleanup.test.ts | 95 +++++++++++++++++++++++++++ src/interfaces.ts | 32 +++++++-- src/workerUtils.ts | 39 ++++++++++- 3 files changed, 157 insertions(+), 9 deletions(-) create mode 100644 __tests__/workerUtils.cleanup.test.ts diff --git a/__tests__/workerUtils.cleanup.test.ts b/__tests__/workerUtils.cleanup.test.ts new file mode 100644 index 00000000..e5ad8ef7 --- /dev/null +++ b/__tests__/workerUtils.cleanup.test.ts @@ -0,0 +1,95 @@ +import { Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + ESCAPED_GRAPHILE_WORKER_SCHEMA, + makeSelectionOfJobs, + reset, + TEST_CONNECTION_STRING, + withPgClient +} from "./helpers"; + +/** For sorting arrays of numbers or numeric strings */ +function numerically(a: string | number, b: string | number) { + return parseFloat(String(a)) - parseFloat(String(b)); +} + +const options: WorkerSharedOptions = {}; + +test("cleanup the database", () => + withPgClient(async (pgClient) => { + await reset(pgClient, options); + + const utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING }); + + const { failedJob, regularJob1, lockedJob, regularJob2 } = await makeSelectionOfJobs(utils, pgClient); + const jobs = [failedJob, regularJob1, lockedJob, regularJob2]; + const jobIds = jobs.map((j) => j.id).sort(numerically); + + // Test DELETE_PERMAFAILED_JOBS + const failedJobs = await utils.permanentlyFailJobs(jobIds, "TESTING!"); + const failedJobIds = failedJobs.map((j) => j.id).sort(numerically); + expect(failedJobIds).toEqual( + [failedJob.id, regularJob1.id, regularJob2.id].sort(numerically), + ); + for (const j of failedJobs) { + expect(j.last_error).toEqual("TESTING!"); + expect(j.attempts).toEqual(j.max_attempts); + expect(j.attempts).toBeGreaterThan(0); + } + + await utils.cleanup({ 
tasks: ["DELETE_PERMAFAILED_JOBS"] }); + const { rows: jobsFromView } = await pgClient.query( + `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs`); + failedJobIds.forEach((id) => + expect(jobsFromView.find((j) => j.id === id)).toBeUndefined()); + + const jobs2: Job[] = []; + const WORKER_ID_1 = "worker1"; + const WORKER_ID_2 = "worker2"; + const WORKER_ID_3 = "worker3"; + let a = 0; + const date = new Date(); + const specs = [ + [WORKER_ID_1, "test", "test_job1"], + [WORKER_ID_2, "test2", "test_job2"], + [WORKER_ID_3, "test3", "test_job3"], + ] as const; + for (const [workerId, queueName, jobId] of specs) { + date.setMinutes(date.getMinutes() - 1); + const job = await utils.addJob( + jobId, + { a: ++a }, + { queueName: queueName ?? undefined }, + ); + await pgClient.query(` + update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs + set locked_at = $1, locked_by = $2 + where id = $3`, + [workerId ? date.toISOString() : null, workerId, job.id], + ); + jobs2.push(job); + } + + // Test GC_JOB_QUEUES + const { rows: queuesBefore } = await pgClient.query( + `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`) as { rows: { queue_name: string }[] }; + expect(queuesBefore.map((q) => q.queue_name).sort()).toEqual(['test', 'test2', 'test3']); + + await utils.forceUnlockWorkers(['worker3']); + await utils.completeJobs([jobs2[jobs2.length - 1].id]); + await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] }); + const { rows: queuesAfter } = await pgClient.query( + `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`) as { rows: { queue_name: string }[] }; + expect(queuesAfter.map((q) => q.queue_name).sort()).toEqual(['test', 'test2']); + + // Test GC_TASK_IDENTIFIERS + const { rows: tasksBefore } = await pgClient.query( + `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`) as { rows: { identifier: string }[] }; + expect(tasksBefore.map((q) => q.identifier).sort()).toEqual(['job3', 
'test_job1', 'test_job2', 'test_job3']); + + await utils.cleanup({ tasks: ["GC_TASK_IDENTIFIERS"] }); + const { rows: tasksAfter } = await pgClient.query( + `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`) as { rows: { identifier: string }[] }; + expect(tasksAfter.map((q) => q.identifier).sort()).toEqual(['job3', 'test_job1', 'test_job2']); + + await utils.release(); + })); \ No newline at end of file diff --git a/src/interfaces.ts b/src/interfaces.ts index e115cc6b..700cbd32 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -106,6 +106,11 @@ export interface JobHelpers extends Helpers { abortSignal?: AbortSignal; } +export type CleanupTask = + | "GC_TASK_IDENTIFIERS" + | "GC_JOB_QUEUES" + | "DELETE_PERMAFAILED_JOBS"; + /** * Utilities for working with Graphile Worker. Primarily useful for migrating * the jobs database and queueing jobs. @@ -170,6 +175,19 @@ export interface WorkerUtils extends Helpers { * were terminated, are permanently unreachable, etc). */ forceUnlockWorkers: (workerIds: string[]) => Promise; + + /** + * **Experimental** + * + * Database cleanup function + * - GC_TASK_IDENTIFIERS: delete task identifiers that are no longer referenced by any jobs + * - GC_JOB_QUEUES: delete job queues that are no longer referenced by any jobs + * - DELETE_PERMAFAILED_JOBS: delete permanently failed jobs if they are not locked + * + * Default: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"] + */ + cleanup(options: { tasks?: CleanupTask[], }): Promise; + } export type PromiseOrDirect = Promise | T; @@ -194,12 +212,12 @@ export function isValidTask( export type TaskList = { [Key in - | keyof GraphileWorker.Tasks - | (string & {})]?: Key extends keyof GraphileWorker.Tasks - ? Task - : // The `any` here is required otherwise declaring something as a `TaskList` can cause issues. 
- // eslint-disable-next-line @typescript-eslint/no-explicit-any - Task; + | keyof GraphileWorker.Tasks + | (string & {})]?: Key extends keyof GraphileWorker.Tasks + ? Task + : // The `any` here is required otherwise declaring something as a `TaskList` can cause issues. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + Task; }; export interface WatchedTaskList { @@ -716,7 +734,7 @@ export interface JobAndCronIdentifierWithDetails extends JobAndCronIdentifier { last_execution: Date | null; } -export interface WorkerUtilsOptions extends SharedOptions {} +export interface WorkerUtilsOptions extends SharedOptions { } type BaseEventMap = Record; type EventMapKey = string & keyof TEventMap; diff --git a/src/workerUtils.ts b/src/workerUtils.ts index f61b2981..0a812f7c 100644 --- a/src/workerUtils.ts +++ b/src/workerUtils.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/ban-types */ -import { DbJob, TaskSpec, WorkerUtils, WorkerUtilsOptions } from "./interfaces"; +import { CleanupTask, DbJob, TaskSpec, WorkerUtils, WorkerUtilsOptions } from "./interfaces"; import { getUtilsAndReleasersFromOptions } from "./lib"; import { migrate } from "./migrate"; @@ -76,7 +76,42 @@ export async function makeWorkerUtils( ), ); }, - }; + + async cleanup(options: { tasks?: CleanupTask[] } = { tasks: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"] }): Promise { + await withPgClient(async (client) => { + if (options.tasks?.includes("DELETE_PERMAFAILED_JOBS")) { + await client.query( + `delete from ${escapedWorkerSchema}._private_jobs jobs + where attempts = max_attempts + and locked_at is null;` + ); + } + const queries = []; + if (options.tasks?.includes("GC_TASK_IDENTIFIERS")) { + queries.push(client.query( + `delete from ${escapedWorkerSchema}._private_tasks tasks + where not exists ( + select 1 + from ${escapedWorkerSchema}._private_jobs jobs + where jobs.task_id = tasks.id + );` + )); + } + if (options.tasks?.includes("GC_JOB_QUEUES")) { + queries.push(client.query( + 
`delete from ${escapedWorkerSchema}._private_job_queues job_queues + where locked_at is null and not exists ( + select 1 + from ${escapedWorkerSchema}._private_jobs jobs + where jobs.job_queue_id = job_queues.id + );` + )); + } + await Promise.all(queries); + } + ); + } + } } /** From 35cd8ded451881fa5f0fc58c0827f5fce8fcc6c4 Mon Sep 17 00:00:00 2001 From: Christophe Macabiau Date: Tue, 19 Dec 2023 18:41:17 +0100 Subject: [PATCH 02/10] feat(cli): database cleanup command --- src/cli.ts | 13 +++++++++++++ src/workerUtils.ts | 14 ++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/cli.ts b/src/cli.ts index c18b6aff..226b52a3 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -2,11 +2,13 @@ import { loadConfig } from "graphile-config/load"; import * as yargs from "yargs"; +import { CleanupTask } from "."; import { getCronItemsInternal } from "./getCronItems"; import { getTasksInternal } from "./getTasks"; import { getUtilsAndReleasersFromOptions } from "./lib"; import { EMPTY_PRESET, WorkerPreset } from "./preset"; import { runInternal, runOnceInternal } from "./runner"; +import { cleanup } from "./workerUtils"; const defaults = WorkerPreset.worker!; @@ -71,6 +73,11 @@ const argv = yargs normalize: true, }) .string("config") + .option("cleanup", { + description: "Database cleanup function; options: GC_TASK_IDENTIFIERS, GC_JOB_QUEUES, DELETE_PERMAFAILED_JOBS", + default: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"], + }) + .string("cleanup") .strict(true).argv; const integerOrUndefined = (n: number | undefined): number | undefined => { @@ -105,6 +112,7 @@ async function main() { const userPreset = await loadConfig(argv.config); const ONCE = argv.once; const SCHEMA_ONLY = argv["schema-only"]; + const CLEANUP = argv.cleanup; if (SCHEMA_ONLY && ONCE) { throw new Error("Cannot specify both --once and --schema-only"); @@ -130,6 +138,11 @@ async function main() { return; } + if (CLEANUP) { + await cleanup(compiledOptions.resolvedPreset.worker, argv.cleanup as 
CleanupTask[]); + return; + } + const watchedTasks = await getTasksInternal( compiledOptions, compiledOptions.resolvedPreset.worker.taskDirectory, diff --git a/src/workerUtils.ts b/src/workerUtils.ts index 0a812f7c..752e4f9c 100644 --- a/src/workerUtils.ts +++ b/src/workerUtils.ts @@ -136,3 +136,17 @@ export async function quickAddJob< await utils.release(); } } + +/** + * **Experimental** + * + * Database cleanup function + */ +export async function cleanup(options: WorkerUtilsOptions, tasks: CleanupTask[]) { + const utils = await makeWorkerUtils(options); + try { + return await utils.cleanup({ tasks }); + } finally { + await utils.release(); + } +} From df045d16dcf278cec148d40c462affc482a41b52 Mon Sep 17 00:00:00 2001 From: Christophe Macabiau Date: Wed, 20 Dec 2023 10:06:54 +0100 Subject: [PATCH 03/10] fix(style): lint fixes --- __tests__/workerUtils.cleanup.test.ts | 70 +++++++++++++++++++-------- src/interfaces.ts | 9 ++-- src/workerUtils.ts | 50 ++++++++++++------- 3 files changed, 87 insertions(+), 42 deletions(-) diff --git a/__tests__/workerUtils.cleanup.test.ts b/__tests__/workerUtils.cleanup.test.ts index e5ad8ef7..e1a2208e 100644 --- a/__tests__/workerUtils.cleanup.test.ts +++ b/__tests__/workerUtils.cleanup.test.ts @@ -4,7 +4,7 @@ import { makeSelectionOfJobs, reset, TEST_CONNECTION_STRING, - withPgClient + withPgClient, } from "./helpers"; /** For sorting arrays of numbers or numeric strings */ @@ -18,9 +18,16 @@ test("cleanup the database", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING }); + const utils = await makeWorkerUtils({ + connectionString: TEST_CONNECTION_STRING, + }); - const { failedJob, regularJob1, lockedJob, regularJob2 } = await makeSelectionOfJobs(utils, pgClient); + const { + failedJob, + regularJob1, + lockedJob, + regularJob2, + } = await makeSelectionOfJobs(utils, pgClient); const jobs = [failedJob, regularJob1, lockedJob, 
regularJob2]; const jobIds = jobs.map((j) => j.id).sort(numerically); @@ -38,9 +45,11 @@ test("cleanup the database", () => await utils.cleanup({ tasks: ["DELETE_PERMAFAILED_JOBS"] }); const { rows: jobsFromView } = await pgClient.query( - `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs`); + `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs`, + ); failedJobIds.forEach((id) => - expect(jobsFromView.find((j) => j.id === id)).toBeUndefined()); + expect(jobsFromView.find((j) => j.id === id)).toBeUndefined(), + ); const jobs2: Job[] = []; const WORKER_ID_1 = "worker1"; @@ -60,7 +69,8 @@ test("cleanup the database", () => { a: ++a }, { queueName: queueName ?? undefined }, ); - await pgClient.query(` + await pgClient.query( + ` update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs set locked_at = $1, locked_by = $2 where id = $3`, @@ -70,26 +80,46 @@ test("cleanup the database", () => } // Test GC_JOB_QUEUES - const { rows: queuesBefore } = await pgClient.query( - `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`) as { rows: { queue_name: string }[] }; - expect(queuesBefore.map((q) => q.queue_name).sort()).toEqual(['test', 'test2', 'test3']); + const { rows: queuesBefore } = (await pgClient.query( + `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`, + )) as { rows: { queue_name: string }[] }; + expect(queuesBefore.map((q) => q.queue_name).sort()).toEqual([ + "test", + "test2", + "test3", + ]); - await utils.forceUnlockWorkers(['worker3']); + await utils.forceUnlockWorkers(["worker3"]); await utils.completeJobs([jobs2[jobs2.length - 1].id]); await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] }); - const { rows: queuesAfter } = await pgClient.query( - `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`) as { rows: { queue_name: string }[] }; - expect(queuesAfter.map((q) => q.queue_name).sort()).toEqual(['test', 'test2']); + const { rows: queuesAfter } = 
(await pgClient.query( + `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`, + )) as { rows: { queue_name: string }[] }; + expect(queuesAfter.map((q) => q.queue_name).sort()).toEqual([ + "test", + "test2", + ]); // Test GC_TASK_IDENTIFIERS - const { rows: tasksBefore } = await pgClient.query( - `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`) as { rows: { identifier: string }[] }; - expect(tasksBefore.map((q) => q.identifier).sort()).toEqual(['job3', 'test_job1', 'test_job2', 'test_job3']); + const { rows: tasksBefore } = (await pgClient.query( + `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`, + )) as { rows: { identifier: string }[] }; + expect(tasksBefore.map((q) => q.identifier).sort()).toEqual([ + "job3", + "test_job1", + "test_job2", + "test_job3", + ]); await utils.cleanup({ tasks: ["GC_TASK_IDENTIFIERS"] }); - const { rows: tasksAfter } = await pgClient.query( - `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`) as { rows: { identifier: string }[] }; - expect(tasksAfter.map((q) => q.identifier).sort()).toEqual(['job3', 'test_job1', 'test_job2']); + const { rows: tasksAfter } = (await pgClient.query( + `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`, + )) as { rows: { identifier: string }[] }; + expect(tasksAfter.map((q) => q.identifier).sort()).toEqual([ + "job3", + "test_job1", + "test_job2", + ]); await utils.release(); - })); \ No newline at end of file + })); diff --git a/src/interfaces.ts b/src/interfaces.ts index 700cbd32..867209ac 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -176,18 +176,17 @@ export interface WorkerUtils extends Helpers { */ forceUnlockWorkers: (workerIds: string[]) => Promise; - /** + /** * **Experimental** - * + * * Database cleanup function * - GC_TASK_IDENTIFIERS: delete task identifiers that are no longer referenced by any jobs * - GC_JOB_QUEUES: delete job queues that are no longer 
referenced by any jobs * - DELETE_PERMAFAILED_JOBS: delete permanently failed jobs if they are not locked - * + * * Default: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"] */ - cleanup(options: { tasks?: CleanupTask[], }): Promise; - + cleanup(options: { tasks?: CleanupTask[] }): Promise; } export type PromiseOrDirect = Promise | T; diff --git a/src/workerUtils.ts b/src/workerUtils.ts index 752e4f9c..3bcd0475 100644 --- a/src/workerUtils.ts +++ b/src/workerUtils.ts @@ -1,5 +1,11 @@ /* eslint-disable @typescript-eslint/ban-types */ -import { CleanupTask, DbJob, TaskSpec, WorkerUtils, WorkerUtilsOptions } from "./interfaces"; +import { + CleanupTask, + DbJob, + TaskSpec, + WorkerUtils, + WorkerUtilsOptions, +} from "./interfaces"; import { getUtilsAndReleasersFromOptions } from "./lib"; import { migrate } from "./migrate"; @@ -77,41 +83,48 @@ export async function makeWorkerUtils( ); }, - async cleanup(options: { tasks?: CleanupTask[] } = { tasks: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"] }): Promise { + async cleanup( + options: { tasks?: CleanupTask[] } = { + tasks: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"], + }, + ): Promise { await withPgClient(async (client) => { if (options.tasks?.includes("DELETE_PERMAFAILED_JOBS")) { await client.query( `delete from ${escapedWorkerSchema}._private_jobs jobs where attempts = max_attempts - and locked_at is null;` + and locked_at is null;`, ); } const queries = []; if (options.tasks?.includes("GC_TASK_IDENTIFIERS")) { - queries.push(client.query( - `delete from ${escapedWorkerSchema}._private_tasks tasks + queries.push( + client.query( + `delete from ${escapedWorkerSchema}._private_tasks tasks where not exists ( select 1 from ${escapedWorkerSchema}._private_jobs jobs where jobs.task_id = tasks.id - );` - )); + );`, + ), + ); } if (options.tasks?.includes("GC_JOB_QUEUES")) { - queries.push(client.query( - `delete from ${escapedWorkerSchema}._private_job_queues job_queues + queries.push( + client.query( + `delete from 
${escapedWorkerSchema}._private_job_queues job_queues where locked_at is null and not exists ( select 1 from ${escapedWorkerSchema}._private_jobs jobs where jobs.job_queue_id = job_queues.id - );` - )); + );`, + ), + ); } await Promise.all(queries); - } - ); - } - } + }); + }, + }; } /** @@ -139,10 +152,13 @@ export async function quickAddJob< /** * **Experimental** - * + * * Database cleanup function */ -export async function cleanup(options: WorkerUtilsOptions, tasks: CleanupTask[]) { +export async function cleanup( + options: WorkerUtilsOptions, + tasks: CleanupTask[], +) { const utils = await makeWorkerUtils(options); try { return await utils.cleanup({ tasks }); From f2c0301b36e5efb75c2626fff4e3e89b71b0ea01 Mon Sep 17 00:00:00 2001 From: Christophe Macabiau Date: Wed, 20 Dec 2023 10:56:36 +0100 Subject: [PATCH 04/10] doc(admin): cleanup function --- website/docs/admin-functions.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/website/docs/admin-functions.md b/website/docs/admin-functions.md index ee2ce77f..70073035 100644 --- a/website/docs/admin-functions.md +++ b/website/docs/admin-functions.md @@ -123,3 +123,13 @@ their IDs/keys). Do **NOT** pass any alive worker ids to this method or Bad Things may happen. ::: + +## Database cleanup +You can cleanup the tables of the database using the cleanup function. 
3 options are available: +- DELETE_PERMAFAILED_JOBS: delete permanently failed jobs (assuming they are not locked) +- GC_TASK_IDENTIFIERS: delete tasks identifiers without jobs +- GC_JOB_QUEUES: delete job queues without jobs + +```ts title="JS API" +await workerUtils.cleanup(["DELETE_PERMAFAILED_JOBS","GC_TASK_IDENTIFIERS","GC_JOB_QUEUES"]); +``` From e9e6f7b389075a268016ce17bc12859d9536b923 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 15 Jan 2024 18:12:10 +0000 Subject: [PATCH 05/10] Benjie's refactoring --- __tests__/helpers.ts | 1 + __tests__/workerUtils.cleanup.test.ts | 129 +++++++++++++++++--------- src/cleanup.ts | 60 ++++++++++++ src/cli.ts | 16 ++-- src/interfaces.ts | 14 +-- src/workerUtils.ts | 69 ++------------ website/docs/admin-functions.md | 65 ++++++++++++- 7 files changed, 230 insertions(+), 124 deletions(-) create mode 100644 src/cleanup.ts diff --git a/__tests__/helpers.ts b/__tests__/helpers.ts index 5aa2f24a..58d78964 100644 --- a/__tests__/helpers.ts +++ b/__tests__/helpers.ts @@ -241,6 +241,7 @@ export async function makeSelectionOfJobs( Object.assign(failedJob, failedJobUpdate); return { + _all: [failedJob, regularJob1, lockedJob, regularJob2, untouchedJob], failedJob, regularJob1, lockedJob, diff --git a/__tests__/workerUtils.cleanup.test.ts b/__tests__/workerUtils.cleanup.test.ts index e1a2208e..9e83eec5 100644 --- a/__tests__/workerUtils.cleanup.test.ts +++ b/__tests__/workerUtils.cleanup.test.ts @@ -1,4 +1,4 @@ -import { Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { DbJob, Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index"; import { ESCAPED_GRAPHILE_WORKER_SCHEMA, makeSelectionOfJobs, @@ -14,7 +14,8 @@ function numerically(a: string | number, b: string | number) { const options: WorkerSharedOptions = {}; -test("cleanup the database", () => +// Test DELETE_PERMAFAILED_JOBS +test("cleanup with DELETE_PERMAFAILED_JOBS", () => withPgClient(async (pgClient) => { await reset(pgClient, 
options); @@ -22,36 +23,40 @@ test("cleanup the database", () => connectionString: TEST_CONNECTION_STRING, }); - const { - failedJob, - regularJob1, - lockedJob, - regularJob2, - } = await makeSelectionOfJobs(utils, pgClient); - const jobs = [failedJob, regularJob1, lockedJob, regularJob2]; - const jobIds = jobs.map((j) => j.id).sort(numerically); - - // Test DELETE_PERMAFAILED_JOBS - const failedJobs = await utils.permanentlyFailJobs(jobIds, "TESTING!"); - const failedJobIds = failedJobs.map((j) => j.id).sort(numerically); - expect(failedJobIds).toEqual( - [failedJob.id, regularJob1.id, regularJob2.id].sort(numerically), + const { failedJob, regularJob1, regularJob2, _all } = + await makeSelectionOfJobs(utils, pgClient); + const permafailJobIds = [failedJob.id, regularJob1.id, regularJob2.id].sort( + numerically, ); - for (const j of failedJobs) { - expect(j.last_error).toEqual("TESTING!"); - expect(j.attempts).toEqual(j.max_attempts); - expect(j.attempts).toBeGreaterThan(0); - } + const remainingJobIds = _all + .filter((r) => !permafailJobIds.includes(r.id)) + .map((r) => r.id); + + const failedJobs = await utils.permanentlyFailJobs( + permafailJobIds, + "TESTING!", + ); + expect(failedJobs.length).toEqual(permafailJobIds.length); await utils.cleanup({ tasks: ["DELETE_PERMAFAILED_JOBS"] }); - const { rows: jobsFromView } = await pgClient.query( + const { rows } = await pgClient.query( `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs`, ); - failedJobIds.forEach((id) => - expect(jobsFromView.find((j) => j.id === id)).toBeUndefined(), - ); + const jobIds = rows + .map((r) => r.id) + .filter((id) => !permafailJobIds.includes(id)); + expect(jobIds).toEqual(remainingJobIds); + })); - const jobs2: Job[] = []; +test("cleanup with GC_JOB_QUEUES", () => + withPgClient(async (pgClient) => { + await reset(pgClient, options); + + const utils = await makeWorkerUtils({ + connectionString: TEST_CONNECTION_STRING, + }); + + const jobs: Job[] = []; const WORKER_ID_1 
= "worker1"; const WORKER_ID_2 = "worker2"; const WORKER_ID_3 = "worker3"; @@ -62,27 +67,38 @@ test("cleanup the database", () => [WORKER_ID_2, "test2", "test_job2"], [WORKER_ID_3, "test3", "test_job3"], ] as const; - for (const [workerId, queueName, jobId] of specs) { + for (const [workerId, queueName, taskIdentifier] of specs) { date.setMinutes(date.getMinutes() - 1); const job = await utils.addJob( - jobId, + taskIdentifier, { a: ++a }, { queueName: queueName ?? undefined }, ); - await pgClient.query( - ` - update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs - set locked_at = $1, locked_by = $2 - where id = $3`, - [workerId ? date.toISOString() : null, workerId, job.id], - ); - jobs2.push(job); + jobs.push(job); + if (workerId) { + await pgClient.query( + `\ +with j as ( + update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs + set locked_at = $1, locked_by = $2 + where id = $3 +), q as ( + update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues as job_queues + set + locked_by = $2::text, + locked_at = $1 + from j + where job_queues.id = j.job_queue_id +) +select * from j`, + [date.toISOString(), workerId, job.id], + ); + } } - // Test GC_JOB_QUEUES - const { rows: queuesBefore } = (await pgClient.query( + const { rows: queuesBefore } = await pgClient.query<{ queue_name: string }>( `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`, - )) as { rows: { queue_name: string }[] }; + ); expect(queuesBefore.map((q) => q.queue_name).sort()).toEqual([ "test", "test2", @@ -90,20 +106,41 @@ test("cleanup the database", () => ]); await utils.forceUnlockWorkers(["worker3"]); - await utils.completeJobs([jobs2[jobs2.length - 1].id]); + const lastJob = jobs[jobs.length - 1]; // Belongs to queueName 'task3' + await utils.completeJobs([lastJob.id]); await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] }); - const { rows: queuesAfter } = (await pgClient.query( + const { rows: queuesAfter } = await pgClient.query<{ queue_name: string 
}>( `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`, - )) as { rows: { queue_name: string }[] }; + ); expect(queuesAfter.map((q) => q.queue_name).sort()).toEqual([ "test", "test2", ]); + })); - // Test GC_TASK_IDENTIFIERS - const { rows: tasksBefore } = (await pgClient.query( +test("cleanup with GC_TASK_IDENTIFIERS", () => + withPgClient(async (pgClient) => { + await reset(pgClient, options); + + const utils = await makeWorkerUtils({ + connectionString: TEST_CONNECTION_STRING, + }); + + for (const taskIdentifier of [ + "job3", + "test_job1", + "test_job2", + "test_job3", + ]) { + const job = await utils.addJob(taskIdentifier, {}); + if (taskIdentifier === "test_job2") { + await utils.completeJobs([job.id]); + } + } + + const { rows: tasksBefore } = await pgClient.query<{ identifier: string }>( `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`, - )) as { rows: { identifier: string }[] }; + ); expect(tasksBefore.map((q) => q.identifier).sort()).toEqual([ "job3", "test_job1", @@ -118,7 +155,7 @@ test("cleanup the database", () => expect(tasksAfter.map((q) => q.identifier).sort()).toEqual([ "job3", "test_job1", - "test_job2", + "test_job3", ]); await utils.release(); diff --git a/src/cleanup.ts b/src/cleanup.ts new file mode 100644 index 00000000..74c90699 --- /dev/null +++ b/src/cleanup.ts @@ -0,0 +1,60 @@ +import { CompiledOptions } from "./lib"; +import { CleanupTask } from "./interfaces"; + +const ALL_CLEANUP_TASKS: CleanupTask[] = [ + "GC_TASK_IDENTIFIERS", + "GC_JOB_QUEUES", + "DELETE_PERMAFAILED_JOBS", +]; + +export function assertCleanupTasks( + tasks: string[], +): asserts tasks is CleanupTask[] { + const invalid = tasks.filter( + (t) => !(ALL_CLEANUP_TASKS as string[]).includes(t), + ); + if (invalid.length > 0) { + throw new Error( + `Invalid cleanup tasks; allowed values: '${ALL_CLEANUP_TASKS.join( + "', '", + )}'; you provided: '${tasks.join("', '")}'`, + ); + } +} + +export async function cleanup( + 
compiledOptions: CompiledOptions, + tasks: CleanupTask[] = ["GC_JOB_QUEUES", "GC_TASK_IDENTIFIERS"], +) { + const { withPgClient, escapedWorkerSchema } = compiledOptions; + await withPgClient(async (client) => { + if (tasks.includes("DELETE_PERMAFAILED_JOBS")) { + await client.query( + `\ +delete from ${escapedWorkerSchema}._private_jobs jobs +where attempts = max_attempts +and locked_at is null;`, + ); + } + if (tasks.includes("GC_TASK_IDENTIFIERS")) { + await client.query( + `\ +delete from ${escapedWorkerSchema}._private_tasks tasks +where tasks.id not in ( + select jobs.task_id + from ${escapedWorkerSchema}._private_jobs jobs +);`, + ); + } + if (tasks.includes("GC_JOB_QUEUES")) { + await client.query( + `\ +delete from ${escapedWorkerSchema}._private_job_queues job_queues +where locked_at is null and id not in ( + select job_queue_id + from ${escapedWorkerSchema}._private_jobs jobs +);`, + ); + } + }); +} diff --git a/src/cli.ts b/src/cli.ts index 226b52a3..01b12420 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -2,13 +2,12 @@ import { loadConfig } from "graphile-config/load"; import * as yargs from "yargs"; -import { CleanupTask } from "."; import { getCronItemsInternal } from "./getCronItems"; import { getTasksInternal } from "./getTasks"; import { getUtilsAndReleasersFromOptions } from "./lib"; import { EMPTY_PRESET, WorkerPreset } from "./preset"; import { runInternal, runOnceInternal } from "./runner"; -import { cleanup } from "./workerUtils"; +import { cleanup, assertCleanupTasks } from "./cleanup"; const defaults = WorkerPreset.worker!; @@ -74,8 +73,8 @@ const argv = yargs }) .string("config") .option("cleanup", { - description: "Database cleanup function; options: GC_TASK_IDENTIFIERS, GC_JOB_QUEUES, DELETE_PERMAFAILED_JOBS", - default: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"], + description: + "Clean the database, then exit. 
Accepts a comma-separated list of cleanup tasks: GC_TASK_IDENTIFIERS, GC_JOB_QUEUES, DELETE_PERMAFAILED_JOBS", }) .string("cleanup") .strict(true).argv; @@ -112,7 +111,7 @@ async function main() { const userPreset = await loadConfig(argv.config); const ONCE = argv.once; const SCHEMA_ONLY = argv["schema-only"]; - const CLEANUP = argv.cleanup; + const CLEANUP = argv.cleanup as string | string[] | undefined; if (SCHEMA_ONLY && ONCE) { throw new Error("Cannot specify both --once and --schema-only"); @@ -138,8 +137,11 @@ async function main() { return; } - if (CLEANUP) { - await cleanup(compiledOptions.resolvedPreset.worker, argv.cleanup as CleanupTask[]); + if (CLEANUP != null) { + const cleanups = Array.isArray(CLEANUP) ? CLEANUP : [CLEANUP]; + const tasks = cleanups.flatMap((t) => t.split(",")).map((t) => t.trim()); + assertCleanupTasks(tasks); + await cleanup(compiledOptions, tasks); return; } diff --git a/src/interfaces.ts b/src/interfaces.ts index 867209ac..41aa7def 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -211,12 +211,12 @@ export function isValidTask( export type TaskList = { [Key in - | keyof GraphileWorker.Tasks - | (string & {})]?: Key extends keyof GraphileWorker.Tasks - ? Task - : // The `any` here is required otherwise declaring something as a `TaskList` can cause issues. - // eslint-disable-next-line @typescript-eslint/no-explicit-any - Task; + | keyof GraphileWorker.Tasks + | (string & {})]?: Key extends keyof GraphileWorker.Tasks + ? Task + : // The `any` here is required otherwise declaring something as a `TaskList` can cause issues. 
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any + Task; }; export interface WatchedTaskList { @@ -733,7 +733,7 @@ export interface JobAndCronIdentifierWithDetails extends JobAndCronIdentifier { last_execution: Date | null; } -export interface WorkerUtilsOptions extends SharedOptions { } +export interface WorkerUtilsOptions extends SharedOptions {} type BaseEventMap = Record; type EventMapKey = string & keyof TEventMap; diff --git a/src/workerUtils.ts b/src/workerUtils.ts index 3bcd0475..95d480c6 100644 --- a/src/workerUtils.ts +++ b/src/workerUtils.ts @@ -1,4 +1,5 @@ /* eslint-disable @typescript-eslint/ban-types */ +import { cleanup } from "./cleanup"; import { CleanupTask, DbJob, @@ -15,14 +16,15 @@ import { migrate } from "./migrate"; export async function makeWorkerUtils( options: WorkerUtilsOptions, ): Promise { - const [compiledSharedOptions, release] = - await getUtilsAndReleasersFromOptions(options, { + const [compiledOptions, release] = await getUtilsAndReleasersFromOptions( + options, + { scope: { label: "WorkerUtils", }, - }); - const { logger, escapedWorkerSchema, withPgClient, addJob } = - compiledSharedOptions; + }, + ); + const { logger, escapedWorkerSchema, withPgClient, addJob } = compiledOptions; return { withPgClient, @@ -30,7 +32,7 @@ export async function makeWorkerUtils( release, addJob, migrate: () => - withPgClient((pgClient) => migrate(compiledSharedOptions, pgClient)), + withPgClient((pgClient) => migrate(compiledOptions, pgClient)), async completeJobs(ids) { const { rows } = await withPgClient((client) => @@ -85,44 +87,10 @@ export async function makeWorkerUtils( async cleanup( options: { tasks?: CleanupTask[] } = { - tasks: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"], + tasks: ["GC_JOB_QUEUES"], }, ): Promise { - await withPgClient(async (client) => { - if (options.tasks?.includes("DELETE_PERMAFAILED_JOBS")) { - await client.query( - `delete from ${escapedWorkerSchema}._private_jobs jobs - where attempts = max_attempts - 
and locked_at is null;`, - ); - } - const queries = []; - if (options.tasks?.includes("GC_TASK_IDENTIFIERS")) { - queries.push( - client.query( - `delete from ${escapedWorkerSchema}._private_tasks tasks - where not exists ( - select 1 - from ${escapedWorkerSchema}._private_jobs jobs - where jobs.task_id = tasks.id - );`, - ), - ); - } - if (options.tasks?.includes("GC_JOB_QUEUES")) { - queries.push( - client.query( - `delete from ${escapedWorkerSchema}._private_job_queues job_queues - where locked_at is null and not exists ( - select 1 - from ${escapedWorkerSchema}._private_jobs jobs - where jobs.job_queue_id = job_queues.id - );`, - ), - ); - } - await Promise.all(queries); - }); + return cleanup(compiledOptions, options.tasks); }, }; } @@ -149,20 +117,3 @@ export async function quickAddJob< await utils.release(); } } - -/** - * **Experimental** - * - * Database cleanup function - */ -export async function cleanup( - options: WorkerUtilsOptions, - tasks: CleanupTask[], -) { - const utils = await makeWorkerUtils(options); - try { - return await utils.cleanup({ tasks }); - } finally { - await utils.release(); - } -} diff --git a/website/docs/admin-functions.md b/website/docs/admin-functions.md index 70073035..c8b3f7bb 100644 --- a/website/docs/admin-functions.md +++ b/website/docs/admin-functions.md @@ -125,11 +125,66 @@ Things may happen. ::: ## Database cleanup -You can cleanup the tables of the database using the cleanup function. 3 options are available: -- DELETE_PERMAFAILED_JOBS: delete permanently failed jobs (assuming they are not locked) -- GC_TASK_IDENTIFIERS: delete tasks identifiers without jobs -- GC_JOB_QUEUES: delete job queues without jobs + +Over time it's likely that graphile_worker's tables will grow with stale values +for old job queue names, task identifiers, or permanently failed jobs. You can +clean up this stale information with the cleanup function, indicating which +cleanup operations you would like to undertake. 
+
+:::tip
+
+If you find yourself calling this quite often or on a schedule, it's likely that
+you are doing something wrong (e.g. allowing jobs to permafail, using random
+values for job queue names, etc).
+
+:::
+
+### GC_JOB_QUEUES
+
+Delete job queues that don't contain any jobs. Safe.
+
+### GC_TASK_IDENTIFIERS
+
+Deletes task identifiers that don't contain any jobs. Unsafe to execute whilst
+any Worker is running.
+
+:::warning
+
+It is currently unsafe to run this whilst any Graphile Worker instance is
+running since any task identifiers for which there are no jobs queued will be
+deleted; when another job with that identifier is queued a _new_ unique
+identifier will be generated and that won't match the internal identifiers that
+the running workers have cached.
+
+:::
+
+### DELETE_PERMAFAILED_JOBS
+
+Deletes any unlocked jobs that will never be reattempted due to `attempts`
+reaching `max_attempts`. Will delete this data, but is otherwise safe.
+
+:::tip
+
+You should write your tasks such that no job will ever permafail; for example
+after 20 attempts you might have the job do some cleanup and then exit
+successfully.
+ +::: + +### Example + +In the CLI: + +```bash title="CLI" +graphile-worker --cleanup DELETE_PERMAFAILED_JOBS,GC_TASK_IDENTIFIERS,GC_JOB_QUEUES +``` + +Or in the library using [WorkerUtils](/docs/library/queue#workerutils): ```ts title="JS API" -await workerUtils.cleanup(["DELETE_PERMAFAILED_JOBS","GC_TASK_IDENTIFIERS","GC_JOB_QUEUES"]); +await workerUtils.cleanup([ + "DELETE_PERMAFAILED_JOBS", + "GC_TASK_IDENTIFIERS", + "GC_JOB_QUEUES", +]); ``` From b220d06ee8dd03da748c5ef2aab35da4114cdfe7 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 15 Jan 2024 18:34:29 +0000 Subject: [PATCH 06/10] Tweak comment --- src/interfaces.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/interfaces.ts b/src/interfaces.ts index 41aa7def..9a858ad1 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -179,12 +179,13 @@ export interface WorkerUtils extends Helpers { /** * **Experimental** * - * Database cleanup function + * Database cleanup function. Supported tasks: + * * - GC_TASK_IDENTIFIERS: delete task identifiers that are no longer referenced by any jobs * - GC_JOB_QUEUES: delete job queues that are no longer referenced by any jobs * - DELETE_PERMAFAILED_JOBS: delete permanently failed jobs if they are not locked * - * Default: ["GC_TASK_IDENTIFIERS", "GC_JOB_QUEUES"] + * Default: ["GC_JOB_QUEUES"] */ cleanup(options: { tasks?: CleanupTask[] }): Promise; } From 53c7b10962d2de8fe032952b55279325484161a6 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 15 Jan 2024 18:43:24 +0000 Subject: [PATCH 07/10] Fix tests --- __tests__/helpers.ts | 1 - __tests__/workerUtils.cleanup.test.ts | 13 ++++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/__tests__/helpers.ts b/__tests__/helpers.ts index 58d78964..5aa2f24a 100644 --- a/__tests__/helpers.ts +++ b/__tests__/helpers.ts @@ -241,7 +241,6 @@ export async function makeSelectionOfJobs( Object.assign(failedJob, failedJobUpdate); return { - _all: [failedJob, regularJob1, 
lockedJob, regularJob2, untouchedJob], failedJob, regularJob1, lockedJob, diff --git a/__tests__/workerUtils.cleanup.test.ts b/__tests__/workerUtils.cleanup.test.ts index 9e83eec5..fd2dd13f 100644 --- a/__tests__/workerUtils.cleanup.test.ts +++ b/__tests__/workerUtils.cleanup.test.ts @@ -23,14 +23,15 @@ test("cleanup with DELETE_PERMAFAILED_JOBS", () => connectionString: TEST_CONNECTION_STRING, }); - const { failedJob, regularJob1, regularJob2, _all } = - await makeSelectionOfJobs(utils, pgClient); + const jobs = await makeSelectionOfJobs(utils, pgClient); + const { failedJob, regularJob1, regularJob2 } = jobs; const permafailJobIds = [failedJob.id, regularJob1.id, regularJob2.id].sort( numerically, ); - const remainingJobIds = _all + const remainingJobIds = Object.values(jobs) .filter((r) => !permafailJobIds.includes(r.id)) - .map((r) => r.id); + .map((r) => r.id) + .sort(numerically); const failedJobs = await utils.permanentlyFailJobs( permafailJobIds, @@ -44,7 +45,8 @@ test("cleanup with DELETE_PERMAFAILED_JOBS", () => ); const jobIds = rows .map((r) => r.id) - .filter((id) => !permafailJobIds.includes(id)); + .filter((id) => !permafailJobIds.includes(id)) + .sort(numerically); expect(jobIds).toEqual(remainingJobIds); })); @@ -82,6 +84,7 @@ with j as ( update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs set locked_at = $1, locked_by = $2 where id = $3 + returning * ), q as ( update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues as job_queues set From 1d8b40175dfbe29ff6d308b8f0596e505062a351 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 15 Jan 2024 18:49:42 +0000 Subject: [PATCH 08/10] Lint --- src/cleanup.ts | 2 +- src/cli.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cleanup.ts b/src/cleanup.ts index 74c90699..06b8cf19 100644 --- a/src/cleanup.ts +++ b/src/cleanup.ts @@ -1,5 +1,5 @@ -import { CompiledOptions } from "./lib"; import { CleanupTask } from "./interfaces"; +import { CompiledOptions } from 
"./lib"; const ALL_CLEANUP_TASKS: CleanupTask[] = [ "GC_TASK_IDENTIFIERS", diff --git a/src/cli.ts b/src/cli.ts index 01b12420..489b7dc2 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -2,12 +2,12 @@ import { loadConfig } from "graphile-config/load"; import * as yargs from "yargs"; +import { assertCleanupTasks, cleanup } from "./cleanup"; import { getCronItemsInternal } from "./getCronItems"; import { getTasksInternal } from "./getTasks"; import { getUtilsAndReleasersFromOptions } from "./lib"; import { EMPTY_PRESET, WorkerPreset } from "./preset"; import { runInternal, runOnceInternal } from "./runner"; -import { cleanup, assertCleanupTasks } from "./cleanup"; const defaults = WorkerPreset.worker!; From 0935549718ed0aa2bfb270ef08b0f90a6a1fefa6 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 15 Jan 2024 19:00:37 +0000 Subject: [PATCH 09/10] Make tests more reliable on error --- __tests__/workerUtils.addJob.test.ts | 19 ++++++++++++---- __tests__/workerUtils.cleanup.test.ts | 22 ++++++++++++++----- __tests__/workerUtils.completeJobs.test.ts | 15 +++++++++++-- .../workerUtils.forceUnlockWorkers.test.ts | 17 ++++++++++---- .../workerUtils.permanentlyFailJobs.test.ts | 16 ++++++++++---- __tests__/workerUtils.rescheduleJobs.test.ts | 16 ++++++++++---- 6 files changed, 81 insertions(+), 24 deletions(-) diff --git a/__tests__/workerUtils.addJob.test.ts b/__tests__/workerUtils.addJob.test.ts index d753dcc2..9d5d5137 100644 --- a/__tests__/workerUtils.addJob.test.ts +++ b/__tests__/workerUtils.addJob.test.ts @@ -4,6 +4,7 @@ import { runTaskListOnce, Task, WorkerSharedOptions, + WorkerUtils, } from "../src/index"; import { getJobs, @@ -19,16 +20,23 @@ const REFERENCE_TIMESTAMP = 1609459200000; /* 1st January 2021, 00:00:00 UTC */ const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("runs a job added through the worker utils", () => withPgClient(async (pgClient) => { 
await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); await utils.addJob("job3", { a: 1 }); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); @@ -44,7 +52,7 @@ test("supports the jobKey API", () => await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); await utils.addJob("job3", { a: 1 }, { jobKey: "UNIQUE" }); @@ -52,6 +60,7 @@ test("supports the jobKey API", () => await utils.addJob("job3", { a: 3 }, { jobKey: "UNIQUE" }); await utils.addJob("job3", { a: 4 }, { jobKey: "UNIQUE" }); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); @@ -69,7 +78,7 @@ test("supports the jobKey API with jobKeyMode", () => await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); const runAt1 = new Date("2200-01-01T00:00:00Z"); @@ -119,6 +128,7 @@ test("supports the jobKey API with jobKeyMode", () => expect(job.run_at.toISOString()).toBe(runAt4.toISOString()); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); @@ -156,7 +166,7 @@ test("adding job respects useNodeTime", () => await reset(pgClient, options); // Schedule a job - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, useNodeTime: true, }); @@ -164,6 +174,7 @@ test("adding job respects useNodeTime", () => await setTime(timeOfAddJob); await utils.addJob("job3", { a: 1 }); await utils.release(); + utils = null; // Assert that it has an entry in jobs / job_queues const jobs = await getJobs(pgClient); 
diff --git a/__tests__/workerUtils.cleanup.test.ts b/__tests__/workerUtils.cleanup.test.ts index fd2dd13f..61268e9e 100644 --- a/__tests__/workerUtils.cleanup.test.ts +++ b/__tests__/workerUtils.cleanup.test.ts @@ -1,4 +1,10 @@ -import { DbJob, Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + DbJob, + Job, + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; import { ESCAPED_GRAPHILE_WORKER_SCHEMA, makeSelectionOfJobs, @@ -14,12 +20,18 @@ function numerically(a: string | number, b: string | number) { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + // Test DELETE_PERMAFAILED_JOBS test("cleanup with DELETE_PERMAFAILED_JOBS", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -54,7 +66,7 @@ test("cleanup with GC_JOB_QUEUES", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -125,7 +137,7 @@ test("cleanup with GC_TASK_IDENTIFIERS", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -160,6 +172,4 @@ test("cleanup with GC_TASK_IDENTIFIERS", () => "test_job1", "test_job3", ]); - - await utils.release(); })); diff --git a/__tests__/workerUtils.completeJobs.test.ts b/__tests__/workerUtils.completeJobs.test.ts index acb4ebaa..ac7a872b 100644 --- a/__tests__/workerUtils.completeJobs.test.ts +++ b/__tests__/workerUtils.completeJobs.test.ts @@ -1,4 +1,8 @@ -import { makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, 
+} from "../src/index"; import { getJobs, makeSelectionOfJobs, @@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("completes the jobs, leaves others unaffected", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -40,4 +50,5 @@ test("completes the jobs, leaves others unaffected", () => expect(remaining[1]).toMatchObject(untouchedJob); await utils.release(); + utils = null; })); diff --git a/__tests__/workerUtils.forceUnlockWorkers.test.ts b/__tests__/workerUtils.forceUnlockWorkers.test.ts index 5d057fe7..792e77a0 100644 --- a/__tests__/workerUtils.forceUnlockWorkers.test.ts +++ b/__tests__/workerUtils.forceUnlockWorkers.test.ts @@ -1,4 +1,9 @@ -import { Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + Job, + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; import { ESCAPED_GRAPHILE_WORKER_SCHEMA, getJobs, @@ -9,11 +14,17 @@ import { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("unlocks jobs for the given workers, leaves others unaffected", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -89,6 +100,4 @@ where jobs.job_queue_id = job_queues.id;`, locked_by: WORKER_ID_1, }), ]); - - await utils.release(); })); diff --git a/__tests__/workerUtils.permanentlyFailJobs.test.ts b/__tests__/workerUtils.permanentlyFailJobs.test.ts index 518aafb3..babd0acd 100644 --- a/__tests__/workerUtils.permanentlyFailJobs.test.ts +++ 
b/__tests__/workerUtils.permanentlyFailJobs.test.ts @@ -1,4 +1,8 @@ -import { makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; import { getJobs, makeSelectionOfJobs, @@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("completes the jobs, leaves others unaffected", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -46,6 +56,4 @@ test("completes the jobs, leaves others unaffected", () => expect(remaining).toHaveLength(2); expect(remaining[0]).toMatchObject(lockedJob); expect(remaining[1]).toMatchObject(untouchedJob); - - await utils.release(); })); diff --git a/__tests__/workerUtils.rescheduleJobs.test.ts b/__tests__/workerUtils.rescheduleJobs.test.ts index 0b3e653d..c54dd097 100644 --- a/__tests__/workerUtils.rescheduleJobs.test.ts +++ b/__tests__/workerUtils.rescheduleJobs.test.ts @@ -1,4 +1,8 @@ -import { makeWorkerUtils, WorkerSharedOptions } from "../src/index"; +import { + makeWorkerUtils, + WorkerSharedOptions, + WorkerUtils, +} from "../src/index"; import { getJobs, makeSelectionOfJobs, @@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) { const options: WorkerSharedOptions = {}; +let utils: WorkerUtils | null = null; +afterEach(async () => { + await utils?.release(); + utils = null; +}); + test("completes the jobs, leaves others unaffected", () => withPgClient(async (pgClient) => { await reset(pgClient, options); - const utils = await makeWorkerUtils({ + utils = await makeWorkerUtils({ connectionString: TEST_CONNECTION_STRING, }); @@ -54,6 +64,4 @@ test("completes the jobs, leaves others 
unaffected", () => expect(remaining).toHaveLength(2); expect(remaining[0]).toMatchObject(lockedJob); expect(remaining[1]).toMatchObject(untouchedJob); - - await utils.release(); })); From cc98ecd7949b62da8962d715b288acb861b0a54e Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 15 Jan 2024 19:26:33 +0000 Subject: [PATCH 10/10] Add to release notes --- RELEASE_NOTES.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5659cebc..5d88a1a5 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,6 +19,11 @@ to make sure the system as a whole remains consistent. Read more: [Worker Pro Migration](https://worker.graphile.org/docs/pro/migration). +## Pending + +- Add `cleanup` function to remove unused queues, stale task identifiers, and + permanently failed jobs. + ## v0.16.0 _There's a breakdown of these release notes available on the new