Skip to content

Commit

Permalink
Add "database cleanup" feature (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Jan 15, 2024
2 parents 0257b2b + 2c2649b commit 0d71581
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 25 deletions.
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ to make sure the system as a whole remains consistent.
Read more:
[Worker Pro Migration](https://worker.graphile.org/docs/pro/migration).

## Pending

- Add `cleanup` function to remove unused queues, stale task identifiers, and
permanently failed jobs.

## v0.16.1

- Fixes issue importing task files that were written in TypeScript ESM format
Expand Down
19 changes: 15 additions & 4 deletions __tests__/workerUtils.addJob.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
runTaskListOnce,
Task,
WorkerSharedOptions,
WorkerUtils,
} from "../src/index";
import {
getJobs,
Expand All @@ -19,16 +20,23 @@ const REFERENCE_TIMESTAMP = 1609459200000; /* 1st January 2021, 00:00:00 UTC */

const options: WorkerSharedOptions = {};

let utils: WorkerUtils | null = null;
afterEach(async () => {
await utils?.release();
utils = null;
});

test("runs a job added through the worker utils", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

// Schedule a job
const utils = await makeWorkerUtils({
utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});
await utils.addJob("job3", { a: 1 });
await utils.release();
utils = null;

// Assert that it has an entry in jobs / job_queues
const jobs = await getJobs(pgClient);
Expand All @@ -44,14 +52,15 @@ test("supports the jobKey API", () =>
await reset(pgClient, options);

// Schedule a job
const utils = await makeWorkerUtils({
utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});
await utils.addJob("job3", { a: 1 }, { jobKey: "UNIQUE" });
await utils.addJob("job3", { a: 2 }, { jobKey: "UNIQUE" });
await utils.addJob("job3", { a: 3 }, { jobKey: "UNIQUE" });
await utils.addJob("job3", { a: 4 }, { jobKey: "UNIQUE" });
await utils.release();
utils = null;

// Assert that it has an entry in jobs / job_queues
const jobs = await getJobs(pgClient);
Expand All @@ -69,7 +78,7 @@ test("supports the jobKey API with jobKeyMode", () =>
await reset(pgClient, options);

// Schedule a job
const utils = await makeWorkerUtils({
utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});
const runAt1 = new Date("2200-01-01T00:00:00Z");
Expand Down Expand Up @@ -119,6 +128,7 @@ test("supports the jobKey API with jobKeyMode", () =>
expect(job.run_at.toISOString()).toBe(runAt4.toISOString());

await utils.release();
utils = null;

// Assert that it has an entry in jobs / job_queues
const jobs = await getJobs(pgClient);
Expand Down Expand Up @@ -156,14 +166,15 @@ test("adding job respects useNodeTime", () =>
await reset(pgClient, options);

// Schedule a job
const utils = await makeWorkerUtils({
utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
useNodeTime: true,
});
const timeOfAddJob = REFERENCE_TIMESTAMP + 1 * HOUR;
await setTime(timeOfAddJob);
await utils.addJob("job3", { a: 1 });
await utils.release();
utils = null;

// Assert that it has an entry in jobs / job_queues
const jobs = await getJobs(pgClient);
Expand Down
175 changes: 175 additions & 0 deletions __tests__/workerUtils.cleanup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import {
DbJob,
Job,
makeWorkerUtils,
WorkerSharedOptions,
WorkerUtils,
} from "../src/index";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
makeSelectionOfJobs,
reset,
TEST_CONNECTION_STRING,
withPgClient,
} from "./helpers";

/** For sorting arrays of numbers or numeric strings */
function numerically(a: string | number, b: string | number) {
return parseFloat(String(a)) - parseFloat(String(b));
}

const options: WorkerSharedOptions = {};

let utils: WorkerUtils | null = null;
afterEach(async () => {
await utils?.release();
utils = null;
});

// Test DELETE_PERMAFAILED_JOBS
test("cleanup with DELETE_PERMAFAILED_JOBS", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});

const jobs = await makeSelectionOfJobs(utils, pgClient);
const { failedJob, regularJob1, regularJob2 } = jobs;
const permafailJobIds = [failedJob.id, regularJob1.id, regularJob2.id].sort(
numerically,
);
const remainingJobIds = Object.values(jobs)
.filter((r) => !permafailJobIds.includes(r.id))
.map((r) => r.id)
.sort(numerically);

const failedJobs = await utils.permanentlyFailJobs(
permafailJobIds,
"TESTING!",
);
expect(failedJobs.length).toEqual(permafailJobIds.length);

await utils.cleanup({ tasks: ["DELETE_PERMAFAILED_JOBS"] });
const { rows } = await pgClient.query<DbJob>(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs`,
);
const jobIds = rows
.map((r) => r.id)
.filter((id) => !permafailJobIds.includes(id))
.sort(numerically);
expect(jobIds).toEqual(remainingJobIds);
}));

test("cleanup with GC_JOB_QUEUES", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});

const jobs: Job[] = [];
const WORKER_ID_1 = "worker1";
const WORKER_ID_2 = "worker2";
const WORKER_ID_3 = "worker3";
let a = 0;
const date = new Date();
const specs = [
[WORKER_ID_1, "test", "test_job1"],
[WORKER_ID_2, "test2", "test_job2"],
[WORKER_ID_3, "test3", "test_job3"],
] as const;
for (const [workerId, queueName, taskIdentifier] of specs) {
date.setMinutes(date.getMinutes() - 1);
const job = await utils.addJob(
taskIdentifier,
{ a: ++a },
{ queueName: queueName ?? undefined },
);
jobs.push(job);
if (workerId) {
await pgClient.query(
`\
with j as (
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
set locked_at = $1, locked_by = $2
where id = $3
returning *
), q as (
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues as job_queues
set
locked_by = $2::text,
locked_at = $1
from j
where job_queues.id = j.job_queue_id
)
select * from j`,
[date.toISOString(), workerId, job.id],
);
}
}

const { rows: queuesBefore } = await pgClient.query<{ queue_name: string }>(
`select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`,
);
expect(queuesBefore.map((q) => q.queue_name).sort()).toEqual([
"test",
"test2",
"test3",
]);

await utils.forceUnlockWorkers(["worker3"]);
const lastJob = jobs[jobs.length - 1]; // Belongs to queueName 'task3'
await utils.completeJobs([lastJob.id]);
await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] });
const { rows: queuesAfter } = await pgClient.query<{ queue_name: string }>(
`select queue_name from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_job_queues`,
);
expect(queuesAfter.map((q) => q.queue_name).sort()).toEqual([
"test",
"test2",
]);
}));

test("cleanup with GC_TASK_IDENTIFIERS", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});

for (const taskIdentifier of [
"job3",
"test_job1",
"test_job2",
"test_job3",
]) {
const job = await utils.addJob(taskIdentifier, {});
if (taskIdentifier === "test_job2") {
await utils.completeJobs([job.id]);
}
}

const { rows: tasksBefore } = await pgClient.query<{ identifier: string }>(
`select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`,
);
expect(tasksBefore.map((q) => q.identifier).sort()).toEqual([
"job3",
"test_job1",
"test_job2",
"test_job3",
]);

await utils.cleanup({ tasks: ["GC_TASK_IDENTIFIERS"] });
const { rows: tasksAfter } = (await pgClient.query(
`select identifier from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_tasks`,
)) as { rows: { identifier: string }[] };
expect(tasksAfter.map((q) => q.identifier).sort()).toEqual([
"job3",
"test_job1",
"test_job3",
]);
}));
15 changes: 13 additions & 2 deletions __tests__/workerUtils.completeJobs.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { makeWorkerUtils, WorkerSharedOptions } from "../src/index";
import {
makeWorkerUtils,
WorkerSharedOptions,
WorkerUtils,
} from "../src/index";
import {
getJobs,
makeSelectionOfJobs,
Expand All @@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) {

const options: WorkerSharedOptions = {};

let utils: WorkerUtils | null = null;
afterEach(async () => {
await utils?.release();
utils = null;
});

test("completes the jobs, leaves others unaffected", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

const utils = await makeWorkerUtils({
utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});

Expand All @@ -40,4 +50,5 @@ test("completes the jobs, leaves others unaffected", () =>
expect(remaining[1]).toMatchObject(untouchedJob);

await utils.release();
utils = null;
}));
17 changes: 13 additions & 4 deletions __tests__/workerUtils.forceUnlockWorkers.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { Job, makeWorkerUtils, WorkerSharedOptions } from "../src/index";
import {
Job,
makeWorkerUtils,
WorkerSharedOptions,
WorkerUtils,
} from "../src/index";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
getJobs,
Expand All @@ -9,11 +14,17 @@ import {

const options: WorkerSharedOptions = {};

let utils: WorkerUtils | null = null;
afterEach(async () => {
await utils?.release();
utils = null;
});

test("unlocks jobs for the given workers, leaves others unaffected", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

const utils = await makeWorkerUtils({
utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});

Expand Down Expand Up @@ -89,6 +100,4 @@ where jobs.job_queue_id = job_queues.id;`,
locked_by: WORKER_ID_1,
}),
]);

await utils.release();
}));
16 changes: 12 additions & 4 deletions __tests__/workerUtils.permanentlyFailJobs.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { makeWorkerUtils, WorkerSharedOptions } from "../src/index";
import {
makeWorkerUtils,
WorkerSharedOptions,
WorkerUtils,
} from "../src/index";
import {
getJobs,
makeSelectionOfJobs,
Expand All @@ -14,11 +18,17 @@ function numerically(a: string | number, b: string | number) {

const options: WorkerSharedOptions = {};

let utils: WorkerUtils | null = null;
afterEach(async () => {
await utils?.release();
utils = null;
});

test("completes the jobs, leaves others unaffected", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

const utils = await makeWorkerUtils({
utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});

Expand Down Expand Up @@ -46,6 +56,4 @@ test("completes the jobs, leaves others unaffected", () =>
expect(remaining).toHaveLength(2);
expect(remaining[0]).toMatchObject(lockedJob);
expect(remaining[1]).toMatchObject(untouchedJob);

await utils.release();
}));
Loading

0 comments on commit 0d71581

Please sign in to comment.