Skip to content

Commit

Permalink
auto import extra considerations (#850)
Browse files Browse the repository at this point in the history
  • Loading branch information
chavda-bhavik authored Oct 17, 2024
2 parents 066a29d + fb9eee4 commit d0f9ab1
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 65 deletions.
4 changes: 4 additions & 0 deletions apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ export class CreateUserJobDto {
@IsString()
@IsOptional()
extra?: string;

@IsString()
@IsOptional()
authHeaderValue?: string;
}
17 changes: 9 additions & 8 deletions apps/api/src/app/import-jobs/import-jobs.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ import { ApiTags, ApiSecurity, ApiOperation } from '@nestjs/swagger';
import { Body, Controller, Delete, Get, Param, ParseArrayPipe, Post, Put, UseGuards } from '@nestjs/common';
import {
CreateUserJob,
GetColumnSchemaMapping,
CreateJobMapping,
UpdateUserJob,
CreateJobMapping,
GetColumnSchemaMapping,
GetUserJob,
UserJobPause,
UserJobDelete,
UserJobResume,
UserJobTerminate,
UserJobDelete,
} from './usecase';
import { ACCESS_KEY_NAME } from '@impler/shared';
import { JwtAuthGuard } from '@shared/framework/auth.gaurd';
import { UpdateJobDto, CreateUserJobDto, UpdateJobMappingDto } from './dtos';

@ApiTags('Import-Jobs')
@ApiTags('Import Jobs')
@Controller('/import-jobs')
@UseGuards(JwtAuthGuard)
@ApiSecurity(ACCESS_KEY_NAME)
Expand All @@ -35,12 +35,13 @@ export class ImportJobsController {
@Post(':templateId')
@ApiOperation({ summary: 'Create User-Job' })
@ApiSecurity(ACCESS_KEY_NAME)
async createUserJobRoute(@Param('templateId') templateId: string, @Body() createUserJobData: CreateUserJobDto) {
async createUserJobRoute(@Param('templateId') templateId: string, @Body() jobData: CreateUserJobDto) {
return this.createUserJob.execute({
_templateId: templateId,
url: createUserJobData.url,
extra: createUserJobData.extra,
externalUserId: createUserJobData.externalUserId,
url: jobData.url,
extra: jobData.extra,
externalUserId: jobData.externalUserId,
authHeaderValue: jobData.authHeaderValue,
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export class CreateUserJobCommand {
url: string;
extra?: string;
_templateId: string;
externalUserId?: string;
extra?: string;
authHeaderValue?: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ export class CreateUserJob {
private readonly userJobRepository: UserJobRepository
) {}

async execute({ _templateId, url, externalUserId, extra }: CreateUserJobCommand): Promise<UserJobEntity> {
async execute({
url,
extra,
_templateId,
externalUserId,
authHeaderValue,
}: CreateUserJobCommand): Promise<UserJobEntity> {
const mimeType = await this.rssService.getMimeType(url);
if (mimeType === FileMimeTypesEnum.XML || mimeType === FileMimeTypesEnum.TEXTXML) {
const { rssKeyHeading } = await this.rssService.parseRssFeed(url);
Expand All @@ -23,9 +29,10 @@ export class CreateUserJob {

return await this.userJobRepository.create({
url,
extra,
authHeaderValue,
headings: rssKeyHeading,
_templateId: _templateId,
extra,
externalUserId: externalUserId || (formattedExtra as unknown as Record<string, any>)?.externalUserId,
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ export class GetImportJobDataConsumer extends BaseConsumer {
const data = JSON.parse(message.content) as { _jobId: string };
const importJobHistoryId = this.commonRepository.generateMongoId().toString();
const importedData = await this.getJobImportedData(data._jobId);
const allDataFilePath = this.fileNameService.getAllJsonDataFilePath(importJobHistoryId);
await this.convertRecordsToJsonFile(importJobHistoryId, importedData);
await this.importJobHistoryRepository.create({
_id: importJobHistoryId,
_jobId: data._jobId,
allDataFilePath: this.fileNameService.getAllJsonDataFilePath(importJobHistoryId),
allDataFilePath,
status: ImportJobHistoryStatusEnum.PROCESSING,
});
const userJobInfo = await this.userJobRepository.getUserJobWithTemplate(data._jobId);
Expand All @@ -42,7 +43,7 @@ export class GetImportJobDataConsumer extends BaseConsumer {
});

if (webhookDestination?.callbackUrl) {
publishToQueue(QueuesEnum.SEND_IMPORT_JOB_DATA, { importJobHistoryId });
publishToQueue(QueuesEnum.SEND_IMPORT_JOB_DATA, { _jobId: data._jobId, allDataFilePath });
}

return;
Expand Down
2 changes: 1 addition & 1 deletion apps/queue-manager/src/consumers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export * from './send-webhook-data.consumer';
export * from './end-import.consumer';
export * from './send-bubble-data.consumer';
export * from './import-job-data.consumer';
export * from './get-import-job-data.consumer';
export * from './send-import-job-data.consumer';
79 changes: 33 additions & 46 deletions apps/queue-manager/src/consumers/send-import-job-data.consumer.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
import {
TemplateRepository,
WebhookDestinationRepository,
ColumnRepository,
TemplateRepository,
ImportJobHistoryRepository,
UserJobEntity,
WebhookDestinationRepository,
WebhookLogEntity,
UploadRepository,
WebhookLogRepository,
UserJobRepository,
} from '@impler/dal';
import { StorageService } from '@impler/services';
import {
ColumnTypesEnum,
SendImportJobData,
SendImportJobCachedData,
replaceVariablesInObject,
FileEncodingsEnum,
QueuesEnum,
ColumnTypesEnum,
UploadStatusEnum,
FileEncodingsEnum,
ColumnDelimiterEnum,
} from '@impler/shared';

Expand All @@ -30,17 +27,15 @@ const DEFAULT_PAGE = 1;

export class SendImportJobDataConsumer extends BaseConsumer {
private columnRepository: ColumnRepository = new ColumnRepository();
private uploadRepository: UploadRepository = new UploadRepository();
private userJobRepository: UserJobRepository = new UserJobRepository();
private templateRepository: TemplateRepository = new TemplateRepository();
private webhookLogRepository: WebhookLogRepository = new WebhookLogRepository();
private importJobHistoryRepository: ImportJobHistoryRepository = new ImportJobHistoryRepository();
private webhookDestinationRepository: WebhookDestinationRepository = new WebhookDestinationRepository();
private storageService: StorageService = getStorageServiceClass();

async message(message: { content: string }) {
const data = JSON.parse(message.content) as SendImportJobData;
const cachedData = data.cache || (await this.getInitialCachedData(data.importJobHistoryId));
const cachedData = data.cache || (await this.getInitialCachedData(data._jobId, data.allDataFilePath));
let allDataJson: null | any[] = null;

if (cachedData && cachedData.callbackUrl) {
Expand All @@ -53,7 +48,7 @@ export class SendImportJobDataConsumer extends BaseConsumer {
}
const { sendData, page } = this.buildSendData({
data: allDataJson,
uploadId: data.importJobHistoryId,
uploadId: data._jobId,
chunkSize: cachedData.chunkSize,
recordFormat: cachedData.recordFormat,
chunkFormat: cachedData.chunkFormat,
Expand All @@ -67,14 +62,14 @@ export class SendImportJobDataConsumer extends BaseConsumer {

const response = await this.makeApiCall({
data: sendData,
uploadId: data.importJobHistoryId,
uploadId: data._jobId,
page,
method: 'POST',
url: cachedData.callbackUrl,
headers,
});

this.makeResponseEntry(response);
await this.makeResponseEntry(response);

const nextPageNumber = this.getNextPageNumber({
totalRecords: allDataJson.length,
Expand All @@ -85,15 +80,16 @@ export class SendImportJobDataConsumer extends BaseConsumer {
if (nextPageNumber) {
// Make next call
publishToQueue(QueuesEnum.SEND_IMPORT_JOB_DATA, {
importJobHistoryId: data.importJobHistoryId,
_jobId: data._jobId,
allDataFilePath: data.allDataFilePath,
cache: {
...cachedData,
page: nextPageNumber,
},
});
} as SendImportJobData);
} else {
// Processing is done
this.finalizeUpload(data.importJobHistoryId);
this.finalizeUpload(data._jobId);
}
}
}
Expand All @@ -115,10 +111,10 @@ export class SendImportJobDataConsumer extends BaseConsumer {
Math.min(page * chunkSize, data.length)
);

if (multiSelectHeadings && Object.keys(multiSelectHeadings).length > 0) {
if (Array.isArray(multiSelectHeadings) && multiSelectHeadings.length > 0) {
slicedData = slicedData.map((obj) => {
Object.keys(multiSelectHeadings).forEach((heading) => {
obj.record[heading] = obj.record[heading] ? obj.record[heading].split(multiSelectHeadings[heading]) : [];
multiSelectHeadings.forEach((heading) => {
obj[heading] = obj[heading] ? (Array.isArray(obj[heading]) ? obj[heading] : obj[heading].split(',')) : [];
});

return obj;
Expand All @@ -144,21 +140,16 @@ export class SendImportJobDataConsumer extends BaseConsumer {
};
}

private async getInitialCachedData(_importJobHistoryId: string): Promise<SendImportJobCachedData> {
const importJobHistory = await this.importJobHistoryRepository.getHistoryWithJob(_importJobHistoryId, [
'_templateId',
]);
const userJobEmail = await this.userJobRepository.getUserEmailFromJobId(importJobHistory._jobId);
private async getInitialCachedData(_jobId: string, allDataFilePath: string): Promise<SendImportJobCachedData> {
const userJob = await this.userJobRepository.findById(_jobId);
const userJobEmail = await this.userJobRepository.getUserEmailFromJobId(_jobId);

const columns = await this.columnRepository.find({
_templateId: (importJobHistory._jobId as unknown as UserJobEntity)._templateId,
_templateId: userJob._templateId,
});
const templateData = await this.templateRepository.findById(
(importJobHistory._jobId as unknown as UserJobEntity)._templateId,
'name _projectId code'
);
const templateData = await this.templateRepository.findById(userJob._templateId, 'name _projectId code');
const webhookDestination = await this.webhookDestinationRepository.findOne({
_templateId: (importJobHistory._jobId as unknown as UserJobEntity)._templateId,
_templateId: userJob._templateId,
});

if (!webhookDestination || !webhookDestination.callbackUrl) {
Expand All @@ -175,33 +166,29 @@ export class SendImportJobDataConsumer extends BaseConsumer {
});
}

this.importJobHistoryRepository.create({
_jobId: importJobHistory._id,
});

return {
page: 1,
allDataFilePath,
email: userJobEmail,
_templateId: (importJobHistory._jobId as unknown as UserJobEntity)._templateId,
multiSelectHeadings,
extra: userJob.extra,
name: templateData.name,
_templateId: userJob._templateId,
authHeaderValue: userJob.authHeaderValue,
callbackUrl: webhookDestination?.callbackUrl,
chunkSize: webhookDestination?.chunkSize,
name: templateData.name,
page: 1,
extra: (importJobHistory._jobId as unknown as UserJobEntity).extra,
authHeaderName: webhookDestination.authHeaderName,
authHeaderValue: '',
allDataFilePath: importJobHistory.allDataFilePath,
defaultValues: JSON.stringify(defaultValueObj),
recordFormat: (importJobHistory._jobId as unknown as UserJobEntity).customRecordFormat,
chunkFormat: (importJobHistory._jobId as unknown as UserJobEntity).customChunkFormat,
multiSelectHeadings,
recordFormat: userJob.customRecordFormat,
chunkFormat: userJob.customChunkFormat,
};
}

private async makeResponseEntry(data: Partial<WebhookLogEntity>) {
return await this.webhookLogRepository.create(data);
return this.importJobHistoryRepository.create(data);
}

private async finalizeUpload(importJobHistoryId: string) {
return await this.uploadRepository.update({ _id: importJobHistoryId }, { status: UploadStatusEnum.COMPLETED });
private async finalizeUpload(_jobId: string) {
return await this.userJobRepository.update({ _id: _jobId }, { status: UploadStatusEnum.COMPLETED });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { useMutation } from '@tanstack/react-query';
import { useForm, SubmitHandler } from 'react-hook-form';

import { notifier } from '@util';
import { IAutoImportValues } from '@types';
import { useAppState } from '@store/app.context';
import { useAPIState } from '@store/api.context';
import { useJobsInfo } from '@store/jobinfo.context';
import { useImplerState } from '@store/impler.context';
import { IUserJob, IErrorObject } from '@impler/shared';
import { IAutoImportValues } from '@types';
import { useAppState } from '@store/app.context';

interface IUseAutoImportPhase1Props {
goNext: () => void;
Expand Down
2 changes: 2 additions & 0 deletions libs/dal/src/repositories/user-job/user-job.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export class UserJobEntity {

status: string;

authHeaderValue: string;

customRecordFormat: string;

customChunkFormat: string;
Expand Down
3 changes: 3 additions & 0 deletions libs/dal/src/repositories/user-job/user-job.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const userJobSchema = new Schema(
externalUserId: {
type: Schema.Types.String,
},
authHeaderValue: {
type: Schema.Types.String,
},
status: {
type: Schema.Types.String,
},
Expand Down
4 changes: 2 additions & 2 deletions libs/shared/src/entities/UserJob/Userjob.interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export interface IUserJob {
_id: string;
url: string;
_templateId: string;
headings: string[];
cron: string;
headings: string[];
_templateId: string;
}
3 changes: 2 additions & 1 deletion libs/shared/src/types/upload/upload.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ export type SendWebhookData = {
};

export type SendImportJobData = {
importJobHistoryId: string;
_jobId: string;
allDataFilePath: string;
cache?: SendImportJobCachedData;
};

Expand Down

0 comments on commit d0f9ab1

Please sign in to comment.