Skip to content

Commit

Permalink
feat(north-cache): Add actions to file tables
Browse files Browse the repository at this point in the history
  • Loading branch information
nagyszabi authored and burgerni10 committed Dec 20, 2023
1 parent 098c7d4 commit 9026e8f
Show file tree
Hide file tree
Showing 33 changed files with 816 additions and 31 deletions.
18 changes: 18 additions & 0 deletions backend/src/engine/oibus-engine.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,17 @@ const createdNorth = {
setLogger: jest.fn(),
updateScanMode: jest.fn(),
getErrorFiles: jest.fn(),
getErrorFileContent: jest.fn(),
removeErrorFiles: jest.fn(),
retryErrorFiles: jest.fn(),
removeAllErrorFiles: jest.fn(),
retryAllErrorFiles: jest.fn(),
getCacheFiles: jest.fn(),
getCacheFileContent: jest.fn(),
removeCacheFiles: jest.fn(),
archiveCacheFiles: jest.fn(),
getArchiveFiles: jest.fn(),
getArchiveFileContent: jest.fn(),
removeArchiveFiles: jest.fn(),
retryArchiveFiles: jest.fn(),
removeAllArchiveFiles: jest.fn(),
Expand Down Expand Up @@ -289,6 +292,11 @@ describe('OIBusEngine', () => {
await engine.getErrorFiles(northConnectors[1].id, '2020-02-02T02:02:02.222Z', '2022-02-02T02:02:02.222Z', '');
expect(createdNorth.getErrorFiles).toHaveBeenCalledWith('2020-02-02T02:02:02.222Z', '2022-02-02T02:02:02.222Z', '');

await engine.getErrorFileContent('northId', 'file1');
expect(createdNorth.getErrorFileContent).not.toHaveBeenCalled();
await engine.getErrorFileContent(northConnectors[1].id, 'file1');
expect(createdNorth.getErrorFileContent).toHaveBeenCalledWith('file1');

await engine.retryErrorFiles('northId', ['file1']);
expect(createdNorth.retryErrorFiles).not.toHaveBeenCalled();
await engine.retryErrorFiles(northConnectors[1].id, ['file1']);
Expand All @@ -314,6 +322,11 @@ describe('OIBusEngine', () => {
await engine.getCacheFiles(northConnectors[1].id, '2020-02-02T02:02:02.222Z', '2022-02-02T02:02:02.222Z', '');
expect(createdNorth.getCacheFiles).toHaveBeenCalledWith('2020-02-02T02:02:02.222Z', '2022-02-02T02:02:02.222Z', '');

await engine.getCacheFileContent('northId', 'file1');
expect(createdNorth.getCacheFileContent).not.toHaveBeenCalled();
await engine.getCacheFileContent(northConnectors[1].id, 'file1');
expect(createdNorth.getCacheFileContent).toHaveBeenCalledWith('file1');

await engine.removeCacheFiles('northId', ['file1']);
expect(createdNorth.removeCacheFiles).not.toHaveBeenCalled();
await engine.removeCacheFiles(northConnectors[1].id, ['file1']);
Expand All @@ -329,6 +342,11 @@ describe('OIBusEngine', () => {
await engine.getArchiveFiles(northConnectors[1].id, '2020-02-02T02:02:02.222Z', '2022-02-02T02:02:02.222Z', '');
expect(createdNorth.getArchiveFiles).toHaveBeenCalledWith('2020-02-02T02:02:02.222Z', '2022-02-02T02:02:02.222Z', '');

await engine.getArchiveFileContent('northId', 'file1');
expect(createdNorth.getArchiveFileContent).not.toHaveBeenCalled();
await engine.getArchiveFileContent(northConnectors[1].id, 'file1');
expect(createdNorth.getArchiveFileContent).toHaveBeenCalledWith('file1');

await engine.retryArchiveFiles('northId', ['file1']);
expect(createdNorth.retryArchiveFiles).not.toHaveBeenCalled();
await engine.retryArchiveFiles(northConnectors[1].id, ['file1']);
Expand Down
12 changes: 12 additions & 0 deletions backend/src/engine/oibus-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ export default class OIBusEngine extends BaseEngine {
return (await this.northConnectors.get(northId)?.getErrorFiles(start, end, fileNameContains)) || [];
}

async getErrorFileContent(northId: string, filename: string) {
return (await this.northConnectors.get(northId)?.getErrorFileContent(filename)) || null;
}

async removeErrorFiles(northId: string, filenames: Array<string>): Promise<void> {
await this.northConnectors.get(northId)?.removeErrorFiles(filenames);
}
Expand All @@ -285,6 +289,10 @@ export default class OIBusEngine extends BaseEngine {
return (await this.northConnectors.get(northId)?.getCacheFiles(start, end, fileNameContains)) || [];
}

async getCacheFileContent(northId: string, filename: string) {
return (await this.northConnectors.get(northId)?.getCacheFileContent(filename)) || null;
}

async removeCacheFiles(northId: string, filenames: Array<string>): Promise<void> {
await this.northConnectors.get(northId)?.removeCacheFiles(filenames);
}
Expand All @@ -297,6 +305,10 @@ export default class OIBusEngine extends BaseEngine {
return (await this.northConnectors.get(northId)?.getArchiveFiles(start, end, fileNameContains)) || [];
}

async getArchiveFileContent(northId: string, filename: string) {
return (await this.northConnectors.get(northId)?.getArchiveFileContent(filename)) || null;
}

async removeArchiveFiles(northId: string, filenames: Array<string>): Promise<void> {
await this.northConnectors.get(northId)?.removeArchiveFiles(filenames);
}
Expand Down
21 changes: 21 additions & 0 deletions backend/src/north/north-connector.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const retryAllErrorValues = jest.fn();
const valueTrigger = new EventEmitter();
const fileTrigger = new EventEmitter();
const archiveTrigger = new EventEmitter();
const getErrorFileContent = jest.fn();
const getCacheFileContent = jest.fn();
const getArchiveFileContent = jest.fn();

// Mock services
jest.mock('../service/repository.service');
Expand Down Expand Up @@ -75,6 +78,8 @@ jest.mock(
fileCacheServiceMock.getFileToSend = getFileToSend;
fileCacheServiceMock.isEmpty = fileCacheIsEmpty;
fileCacheServiceMock.triggerRun = fileTrigger;
fileCacheServiceMock.getErrorFileContent = getErrorFileContent;
fileCacheServiceMock.getCacheFileContent = getCacheFileContent;
return fileCacheServiceMock;
}
);
Expand All @@ -84,6 +89,7 @@ jest.mock(
function () {
const archiveServiceMock = new ArchiveServiceMock();
archiveServiceMock.triggerRun = archiveTrigger;
archiveServiceMock.getArchiveFileContent = getArchiveFileContent;
return archiveServiceMock;
}
);
Expand Down Expand Up @@ -567,6 +573,21 @@ describe('NorthConnector enabled', () => {
expect(retryAllErrorValues).toHaveBeenCalled();
expect(logger.trace).toHaveBeenCalledWith(`Retrying all value error files in North connector "${configuration.name}"...`);
});

it('should get error file content', async () => {
await north.getErrorFileContent('file1.queue.tmp');
expect(getErrorFileContent).toHaveBeenCalledWith('file1.queue.tmp');
});

it('should get cache file content', async () => {
await north.getCacheFileContent('file1.queue.tmp');
expect(getCacheFileContent).toHaveBeenCalledWith('file1.queue.tmp');
});

it('should get archive file content', async () => {
await north.getArchiveFileContent('file1.queue.tmp');
expect(getArchiveFileContent).toHaveBeenCalledWith('file1.queue.tmp');
});
});

describe('NorthConnector disabled', () => {
Expand Down
22 changes: 22 additions & 0 deletions backend/src/north/north-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { OIBusDataValue, OIBusError } from '../../../shared/model/engine.model';
import { ExternalSubscriptionDTO, SubscriptionDTO } from '../../../shared/model/subscription.model';
import { DateTime } from 'luxon';
import { PassThrough } from 'node:stream';
import { ReadStream } from 'node:fs';
import path from 'node:path';
import { HandlesFile, HandlesValues } from './north-interface';
import NorthConnectorMetricsService from '../service/north-connector-metrics.service';
Expand Down Expand Up @@ -431,6 +432,13 @@ export default class NorthConnector<T extends NorthSettings = any> {
return await this.fileCacheService.getErrorFiles(fromDate, toDate, fileNameContains);
}

/**
* Get error file content as a read stream.
*/
async getErrorFileContent(filename: string): Promise<ReadStream | null> {
return await this.fileCacheService.getErrorFileContent(filename);
}

/**
* Remove error files from file cache.
*/
Expand Down Expand Up @@ -470,6 +478,13 @@ export default class NorthConnector<T extends NorthSettings = any> {
return await this.fileCacheService.getCacheFiles(fromDate, toDate, fileNameContains);
}

/**
* Get cache file content as a read stream.
*/
async getCacheFileContent(filename: string): Promise<ReadStream | null> {
return await this.fileCacheService.getCacheFileContent(filename);
}

/**
* Remove cache files.
*/
Expand All @@ -496,6 +511,13 @@ export default class NorthConnector<T extends NorthSettings = any> {
return await this.archiveService.getArchiveFiles(fromDate, toDate, fileNameContains);
}

/**
* Get archive file content as a read stream.
*/
async getArchiveFileContent(filename: string): Promise<ReadStream | null> {
return await this.archiveService.getArchiveFileContent(filename);
}

/**
* Remove archive files from file cache.
*/
Expand Down
24 changes: 24 additions & 0 deletions backend/src/service/cache/archive.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import path from 'node:path';
import fs from 'node:fs/promises';
import { createReadStream } from 'node:fs';

import ArchiveService from './archive.service';

Expand All @@ -12,6 +13,7 @@ import { NorthArchiveSettings } from '../../../../shared/model/north-connector.m

jest.mock('../../service/utils');
jest.mock('node:fs/promises');
jest.mock('node:fs');

const logger: pino.Logger = new PinoLogger();
const anotherLogger: pino.Logger = new PinoLogger();
Expand Down Expand Up @@ -247,6 +249,28 @@ describe('ArchiveService', () => {

removeFilesSpy.mockRestore();
});

it('should properly get archived file content', async () => {
const filename = 'myFile.csv';
(createReadStream as jest.Mock).mockImplementation(() => {});
await archiveService.getArchiveFileContent(filename);

expect(createReadStream).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'archive', filename));
});

it('should handle error while getting archived file content', async () => {
const filename = 'myFile.csv';
const error = new Error('file does not exist');
(fs.stat as jest.Mock).mockImplementation(() => {
throw error;
});
const readStream = await archiveService.getArchiveFileContent(filename);

expect(readStream).toBeNull();
expect(logger.error).toHaveBeenCalledWith(
`Error while reading file "${path.resolve('myCacheFolder', 'archive', filename)}": ${error}`
);
});
});

describe('with disabled service', () => {
Expand Down
14 changes: 14 additions & 0 deletions backend/src/service/cache/archive.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from 'node:fs/promises';
import { createReadStream, ReadStream } from 'node:fs';
import path from 'node:path';

import { createFolder } from '../utils';
Expand Down Expand Up @@ -162,6 +163,19 @@ export default class ArchiveService {
return filteredFilenames;
}

/**
* Get archive file content.
*/
async getArchiveFileContent(filename: string): Promise<ReadStream | null> {
try {
await fs.stat(path.join(this.archiveFolder, filename));
} catch (error) {
this._logger.error(`Error while reading file "${path.join(this.archiveFolder, filename)}": ${error}`);
return null;
}
return createReadStream(path.join(this.archiveFolder, filename));
}

/**
* Remove archive files.
*/
Expand Down
42 changes: 42 additions & 0 deletions backend/src/service/cache/file-cache.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import path from 'node:path';
import fs from 'node:fs/promises';
import { createReadStream } from 'node:fs';

import FileCache from './file-cache.service';
import { createFolder, getFilesFiltered } from '../utils';
Expand All @@ -8,6 +9,7 @@ import PinoLogger from '../../tests/__mocks__/logger.mock';
import { NorthCacheSettingsDTO } from '../../../../shared/model/north-connector.model';

jest.mock('node:fs/promises');
jest.mock('node:fs');
jest.mock('../../service/utils');

const logger: pino.Logger = new PinoLogger();
Expand Down Expand Up @@ -337,4 +339,44 @@ describe('FileCache', () => {
cache.settings = otherSettings;
await cache.cacheFile('myFile.csv');
});

it('should properly get error file content', async () => {
const filename = 'myFile.csv';
(createReadStream as jest.Mock).mockImplementation(() => {});
await cache.getErrorFileContent(filename);

expect(createReadStream).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files-errors', filename));
});

it('should handle error when getting error file content', async () => {
const filename = 'myFile.csv';
const error = new Error('file does not exist');
(fs.stat as jest.Mock).mockImplementation(() => {
throw error;
});
await cache.getErrorFileContent(filename);

expect(logger.error).toHaveBeenCalledWith(
`Error while reading file "${path.resolve('myCacheFolder', 'files-errors', filename)}": ${error}`
);
});

it('should properly get cache file content', async () => {
const filename = 'myFile.csv';
(createReadStream as jest.Mock).mockImplementation(() => {});
await cache.getCacheFileContent(filename);

expect(createReadStream).toHaveBeenCalledWith(path.resolve('myCacheFolder', 'files', filename));
});

it('should handle error when getting cache file content', async () => {
const filename = 'myFile.csv';
const error = new Error('file does not exist');
(fs.stat as jest.Mock).mockImplementation(() => {
throw error;
});
await cache.getCacheFileContent(filename);

expect(logger.error).toHaveBeenCalledWith(`Error while reading file "${path.resolve('myCacheFolder', 'files', filename)}": ${error}`);
});
});
28 changes: 28 additions & 0 deletions backend/src/service/cache/file-cache.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from 'node:fs/promises';
import { createReadStream, ReadStream } from 'node:fs';
import path from 'node:path';

import { createFolder, getFilesFiltered } from '../utils';
Expand Down Expand Up @@ -178,6 +179,19 @@ export default class FileCacheService {
return getFilesFiltered(this._errorFolder, fromDate, toDate, nameFilter, this._logger);
}

/**
* Get error file content.
*/
async getErrorFileContent(filename: string): Promise<ReadStream | null> {
try {
await fs.stat(path.join(this._errorFolder, filename));
} catch (error) {
this._logger.error(`Error while reading file "${path.join(this._errorFolder, filename)}": ${error}`);
return null;
}
return createReadStream(path.join(this._errorFolder, filename));
}

/**
* Remove files from folder.
*/
Expand Down Expand Up @@ -241,6 +255,20 @@ export default class FileCacheService {
return getFilesFiltered(this._fileFolder, fromDate, toDate, nameFilter, this._logger);
}

/**
* Get cache file content.
*/
async getCacheFileContent(filename: string): Promise<ReadStream | null> {
try {
await fs.stat(path.join(this._fileFolder, filename));
} catch (error) {
this._logger.error(`Error while reading file "${path.join(this._fileFolder, filename)}": ${error}`);
return null;
}

return createReadStream(path.join(this._fileFolder, filename));
}

/**
* Retry files from folder.
*/
Expand Down
1 change: 1 addition & 0 deletions backend/src/tests/__mocks__/archive-service.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ export default class ArchiveServiceMock {
on: jest.fn(),
emit: jest.fn()
} as unknown as EventEmitter;
getArchiveFileContent = jest.fn();
}
2 changes: 2 additions & 0 deletions backend/src/tests/__mocks__/file-cache-service.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ export default class FileCacheServiceMock {
{ filename: 'file5.name', modificationDate: '', size: 2 },
{ filename: 'file6.name', modificationDate: '', size: 3 }
]);
getErrorFileContent = jest.fn();
getCacheFileContent = jest.fn();
}
3 changes: 3 additions & 0 deletions backend/src/tests/__mocks__/reload-service.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ export default jest.fn().mockImplementation(() => ({
resetSouthMetrics: jest.fn(),
resetNorthMetrics: jest.fn(),
getErrorFiles: jest.fn(),
getErrorFileContent: jest.fn(),
removeErrorFiles: jest.fn(),
retryErrorFiles: jest.fn(),
removeAllErrorFiles: jest.fn(),
retryAllErrorFiles: jest.fn(),
getArchiveFiles: jest.fn(),
getArchiveFileContent: jest.fn(),
removeArchiveFiles: jest.fn(),
retryArchiveFiles: jest.fn(),
removeAllArchiveFiles: jest.fn(),
retryAllArchiveFiles: jest.fn(),
testSouth: jest.fn(),
getCacheFiles: jest.fn(),
getCacheFileContent: jest.fn(),
removeCacheFiles: jest.fn(),
archiveCacheFiles: jest.fn(),
getCacheValues: jest.fn(),
Expand Down
Loading

0 comments on commit 9026e8f

Please sign in to comment.