Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/cleanup #415

Merged
merged 11 commits into from
Jan 15, 2024
Merged
165 changes: 165 additions & 0 deletions __tests__/workerUtils.cleanup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { DbJob, Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
makeSelectionOfJobs,
reset,
TEST_CONNECTION_STRING,
withPgClient,
} from "./helpers";

/** For sorting arrays of numbers or numeric strings */
/** Ascending comparator for arrays of numbers or numeric strings. */
function numerically(a: string | number, b: string | number) {
  const left = parseFloat(String(a));
  const right = parseFloat(String(b));
  return left - right;
}

const options: WorkerSharedOptions = {};

// Test DELETE_PERMAFAILED_JOBS
test("cleanup with DELETE_PERMAFAILED_JOBS", () =>
  withPgClient(async (pgClient) => {
    await reset(pgClient, options);

    const utils = await makeWorkerUtils({
      connectionString: TEST_CONNECTION_STRING,
    });

    // Permanently fail a subset of jobs; cleanup should delete exactly those.
    const jobs = await makeSelectionOfJobs(utils, pgClient);
    const { failedJob, regularJob1, regularJob2 } = jobs;
    const permafailJobIds = [failedJob.id, regularJob1.id, regularJob2.id].sort(
      numerically,
    );
    const remainingJobIds = Object.values(jobs)
      .filter((r) => !permafailJobIds.includes(r.id))
      .map((r) => r.id)
      .sort(numerically);

    const failedJobs = await utils.permanentlyFailJobs(
      permafailJobIds,
      "TESTING!",
    );
    expect(failedJobs.length).toEqual(permafailJobIds.length);

    await utils.cleanup({ tasks: ["DELETE_PERMAFAILED_JOBS"] });
    const { rows } = await pgClient.query<DbJob>(
      `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs`,
    );
    // Compare the full remaining set: previously the permafailed ids were
    // filtered out of `rows` before comparing, so the test would have passed
    // even if cleanup had deleted nothing. This asserts both that the
    // permafailed jobs are gone and that all other jobs were preserved.
    const jobIds = rows.map((r) => r.id).sort(numerically);
    expect(jobIds).toEqual(remainingJobIds);

    await utils.release();
  }));

test("cleanup with GC_JOB_QUEUES", () =>
  withPgClient(async (pgClient) => {
    await reset(pgClient, options);

    const utils = await makeWorkerUtils({
      connectionString: TEST_CONNECTION_STRING,
    });

    // Create one job on each of three queues, each locked by a different
    // worker, so every queue starts out both referenced and locked.
    const jobs: Job[] = [];
    const WORKER_ID_1 = "worker1";
    const WORKER_ID_2 = "worker2";
    const WORKER_ID_3 = "worker3";
    let a = 0;
    const date = new Date();
    const specs = [
      [WORKER_ID_1, "test", "test_job1"],
      [WORKER_ID_2, "test2", "test_job2"],
      [WORKER_ID_3, "test3", "test_job3"],
    ] as const;
    for (const [workerId, queueName, taskIdentifier] of specs) {
      date.setMinutes(date.getMinutes() - 1);
      const job = await utils.addJob(
        taskIdentifier,
        { a: ++a },
        { queueName: queueName ?? undefined },
      );
      jobs.push(job);
      if (workerId) {
        // Simulate a worker holding the lock on both the job and its queue.
        await pgClient.query(
          `\
with j as (
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
set locked_at = $1, locked_by = $2
where id = $3
returning *
), q as (
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues as job_queues
set
locked_by = $2::text,
locked_at = $1
from j
where job_queues.id = j.job_queue_id
)
select * from j`,
          [date.toISOString(), workerId, job.id],
        );
      }
    }

    const { rows: queuesBefore } = await pgClient.query<{ queue_name: string }>(
      `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`,
    );
    expect(queuesBefore.map((q) => q.queue_name).sort()).toEqual([
      "test",
      "test2",
      "test3",
    ]);

    // Unlock worker3 and complete its only job: queue 'test3' is now both
    // unlocked and unreferenced, so GC_JOB_QUEUES should remove it alone.
    await utils.forceUnlockWorkers(["worker3"]);
    const lastJob = jobs[jobs.length - 1]; // Belongs to queueName 'test3'
    await utils.completeJobs([lastJob.id]);
    await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] });
    const { rows: queuesAfter } = await pgClient.query<{ queue_name: string }>(
      `select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`,
    );
    expect(queuesAfter.map((q) => q.queue_name).sort()).toEqual([
      "test",
      "test2",
    ]);

    await utils.release();
  }));

test("cleanup with GC_TASK_IDENTIFIERS", () =>
  withPgClient(async (pgClient) => {
    await reset(pgClient, options);

    const utils = await makeWorkerUtils({
      connectionString: TEST_CONNECTION_STRING,
    });

    // Add four tasks; complete the only job for 'test_job2' so that its task
    // identifier becomes unreferenced and eligible for garbage collection.
    for (const taskIdentifier of [
      "job3",
      "test_job1",
      "test_job2",
      "test_job3",
    ]) {
      const job = await utils.addJob(taskIdentifier, {});
      if (taskIdentifier === "test_job2") {
        await utils.completeJobs([job.id]);
      }
    }

    const { rows: tasksBefore } = await pgClient.query<{ identifier: string }>(
      `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`,
    );
    expect(tasksBefore.map((q) => q.identifier).sort()).toEqual([
      "job3",
      "test_job1",
      "test_job2",
      "test_job3",
    ]);

    await utils.cleanup({ tasks: ["GC_TASK_IDENTIFIERS"] });
    // Use the typed query generic (consistent with `tasksBefore` above)
    // instead of asserting the result shape with `as`.
    const { rows: tasksAfter } = await pgClient.query<{ identifier: string }>(
      `select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`,
    );
    expect(tasksAfter.map((q) => q.identifier).sort()).toEqual([
      "job3",
      "test_job1",
      "test_job3",
    ]);

    await utils.release();
  }));
60 changes: 60 additions & 0 deletions src/cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { CompiledOptions } from "./lib";

Check failure on line 1 in src/cleanup.ts

View workflow job for this annotation

GitHub Actions / lint (20.x)

Run autofix to sort these imports!
import { CleanupTask } from "./interfaces";

// Complete list of supported cleanup tasks; used to validate user-supplied
// task names (e.g. from the CLI `--cleanup` flag) in `assertCleanupTasks`.
const ALL_CLEANUP_TASKS: CleanupTask[] = [
  "GC_TASK_IDENTIFIERS",
  "GC_JOB_QUEUES",
  "DELETE_PERMAFAILED_JOBS",
];

/**
 * Asserts that every entry in `tasks` is a recognized `CleanupTask`,
 * narrowing the array's type for the caller.
 *
 * @param tasks - User-supplied cleanup task names (e.g. parsed from the CLI).
 * @throws Error naming the allowed values if any entry is unrecognized.
 */
export function assertCleanupTasks(
  tasks: string[],
): asserts tasks is CleanupTask[] {
  // Compare by equality rather than casting ALL_CLEANUP_TASKS to string[],
  // keeping the validation free of type assertions.
  const invalid = tasks.filter(
    (t) => !ALL_CLEANUP_TASKS.some((task) => task === t),
  );
  if (invalid.length > 0) {
    throw new Error(
      `Invalid cleanup tasks; allowed values: '${ALL_CLEANUP_TASKS.join(
        "', '",
      )}'; you provided: '${tasks.join("', '")}'`,
    );
  }
}

/**
 * Database cleanup routine. Runs each requested task against the worker
 * schema:
 *
 * - DELETE_PERMAFAILED_JOBS: delete unlocked jobs whose attempts have been
 *   exhausted (attempts = max_attempts)
 * - GC_TASK_IDENTIFIERS: delete task rows no longer referenced by any job
 * - GC_JOB_QUEUES: delete unlocked queue rows no longer referenced by any job
 *
 * NOTE(review): the default here is ["GC_JOB_QUEUES", "GC_TASK_IDENTIFIERS"],
 * but the `WorkerUtils.cleanup` doc comment states the default is
 * ["GC_JOB_QUEUES"] — confirm which default is intended.
 *
 * @param compiledOptions - Provides the pg client helper and escaped schema.
 * @param tasks - Which cleanup tasks to run; order of execution is fixed
 *   (permafailed jobs, then task identifiers, then job queues) regardless of
 *   the order given here.
 */
export async function cleanup(
  compiledOptions: CompiledOptions,
  tasks: CleanupTask[] = ["GC_JOB_QUEUES", "GC_TASK_IDENTIFIERS"],
) {
  const { withPgClient, escapedWorkerSchema } = compiledOptions;
  await withPgClient(async (client) => {
    // Permanently failed jobs: attempts exhausted and not currently locked.
    if (tasks.includes("DELETE_PERMAFAILED_JOBS")) {
      await client.query(
        `\
delete from ${escapedWorkerSchema}._private_jobs jobs
where attempts = max_attempts
and locked_at is null;`,
      );
    }
    // Task identifiers with no remaining jobs referencing them.
    if (tasks.includes("GC_TASK_IDENTIFIERS")) {
      await client.query(
        `\
delete from ${escapedWorkerSchema}._private_tasks tasks
where tasks.id not in (
select jobs.task_id
from ${escapedWorkerSchema}._private_jobs jobs
);`,
      );
    }
    // Job queues that are unlocked and no longer referenced by any job.
    if (tasks.includes("GC_JOB_QUEUES")) {
      await client.query(
        `\
delete from ${escapedWorkerSchema}._private_job_queues job_queues
where locked_at is null and id not in (
select job_queue_id
from ${escapedWorkerSchema}._private_jobs jobs
);`,
      );
    }
  });
}
15 changes: 15 additions & 0 deletions src/cli.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env node
import { loadConfig } from "graphile-config/load";

Check failure on line 2 in src/cli.ts

View workflow job for this annotation

GitHub Actions / lint (20.x)

Run autofix to sort these imports!
import * as yargs from "yargs";

import { getCronItemsInternal } from "./getCronItems";
Expand All @@ -7,6 +7,7 @@
import { getUtilsAndReleasersFromOptions } from "./lib";
import { EMPTY_PRESET, WorkerPreset } from "./preset";
import { runInternal, runOnceInternal } from "./runner";
import { cleanup, assertCleanupTasks } from "./cleanup";

const defaults = WorkerPreset.worker!;

Expand Down Expand Up @@ -71,6 +72,11 @@
normalize: true,
})
.string("config")
.option("cleanup", {
description:
"Clean the database, then exit. Accepts a comma-separated list of cleanup tasks: GC_TASK_IDENTIFIERS, GC_JOB_QUEUES, DELETE_PERMAFAILED_JOBS",
})
.string("cleanup")
.strict(true).argv;

const integerOrUndefined = (n: number | undefined): number | undefined => {
Expand Down Expand Up @@ -105,6 +111,7 @@
const userPreset = await loadConfig(argv.config);
const ONCE = argv.once;
const SCHEMA_ONLY = argv["schema-only"];
const CLEANUP = argv.cleanup as string | string[] | undefined;

if (SCHEMA_ONLY && ONCE) {
throw new Error("Cannot specify both --once and --schema-only");
Expand All @@ -130,6 +137,14 @@
return;
}

if (CLEANUP != null) {
const cleanups = Array.isArray(CLEANUP) ? CLEANUP : [CLEANUP];
const tasks = cleanups.flatMap((t) => t.split(",")).map((t) => t.trim());
assertCleanupTasks(tasks);
await cleanup(compiledOptions, tasks);
return;
}

const watchedTasks = await getTasksInternal(
compiledOptions,
compiledOptions.resolvedPreset.worker.taskDirectory,
Expand Down
18 changes: 18 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ export interface JobHelpers extends Helpers {
abortSignal?: AbortSignal;
}

/**
 * A named database cleanup operation accepted by `WorkerUtils.cleanup` and
 * the CLI `--cleanup` flag.
 */
export type CleanupTask =
  | "GC_TASK_IDENTIFIERS"
  | "GC_JOB_QUEUES"
  | "DELETE_PERMAFAILED_JOBS";

/**
* Utilities for working with Graphile Worker. Primarily useful for migrating
* the jobs database and queueing jobs.
Expand Down Expand Up @@ -170,6 +175,19 @@ export interface WorkerUtils extends Helpers {
* were terminated, are permanently unreachable, etc).
*/
forceUnlockWorkers: (workerIds: string[]) => Promise<void>;

/**
* **Experimental**
*
* Database cleanup function. Supported tasks:
*
* - GC_TASK_IDENTIFIERS: delete task identifiers that are no longer referenced by any jobs
* - GC_JOB_QUEUES: delete job queues that are no longer referenced by any jobs
* - DELETE_PERMAFAILED_JOBS: delete permanently failed jobs if they are not locked
*
* Default: ["GC_JOB_QUEUES"]
*/
cleanup(options: { tasks?: CleanupTask[] }): Promise<void>;
}

export type PromiseOrDirect<T> = Promise<T> | T;
Expand Down
30 changes: 23 additions & 7 deletions src/workerUtils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
/* eslint-disable @typescript-eslint/ban-types */
import { DbJob, TaskSpec, WorkerUtils, WorkerUtilsOptions } from "./interfaces";
import { cleanup } from "./cleanup";
import {
CleanupTask,
DbJob,
TaskSpec,
WorkerUtils,
WorkerUtilsOptions,
} from "./interfaces";
import { getUtilsAndReleasersFromOptions } from "./lib";
import { migrate } from "./migrate";

Expand All @@ -9,22 +16,23 @@ import { migrate } from "./migrate";
export async function makeWorkerUtils(
options: WorkerUtilsOptions,
): Promise<WorkerUtils> {
const [compiledSharedOptions, release] =
await getUtilsAndReleasersFromOptions(options, {
const [compiledOptions, release] = await getUtilsAndReleasersFromOptions(
options,
{
scope: {
label: "WorkerUtils",
},
});
const { logger, escapedWorkerSchema, withPgClient, addJob } =
compiledSharedOptions;
},
);
const { logger, escapedWorkerSchema, withPgClient, addJob } = compiledOptions;

return {
withPgClient,
logger,
release,
addJob,
migrate: () =>
withPgClient((pgClient) => migrate(compiledSharedOptions, pgClient)),
withPgClient((pgClient) => migrate(compiledOptions, pgClient)),

async completeJobs(ids) {
const { rows } = await withPgClient((client) =>
Expand Down Expand Up @@ -76,6 +84,14 @@ export async function makeWorkerUtils(
),
);
},

async cleanup(
options: { tasks?: CleanupTask[] } = {
tasks: ["GC_JOB_QUEUES"],
},
): Promise<void> {
return cleanup(compiledOptions, options.tasks);
},
};
}

Expand Down
Loading
Loading