Skip to content

Commit

Permalink
Add storage module
Browse files Browse the repository at this point in the history
  • Loading branch information
eugenvoronov committed Nov 3, 2023
1 parent e316f1d commit 0eabfd3
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 0 deletions.
13 changes: 13 additions & 0 deletions packages/apps/job-launcher/server/src/common/config/s3.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { ConfigType, registerAs } from '@nestjs/config';

export const s3Config = registerAs('s3', () => ({
endPoint: process.env.S3_ENDPOINT!,
port: +process.env.S3_PORT!,
accessKey: process.env.S3_ACCESS_KEY!,
secretKey: process.env.S3_SECRET_KEY!,
bucket: process.env.S3_BUCKET!,
useSSL: process.env.S3_USE_SSL === 'true',
}));

export const s3ConfigKey = s3Config.KEY;
export type S3ConfigType = ConfigType<typeof s3Config>;
4 changes: 4 additions & 0 deletions packages/apps/job-launcher/server/src/common/interfaces/s3.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export class UploadedFile {
public url: string;
public hash: string;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

import { Module } from '@nestjs/common';
import { StorageService } from './storage.service';
import { ConfigModule } from '@nestjs/config';
import { s3Config } from '../../common/config';

@Module({
imports: [ConfigModule.forFeature(s3Config)],
providers: [StorageService],
exports: [StorageService],
})
export class StorageModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { ChainId, StorageClient } from '@human-protocol/sdk';
import { ConfigModule, registerAs } from '@nestjs/config';
import { Test } from '@nestjs/testing';
import {
MOCK_FILE_URL,
MOCK_MANIFEST,
MOCK_S3_ACCESS_KEY,
MOCK_S3_BUCKET,
MOCK_S3_ENDPOINT,
MOCK_S3_PORT,
MOCK_S3_SECRET_KEY,
MOCK_S3_USE_SSL,
} from '../../../test/constants';
import { StorageService } from './storage.service';
import crypto from 'crypto';
import axios from 'axios';
import stream from 'stream';

jest.mock('@human-protocol/sdk', () => ({
...jest.requireActual('@human-protocol/sdk'),
StorageClient: {
downloadFileFromUrl: jest.fn(),
},
}));

jest.mock('minio', () => {
class Client {
putObject = jest.fn();
bucketExists = jest.fn();
constructor() {
(this as any).protocol = 'http:';
(this as any).host = 'localhost';
(this as any).port = 9000;
}
}

return { Client };
});

jest.mock('axios');

describe('Web3Service', () => {
let storageService: StorageService;

beforeAll(async () => {
const moduleRef = await Test.createTestingModule({
imports: [
ConfigModule.forFeature(
registerAs('s3', () => ({
accessKey: MOCK_S3_ACCESS_KEY,
secretKey: MOCK_S3_SECRET_KEY,
endPoint: MOCK_S3_ENDPOINT,
port: MOCK_S3_PORT,
useSSL: MOCK_S3_USE_SSL,
bucket: MOCK_S3_BUCKET,
})),
),
],
providers: [StorageService],
}).compile();

storageService = moduleRef.get<StorageService>(StorageService);
});

describe('uploadManifest', () => {
it('should upload the manifest correctly', async () => {
storageService.minioClient.bucketExists = jest
.fn()
.mockResolvedValueOnce(true);

const hash = crypto.createHash('sha1').update(JSON.stringify(MOCK_MANIFEST)).digest('hex');

const fileData = await storageService.uploadManifest(MOCK_MANIFEST);
expect(fileData).toEqual({
url: `http://${MOCK_S3_ENDPOINT}:${MOCK_S3_PORT}/${MOCK_S3_BUCKET}/s3${hash}.json`,
hash: crypto
.createHash('sha1')
.update(JSON.stringify(MOCK_MANIFEST))
.digest('hex'),
});
expect(storageService.minioClient.putObject).toHaveBeenCalledWith(
MOCK_S3_BUCKET,
`s3${hash}.json`,
expect.any(String),
{
'Content-Type': 'application/json',
},
);
});

it('should fail if the bucket does not exist', async () => {
storageService.minioClient.bucketExists = jest
.fn()
.mockResolvedValueOnce(false);

await expect(
storageService.uploadManifest(MOCK_MANIFEST),
).rejects.toThrow('Bucket not found');
});

it('should fail if the file cannot be uploaded', async () => {
storageService.minioClient.bucketExists = jest
.fn()
.mockResolvedValueOnce(true);
storageService.minioClient.putObject = jest
.fn()
.mockRejectedValueOnce('Network error');

await expect(
storageService.uploadManifest(MOCK_MANIFEST),
).rejects.toThrow('File not uploaded');
});
});

describe('download', () => {
it('should download the file correctly', async () => {
const exchangeAddress = '0x1234567890123456789012345678901234567892';
const workerAddress = '0x1234567890123456789012345678901234567891';
const solution = 'test';

const expectedJobFile = {
exchangeAddress,
solutions: [
{
workerAddress,
solution,
},
],
};

StorageClient.downloadFileFromUrl = jest
.fn()
.mockResolvedValueOnce(expectedJobFile);
const solutionsFile = await storageService.download(MOCK_FILE_URL);
expect(solutionsFile).toBe(expectedJobFile);
});

it('should return empty array when file cannot be downloaded', async () => {
StorageClient.downloadFileFromUrl = jest
.fn()
.mockRejectedValue('Network error');

const solutionsFile = await storageService.download(MOCK_FILE_URL);
expect(solutionsFile).toStrictEqual([]);
});
});

describe('copyFileFromURLToBucket', () => {
it('should copy a file from a valid URL to a bucket', async () => {
const streamResponseData = new stream.Readable();
streamResponseData.push(JSON.stringify(MOCK_MANIFEST));
streamResponseData.push(null);
(axios.get as any).mockResolvedValueOnce({ data: streamResponseData });

const uploadedFile = await storageService.copyFileFromURLToBucket(
MOCK_FILE_URL,
);

expect(
uploadedFile.url.includes(
`http://${MOCK_S3_ENDPOINT}:${MOCK_S3_PORT}/${MOCK_S3_BUCKET}/`,
),
).toBeTruthy();
expect(uploadedFile.hash).toBeDefined();
expect(storageService.minioClient.putObject).toBeCalledWith(
MOCK_S3_BUCKET,
expect.any(String),
expect.any(stream),
);
});

it('should handle an invalid URL', async () => {
(axios.get as any).mockRejectedValue('Network error');

await expect(
storageService.copyFileFromURLToBucket(MOCK_FILE_URL),
).rejects.toThrow('File not uploaded');
});

it('should handle errors when copying the file', async () => {
const streamResponseData = new stream.Readable();
streamResponseData.push(JSON.stringify(MOCK_MANIFEST));
streamResponseData.push(null);
(axios.get as any).mockResolvedValueOnce({ data: streamResponseData });
storageService.minioClient.putObject = jest
.fn()
.mockRejectedValue('Network error');

await expect(
storageService.copyFileFromURLToBucket(
'https://example.com/archivo.zip',
),
).rejects.toThrow('File not uploaded');
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { ChainId, StorageClient } from '@human-protocol/sdk';
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import * as Minio from 'minio';
import { S3ConfigType, s3ConfigKey } from '../../common/config';
import crypto from 'crypto';
import { UploadedFile } from '../../common/interfaces/s3';
import { PassThrough } from 'stream';
import axios from 'axios';
import { Logger } from '@nestjs/common';
import { hashStream } from '../../common/utils';
import { CvatManifestDto, FortuneManifestDto } from '../job/job.dto';

@Injectable()
export class StorageService {
public readonly minioClient: Minio.Client;

constructor(
@Inject(s3ConfigKey)
private s3Config: S3ConfigType,
) {
this.minioClient = new Minio.Client({
endPoint: this.s3Config.endPoint,
port: this.s3Config.port,
accessKey: this.s3Config.accessKey,
secretKey: this.s3Config.secretKey,
useSSL: this.s3Config.useSSL,
});
}
public getUrl(key: string): string {
return `${this.s3Config.useSSL ? 'https' : 'http'}://${
this.s3Config.endPoint
}:${this.s3Config.port}/${this.s3Config.bucket}/${key}`;
}

public async download(url: string): Promise<any> {
try {
return await StorageClient.downloadFileFromUrl(url);
} catch {
return [];
}
}

public async uploadManifest(
manifest: FortuneManifestDto | CvatManifestDto | string
): Promise<UploadedFile> {
if (!(await this.minioClient.bucketExists(this.s3Config.bucket))) {
throw new BadRequestException('Bucket not found');
}

const isString = typeof manifest === "string";

const contentType = isString ? 'text/plain' : 'application/json'

const content = isString ? manifest : JSON.stringify(manifest);

const hash = crypto.createHash('sha1').update(content).digest('hex');
const key = isString ? `s3${hash}`: `s3${hash}.json`;

try {
const hash = crypto.createHash('sha1').update(content).digest('hex');
await this.minioClient.putObject(
this.s3Config.bucket,
key,
JSON.stringify(content),
{
'Content-Type': contentType,
},
);

return { url: this.getUrl(key), hash };
} catch (e) {
throw new BadRequestException('File not uploaded');
}
}

/**
* **Copy file from a URL to cloud storage**
*
* @param {string} url - URL of the source file
* @returns {Promise<UploadedFile>} - Uploaded file with key/hash
*/
public async copyFileFromURLToBucket(url: string): Promise<UploadedFile> {
try {
const { data } = await axios.get(url, { responseType: 'stream' });
const stream = new PassThrough();
data.pipe(stream);

const hash = await hashStream(data);
const key = `s3${hash}.zip`;

await this.minioClient.putObject(this.s3Config.bucket, key, stream);

Logger.log(`File from ${url} copied to ${this.s3Config.bucket}/${key}`);

return {
url: this.getUrl(key),
hash,
};
} catch (error) {
Logger.error('Error copying file:', error);
console.log(error);
throw new Error('File not uploaded');
}
}
}

0 comments on commit 0eabfd3

Please sign in to comment.