From e8c8d07be84586ce407933bc341c883dfc26e84f Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Tue, 12 Mar 2024 17:36:28 +0000 Subject: [PATCH] First IT, some serialisation fixes, and node command parsing --- .../datastreams/DataStreamAutoshardingIT.java | 202 ++++++++++++++++++ .../indices/rollover/AutoShardCondition.java | 5 +- .../node/TransportBroadcastByNodeAction.java | 6 +- .../ElasticsearchNodeCommand.java | 7 + .../rollover/AutoShardConditionTests.java | 62 ++++++ .../indices/rollover/ConditionTests.java | 16 ++ .../cluster/metadata/IndexMetadataTests.java | 7 +- 7 files changed, 299 insertions(+), 6 deletions(-) create mode 100644 modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/rollover/AutoShardConditionTests.java diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java new file mode 100644 index 0000000000000..c779fcf93c728 --- /dev/null +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java @@ -0,0 +1,202 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.datastreams; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.IndexingStats; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.StoreStats; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.xcontent.XContentType; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collection; +import java.util.List; +import java.util.Locale; + +import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED; +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; + +public class DataStreamAutoshardingIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class, TestAutoshardingPlugin.class); + } + + @Before + public void configureClusterSettings() { + updateClusterSettings( + Settings.builder().putList(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey(), List.of()) + ); + } + + @After + public void resetClusterSetting() { + updateClusterSettings( + Settings.builder().putNull(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey()) + ); + } + + public void testRolloverOnAutoShardCondition() throws Exception { + final String dataStreamName = "logs-es"; + + putComposableIndexTemplate("my-template", null, List.of("logs-*"), Settings.EMPTY); + final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + + for (int i = 0; i < 10; i++) { + indexDocs(dataStreamName, randomIntBetween(100, 200)); + } + + final ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + final DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + final String assignedShardNodeId = clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(0) + .primaryShard() + .currentNodeId(); + + Index writeIndex = clusterStateBeforeRollover.metadata().dataStreams().get(dataStreamName).getWriteIndex(); + IndexMetadata indexMeta = clusterStateBeforeRollover.getMetadata().index(writeIndex); + ShardId shardId = new ShardId(indexMeta.getIndex(), 0); + Path path = createTempDir().resolve("indices").resolve(indexMeta.getIndexUUID()).resolve(String.valueOf(0)); + ShardRouting shardRouting = ShardRouting.newUnassigned( + shardId, + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null), + ShardRouting.Role.DEFAULT + ); + shardRouting = shardRouting.initialize(assignedShardNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + CommonStats stats = new CommonStats(); + stats.docs = new DocsStats(); + stats.store = new StoreStats(); + stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, false, 1, 4, 1)); + final ShardStats shardStats = new ShardStats( + shardRouting, + new ShardPath(false, path, path, shardId), + stats, + null, + null, + null, + false, + 0 + ); + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, node.getName()); + channel.sendResponse(instance.new NodeResponse(node.getId(), 1, List.of(shardStats), List.of())); + }); + } + + assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); + + final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName); + final IndexMetadata currentWriteIndexMetadata = clusterState.metadata().getIndexSafe(dataStream.getWriteIndex()); + + assertThat(currentWriteIndexMetadata.getNumberOfShards(), is(3)); + } + + static void putComposableIndexTemplate(String id, @Nullable String mappings, List patterns, @Nullable Settings settings) + throws IOException { + TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); + request.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(patterns) + .template(new Template(settings, mappings == null ? null : CompressedXContent.fromJSON(mappings), null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build() + ); + client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); + } + + static void indexDocs(String dataStream, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + bulkRequest.add( + new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE) + .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON) + ); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(numDocs)); + String backingIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream; + for (BulkItemResponse itemResponse : bulkResponse) { + assertThat(itemResponse.getFailureMessage(), nullValue()); + assertThat(itemResponse.status(), equalTo(RestStatus.CREATED)); + assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix)); + } + indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet(); + } + + /** + * Test plugin that registers an additional setting. + */ + public static class TestAutoshardingPlugin extends Plugin { + @Override + public List> getSettings() { + return List.of( + Setting.boolSetting(DATA_STREAMS_AUTO_SHARDING_ENABLED, false, Setting.Property.Dynamic, Setting.Property.NodeScope) + ); + } + + @Override + public Settings additionalSettings() { + return Settings.builder().put(DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build(); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/AutoShardCondition.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/AutoShardCondition.java index bfe0b63de1b15..9f3283b85ed6f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/AutoShardCondition.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/AutoShardCondition.java @@ -21,7 +21,6 @@ import java.io.IOException; -import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.INCREASE_SHARDS; import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -96,7 +95,7 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeEnum(type); + out.writeEnum(value.type()); out.writeVInt(value.currentNumberOfShards()); out.writeVInt(value.targetNumberOfShards()); out.writeTimeValue(value.coolDownRemaining()); @@ -107,7 +106,7 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { // we only save this representation in the cluster state as part of meet_conditions when this condition is met builder.startObject(NAME); - builder.field(AUTO_SHARDING_TYPE.getPreferredName(), INCREASE_SHARDS); + builder.field(AUTO_SHARDING_TYPE.getPreferredName(), value.type()); builder.field(CURRENT_NUMBER_OF_SHARDS.getPreferredName(), value.currentNumberOfShards()); builder.field(TARGET_NUMBER_OF_SHARDS.getPreferredName(), value.targetNumberOfShards()); builder.field(COOLDOWN_REMAINING.getPreferredName(), value.coolDownRemaining().toHumanReadableString(2)); diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 19c7561ccdb15..45c13dde29d06 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -542,7 +542,8 @@ public String toString() { } } - class NodeResponse extends TransportResponse { + // visible for testing + public class NodeResponse extends TransportResponse { protected String nodeId; protected int totalShards; protected List exceptions; @@ -560,7 +561,8 @@ class NodeResponse extends TransportResponse { } } - NodeResponse( + // visible for testing + public NodeResponse( String nodeId, int totalShards, List results, diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java index 82abf4b4c7d5d..fc80ec5f7fd02 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java @@ -15,6 +15,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.admin.indices.rollover.AutoShardCondition; import org.elasticsearch.action.admin.indices.rollover.Condition; import org.elasticsearch.cli.ProcessInfo; import org.elasticsearch.cli.Terminal; @@ -89,6 +90,12 @@ public T parseNamedObject(Class categoryClass, String name, XContentPa throw new UnsupportedOperationException("Unexpected token for Condition: " + parser.currentToken()); } parser.nextToken(); + if (parser.currentToken().isValue() == false + && parser.currentToken().equals(XContentParser.Token.START_OBJECT) + && name.equals(AutoShardCondition.NAME)) { + return (T) AutoShardCondition.fromXContent(parser); + } + assert parser.currentToken().isValue() : parser.currentToken(); if (parser.currentToken().isValue() == false) { throw new UnsupportedOperationException("Unexpected token for Condition: " + parser.currentToken()); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/AutoShardConditionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/AutoShardConditionTests.java new file mode 100644 index 0000000000000..4b285b4fcbcf7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/AutoShardConditionTests.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.rollover; + +import org.elasticsearch.action.datastreams.autosharding.AutoShardingType; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class AutoShardConditionTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return AutoShardCondition::new; + } + + @Override + protected AutoShardCondition createTestInstance() { + return new AutoShardCondition( + new IncreaseShardsDetails( + randomFrom(AutoShardingType.INCREASE_SHARDS, AutoShardingType.COOLDOWN_PREVENTED_INCREASE), + randomNonNegativeInt(), + randomNonNegativeInt(), + TimeValue.ZERO, + randomDoubleBetween(0.0, 300.0, true) + ) + ); + } + + @Override + protected AutoShardCondition mutateInstance(AutoShardCondition instance) throws IOException { + var type = instance.value.type(); + var numberOfShards = instance.value.currentNumberOfShards(); + var targetNumberOfShards = instance.value.targetNumberOfShards(); + var cooldown = instance.value.coolDownRemaining(); + var writeLoad = instance.value.writeLoad(); + switch (randomInt(4)) { + case 0 -> type = randomValueOtherThan( + type, + () -> randomFrom(AutoShardingType.INCREASE_SHARDS, AutoShardingType.COOLDOWN_PREVENTED_INCREASE) + ); + case 1 -> numberOfShards++; + case 2 -> targetNumberOfShards++; + case 3 -> { + if (type.equals(AutoShardingType.INCREASE_SHARDS)) { + type = AutoShardingType.COOLDOWN_PREVENTED_INCREASE; + } + cooldown = TimeValue.timeValueMillis(randomNonNegativeLong()); + } + case 4 -> writeLoad = randomValueOtherThan(writeLoad, () -> randomDoubleBetween(0.0, 500.0, true)); + } + return new AutoShardCondition(new IncreaseShardsDetails(type, numberOfShards, targetNumberOfShards, cooldown, writeLoad)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java index 956e566bfd3fe..5a85489166da8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java @@ -353,6 +353,22 @@ public void testEqualsAndHashCode() { condition -> new MinPrimaryShardDocsCondition(condition.value), condition -> new MinPrimaryShardDocsCondition(randomNonNegativeLong()) ); + AutoShardCondition autoShardCondition = new AutoShardCondition( + new IncreaseShardsDetails(AutoShardingType.INCREASE_SHARDS, 1, 3, TimeValue.ZERO, 3.0) + ); + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + autoShardCondition, + condition -> new AutoShardCondition(condition.value), + condition -> new AutoShardCondition( + new IncreaseShardsDetails( + AutoShardingType.COOLDOWN_PREVENTED_INCREASE, + randomNonNegativeInt(), + randomNonNegativeInt(), + TimeValue.timeValueMillis(randomNonNegativeLong()), + 5.0 + ) + ) + ); } public void testAutoShardCondtionXContent() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index b4c9f670f66b6..c281f45c50883 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -8,12 +8,15 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.action.admin.indices.rollover.AutoShardCondition; +import org.elasticsearch.action.admin.indices.rollover.IncreaseShardsDetails; import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition; import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardDocsCondition; import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardSizeCondition; import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingType; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -97,7 +100,8 @@ public void testIndexMetadataSerialization() throws IOException { new MaxDocsCondition(randomNonNegativeLong()), new MaxSizeCondition(ByteSizeValue.ofBytes(randomNonNegativeLong())), new MaxPrimaryShardSizeCondition(ByteSizeValue.ofBytes(randomNonNegativeLong())), - new MaxPrimaryShardDocsCondition(randomNonNegativeLong()) + new MaxPrimaryShardDocsCondition(randomNonNegativeLong()), + new AutoShardCondition(new IncreaseShardsDetails(AutoShardingType.INCREASE_SHARDS, 1, 3, TimeValue.ZERO, 4.0)) ), randomNonNegativeLong() ) @@ -128,6 +132,7 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(metadata.getCreationVersion(), fromXContentMeta.getCreationVersion()); assertEquals(metadata.getCompatibilityVersion(), fromXContentMeta.getCompatibilityVersion()); assertEquals(metadata.getRoutingNumShards(), fromXContentMeta.getRoutingNumShards()); + assertEquals(metadata.getRolloverInfos(), fromXContentMeta.getRolloverInfos()); assertEquals(metadata.getCreationDate(), fromXContentMeta.getCreationDate()); assertEquals(metadata.getRoutingFactor(), fromXContentMeta.getRoutingFactor()); assertEquals(metadata.primaryTerm(0), fromXContentMeta.primaryTerm(0));