Skip to content

Commit

Permalink
fix(south): Add an output datetime format section in sql items
Browse files Browse the repository at this point in the history
  • Loading branch information
burgerni10 committed Jun 21, 2023
1 parent b9098d1 commit 6f561b6
Show file tree
Hide file tree
Showing 30 changed files with 851 additions and 820 deletions.
431 changes: 202 additions & 229 deletions backend/src/service/utils.spec.ts

Large diffs are not rendered by default.

218 changes: 107 additions & 111 deletions backend/src/service/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import path from 'node:path';
import minimist from 'minimist';
import { DateTime } from 'luxon';

import { DateTimeFormat, DateTimeSerialization, Instant, Interval, Serialization } from '../../../shared/model/types';
import csv from 'papaparse';
import { CsvCharacter, DateTimeFormat, DateTimeSerialization, Instant, Interval, Serialization } from '../../../shared/model/types';
import pino from 'pino';
import csv from 'papaparse';

const COMPRESSION_LEVEL = 9;

Expand Down Expand Up @@ -68,15 +68,6 @@ export const createFolder = async (folder: string): Promise<void> => {
}
};

/**
* Replace the variables such as @CurrentDate in the file name with their values
*/
export const replaceFilenameWithVariable = (filename: string, queryPart: number, connectorName: string): string =>
filename
.replace('@CurrentDate', DateTime.now().toUTC().toFormat('yyyy_MM_dd_HH_mm_ss_SSS'))
.replace('@ConnectorName', connectorName)
.replace('@QueryPart', `${queryPart}`);

/**
* Compress the specified file
*/
Expand Down Expand Up @@ -144,64 +135,6 @@ export const dirSize = async (dir: string): Promise<number> => {
return (await Promise.all(paths)).flat(Infinity).reduce((i, size) => i + size, 0);
};

/**
* Generate CSV file from the values.
*/
export const generateCSV = (result: Array<any>, delimiter: string): string => {
const options = {
header: true,
delimiter
};
return csv.unparse(result, options);
};

/**
* Parse an entry list and get the most recent date
*/
export const getMaxInstant = (entryList: Array<any>, startTime: Instant, datetimeSerialization: Array<DateTimeSerialization>): Instant => {
if (datetimeSerialization.length === 0) return startTime;
let maxInstant = DateTime.fromISO(startTime);
entryList.forEach(entry => {
for (const serialization of datetimeSerialization) {
if (!entry[serialization.field]) {
continue;
}

let entryDate: DateTime;
switch (serialization.datetimeFormat.type) {
case 'unix-epoch':
entryDate = DateTime.fromMillis(parseInt(entry[serialization.field], 10) * 1000);
break;
case 'unix-epoch-ms':
entryDate = DateTime.fromMillis(parseInt(entry[serialization.field], 10));
break;
case 'iso-8601-string':
entryDate = DateTime.fromISO(entry[serialization.field]);
break;
case 'specific-string':
entryDate = DateTime.fromFormat(entry[serialization.field], serialization.datetimeFormat.format, {
zone: serialization.datetimeFormat.timezone,
locale: serialization.datetimeFormat.locale,
setZone: true
});
break;
case 'date-object':
entryDate = DateTime.fromJSDate(entry[serialization.field]).setZone(serialization.datetimeFormat.timezone, {
keepLocalTime: true
});
break;
}

if (serialization.useAsReference) {
if (entryDate > maxInstant) {
maxInstant = entryDate;
}
}
}
});
return maxInstant.toUTC().toISO()!;
};

/**
* Get all occurrences of a substring with a value
*/
Expand Down Expand Up @@ -233,60 +166,90 @@ export const generateReplacementParameters = (
return occurrences.map(occurrence => occurrence.value);
};

export const serializeResults = async (
export const convertDelimiter = (delimiter: CsvCharacter): string => {
switch (delimiter) {
case 'NON_BREAKING_SPACE':
return ' ';
case 'COLON':
return ':';
case 'COMMA':
return ',';
case 'DOT':
return '.';
case 'SLASH':
return '/';
case 'PIPE':
return '|';
case 'SEMI_COLON':
return ';';
case 'TAB':
return ' ';
}
};

export const persistResults = async (
data: Array<any>,
settings: Serialization,
serializationSettings: Serialization,
connectorName: string,
tmpFolder: string,
baseFolder: string,
addFileFn: (filePath: string) => Promise<void>,
logger: pino.Logger
): Promise<void> => {
const csvContent = generateCSV(data, settings.delimiter);
const filePath = path.join(tmpFolder, replaceFilenameWithVariable(settings.filename, 0, connectorName));
logger.debug(`Writing ${csvContent.length} bytes into file at "${filePath}"`);
await fs.writeFile(filePath, csvContent);
switch (serializationSettings.type) {
case 'csv':
const options = {
header: true,
delimiter: convertDelimiter(serializationSettings.delimiter)
};
const filePath = path.join(
baseFolder,
serializationSettings.filename
.replace('@CurrentDate', DateTime.now().toUTC().toFormat('yyyy_MM_dd_HH_mm_ss_SSS'))
.replace('@ConnectorName', connectorName)
);
const csvContent = csv.unparse(data, options);

if (settings.compression) {
// Compress and send the compressed file
const gzipPath = `${filePath}.gz`;
await compress(filePath, gzipPath);
logger.debug(`Writing ${csvContent.length} bytes into file at "${filePath}"`);
await fs.writeFile(filePath, csvContent);

try {
await fs.unlink(filePath);
logger.info(`File "${filePath}" compressed and deleted`);
} catch (unlinkError) {
logger.error(`Error when deleting file "${filePath}" after compression. ${unlinkError}`);
}
if (serializationSettings.compression) {
// Compress and send the compressed file
const gzipPath = `${filePath}.gz`;
await compress(filePath, gzipPath);

logger.debug(`Sending compressed file "${gzipPath}" to Engine`);
await addFileFn(gzipPath);
try {
await fs.unlink(gzipPath);
logger.trace(`File "${gzipPath}" deleted`);
} catch (unlinkError) {
logger.error(`Error when deleting compressed file "${gzipPath}" after caching it. ${unlinkError}`);
}
} else {
logger.debug(`Sending file "${filePath}" to Engine`);
await addFileFn(filePath);
try {
await fs.unlink(filePath);
logger.trace(`File ${filePath} deleted`);
} catch (unlinkError) {
logger.error(`Error when deleting file "${filePath}" after caching it. ${unlinkError}`);
}
try {
await fs.unlink(filePath);
logger.info(`File "${filePath}" compressed and deleted`);
} catch (unlinkError) {
logger.error(`Error when deleting file "${filePath}" after compression. ${unlinkError}`);
}

logger.debug(`Sending compressed file "${gzipPath}" to Engine`);
await addFileFn(gzipPath);
try {
await fs.unlink(gzipPath);
logger.trace(`File "${gzipPath}" deleted`);
} catch (unlinkError) {
logger.error(`Error when deleting compressed file "${gzipPath}" after caching it. ${unlinkError}`);
}
} else {
logger.debug(`Sending file "${filePath}" to Engine`);
await addFileFn(filePath);
try {
await fs.unlink(filePath);
logger.trace(`File ${filePath} deleted`);
} catch (unlinkError) {
logger.error(`Error when deleting file "${filePath}" after caching it. ${unlinkError}`);
}
}
break;
}
};

/**
* Log the executed query with replacements values for query variables
*/
export const logQuery = (
query: string,
startTime: string | number | DateTime,
endTime: string | number | DateTime,
logger: pino.Logger
): void => {
export const logQuery = (query: string, startTime: string | number, endTime: string | number, logger: pino.Logger): void => {
const startTimeLog = query.indexOf('@StartTime') !== -1 ? `@StartTime = ${startTime}` : '';
const endTimeLog = query.indexOf('@EndTime') !== -1 ? `@EndTime = ${endTime}` : '';
let log = `Sending "${query}"`;
Expand All @@ -302,7 +265,10 @@ export const logQuery = (
logger.info(log);
};

export const convertDateTimeFromISO = (dateTime: Instant, dateTimeFormat: DateTimeFormat): string | number | DateTime => {
export const convertDateTimeFromInstant = (dateTime: Instant, dateTimeFormat: DateTimeFormat | null): string | number => {
if (!dateTimeFormat) {
return dateTime;
}
switch (dateTimeFormat.type) {
case 'unix-epoch':
return Math.floor(DateTime.fromISO(dateTime).toMillis() / 1000);
Expand All @@ -315,6 +281,36 @@ export const convertDateTimeFromISO = (dateTime: Instant, dateTimeFormat: DateTi
case 'iso-8601-string':
return dateTime;
case 'date-object':
return DateTime.fromISO(dateTime);
return DateTime.fromISO(dateTime).setZone(dateTimeFormat.timezone).toISO()!;
}
};

export const convertDateTimeToInstant = (dateTime: any, dateTimeFormat: DateTimeFormat | null): Instant => {
if (!dateTimeFormat) {
return dateTime;
}
switch (dateTimeFormat.type) {
case 'unix-epoch':
return DateTime.fromMillis(parseInt(dateTime, 10) * 1000)
.toUTC()
.toISO()!;
case 'unix-epoch-ms':
return DateTime.fromMillis(parseInt(dateTime, 10)).toUTC().toISO()!;
case 'iso-8601-string':
return DateTime.fromISO(dateTime).toUTC().toISO()!;
case 'specific-string':
return DateTime.fromFormat(dateTime, dateTimeFormat.format, {
zone: dateTimeFormat.timezone,
locale: dateTimeFormat.locale
})
.toUTC()
.toISO()!;
case 'date-object':
// TODO: test datetimeoffset
return DateTime.fromJSDate(dateTime, {
zone: dateTimeFormat.timezone
})
.toUTC()
.toISO()!;
}
};
7 changes: 0 additions & 7 deletions backend/src/south/south-mssql/manifest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,6 @@ const manifest: SouthConnectorManifest = {
key: 'serialization',
type: 'OibSerialization',
label: 'Serialization',
defaultValue: {
type: 'file',
filename: '[email protected]',
delimiter: 'COMMA',
compression: true,
datetimeSerialization: []
},
allowedDateObjectTypes: ['Date', 'DateTime', 'DateTime2', 'DateTimeOffset', 'SmallDateTime'],
class: 'col',
newRow: true,
Expand Down
Loading

0 comments on commit 6f561b6

Please sign in to comment.