[Attack discovery] Optimize attack discovery test data (#206885)
## Summary

Follow-up for #182918.
Compressed the test data and switched to loading the `.ndjson.gz` files directly.
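
As context for the compression step, a minimal sketch of how an `.ndjson` fixture could have been gzipped with Node's `zlib` (illustrative only; the file name and compression level are assumptions, not part of this commit):

```ts
import * as fs from 'fs';
import { createGzip } from 'zlib';
import { pipeline } from 'stream/promises';

// Hypothetical one-off step: writes ep1data.ndjson.gz next to ep1data.ndjson.
async function gzipFixture(filePath: string): Promise<void> {
  await pipeline(
    fs.createReadStream(filePath), // raw NDJSON fixture
    createGzip({ level: 9 }), // static test data, so favor maximum compression
    fs.createWriteStream(`${filePath}.gz`)
  );
}
```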
patrykkopycinski authored Jan 24, 2025
1 parent 9a3fc89 commit 67bedde
Showing 33 changed files with 38 additions and 107,928 deletions.
@@ -10,6 +10,11 @@ import path from 'path';
import type { Client } from '@elastic/elasticsearch';
import type { ToolingLog } from '@kbn/tooling-log';
import type { KbnClient } from '@kbn/test';
import { createGunzip } from 'zlib';
import { pipeline } from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

const PIPELINE_NAME = 'insights_pipeline';
const DIRECTORY_PATH = path.resolve(
@@ -63,6 +68,36 @@ const getRule = async ({ kbnClient, log }: { kbnClient: KbnClient; log: ToolingL
return response.data.data?.[0];
};

async function readAndDecompress({ filePath, log }: { filePath: string; log: ToolingLog }) {
try {
const decompressedChunks: Uint8Array[] = [];

// Create a read stream for the gzipped file
const fileStream = fs.createReadStream(filePath);

// Decompress the file stream using zlib
await pipelineAsync(
fileStream, // Readable stream for the file
createGunzip(), // Decompression stream
async function* (source) {
// Collect decompressed chunks
for await (const chunk of source) {
decompressedChunks.push(chunk);
}
}
);

// Combine decompressed chunks into a single buffer and convert to string
const decompressedBuffer = Buffer.concat(decompressedChunks);
const decompressedText = decompressedBuffer.toString('utf-8');

return decompressedText;
} catch (error) {
log.error('Error during file reading or decompression:');
log.error(error);
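    // Fall through: the promise resolves to undefined, and the caller
    // falls back to treating the file as empty.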
}
}

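Note that this helper buffers the whole decompressed file in memory, which is reasonable for test fixtures. For much larger files a streaming variant is possible; a minimal sketch (not part of this commit), assuming Node's `readline` over a gunzip stream:

```ts
import * as fs from 'fs';
import * as readline from 'readline';
import { createGunzip } from 'zlib';

// Hypothetical alternative: yield one NDJSON line at a time instead of
// buffering the entire decompressed file.
async function* readNdjsonGzLines(filePath: string): AsyncGenerator<string> {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath).pipe(createGunzip()),
    crlfDelay: Infinity, // treat \r\n as a single line break
  });
  for await (const line of rl) {
    if (line.trim() !== '') {
      yield line;
    }
  }
}
```
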
const importRule = async ({ kbnClient, log }: { kbnClient: KbnClient; log: ToolingLog }) => {
log.info('Importing rule from endpoint_alert.ndjson...');

@@ -201,7 +236,7 @@ const processFile = async ({

log.info(`Processing and indexing file: ${file} ...`);

- const fileData = await fs.readFileSync(file).toString().split('\n');
+ const fileData = (await readAndDecompress({ filePath: file, log }))?.split('\n') ?? [];

try {
const response = await esClient.bulk<string>({
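
The hunk is truncated here. For orientation, a hedged sketch of how NDJSON lines typically map onto a bulk request with the v8 `@elastic/elasticsearch` client; the index name and error handling below are assumptions, not this commit's code:

```ts
// Hypothetical shape of the bulk call: pair each parsed NDJSON document
// with an index action, then submit them all in a single request.
const docs = fileData.filter((line) => line.trim() !== '').map((line) => JSON.parse(line));
const operations = docs.flatMap((doc) => [{ index: { _index: 'attack-discovery-test' } }, doc]);
const response = await esClient.bulk({ operations, refresh: true });
if (response.errors) {
  // Inspect response.items for per-document failures.
}
```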
@@ -237,10 +272,10 @@ const processFilesForEpisode = async ({
}) => {
const dataFiles = fs
.readdirSync(DIRECTORY_PATH)
- .filter((file) => file.includes(`ep${epNum}data.ndjson`));
+ .filter((file) => file.includes(`ep${epNum}data.ndjson.gz`));
const alertFiles = fs
.readdirSync(DIRECTORY_PATH)
- .filter((file) => file.includes(`ep${epNum}alerts.ndjson`));
+ .filter((file) => file.includes(`ep${epNum}alerts.ndjson.gz`));

for (const file of dataFiles) {
await processFile({ esClient, file: path.join(DIRECTORY_PATH, file), indexType: 'data', log });

The remaining changed files delete the uncompressed `.ndjson` fixtures and add their binary `.ndjson.gz` replacements (binary files not shown).
