From c7f85e1b587511ace4f4eff6418cc91b2290ba32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20L=C3=B3pez?= Date: Mon, 30 Oct 2023 18:18:06 +0100 Subject: [PATCH] Add storage module on Reputation Oracle to remove dependency on deprecated SDK storage module --- .../server/src/common/config/index.ts | 1 + .../server/src/common/config/s3.ts | 13 + .../server/src/common/interfaces/s3.ts | 4 + .../server/src/common/utils/index.ts | 65 +---- .../src/modules/storage/storage.module.ts | 11 + .../modules/storage/storage.service.spec.ts | 229 ++++++++++++++++++ .../src/modules/storage/storage.service.ts | 100 ++++++++ .../src/modules/webhook/webhook.module.ts | 2 + .../modules/webhook/webhook.service.spec.ts | 94 ++++--- .../src/modules/webhook/webhook.service.ts | 103 +++----- .../server/test/constants.ts | 8 +- 11 files changed, 466 insertions(+), 164 deletions(-) create mode 100644 packages/apps/reputation-oracle/server/src/common/config/s3.ts create mode 100644 packages/apps/reputation-oracle/server/src/common/interfaces/s3.ts create mode 100644 packages/apps/reputation-oracle/server/src/modules/storage/storage.module.ts create mode 100644 packages/apps/reputation-oracle/server/src/modules/storage/storage.service.spec.ts create mode 100644 packages/apps/reputation-oracle/server/src/modules/storage/storage.service.ts diff --git a/packages/apps/reputation-oracle/server/src/common/config/index.ts b/packages/apps/reputation-oracle/server/src/common/config/index.ts index b257a909e5..20c54299ce 100644 --- a/packages/apps/reputation-oracle/server/src/common/config/index.ts +++ b/packages/apps/reputation-oracle/server/src/common/config/index.ts @@ -1,2 +1,3 @@ export * from './env'; export * from './networks'; +export * from './s3'; diff --git a/packages/apps/reputation-oracle/server/src/common/config/s3.ts b/packages/apps/reputation-oracle/server/src/common/config/s3.ts new file mode 100644 index 0000000000..b2d4e3731b --- /dev/null +++ b/packages/apps/reputation-oracle/server/src/common/config/s3.ts @@ -0,0 +1,13 @@ +import { ConfigType, registerAs } from '@nestjs/config'; + +export const s3Config = registerAs('s3', () => ({ + endPoint: process.env.S3_ENDPOINT!, + port: +process.env.S3_PORT!, + accessKey: process.env.S3_ACCESS_KEY!, + secretKey: process.env.S3_SECRET_KEY!, + bucket: process.env.S3_BUCKET!, + useSSL: process.env.S3_USE_SSL === 'true', +})); + +export const s3ConfigKey = s3Config.KEY; +export type S3ConfigType = ConfigType; diff --git a/packages/apps/reputation-oracle/server/src/common/interfaces/s3.ts b/packages/apps/reputation-oracle/server/src/common/interfaces/s3.ts new file mode 100644 index 0000000000..81e92ae447 --- /dev/null +++ b/packages/apps/reputation-oracle/server/src/common/interfaces/s3.ts @@ -0,0 +1,4 @@ +export class UploadedFile { + public url: string; + public hash: string; +} diff --git a/packages/apps/reputation-oracle/server/src/common/utils/index.ts b/packages/apps/reputation-oracle/server/src/common/utils/index.ts index 5303ccd060..5dfad7f3a1 100644 --- a/packages/apps/reputation-oracle/server/src/common/utils/index.ts +++ b/packages/apps/reputation-oracle/server/src/common/utils/index.ts @@ -1,68 +1,7 @@ -import { - StorageCredentials, - StorageParams, - UploadFile, -} from '@human-protocol/sdk'; -import * as Minio from 'minio'; import * as crypto from 'crypto'; -import { PassThrough, Readable } from 'stream'; -import axios from 'axios'; -import { Logger } from '@nestjs/common'; +import { Readable } from 'stream'; -/** - * **Copy file from a URL to cloud storage** - * - * @param {string} url - URL of the source file - * @param {string} destBucket - Destination bucket name - * @param {StorageParams} clientParams - Configuration parameters for cloud storage - * @param {StorageCredentials} [credentials] - Optional. Cloud storage access data. If credentials are not provided, use anonymous access to the bucket - * @returns {Promise} - Uploaded file with key/hash - */ -export async function copyFileFromURLToBucket( - url: string, - destBucket: string, - clientParams: StorageParams, - credentials?: StorageCredentials, -): Promise { - try { - const client: Minio.Client = new Minio.Client({ - ...clientParams, - accessKey: credentials?.accessKey ?? '', - secretKey: credentials?.secretKey ?? '', - }); - - const { data } = await axios.get(url, { responseType: 'stream' }); - const stream = new PassThrough(); - data.pipe(stream); - - const hash = await hashStream(data); - const key = `s3${hash}.zip`; - - await new Promise((resolve, reject) => { - client.putObject(destBucket, key, stream, (err, etag) => { - if (err) reject(err); - else resolve(etag); - }); - }); - - Logger.log(`File from ${url} copied to ${destBucket}/${key}`); - - return { - key, - url: `${clientParams.useSSL ? 'https' : 'http'}://${ - clientParams.endPoint - }${ - clientParams.port ? `:${clientParams.port}` : '' - }/${destBucket}/${key}`, - hash, - }; - } catch (error) { - Logger.error('Error copying file:', error); - throw new Error('File not uploaded'); - } -} - -function hashStream(stream: Readable): Promise { +export function hashStream(stream: Readable): Promise { return new Promise((resolve, reject) => { const hash = crypto.createHash('sha1'); diff --git a/packages/apps/reputation-oracle/server/src/modules/storage/storage.module.ts b/packages/apps/reputation-oracle/server/src/modules/storage/storage.module.ts new file mode 100644 index 0000000000..f9cf1659df --- /dev/null +++ b/packages/apps/reputation-oracle/server/src/modules/storage/storage.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { StorageService } from './storage.service'; +import { ConfigModule } from '@nestjs/config'; +import { s3Config } from '../../common/config'; + +@Module({ + imports: [ConfigModule.forFeature(s3Config)], + providers: [StorageService], + exports: [StorageService], +}) +export class StorageModule {} diff --git a/packages/apps/reputation-oracle/server/src/modules/storage/storage.service.spec.ts b/packages/apps/reputation-oracle/server/src/modules/storage/storage.service.spec.ts new file mode 100644 index 0000000000..cb1c3a59f9 --- /dev/null +++ b/packages/apps/reputation-oracle/server/src/modules/storage/storage.service.spec.ts @@ -0,0 +1,229 @@ +import { ChainId, StorageClient } from '@human-protocol/sdk'; +import { ConfigModule, registerAs } from '@nestjs/config'; +import { Test } from '@nestjs/testing'; +import { + MOCK_FILE_URL, + MOCK_MANIFEST, + MOCK_S3_ACCESS_KEY, + MOCK_S3_BUCKET, + MOCK_S3_ENDPOINT, + MOCK_S3_PORT, + MOCK_S3_SECRET_KEY, + MOCK_S3_USE_SSL, +} from '../../../test/constants'; +import { StorageService } from './storage.service'; +import crypto from 'crypto'; +import axios from 'axios'; +import stream from 'stream'; + +jest.mock('@human-protocol/sdk', () => ({ + ...jest.requireActual('@human-protocol/sdk'), + StorageClient: { + downloadFileFromUrl: jest.fn(), + }, +})); + +jest.mock('minio', () => { + class Client { + putObject = jest.fn(); + bucketExists = jest.fn(); + constructor() { + (this as any).protocol = 'http:'; + (this as any).host = 'localhost'; + (this as any).port = 9000; + } + } + + return { Client }; +}); + +jest.mock('axios'); + +describe('Web3Service', () => { + let storageService: StorageService; + + beforeAll(async () => { + const moduleRef = await Test.createTestingModule({ + imports: [ + ConfigModule.forFeature( + registerAs('s3', () => ({ + accessKey: MOCK_S3_ACCESS_KEY, + secretKey: MOCK_S3_SECRET_KEY, + endPoint: MOCK_S3_ENDPOINT, + port: MOCK_S3_PORT, + useSSL: MOCK_S3_USE_SSL, + bucket: MOCK_S3_BUCKET, + })), + ), + ], + providers: [StorageService], + }).compile(); + + storageService = moduleRef.get(StorageService); + }); + + describe('uploadJobSolutions', () => { + it('should upload the solutions correctly', async () => { + const workerAddress = '0x1234567890123456789012345678901234567891'; + const escrowAddress = '0x1234567890123456789012345678901234567890'; + const chainId = ChainId.LOCALHOST; + const solution = 'test'; + + storageService.minioClient.bucketExists = jest + .fn() + .mockResolvedValueOnce(true); + + const jobSolution = { + workerAddress, + solution, + }; + const fileData = await storageService.uploadJobSolutions( + escrowAddress, + chainId, + [jobSolution], + ); + expect(fileData).toEqual({ + url: `http://${MOCK_S3_ENDPOINT}:${MOCK_S3_PORT}/${MOCK_S3_BUCKET}/${escrowAddress}-${chainId}.json`, + hash: crypto + .createHash('sha1') + .update(JSON.stringify([jobSolution])) + .digest('hex'), + }); + expect(storageService.minioClient.putObject).toHaveBeenCalledWith( + MOCK_S3_BUCKET, + `${escrowAddress}-${chainId}.json`, + expect.stringContaining(solution), + { + 'Content-Type': 'application/json', + }, + ); + }); + + it('should fail if the bucket does not exist', async () => { + const workerAddress = '0x1234567890123456789012345678901234567891'; + const escrowAddress = '0x1234567890123456789012345678901234567890'; + const chainId = ChainId.LOCALHOST; + const solution = 'test'; + + storageService.minioClient.bucketExists = jest + .fn() + .mockResolvedValueOnce(false); + + const jobSolution = { + workerAddress, + solution, + }; + await expect( + storageService.uploadJobSolutions(escrowAddress, chainId, [ + jobSolution, + ]), + ).rejects.toThrow('Bucket not found'); + }); + it('should fail if the file cannot be uploaded', async () => { + const workerAddress = '0x1234567890123456789012345678901234567891'; + const escrowAddress = '0x1234567890123456789012345678901234567890'; + const chainId = ChainId.LOCALHOST; + const solution = 'test'; + + storageService.minioClient.bucketExists = jest + .fn() + .mockResolvedValueOnce(true); + storageService.minioClient.putObject = jest + .fn() + .mockRejectedValueOnce('Network error'); + + const jobSolution = { + workerAddress, + solution, + }; + + await expect( + storageService.uploadJobSolutions(escrowAddress, chainId, [ + jobSolution, + ]), + ).rejects.toThrow('File not uploaded'); + }); + }); + + describe('download', () => { + it('should download the file correctly', async () => { + const exchangeAddress = '0x1234567890123456789012345678901234567892'; + const workerAddress = '0x1234567890123456789012345678901234567891'; + const solution = 'test'; + + const expectedJobFile = { + exchangeAddress, + solutions: [ + { + workerAddress, + solution, + }, + ], + }; + + StorageClient.downloadFileFromUrl = jest + .fn() + .mockResolvedValueOnce(expectedJobFile); + const solutionsFile = await storageService.download(MOCK_FILE_URL); + expect(solutionsFile).toBe(expectedJobFile); + }); + + it('should return empty array when file cannot be downloaded', async () => { + StorageClient.downloadFileFromUrl = jest + .fn() + .mockRejectedValue('Network error'); + + const solutionsFile = await storageService.download(MOCK_FILE_URL); + expect(solutionsFile).toStrictEqual([]); + }); + }); + + describe('copyFileFromURLToBucket', () => { + it('should copy a file from a valid URL to a bucket', async () => { + const streamResponseData = new stream.Readable(); + streamResponseData.push(JSON.stringify(MOCK_MANIFEST)); + streamResponseData.push(null); + (axios.get as any).mockResolvedValueOnce({ data: streamResponseData }); + + const uploadedFile = await storageService.copyFileFromURLToBucket( + MOCK_FILE_URL, + ); + + expect( + uploadedFile.url.includes( + `http://${MOCK_S3_ENDPOINT}:${MOCK_S3_PORT}/${MOCK_S3_BUCKET}/`, + ), + ).toBeTruthy(); + expect(uploadedFile.hash).toBeDefined(); + expect(storageService.minioClient.putObject).toBeCalledWith( + MOCK_S3_BUCKET, + expect.any(String), + expect.any(stream), + ); + }); + + it('should handle an invalid URL', async () => { + (axios.get as any).mockRejectedValue('Network error'); + + await expect( + storageService.copyFileFromURLToBucket(MOCK_FILE_URL), + ).rejects.toThrow('File not uploaded'); + }); + + it('should handle errors when copying the file', async () => { + const streamResponseData = new stream.Readable(); + streamResponseData.push(JSON.stringify(MOCK_MANIFEST)); + streamResponseData.push(null); + (axios.get as any).mockResolvedValueOnce({ data: streamResponseData }); + storageService.minioClient.putObject = jest + .fn() + .mockRejectedValue('Network error'); + + await expect( + storageService.copyFileFromURLToBucket( + 'https://example.com/archivo.zip', + ), + ).rejects.toThrow('File not uploaded'); + }); + }); +}); diff --git a/packages/apps/reputation-oracle/server/src/modules/storage/storage.service.ts b/packages/apps/reputation-oracle/server/src/modules/storage/storage.service.ts new file mode 100644 index 0000000000..11237b00da --- /dev/null +++ b/packages/apps/reputation-oracle/server/src/modules/storage/storage.service.ts @@ -0,0 +1,100 @@ +import { ChainId, StorageClient } from '@human-protocol/sdk'; +import { BadRequestException, Inject, Injectable } from '@nestjs/common'; +import * as Minio from 'minio'; +import { S3ConfigType, s3ConfigKey } from '../../common/config'; +import crypto from 'crypto'; +import { UploadedFile } from 'src/common/interfaces/s3'; +import { FortuneFinalResult } from '../webhook/webhook.dto'; +import { PassThrough } from 'stream'; +import axios from 'axios'; +import { Logger } from '@nestjs/common'; +import { hashStream } from '../../common/utils'; + +@Injectable() +export class StorageService { + public readonly minioClient: Minio.Client; + + constructor( + @Inject(s3ConfigKey) + private s3Config: S3ConfigType, + ) { + this.minioClient = new Minio.Client({ + endPoint: this.s3Config.endPoint, + port: this.s3Config.port, + accessKey: this.s3Config.accessKey, + secretKey: this.s3Config.secretKey, + useSSL: this.s3Config.useSSL, + }); + } + public getUrl(key: string): string { + return `${this.s3Config.useSSL ? 'https' : 'http'}://${ + this.s3Config.endPoint + }:${this.s3Config.port}/${this.s3Config.bucket}/${key}`; + } + + public async download(url: string): Promise { + try { + return await StorageClient.downloadFileFromUrl(url); + } catch { + return []; + } + } + + public async uploadJobSolutions( + escrowAddress: string, + chainId: ChainId, + solutions: FortuneFinalResult[], + ): Promise { + if (!(await this.minioClient.bucketExists(this.s3Config.bucket))) { + throw new BadRequestException('Bucket not found'); + } + + const content = JSON.stringify(solutions); + const key = `${escrowAddress}-${chainId}.json`; + try { + const hash = crypto.createHash('sha1').update(content).digest('hex'); + await this.minioClient.putObject( + this.s3Config.bucket, + key, + JSON.stringify(content), + { + 'Content-Type': 'application/json', + }, + ); + + return { url: this.getUrl(key), hash }; + } catch (e) { + throw new BadRequestException('File not uploaded'); + } + } + + /** + * **Copy file from a URL to cloud storage** + * + * @param {string} url - URL of the source file + * @returns {Promise} - Uploaded file with key/hash + */ + public async copyFileFromURLToBucket(url: string): Promise { + try { + const { data } = await axios.get(url, { responseType: 'stream' }); + const stream = new PassThrough(); + data.pipe(stream); + + const hash = await hashStream(data); + const key = `s3${hash}.zip`; + + await this.minioClient.putObject(this.s3Config.bucket, key, stream); + + Logger.log(`File from ${url} copied to ${this.s3Config.bucket}/${key}`); + + return { + url: this.getUrl(key), + hash, + }; + } catch (error) { + Logger.error('Error copying file:', error); + console.log(error); + throw new Error('File not uploaded'); + } + } +} diff --git a/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.module.ts b/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.module.ts index 6db9f62c22..901751fe8f 100644 --- a/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.module.ts +++ b/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.module.ts @@ -8,6 +8,7 @@ import { WebhookController } from './webhook.controller'; import { WebhookRepository } from './webhook.repository'; import { ReputationModule } from '../reputation/reputation.module'; import { Web3Module } from '../web3/web3.module'; +import { StorageModule } from '../storage/storage.module'; @Module({ imports: [ @@ -15,6 +16,7 @@ import { Web3Module } from '../web3/web3.module'; ConfigModule, ReputationModule, Web3Module, + StorageModule, ], controllers: [WebhookController], providers: [Logger, WebhookService, WebhookRepository], diff --git a/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.spec.ts b/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.spec.ts index a6ddf1b182..13f2a36da7 100644 --- a/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.spec.ts +++ b/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.spec.ts @@ -1,7 +1,7 @@ import { Test } from '@nestjs/testing'; import { WebhookService } from './webhook.service'; import { WebhookRepository } from './webhook.repository'; -import { ConfigService } from '@nestjs/config'; +import { ConfigModule, ConfigService, registerAs } from '@nestjs/config'; import { HttpService } from '@nestjs/axios'; import { createMock } from '@golevelup/ts-jest'; import { ReputationService } from '../reputation/reputation.service'; @@ -25,7 +25,6 @@ import { } from '../../common/enums'; import { MOCK_ADDRESS, - MOCK_BUCKET_NAME, MOCK_EXCHANGE_ORACLE_ADDRESS, MOCK_FILE_HASH, MOCK_FILE_KEY, @@ -35,10 +34,17 @@ import { MOCK_RECORDING_ORACLE_ADDRESS, MOCK_REQUESTER_DESCRIPTION, MOCK_REQUESTER_TITLE, + MOCK_S3_ACCESS_KEY, + MOCK_S3_BUCKET, + MOCK_S3_ENDPOINT, + MOCK_S3_PORT, + MOCK_S3_SECRET_KEY, + MOCK_S3_USE_SSL, } from '../../../test/constants'; import { WebhookStatus } from '../../common/enums'; import { RETRIES_COUNT_THRESHOLD } from '../../common/constants'; import { Web3Service } from '../web3/web3.service'; +import { StorageService } from '../storage/storage.service'; jest.mock('@human-protocol/sdk', () => ({ ...jest.requireActual('@human-protocol/sdk'), @@ -61,17 +67,25 @@ jest.mock('@human-protocol/sdk', () => ({ })), })); -jest.mock('../../common/utils', () => ({ - ...jest.requireActual('../../common/utils'), - copyFileFromURLToBucket: jest.fn().mockImplementation(() => { - return { url: MOCK_FILE_URL, hash: MOCK_FILE_HASH }; - }), -})); +jest.mock('minio', () => { + class Client { + putObject = jest.fn(); + bucketExists = jest.fn().mockResolvedValue(true); + constructor() { + (this as any).protocol = 'http:'; + (this as any).host = MOCK_S3_ENDPOINT; + (this as any).port = MOCK_S3_PORT; + } + } + + return { Client }; +}); describe('WebhookService', () => { let webhookService: WebhookService, webhookRepository: WebhookRepository, - reputationService: ReputationService; + reputationService: ReputationService, + storageService: StorageService; const signerMock = { address: MOCK_ADDRESS, @@ -82,25 +96,29 @@ describe('WebhookService', () => { const mockConfigService: Partial = { get: jest.fn((key: string) => { switch (key) { - case 'S3_ENDPOINT': - return '127.0.0.1'; - case 'S3_PORT': - return 9000; - case 'S3_USE_SSL': - return false; case 'HOST': return '127.0.0.1'; case 'PORT': return 5000; case 'WEB3_PRIVATE_KEY': return MOCK_PRIVATE_KEY; - case 'S3_BUCKET': - return MOCK_BUCKET_NAME; } }), }; const moduleRef = await Test.createTestingModule({ + imports: [ + ConfigModule.forFeature( + registerAs('s3', () => ({ + accessKey: MOCK_S3_ACCESS_KEY, + secretKey: MOCK_S3_SECRET_KEY, + endPoint: MOCK_S3_ENDPOINT, + port: MOCK_S3_PORT, + useSSL: MOCK_S3_USE_SSL, + bucket: MOCK_S3_BUCKET, + })), + ), + ], providers: [ WebhookService, { @@ -109,6 +127,7 @@ describe('WebhookService', () => { getSigner: jest.fn().mockReturnValue(signerMock), }, }, + StorageService, ReputationService, { provide: ReputationRepository, @@ -126,6 +145,7 @@ describe('WebhookService', () => { webhookService = moduleRef.get(WebhookService); webhookRepository = moduleRef.get(WebhookRepository); reputationService = moduleRef.get(ReputationService); + storageService = moduleRef.get(StorageService); }); afterEach(() => { @@ -344,26 +364,23 @@ describe('WebhookService', () => { }, ]; - const intermediateResultsUrl = MOCK_FILE_URL; + const escrowAddress = MOCK_ADDRESS; + const chainId = ChainId.LOCALHOST; jest .spyOn(webhookService as any, 'getIntermediateResults') .mockResolvedValue(intermediateResults); - jest - .spyOn(webhookService.storageClient, 'uploadFiles') - .mockResolvedValue([ - { url: MOCK_FILE_URL, hash: MOCK_FILE_HASH }, - ] as any); const result = await webhookService.processFortune( manifest, - intermediateResultsUrl, + chainId, + escrowAddress, ); expect(result).toEqual({ recipients: expect.any(Array), amounts: expect.any(Array), - url: MOCK_FILE_URL, - hash: MOCK_FILE_HASH, + url: storageService.getUrl(`${escrowAddress}-${chainId}.json`), + hash: expect.any(String), checkPassed: expect.any(Boolean), }); }); @@ -384,19 +401,20 @@ describe('WebhookService', () => { }, ]; - const intermediateResultsUrl = MOCK_FILE_URL; + const escrowAddress = MOCK_ADDRESS; + const chainId = ChainId.LOCALHOST; jest .spyOn(webhookService as any, 'getIntermediateResults') .mockResolvedValue(intermediateResults); - jest - .spyOn(webhookService.storageClient, 'uploadFiles') - .mockResolvedValue([ - { url: MOCK_FILE_URL, hash: MOCK_FILE_HASH }, - ] as any); + // jest + // .spyOn(webhookService.storageClient, 'uploadFiles') + // .mockResolvedValue([ + // { url: MOCK_FILE_URL, hash: MOCK_FILE_HASH }, + // ] as any); await expect( - webhookService.processFortune(manifest, intermediateResultsUrl), + webhookService.processFortune(manifest, chainId, escrowAddress), ).rejects.toThrowError(ErrorResults.NotAllRequiredSolutionsHaveBeenSent); }); }); @@ -422,7 +440,8 @@ describe('WebhookService', () => { }; it('should successfully process and return correct result values', async () => { - const intermediateResultsUrl = MOCK_FILE_URL; + const escrowAddress = MOCK_ADDRESS; + const chainId = ChainId.LOCALHOST; StorageClient.downloadFileFromUrl = jest.fn().mockReturnValueOnce({ jobs: [ { @@ -442,9 +461,14 @@ describe('WebhookService', () => { ], }); + jest + .spyOn(storageService, 'copyFileFromURLToBucket') + .mockResolvedValue({ url: MOCK_FILE_URL, hash: MOCK_FILE_HASH }); + const result = await webhookService.processCvat( manifest as any, - intermediateResultsUrl, + chainId, + escrowAddress, ); expect(result).toEqual({ recipients: expect.any(Array), diff --git a/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.ts b/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.ts index 9801842ae7..db4c827226 100644 --- a/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.ts +++ b/packages/apps/reputation-oracle/server/src/modules/webhook/webhook.service.ts @@ -1,18 +1,12 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ import { BadRequestException, + Inject, Injectable, Logger, NotFoundException, } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; -import { - ChainId, - EscrowClient, - StorageClient, - StorageCredentials, - StorageParams, -} from '@human-protocol/sdk'; +import { ChainId, EscrowClient, StorageClient } from '@human-protocol/sdk'; import { WebhookIncomingEntity } from './webhook-incoming.entity'; import { CvatAnnotationMeta, @@ -38,7 +32,6 @@ import { import { ReputationService } from '../reputation/reputation.service'; import { BigNumber, ethers } from 'ethers'; import { Web3Service } from '../web3/web3.service'; -import { ConfigNames } from '../../common/config'; import { EventType, SolutionError, @@ -47,44 +40,19 @@ import { } from '../../common/enums'; import { JobRequestType } from '../../common/enums'; import { ReputationEntityType } from '../../common/enums'; -import { copyFileFromURLToBucket } from '../../common/utils'; import { LessThanOrEqual } from 'typeorm'; +import { StorageService } from '../storage/storage.service'; @Injectable() export class WebhookService { private readonly logger = new Logger(WebhookService.name); - public readonly storageClient: StorageClient; - public readonly storageParams: StorageParams; - public readonly storageCredentials: StorageCredentials; - public readonly bucket: string; - constructor( private readonly web3Service: Web3Service, + @Inject(StorageService) + private readonly storageService: StorageService, private readonly webhookRepository: WebhookRepository, private readonly reputationService: ReputationService, - private readonly configService: ConfigService, - ) { - this.storageCredentials = { - accessKey: this.configService.get(ConfigNames.S3_ACCESS_KEY)!, - secretKey: this.configService.get(ConfigNames.S3_SECRET_KEY)!, - }; - - const useSSL = - this.configService.get(ConfigNames.S3_USE_SSL) === 'true'; - - this.storageParams = { - endPoint: this.configService.get(ConfigNames.S3_ENDPOINT)!, - port: Number(this.configService.get(ConfigNames.S3_PORT)!), - useSSL, - }; - - this.bucket = this.configService.get(ConfigNames.S3_BUCKET)!; - - this.storageClient = new StorageClient( - this.storageParams, - this.storageCredentials, - ); - } + ) {} /** * Create a incoming webhook using the DTO data. @@ -157,11 +125,7 @@ export class WebhookService { } const manifest: FortuneManifestDto | CvatManifestDto = - await StorageClient.downloadFileFromUrl(manifestUrl); - const intermediateResultsUrl = await this.getIntermediateResultsUrl( - chainId, - escrowAddress, - ); + await this.storageService.download(manifestUrl); let results: { recipients: string[]; @@ -176,14 +140,16 @@ export class WebhookService { ) { results = await this.processFortune( manifest as FortuneManifestDto, - intermediateResultsUrl, + chainId, + escrowAddress, ); } else if ( CVAT_JOB_TYPES.includes((manifest as CvatManifestDto).annotation.type) ) { results = await this.processCvat( manifest as CvatManifestDto, - intermediateResultsUrl, + chainId, + escrowAddress, ); } else { this.logger.log( @@ -226,8 +192,13 @@ export class WebhookService { */ public async processFortune( manifest: FortuneManifestDto, - intermediateResultsUrl: string, + chainId: ChainId, + escrowAddress: string, ): Promise { + const intermediateResultsUrl = await this.getIntermediateResultsUrl( + chainId, + escrowAddress, + ); const intermediateResults = (await this.getIntermediateResults( intermediateResultsUrl, )) as FortuneFinalResult[]; @@ -241,9 +212,10 @@ export class WebhookService { throw new Error(ErrorResults.NotAllRequiredSolutionsHaveBeenSent); } - const [{ url, hash }] = await this.storageClient.uploadFiles( - [intermediateResults], - this.bucket, + const { url, hash } = await this.storageService.uploadJobSolutions( + escrowAddress, + chainId, + intermediateResults, ); const recipients = intermediateResults @@ -266,18 +238,19 @@ export class WebhookService { */ public async processCvat( manifest: CvatManifestDto, - intermediateResultsUrl: string, + chainId: ChainId, + escrowAddress: string, ): Promise { - const { url, hash } = await copyFileFromURLToBucket( + const intermediateResultsUrl = await this.getIntermediateResultsUrl( + chainId, + escrowAddress, + ); + const { url, hash } = await this.storageService.copyFileFromURLToBucket( `${intermediateResultsUrl}/${CVAT_RESULTS_ANNOTATIONS_FILENAME}`, - this.bucket, - this.storageParams, - this.storageCredentials, ); - const annotations: CvatAnnotationMeta = - await StorageClient.downloadFileFromUrl( - `${intermediateResultsUrl}/${CVAT_VALIDATION_META_FILENAME}`, - ); + const annotations: CvatAnnotationMeta = await this.storageService.download( + `${intermediateResultsUrl}/${CVAT_VALIDATION_META_FILENAME}`, + ); const bountyValue = ethers.utils.parseUnits(manifest.job_bounty, 18); const accumulatedBounties = annotations.results.reduce((accMap, curr) => { @@ -369,9 +342,9 @@ export class WebhookService { private async getIntermediateResults( url: string, ): Promise { - const intermediateResults = await StorageClient.downloadFileFromUrl( - url, - ).catch(() => []); + const intermediateResults = await this.storageService + .download(url) + .catch(() => []); if (intermediateResults.length === 0) { this.logger.log( @@ -422,7 +395,7 @@ export class WebhookService { } const manifest: FortuneManifestDto | CvatManifestDto = - await StorageClient.downloadFileFromUrl(manifestUrl); + await this.storageService.download(manifestUrl); let decreaseExchangeReputation = false; if ( @@ -432,9 +405,9 @@ export class WebhookService { webhookEntity.escrowAddress, ); - const finalResults = await StorageClient.downloadFileFromUrl( - finalResultsUrl, - ).catch(() => []); + const finalResults = await this.storageService + .download(finalResultsUrl) + .catch(() => []); if (finalResults.length === 0) { this.logger.log( diff --git a/packages/apps/reputation-oracle/server/test/constants.ts b/packages/apps/reputation-oracle/server/test/constants.ts index d4fe3946a3..ee26399e94 100644 --- a/packages/apps/reputation-oracle/server/test/constants.ts +++ b/packages/apps/reputation-oracle/server/test/constants.ts @@ -177,7 +177,6 @@ export const MOCK_IMAGE_BINARY_LABEL_JOB_RESULTS: ImageLabelBinaryJobResults = { }, ], }; -export const MOCK_BUCKET_NAME = 'bucket-name'; export const MOCK_PRIVATE_KEY = 'd334daf65a631f40549cc7de126d5a0016f32a2d00c49f94563f9737f7135e55'; export const MOCK_EMAIL = 'test@example.com'; @@ -192,3 +191,10 @@ export const MOCK_SENDGRID_API_KEY = 'SG.xxxxxxxxxxxxxxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'; export const MOCK_SENDGRID_FROM_EMAIL = 'info@hmt.ai'; export const MOCK_SENDGRID_FROM_NAME = 'John Doe'; + +export const MOCK_S3_ENDPOINT = 'localhost'; +export const MOCK_S3_PORT = 9000; +export const MOCK_S3_ACCESS_KEY = 'access_key'; +export const MOCK_S3_SECRET_KEY = 'secret_key'; +export const MOCK_S3_BUCKET = 'solution'; +export const MOCK_S3_USE_SSL = false;