From b59edb6cf7bbbc9da641e9b70b2f74c94398afe8 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 17 May 2024 14:00:45 -0500 Subject: [PATCH 1/3] Adding isSimulated methods to be used in simulate mapping validation work --- .../org/elasticsearch/TransportVersions.java | 1 + .../action/bulk/BulkOperation.java | 3 +- .../action/bulk/BulkRequest.java | 8 +++ .../action/bulk/BulkShardRequest.java | 22 +++++++ .../action/bulk/SimulateBulkRequest.java | 5 ++ .../action/bulk/BulkShardRequestTests.java | 44 ++++++++++++++ .../ingest/SimulateIndexResponseTests.java | 58 +++++++++++++++++-- 7 files changed, 136 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 24e40c1cd9115..2f836c78cd7ab 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -169,6 +169,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_COHERE_COMPLETION_ADDED = def(8_660_00_0); public static final TransportVersion ESQL_REMOVE_ES_SOURCE_OPTIONS = def(8_661_00_0); public static final TransportVersion NODE_STATS_INGEST_BYTES = def(8_662_00_0); + public static final TransportVersion SIMULATE_VALIDATES_MAPPINGS = def(8_663_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 7356dc0ea140e..9f174c3adca1b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -296,7 +296,8 @@ private void executeBulkRequestsByShard( BulkShardRequest bulkShardRequest = new BulkShardRequest( shardId, bulkRequest.getRefreshPolicy(), - requests.toArray(new BulkItemRequest[0]) + requests.toArray(new BulkItemRequest[0]), + bulkRequest.isSimulated() ); var indexMetadata = clusterState.getMetadata().index(shardId.getIndexName()); if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index b73c853421e71..83b572afb2853 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -463,4 +463,12 @@ public long ramBytesUsed() { public Set getIndices() { return Collections.unmodifiableSet(indices); } + + /** + * Returns true if this is a request for a simulation rather than a real bulk request. + * @return true if this is a simulated bulk request + */ + public boolean isSimulated() { + return false; // Always false, but may be overridden by a subclass + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 85b7fc03ff667..0d2942e688382 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; @@ -34,18 +35,29 @@ public final class BulkShardRequest extends ReplicatedWriteRequest inferenceFieldMap = null; public BulkShardRequest(StreamInput in) throws IOException { super(in); items = in.readArray(i -> i.readOptionalWriteable(inpt -> new BulkItemRequest(shardId, inpt)), BulkItemRequest[]::new); + if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) { + isSimulated = in.readBoolean(); + } else { + isSimulated = false; + } } public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { + this(shardId, refreshPolicy, items, false); + } + + public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) { super(shardId); this.items = items; setRefreshPolicy(refreshPolicy); + this.isSimulated = isSimulated; } /** @@ -126,6 +138,9 @@ public void writeTo(StreamOutput out) throws IOException { o.writeBoolean(false); } }, items); + if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) { + out.writeBoolean(isSimulated); + } } @Override @@ -149,6 +164,9 @@ public String toString() { case NONE: break; } + if (isSimulated) { + b.append(", simulated"); + } return b.toString(); } @@ -186,4 +204,8 @@ public long ramBytesUsed() { } return sum; } + + public boolean isSimulated() { + return isSimulated; + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java index c167c88954b38..1987d758eb09a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java @@ -78,4 +78,9 @@ public void writeTo(StreamOutput out) throws IOException { public Map> getPipelineSubstitutions() { return pipelineSubstitutions; } + + @Override + public boolean isSimulated() { + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkShardRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkShardRequestTests.java index 66c98063a4b06..b66ccb4721645 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkShardRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkShardRequestTests.java @@ -8,11 +8,18 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; + import static org.apache.lucene.tests.util.TestUtil.randomSimpleString; +import static org.hamcrest.Matchers.equalTo; public class BulkShardRequestTests extends ESTestCase { public void testToString() { @@ -30,5 +37,42 @@ public void testToString() { r = new BulkShardRequest(shardId, RefreshPolicy.WAIT_UNTIL, new BulkItemRequest[count]); assertEquals("BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh", r.toString()); assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription()); + + r = new BulkShardRequest(shardId, RefreshPolicy.WAIT_UNTIL, new BulkItemRequest[count], true); + assertEquals( + "BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh, simulated", + r.toString() + ); + assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription()); + + r = new BulkShardRequest(shardId, RefreshPolicy.WAIT_UNTIL, new BulkItemRequest[count], false); + assertEquals("BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh", r.toString()); + assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription()); + } + + public void testSerialization() throws IOException { + // Note: BulkShardRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a Writable + BulkShardRequest bulkShardRequest = randomBulkShardRequest(); + BulkShardRequest copy = copyWriteable(bulkShardRequest, null, BulkShardRequest::new); + assertThat(bulkShardRequest.items().length, equalTo(copy.items().length)); + assertThat(bulkShardRequest.isSimulated(), equalTo(copy.isSimulated())); + assertThat(bulkShardRequest.getRefreshPolicy(), equalTo(copy.getRefreshPolicy())); + } + + protected BulkShardRequest randomBulkShardRequest() { + String indexName = randomAlphaOfLength(100); + ShardId shardId = new ShardId(indexName, randomAlphaOfLength(50), randomInt()); + RefreshPolicy refreshPolicy = randomFrom(RefreshPolicy.values()); + BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(0, 100)]; + for (int i = 0; i < items.length; i++) { + final DocWriteRequest request = switch (randomFrom(DocWriteRequest.OpType.values())) { + case INDEX -> new IndexRequest(indexName).id("id_" + i); + case CREATE -> new IndexRequest(indexName).id("id_" + i).create(true); + case UPDATE -> new UpdateRequest(indexName, "id_" + i); + case DELETE -> new DeleteRequest(indexName, "id_" + i); + }; + items[i] = new BulkItemRequest(i, request); + } + return new BulkShardRequest(shardId, refreshPolicy, items, randomBoolean()); } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java index 7ce3b411e978f..7b14b2cb0dd53 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -38,8 +39,17 @@ public void testToXContent() throws IOException { String source = """ {"doc": {"key1": "val1", "key2": "val2"}}"""; BytesReference sourceBytes = BytesReference.fromByteBuffer(ByteBuffer.wrap(source.getBytes(StandardCharsets.UTF_8))); - SimulateIndexResponse indexResponse = new SimulateIndexResponse(id, index, version, sourceBytes, XContentType.JSON, pipelines); - String output = Strings.toString(indexResponse); + + SimulateIndexResponse indexResponse = new SimulateIndexResponse( + id, + index, + version, + sourceBytes, + XContentType.JSON, + pipelines, + null + ); + assertEquals( XContentHelper.stripWhitespace( Strings.format( @@ -58,7 +68,39 @@ public void testToXContent() throws IOException { pipelines.stream().map(pipeline -> "\"" + pipeline + "\"").collect(Collectors.joining(",")) ) ), - output + Strings.toString(indexResponse) + ); + + SimulateIndexResponse indexResponseWithException = new SimulateIndexResponse( + id, + index, + version, + sourceBytes, + XContentType.JSON, + pipelines, + new ElasticsearchException("Some failure") + ); + + assertEquals( + XContentHelper.stripWhitespace( + Strings.format( + """ + { + "_id": "%s", + "_index": "%s", + "_version": %d, + "_source": %s, + "executed_pipelines": [%s], + "error":{"type":"exception","reason":"Some failure"} + }""", + id, + index, + version, + source, + pipelines.stream().map(pipeline -> "\"" + pipeline + "\"").collect(Collectors.joining(",")) + ) + ), + Strings.toString(indexResponseWithException) ); } @@ -85,6 +127,14 @@ private static SimulateIndexResponse randomIndexResponse() { } XContentType xContentType = randomFrom(XContentType.values()); BytesReference sourceBytes = RandomObjects.randomSource(random(), xContentType); - return new SimulateIndexResponse(id, index, version, sourceBytes, xContentType, pipelines); + return new SimulateIndexResponse( + id, + index, + version, + sourceBytes, + xContentType, + pipelines, + randomBoolean() ? null : new ElasticsearchException("failed") + ); } } From 59a571f18a01e9cc328e67256bb418930e66f889 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 17 May 2024 14:07:53 -0500 Subject: [PATCH 2/3] adding exception field to SimulateIndexResponse --- .../bulk/TransportSimulateBulkAction.java | 3 ++- .../action/ingest/SimulateIndexResponse.java | 20 ++++++++++++++++++- .../ingest/RestSimulateIngestActionTests.java | 3 ++- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java index 1b3949f3c00ac..d543e3b56a9ef 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java @@ -93,7 +93,8 @@ protected void createMissingIndicesAndIndexData( request.version(), ((IndexRequest) request).source(), ((IndexRequest) request).getContentType(), - ((IndexRequest) request).getExecutedPipelines() + ((IndexRequest) request).getExecutedPipelines(), + null ) ) ); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java index 8c6d452fb6298..053dcdeab4908 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java @@ -8,6 +8,8 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -29,6 +31,7 @@ public class SimulateIndexResponse extends IndexResponse { private final BytesReference source; private final XContentType sourceXContentType; + private final Exception exception; @SuppressWarnings("this-escape") public SimulateIndexResponse(StreamInput in) throws IOException { @@ -36,6 +39,11 @@ public SimulateIndexResponse(StreamInput in) throws IOException { this.source = in.readBytesReference(); this.sourceXContentType = XContentType.valueOf(in.readString()); setShardInfo(ShardInfo.EMPTY); + if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) { + this.exception = in.readException(); + } else { + this.exception = null; + } } @SuppressWarnings("this-escape") @@ -45,13 +53,15 @@ public SimulateIndexResponse( long version, BytesReference source, XContentType sourceXContentType, - List pipelines + List pipelines, + Exception exception ) { // We don't actually care about most of the IndexResponse fields: super(new ShardId(index, "", 0), id == null ? "" : id, 0, 0, version, true, pipelines); this.source = source; this.sourceXContentType = sourceXContentType; setShardInfo(ShardInfo.EMPTY); + this.exception = exception; } @Override @@ -62,6 +72,11 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t builder.field("_source", XContentHelper.convertToMap(source, false, sourceXContentType).v2()); assert executedPipelines != null : "executedPipelines is null when it shouldn't be - we always list pipelines in simulate mode"; builder.array("executed_pipelines", executedPipelines.toArray()); + if (exception != null) { + builder.startObject("error"); + ElasticsearchException.generateThrowableXContent(builder, params, exception); + builder.endObject(); + } return builder; } @@ -75,6 +90,9 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBytesReference(source); out.writeString(sourceXContentType.name()); + if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) { + out.writeException(exception); + } } @Override diff --git a/server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java index a738a13f62c21..80be61993057a 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java @@ -226,7 +226,8 @@ private BulkItemResponse getSuccessBulkItemResponse(String id, String source) { 3, BytesReference.fromByteBuffers(sourceByteBuffer), XContentType.JSON, - List.of("pipeline1", "pipeline2") + List.of("pipeline1", "pipeline2"), + null ) ); } From bf327786b9d414bfff815c9cf3cb91f0f6bab7dd Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 20 May 2024 15:15:44 -0500 Subject: [PATCH 3/3] marking exception as nullable --- .../org/elasticsearch/action/ingest/SimulateIndexResponse.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java index 053dcdeab4908..445492f037926 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xcontent.XContentBuilder; @@ -54,7 +55,7 @@ public SimulateIndexResponse( BytesReference source, XContentType sourceXContentType, List pipelines, - Exception exception + @Nullable Exception exception ) { // We don't actually care about most of the IndexResponse fields: super(new ShardId(index, "", 0), id == null ? "" : id, 0, 0, version, true, pipelines);