Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 🎸 Added sheduled kind of jobs to the Queue #96

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 129 additions & 76 deletions packages/queue/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const logger = createLogger('Queue');
*/
export enum JobStatus {
Pending,
Scheduled,
Started,
Done,
Cancelled,
Expand Down Expand Up @@ -101,6 +102,8 @@ export interface JobConfig<T extends JobData = JobData> {
retries?: number;
/** Retries delay */
retriesDelay?: number;
/** Scheduled time */
scheduledTime?: number;
/** The history of the job */
history?: JobHistoryInterface;
}
Expand Down Expand Up @@ -133,6 +136,8 @@ export class Job<T extends JobData = JobData> {
retries: number;
/** The period of time between retries */
retriesDelay: number;
/** Scheduled time */
scheduledTime?: number;
/** The history of the job */
history: JobHistory;

Expand All @@ -152,6 +157,11 @@ export class Job<T extends JobData = JobData> {
this.maxRetries = config.maxRetries ?? 0;
this.retries = config.retries ?? 0;
this.retriesDelay = config.retriesDelay ?? 0;

if (this.isRecurrent && config.scheduledTime) {
throw new Error('Job cannot be recurrent and scheduled at the same time');
}
this.scheduledTime = config.scheduledTime;
this.history = new JobHistory(config.history ?? {});
}

Expand Down Expand Up @@ -204,7 +214,7 @@ export class Job<T extends JobData = JobData> {
get executable() {
return (
!this.expired &&
this.status === JobStatus.Pending &&
[JobStatus.Pending, JobStatus.Scheduled].includes(this.status) &&
((!this.isRecurrent &&
(this.maxRetries === 0 ||
(this.maxRetries > 0 && this.retries < this.maxRetries))) ||
Expand Down Expand Up @@ -401,7 +411,9 @@ export class Queue extends EventEmitter<QueueEvents> {
// Only Pending jobs must be restored
if (
jobConfig.history &&
JobHistory.getStatus(jobConfig.history) === JobStatus.Pending
[JobStatus.Pending, JobStatus.Scheduled].includes(
JobHistory.getStatus(jobConfig.history),
)
) {
this.add(jobConfig);
}
Expand Down Expand Up @@ -512,6 +524,87 @@ export class Queue extends EventEmitter<QueueEvents> {
void this.storageUpdate(job.id, job);
}

/**
* Executes Job
* @param {Job} job Job to execute
* @returns {Promise<void>}
*/
private async executeJob(job: Job): Promise<void> {
try {
if (!job.executable) {
return;
}

this.changeJobStatus(job, JobStatus.Started);

const handler = this.handlers.getHandler(job.handlerName);

const result = await job.execute(handler);
logger.trace(`Job #${job.id} execution result: ${String(result)}`);

if (result && job.isRecurrent) {
// If the job is recurrent and the handler returned true, reschedule the job
if (!job.expired) {
logger.trace(`Job #${job.id} is done but new one is scheduled`);
this.changeJobStatus(job, JobStatus.Done);
setTimeout(() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
isRecurrent: job.isRecurrent,
recurrenceInterval: job.recurrenceInterval,
maxRecurrences: job.maxRecurrences,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
}, job.recurrenceInterval);
} else {
logger.trace(`Job #${job.id} is expired`);
this.changeJobStatus(job, JobStatus.Expired);
}
} else {
logger.trace(`Job #${job.id} is done`);
this.changeJobStatus(job, JobStatus.Done);
}
} catch (error) {
logger.error(`Job #${job.id} is errored`, error);
job.history.errors.push(String(error));

if (job.maxRetries > 0 && job.retries < job.maxRetries) {
// If the job hasn't reached the maximum number of retries, retry it
job.retries++;

if (job.retriesDelay > 0) {
logger.trace(`Job #${job.id} filed but scheduled for restart`);
this.changeJobStatus(job, JobStatus.Failed);
setTimeout(
() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
},
backoffWithJitter(
job.retriesDelay,
job.retries,
job.retriesDelay * 10,
),
);
} else {
logger.trace(`Job #${job.id} failed and immediately restarted`);
this.changeJobStatus(job, JobStatus.Pending);
}
} else {
logger.trace(`Job #${job.id} filed`);
this.changeJobStatus(job, JobStatus.Failed);
}
}
}

/**
* Starts processing jobs in the queue.
* It finds executable jobs and runs them concurrently up to the concurrency limit.
Expand All @@ -524,13 +617,37 @@ export class Queue extends EventEmitter<QueueEvents> {
*/
private async start() {
try {
const now = Date.now();

const activeJobs = this.jobs.filter(
(job) => job.status === JobStatus.Started,
);
const pendingJobs = this.jobs.filter((job) => job.executable);
logger.trace(`Active jobs: ${activeJobs.length}`);

// Select all pending jobs except for scheduled
const pendingJobs = this.jobs.filter(
(job) =>
job.executable &&
(!job.scheduledTime ||
(job.scheduledTime && job.scheduledTime <= now)),
);
logger.trace(`Pending jobs: ${pendingJobs.length}`);

// Select all scheduled jobs
const scheduledJobs = this.jobs.filter(
(job) => job.executable && job.scheduledTime,
);

if (scheduledJobs.length > 0) {
scheduledJobs.forEach((job) => {
if (job.scheduledTime && job.scheduledTime > now) {
const delay = job.scheduledTime - now;
this.changeJobStatus(job, JobStatus.Scheduled);
setTimeout(() => void this.executeJob(job), delay);
}
});
}

const availableSlots = this.concurrencyLimit - activeJobs.length;
logger.trace(`Available slots: ${availableSlots}`);

Expand All @@ -546,79 +663,9 @@ export class Queue extends EventEmitter<QueueEvents> {
);

// Start all the selected jobs concurrently
const promises = jobsToStart.map(async (job) => {
try {
this.changeJobStatus(job, JobStatus.Started);

const handler = this.handlers.getHandler(job.handlerName);

const result = await job.execute(handler);
logger.trace(`Job #${job.id} execution result: ${String(result)}`);

if (result && job.isRecurrent) {
// If the job is recurrent and the handler returned true, reschedule the job
if (!job.expired) {
logger.trace(`Job #${job.id} is done but new one is scheduled`);
this.changeJobStatus(job, JobStatus.Done);
setTimeout(() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
isRecurrent: job.isRecurrent,
recurrenceInterval: job.recurrenceInterval,
maxRecurrences: job.maxRecurrences,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
}, job.recurrenceInterval);
} else {
logger.trace(`Job #${job.id} is expired`);
this.changeJobStatus(job, JobStatus.Expired);
}
} else {
logger.trace(`Job #${job.id} is done`);
this.changeJobStatus(job, JobStatus.Done);
}
} catch (error) {
logger.error(`Job #${job.id} is errored`, error);
job.history.errors.push(String(error));

if (job.maxRetries > 0 && job.retries < job.maxRetries) {
// If the job hasn't reached the maximum number of retries, retry it
job.retries++;

if (job.retriesDelay > 0) {
logger.trace(`Job #${job.id} filed but scheduled for restart`);
this.changeJobStatus(job, JobStatus.Failed);
setTimeout(
() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
},
backoffWithJitter(
job.retriesDelay,
job.retries,
job.retriesDelay * 10,
),
);
} else {
logger.trace(`Job #${job.id} failed and immediately restarted`);
this.changeJobStatus(job, JobStatus.Pending);
}
} else {
logger.trace(`Job #${job.id} filed`);
this.changeJobStatus(job, JobStatus.Failed);
}
}
});

await Promise.allSettled(promises);
await Promise.allSettled(
jobsToStart.map(async (job) => this.executeJob(job)),
);

// After these jobs are done, check if there are any more jobs to process
logger.trace('Trying to restart queue');
Expand Down Expand Up @@ -650,6 +697,12 @@ export class Queue extends EventEmitter<QueueEvents> {
*/
add<T extends JobData = JobData>(config: JobConfig<T>): string {
const job = new Job<T>(config);

// In case of restored Scheduled jobs we need to bring them to Pending again
if (job.status === JobStatus.Scheduled) {
job.status = JobStatus.Pending;
}

this.jobs.push(job);
logger.trace('Job added:', job);
void this.storageUpdate(job.id, job);
Expand Down
72 changes: 72 additions & 0 deletions packages/queue/test/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,75 @@ describe('Queue', function () {
});
});
});

describe('Scheduled jobs', () => {
let queue: Queue;
let handler: JobHandler<JobData>;

beforeEach(() => {
queue = new Queue({ concurrencyLimit: 5 });
handler = async () => Promise.resolve(true);
queue.registerHandler('scheduledHandler', handler);
});

it('should correctly schedule a job for future execution', async function () {
const futureTime = Date.now() + 100;
const jobId = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: futureTime,
});

expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Scheduled);

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Done);
resolve(true);
}, 150);
});
});

it('should immediately execute a job scheduled for the past', async function () {
const pastTime = Date.now() - 1000;
const jobId = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: pastTime,
});

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Done);
resolve(true);
}, 50);
});
});

it('should handle the execution of multiple scheduled jobs', async function () {
const futureTimeShort = Date.now() + 50;
const futureTimeLong = Date.now() + 200;

const jobIdShort = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: futureTimeShort,
});
const jobIdLong = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: futureTimeLong,
});

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobIdShort)?.status).to.equal(JobStatus.Done);
expect(queue.getLocal(jobIdLong)?.status).to.equal(JobStatus.Scheduled);
resolve(true);
}, 100);
});

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobIdLong)?.status).to.equal(JobStatus.Done);
resolve(true);
}, 250);
});
});
});
Loading