Skip to content

Commit

Permalink
Add setting to enable segment replication as default replication type.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Mar 22, 2023
1 parent e6a3700 commit 5065732
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.ClusterInfoServiceIT;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.Arrays;

import static org.opensearch.cluster.metadata.IndexMetadata.DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationDefaultIT extends OpenSearchIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final String SYSTEM_INDEX_NAME = ".test-cluster-info-system-index";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT.getKey(), true).build();
}

public static class TestPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(
new SystemIndexDescriptor(SYSTEM_INDEX_NAME, "System index for [" + getTestClass().getName() + ']')
);
}
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ClusterInfoServiceIT.TestPlugin.class, MockTransportService.TestPlugin.class);
}

public void testReplicationWithDefaultSegmentReplicationSetting() throws Exception {

boolean isSystemIndex = randomBoolean();
String indexName = isSystemIndex ? SYSTEM_INDEX_NAME : INDEX_NAME;

// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
createIndex(indexName);
ensureYellowAndNoInitializingShards(indexName);
final String replicaNode = internalCluster().startNode();
ensureGreen(indexName);

final int initialDocCount = scaledRandomIntBetween(20, 30);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

refresh(indexName);
assertBusy(() -> {
assertHitCount(client(replicaNode).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(), initialDocCount);
});

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(indexName)
.execute()
.actionGet();
if (isSystemIndex) {
// Verify that Segment Replication did not happen on the replica shard.
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(indexName));
} else {
// Verify that Segment Replication happened on the replica shard.
assertFalse(segmentReplicationStatsResponse.getReplicationStats().get(indexName).get(0).getReplicaStats().isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ public void testFlushAfterRelocation() throws Exception {
}

// Verify segment replication event never happened on replica shard
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,16 @@ public Iterator<Setting<?>> settings() {
Property.IndexScope
);

/**
* Used to specify SEGMENT replication type as default for all indices. By default, this is false.
*/
public static final Setting<Boolean> DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT = Setting.boolSetting(
"opensearch.index.default.replication.type.segment",
false,
Property.NodeScope,
Property.Final
);

/**
* Used to specify the replication type for the index. By default, document replication is used.
*/
Expand Down Expand Up @@ -317,22 +327,27 @@ public void validate(final Boolean value) {}

@Override
public void validate(final Boolean value, final Map<Setting<?>, Object> settings) {
final Object isDefaultReplicationTypeEnabled = settings.get(DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT);
final Object replicationType = settings.get(INDEX_REPLICATION_TYPE_SETTING);
if (replicationType != ReplicationType.SEGMENT && value == true) {
if (replicationType != ReplicationType.SEGMENT && isDefaultReplicationTypeEnabled == Boolean.FALSE && value == true) {
throw new IllegalArgumentException(
"To enable "
+ INDEX_REMOTE_STORE_ENABLED_SETTING.getKey()
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " should be set to "
+ ReplicationType.SEGMENT
+ "or "
+ DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT.getKey()
+ " should be set to "
+ Boolean.TRUE
);
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REPLICATION_TYPE_SETTING);
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING, DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT);
return settings.iterator();
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.common.settings;

import org.apache.logging.log4j.LogManager;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
Expand Down Expand Up @@ -644,7 +645,9 @@ public void apply(Settings value, Settings current, Settings previous) {
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING)
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING),
FeatureFlags.REPLICATION_TYPE,
List.of(IndexMetadata.DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT)
);

}
9 changes: 8 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,14 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
nodeName = Node.NODE_NAME_SETTING.get(settings);
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)
&& indexMetadata.isSystem() == false
&& settings.get(IndexMetadata.SETTING_REPLICATION_TYPE) == null
&& IndexMetadata.DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT.get(nodeSettings)) {
replicationType = ReplicationType.SEGMENT;
} else {
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
}
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
Expand Down

0 comments on commit 5065732

Please sign in to comment.