Skip to content

Commit

Permalink
[Fleet] Install a default ingest pipeline for datastreams without pip…
Browse files Browse the repository at this point in the history
…eline
  • Loading branch information
nchaulet committed Jun 14, 2022
1 parent 43d5907 commit 26bc7ac
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ElasticsearchAssetType } from '../../../../types';
import type { RegistryDataStream } from '../../../../types';
import { getPathParts } from '../../archive';

export const isTopLevelPipeline = (path: string) => {
const pathParts = getPathParts(path);
return (
pathParts.type === ElasticsearchAssetType.ingestPipeline && pathParts.dataset === undefined
);
};

export const getPipelineNameForInstallation = ({
pipelineName,
dataStream,
packageVersion,
}: {
pipelineName: string;
dataStream?: RegistryDataStream;
packageVersion: string;
}): string => {
if (dataStream !== undefined) {
const isPipelineEntry = pipelineName === dataStream.ingest_pipeline;
const suffix = isPipelineEntry ? '' : `-${pipelineName}`;
// if this is the pipeline entry, don't add a suffix
return `${getPipelineNameForDatastream({ dataStream, packageVersion })}${suffix}`;
}
// It's a top-level pipeline
return `${packageVersion}-${pipelineName}`;
};

export const getPipelineNameForDatastream = ({
dataStream,
packageVersion,
}: {
dataStream: RegistryDataStream;
packageVersion: string;
}): string => {
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}`;
};

export interface RewriteSubstitution {
source: string;
target: string;
templateFunction: string;
}

export function rewriteIngestPipeline(
pipeline: string,
substitutions: RewriteSubstitution[]
): string {
substitutions.forEach((sub) => {
const { source, target, templateFunction } = sub;
// This fakes the use of the golang text/template expression {{SomeTemplateFunction 'some-param'}}
// cf. https://github.com/elastic/beats/blob/master/filebeat/fileset/fileset.go#L294

// "Standard style" uses '{{' and '}}' as delimiters
const matchStandardStyle = `{{\\s?${templateFunction}\\s+['"]${source}['"]\\s?}}`;
// "Beats style" uses '{<' and '>}' as delimiters because this is current practice in the beats project
const matchBeatsStyle = `{<\\s?${templateFunction}\\s+['"]${source}['"]\\s?>}`;

const regexStandardStyle = new RegExp(matchStandardStyle);
const regexBeatsStyle = new RegExp(matchBeatsStyle);
pipeline = pipeline.replace(regexStandardStyle, target).replace(regexBeatsStyle, target);
});
return pipeline;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
*/

export { prepareToInstallPipelines, isTopLevelPipeline } from './install';

export { getPipelineNameForDatastream } from './helpers';
export { deletePreviousPipelines, deletePipeline } from './remove';
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import path from 'path';

import type { RegistryDataStream } from '../../../../types';

import { rewriteIngestPipeline, getPipelineNameForInstallation } from './install';
import { getPipelineNameForInstallation, rewriteIngestPipeline } from './helpers';

test('a json-format pipeline with pipeline references is correctly rewritten', () => {
const inputStandard = readFileSync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ import {
import { appendMetadataToIngestPipeline } from '../meta';

import { retryTransientEsErrors } from '../retry';

interface RewriteSubstitution {
source: string;
target: string;
templateFunction: string;
import {
getPipelineNameForDatastream,
getPipelineNameForInstallation,
rewriteIngestPipeline,
} from './helpers';
import type { RewriteSubstitution } from './helpers';

interface PipelineInstall {
nameForInstallation: string;
contentForInstallation: string;
extension: string;
}

export const isTopLevelPipeline = (path: string) => {
Expand Down Expand Up @@ -59,22 +65,34 @@ export const prepareToInstallPipelines = (
const filteredPaths = pipelinePaths.filter((path) =>
isDataStreamPipeline(path, dataStream.path)
);
let createdDatastreamPipeline = false;
const pipelineObjectRefs = filteredPaths.map((path) => {
const { name } = getNameAndExtension(path);
if (name === dataStream.dataset) {
createdDatastreamPipeline = true;
}
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
dataStream,
packageVersion: pkgVersion,
});
return { id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
});
if (!createdDatastreamPipeline) {
const nameForInstallation = getPipelineNameForDatastream({
dataStream,
packageVersion: pkgVersion,
});
acc.push({ id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline });
}
acc.push(...pipelineObjectRefs);
return acc;
}, [])
: [];

const topLevelPipelineRefs = topLevelPipelinePaths.map((path) => {
const { name } = getNameAndExtension(path);

const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
packageVersion: pkgVersion,
Expand All @@ -89,17 +107,16 @@ export const prepareToInstallPipelines = (
install: async (esClient, logger) => {
const pipelines = dataStreams
? dataStreams.reduce<Array<Promise<EsAssetReference[]>>>((acc, dataStream) => {
if (dataStream.ingest_pipeline) {
acc.push(
installAllPipelines({
dataStream,
esClient,
logger,
paths: pipelinePaths,
installablePackage,
})
);
}
acc.push(
installAllPipelines({
dataStream,
esClient,
logger,
paths: pipelinePaths,
installablePackage,
})
);

return acc;
}, [])
: [];
Expand All @@ -121,27 +138,6 @@ export const prepareToInstallPipelines = (
};
};

export function rewriteIngestPipeline(
pipeline: string,
substitutions: RewriteSubstitution[]
): string {
substitutions.forEach((sub) => {
const { source, target, templateFunction } = sub;
// This fakes the use of the golang text/template expression {{SomeTemplateFunction 'some-param'}}
// cf. https://github.com/elastic/beats/blob/master/filebeat/fileset/fileset.go#L294

// "Standard style" uses '{{' and '}}' as delimiters
const matchStandardStyle = `{{\\s?${templateFunction}\\s+['"]${source}['"]\\s?}}`;
// "Beats style" uses '{<' and '>}' as delimiters because this is current practice in the beats project
const matchBeatsStyle = `{<\\s?${templateFunction}\\s+['"]${source}['"]\\s?>}`;

const regexStandardStyle = new RegExp(matchStandardStyle);
const regexBeatsStyle = new RegExp(matchBeatsStyle);
pipeline = pipeline.replace(regexStandardStyle, target).replace(regexBeatsStyle, target);
});
return pipeline;
}

export async function installAllPipelines({
esClient,
logger,
Expand All @@ -158,18 +154,27 @@ export async function installAllPipelines({
const pipelinePaths = dataStream
? paths.filter((path) => isDataStreamPipeline(path, dataStream.path))
: paths;
let pipelines: any[] = [];
const pipelinesInfos: Array<{
name: string;
nameForInstallation: string;
content: string;
extension: string;
}> = [];
const substitutions: RewriteSubstitution[] = [];

let datastreamPipelineCreated = false;
pipelinePaths.forEach((path) => {
const { name, extension } = getNameAndExtension(path);
if (name === dataStream?.ingest_pipeline) {
datastreamPipelineCreated = true;
}
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
dataStream,
packageVersion: installablePackage.version,
});
const content = getAsset(path).toString('utf-8');
pipelines.push({
pipelinesInfos.push({
name,
nameForInstallation,
content,
Expand All @@ -182,14 +187,27 @@ export async function installAllPipelines({
});
});

pipelines = pipelines.map((pipeline) => {
const pipelinesToInstall: PipelineInstall[] = pipelinesInfos.map((pipeline) => {
return {
...pipeline,
contentForInstallation: rewriteIngestPipeline(pipeline.content, substitutions),
};
});

const installationPromises = pipelines.map(async (pipeline) => {
if (!datastreamPipelineCreated && dataStream) {
const nameForInstallation = getPipelineNameForDatastream({
dataStream,
packageVersion: installablePackage.version,
});

pipelinesToInstall.push({
nameForInstallation,
contentForInstallation: 'processors: []',
extension: 'yml',
});
}

const installationPromises = pipelinesToInstall.map(async (pipeline) => {
return installPipeline({ esClient, pipeline, installablePackage, logger });
});

Expand All @@ -204,7 +222,7 @@ async function installPipeline({
}: {
esClient: ElasticsearchClient;
logger: Logger;
pipeline: any;
pipeline: PipelineInstall;
installablePackage?: InstallablePackage;
}): Promise<EsAssetReference> {
const pipelineWithMetadata = appendMetadataToIngestPipeline({
Expand Down Expand Up @@ -304,22 +322,3 @@ const getNameAndExtension = (
extension: filename.split('.')[1],
};
};

export const getPipelineNameForInstallation = ({
pipelineName,
dataStream,
packageVersion,
}: {
pipelineName: string;
dataStream?: RegistryDataStream;
packageVersion: string;
}): string => {
if (dataStream !== undefined) {
const isPipelineEntry = pipelineName === dataStream.ingest_pipeline;
const suffix = isPipelineEntry ? '' : `-${pipelineName}`;
// if this is the pipeline entry, don't add a suffix
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}${suffix}`;
}
// It's a top-level pipeline
return `${packageVersion}-${pipelineName}`;
};
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import type {
} from '../../../../types';

import { loadFieldsFromYaml, processFields } from '../../fields/field';
import { getPipelineNameForInstallation } from '../ingest_pipeline/install';
import { getPipelineNameForDatastream } from '../ingest_pipeline';
import { getAsset, getPathParts } from '../../archive';
import {
FLEET_COMPONENT_TEMPLATES,
Expand Down Expand Up @@ -365,15 +365,8 @@ export function prepareTemplate({
const templateIndexPattern = generateTemplateIndexPattern(dataStream);
const templatePriority = getTemplatePriority(dataStream);

let pipelineName;
if (dataStream.ingest_pipeline) {
pipelineName = getPipelineNameForInstallation({
pipelineName: dataStream.ingest_pipeline,
dataStream,
packageVersion,
});
}

const pipelineName = getPipelineNameForDatastream({ dataStream, packageVersion });

const defaultSettings = buildDefaultSettings({
templateName,
packageName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export default function (providerContext: FtrProviderContext) {
.type('application/gzip')
.send(buf)
.expect(200);
expect(res.body.items.length).to.be(29);
expect(res.body.items.length).to.be(30);
});

it('should install a zip archive correctly and package info should return correctly after validation', async function () {
Expand All @@ -86,7 +86,7 @@ export default function (providerContext: FtrProviderContext) {
.type('application/zip')
.send(buf)
.expect(200);
expect(res.body.items.length).to.be(29);
expect(res.body.items.length).to.be(30);
});

it('should throw an error if the archive is zip but content type is gzip', async function () {
Expand Down

0 comments on commit 26bc7ac

Please sign in to comment.