From ed2770849e0c580686561f0911bd497946b320fb Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Tue, 20 Jun 2023 16:06:13 +0000 Subject: [PATCH] ingest: expose reroute inquiry/reset via Elastic-internal API bridge Logstash's Integration filter works directly with the processors, but cannot use the IngestService that is tightly-coupled with cluster state and must therefore emulate the behavior introduced in #94000. To do so, the additional methods for inquiring about and resetting the reroute state need to be externally-accessible. Exposing them through a clearly-named bridge allows us to avoid making these Elastic-internal bits a part of the public APIs that are subject to years-long stability and deprecation notice policies. --- .../ingest/LogstashInternalBridge.java | 39 +++++++++++++++++++ .../ingest/LogstashInternalBridgeTests.java | 32 +++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/ingest/LogstashInternalBridge.java create mode 100644 server/src/test/java/org/elasticsearch/ingest/LogstashInternalBridgeTests.java diff --git a/server/src/main/java/org/elasticsearch/ingest/LogstashInternalBridge.java b/server/src/main/java/org/elasticsearch/ingest/LogstashInternalBridge.java new file mode 100644 index 0000000000000..5d0a0943d0982 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/LogstashInternalBridge.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest; + +/** + * This bridge class exposes package-private components of Ingest in a way + * that can be consumed by Logstash's Elastic Integration Filter without + * expanding our externally-consumable API. + * + * @apiNote this is an Elastic-internal API bridge intended for exclusive use by + * Logstash and its Elastic Integration Filter. + */ +public class LogstashInternalBridge { + + private LogstashInternalBridge() {} + + /** + * The document has been redirected to another target. + * This implies that the default pipeline of the new target needs to be invoked. + * + * @return whether the document is redirected to another target + */ + public static boolean isReroute(final IngestDocument ingestDocument) { + return ingestDocument.isReroute(); + } + + /** + * Set the reroute flag of the provided {@link IngestDocument} to {@code false}. + */ + public static void resetReroute(final IngestDocument ingestDocument) { + ingestDocument.resetReroute(); + } +} diff --git a/server/src/test/java/org/elasticsearch/ingest/LogstashInternalBridgeTests.java b/server/src/test/java/org/elasticsearch/ingest/LogstashInternalBridgeTests.java new file mode 100644 index 0000000000000..d8f1e0284e0bd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/LogstashInternalBridgeTests.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.ingest.TestIngestDocument.emptyIngestDocument; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class LogstashInternalBridgeTests extends ESTestCase { + public void testIngestDocumentRerouteBridge() { + final IngestDocument ingestDocument = emptyIngestDocument(); + ingestDocument.setFieldValue("_index", "nowhere"); + assertThat(ingestDocument.getFieldValue("_index", String.class), is(equalTo("nowhere"))); + assertThat(LogstashInternalBridge.isReroute(ingestDocument), is(false)); + + ingestDocument.reroute("somewhere"); + assertThat(ingestDocument.getFieldValue("_index", String.class), is(equalTo("somewhere"))); + assertThat(LogstashInternalBridge.isReroute(ingestDocument), is(true)); + + LogstashInternalBridge.resetReroute(ingestDocument); + assertThat(ingestDocument.getFieldValue("_index", String.class), is(equalTo("somewhere"))); + assertThat(LogstashInternalBridge.isReroute(ingestDocument), is(false)); + } +}