Skip to content

Commit

Permalink
Feature/3966 move grid fs from services and db server (#4051)
Browse files Browse the repository at this point in the history
* fix: get cache from base64

* fix: devide cases responce bufer and base64 in cache

* feat: move aggregation filters to bd helper[3966]

* feat: move aggregation filters to bd helper for logger service[3966]

* feat: move aggregation filters to bd helper for policy service[3966]

* feat: move aggregation filters to bd helper for queue service[3966]

* feat: move aggregation filters to bd helper and server[3966]

* feat: move grid fs calls from db server to db helper[3966]

* feat: connect bd and grid fs in db helper[3966]

---------

Co-authored-by: Ihar <[email protected]>
  • Loading branch information
ihar-tsykala and Ihar authored Aug 20, 2024
1 parent 9543b3c commit 73bc559
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 42 deletions.
5 changes: 3 additions & 2 deletions analytics-service/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {
COMMON_CONNECTION_CONFIG,
DataBaseHelper,
DataBaseHelper, DatabaseServer,
LargePayloadContainer,
MessageBrokerChannel,
Migration,
Expand Down Expand Up @@ -44,7 +44,8 @@ Promise.all([
mongoForLoggingInitialization(),
]).then(async ([db, app, cn, loggerMongo]) => {
try {
DataBaseHelper.orm = db;
DatabaseServer.connectBD(db);

app.connectMicroservice<MicroserviceOptions>({
transport: Transport.NATS,
options: {
Expand Down
6 changes: 4 additions & 2 deletions auth-service/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { WalletService } from './api/wallet-service.js';
import {
ApplicationState,
COMMON_CONNECTION_CONFIG,
DataBaseHelper,
DatabaseServer,
LargePayloadContainer,
MessageBrokerChannel,
Migration,
Expand Down Expand Up @@ -60,7 +60,9 @@ Promise.all([
InitializeVault(process.env.VAULT_PROVIDER),
mongoForLoggingInitialization(),
]).then(async ([_, db, cn, app, vault, loggerMongo]) => {
DataBaseHelper.orm = db;

DatabaseServer.connectBD(db);

const state = new ApplicationState();
await state.setServiceName('AUTH_SERVICE').setConnection(cn).init();

Expand Down
42 changes: 19 additions & 23 deletions common/src/database-modules/database-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3543,40 +3543,36 @@ export class DatabaseServer {
* Save file
* @param uuid
* @param buffer
*
* @returns file ID
*/
public static async saveFile(uuid: string, buffer: Buffer): Promise<ObjectId> {
return new Promise<ObjectId>((resolve, reject) => {
try {
const fileStream = DataBaseHelper.gridFS.openUploadStream(uuid);
fileStream.write(buffer);
fileStream.end(() => {
resolve(fileStream.id);
});
} catch (error) {
reject(error);
}
});
return DataBaseHelper.saveFile(uuid, buffer);
}

/**
* Save file
* Set MongoDriver
* @param db
*/
public static connectBD(db: any): void {
DataBaseHelper.connectBD(db);
}

/**
* Grid fs connect
*/
public static connectGridFS() {
DataBaseHelper.connectGridFS();
}

/**
* Load file
* @param id
*
* @returns file ID
*/
public static async loadFile(id: ObjectId): Promise<Buffer> {
const files = await DataBaseHelper.gridFS.find(id).toArray();
if (files.length === 0) {
return null;
}
const file = files[0];
const fileStream = DataBaseHelper.gridFS.openDownloadStream(file._id);
const bufferArray = [];
for await (const data of fileStream) {
bufferArray.push(data);
}
return Buffer.concat(bufferArray);
return DataBaseHelper.loadFile(id)
}

/**
Expand Down
59 changes: 58 additions & 1 deletion common/src/helpers/db-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { MikroORM, CreateRequestContext, wrap, FilterObject, FilterQuery, FindAl
import { MongoDriver, MongoEntityManager, MongoEntityRepository, ObjectId } from '@mikro-orm/mongodb';
import { BaseEntity } from '../models/index.js';
import { DataBaseNamingStrategy } from './db-naming-strategy.js';
import { GridFSBucket } from 'mongodb';
import { Db, GridFSBucket } from 'mongodb';
import fixConnectionString from './fix-connection-string.js';
import type { FindOptions } from '@mikro-orm/core/drivers/IDatabaseDriver';
import { MintTransactionStatus } from '@guardian/interfaces';
Expand Down Expand Up @@ -138,6 +138,63 @@ export class DataBaseHelper<T extends BaseEntity> {
return DataBaseHelper._gridFS;
}

/**
* Set MongoDriver
* @param db
*/
public static connectBD(db: MikroORM<MongoDriver>) {
DataBaseHelper.orm = db;
}

/**
* Grid fs connect
*/
public static connectGridFS() {
const connect: Db = DataBaseHelper.orm.em.getDriver().getConnection().getDb();

DataBaseHelper.gridFS = new GridFSBucket(connect);
}

/**
* Save file
* @param uuid
* @param buffer
* @returns file ID
*/
public static async saveFile(uuid: string, buffer: Buffer): Promise<ObjectId> {
return new Promise<ObjectId>((resolve, reject) => {
try {
const fileStream = DataBaseHelper.gridFS.openUploadStream(uuid);
fileStream.write(buffer);
fileStream.end(() => {
resolve(fileStream.id);
});
} catch (error) {
reject(error);
}
});
}

/**
* Load file
* @param id
*
* @returns file ID
*/
public static async loadFile(id: ObjectId): Promise<Buffer> {
const files = await DataBaseHelper.gridFS.find(id).toArray();
if (files.length === 0) {
return null;
}
const file = files[0];
const fileStream = DataBaseHelper.gridFS.openDownloadStream(file._id);
const bufferArray = [];
for await (const data of fileStream) {
bufferArray.push(data);
}
return Buffer.concat(bufferArray);
}

/**
* Delete entities by filters
* @param filters filters
Expand Down
4 changes: 3 additions & 1 deletion common/src/secret-manager/migrations/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { SecretManager } from '../secret-manager.js';
import { Wallet } from '../../wallet/index.js';
import { SecretManagerType } from '../secret-manager-config.js';
import { exit } from 'process';
import { DatabaseServer } from '../../database-modules';

const globalEnvPath = path.join(process.cwd(), '../.env')
// const authEnvPath = path.join(process.cwd(), '../auth-service/.env')
Expand Down Expand Up @@ -141,7 +142,8 @@ async function migrate() {
ensureIndexes: true,
})

DataBaseHelper.orm = db;
DatabaseServer.connectBD(db);

const dbSecret = new DataBaseHelper(WalletAccount)

// write IPFS API KEY to Vault
Expand Down
8 changes: 4 additions & 4 deletions guardian-service/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { trustChainAPI } from './api/trust-chain.service.js';
import { PolicyEngineService } from './policy-engine/policy-engine.service.js';
import {
AggregateVC, ApplicationState, ApprovalDocument, Artifact, ArtifactChunk, AssignEntity, BlockCache, BlockState, Branding, COMMON_CONNECTION_CONFIG,
Contract, DataBaseHelper, DatabaseServer, DidDocument, DocumentState, DryRun, DryRunFiles, Environment, ExternalDocument, ExternalEventChannel, IPFS,
Contract, DatabaseServer, DidDocument, DocumentState, DryRun, DryRunFiles, Environment, ExternalDocument, ExternalEventChannel, IPFS,
LargePayloadContainer, MessageBrokerChannel, MessageServer, Migration, MintRequest, MintTransaction, mongoForLoggingInitialization, MultiDocuments,
MultiPolicy, MultiPolicyTransaction, OldSecretManager, PinoLogger, pinoLoggerInitialization, Policy, PolicyCache, PolicyCacheData, PolicyCategory,
PolicyInvitations, PolicyModule, PolicyProperty, PolicyRoles, PolicyTest, PolicyTool, Record, RetirePool, RetireRequest, Schema, SecretManager,
Expand Down Expand Up @@ -38,7 +38,6 @@ import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import process from 'process';
import { AppModule } from './app.module.js';
import { analyticsAPI } from './api/analytics.service.js';
import { GridFSBucket } from 'mongodb';
import { suggestionsAPI } from './api/suggestions.service.js';
import { SynchronizationTask } from './helpers/synchronization-task.js';
import { recordAPI } from './api/record.service.js';
Expand Down Expand Up @@ -139,9 +138,10 @@ Promise.all([

app.listen();

DataBaseHelper.orm = db;
DatabaseServer.connectBD(db);

DatabaseServer.connectGridFS();

DataBaseHelper.gridFS = new GridFSBucket(db.em.getDriver().getConnection().getDb());
new PolicyServiceChannelsContainer().setConnection(cn);
new TransactionLogger().initialization(
cn,
Expand Down
5 changes: 3 additions & 2 deletions logger-service/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, LargePayloadContainer, MessageBrokerChannel, Migration, Log, mongoForLoggingInitialization } from '@guardian/common';
import { ApplicationState, COMMON_CONNECTION_CONFIG, LargePayloadContainer, MessageBrokerChannel, Migration, Log, mongoForLoggingInitialization, DatabaseServer } from '@guardian/common';
import { ApplicationStates } from '@guardian/interfaces';
import { NestFactory } from '@nestjs/core';
import { Deserializer, IncomingRequest, MicroserviceOptions, Serializer, Transport } from '@nestjs/microservices';
Expand Down Expand Up @@ -43,7 +43,8 @@ Promise.all([
}),
]).then(async values => {
const [_, db, mqConnection, app] = values;
DataBaseHelper.orm = db;

DatabaseServer.connectBD(db);

app.listen();

Expand Down
5 changes: 3 additions & 2 deletions notification-service/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, MessageBrokerChannel, Migration, mongoForLoggingInitialization, PinoLogger, pinoLoggerInitialization } from '@guardian/common';
import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, DatabaseServer, MessageBrokerChannel, Migration, mongoForLoggingInitialization, PinoLogger, pinoLoggerInitialization } from '@guardian/common';
import { ApplicationStates } from '@guardian/interfaces';
import { MikroORM } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
Expand Down Expand Up @@ -39,7 +39,8 @@ Promise.all([
]).then(
async (values) => {
const [_, db, mqConnection, app, loggerMongo] = values;
DataBaseHelper.orm = db;

DatabaseServer.connectBD(db);

app.listen();

Expand Down
8 changes: 5 additions & 3 deletions policy-service/src/api/policy-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ Promise.all([
]).then(async values => {
const [db, cn, app, loggerMongo] = values;
app.listen();
DataBaseHelper.orm = db;
// @ts-ignore
DataBaseHelper.gridFS = new GridFSBucket(db.em.getDriver().getConnection().getDb());

DatabaseServer.connectBD(db);

DatabaseServer.connectGridFS();

Environment.setLocalNodeProtocol(process.env.LOCALNODE_PROTOCOL);
Environment.setLocalNodeAddress(process.env.LOCALNODE_ADDRESS);
Environment.setNetwork(process.env.HEDERA_NET);
Expand Down
5 changes: 3 additions & 2 deletions queue-service/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, MessageBrokerChannel, mongoForLoggingInitialization, NotificationService, PinoLogger, pinoLoggerInitialization } from '@guardian/common';
import { ApplicationState, COMMON_CONNECTION_CONFIG, DataBaseHelper, DatabaseServer, MessageBrokerChannel, mongoForLoggingInitialization, NotificationService, PinoLogger, pinoLoggerInitialization } from '@guardian/common';
import { ApplicationStates, GenerateUUIDv4 } from '@guardian/interfaces';
import { MikroORM } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
Expand Down Expand Up @@ -39,7 +39,8 @@ Promise.all([
mongoForLoggingInitialization()
]).then(async values => {
const [db, cn, app, loggerMongo] = values;
DataBaseHelper.orm = db;

DatabaseServer.connectBD(db);

app.listen();
// new MessageBrokerChannel(cn, 'worker');
Expand Down

0 comments on commit 73bc559

Please sign in to comment.