Skip to content

Commit

Permalink
Add data_stream_router processor
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny committed Apr 15, 2022
1 parent 6d6c7d5 commit 2d80949
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/reference/ingest/processors.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ include::processors/circle.asciidoc[]
include::processors/community-id.asciidoc[]
include::processors/convert.asciidoc[]
include::processors/csv.asciidoc[]
include::processors/data-stream-router.asciidoc[]
include::processors/date.asciidoc[]
include::processors/date-index-name.asciidoc[]
include::processors/dissect.asciidoc[]
Expand Down
43 changes: 43 additions & 0 deletions docs/reference/ingest/processors/data-stream-router.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[[data-stream-router-processor]]
=== Data stream router processor
++++
<titleabbrev>Data stream router</titleabbrev>
++++

The `data_stream_router` processor allows routing a document from one data stream to another data stream.
It can use either static values or values from the document to determine the target data stream.

The name of a data stream consists of three parts and looks like this: `<type>-<dataset>-<namespace>`.
See the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme] documentation for more details.

NOTE: The `data_stream_router` processor can only be used on data streams that follow the data stream naming scheme.
Trying to use this processor on a data stream with a non-compliant name will raise an exception.

After a `data_stream_router` processor has been executed, all the other processors of the current pipeline are skipped.
This means that at most one `data_stream_router` processor is ever executed within a pipeline,
which makes it possible to define mutually exclusive routing conditions,
similar to an if, else-if, else-if, … construct.

[[data-stream-router-options]]
.Data stream router options
[options="header"]
|======
| Name | Required | Default | Description
| `dataset` | no | - | A static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`. If not set, gets the value of the field `data_stream.dataset` from the document. When using values from the document, the processor replaces invalid characters with `_`. If the option is not set and the document also doesn't contain a corresponding field, it uses the `<dataset>` part of the index name as a fallback.
| `namespace` | no | - | A static value for the namespace part of the data stream name. See the criteria for <<indices-create-api-path-params, index names>> for allowed characters. Must be no longer than 100 characters. If not set, gets the value of the field `data_stream.namespace` from the document. When using values from the document, the processor replaces invalid characters with `_`. If the option is not set and the document also doesn't contain a corresponding field, it uses the `<namespace>` part of the index name as a fallback.
include::common-options.asciidoc[]
|======

NOTE: It's not possible to change the `type` of the data stream by setting the `data_stream.type` in the document.

[source,js]
--------------------------------------------------
{
"data_stream_router": {
"tag": "nginx",
"if" : "ctx?.log?.file?.path?.contains('nginx')",
"dataset": "nginx"
}
}
--------------------------------------------------
// NOTCONSOLE
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

public final class DataStreamRouterProcessor extends AbstractProcessor {
    public static final String TYPE = "data_stream_router";

    private static final String DATA_STREAM_PREFIX = "data_stream.";
    private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
    private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
    private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
    private static final String EVENT_DATASET = "event.dataset";

    // Characters that may not appear in the dataset/namespace parts of a data stream name.
    // The dataset part additionally forbids '-', which separates the three parts of the name.
    private static final char[] DISALLOWED_IN_DATASET = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':', '-' };
    private static final char[] DISALLOWED_IN_NAMESPACE = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':' };
    private static final int MAX_LENGTH = 100;
    private static final char REPLACEMENT_CHAR = '_';

    // Static overrides for the dataset/namespace parts; when null, the values are resolved
    // from the document (data_stream.dataset / data_stream.namespace) or from the index name.
    private final String dataset;
    private final String namespace;

    DataStreamRouterProcessor(String tag, String description, String dataset, String namespace) {
        super(tag, description);
        this.dataset = dataset;
        this.namespace = namespace;
    }

    /**
     * Lower-cases the given value, truncates it to {@code MAX_LENGTH} characters,
     * and replaces each disallowed character with {@code REPLACEMENT_CHAR}.
     * Returns {@code null} if the input is {@code null}.
     */
    private static String sanitizeDataStreamField(String value, char[] disallowedChars) {
        if (value == null) {
            return null;
        }
        String sanitized = value.toLowerCase(Locale.ROOT);
        if (sanitized.length() > MAX_LENGTH) {
            sanitized = sanitized.substring(0, MAX_LENGTH);
        }
        for (char disallowed : disallowedChars) {
            sanitized = sanitized.replace(disallowed, REPLACEMENT_CHAR);
        }
        return sanitized;
    }

    /**
     * Re-routes the document: splits the current index name into its
     * {@code <type>-<dataset>-<namespace>} parts, resolves the new dataset and namespace,
     * rewrites the {@code data_stream.*} fields and the target index, and skips the
     * remaining processors of the current pipeline.
     *
     * @throws IllegalArgumentException if the current index name does not follow the
     *                                  data stream naming scheme (fewer than two dashes)
     */
    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        final String currentIndex = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
        final String illegalDataStreamNameMessage = "invalid data stream name: ["
            + currentIndex
            + "]; must follow naming scheme <type>-<dataset>-<namespace>";

        // the index name must contain at least two dashes so it can be split into three parts
        final int firstDash = currentIndex.indexOf('-');
        if (firstDash < 0) {
            throw new IllegalArgumentException(illegalDataStreamNameMessage);
        }
        final int secondDash = currentIndex.indexOf('-', firstDash + 1);
        if (secondDash < 0) {
            throw new IllegalArgumentException(illegalDataStreamNameMessage);
        }
        final String type = currentIndex.substring(0, firstDash);
        final String datasetFallback = currentIndex.substring(firstDash + 1, secondDash);
        final String namespaceFallback = currentIndex.substring(secondDash + 1);

        final String newDataset = getDataset(ingestDocument, datasetFallback);
        final String newNamespace = getNamespace(ingestDocument, namespaceFallback);
        ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
        // keep event.dataset in sync with data_stream.dataset, but only if the document already has it
        if (ingestDocument.hasField(EVENT_DATASET)) {
            ingestDocument.setFieldValue(EVENT_DATASET, newDataset);
        }
        ingestDocument.setFieldValue(DATA_STREAM_DATASET, newDataset);
        ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, newNamespace);
        ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), type + "-" + newDataset + "-" + newNamespace);
        // routing is final: no later processor of this pipeline may re-route the document
        ingestDocument.skipCurrentPipeline();
        return ingestDocument;
    }

    // Resolves the dataset part: static option first, then the (sanitized) document field, then the fallback.
    private String getDataset(IngestDocument ingestDocument, String datasetFallback) {
        String result = dataset;
        if (result == null) {
            result = sanitizeDataStreamField(ingestDocument.getFieldValue(DATA_STREAM_DATASET, String.class, true), DISALLOWED_IN_DATASET);
        }
        return result != null ? result : datasetFallback;
    }

    // Resolves the namespace part: static option first, then the (sanitized) document field, then the fallback.
    private String getNamespace(IngestDocument ingestDocument, String namespaceFallback) {
        String result = namespace;
        if (result == null) {
            result = sanitizeDataStreamField(
                ingestDocument.getFieldValue(DATA_STREAM_NAMESPACE, String.class, true),
                DISALLOWED_IN_NAMESPACE
            );
        }
        return result != null ? result : namespaceFallback;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public String getDataStreamDataset() {
        return dataset;
    }

    public String getDataStreamNamespace() {
        return namespace;
    }

    public static final class Factory implements Processor.Factory {

        /**
         * Creates the processor from its configuration. The optional {@code dataset} and
         * {@code namespace} options are rejected if sanitizing them would change the value,
         * i.e. if they contain disallowed characters.
         */
        @Override
        public DataStreamRouterProcessor create(
            Map<String, Processor.Factory> processorFactories,
            String tag,
            String description,
            Map<String, Object> config
        ) throws Exception {
            String dataset = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "dataset");
            if (Objects.equals(dataset, sanitizeDataStreamField(dataset, DISALLOWED_IN_DATASET)) == false) {
                throw newConfigurationException(TYPE, tag, "dataset", "contains illegal characters");
            }
            String namespace = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "namespace");
            if (Objects.equals(namespace, sanitizeDataStreamField(namespace, DISALLOWED_IN_NAMESPACE)) == false) {
                throw newConfigurationException(TYPE, tag, "namespace", "contains illegal characters");
            }
            return new DataStreamRouterProcessor(tag, description, dataset, namespace);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(NetworkDirectionProcessor.TYPE, new NetworkDirectionProcessor.Factory(parameters.scriptService)),
entry(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()),
entry(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()),
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory())
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
entry(DataStreamRouterProcessor.TYPE, new DataStreamRouterProcessor.Factory())
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.nullValue;

public class DataStreamRouterProcessorFactoryTests extends ESTestCase {

    // Without any options, both parts stay unset and are resolved from the document at execution time.
    public void testSuccess() throws Exception {
        DataStreamRouterProcessor processor = create(null, null);
        assertThat(processor.getDataStreamDataset(), nullValue());
        assertThat(processor.getDataStreamNamespace(), nullValue());
    }

    // '-' is not allowed in a statically configured dataset as it separates the name parts.
    public void testInvalidDataset() throws Exception {
        ElasticsearchParseException exception = expectThrows(ElasticsearchParseException.class, () -> create("my-service", null));
        assertThat(exception.getMessage(), Matchers.equalTo("[dataset] contains illegal characters"));
    }

    // ':' is one of the characters disallowed in a statically configured namespace.
    public void testInvalidNamespace() throws Exception {
        ElasticsearchParseException exception = expectThrows(ElasticsearchParseException.class, () -> create("generic", "foo:bar"));
        assertThat(exception.getMessage(), Matchers.equalTo("[namespace] contains illegal characters"));
    }

    // Builds a processor via the factory, only setting the options that are non-null.
    private static DataStreamRouterProcessor create(String dataset, String namespace) throws Exception {
        Map<String, Object> options = new HashMap<>();
        if (dataset != null) {
            options.put("dataset", dataset);
        }
        if (namespace != null) {
            options.put("namespace", namespace);
        }
        return new DataStreamRouterProcessor.Factory().create(null, null, null, options);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class DataStreamRouterProcessorTests extends ESTestCase {

    // Without options or document fields, all parts fall back to the current index name.
    public void testDefaults() throws Exception {
        IngestDocument document = createIngestDocument("logs-generic-default");

        DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
        processor.execute(document);
        assertDataStreamFields(document, "logs", "generic", "default");
    }

    public void testSkipFirstProcessor() throws Exception {
        IngestDocument document = createIngestDocument("logs-generic-default");

        // the first router is wrapped so that it never runs; the second one must take effect
        DataStreamRouterProcessor skipped = new DataStreamRouterProcessor(null, null, "skip", null);
        DataStreamRouterProcessor executed = new DataStreamRouterProcessor(null, null, "executed", null);
        new CompoundProcessor(new SkipProcessor(skipped), executed).execute(document);
        assertDataStreamFields(document, "logs", "executed", "default");
    }

    public void testSkipLastProcessor() throws Exception {
        IngestDocument document = createIngestDocument("logs-generic-default");

        // the first router short-circuits the pipeline, so the second router is never executed
        DataStreamRouterProcessor executed = new DataStreamRouterProcessor(null, null, "executed", null);
        DataStreamRouterProcessor skipped = new DataStreamRouterProcessor(null, null, "skip", null);
        new CompoundProcessor(executed, skipped).execute(document);
        assertDataStreamFields(document, "logs", "executed", "default");
    }

    // Values present in the document win over the parts of the current index name.
    public void testDataStreamFieldsFromDocument() throws Exception {
        IngestDocument document = createIngestDocument("logs-generic-default");
        document.setFieldValue("data_stream.dataset", "foo");
        document.setFieldValue("data_stream.namespace", "bar");

        DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
        processor.execute(document);
        assertDataStreamFields(document, "logs", "foo", "bar");
    }

    // Disallowed characters in document-provided values are replaced with '_'.
    public void testInvalidDataStreamFieldsFromDocument() throws Exception {
        IngestDocument document = createIngestDocument("logs-generic-default");
        document.setFieldValue("data_stream.dataset", "foo-bar");
        document.setFieldValue("data_stream.namespace", "baz#qux");

        DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
        processor.execute(document);
        assertDataStreamFields(document, "logs", "foo_bar", "baz_qux");
    }

    // Verifies the data_stream.* fields and that _index was rewritten to <type>-<dataset>-<namespace>.
    private void assertDataStreamFields(IngestDocument document, String type, String dataset, String namespace) {
        assertThat(document.getFieldValue("data_stream.type", String.class), equalTo(type));
        assertThat(document.getFieldValue("data_stream.dataset", String.class), equalTo(dataset));
        assertThat(document.getFieldValue("data_stream.namespace", String.class), equalTo(namespace));
        assertThat(document.getFieldValue("_index", String.class), equalTo(type + "-" + dataset + "-" + namespace));
    }

    // Creates a random document whose _index points at the given data stream.
    private static IngestDocument createIngestDocument(String dataStream) {
        IngestDocument document = RandomDocumentPicks.randomIngestDocument(random());
        document.setFieldValue("_index", dataStream);
        return document;
    }

    // Wraps a processor and drops it: execute() is a no-op, so the inner processor never runs.
    private static class SkipProcessor implements WrappingProcessor {
        private final Processor inner;

        SkipProcessor(Processor inner) {
            this.inner = inner;
        }

        @Override
        public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
            return ingestDocument;
        }

        @Override
        public Processor getInnerProcessor() {
            return inner;
        }

        @Override
        public String getType() {
            return "skip";
        }

        @Override
        public String getTag() {
            return null;
        }

        @Override
        public String getDescription() {
            return null;
        }
    }
}
Loading

0 comments on commit 2d80949

Please sign in to comment.