Skip to content

Commit

Permalink
[Segment Replication] Add setting to enable segment replication as de…
Browse files Browse the repository at this point in the history
…fault replication type. (opensearch-project#6791)

* Add setting to enable segment replication as default replication type.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix failing unit tests.

Signed-off-by: Rishikesh1159 <[email protected]>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <[email protected]>

* Address comments.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix failing unit test.

Signed-off-by: Rishikesh1159 <[email protected]>

* Address comments.

Signed-off-by: Rishikesh1159 <[email protected]>

* Refactoring PR.

Signed-off-by: Rishikesh1159 <[email protected]>

---------

Signed-off-by: Rishikesh1159 <[email protected]>
Signed-off-by: Valentin Mitrofanov <[email protected]>
  • Loading branch information
Rishikesh1159 authored and mitrofmep committed Apr 5, 2023
1 parent 1ca9ee4 commit 249f50b
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.Arrays;

import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
private static final String SYSTEM_INDEX_NAME = ".test-system-index";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

public static class TestPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(
new SystemIndexDescriptor(SYSTEM_INDEX_NAME, "System index for [" + getTestClass().getName() + ']')
);
}
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(SegmentReplicationClusterSettingIT.TestPlugin.class, MockTransportService.TestPlugin.class);
}

public void testReplicationWithSegmentReplicationClusterSetting() throws Exception {

boolean isSystemIndex = randomBoolean();
String indexName = isSystemIndex ? SYSTEM_INDEX_NAME : INDEX_NAME;

// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
createIndex(indexName);
ensureYellowAndNoInitializingShards(indexName);
final String replicaNode = internalCluster().startNode();
ensureGreen(indexName);

final int initialDocCount = scaledRandomIntBetween(20, 30);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

refresh(indexName);
assertBusy(() -> {
assertHitCount(client(replicaNode).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(), initialDocCount);
});

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(indexName)
.execute()
.actionGet();
if (isSystemIndex) {
// Verify that Segment Replication did not happen on the replica shard.
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(indexName));
} else {
// Verify that Segment Replication happened on the replica shard.
assertFalse(segmentReplicationStatsResponse.getReplicationStats().get(indexName).get(0).getReplicaStats().isEmpty());
}
}

public void testIndexReplicationSettingOverridesClusterSetting() throws Exception {
// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to override cluster replication setting by passing a index replication setting
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(20, 30);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

refresh(INDEX_NAME);
assertBusy(() -> {
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
});

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify that Segment Replication did not happen on the replica shard.
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ public void testFlushAfterRelocation() throws Exception {
}

// Verify segment replication event never happened on replica shard
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
import static org.opensearch.common.settings.Settings.readSettingsFromStream;
import static org.opensearch.common.settings.Settings.writeSettingsToStream;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;

/**
* Index metadata information
Expand Down Expand Up @@ -317,22 +318,29 @@ public void validate(final Boolean value) {}

@Override
public void validate(final Boolean value, final Map<Setting<?>, Object> settings) {
final Object clusterSettingReplicationType = settings.get(CLUSTER_REPLICATION_TYPE_SETTING);
final Object replicationType = settings.get(INDEX_REPLICATION_TYPE_SETTING);
if (replicationType != ReplicationType.SEGMENT && value == true) {
if ((replicationType).equals(ReplicationType.SEGMENT) == false
&& (clusterSettingReplicationType).equals(ReplicationType.SEGMENT) == false
&& value == true) {
throw new IllegalArgumentException(
"To enable "
+ INDEX_REMOTE_STORE_ENABLED_SETTING.getKey()
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " should be set to "
+ ReplicationType.SEGMENT
+ " or "
+ CLUSTER_REPLICATION_TYPE_SETTING.getKey()
+ " should be set to "
+ Boolean.TRUE
);
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REPLICATION_TYPE_SETTING);
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING, CLUSTER_REPLICATION_TYPE_SETTING);
return settings.iterator();
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,9 @@ public void apply(Settings value, Settings current, Settings previous) {
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING)
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING),
FeatureFlags.REPLICATION_TYPE,
List.of(IndicesService.CLUSTER_REPLICATION_TYPE_SETTING)
);

}
9 changes: 8 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.Node;
Expand Down Expand Up @@ -750,7 +751,13 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
nodeName = Node.NODE_NAME_SETTING.get(settings);
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)
&& indexMetadata.isSystem() == false
&& settings.get(IndexMetadata.SETTING_REPLICATION_TYPE) == null) {
replicationType = IndicesService.CLUSTER_REPLICATION_TYPE_SETTING.get(settings);
} else {
replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings);
}
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.extensions.ExtensionsManager;
Expand Down Expand Up @@ -230,6 +231,19 @@ public class IndicesService extends AbstractLifecycleComponent
Setting.Property.NodeScope
);

/**
* Used to specify SEGMENT replication type as the default replication strategy for all indices in a cluster. By default, this is false.
*/
public static final String CLUSTER_SETTING_REPLICATION_TYPE = "cluster.indices.replication.strategy";

public static final Setting<ReplicationType> CLUSTER_REPLICATION_TYPE_SETTING = new Setting<>(
CLUSTER_SETTING_REPLICATION_TYPE,
ReplicationType.DOCUMENT.toString(),
ReplicationType::parseString,
Property.NodeScope,
Property.Final
);

/**
* The node's settings.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,10 @@ public void testEnablingRemoteStoreFailsWhenReplicationTypeIsDocument() {
IllegalArgumentException.class,
() -> IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings)
);
assertEquals("To enable index.remote_store.enabled, index.replication.type should be set to SEGMENT", iae.getMessage());
assertEquals(
"To enable index.remote_store.enabled, index.replication.type should be set to SEGMENT or cluster.indices.replication.strategy should be set to true",
iae.getMessage()
);
}

public void testEnablingRemoteStoreFailsWhenReplicationTypeIsDefault() {
Expand All @@ -878,7 +881,10 @@ public void testEnablingRemoteStoreFailsWhenReplicationTypeIsDefault() {
IllegalArgumentException.class,
() -> IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings)
);
assertEquals("To enable index.remote_store.enabled, index.replication.type should be set to SEGMENT", iae.getMessage());
assertEquals(
"To enable index.remote_store.enabled, index.replication.type should be set to SEGMENT or cluster.indices.replication.strategy should be set to true",
iae.getMessage()
);
}

public void testRemoteRepositoryDefaultSetting() {
Expand Down

0 comments on commit 249f50b

Please sign in to comment.