Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding the ability to skip the ingest pipeline #93016

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/93016.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93016
summary: Adding the ability to skip the ingest pipeline
area: Ingest Node
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions docs/reference/docs/bulk.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ on.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh]

`require_alias`::
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/docs/index_.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ required. See <<add-documents-to-a-data-stream>>.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=routing]
Expand Down
11 changes: 9 additions & 2 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ shards. Statuses are:
All shards are assigned.

* `yellow`:
All primary shards are assigned, but one or more replica shards are
unassigned. If a node in the cluster fails, some data could be unavailable
All primary shards are assigned, but one or more replica shards are
unassigned. If a node in the cluster fails, some data could be unavailable
until that node is repaired.

* `red`:
Expand Down Expand Up @@ -716,6 +716,13 @@ tag::pipeline[]
(Optional, string) ID of the pipeline to use to preprocess incoming documents.
end::pipeline[]

tag::skip_pipeline[]
`skip_pipeline`::
(Optional, Boolean) If `true`, the pipeline (whether explicit or default) is not
executed. However, the final pipeline (if one exists) is still executed. This parameter
is intended to support integrations that run pipelines external to {es}.
end::skip_pipeline[]

tag::pages-processed[]
The number of search or bulk index operations processed. Documents are
processed in batches instead of individually.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,56 @@ teardown:
index: test_index
id: 1
- match: { _source: { my_field: "upper" } }

---
# Exercises the new skip_pipeline bulk action parameter: when true, the request's
# explicit pipeline must NOT run; when absent, the explicit pipeline runs normally.
# NOTE(review): pipeline1/pipeline2 are assumed to be defined in this suite's setup
# (not visible here) and presumably set field1/field2 respectively — confirm against
# the setup section.
"Test bulk request with skip_pipeline":

  - do:
      bulk:
        body:
          # First doc names pipeline1 but asks to skip it.
          - index:
              _index: test_index
              _id: test_id1
              pipeline: pipeline1
              skip_pipeline: true
          - f1: v1
          # Second doc runs pipeline2 normally (no skip_pipeline).
          - index:
              _index: test_index
              pipeline: pipeline2
              _id: test_id2
          - f1: v2
  - gte: { ingest_took: 0 }

  - do:
      get:
        index: test_index
        id: test_id1

  # Skipped pipeline: neither pipeline-added field may be present.
  - is_false: _source.field1
  - is_false: _source.field2

  - do:
      get:
        index: test_index
        id: test_id2

  # Executed pipeline2: only its field is present.
  - is_false: _source.field1
  - match: {_source.field2: value2}

  - do:
      cluster.state: {}
  # Get master node id
  - set: { master_node: master }

  - do:
      nodes.stats:
        metric: [ ingest ]
  #we can't assert anything here since we might have more than one node in the cluster
  - gte: {nodes.$master.ingest.total.count: 0}
  - gte: {nodes.$master.ingest.total.failed: 0}
  - gte: {nodes.$master.ingest.total.time_in_millis: 0}
  - match: {nodes.$master.ingest.total.current: 0}
  # pipeline1 stats only asserted as non-negative: the skipped doc must not have
  # incremented failures, but other tests in the suite may have run it.
  - gte: {nodes.$master.ingest.pipelines.pipeline1.count: 0}
  - match: {nodes.$master.ingest.pipelines.pipeline1.failed: 0}
  - gte: {nodes.$master.ingest.pipelines.pipeline1.time_in_millis: 0}
  - match: {nodes.$master.ingest.pipelines.pipeline1.current: 0}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class BulkRequestParser {
private static final ParseField VERSION_TYPE = new ParseField("version_type");
private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict");
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField SKIP_PIPELINE = new ParseField("skip_pipeline");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
Expand Down Expand Up @@ -206,6 +207,7 @@ public void parse(
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean skipPipeline = false;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
Map<String, String> dynamicTemplates = Map.of();

Expand Down Expand Up @@ -256,6 +258,8 @@ public void parse(
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (SKIP_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
skipPipeline = parser.booleanValue();
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -339,6 +343,7 @@ public void parse(
.version(version)
.versionType(versionType)
.setPipeline(pipeline)
.skipPipeline(skipPipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
Expand All @@ -354,6 +359,7 @@ public void parse(
.versionType(versionType)
.create("create".equals(opType))
.setPipeline(pipeline)
.skipPipeline(skipPipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
Expand All @@ -370,6 +376,7 @@ public void parse(
.versionType(versionType)
.create(true)
.setPipeline(pipeline)
.skipPipeline(skipPipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
Expand Down Expand Up @@ -411,6 +418,7 @@ public void parse(
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.setPipeline(pipeline);
upsertRequest.skipPipeline(skipPipeline);
}

updateRequestConsumer.accept(updateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private XContentType contentType;

private String pipeline;
private boolean skipPipeline;
private String finalPipeline;

private boolean isPipelineResolved;
Expand Down Expand Up @@ -139,6 +140,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_8_7_0)) {
skipPipeline = in.readBoolean();
} else {
skipPipeline = false;
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
finalPipeline = in.readOptionalString();
}
Expand Down Expand Up @@ -313,6 +319,26 @@ public String getPipeline() {
return this.pipeline;
}

/**
 * Marks whether the ingest pipeline (request-supplied or index default) should be
 * bypassed when this request is processed. The final pipeline, if configured, is
 * unaffected by this flag.
 *
 * @param skipPipeline true if the pipeline should be skipped
 * @return this request, for chaining
 */
public IndexRequest skipPipeline(final boolean skipPipeline) {
    this.skipPipeline = skipPipeline;
    return this;
}

/**
 * Returns whether or not the ingest pipeline for this request should be skipped.
 *
 * @return true if the pipeline should be skipped
 */
public boolean skipPipeline() {
    return this.skipPipeline;
}

/**
* Sets the final ingest pipeline to be executed before indexing the document.
*
Expand Down Expand Up @@ -683,6 +709,9 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
if (out.getVersion().onOrAfter(Version.V_8_7_0)) {
out.writeBoolean(skipPipeline);
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalString(finalPipeline);
}
Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public static boolean resolvePipelines(
if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) {
// find the default pipeline if one is defined from an existing index setting
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
setPipelineOrSkip(indexRequest, defaultPipeline);
}
if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) {
// find the final pipeline if one is defined from an existing index setting
Expand All @@ -255,7 +255,7 @@ public static boolean resolvePipelines(
if (IndexSettings.FINAL_PIPELINE.exists(settings)) {
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
}
indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME));
} else {
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(
Expand All @@ -279,13 +279,13 @@ public static boolean resolvePipelines(
break;
}
}
indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME));
}
}

if (requestPipeline != null) {
indexRequest.setPipeline(requestPipeline);
setPipelineOrSkip(indexRequest, requestPipeline);
}

/*
Expand All @@ -306,6 +306,14 @@ public static boolean resolvePipelines(
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
}

/**
 * Applies {@code pipelineName} to the request, unless the request asked to skip its
 * pipeline, in which case the no-op pipeline is set instead. Note the final pipeline
 * is resolved separately and is not affected by the skip flag.
 */
private static void setPipelineOrSkip(IndexRequest indexRequest, String pipelineName) {
    indexRequest.setPipeline(indexRequest.skipPipeline() ? NOOP_PIPELINE_NAME : pipelineName);
}

/**
 * Returns the cluster service this ingest service uses.
 */
public ClusterService getClusterService() {
    return clusterService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
indexRequest.id(request.param("id"));
indexRequest.routing(request.param("routing"));
indexRequest.setPipeline(request.param("pipeline"));
indexRequest.skipPipeline(request.paramAsBoolean("skip_pipeline", false));
indexRequest.source(request.requiredContent(), request.getXContentType());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.setRefreshPolicy(request.param("refresh"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.is;

public class BulkRequestParserTests extends ESTestCase {

public void testIndexRequest() throws IOException {
Expand Down Expand Up @@ -309,4 +311,35 @@ public void testFailOnInvalidAction() {
);
}

/**
 * Verifies that a {@code skip_pipeline} flag on a bulk action line is parsed onto the
 * resulting {@code IndexRequest}, and that it defaults to {@code false} when absent.
 */
public void testSkipPipeline() throws IOException {
    // Two index actions: the first carries skip_pipeline (note: supplied as the string
    // "true" — presumably the parser coerces it to a boolean; confirm if intentional),
    // the second omits the flag entirely.
    BytesArray request = new BytesArray("""
        { "index":{ "_index": "bar", "pipeline": "foo", "skip_pipeline": "true"} }
        {}
        { "index":{ "_index": "bar", "pipeline": "foo", "routing": "blub" } }
        {}
        """);
    BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
    final List<IndexRequest> indexRequests = new ArrayList<>();
    parser.parse(
        request,
        null,
        null,
        null,
        null,
        null,
        true,
        XContentType.JSON,
        (indexRequest, type) -> indexRequests.add(indexRequest),
        req -> fail(), // no update requests expected in this body
        req -> fail()  // no delete requests expected in this body
    );
    assertThat(indexRequests, Matchers.hasSize(2));
    final IndexRequest first = indexRequests.get(0);
    final IndexRequest second = indexRequests.get(1);
    // Skipping does not clear the pipeline name at parse time; both requests keep "foo".
    assertThat(first.skipPipeline(), is(true));
    assertThat(first.getPipeline(), is("foo"));
    assertThat(second.skipPipeline(), is(false));
    assertThat(second.getPipeline(), is("foo"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,71 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
}
}

/**
 * Verifies that when {@code skipPipeline} is set on an {@code IndexRequest}, pipeline
 * resolution always resolves the (request or default) pipeline to the no-op pipeline,
 * while a final pipeline configured on the index is still honored. The boolean result
 * of {@code resolvePipelines} indicates whether any real pipeline remains to run.
 */
public void testResolvePipelinesWithSkipPipeline() {
    // no pipeline: skipping with nothing configured still resolves to the no-op pipeline
    {
        Metadata metadata = Metadata.builder().build();
        IndexRequest indexRequest = new IndexRequest("idx").skipPipeline(true);
        boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
        assertThat(result, is(false));
        assertThat(indexRequest.isPipelineResolved(), is(true));
        assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
    }

    // request pipeline: an explicitly requested pipeline is overridden by the skip flag
    {
        Metadata metadata = Metadata.builder().build();
        IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true);
        boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
        assertThat(result, is(false));
        assertThat(indexRequest.isPipelineResolved(), is(true));
        assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
    }

    // default pipeline: the index's default_pipeline setting is overridden by the skip flag
    {
        IndexMetadata.Builder builder = IndexMetadata.builder("idx")
            .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
            .numberOfShards(1)
            .numberOfReplicas(0);
        Metadata metadata = Metadata.builder().put(builder).build();
        IndexRequest indexRequest = new IndexRequest("idx").skipPipeline(true);
        boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
        assertThat(result, is(false));
        assertThat(indexRequest.isPipelineResolved(), is(true));
        assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
    }

    // request pipeline with default pipeline: both are overridden by the skip flag
    {
        IndexMetadata.Builder builder = IndexMetadata.builder("idx")
            .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
            .numberOfShards(1)
            .numberOfReplicas(0);
        Metadata metadata = Metadata.builder().put(builder).build();
        IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true);
        boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
        assertThat(result, is(false));
        assertThat(indexRequest.isPipelineResolved(), is(true));
        assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
    }

    // request pipeline with final pipeline: skip only affects the request pipeline;
    // the final pipeline still runs, so resolvePipelines returns true
    {
        IndexMetadata.Builder builder = IndexMetadata.builder("idx")
            .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
            .numberOfShards(1)
            .numberOfReplicas(0);
        Metadata metadata = Metadata.builder().put(builder).build();
        IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true);
        boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
        assertThat(result, is(true));
        assertThat(indexRequest.isPipelineResolved(), is(true));
        assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
        assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
    }
}

public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception {
var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry);

Expand Down
Loading