Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/3966 move grid fs from services and db server #4051

53 changes: 11 additions & 42 deletions analytics-service/src/analytics/report.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DatabaseServer, MessageAction, PinoLogger } from '@guardian/common';
import { DatabaseServer, MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS, MessageAction, PinoLogger } from '@guardian/common';
import { GenerateUUIDv4 } from '@guardian/interfaces';
import JSZip from 'jszip';
import xl from 'excel4node';
Expand Down Expand Up @@ -218,36 +218,14 @@ export class ReportService {
const schemaCount = await databaseServer.find(Schema, { uuid, action: MessageAction.PublishSchema });
const systemSchemaCount = await databaseServer.find(Schema, { uuid, action: MessageAction.PublishSystemSchema });

const docByPolicy = await databaseServer.aggregate(Document, [
{ $match: { uuid, } },
{
$group: {
_id: {
policyTopicId: '$policyTopicId',
type: '$type',
action: '$action'
}, count: { $sum: 1 }
}
}
] as FilterObject<any>[]);

const docByInstance = await databaseServer.aggregate(Document, [
{ $match: { uuid } },
{
$group: {
_id: {
instanceTopicId: '$instanceTopicId',
type: '$type',
action: '$action'
}, count: { $sum: 1 }
}
}
] as FilterObject<any>[]);
const docByPolicy =
await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_POLICY, uuid) as FilterObject<any>[])

const docByInstance =
await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOC_BY_INSTANCE, uuid) as FilterObject<any>[])

const docsGroups = await databaseServer.aggregate(Document, [
{ $match: { uuid } },
{ $group: { _id: { type: '$type', action: '$action' }, count: { $sum: 1 } } }
] as FilterObject<any>[]);
const docsGroups =
await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.DOCS_GROUPS, uuid) as FilterObject<any>[])

const didCount = docsGroups
.filter(g => g._id.type === DocumentType.DID && g._id.action !== MessageAction.RevokeDocument)
Expand Down Expand Up @@ -482,18 +460,9 @@ export class ReportService {
}
}), size);

const schemasByName = await new DatabaseServer().aggregate(Document, [
{ $match: { uuid: report.uuid } },
{
$group: {
_id: {
name: '$name',
action: '$action',
}, count: { $sum: 1 }
}
},
{ $sort: { count: -1 } }
] as FilterObject<any>[]);
const databaseServer = new DatabaseServer();

const schemasByName = await databaseServer.aggregate(Document, databaseServer.getAnalyticsDocAggregationFilters(MAP_REPORT_ANALYTICS_AGGREGATION_FILTERS.SCHEMA_BY_NAME, report.uuid) as FilterObject<any>[])

const topAllSchemasByName = [];
const topSystemSchemasByName = [];
Expand Down
5 changes: 3 additions & 2 deletions analytics-service/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {
COMMON_CONNECTION_CONFIG,
DataBaseHelper,
DataBaseHelper, DatabaseServer,
LargePayloadContainer,
MessageBrokerChannel,
Migration,
Expand Down Expand Up @@ -44,7 +44,8 @@ Promise.all([
mongoForLoggingInitialization(),
]).then(async ([db, app, cn, loggerMongo]) => {
try {
DataBaseHelper.orm = db;
DatabaseServer.connectBD(db);

app.connectMicroservice<MicroserviceOptions>({
transport: Transport.NATS,
options: {
Expand Down
12 changes: 9 additions & 3 deletions api-gateway/src/helpers/interceptors/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ export class CacheInterceptor implements NestInterceptor {
if (cachedResponse) {
let result = JSON.parse(cachedResponse);

if (typeof result === 'string') {
result = Buffer.from(result, 'base64');
if (result.type === 'buffer') {
result = Buffer.from(result.data, 'base64');
} else {
result = result.data;
}

return result;
Expand All @@ -73,7 +75,11 @@ export class CacheInterceptor implements NestInterceptor {
}

if (Buffer.isBuffer(result)) {
result = result.toString('base64');
result = { type: 'buffer', data: result.toString('base64') };
} else if (typeof response === 'object') {
result = { type: 'json', data: result };
} else {
result = { type: 'string', data: result };
}

await this.cacheService.set(cacheKey, JSON.stringify(result), ttl, cacheTag);
Expand Down
6 changes: 4 additions & 2 deletions auth-service/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { WalletService } from './api/wallet-service.js';
import {
ApplicationState,
COMMON_CONNECTION_CONFIG,
DataBaseHelper,
DatabaseServer,
LargePayloadContainer,
MessageBrokerChannel,
Migration,
Expand Down Expand Up @@ -60,7 +60,9 @@ Promise.all([
InitializeVault(process.env.VAULT_PROVIDER),
mongoForLoggingInitialization(),
]).then(async ([_, db, cn, app, vault, loggerMongo]) => {
DataBaseHelper.orm = db;

DatabaseServer.connectBD(db);

const state = new ApplicationState();
await state.setServiceName('AUTH_SERVICE').setConnection(cn).init();

Expand Down
168 changes: 79 additions & 89 deletions common/src/database-modules/database-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import {
TopicType,
} from '@guardian/interfaces';
import { BaseEntity } from '../models/index.js';
import { DataBaseHelper } from '../helpers/index.js';
import { DataBaseHelper, IGetDocumentAggregationFilters, MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS } from '../helpers/index.js';
import { Theme } from '../entity/theme.js';
import { GetConditionsPoliciesByCategories } from '../helpers/policy-category.js';
import { PolicyTool } from '../entity/tool.js';
Expand Down Expand Up @@ -281,20 +281,9 @@ export class DatabaseServer {
*/
public async aggregate<T extends BaseEntity>(entityClass: new () => T, aggregation: FilterObject<T>[]): Promise<T[]> {
if (this.dryRun) {
if (Array.isArray(aggregation)) {
aggregation.unshift({
$match: {
dryRunId: this.dryRun,
dryRunClass: this.classMap.get(entityClass),
},
} as FilterObject<unknown> & {
$match?: {
dryRunId?: string;
dryRunClass?: string;
}
});
}
return await new DataBaseHelper(DryRun).aggregate(aggregation) as unknown as T[];
const dryRunClass = this.classMap.get(entityClass)

return await new DataBaseHelper(DryRun).aggregateDryRan(aggregation, this.dryRun, dryRunClass) as unknown as T[];
} else {
return await new DataBaseHelper(entityClass).aggregate(aggregation);
}
Expand Down Expand Up @@ -1070,6 +1059,50 @@ export class DatabaseServer {
return await this.aggregate(ApprovalDocumentCollection, aggregation) as ApprovalDocumentCollection[];
}

/**
* get document aggregation filters
* @param props
*
* @returns Result
*/
public getDocumentAggregationFilters(props: IGetDocumentAggregationFilters): void {
return DataBaseHelper.getDocumentAggregationFilters(props);
}

/**
* get document aggregation filters for analytics
* @param nameFilter
* @param uuid
*
* @returns Result
*/
public getAnalyticsDocAggregationFilters(nameFilter: string, uuid: string): unknown[] {
return DataBaseHelper.getAnalyticsDocAggregationFilters(nameFilter, uuid);
}

/**
* get document aggregation filters for analytics
* @param nameFilterMap
* @param nameFilterAttributes
* @param existingAttributes
*
* @returns Result
*/
public getAttributesAggregationFilters(nameFilterMap: string, nameFilterAttributes: string, existingAttributes: string[] | []): unknown[] {
return DataBaseHelper.getAttributesAggregationFilters(nameFilterMap, nameFilterAttributes, existingAttributes);
}

/**
* get tasks aggregation filters
* @param nameFilter
* @param processTimeout
*
* @returns Result
*/
public getTasksAggregationFilters(nameFilter: string, processTimeout: number): unknown[] {
return DataBaseHelper.getTasksAggregationFilters(nameFilter, processTimeout);
}

/**
* Get Vc Documents
* @param filters
Expand Down Expand Up @@ -1879,21 +1912,25 @@ export class DatabaseServer {
/**
* Get transactions serials count
* @param mintRequestId Mint request identifier
* @param transferStatus Transfer status
*
* @returns Serials count
*/
public async getTransactionsSerialsCount(
mintRequestId: string,
transferStatus?: MintTransactionStatus | any
): Promise<number> {
const aggregation = this._getTransactionsSerialsAggregation(
const aggregation = DataBaseHelper._getTransactionsSerialsAggregation(
mintRequestId,
transferStatus
);
aggregation.push({
$project: {
serials: { $size: '$serials' },
},

DataBaseHelper.getTransactionsSerialsAggregationFilters({
aggregation,
aggregateMethod: 'push',
nameFilter: MAP_TRANSACTION_SERIALS_AGGREGATION_FILTERS.COUNT
});

const result: any = await this.aggregate(MintTransaction, aggregation);
return result[0]?.serials || 0;
}
Expand Down Expand Up @@ -2050,61 +2087,18 @@ export class DatabaseServer {
];
}

/**
* Get aggregation filter for transactions serials
* @param mintRequestId Mint request identifier
* @returns Aggregation filter
*/
private _getTransactionsSerialsAggregation(
mintRequestId: string,
transferStatus?: MintTransactionStatus | any
): any[] {
const match: any = {
mintRequestId,
};
if (transferStatus) {
match.transferStatus = transferStatus;
}
const aggregation: any[] = [
{
$match: match,
},
{
$group: {
_id: 1,
serials: {
$push: '$serials',
},
},
},
{
$project: {
serials: {
$reduce: {
input: '$serials',
initialValue: [],
in: {
$concatArrays: ['$$value', '$$this'],
},
},
},
},
},
];

return aggregation;
}

/**
* Get transactions serials
* @param mintRequestId Mint request identifier
* @param transferStatus Transfer status
*
* @returns Serials
*/
public async getTransactionsSerials(
mintRequestId: string,
transferStatus?: MintTransactionStatus | any
): Promise<number[]> {
const aggregation = this._getTransactionsSerialsAggregation(
const aggregation = DataBaseHelper._getTransactionsSerialsAggregation(
mintRequestId,
transferStatus
);
Expand Down Expand Up @@ -3549,40 +3543,36 @@ export class DatabaseServer {
* Save file
* @param uuid
* @param buffer
*
* @returns file ID
*/
public static async saveFile(uuid: string, buffer: Buffer): Promise<ObjectId> {
return new Promise<ObjectId>((resolve, reject) => {
try {
const fileStream = DataBaseHelper.gridFS.openUploadStream(uuid);
fileStream.write(buffer);
fileStream.end(() => {
resolve(fileStream.id);
});
} catch (error) {
reject(error);
}
});
return DataBaseHelper.saveFile(uuid, buffer);
}

/**
* Save file
* Set MongoDriver
* @param db
*/
public static connectBD(db: any): void {
DataBaseHelper.connectBD(db);
}

/**
* Grid fs connect
*/
public static connectGridFS() {
DataBaseHelper.connectGridFS();
}

/**
* Load file
* @param id
*
* @returns file ID
*/
public static async loadFile(id: ObjectId): Promise<Buffer> {
const files = await DataBaseHelper.gridFS.find(id).toArray();
if (files.length === 0) {
return null;
}
const file = files[0];
const fileStream = DataBaseHelper.gridFS.openDownloadStream(file._id);
const bufferArray = [];
for await (const data of fileStream) {
bufferArray.push(data);
}
return Buffer.concat(bufferArray);
return DataBaseHelper.loadFile(id)
}

/**
Expand Down
Loading
Loading