From 3eb4e7c6f6b084a1a6fa5449577df4d2ca94ceaf Mon Sep 17 00:00:00 2001 From: Ihar Date: Sat, 17 Aug 2024 14:15:51 +0500 Subject: [PATCH 1/9] fix: get cache from base64 --- api-gateway/src/helpers/interceptors/cache.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/api-gateway/src/helpers/interceptors/cache.ts b/api-gateway/src/helpers/interceptors/cache.ts index 399a54f6ab..3c14071bc4 100644 --- a/api-gateway/src/helpers/interceptors/cache.ts +++ b/api-gateway/src/helpers/interceptors/cache.ts @@ -12,6 +12,14 @@ import { getCacheKey } from './utils/index.js'; //constants import { CACHE, CACHE_PREFIXES, META_DATA } from '#constants'; +function isBase64(str: string): boolean { + try { + return Buffer.from(str, 'base64').toString('base64') === str; + } catch (err) { + return false; + } +} + @Injectable() export class CacheInterceptor implements NestInterceptor { constructor(private readonly cacheService: CacheService) { @@ -48,7 +56,7 @@ export class CacheInterceptor implements NestInterceptor { if (cachedResponse) { let result = JSON.parse(cachedResponse); - if (typeof result === 'string') { + if (typeof result === 'string' && isBase64(result)) { result = Buffer.from(result, 'base64'); } From f1e2d7da040048b5f9fe31eccc01b53c8c948e69 Mon Sep 17 00:00:00 2001 From: Ihar Date: Sat, 17 Aug 2024 16:35:24 +0500 Subject: [PATCH 2/9] fix: devide cases responce bufer and base64 in cache --- api-gateway/src/helpers/interceptors/cache.ts | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/api-gateway/src/helpers/interceptors/cache.ts b/api-gateway/src/helpers/interceptors/cache.ts index 3c14071bc4..fcb2f49acc 100644 --- a/api-gateway/src/helpers/interceptors/cache.ts +++ b/api-gateway/src/helpers/interceptors/cache.ts @@ -12,14 +12,6 @@ import { getCacheKey } from './utils/index.js'; //constants import { CACHE, CACHE_PREFIXES, META_DATA } from '#constants'; -function isBase64(str: string): boolean { - try { - return Buffer.from(str, 'base64').toString('base64') === str; - } catch (err) { - return false; - } -} - @Injectable() export class CacheInterceptor implements NestInterceptor { constructor(private readonly cacheService: CacheService) { @@ -56,8 +48,10 @@ export class CacheInterceptor implements NestInterceptor { if (cachedResponse) { let result = JSON.parse(cachedResponse); - if (typeof result === 'string' && isBase64(result)) { - result = Buffer.from(result, 'base64'); + if (result.type === 'buffer') { + result = Buffer.from(result.data, 'base64'); + } else { + result = result.data; } return result; @@ -81,7 +75,11 @@ export class CacheInterceptor implements NestInterceptor { } if (Buffer.isBuffer(result)) { - result = result.toString('base64'); + result = { type: 'buffer', data: result.toString('base64') }; + } else if (typeof response === 'object') { + result = { type: 'json', data: result }; + } else { + result = { type: 'string', data: result }; } await this.cacheService.set(cacheKey, JSON.stringify(result), ttl, cacheTag); From 9ac54de621a4e509f14a7520b19f14285b7d8b9f Mon Sep 17 00:00:00 2001 From: Ihar Date: Mon, 19 Aug 2024 16:47:42 +0500 Subject: [PATCH 3/9] feat: move aggregation filters to bd helper[3966] --- .../src/analytics/report.service.ts | 53 ++++--------------- 1 file changed, 11 insertions(+), 42 deletions(-) diff --git a/analytics-service/src/analytics/report.service.ts b/analytics-service/src/analytics/report.service.ts index 16dfc7e7fc..e1165a6b03 100644 --- a/analytics-service/src/analytics/report.service.ts +++ b/analytics-service/src/analytics/report.service.ts @@ -1,4 +1,4 @@ -import { DatabaseServer, MessageAction, PinoLogger } from '@guardian/common'; +import { DatabaseServer, MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS, MessageAction, PinoLogger } from '@guardian/common'; import { GenerateUUIDv4 } from '@guardian/interfaces'; import JSZip from 'jszip'; import xl from 'excel4node'; @@ -218,36 +218,14 @@ export class ReportService { const schemaCount = await databaseServer.find(Schema, { uuid, action: MessageAction.PublishSchema }); const systemSchemaCount = await databaseServer.find(Schema, { uuid, action: MessageAction.PublishSystemSchema }); - const docByPolicy = await databaseServer.aggregate(Document, [ - { $match: { uuid, } }, - { - $group: { - _id: { - policyTopicId: '$policyTopicId', - type: '$type', - action: '$action' - }, count: { $sum: 1 } - } - } - ] as FilterObject[]); - - const docByInstance = await databaseServer.aggregate(Document, [ - { $match: { uuid } }, - { - $group: { - _id: { - instanceTopicId: '$instanceTopicId', - type: '$type', - action: '$action' - }, count: { $sum: 1 } - } - } - ] as FilterObject[]); + const docByPolicy = + await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_POLICY, uuid) as FilterObject[]) + + const docByInstance = + await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_INSTANCE, uuid) as FilterObject[]) - const docsGroups = await databaseServer.aggregate(Document, [ - { $match: { uuid } }, - { $group: { _id: { type: '$type', action: '$action' }, count: { $sum: 1 } } } - ] as FilterObject[]); + const docsGroups = + await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOCS_GROUPS, uuid) as FilterObject[]) const didCount = docsGroups .filter(g => g._id.type === DocumentType.DID && g._id.action !== MessageAction.RevokeDocument) @@ -482,18 +460,9 @@ export class ReportService { } }), size); - const schemasByName = await new DatabaseServer().aggregate(Document, [ - { $match: { uuid: report.uuid } }, - { - $group: { - _id: { - name: '$name', - action: '$action', - }, count: { $sum: 1 } - } - }, - { $sort: { count: -1 } } - ] as FilterObject[]); + const databaseServer = new DatabaseServer(); + + const schemasByName = await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.SCHEMA_BY_NAME, report.uuid) as FilterObject[]) const topAllSchemasByName = []; const topSystemSchemasByName = []; From 5680f5b3aa1862919f72dc59afb984a1c1b386ee Mon Sep 17 00:00:00 2001 From: Ihar Date: Mon, 19 Aug 2024 16:48:17 +0500 Subject: [PATCH 4/9] feat: move aggregation filters to bd helper for logger service[3966] --- logger-service/src/api/logger.service.ts | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/logger-service/src/api/logger.service.ts b/logger-service/src/api/logger.service.ts index 3453703c3b..b1c103c765 100644 --- a/logger-service/src/api/logger.service.ts +++ b/logger-service/src/api/logger.service.ts @@ -1,4 +1,4 @@ -import { LargePayloadContainer, MessageError, MessageResponse, Log, DatabaseServer } from '@guardian/common'; +import { LargePayloadContainer, MessageError, MessageResponse, Log, DatabaseServer, MAP_ATTRIBUTES_AGGREGATION_FILTERS } from '@guardian/common'; import { MessageAPI } from '@guardian/interfaces'; import { Controller, Module } from '@nestjs/common'; import { ClientsModule, Ctx, MessagePattern, NatsContext, Payload, Transport } from '@nestjs/microservices'; @@ -72,16 +72,10 @@ export class LoggerService { try { const nameFilter = `.*${msg.name || ''}.*`; const existingAttributes = msg.existingAttributes || []; - const aggregateAttrResult = await logRepository.aggregate(Log, [ - { $project: { attributes: '$attributes' } }, - { $unwind: { path: '$attributes' } }, - { $match: { attributes: { $regex: nameFilter, $options: 'i' } } }, - { $match: { attributes: { $not: { $in: existingAttributes } } } }, - { $group: { _id: null, uniqueValues: { $addToSet: '$attributes' } } }, - { $unwind: { path: '$uniqueValues' } }, - { $limit: 20 }, - { $group: { _id: null, uniqueValues: { $addToSet: '$uniqueValues' } } }, - ] as FilterObject[]); + + const aggregateAttrResult = + await logRepository.aggregate(Log, logRepository.getAttributesAggregationFilters(MAP_ATTRIBUTES_AGGREGATION_FILTERS.RESULT, nameFilter, existingAttributes) as FilterObject[]); + return new MessageResponse(aggregateAttrResult[0].uniqueValues?.sort() || []); } catch (error) { From e01cf6c5715d2f0f917d97fd24326f136541cfb1 Mon Sep 17 00:00:00 2001 From: Ihar Date: Mon, 19 Aug 2024 16:49:33 +0500 Subject: [PATCH 5/9] feat: move aggregation filters to bd helper for policy service[3966] --- .../policy-engine/blocks/documents-source.ts | 120 ++++++++---------- 1 file changed, 50 insertions(+), 70 deletions(-) diff --git a/policy-service/src/policy-engine/blocks/documents-source.ts b/policy-service/src/policy-engine/blocks/documents-source.ts index ec506490d9..68fbf4ea8c 100644 --- a/policy-service/src/policy-engine/blocks/documents-source.ts +++ b/policy-service/src/policy-engine/blocks/documents-source.ts @@ -8,6 +8,7 @@ import { StateField } from '../helpers/decorators/index.js'; import { ExternalDocuments, ExternalEvent, ExternalEventType } from '../interfaces/external-event.js'; import ObjGet from 'lodash.get'; import { BlockActionError } from '../errors/index.js'; +import { MAP_DOCUMENT_AGGREGATION_FILTERS } from '@guardian/common'; /** * Document source block with UI @@ -286,59 +287,25 @@ export class InterfaceDocumentsSource { */ private async getDataByAggregationFilters(ref: IPolicySourceBlock, user: PolicyUser, sortState: any, paginationData: any, history? : IPolicyAddonBlock) { const filtersAndDataType = await ref.getGlobalSourcesFilters(user); - const aggregation = [...filtersAndDataType.filters, { - $match: { - '__sourceTag__': { $ne: null } - } - }, { - $set: { - 'option': { - $cond: { - if: { - $or: [ - { $eq: [null, '$newOption'] }, - { $not: '$newOption' } - ] - }, - then: '$option', - else: '$newOption' - } - } - } - }, { - $unset: 'newOptions', - }]; + let aggregation = [...filtersAndDataType.filters] as unknown[]; + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.BASE + }); if (history) { - aggregation.push({ - $lookup: { - from: `${ - ref.databaseServer.getDryRun() - ? 'dry_run' - : 'document_state' - }`, - localField: 'id', - foreignField: 'documentId', - pipeline: [ - { - $set: { - labelValue: history - ? '$document.' + - (history.options.timelineLabelPath || - 'option.status') - : '$document.option.status', - comment: history - ? '$document.' + - (history.options - .timelineDescriptionPath || - 'option.comment') - : '$document.option.comment', - created: '$createDate', - }, - }, - ], - as: 'history', - }, + const dryRun = ref.databaseServer.getDryRun(); + + const { timelineLabelPath, timelineDescriptionPath } = history.options; + + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.HISTORY, + timelineLabelPath, + timelineDescriptionPath, + dryRun, }); } @@ -355,47 +322,60 @@ export class InterfaceDocumentsSource { sortObject[sortState.orderField] = 1; break; } - aggregation.push({ - $sort: sortObject + + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.SORT, + sortObject, }); } if (paginationData) { - aggregation.push({ - $skip: paginationData.itemsPerPage * paginationData.page - }, - { - $limit: paginationData.itemsPerPage + const { itemsPerPage, page } = paginationData; + + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.PAGINATION, + itemsPerPage, + page }); } switch (filtersAndDataType.dataType) { case 'vc-documents': - aggregation.unshift({ - $match: { - policyId: { $eq: ref.policyId } - } + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'unshift', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.VC_DOCUMENTS, + policyId: ref.policyId, }); + return await ref.databaseServer.getVcDocumentsByAggregation(aggregation); case 'did-documents': return await ref.databaseServer.getDidDocumentsByAggregation(aggregation); case 'vp-documents': - aggregation.unshift({ - $match: { - policyId: { $eq: ref.policyId } - } + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'unshift', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.VP_DOCUMENTS, + policyId: ref.policyId, }); + const data = await ref.databaseServer.getVpDocumentsByAggregation(aggregation); for (const item of data as any[]) { [item.serials, item.amount, item.error, item.wasTransferNeeded, item.transferSerials, item.transferAmount, item.tokenIds] = await ref.databaseServer.getVPMintInformation(item); } return data; case 'approve': - aggregation.unshift({ - $match: { - policyId: { $eq: ref.policyId } - } + ref.databaseServer.getDocumentAggregationFilters({ + aggregation, + aggregateMethod: 'unshift', + nameFilter: MAP_DOCUMENT_AGGREGATION_FILTERS.APPROVE, + policyId: ref.policyId, }); + return await ref.databaseServer.getApprovalDocumentsByAggregation(aggregation); default: return []; From 863bf7b223aeec2aba9ea0832be006e64c9bbadc Mon Sep 17 00:00:00 2001 From: Ihar Date: Mon, 19 Aug 2024 16:50:06 +0500 Subject: [PATCH 6/9] feat: move aggregation filters to bd helper for queue service[3966] --- .../src/queue-service/queue-service.ts | 24 +++---------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/queue-service/src/queue-service/queue-service.ts b/queue-service/src/queue-service/queue-service.ts index 88dfa69578..35a2be3f2c 100644 --- a/queue-service/src/queue-service/queue-service.ts +++ b/queue-service/src/queue-service/queue-service.ts @@ -1,4 +1,4 @@ -import { DatabaseServer, MessageError, MessageResponse, NatsService, Singleton } from '@guardian/common'; +import { DatabaseServer, MAP_TASKS_AGGREGATION_FILTERS, MessageError, MessageResponse, NatsService, Singleton } from '@guardian/common'; import { GenerateUUIDv4, ITask, OrderDirection, QueueEvents, WorkerEvents } from '@guardian/interfaces'; import { FilterObject } from '@mikro-orm/core'; import { TaskEntity } from '../entity/task'; @@ -208,26 +208,8 @@ export class QueueService extends NatsService{ const dataBaseServer = new DatabaseServer(); - const tasks = await dataBaseServer.aggregate(TaskEntity, [ - { - $match: { - sent: true, - done: { $ne: true }, - }, - }, - { - $addFields: { - timeDifference: { - $subtract: ['$processedTime', '$createDate'], - }, - }, - }, - { - $match: { - timeDifference: { $gt: this.processTimeout }, - }, - }, - ] as FilterObject[]); + const tasks = + await dataBaseServer.aggregate(TaskEntity, dataBaseServer.getTasksAggregationFilters(MAP_TASKS_AGGREGATION_FILTERS.RESULT, this.processTimeout) as FilterObject[]); for (const task of tasks) { task.processedTime = null; From fbaa21b7f531e39ef73f693cdc1f3ed2f7dc80d8 Mon Sep 17 00:00:00 2001 From: Ihar Date: Mon, 19 Aug 2024 16:50:27 +0500 Subject: [PATCH 7/9] feat: move aggregation filters to bd helper and server[3966] --- .../src/database-modules/database-server.ts | 126 +++--- common/src/helpers/db-helper.ts | 365 ++++++++++++++++++ .../policy-engine/blocks/documents-source.ts | 4 +- 3 files changed, 428 insertions(+), 67 deletions(-) diff --git a/common/src/database-modules/database-server.ts b/common/src/database-modules/database-server.ts index 7f259f3fd4..1709227944 100644 --- a/common/src/database-modules/database-server.ts +++ b/common/src/database-modules/database-server.ts @@ -50,7 +50,7 @@ import { TopicType, } from '@guardian/interfaces'; import { BaseEntity } from '../models/index.js'; -import { DataBaseHelper } from '../helpers/index.js'; +import { DataBaseHelper, IGetDocumentAggregationFilters, MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS } from '../helpers/index.js'; import { Theme } from '../entity/theme.js'; import { GetConditionsPoliciesByCategories } from '../helpers/policy-category.js'; import { PolicyTool } from '../entity/tool.js'; @@ -281,20 +281,9 @@ export class DatabaseServer { */ public async aggregate(entityClass: new () => T, aggregation: FilterObject[]): Promise { if (this.dryRun) { - if (Array.isArray(aggregation)) { - aggregation.unshift({ - $match: { - dryRunId: this.dryRun, - dryRunClass: this.classMap.get(entityClass), - }, - } as FilterObject & { - $match?: { - dryRunId?: string; - dryRunClass?: string; - } - }); - } - return await new DataBaseHelper(DryRun).aggregate(aggregation) as unknown as T[]; + const dryRunClass = this.classMap.get(entityClass) + + return await new DataBaseHelper(DryRun).aggregateDryRan(aggregation, this.dryRun, dryRunClass) as unknown as T[]; } else { return await new DataBaseHelper(entityClass).aggregate(aggregation); } @@ -1070,6 +1059,50 @@ export class DatabaseServer { return await this.aggregate(ApprovalDocumentCollection, aggregation) as ApprovalDocumentCollection[]; } + /** + * get document aggregation filters + * @param props + * + * @returns Result + */ + public getDocumentAggregationFilters(props: IGetDocumentAggregationFilters): void { + return DataBaseHelper.getDocumentAggregationFilters(props); + } + + /** + * get document aggregation filters for analytics + * @param nameFilter + * @param uuid + * + * @returns Result + */ + public getAnalyticsDocAggregationFilters(nameFilter: string, uuid: string): unknown[] { + return DataBaseHelper.getAnalyticsDocAggregationFilters(nameFilter, uuid); + } + + /** + * get document aggregation filters for analytics + * @param nameFilterMap + * @param nameFilterAttributes + * @param existingAttributes + * + * @returns Result + */ + public getAttributesAggregationFilters(nameFilterMap: string, nameFilterAttributes: string, existingAttributes: string[] | []): unknown[] { + return DataBaseHelper.getAttributesAggregationFilters(nameFilterMap, nameFilterAttributes, existingAttributes); + } + + /** + * get tasks aggregation filters + * @param nameFilter + * @param processTimeout + * + * @returns Result + */ + public getTasksAggregationFilters(nameFilter: string, processTimeout: number): unknown[] { + return DataBaseHelper.getTasksAggregationFilters(nameFilter, processTimeout); + } + /** * Get Vc Documents * @param filters @@ -1879,21 +1912,25 @@ export class DatabaseServer { /** * Get transactions serials count * @param mintRequestId Mint request identifier + * @param transferStatus Transfer status + * * @returns Serials count */ public async getTransactionsSerialsCount( mintRequestId: string, transferStatus?: MintTransactionStatus | any ): Promise { - const aggregation = this._getTransactionsSerialsAggregation( + const aggregation = DataBaseHelper._getTransactionsSerialsAggregation( mintRequestId, transferStatus ); - aggregation.push({ - $project: { - serials: { $size: '$serials' }, - }, + + DataBaseHelper.getTransactionsSerialsAggregationFilters({ + aggregation, + aggregateMethod: 'push', + nameFilter: MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS.COUNT }); + const result: any = await this.aggregate(MintTransaction, aggregation); return result[0]?.serials || 0; } @@ -2050,61 +2087,18 @@ export class DatabaseServer { ]; } - /** - * Get aggregation filter for transactions serials - * @param mintRequestId Mint request identifier - * @returns Aggregation filter - */ - private _getTransactionsSerialsAggregation( - mintRequestId: string, - transferStatus?: MintTransactionStatus | any - ): any[] { - const match: any = { - mintRequestId, - }; - if (transferStatus) { - match.transferStatus = transferStatus; - } - const aggregation: any[] = [ - { - $match: match, - }, - { - $group: { - _id: 1, - serials: { - $push: '$serials', - }, - }, - }, - { - $project: { - serials: { - $reduce: { - input: '$serials', - initialValue: [], - in: { - $concatArrays: ['$$value', '$$this'], - }, - }, - }, - }, - }, - ]; - - return aggregation; - } - /** * Get transactions serials * @param mintRequestId Mint request identifier + * @param transferStatus Transfer status + * * @returns Serials */ public async getTransactionsSerials( mintRequestId: string, transferStatus?: MintTransactionStatus | any ): Promise { - const aggregation = this._getTransactionsSerialsAggregation( + const aggregation = DataBaseHelper._getTransactionsSerialsAggregation( mintRequestId, transferStatus ); diff --git a/common/src/helpers/db-helper.ts b/common/src/helpers/db-helper.ts index 783163d3f8..7f9e04e844 100644 --- a/common/src/helpers/db-helper.ts +++ b/common/src/helpers/db-helper.ts @@ -5,6 +5,7 @@ import { DataBaseNamingStrategy } from './db-naming-strategy.js'; import { GridFSBucket } from 'mongodb'; import fixConnectionString from './fix-connection-string.js'; import type { FindOptions } from '@mikro-orm/core/drivers/IDatabaseDriver'; +import { MintTransactionStatus } from '@guardian/interfaces'; interface ICommonConnectionConfig { driver: typeof MongoDriver; @@ -13,6 +14,50 @@ interface ICommonConnectionConfig { clientUrl: string; entities: string[]; } +export interface IGetAggregationFilters { + aggregation: unknown[], + aggregateMethod: string, + nameFilter: string, +} + +export interface IGetDocumentAggregationFilters extends IGetAggregationFilters { + timelineLabelPath?: string, + timelineDescriptionPath?: string, + dryRun?: string, + sortObject?: Record, + itemsPerPage?: number, + page?: number, + policyId?: string, +} + +export const MAP_DOCUMENT_AGGREGATION_FILTERS = { + BASE: 'base', + HISTORY: 'history', + SORT: 'sort', + PAGINATION: 'pagination', + VC_DOCUMENTS: 'vc-documents', + VP_DOCUMENTS: 'vp-documents', + APPROVE: 'approve' +} + +export const MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS = { + DOC_BY_POLICY: 'doc_by_policy', + DOC_BY_INSTANCE: 'doc_by_instance', + DOCS_GROUPS: 'docs_groups', + SCHEMA_BY_NAME: 'schema_by_name', +} + +export const MAP_ATTRIBUTES_AGGREGATION_FILTERS = { + RESULT: 'result' +} + +export const MAP_TASKS_AGGREGATION_FILTERS = { + RESULT: 'result' +} + +export const MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS = { + COUNT: 'count' +} /** * Common connection config @@ -169,6 +214,326 @@ export class DataBaseHelper { return aggregateEntities; } + /** + * AggregateDryRun + * @param pipeline Pipeline + * @param dryRunId + * @param dryRunClass + * + * @returns Result + */ + @CreateRequestContext(() => DataBaseHelper.orm) + public async aggregateDryRan(pipeline: FilterObject[], dryRunId: string, dryRunClass: string): Promise { + if (Array.isArray(pipeline)) { + pipeline.unshift({ + $match: { + dryRunId, + dryRunClass, + }, + } as FilterObject & { + $match?: { + dryRunId?: string; + dryRunClass?: string; + } + }); + } + + return await this.aggregate(pipeline) + } + + /** + * get document aggregation filters + * @param props + * + * @returns Result + */ + public static getDocumentAggregationFilters(props: IGetDocumentAggregationFilters): void { + const { + aggregation, + aggregateMethod, + nameFilter, + timelineLabelPath, + timelineDescriptionPath, + dryRun, + sortObject, + itemsPerPage, + page, + policyId, + } = props; + + const filters = { + [MAP_DOCUMENT_AGGREGATION_FILTERS.BASE]: [ + { + $match: { + '__sourceTag__': { $ne: null }, + }, + }, { + $set: { + 'option': { + $cond: { + if: { + $or: [ + { $eq: [null, '$newOption'] }, + { $not: '$newOption' }, + ], + }, + then: '$option', + else: '$newOption', + }, + }, + }, + }, { + $unset: 'newOptions', + }, + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.HISTORY]: [ + { + $lookup: { + from: `${ + dryRun + ? 'dry_run' + : 'document_state' + }`, + localField: 'id', + foreignField: 'documentId', + pipeline: [ + { + $set: { + labelValue: timelineLabelPath + ? '$document.' + (timelineLabelPath || 'option.status') + : '$document.option.status', + comment: timelineDescriptionPath + ? '$document.' + (timelineDescriptionPath || 'option.comment') + : '$document.option.comment', + created: '$createDate', + }, + }, + ], + as: 'history', + }, + }, + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.SORT]: [ + { + $sort: sortObject + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.PAGINATION]: [ + { + $skip: itemsPerPage * page + }, + { + $limit: itemsPerPage + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.VC_DOCUMENTS]: [ + { + $match: { + policyId: { $eq: policyId } + } + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.VP_DOCUMENTS]: [ + { + $match: { + policyId: { $eq: policyId } + } + } + ], + [MAP_DOCUMENT_AGGREGATION_FILTERS.APPROVE]: [ + { + $match: { + policyId: { $eq: policyId } + } + } + ], + }; + + aggregation[aggregateMethod](...filters[nameFilter]); + } + + /** + * get document aggregation filters for analytics + * @param nameFilter + * @param uuid + * + * @returns Result + */ + public static getAnalyticsDocAggregationFilters(nameFilter: string, uuid: string): unknown[] { + const filters = { + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_POLICY]: [ + { $match: { uuid, } }, + { + $group: { + _id: { + policyTopicId: '$policyTopicId', + type: '$type', + action: '$action' + }, count: { $sum: 1 } + } + } + ], + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_INSTANCE]: [ + { $match: { uuid } }, + { + $group: { + _id: { + instanceTopicId: '$instanceTopicId', + type: '$type', + action: '$action' + }, count: { $sum: 1 } + } + } + ], + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOCS_GROUPS]: [ + { $match: { uuid } }, + { $group: { _id: { type: '$type', action: '$action' }, count: { $sum: 1 } } } + ], + [MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.SCHEMA_BY_NAME]: [ + { $match: { uuid } }, + { + $group: { + _id: { + name: '$name', + action: '$action', + }, count: { $sum: 1 } + } + }, + { $sort: { count: -1 } } + ] + }; + + return filters[nameFilter]; + } + + /** + * get attributes aggregation filters + * @param nameFilterMap + * @param nameFilterAttributes + * @param existingAttributes + * + * @returns Result + */ + public static getAttributesAggregationFilters(nameFilterMap: string, nameFilterAttributes: string, existingAttributes: string[] | []): unknown[] { + const filters = { + [MAP_ATTRIBUTES_AGGREGATION_FILTERS.RESULT]: [ + { $project: { attributes: '$attributes' } }, + { $unwind: { path: '$attributes' } }, + { $match: { attributes: { $regex: nameFilterAttributes, $options: 'i' } } }, + { $match: { attributes: { $not: { $in: existingAttributes } } } }, + { $group: { _id: null, uniqueValues: { $addToSet: '$attributes' } } }, + { $unwind: { path: '$uniqueValues' } }, + { $limit: 20 }, + { $group: { _id: null, uniqueValues: { $addToSet: '$uniqueValues' } } }, + ], + }; + + return filters[nameFilterMap]; + } + + /** + * get tasks aggregation filters + * @param nameFilter + * @param processTimeout + * + * @returns Result + */ + public static getTasksAggregationFilters(nameFilter: string, processTimeout: number): unknown[] { + const filters = { + [MAP_TASKS_AGGREGATION_FILTERS.RESULT]: [ + { + $match: { + sent: true, + done: { $ne: true }, + }, + }, + { + $addFields: { + timeDifference: { + $subtract: ['$processedTime', '$createDate'], + }, + }, + }, + { + $match: { + timeDifference: { $gt: processTimeout }, + }, + }, + ], + }; + + return filters[nameFilter]; + } + + /** + * get transactions serials aggregation filters + * @param props + * + * @returns Result + */ + public static getTransactionsSerialsAggregationFilters(props: IGetAggregationFilters): void { + const { aggregation, aggregateMethod, nameFilter } = props; + + const filters = { + [MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS.COUNT]: [ + { + $project: { + serials: { $size: '$serials' }, + }, + } + ], + }; + + aggregation[aggregateMethod](...filters[nameFilter]); + } + + /** + * Get aggregation filter for transactions serials + * @param mintRequestId Mint request identifier + * @param transferStatus Transfer status + * + * @returns Aggregation filter + */ + public static _getTransactionsSerialsAggregation( + mintRequestId: string, + transferStatus?: MintTransactionStatus | unknown + ): unknown[] { + const match: any = { + mintRequestId, + }; + + if (transferStatus) { + match.transferStatus = transferStatus; + } + + return [ + { + $match: match, + }, + { + $group: { + _id: 1, + serials: { + $push: '$serials', + }, + }, + }, + { + $project: { + serials: { + $reduce: { + input: '$serials', + initialValue: [], + in: { + $concatArrays: ['$$value', '$$this'], + }, + }, + }, + }, + }, + ]; + } + /** * Find and count * @param filters Filters diff --git a/policy-service/src/policy-engine/blocks/documents-source.ts b/policy-service/src/policy-engine/blocks/documents-source.ts index 68fbf4ea8c..b6a02569bf 100644 --- a/policy-service/src/policy-engine/blocks/documents-source.ts +++ b/policy-service/src/policy-engine/blocks/documents-source.ts @@ -287,7 +287,9 @@ export class InterfaceDocumentsSource { */ private async getDataByAggregationFilters(ref: IPolicySourceBlock, user: PolicyUser, sortState: any, paginationData: any, history? : IPolicyAddonBlock) { const filtersAndDataType = await ref.getGlobalSourcesFilters(user); - let aggregation = [...filtersAndDataType.filters] as unknown[]; + + const aggregation = [...filtersAndDataType.filters] as unknown[]; + ref.databaseServer.getDocumentAggregationFilters({ aggregation, aggregateMethod: 'push', From 924763aaf8fc8470173a787b01a34a3ea7de5b71 Mon Sep 17 00:00:00 2001 From: Ihar Date: Mon, 19 Aug 2024 21:49:41 +0500 Subject: [PATCH 8/9] feat: move grid fs calls from db server to db helper[3966] --- .../src/database-modules/database-server.ts | 27 ++----------- common/src/helpers/db-helper.ts | 40 +++++++++++++++++++ 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/common/src/database-modules/database-server.ts b/common/src/database-modules/database-server.ts index 1709227944..27fc289a59 100644 --- a/common/src/database-modules/database-server.ts +++ b/common/src/database-modules/database-server.ts @@ -3543,40 +3543,21 @@ export class DatabaseServer { * Save file * @param uuid * @param buffer + * * @returns file ID */ public static async saveFile(uuid: string, buffer: Buffer): Promise { - return new Promise((resolve, reject) => { - try { - const fileStream = DataBaseHelper.gridFS.openUploadStream(uuid); - fileStream.write(buffer); - fileStream.end(() => { - resolve(fileStream.id); - }); - } catch (error) { - reject(error); - } - }); + return DataBaseHelper.saveFile(uuid, buffer) } /** - * Save file + * Load file * @param id * * @returns file ID */ public static async loadFile(id: ObjectId): Promise { - const files = await DataBaseHelper.gridFS.find(id).toArray(); - if (files.length === 0) { - return null; - } - const file = files[0]; - const fileStream = DataBaseHelper.gridFS.openDownloadStream(file._id); - const bufferArray = []; - for await (const data of fileStream) { - bufferArray.push(data); - } - return Buffer.concat(bufferArray); + return DataBaseHelper.loadFile(id) } /** diff --git a/common/src/helpers/db-helper.ts b/common/src/helpers/db-helper.ts index 7f9e04e844..45e1ac156f 100644 --- a/common/src/helpers/db-helper.ts +++ b/common/src/helpers/db-helper.ts @@ -138,6 +138,46 @@ export class DataBaseHelper { return DataBaseHelper._gridFS; } + /** + * Save file + * @param uuid + * @param buffer + * @returns file ID + */ + public static async saveFile(uuid: string, buffer: Buffer): Promise { + return new Promise((resolve, reject) => { + try { + const fileStream = DataBaseHelper.gridFS.openUploadStream(uuid); + fileStream.write(buffer); + fileStream.end(() => { + resolve(fileStream.id); + }); + } catch (error) { + reject(error); + } + }); + } + + /** + * Load file + * @param id + * + * @returns file ID + */ + public static async loadFile(id: ObjectId): Promise { + const files = await DataBaseHelper.gridFS.find(id).toArray(); + if (files.length === 0) { + return null; + } + const file = files[0]; + const fileStream = DataBaseHelper.gridFS.openDownloadStream(file._id); + const bufferArray = []; + for await (const data of fileStream) { + bufferArray.push(data); + } + return Buffer.concat(bufferArray); + } + /** * Delete entities by filters * @param filters filters From 32d871ffb2b936fc1874e05573fcb98ef489e09f Mon Sep 17 00:00:00 2001 From: Ihar Date: Tue, 20 Aug 2024 12:18:15 +0500 Subject: [PATCH 9/9] feat: connect bd and grid fs in db helper[3966] --- analytics-service/src/app.ts | 5 +++-- auth-service/src/app.ts | 6 ++++-- .../src/database-modules/database-server.ts | 17 ++++++++++++++++- common/src/helpers/db-helper.ts | 19 ++++++++++++++++++- .../secret-manager/migrations/migrations.ts | 4 +++- guardian-service/src/app.ts | 8 ++++---- logger-service/src/app.ts | 5 +++-- notification-service/src/app.ts | 5 +++-- policy-service/src/api/policy-process.ts | 8 +++++--- queue-service/src/app.ts | 5 +++-- 10 files changed, 62 insertions(+), 20 deletions(-) diff --git a/analytics-service/src/app.ts b/analytics-service/src/app.ts index b53873f61d..b310eac82c 100644 --- a/analytics-service/src/app.ts +++ b/analytics-service/src/app.ts @@ -1,6 +1,6 @@ import { COMMON_CONNECTION_CONFIG, - DataBaseHelper, + DataBaseHelper, DatabaseServer, LargePayloadContainer, MessageBrokerChannel, Migration, @@ -44,7 +44,8 @@ Promise.all([ mongoForLoggingInitialization(), ]).then(async ([db, app, cn, loggerMongo]) => { try { - DataBaseHelper.orm = db; + DatabaseServer.connectBD(db); + app.connectMicroservice({ transport: Transport.NATS, options: { diff --git a/auth-service/src/app.ts b/auth-service/src/app.ts index 3749ffe27d..a3863a0c21 100644 --- a/auth-service/src/app.ts +++ b/auth-service/src/app.ts @@ -3,7 +3,7 @@ import { WalletService } from './api/wallet-service.js'; import { ApplicationState, COMMON_CONNECTION_CONFIG, - DataBaseHelper, + DatabaseServer, LargePayloadContainer, MessageBrokerChannel, Migration, @@ -60,7 +60,9 @@ Promise.all([ InitializeVault(process.env.VAULT_PROVIDER), mongoForLoggingInitialization(), ]).then(async ([_, db, cn, app, vault, loggerMongo]) => { - DataBaseHelper.orm = db; + + DatabaseServer.connectBD(db); + const state = new ApplicationState(); await state.setServiceName('AUTH_SERVICE').setConnection(cn).init(); diff --git a/common/src/database-modules/database-server.ts b/common/src/database-modules/database-server.ts index 27fc289a59..8fc6f30cb7 100644 --- a/common/src/database-modules/database-server.ts +++ b/common/src/database-modules/database-server.ts @@ -3547,7 +3547,22 @@ export class DatabaseServer { * @returns file ID */ public static async saveFile(uuid: string, buffer: Buffer): Promise { - return DataBaseHelper.saveFile(uuid, buffer) + return DataBaseHelper.saveFile(uuid, buffer); + } + + /** + * Set MongoDriver + * @param db + */ + public static connectBD(db: any): void { + DataBaseHelper.connectBD(db); + } + + /** + * Grid fs connect + */ + public static connectGridFS() { + DataBaseHelper.connectGridFS(); } /** diff --git a/common/src/helpers/db-helper.ts b/common/src/helpers/db-helper.ts index 45e1ac156f..144bfbb89b 100644 --- a/common/src/helpers/db-helper.ts +++ b/common/src/helpers/db-helper.ts @@ -2,7 +2,7 @@ import { MikroORM, CreateRequestContext, wrap, FilterObject, FilterQuery, FindAl import { MongoDriver, MongoEntityManager, MongoEntityRepository, ObjectId } from '@mikro-orm/mongodb'; import { BaseEntity } from '../models/index.js'; import { DataBaseNamingStrategy } from './db-naming-strategy.js'; -import { GridFSBucket } from 'mongodb'; +import { Db, GridFSBucket } from 'mongodb'; import fixConnectionString from './fix-connection-string.js'; import type { FindOptions } from '@mikro-orm/core/drivers/IDatabaseDriver'; import { MintTransactionStatus } from '@guardian/interfaces'; @@ -138,6 +138,23 @@ export class DataBaseHelper { return DataBaseHelper._gridFS; } + /** + * Set MongoDriver + * @param db + */ + public static connectBD(db: MikroORM) { + DataBaseHelper.orm = db; + } + + /** + * Grid fs connect + */ + public static connectGridFS() { + const connect: Db = DataBaseHelper.orm.em.getDriver().getConnection().getDb(); + + DataBaseHelper.gridFS = new GridFSBucket(connect); + } + /** * Save file * @param uuid diff --git a/common/src/secret-manager/migrations/migrations.ts b/common/src/secret-manager/migrations/migrations.ts index 6cef79b66d..d25d796e0a 100644 --- a/common/src/secret-manager/migrations/migrations.ts +++ b/common/src/secret-manager/migrations/migrations.ts @@ -8,6 +8,7 @@ import { SecretManager } from '../secret-manager.js'; import { Wallet } from '../../wallet/index.js'; import { SecretManagerType } from '../secret-manager-config.js'; import { exit } from 'process'; +import { DatabaseServer } from '../../database-modules'; const globalEnvPath = path.join(process.cwd(), '../.env') // const authEnvPath = path.join(process.cwd(), '../auth-service/.env') @@ -141,7 +142,8 @@ async function migrate() { ensureIndexes: true, }) - DataBaseHelper.orm = db; + DatabaseServer.connectBD(db); + const dbSecret = new DataBaseHelper(WalletAccount) // write IPFS API KEY to Vault diff --git a/guardian-service/src/app.ts b/guardian-service/src/app.ts index 3906f2cbfd..43da24b274 100644 --- a/guardian-service/src/app.ts +++ b/guardian-service/src/app.ts @@ -8,7 +8,7 @@ import { trustChainAPI } from './api/trust-chain.service.js'; import { PolicyEngineService } from './policy-engine/policy-engine.service.js'; import { AggregateVC, ApplicationState, ApprovalDocument, Artifact, ArtifactChunk, AssignEntity, BlockCache, BlockState, Branding, COMMON_CONNECTION_CONFIG, - Contract, DataBaseHelper, DatabaseServer, DidDocument, DocumentState, DryRun, DryRunFiles, Environment, ExternalDocument, ExternalEventChannel, IPFS, + Contract, DatabaseServer, DidDocument, DocumentState, DryRun, DryRunFiles, Environment, ExternalDocument, ExternalEventChannel, IPFS, LargePayloadContainer, MessageBrokerChannel, MessageServer, Migration, MintRequest, MintTransaction, mongoForLoggingInitialization, MultiDocuments, MultiPolicy, MultiPolicyTransaction, OldSecretManager, PinoLogger, pinoLoggerInitialization, Policy, PolicyCache, PolicyCacheData, PolicyCategory, PolicyInvitations, PolicyModule, PolicyProperty, PolicyRoles, PolicyTest, PolicyTool, Record, RetirePool, RetireRequest, Schema, SecretManager, @@ -38,7 +38,6 @@ import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import process from 'process'; import { AppModule } from './app.module.js'; import { analyticsAPI } from './api/analytics.service.js'; -import { GridFSBucket } from 'mongodb'; import { suggestionsAPI } from './api/suggestions.service.js'; import { SynchronizationTask } from './helpers/synchronization-task.js'; import { recordAPI } from './api/record.service.js'; @@ -139,9 +138,10 @@ Promise.all([ app.listen(); - DataBaseHelper.orm = db; + DatabaseServer.connectBD(db); + + DatabaseServer.connectGridFS(); - DataBaseHelper.gridFS = new GridFSBucket(db.em.getDriver().getConnection().getDb()); new PolicyServiceChannelsContainer().setConnection(cn); new TransactionLogger().initialization( cn, diff --git a/logger-service/src/app.ts b/logger-service/src/app.ts index df8278344f..1250e0b07d 100644 --- a/logger-service/src/app.ts +++ b/logger-service/src/app.ts @@ -1,4 +1,4 @@ -import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, LargePayloadContainer, MessageBrokerChannel, Migration, Log, mongoForLoggingInitialization } from '@guardian/common'; +import { ApplicationState, COMMON_CONNECTION_CONFIG, LargePayloadContainer, MessageBrokerChannel, Migration, Log, mongoForLoggingInitialization, DatabaseServer } from '@guardian/common'; import { ApplicationStates } from '@guardian/interfaces'; import { NestFactory } from '@nestjs/core'; import { Deserializer, IncomingRequest, MicroserviceOptions, Serializer, Transport } from '@nestjs/microservices'; @@ -43,7 +43,8 @@ Promise.all([ }), ]).then(async values => { const [_, db, mqConnection, app] = values; - DataBaseHelper.orm = db; + + DatabaseServer.connectBD(db); app.listen(); diff --git a/notification-service/src/app.ts b/notification-service/src/app.ts index e74bb99ac5..90fc3c20da 100644 --- a/notification-service/src/app.ts +++ b/notification-service/src/app.ts @@ -1,4 +1,4 @@ -import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, MessageBrokerChannel, Migration, mongoForLoggingInitialization, PinoLogger, pinoLoggerInitialization } from '@guardian/common'; +import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, DatabaseServer, MessageBrokerChannel, Migration, mongoForLoggingInitialization, PinoLogger, pinoLoggerInitialization } from '@guardian/common'; import { ApplicationStates } from '@guardian/interfaces'; import { MikroORM } from '@mikro-orm/core'; import { MongoDriver } from '@mikro-orm/mongodb'; @@ -39,7 +39,8 @@ Promise.all([ ]).then( async (values) => { const [_, db, mqConnection, app, loggerMongo] = values; - DataBaseHelper.orm = db; + + DatabaseServer.connectBD(db); app.listen(); diff --git a/policy-service/src/api/policy-process.ts b/policy-service/src/api/policy-process.ts index 7845f09536..f28a240a68 100644 --- a/policy-service/src/api/policy-process.ts +++ b/policy-service/src/api/policy-process.ts @@ -74,9 +74,11 @@ Promise.all([ ]).then(async values => { const [db, cn, app, loggerMongo] = values; app.listen(); - DataBaseHelper.orm = db; - // @ts-ignore - DataBaseHelper.gridFS = new GridFSBucket(db.em.getDriver().getConnection().getDb()); + + DatabaseServer.connectBD(db); + + DatabaseServer.connectGridFS(); + Environment.setLocalNodeProtocol(process.env.LOCALNODE_PROTOCOL); Environment.setLocalNodeAddress(process.env.LOCALNODE_ADDRESS); Environment.setNetwork(process.env.HEDERA_NET); diff --git a/queue-service/src/app.ts b/queue-service/src/app.ts index 546b9bfa15..5bc8be1934 100644 --- a/queue-service/src/app.ts +++ b/queue-service/src/app.ts @@ -1,4 +1,4 @@ -import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, MessageBrokerChannel, mongoForLoggingInitialization, NotificationService, PinoLogger, pinoLoggerInitialization } from '@guardian/common'; +import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, DatabaseServer, MessageBrokerChannel, mongoForLoggingInitialization, NotificationService, PinoLogger, pinoLoggerInitialization } from '@guardian/common'; import { ApplicationStates, GenerateUUIDv4 } from '@guardian/interfaces'; import { MikroORM } from '@mikro-orm/core'; import { MongoDriver } from '@mikro-orm/mongodb'; @@ -39,7 +39,8 @@ Promise.all([ mongoForLoggingInitialization() ]).then(async values => { const [db, cn, app, loggerMongo] = values; - DataBaseHelper.orm = db; + + DatabaseServer.connectBD(db); app.listen(); // new MessageBrokerChannel(cn, 'worker');