Skip to content

Commit

Permalink
ingest pipeline reroute: allow external inspection/reset of state
Browse files Browse the repository at this point in the history
Logstash's Integration filter works directly with the processors, but cannot
use the IngestService that is tightly-coupled with cluster state and must
therefore emulate the behavior introduced in elastic#94000. To do so, the additional
methods for inquiring about and resetting the reroute state need to be
externally-accessible.
  • Loading branch information
yaauie committed Jun 19, 2023
1 parent 2f93cf5 commit 0b9558f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class RerouteProcessorTests extends ESTestCase {

Expand All @@ -29,6 +30,7 @@ public void testDefaults() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testEventDataset() throws Exception {
Expand All @@ -38,6 +40,7 @@ public void testEventDataset() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo"));
}

Expand All @@ -48,6 +51,7 @@ public void testEventDatasetDottedFieldName() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertThat(ingestDocument.getCtxMap().get("event.dataset"), equalTo("foo"));
assertFalse(ingestDocument.getCtxMap().containsKey("event"));
}
Expand All @@ -59,6 +63,7 @@ public void testNoDataset() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{ds}}"), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertFalse(ingestDocument.hasField("event.dataset"));
}

Expand All @@ -70,6 +75,7 @@ public void testSkipFirstProcessor() throws Exception {
CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "executed", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testSkipLastProcessor() throws Exception {
Expand All @@ -80,6 +86,7 @@ public void testSkipLastProcessor() throws Exception {
CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "executed", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDataStreamFieldsFromDocument() throws Exception {
Expand All @@ -90,6 +97,7 @@ public void testDataStreamFieldsFromDocument() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception {
Expand All @@ -101,6 +109,7 @@ public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testInvalidDataStreamFieldsFromDocument() throws Exception {
Expand All @@ -111,6 +120,7 @@ public void testInvalidDataStreamFieldsFromDocument() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDestination() throws Exception {
Expand All @@ -120,6 +130,7 @@ public void testDestination() throws Exception {
processor.execute(ingestDocument);
assertFalse(ingestDocument.hasField("data_stream"));
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("foo"));
assertThat(ingestDocument.isReroute(), is(true));
}

public void testFieldReference() throws Exception {
Expand All @@ -130,6 +141,7 @@ public void testFieldReference() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "opbeans_java", "dev");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testRerouteToCurrentTarget() throws Exception {
Expand All @@ -142,6 +154,7 @@ public void testRerouteToCurrentTarget() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertFalse(ingestDocument.hasField("pipeline_is_continued"));
}

Expand All @@ -156,6 +169,7 @@ public void testFieldReferenceWithMissingReroutesToCurrentTarget() throws Except
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("logs-generic-default"));
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertFalse(ingestDocument.hasField("pipeline_is_continued"));
}

Expand All @@ -170,6 +184,7 @@ public void testDataStreamFieldReference() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "dataset_from_doc", "namespace_from_doc");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDatasetFieldReferenceMissingValue() throws Exception {
Expand All @@ -181,6 +196,7 @@ public void testDatasetFieldReferenceMissingValue() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "fallback", "fallback");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDatasetFieldReference() throws Exception {
Expand All @@ -194,6 +210,7 @@ public void testDatasetFieldReference() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testFallbackToValuesFrom_index() throws Exception {
Expand All @@ -204,6 +221,7 @@ public void testFallbackToValuesFrom_index() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{foo}}"), List.of("{{bar}}"));
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testInvalidDataStreamName() throws Exception {
Expand All @@ -212,6 +230,7 @@ public void testInvalidDataStreamName() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme <type>-<dataset>-<namespace>"));
assertThat(ingestDocument.isReroute(), is(false));
}

{
Expand All @@ -220,6 +239,7 @@ public void testInvalidDataStreamName() throws Exception {
RerouteProcessor processor = createRerouteProcessor("bar");
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("bar"));
assertThat(ingestDocument.isReroute(), is(true));
}
}

Expand All @@ -229,6 +249,7 @@ public void testRouteOnNonStringFieldFails() {
RerouteProcessor processor = createRerouteProcessor(List.of("{{numeric_field}}"), List.of());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("field [numeric_field] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
assertThat(ingestDocument.isReroute(), is(false));
}

public void testDatasetSanitization() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,15 +932,15 @@ public void reroute(String destIndex) {
*
* @return whether the document is redirected to another target
*/
boolean isReroute() {
public boolean isReroute() {
return reroute;
}

/**
* Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless
* {@link #reroute(String)} is called.
*/
void resetReroute() {
public void resetReroute() {
reroute = false;
}

Expand Down

0 comments on commit 0b9558f

Please sign in to comment.