Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Job Launcher] Cancel job #805

Merged
merged 12 commits into from
Aug 29, 2023
12 changes: 8 additions & 4 deletions packages/apps/job-launcher/server/src/common/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ export const ConfigNames = {
JOB_LAUNCHER_FEE: 'JOB_LAUNCHER_FEE',
RECORDING_ORACLE_FEE: 'RECORDING_ORACLE_FEE',
REPUTATION_ORACLE_FEE: 'REPUTATION_ORACLE_FEE',
EXCHANGE_ORACLE_ADDRESS: 'EXCHANGE_ORACLE_ADDRESS',
EXCHANGE_ORACLE_WEBHOOK_URL: 'EXCHANGE_ORACLE_WEBHOOK_URL',
FORTUNE_EXCHANGE_ORACLE_ADDRESS: 'FORTUNE_EXCHANGE_ORACLE_ADDRESS',
CVAT_EXCHANGE_ORACLE_ADDRESS: 'CVAT_EXCHANGE_ORACLE_ADDRESS',
FORTUNE_EXCHANGE_ORACLE_WEBHOOK_URL: 'FORTUNE_EXCHANGE_ORACLE_WEBHOOK_URL',
CVAT_EXCHANGE_ORACLE_WEBHOOK_URL: 'CVAT_EXCHANGE_ORACLE_WEBHOOK_URL',
RECORDING_ORACLE_ADDRESS: 'RECORDING_ORACLE_ADDRESS',
REPUTATION_ORACLE_ADDRESS: 'REPUTATION_ORACLE_ADDRESS',
S3_ENDPOINT: 'S3_ENDPOINT',
Expand Down Expand Up @@ -72,8 +74,10 @@ export const envValidator = Joi.object({
JOB_LAUNCHER_FEE: Joi.string().default(10),
RECORDING_ORACLE_FEE: Joi.string().default(10),
REPUTATION_ORACLE_FEE: Joi.string().default(10),
EXCHANGE_ORACLE_ADDRESS: Joi.string().required(),
EXCHANGE_ORACLE_WEBHOOK_URL: Joi.string().default('http://localhost:3005'),
FORTUNE_EXCHANGE_ORACLE_ADDRESS: Joi.string().required(),
CVAT_EXCHANGE_ORACLE_ADDRESS: Joi.string().required(),
FORTUNE_EXCHANGE_ORACLE_WEBHOOK_URL: Joi.string().default('http://localhost:3004'),
CVAT_EXCHANGE_ORACLE_WEBHOOK_URL: Joi.string().default('http://localhost:3005'),
RECORDING_ORACLE_ADDRESS: Joi.string().required(),
REPUTATION_ORACLE_ADDRESS: Joi.string().required(),
// S3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export enum ErrorEscrow {
NotFound = 'Escrow not found',
NotCreated = 'Escrow has not been created',
NotLaunched = 'Escrow has not been launched',
InvalidStatusCancellation = 'Escrow has an invalid status for cancellation',
InvalidBalanceCancellation = 'Escrow has an invalid balance for cancellation'
}

/**
Expand Down
8 changes: 5 additions & 3 deletions packages/apps/job-launcher/server/src/common/enums/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ export enum JobStatus {
PENDING = 'PENDING',
PAID = 'PAID',
LAUNCHED = 'LAUNCHED',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
TO_CANCEL = 'TO_CANCEL',
CANCELED = 'CANCELED',
}

export enum JobStatusFilter {
PENDING = 'PENDING',
PAID = 'PAID',
LAUNCHED = 'LAUNCHED',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
TO_CANCEL = 'TO_CANCEL',
CANCELED = 'CANCELED',
}

export enum JobRequestType {
Expand Down
4 changes: 4 additions & 0 deletions packages/apps/job-launcher/server/src/common/enums/webhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum EventType {
ESCROW_CREATED = 'escrow_created',
TASK_CREATION_FAILED = 'task_creation_failed',
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import { ConfigNames } from '../common/config';
keepConnectionAlive:
configService.get<string>(ConfigNames.NODE_ENV) === 'test',
migrationsRun: false,
ssl: configService.get<boolean>(ConfigNames.POSTGRES_SSL, false),
ssl: configService.get<string>(ConfigNames.POSTGRES_SSL)!.toLowerCase() === 'true',
};
},
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export class InitialMigration1691485394906 implements MigrationInterface {
"source" "hmt"."payments_source_enum" NOT NULL,
"status" "hmt"."payments_status_enum" NOT NULL,
"user_id" integer NOT NULL,
"job_id" integer,
CONSTRAINT "REL_f83af8ea8055b85bde0e095e40" UNIQUE ("job_id"),
CONSTRAINT "PK_197ab7af18c93fbb0c9b28b4a59" PRIMARY KEY ("id")
)
`);
Expand All @@ -51,8 +53,9 @@ export class InitialMigration1691485394906 implements MigrationInterface {
'PENDING',
'PAID',
'LAUNCHED',
'COMPLETED',
'FAILED'
'FAILED',
'TO_CANCEL',
'CANCELED'
)
`);
await queryRunner.query(`
Expand Down Expand Up @@ -131,6 +134,10 @@ export class InitialMigration1691485394906 implements MigrationInterface {
ALTER TABLE "hmt"."jobs"
ADD CONSTRAINT "FK_9027c8f0ba75fbc1ac46647d043" FOREIGN KEY ("user_id") REFERENCES "hmt"."users"("id") ON DELETE NO ACTION ON UPDATE NO ACTION
`);
await queryRunner.query(`
ALTER TABLE "hmt"."payments"
ADD CONSTRAINT "FK_f83af8ea8055b85bde0e095e400" FOREIGN KEY ("job_id") REFERENCES "hmt"."jobs"("id") ON DELETE NO ACTION ON UPDATE NO ACTION
`);
await queryRunner.query(`
ALTER TABLE "hmt"."tokens"
ADD CONSTRAINT "FK_8769073e38c365f315426554ca5" FOREIGN KEY ("user_id") REFERENCES "hmt"."users"("id") ON DELETE NO ACTION ON UPDATE NO ACTION
Expand All @@ -151,6 +158,9 @@ export class InitialMigration1691485394906 implements MigrationInterface {
await queryRunner.query(`
ALTER TABLE "hmt"."jobs" DROP CONSTRAINT "FK_9027c8f0ba75fbc1ac46647d043"
`);
await queryRunner.query(`
ALTER TABLE "hmt"."payments" DROP CONSTRAINT "FK_f83af8ea8055b85bde0e095e400"
`);
await queryRunner.query(`
ALTER TABLE "hmt"."payments" DROP CONSTRAINT "FK_427785468fb7d2733f59e7d7d39"
`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {
Controller,
DefaultValuePipe,
Get,
Param,
Patch,
Post,
Query,
Request,
Expand All @@ -11,7 +13,7 @@ import {
import { ApiBearerAuth, ApiQuery, ApiTags } from '@nestjs/swagger';
import { JwtAuthGuard } from '../../common/guards';
import { RequestWithUser } from '../../common/types';
import { JobFortuneDto, JobCvatDto, JobListDto } from './job.dto';
import { JobFortuneDto, JobCvatDto, JobListDto, JobCancelDto } from './job.dto';
import { JobService } from './job.service';
import { JobRequestType, JobStatusFilter } from '../../common/enums/job';
import { Public } from '../../common/decorators';
Expand Down Expand Up @@ -65,4 +67,12 @@ export class JobController {
public async launchCronJob(): Promise<any> {
return this.jobService.launchCronJob();
}

@Patch('/cancel/:id')
public async cancelJob(
@Request() req: RequestWithUser,
@Param() params: JobCancelDto,
): Promise<boolean> {
return this.jobService.requestToCancelJob(req.user.id, params.id);
}
}
67 changes: 67 additions & 0 deletions packages/apps/job-launcher/server/src/modules/job/job.cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { LessThanOrEqual, Repository } from 'typeorm';
import { SortDirection } from '../../common/enums/collection';
import { JOB_RETRIES_COUNT_THRESHOLD } from '../../common/constants';
import { JobStatus } from '../../common/enums/job';
import { JobEntity } from './job.entity';
import { JobService } from './job.service';

@Injectable()
export class JobCron {
private readonly logger = new Logger(JobCron.name);

constructor(
private readonly jobService: JobService,
@InjectRepository(JobEntity)
private readonly jobEntityRepository: Repository<JobEntity>,
) {}

@Cron(CronExpression.EVERY_10_SECONDS)
public async launchJob() {
try {
// TODO: Add retry policy and process failure requests https://github.com/humanprotocol/human-protocol/issues/334
const jobEntity = await this.jobEntityRepository.findOne({
where: {
status: JobStatus.PAID,
retriesCount: LessThanOrEqual(JOB_RETRIES_COUNT_THRESHOLD),
waitUntil: LessThanOrEqual(new Date()),
},
order: {
waitUntil: SortDirection.ASC,
},
});

if (!jobEntity) return;

await this.jobService.launchJob(jobEntity);
} catch (e) {
this.logger.error(e);
return;
}
}

@Cron(CronExpression.EVERY_10_SECONDS)
public async cancelJob() {
try {
const jobEntity = await this.jobEntityRepository.findOne({
where: {
status: JobStatus.TO_CANCEL,
retriesCount: LessThanOrEqual(JOB_RETRIES_COUNT_THRESHOLD),
waitUntil: LessThanOrEqual(new Date()),
},
order: {
waitUntil: SortDirection.ASC,
},
});

if (!jobEntity) return;

await this.jobService.cancelJob(jobEntity);
} catch (e) {
this.logger.error(e);
return;
}
}
}
9 changes: 9 additions & 0 deletions packages/apps/job-launcher/server/src/modules/job/job.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import {
IsDate,
IsOptional,
IsObject,
IsNumberString
} from 'class-validator';
import { ChainId } from '@human-protocol/sdk';
import {
JobRequestType,
JobStatus,
JobStatusFilter,
} from '../../common/enums/job';
import { EventType } from '../../common/enums/webhook';

export class JobCreateDto {
public chainId: ChainId;
Expand Down Expand Up @@ -82,6 +84,12 @@ export class JobCvatDto extends JobDto {
public jobBounty: string;
}

export class JobCancelDto {
@ApiProperty()
@IsNumberString()
public id: number;
}

export class JobUpdateDto {
@ApiPropertyOptional({
enum: JobStatus,
Expand All @@ -106,6 +114,7 @@ export class SaveManifestDto {
export class SendWebhookDto {
public escrowAddress: string;
public chainId: number;
public eventType: EventType;
}

export class FortuneManifestDto {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Column, Entity, Index, ManyToOne } from 'typeorm';
import { Column, Entity, Index, ManyToOne, OneToOne } from 'typeorm';

import { NS } from '../../common/constants';
import { IJob } from '../../common/interfaces';
import { JobStatus } from '../../common/enums/job';
import { BaseEntity } from '../../database/base.entity';
import { UserEntity } from '../user/user.entity';
import { PaymentEntity } from '../payment/payment.entity';

@Entity({ schema: NS, name: 'jobs' })
@Index(['chainId', 'escrowAddress'], { unique: true })
Expand Down Expand Up @@ -39,6 +40,9 @@ export class JobEntity extends BaseEntity implements IJob {
@Column({ type: 'int' })
public userId: number;

@OneToOne(() => PaymentEntity, (payment) => payment.job)
public payment: PaymentEntity;

@Column({ type: 'int', default: 0 })
public retriesCount: number;

Expand Down
Loading