Skip to content

Commit

Permalink
First IT, some serialisation fixes, and node command parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
andreidan committed Mar 12, 2024
1 parent 9397659 commit e8c8d07
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Locale;

import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class DataStreamAutoshardingIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class, TestAutoshardingPlugin.class);
}

@Before
public void configureClusterSettings() {
updateClusterSettings(
Settings.builder().putList(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey(), List.of())
);
}

@After
public void resetClusterSetting() {
updateClusterSettings(
Settings.builder().putNull(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey())
);
}

public void testRolloverOnAutoShardCondition() throws Exception {
final String dataStreamName = "logs-es";

putComposableIndexTemplate("my-template", null, List.of("logs-*"), Settings.EMPTY);
final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet());

for (int i = 0; i < 10; i++) {
indexDocs(dataStreamName, randomIntBetween(100, 200));
}

final ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
final DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName);
final String assignedShardNodeId = clusterStateBeforeRollover.routingTable()
.index(dataStreamBeforeRollover.getWriteIndex())
.shard(0)
.primaryShard()
.currentNodeId();

Index writeIndex = clusterStateBeforeRollover.metadata().dataStreams().get(dataStreamName).getWriteIndex();
IndexMetadata indexMeta = clusterStateBeforeRollover.getMetadata().index(writeIndex);
ShardId shardId = new ShardId(indexMeta.getIndex(), 0);
Path path = createTempDir().resolve("indices").resolve(indexMeta.getIndexUUID()).resolve(String.valueOf(0));
ShardRouting shardRouting = ShardRouting.newUnassigned(
shardId,
true,
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null),
ShardRouting.Role.DEFAULT
);
shardRouting = shardRouting.initialize(assignedShardNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
CommonStats stats = new CommonStats();
stats.docs = new DocsStats();
stats.store = new StoreStats();
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, false, 1, 4, 1));
final ShardStats shardStats = new ShardStats(
shardRouting,
new ShardPath(false, path, path, shardId),
stats,
null,
null,
null,
false,
0
);

for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) {
MockTransportService.getInstance(node.getName())
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, node.getName());
channel.sendResponse(instance.new NodeResponse(node.getId(), 1, List.of(shardStats), List.of()));
});
}

assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());

final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
final IndexMetadata currentWriteIndexMetadata = clusterState.metadata().getIndexSafe(dataStream.getWriteIndex());

assertThat(currentWriteIndexMetadata.getNumberOfShards(), is(3));
}

static void putComposableIndexTemplate(String id, @Nullable String mappings, List<String> patterns, @Nullable Settings settings)
throws IOException {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(patterns)
.template(new Template(settings, mappings == null ? null : CompressedXContent.fromJSON(mappings), null, null))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
}

static void indexDocs(String dataStream, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
bulkRequest.add(
new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
.source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON)
);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems().length, equalTo(numDocs));
String backingIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream;
for (BulkItemResponse itemResponse : bulkResponse) {
assertThat(itemResponse.getFailureMessage(), nullValue());
assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix));
}
indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet();
}

/**
* Test plugin that registers an additional setting.
*/
public static class TestAutoshardingPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(
Setting.boolSetting(DATA_STREAMS_AUTO_SHARDING_ENABLED, false, Setting.Property.Dynamic, Setting.Property.NodeScope)
);
}

@Override
public Settings additionalSettings() {
return Settings.builder().put(DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.IOException;

import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.INCREASE_SHARDS;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -96,7 +95,7 @@ public String getWriteableName() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(type);
out.writeEnum(value.type());
out.writeVInt(value.currentNumberOfShards());
out.writeVInt(value.targetNumberOfShards());
out.writeTimeValue(value.coolDownRemaining());
Expand All @@ -107,7 +106,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// we only save this representation in the cluster state as part of meet_conditions when this condition is met
builder.startObject(NAME);
builder.field(AUTO_SHARDING_TYPE.getPreferredName(), INCREASE_SHARDS);
builder.field(AUTO_SHARDING_TYPE.getPreferredName(), value.type());
builder.field(CURRENT_NUMBER_OF_SHARDS.getPreferredName(), value.currentNumberOfShards());
builder.field(TARGET_NUMBER_OF_SHARDS.getPreferredName(), value.targetNumberOfShards());
builder.field(COOLDOWN_REMAINING.getPreferredName(), value.coolDownRemaining().toHumanReadableString(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ public String toString() {
}
}

class NodeResponse extends TransportResponse {
// visible for testing
public class NodeResponse extends TransportResponse {
protected String nodeId;
protected int totalShards;
protected List<BroadcastShardOperationFailedException> exceptions;
Expand All @@ -560,7 +561,8 @@ class NodeResponse extends TransportResponse {
}
}

NodeResponse(
// visible for testing
public NodeResponse(
String nodeId,
int totalShards,
List<ShardOperationResult> results,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.admin.indices.rollover.AutoShardCondition;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.cli.ProcessInfo;
import org.elasticsearch.cli.Terminal;
Expand Down Expand Up @@ -89,6 +90,12 @@ public <T, C> T parseNamedObject(Class<T> categoryClass, String name, XContentPa
throw new UnsupportedOperationException("Unexpected token for Condition: " + parser.currentToken());
}
parser.nextToken();
if (parser.currentToken().isValue() == false
&& parser.currentToken().equals(XContentParser.Token.START_OBJECT)
&& name.equals(AutoShardCondition.NAME)) {
return (T) AutoShardCondition.fromXContent(parser);
}

assert parser.currentToken().isValue() : parser.currentToken();
if (parser.currentToken().isValue() == false) {
throw new UnsupportedOperationException("Unexpected token for Condition: " + parser.currentToken());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.action.datastreams.autosharding.AutoShardingType;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.io.IOException;

public class AutoShardConditionTests extends AbstractWireSerializingTestCase<AutoShardCondition> {

@Override
protected Writeable.Reader<AutoShardCondition> instanceReader() {
return AutoShardCondition::new;
}

@Override
protected AutoShardCondition createTestInstance() {
return new AutoShardCondition(
new IncreaseShardsDetails(
randomFrom(AutoShardingType.INCREASE_SHARDS, AutoShardingType.COOLDOWN_PREVENTED_INCREASE),
randomNonNegativeInt(),
randomNonNegativeInt(),
TimeValue.ZERO,
randomDoubleBetween(0.0, 300.0, true)
)
);
}

@Override
protected AutoShardCondition mutateInstance(AutoShardCondition instance) throws IOException {
var type = instance.value.type();
var numberOfShards = instance.value.currentNumberOfShards();
var targetNumberOfShards = instance.value.targetNumberOfShards();
var cooldown = instance.value.coolDownRemaining();
var writeLoad = instance.value.writeLoad();
switch (randomInt(4)) {
case 0 -> type = randomValueOtherThan(
type,
() -> randomFrom(AutoShardingType.INCREASE_SHARDS, AutoShardingType.COOLDOWN_PREVENTED_INCREASE)
);
case 1 -> numberOfShards++;
case 2 -> targetNumberOfShards++;
case 3 -> {
if (type.equals(AutoShardingType.INCREASE_SHARDS)) {
type = AutoShardingType.COOLDOWN_PREVENTED_INCREASE;
}
cooldown = TimeValue.timeValueMillis(randomNonNegativeLong());
}
case 4 -> writeLoad = randomValueOtherThan(writeLoad, () -> randomDoubleBetween(0.0, 500.0, true));
}
return new AutoShardCondition(new IncreaseShardsDetails(type, numberOfShards, targetNumberOfShards, cooldown, writeLoad));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,22 @@ public void testEqualsAndHashCode() {
condition -> new MinPrimaryShardDocsCondition(condition.value),
condition -> new MinPrimaryShardDocsCondition(randomNonNegativeLong())
);
AutoShardCondition autoShardCondition = new AutoShardCondition(
new IncreaseShardsDetails(AutoShardingType.INCREASE_SHARDS, 1, 3, TimeValue.ZERO, 3.0)
);
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
autoShardCondition,
condition -> new AutoShardCondition(condition.value),
condition -> new AutoShardCondition(
new IncreaseShardsDetails(
AutoShardingType.COOLDOWN_PREVENTED_INCREASE,
randomNonNegativeInt(),
randomNonNegativeInt(),
TimeValue.timeValueMillis(randomNonNegativeLong()),
5.0
)
)
);
}

public void testAutoShardCondtionXContent() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.action.admin.indices.rollover.AutoShardCondition;
import org.elasticsearch.action.admin.indices.rollover.IncreaseShardsDetails;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardDocsCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardSizeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.action.datastreams.autosharding.AutoShardingType;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -97,7 +100,8 @@ public void testIndexMetadataSerialization() throws IOException {
new MaxDocsCondition(randomNonNegativeLong()),
new MaxSizeCondition(ByteSizeValue.ofBytes(randomNonNegativeLong())),
new MaxPrimaryShardSizeCondition(ByteSizeValue.ofBytes(randomNonNegativeLong())),
new MaxPrimaryShardDocsCondition(randomNonNegativeLong())
new MaxPrimaryShardDocsCondition(randomNonNegativeLong()),
new AutoShardCondition(new IncreaseShardsDetails(AutoShardingType.INCREASE_SHARDS, 1, 3, TimeValue.ZERO, 4.0))
),
randomNonNegativeLong()
)
Expand Down Expand Up @@ -128,6 +132,7 @@ public void testIndexMetadataSerialization() throws IOException {
assertEquals(metadata.getCreationVersion(), fromXContentMeta.getCreationVersion());
assertEquals(metadata.getCompatibilityVersion(), fromXContentMeta.getCompatibilityVersion());
assertEquals(metadata.getRoutingNumShards(), fromXContentMeta.getRoutingNumShards());
assertEquals(metadata.getRolloverInfos(), fromXContentMeta.getRolloverInfos());
assertEquals(metadata.getCreationDate(), fromXContentMeta.getCreationDate());
assertEquals(metadata.getRoutingFactor(), fromXContentMeta.getRoutingFactor());
assertEquals(metadata.primaryTerm(0), fromXContentMeta.primaryTerm(0));
Expand Down

0 comments on commit e8c8d07

Please sign in to comment.