diff --git a/analytics-service/src/analytics/report.service.ts b/analytics-service/src/analytics/report.service.ts index 16dfc7e7fc..e1165a6b03 100644 --- a/analytics-service/src/analytics/report.service.ts +++ b/analytics-service/src/analytics/report.service.ts @@ -1,4 +1,4 @@ -import { DatabaseServer, MessageAction, PinoLogger } from '@guardian/common'; +import { DatabaseServer, MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS, MessageAction, PinoLogger } from '@guardian/common'; import { GenerateUUIDv4 } from '@guardian/interfaces'; import JSZip from 'jszip'; import xl from 'excel4node'; @@ -218,36 +218,14 @@ export class ReportService { const schemaCount = await databaseServer.find(Schema, { uuid, action: MessageAction.PublishSchema }); const systemSchemaCount = await databaseServer.find(Schema, { uuid, action: MessageAction.PublishSystemSchema }); - const docByPolicy = await databaseServer.aggregate(Document, [ - { $match: { uuid, } }, - { - $group: { - _id: { - policyTopicId: '$policyTopicId', - type: '$type', - action: '$action' - }, count: { $sum: 1 } - } - } - ] as FilterObject[]); - - const docByInstance = await databaseServer.aggregate(Document, [ - { $match: { uuid } }, - { - $group: { - _id: { - instanceTopicId: '$instanceTopicId', - type: '$type', - action: '$action' - }, count: { $sum: 1 } - } - } - ] as FilterObject[]); + const docByPolicy = + await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_POLICY, uuid) as FilterObject[]) + + const docByInstance = + await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_INSTANCE, uuid) as FilterObject[]) - const docsGroups = await databaseServer.aggregate(Document, [ - { $match: { uuid } }, - { $group: { _id: { type: '$type', action: '$action' }, count: { $sum: 1 } } } - ] as FilterObject[]); + const docsGroups = + await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOCS_GROUPS, uuid) as FilterObject[]) const didCount = docsGroups .filter(g => g._id.type === DocumentType.DID && g._id.action !== MessageAction.RevokeDocument) @@ -482,18 +460,9 @@ export class ReportService { } }), size); - const schemasByName = await new DatabaseServer().aggregate(Document, [ - { $match: { uuid: report.uuid } }, - { - $group: { - _id: { - name: '$name', - action: '$action', - }, count: { $sum: 1 } - } - }, - { $sort: { count: -1 } } - ] as FilterObject[]); + const databaseServer = new DatabaseServer(); + + const schemasByName = await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.SCHEMA_BY_NAME, report.uuid) as FilterObject[]) const topAllSchemasByName = []; const topSystemSchemasByName = []; diff --git a/api-gateway/src/helpers/interceptors/cache.ts b/api-gateway/src/helpers/interceptors/cache.ts index 399a54f6ab..fcb2f49acc 100644 --- a/api-gateway/src/helpers/interceptors/cache.ts +++ b/api-gateway/src/helpers/interceptors/cache.ts @@ -48,8 +48,10 @@ export class CacheInterceptor implements NestInterceptor { if (cachedResponse) { let result = JSON.parse(cachedResponse); - if (typeof result === 'string') { - result = Buffer.from(result, 'base64'); + if (result.type === 'buffer') { + result = Buffer.from(result.data, 'base64'); + } else { + result = result.data; } return result; @@ -73,7 +75,11 @@ export class CacheInterceptor implements NestInterceptor { } if (Buffer.isBuffer(result)) { - result = result.toString('base64'); + result = { type: 'buffer', data: result.toString('base64') }; + } else if (typeof response === 'object') { + result = { type: 'json', data: result }; + } else { + result = { type: 'string', data: result }; } await this.cacheService.set(cacheKey, JSON.stringify(result), ttl, cacheTag); diff --git a/common/src/database-modules/database-server.ts b/common/src/database-modules/database-server.ts index 7f259f3fd4..1709227944 100644 --- a/common/src/database-modules/database-server.ts +++ b/common/src/database-modules/database-server.ts @@ -50,7 +50,7 @@ import { TopicType, } from '@guardian/interfaces'; import { BaseEntity } from '../models/index.js'; -import { DataBaseHelper } from '../helpers/index.js'; +import { DataBaseHelper, IGetDocumentAggregationFilters, MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS } from '../helpers/index.js'; import { Theme } from '../entity/theme.js'; import { GetConditionsPoliciesByCategories } from '../helpers/policy-category.js'; import { PolicyTool } from '../entity/tool.js'; @@ -281,20 +281,9 @@ export class DatabaseServer { */ public async aggregate(entityClass: new () => T, aggregation: FilterObject[]): Promise { if (this.dryRun) { - if (Array.isArray(aggregation)) { - aggregation.unshift({ - $match: { - dryRunId: this.dryRun, - dryRunClass: this.classMap.get(entityClass), - }, - } as FilterObject & { - $match?: { - dryRunId?: string; - dryRunClass?: string; - } - }); - } - return await new DataBaseHelper(DryRun).aggregate(aggregation) as unknown as T[]; + const dryRunClass = this.classMap.get(entityClass) + + return await new DataBaseHelper(DryRun).aggregateDryRan(aggregation, this.dryRun, dryRunClass) as unknown as T[]; } else { return await new DataBaseHelper(entityClass).aggregate(aggregation); } @@ -1070,6 +1059,50 @@ export class DatabaseServer { return await this.aggregate(ApprovalDocumentCollection, aggregation) as ApprovalDocumentCollection[]; } + /** + * get document aggregation filters + * @param props + * + * @returns Result + */ + public getDocumentAggregationFilters(props: IGetDocumentAggregationFilters): void { + return DataBaseHelper.getDocumentAggregationFilters(props); + } + + /** + * get document aggregation filters for analytics + * @param nameFilter + * @param uuid + * + * @returns Result + */ + public getAnalyticsDocAggregationFilters(nameFilter: string, uuid: string): unknown[] { + return DataBaseHelper.getAnalyticsDocAggregationFilters(nameFilter, uuid); + } + + /** + * get document aggregation filters for analytics + * @param nameFilterMap + * @param nameFilterAttributes + * @param existingAttributes + * + * @returns Result + */ + public getAttributesAggregationFilters(nameFilterMap: string, nameFilterAttributes: string, existingAttributes: string[] | []): unknown[] { + return DataBaseHelper.getAttributesAggregationFilters(nameFilterMap, nameFilterAttributes, existingAttributes); + } + + /** + * get tasks aggregation filters + * @param nameFilter + * @param processTimeout + * + * @returns Result + */ + public getTasksAggregationFilters(nameFilter: string, processTimeout: number): unknown[] { + return DataBaseHelper.getTasksAggregationFilters(nameFilter, processTimeout); + } + /** * Get Vc Documents * @param filters @@ -1879,21 +1912,25 @@ export class DatabaseServer { /** * Get transactions serials count * @param mintRequestId Mint request identifier + * @param transferStatus Transfer status + * * @returns Serials count */ public async getTransactionsSerialsCount( mintRequestId: string, transferStatus?: MintTransactionStatus | any ): Promise { - const aggregation = this._getTransactionsSerialsAggregation( + const aggregation = DataBaseHelper._getTransactionsSerialsAggregation( mintRequestId, transferStatus ); - aggregation.push({ - $project: { - serials: { $size: '$serials' }, - }, + + DataBaseHelper.getTransactionsSerialsAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS.COUNT }); + const result: any = await this.aggregate(MintTransaction, aggregation); return result[0]?.serials || 0; } @@ -2050,61 +2087,18 @@ export class DatabaseServer { ]; } - /** - * Get aggregation filter for transactions serials - * @param mintRequestId Mint request identifier - * @returns Aggregation filter - */ - private _getTransactionsSerialsAggregation( - mintRequestId: string, - transferStatus?: MintTransactionStatus | any - ): any[] { - const match: any = { - mintRequestId, - }; - if (transferStatus) { - match.transferStatus = transferStatus; - } - const aggregation: any[] = [ - { - $match: match, - }, - { - $group: { - _id: 1, - serials: { - $push: '$serials', - }, - }, - }, - { - $project: { - serials: { - $reduce: { - input: '$serials', - initialValue: [], - in: { - $concatArrays: ['$$value', '$$this'], - }, - }, - }, - }, - }, - ]; - - return aggregation; - } - /** * Get transactions serials * @param mintRequestId Mint request identifier + * @param transferStatus Transfer status + * * @returns Serials */ public async getTransactionsSerials( mintRequestId: string, transferStatus?: MintTransactionStatus | any ): Promise { - const aggregation = this._getTransactionsSerialsAggregation( + const aggregation = DataBaseHelper._getTransactionsSerialsAggregation( mintRequestId, transferStatus ); diff --git a/common/src/helpers/db-helper.ts b/common/src/helpers/db-helper.ts index 783163d3f8..7f9e04e844 100644 --- a/common/src/helpers/db-helper.ts +++ b/common/src/helpers/db-helper.ts @@ -5,6 +5,7 @@ import { DataBaseNamingStrategy } from './db-naming-strategy.js'; import { GridFSBucket } from 'mongodb'; import fixConnectionString from './fix-connection-string.js'; import type { FindOptions } from '@mikro-orm/core/drivers/IDatabaseDriver'; +import { MintTransactionStatus } from '@guardian/interfaces'; interface ICommonConnectionConfig { driver: typeof MongoDriver; @@ -13,6 +14,50 @@ interface ICommonConnectionConfig { clientUrl: string; entities: string[]; } +export interface IGetAggregationFilters { + aggregation: unknown[], + aggregateMethod: string, + nameFilter: string, +} + +export interface IGetDocumentAggregationFilters extends IGetAggregationFilters { + timelineLabelPath?: string, + timelineDescriptionPath?: string, + dryRun?: string, + sortObject?: Record, + itemsPerPage?: number, + page?: number, + policyId?: string, +} + +export const MAP_DOCUMENT_AGGREGATION_FILTERS = { + BASE: 'base', + HISTORY: 'history', + SORT: 'sort', + PAGINATION: 'pagination', + VC_DOCUMENTS: 'vc-documents', + VP_DOCUMENTS: 'vp-documents', + APPROVE: 'approve' +} + +export const MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS = { + DOC_BY_POLICY: 'doc_by_policy', + DOC_BY_INSTANCE: 'doc_by_instance', + DOCS_GROUPS: 'docs_groups', + SCHEMA_BY_NAME: 'schema_by_name', +} + +export const MAP_ATTRIBUTES_AGGREGATION_FILTERS = { + RESULT: 'result' +} + +export const MAP_TASKS_AGGREGATION_FILTERS = { + RESULT: 'result' +} + +export const MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS = { + COUNT: 'count' +} /** * Common connection config @@ -169,6 +214,326 @@ export class DataBaseHelper { return aggregateEntities; } + /** + * AggregateDryRun + * @param pipeline Pipeline + * @param dryRunId + * @param dryRunClass + * + * @returns Result + */ + @CreateRequestContext(() => DataBaseHelper.orm) + public async aggregateDryRan(pipeline: FilterObject[], dryRunId: string, dryRunClass: string): Promise { + if (Array.isArray(pipeline)) { + pipeline.unshift({ + $match: { + dryRunId, + dryRunClass, + }, + } as FilterObject & { + $match?: { + dryRunId?: string; + dryRunClass?: string; + } + }); + } + + return await this.aggregate(pipeline) + } + + /** + * get document aggregation filters + * @param props + * + * @returns Result + */ + public static getDocumentAggregationFilters(props: IGetDocumentAggregationFilters): void { + const { + aggregation, + aggregateMethod, + nameFilter, + timelineLabelPath, + timelineDescriptionPath, + dryRun, + sortObject, + itemsPerPage, + page, + policyId, + } = props; + + const filters = { + [MAP_DOCUMENT_AGGREGATION_FILTERS.BASE]: [ + { + $match: { + '__sourceTag__': { $ne: null }, + }, + }, { + $set: { + 'option': { + $cond: { + if: { + $or: [ + { $eq: [null, '$newOption'] }, + { $not: '$newOption' }, + ], + }, + then: '$option', + else: '$newOption', + }, + }, + }, + }, { + $unset: 'newOptions', + }, + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.HISTORY]: [ + { + $lookup: { + from: `${ + dryRun + ? 'dry_run' + : 'document_state' + }`, + localField: 'id', + foreignField: 'documentId', + pipeline: [ + { + $set: { + labelValue: timelineLabelPath + ? '$document.' + (timelineLabelPath || 'option.status') + : '$document.option.status', + comment: timelineDescriptionPath + ? '$document.' + (timelineDescriptionPath || 'option.comment') + : '$document.option.comment', + created: '$createDate', + }, + }, + ], + as: 'history', + }, + }, + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.SORT]: [ + { + $sort: sortObject + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.PAGINATION]: [ + { + $skip: itemsPerPage * page + }, + { + $limit: itemsPerPage + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.VC_DOCUMENTS]: [ + { + $match: { + policyId: { $eq: policyId } + } + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.VP_DOCUMENTS]: [ + { + $match: { + policyId: { $eq: policyId } + } + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.APPROVE]: [ + { + $match: { + policyId: { $eq: policyId } + } + } + ], + }; + + aggregation[aggregateMethod](...filters[nameFilter]); + } + + /** + * get document aggregation filters for analytics + * @param nameFilter + * @param uuid + * + * @returns Result + */ + public static getAnalyticsDocAggregationFilters(nameFilter: string, uuid: string): unknown[] { + const filters = { + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_POLICY]: [ + { $match: { uuid, } }, + { + $group: { + _id: { + policyTopicId: '$policyTopicId', + type: '$type', + action: '$action' + }, count: { $sum: 1 } + } + } + ], + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_INSTANCE]: [ + { $match: { uuid } }, + { + $group: { + _id: { + instanceTopicId: '$instanceTopicId', + type: '$type', + action: '$action' + }, count: { $sum: 1 } + } + } + ], + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOCS_GROUPS]: [ + { $match: { uuid } }, + { $group: { _id: { type: '$type', action: '$action' }, count: { $sum: 1 } } } + ], + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.SCHEMA_BY_NAME]: [ + { $match: { uuid } }, + { + $group: { + _id: { + name: '$name', + action: '$action', + }, count: { $sum: 1 } + } + }, + { $sort: { count: -1 } } + ] + }; + + return filters[nameFilter]; + } + + /** + * get attributes aggregation filters + * @param nameFilterMap + * @param nameFilterAttributes + * @param existingAttributes + * + * @returns Result + */ + public static getAttributesAggregationFilters(nameFilterMap: string, nameFilterAttributes: string, existingAttributes: string[] | []): unknown[] { + const filters = { + [MAP_ATTRIBUTES_AGGREGATION_FILTERS.RESULT]: [ + { $project: { attributes: '$attributes' } }, + { $unwind: { path: '$attributes' } }, + { $match: { attributes: { $regex: nameFilterAttributes, $options: 'i' } } }, + { $match: { attributes: { $not: { $in: existingAttributes } } } }, + { $group: { _id: null, uniqueValues: { $addToSet: '$attributes' } } }, + { $unwind: { path: '$uniqueValues' } }, + { $limit: 20 }, + { $group: { _id: null, uniqueValues: { $addToSet: '$uniqueValues' } } }, + ], + }; + + return filters[nameFilterMap]; + } + + /** + * get tasks aggregation filters + * @param nameFilter + * @param processTimeout + * + * @returns Result + */ + public static getTasksAggregationFilters(nameFilter: string, processTimeout: number): unknown[] { + const filters = { + [MAP_TASKS_AGGREGATION_FILTERS.RESULT]: [ + { + $match: { + sent: true, + done: { $ne: true }, + }, + }, + { + $addFields: { + timeDifference: { + $subtract: ['$processedTime', '$createDate'], + }, + }, + }, + { + $match: { + timeDifference: { $gt: processTimeout }, + }, + }, + ], + }; + + return filters[nameFilter]; + } + + /** + * get transactions serials aggregation filters + * @param props + * + * @returns Result + */ + public static getTransactionsSerialsAggregationFilters(props: IGetAggregationFilters): void { + const { aggregation, aggregateMethod, nameFilter } = props; + + const filters = { + [MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS.COUNT]: [ + { + $project: { + serials: { $size: '$serials' }, + }, + } + ], + }; + + aggregation[aggregateMethod](...filters[nameFilter]); + } + + /** + * Get aggregation filter for transactions serials + * @param mintRequestId Mint request identifier + * @param transferStatus Transfer status + * + * @returns Aggregation filter + */ + public static _getTransactionsSerialsAggregation( + mintRequestId: string, + transferStatus?: MintTransactionStatus | unknown + ): unknown[] { + const match: any = { + mintRequestId, + }; + + if (transferStatus) { + match.transferStatus = transferStatus; + } + + return [ + { + $match: match, + }, + { + $group: { + _id: 1, + serials: { + $push: '$serials', + }, + }, + }, + { + $project: { + serials: { + $reduce: { + input: '$serials', + initialValue: [], + in: { + $concatArrays: ['$$value', '$$this'], + }, + }, + }, + }, + }, + ]; + } + /** * Find and count * @param filters Filters diff --git a/logger-service/src/api/logger.service.ts b/logger-service/src/api/logger.service.ts index 3453703c3b..b1c103c765 100644 --- a/logger-service/src/api/logger.service.ts +++ b/logger-service/src/api/logger.service.ts @@ -1,4 +1,4 @@ -import { LargePayloadContainer, MessageError, MessageResponse, Log, DatabaseServer } from '@guardian/common'; +import { LargePayloadContainer, MessageError, MessageResponse, Log, DatabaseServer, MAP_ATTRIBUTES_AGGREGATION_FILTERS } from '@guardian/common'; import { MessageAPI } from '@guardian/interfaces'; import { Controller, Module } from '@nestjs/common'; import { ClientsModule, Ctx, MessagePattern, NatsContext, Payload, Transport } from '@nestjs/microservices'; @@ -72,16 +72,10 @@ export class LoggerService { try { const nameFilter = `.*${msg.name || ''}.*`; const existingAttributes = msg.existingAttributes || []; - const aggregateAttrResult = await logRepository.aggregate(Log, [ - { $project: { attributes: '$attributes' } }, - { $unwind: { path: '$attributes' } }, - { $match: { attributes: { $regex: nameFilter, $options: 'i' } } }, - { $match: { attributes: { $not: { $in: existingAttributes } } } }, - { $group: { _id: null, uniqueValues: { $addToSet: '$attributes' } } }, - { $unwind: { path: '$uniqueValues' } }, - { $limit: 20 }, - { $group: { _id: null, uniqueValues: { $addToSet: '$uniqueValues' } } }, - ] as FilterObject[]); + + const aggregateAttrResult = + await logRepository.aggregate(Log, logRepository.getAttributesAggregationFilters(MAP_ATTRIBUTES_AGGREGATION_FILTERS.RESULT, nameFilter, existingAttributes) as FilterObject[]); + return new MessageResponse(aggregateAttrResult[0].uniqueValues?.sort() || []); } catch (error) { diff --git a/policy-service/src/policy-engine/blocks/documents-source.ts b/policy-service/src/policy-engine/blocks/documents-source.ts index ec506490d9..b6a02569bf 100644 --- a/policy-service/src/policy-engine/blocks/documents-source.ts +++ b/policy-service/src/policy-engine/blocks/documents-source.ts @@ -8,6 +8,7 @@ import { StateField } from '../helpers/decorators/index.js'; import { ExternalDocuments, ExternalEvent, ExternalEventType } from '../interfaces/external-event.js'; import ObjGet from 'lodash.get'; import { BlockActionError } from '../errors/index.js'; +import { MAP_DOCUMENT_AGGREGATION_FILTERS } from '@guardian/common'; /** * Document source block with UI @@ -286,59 +287,27 @@ export class InterfaceDocumentsSource { */ private async getDataByAggregationFilters(ref: IPolicySourceBlock, user: PolicyUser, sortState: any, paginationData: any, history? : IPolicyAddonBlock) { const filtersAndDataType = await ref.getGlobalSourcesFilters(user); - const aggregation = [...filtersAndDataType.filters, { - $match: { - '__sourceTag__': { $ne: null } - } - }, { - $set: { - 'option': { - $cond: { - if: { - $or: [ - { $eq: [null, '$newOption'] }, - { $not: '$newOption' } - ] - }, - then: '$option', - else: '$newOption' - } - } - } - }, { - $unset: 'newOptions', - }]; + + const aggregation = [...filtersAndDataType.filters] as unknown[]; + + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.BASE + }); if (history) { - aggregation.push({ - $lookup: { - from: `${ - ref.databaseServer.getDryRun() - ? 'dry_run' - : 'document_state' - }`, - localField: 'id', - foreignField: 'documentId', - pipeline: [ - { - $set: { - labelValue: history - ? '$document.' + - (history.options.timelineLabelPath || - 'option.status') - : '$document.option.status', - comment: history - ? '$document.' + - (history.options - .timelineDescriptionPath || - 'option.comment') - : '$document.option.comment', - created: '$createDate', - }, - }, - ], - as: 'history', - }, + const dryRun = ref.databaseServer.getDryRun(); + + const { timelineLabelPath, timelineDescriptionPath } = history.options; + + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.HISTORY, + timelineLabelPath, + timelineDescriptionPath, + dryRun, }); } @@ -355,47 +324,60 @@ export class InterfaceDocumentsSource { sortObject[sortState.orderField] = 1; break; } - aggregation.push({ - $sort: sortObject + + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.SORT, + sortObject, }); } if (paginationData) { - aggregation.push({ - $skip: paginationData.itemsPerPage * paginationData.page - }, - { - $limit: paginationData.itemsPerPage + const { itemsPerPage, page } = paginationData; + + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.PAGINATION, + itemsPerPage, + page }); } switch (filtersAndDataType.dataType) { case 'vc-documents': - aggregation.unshift({ - $match: { - policyId: { $eq: ref.policyId } - } + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'unshift', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.VC_DOCUMENTS, + policyId: ref.policyId, }); + return await ref.databaseServer.getVcDocumentsByAggregation(aggregation); case 'did-documents': return await ref.databaseServer.getDidDocumentsByAggregation(aggregation); case 'vp-documents': - aggregation.unshift({ - $match: { - policyId: { $eq: ref.policyId } - } + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'unshift', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.VP_DOCUMENTS, + policyId: ref.policyId, }); + const data = await ref.databaseServer.getVpDocumentsByAggregation(aggregation); for (const item of data as any[]) { [item.serials, item.amount, item.error, item.wasTransferNeeded, item.transferSerials, item.transferAmount, item.tokenIds] = await ref.databaseServer.getVPMintInformation(item); } return data; case 'approve': - aggregation.unshift({ - $match: { - policyId: { $eq: ref.policyId } - } + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'unshift', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.APPROVE, + policyId: ref.policyId, }); + return await ref.databaseServer.getApprovalDocumentsByAggregation(aggregation); default: return []; diff --git a/queue-service/src/queue-service/queue-service.ts b/queue-service/src/queue-service/queue-service.ts index 88dfa69578..35a2be3f2c 100644 --- a/queue-service/src/queue-service/queue-service.ts +++ b/queue-service/src/queue-service/queue-service.ts @@ -1,4 +1,4 @@ -import { DatabaseServer, MessageError, MessageResponse, NatsService, Singleton } from '@guardian/common'; +import { DatabaseServer, MAP_TASKS_AGGREGATION_FILTERS, MessageError, MessageResponse, NatsService, Singleton } from '@guardian/common'; import { GenerateUUIDv4, ITask, OrderDirection, QueueEvents, WorkerEvents } from '@guardian/interfaces'; import { FilterObject } from '@mikro-orm/core'; import { TaskEntity } from '../entity/task'; @@ -208,26 +208,8 @@ export class QueueService extends NatsService{ const dataBaseServer = new DatabaseServer(); - const tasks = await dataBaseServer.aggregate(TaskEntity, [ - { - $match: { - sent: true, - done: { $ne: true }, - }, - }, - { - $addFields: { - timeDifference: { - $subtract: ['$processedTime', '$createDate'], - }, - }, - }, - { - $match: { - timeDifference: { $gt: this.processTimeout }, - }, - }, - ] as FilterObject[]); + const tasks = + await dataBaseServer.aggregate(TaskEntity, dataBaseServer.getTasksAggregationFilters(MAP_TASKS_AGGREGATION_FILTERS.RESULT, this.processTimeout) as FilterObject[]); for (const task of tasks) { task.processedTime = null;