Skip to content

Commit

Permalink
feat(south): single run of south item
Browse files Browse the repository at this point in the history
  • Loading branch information
kukukk authored and burgerni10 committed Jan 6, 2025
1 parent f033770 commit 6ddac14
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 21 deletions.
39 changes: 20 additions & 19 deletions backend/src/service/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,7 @@ export const persistResults = async (
case 'json':
return addContentFn({ type: 'time-values', content: data });
case 'file':
const filePath = path.join(
baseFolder,
serializationSettings.filename
.replace('@CurrentDate', DateTime.now().toUTC().toFormat('yyyy_MM_dd_HH_mm_ss_SSS'))
.replace('@ConnectorName', connectorName)
.replace('@ItemName', itemName)
);
const filePath = generateFilenameForSerialization(baseFolder, serializationSettings.filename, connectorName, itemName);
logger.debug(`Writing ${data.length} bytes into file at "${filePath}"`);
await fs.writeFile(filePath, data);

Expand Down Expand Up @@ -266,18 +260,8 @@ export const persistResults = async (
}
break;
case 'csv':
const options = {
header: true,
delimiter: convertDelimiter(serializationSettings.delimiter)
};
const csvPath = path.join(
baseFolder,
serializationSettings.filename
.replace('@CurrentDate', DateTime.now().toUTC().toFormat('yyyy_MM_dd_HH_mm_ss_SSS'))
.replace('@ConnectorName', connectorName)
.replace('@ItemName', itemName)
);
const csvContent = csv.unparse(data, options);
const csvPath = generateFilenameForSerialization(baseFolder, serializationSettings.filename, connectorName, itemName);
const csvContent = generateCsvContent(data, serializationSettings.delimiter);

logger.debug(`Writing ${csvContent.length} bytes into CSV file at "${csvPath}"`);
await fs.writeFile(csvPath, csvContent);
Expand Down Expand Up @@ -318,6 +302,23 @@ export const persistResults = async (
}
};

export const generateFilenameForSerialization = (baseFolder: string, filename: string, connectorName: string, itemName: string): string => {
return path.join(
baseFolder,
filename
.replace('@CurrentDate', DateTime.now().toUTC().toFormat('yyyy_MM_dd_HH_mm_ss_SSS'))
.replace('@ConnectorName', connectorName)
.replace('@ItemName', itemName)
);
};

export const generateCsvContent = (data: Array<any>, delimiter: CsvCharacter): string => {
const options = {
header: true,
delimiter: convertDelimiter(delimiter)
};
return csv.unparse(data, options);
};
/**
* Log the executed query with replacements values for query variables
*/
Expand Down
4 changes: 4 additions & 0 deletions backend/src/south/south-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,10 @@ export default class SouthConnector<T extends SouthSettings = any, I extends Sou
this.logger.warn('testConnection must be override');
}

async testItem(item: SouthConnectorItemDTO, _callback: (data: OIBusContent) => Promise<void>): Promise<void> {
this.logger.warn(`testItem must be override to test item ${item.name}`);
}

get settings(): SouthConnectorDTO<T> {
return this.connector;
}
Expand Down
18 changes: 17 additions & 1 deletion backend/src/south/south-folder-scanner/south-folder-scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import EncryptionService from '../../service/encryption.service';
import RepositoryService from '../../service/repository.service';
import { QueriesFile } from '../south-interface';
import { SouthFolderScannerItemSettings, SouthFolderScannerSettings } from '../../../../shared/model/south-settings.model';
import { OIBusContent } from '../../../../shared/model/engine.model';
import { OIBusContent, OIBusTimeValue } from '../../../../shared/model/engine.model';
import { DateTime } from 'luxon';

/**
Expand Down Expand Up @@ -61,6 +61,22 @@ export default class SouthFolderScanner
}
}

override async testItem(
item: SouthConnectorItemDTO<SouthFolderScannerItemSettings>,
callback: (data: OIBusContent) => void
): Promise<void> {
await this.testConnection();

const inputFolder = path.resolve(this.connector.settings.inputFolder);
const files = await fs.readdir(inputFolder);
const values: OIBusTimeValue[] = files.map(file => ({
pointId: item.name,
timestamp: DateTime.now().toUTC().toISO()!,
data: { value: file }
}));
callback({ type: 'time-values', content: values });
}

async start(dataStream = true): Promise<void> {
await super.start(dataStream);
// Create a custom table in the south cache database to manage file already sent when preserve file is set to true
Expand Down
35 changes: 35 additions & 0 deletions backend/src/south/south-modbus/south-modbus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,41 @@ export default class SouthModbus extends SouthConnector<SouthModbusSettings, Sou
}
}

override async testItem(item: SouthConnectorItemDTO<SouthModbusItemSettings>, callback: (data: OIBusContent) => void): Promise<void> {
try {
await new Promise<void>((resolve, reject) => {
this.socket = new net.Socket();
this.client = new client.TCP(this.socket, this.connector.settings.slaveId);
this.socket.connect(
{
host: this.connector.settings.host,
port: this.connector.settings.port
},
async () => {
const dataValues: OIBusTimeValue[] = await this.modbusFunction(item);
callback({
type: 'time-values',
content: dataValues
});
await this.disconnect();
resolve();
}
);
this.socket.on('error', async error => {
reject(error);
});
});
} catch (error: any) {
switch (error.code) {
case 'ENOTFOUND':
case 'ECONNREFUSED':
throw new Error(`Please check host and port. ${error.message}`);
default:
throw new Error(`Unable to connect to socket. ${error.message}`);
}
}
}

/**
* Retrieve the right buffer function name according to the data type
*/
Expand Down
61 changes: 61 additions & 0 deletions backend/src/south/south-mysql/south-mysql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
convertDateTimeToInstant,
createFolder,
formatInstant,
generateCsvContent,
generateFilenameForSerialization,
generateReplacementParameters,
logQuery,
persistResults
Expand Down Expand Up @@ -112,6 +114,65 @@ export default class SouthMySQL extends SouthConnector<SouthMySQLSettings, South
}
}

override async testItem(item: SouthConnectorItemDTO<SouthMySQLItemSettings>, callback: (data: OIBusContent) => void): Promise<void> {
await this.testConnection();

const config = await this.createConnectionOptions();
let connection;
try {
connection = await mysql.createConnection(config);
await connection.ping();
} catch {
await connection?.end();
}

const startTime = DateTime.now()
.minus(3600 * 1000)
.toUTC()
.toISO() as Instant;
const endTime = DateTime.now().toUTC().toISO() as Instant;
const result: Array<any> = await this.queryData(item, startTime, endTime);
await connection?.end();

const formattedResults = result.map(entry => {
const formattedEntry: Record<string, any> = {};
Object.entries(entry).forEach(([key, value]) => {
const datetimeField = item.settings.dateTimeFields?.find(dateTimeField => dateTimeField.fieldName === key) || null;
if (!datetimeField) {
formattedEntry[key] = value;
} else {
const entryDate = convertDateTimeToInstant(value, datetimeField);
formattedEntry[key] = formatInstant(entryDate, {
type: 'string',
format: item.settings.serialization.outputTimestampFormat,
timezone: item.settings.serialization.outputTimezone,
locale: 'en-En'
});
}
});
return formattedEntry;
});

let oibusContent: OIBusContent;
switch (item.settings.serialization.type) {
case 'csv': {
const filePath = generateFilenameForSerialization(
this.tmpFolder,
item.settings.serialization.filename,
this.connector.name,
item.name
);
const content = generateCsvContent(formattedResults, item.settings.serialization.delimiter);
oibusContent = { type: 'raw', filePath, content };
break;
}
default: {
oibusContent = { type: 'time-values', content: formattedResults as Array<any> };
}
}
callback(oibusContent);
}

/**
* Get entries from the database between startTime and endTime (if used in the SQL query)
* and write them into a CSV file and send it to the engine.
Expand Down
76 changes: 75 additions & 1 deletion backend/src/web-server/controllers/south-connector.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import {
SouthConnectorItemDTO,
SouthConnectorItemSearchParam,
SouthType,
SouthConnectorItemScanModeNameDTO
SouthConnectorItemScanModeNameDTO,
SouthConnectorItemTestCommandDTO
} from '../../../../shared/model/south-connector.model';
import { Page } from '../../../../shared/model/types';
import JoiValidator from './validators/joi.validator';
Expand Down Expand Up @@ -275,6 +276,79 @@ export default class SouthConnectorController {
}
}

async testSouthItem(ctx: KoaContext<SouthConnectorItemTestCommandDTO, void>): Promise<void> {
try {
// South validation
const manifest = ctx.request.body
? ctx.app.southService.getInstalledSouthManifests().find(southManifest => southManifest.id === ctx.request.body!.south.type)
: null;
if (!manifest) {
return ctx.notFound('South manifest not found');
}

let southConnector: SouthConnectorDTO | null = null;
if (ctx.params.id !== 'create') {
southConnector = ctx.app.repositoryService.southConnectorRepository.getSouthConnector(ctx.params.id);
if (!southConnector) {
return ctx.notFound(`South not found: ${ctx.params.id}`);
}
}
if (!southConnector && ctx.query.duplicateId) {
southConnector = ctx.app.repositoryService.southConnectorRepository.getSouthConnector(ctx.query.duplicateId);
if (!southConnector) {
return ctx.notFound(`South not found: ${ctx.query.duplicateId}`);
}
}
await this.validator.validateSettings(manifest.settings, ctx.request.body!.south.settings);

// South item validation
const itemCommand = ctx.request.body!.item;
if (!itemCommand.scanModeId && !itemCommand.scanModeName) {
return ctx.badRequest(`Scan mode not specified for item ${itemCommand.name}`);
}

let scanModeId = itemCommand.scanModeId;
if (!itemCommand.scanModeId && itemCommand.scanModeName) {
const scanModes = ctx.app.repositoryService.scanModeRepository.getScanModes();
const scanMode = scanModes.find(element => element.name === itemCommand.scanModeName);
if (!scanMode) {
return ctx.badRequest(`Scan mode ${itemCommand.scanModeName} not found for item ${itemCommand.name}`);
}
scanModeId = scanMode.id;
}
await this.validator.validateSettings(manifest.items.settings, ctx.request.body!.item.settings);

// Prepare South and South item to test
const southCommand: SouthConnectorDTO = { id: southConnector?.id || 'test', ...ctx.request.body!.south };
southCommand.settings = await ctx.app.encryptionService.encryptConnectorSecrets(
southCommand.settings,
southConnector?.settings,
manifest.settings
);
ctx.request.body!.south.name = southConnector ? southConnector.name : `${ctx.request.body!.south.type}:test-connection`;
const logger = ctx.app.logger.child(
{
scopeType: 'south',
scopeId: southCommand.id,
scopeName: southCommand.name
},
{ level: 'silent' }
);
const southToTest = ctx.app.southService.createSouth(southCommand, this.addContent, 'baseFolder', logger);

const southItemToTest: SouthConnectorItemDTO = {
id: 'test',
connectorId: southCommand.id,
scanModeId: scanModeId!,
...ctx.request.body!.item
};

await southToTest.testItem(southItemToTest, ctx.ok);
} catch (error: any) {
ctx.badRequest(error.message);
}
}

async listSouthItems(ctx: KoaContext<void, Array<SouthConnectorItemDTO>>): Promise<void> {
const southItems = ctx.app.repositoryService.southItemRepository.listSouthItems(ctx.params.southId, {});
ctx.ok(southItems);
Expand Down
1 change: 1 addition & 0 deletions backend/src/web-server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ router.put('/api/south/:id', (ctx: KoaContext<any, any>) => southConnectorContro
router.delete('/api/south/:id', (ctx: KoaContext<any, any>) => southConnectorController.deleteSouthConnector(ctx));
router.put('/api/south/:id/start', (ctx: KoaContext<any, any>) => southConnectorController.startSouthConnector(ctx));
router.put('/api/south/:id/stop', (ctx: KoaContext<any, any>) => southConnectorController.stopSouthConnector(ctx));
router.put('/api/south/:id/items/test-item', (ctx: KoaContext<any, any>) => southConnectorController.testSouthItem(ctx));
router.get('/api/south/:southId/items', (ctx: KoaContext<any, any>) => southConnectorController.searchSouthItems(ctx));
router.get('/api/south/:southId/items/all', (ctx: KoaContext<any, any>) => southConnectorController.listSouthItems(ctx));
router.post('/api/south/:southId/items', (ctx: KoaContext<any, any>) => southConnectorController.createSouthItem(ctx));
Expand Down
1 change: 1 addition & 0 deletions shared/model/engine.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ export interface OIBusTimeValueContent extends BaseOIBusContent {
export interface OIBusRawContent extends BaseOIBusContent {
type: 'raw';
filePath: string;
content?: string;
}

export type OIBusContent = OIBusTimeValueContent | OIBusRawContent;
5 changes: 5 additions & 0 deletions shared/model/south-connector.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ export interface SouthConnectorItemCommandDTO<T extends SouthItemSettings = any>
scanModeName?: string;
}

export interface SouthConnectorItemTestCommandDTO {
south: SouthConnectorCommandDTO;
item: SouthConnectorItemCommandDTO;
}

export interface SouthConnectorItemSearchParam {
name?: string;
scanModeId?: string;
Expand Down

0 comments on commit 6ddac14

Please sign in to comment.