Skip to content

Commit

Permalink
Merge branch 'main' into routing_hash_value_format_writeable
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Sep 24, 2024
2 parents fb7f789 + 4309eaa commit 84000df
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private static ZoneId newDateTimeZone(String timezone) {
}

private static Locale newLocale(String locale) {
return locale == null ? Locale.ROOT : LocaleUtils.parse(locale);
return locale == null ? Locale.ENGLISH : LocaleUtils.parse(locale);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;

import java.util.HashSet;
import java.util.Set;
Expand All @@ -41,15 +40,6 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class PrevalidateShardPathIT extends ESIntegTestCase {

@TestIssueLogging(
value = "org.elasticsearch.cluster.service.MasterService:DEBUG,"
+ "org.elasticsearch.indices.store.IndicesStore:TRACE,"
+ "org.elasticsearch.indices.cluster.IndicesClusterStateService:DEBUG,"
+ "org.elasticsearch.indices.IndicesService:TRACE,"
+ "org.elasticsearch.index.IndexService:TRACE,"
+ "org.elasticsearch.env.NodeEnvironment:TRACE",
issueUrl = "https://github.com/elastic/elasticsearch/issues/111134"
)
public void testCheckShards() throws Exception {
internalCluster().startMasterOnlyNode();
String node1 = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -43,58 +42,45 @@ public void testThatNonDynamicSettingChangesTakeEffect() throws Exception {
MetadataUpdateSettingsService metadataUpdateSettingsService = internalCluster().getCurrentMasterNodeInstance(
MetadataUpdateSettingsService.class
);
List<Index> indicesList = new ArrayList<>();
UpdateSettingsClusterStateUpdateRequest request = new UpdateSettingsClusterStateUpdateRequest().ackTimeout(TimeValue.ZERO);
List<Index> indices = new ArrayList<>();
for (IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
indicesList.add(indexService.index());
indices.add(indexService.index());
}
}
final var indices = indicesList.toArray(Index.EMPTY_ARRAY);

final Function<UpdateSettingsClusterStateUpdateRequest.OnStaticSetting, UpdateSettingsClusterStateUpdateRequest> requestFactory =
onStaticSetting -> new UpdateSettingsClusterStateUpdateRequest(
TEST_REQUEST_TIMEOUT,
TimeValue.ZERO,
Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").build(),
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
onStaticSetting,
indices
);
request.indices(indices.toArray(Index.EMPTY_ARRAY));
request.settings(Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").build());

// First make sure it fails if reopenShards is not set on the request:
AtomicBoolean expectedFailureOccurred = new AtomicBoolean(false);
metadataUpdateSettingsService.updateSettings(
requestFactory.apply(UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail("Should have failed updating a non-dynamic setting without reopenShards set to true");
}
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail("Should have failed updating a non-dynamic setting without reopenShards set to true");
}

@Override
public void onFailure(Exception e) {
expectedFailureOccurred.set(true);
}
@Override
public void onFailure(Exception e) {
expectedFailureOccurred.set(true);
}
);
});
assertBusy(() -> assertThat(expectedFailureOccurred.get(), equalTo(true)));

// Now we set reopenShards and expect it to work:
request.reopenShards(true);
AtomicBoolean success = new AtomicBoolean(false);
metadataUpdateSettingsService.updateSettings(
requestFactory.apply(UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}

@Override
public void onFailure(Exception e) {
fail(e);
}
@Override
public void onFailure(Exception e) {
fail(e);
}
);
});
assertBusy(() -> assertThat(success.get(), equalTo(true)));

// Now we look into the IndexShard objects to make sure that the code was actually updated (vs just the setting):
Expand Down Expand Up @@ -124,23 +110,16 @@ public void testThatNonDynamicSettingChangesDoNotUnncessesarilyCauseReopens() th
MetadataUpdateSettingsService metadataUpdateSettingsService = internalCluster().getCurrentMasterNodeInstance(
MetadataUpdateSettingsService.class
);
List<Index> indicesList = new ArrayList<>();
UpdateSettingsClusterStateUpdateRequest request = new UpdateSettingsClusterStateUpdateRequest().ackTimeout(TimeValue.ZERO);
List<Index> indices = new ArrayList<>();
for (IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
indicesList.add(indexService.index());
indices.add(indexService.index());
}
}
final var indices = indicesList.toArray(Index.EMPTY_ARRAY);

final Function<Settings.Builder, UpdateSettingsClusterStateUpdateRequest> requestFactory =
settings -> new UpdateSettingsClusterStateUpdateRequest(
TEST_REQUEST_TIMEOUT,
TimeValue.ZERO,
settings.build(),
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
indices
);
request.indices(indices.toArray(Index.EMPTY_ARRAY));
request.settings(Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").build());
request.reopenShards(true);

ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
AtomicBoolean shardsUnassigned = new AtomicBoolean(false);
Expand All @@ -163,49 +142,47 @@ public void testThatNonDynamicSettingChangesDoNotUnncessesarilyCauseReopens() th

AtomicBoolean success = new AtomicBoolean(false);
// Make the first request, just to set things up:
metadataUpdateSettingsService.updateSettings(
requestFactory.apply(Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData")),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}

@Override
public void onFailure(Exception e) {
fail(e);
}
@Override
public void onFailure(Exception e) {
fail(e);
}
);
});
assertBusy(() -> assertThat(success.get(), equalTo(true)));
assertBusy(() -> assertThat(expectedSettingsChangeInClusterState.get(), equalTo(true)));
assertThat(shardsUnassigned.get(), equalTo(true));

assertBusy(() -> assertThat(hasUnassignedShards(clusterService.state(), indexName), equalTo(false)));

// Same request, except now we'll also set the dynamic "index.max_result_window" setting:
request.settings(
Settings.builder()
.put("index.codec", "FastDecompressionCompressingStoredFieldsData")
.put("index.max_result_window", "1500")
.build()
);
success.set(false);
expectedSettingsChangeInClusterState.set(false);
shardsUnassigned.set(false);
expectedSetting.set("index.max_result_window");
expectedSettingValue.set("1500");
// Making this request ought to add this new setting but not unassign the shards:
metadataUpdateSettingsService.updateSettings(
// Same request, except now we'll also set the dynamic "index.max_result_window" setting:
requestFactory.apply(
Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").put("index.max_result_window", "1500")
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}

@Override
public void onFailure(Exception e) {
fail(e);
}
@Override
public void onFailure(Exception e) {
fail(e);
}
);
});

assertBusy(() -> assertThat(success.get(), equalTo(true)));
assertBusy(() -> assertThat(expectedSettingsChangeInClusterState.get(), equalTo(true)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,19 @@ protected void masterOperation(
return;
}

updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
request.masterNodeTimeout(),
request.ackTimeout(),
requestSettings,
request.isPreserveExisting()
? UpdateSettingsClusterStateUpdateRequest.OnExisting.PRESERVE
: UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
request.reopen()
? UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES
: UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
concreteIndices
),
listener.delegateResponse((l, e) -> {
logger.debug(() -> "failed to update settings on indices [" + Arrays.toString(concreteIndices) + "]", e);
l.onFailure(e);
})
);
UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest().indices(
concreteIndices
)
.settings(requestSettings)
.setPreserveExisting(request.isPreserveExisting())
.reopenShards(request.reopen())
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());

updateSettingsService.updateSettings(clusterStateUpdateRequest, listener.delegateResponse((l, e) -> {
logger.debug(() -> "failed to update settings on indices [" + Arrays.toString(concreteIndices) + "]", e);
l.onFailure(e);
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,17 @@

package org.elasticsearch.action.admin.indices.settings.put;

import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;

import java.util.Objects;
import java.util.Arrays;

/**
* Cluster state update request that allows to update settings for some indices
*/
public record UpdateSettingsClusterStateUpdateRequest(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
Settings settings,
OnExisting onExisting,
OnStaticSetting onStaticSetting,
Index... indices
) {
public class UpdateSettingsClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<UpdateSettingsClusterStateUpdateRequest> {

/**
* Specifies the behaviour of an update-settings action on existing settings.
Expand Down Expand Up @@ -59,10 +53,79 @@ public enum OnStaticSetting {
REOPEN_INDICES
}

public UpdateSettingsClusterStateUpdateRequest {
Objects.requireNonNull(masterNodeTimeout);
Objects.requireNonNull(ackTimeout);
Objects.requireNonNull(settings);
Objects.requireNonNull(indices);
private Settings settings;

private boolean preserveExisting = false;

private boolean reopenShards = false;

public UpdateSettingsClusterStateUpdateRequest() {}

@SuppressWarnings("this-escape")
public UpdateSettingsClusterStateUpdateRequest(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
Settings settings,
OnExisting onExisting,
OnStaticSetting onStaticSetting,
Index... indices
) {
masterNodeTimeout(masterNodeTimeout);
ackTimeout(ackTimeout);
settings(settings);
setPreserveExisting(onExisting == OnExisting.PRESERVE);
reopenShards(onStaticSetting == OnStaticSetting.REOPEN_INDICES);
indices(indices);
}

/**
* Returns <code>true</code> iff the settings update should only add but not update settings. If the setting already exists
* it should not be overwritten by this update. The default is <code>false</code>
*/
public boolean isPreserveExisting() {
return preserveExisting;
}

/**
* Returns <code>true</code> if non-dynamic setting updates should go through, by automatically unassigning shards in the same cluster
* state change as the setting update. The shards will be automatically reassigned after the cluster state update is made. The
* default is <code>false</code>.
*/
public boolean reopenShards() {
return reopenShards;
}

public UpdateSettingsClusterStateUpdateRequest reopenShards(boolean reopenShards) {
this.reopenShards = reopenShards;
return this;
}

/**
* Iff set to <code>true</code> this settings update will only add settings not already set on an index. Existing settings remain
* unchanged.
*/
public UpdateSettingsClusterStateUpdateRequest setPreserveExisting(boolean preserveExisting) {
this.preserveExisting = preserveExisting;
return this;
}

/**
* Returns the {@link Settings} to update
*/
public Settings settings() {
return settings;
}

/**
* Sets the {@link Settings} to update
*/
public UpdateSettingsClusterStateUpdateRequest settings(Settings settings) {
this.settings = settings;
return this;
}

@Override
public String toString() {
return Arrays.toString(indices()) + settings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ ClusterState execute(ClusterState currentState) {
}
final Settings closedSettings = settingsForClosedIndices.build();
final Settings openSettings = settingsForOpenIndices.build();
final boolean preserveExisting = request.onExisting() == UpdateSettingsClusterStateUpdateRequest.OnExisting.PRESERVE;
final boolean preserveExisting = request.isPreserveExisting();

RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
Expand All @@ -199,7 +199,7 @@ ClusterState execute(ClusterState currentState) {
}

if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
if (request.onStaticSetting() == UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES) {
if (request.reopenShards()) {
// We have non-dynamic settings and open indices. We will unassign all of the shards in these indices so that the new
// changed settings are applied when the shards are re-assigned.
routingTableBuilder = RoutingTable.builder(
Expand Down
Loading

0 comments on commit 84000df

Please sign in to comment.