Skip to content

Commit

Permalink
Address comments on PR.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Mar 22, 2023
1 parent 7b3e333 commit e0243ff
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.ClusterInfoServiceIT;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -24,14 +24,14 @@
import java.util.Collections;
import java.util.Arrays;

import static org.opensearch.cluster.metadata.IndexMetadata.DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationDefaultIT extends OpenSearchIntegTestCase {
public class SegmentReplicationDefaultClusterSettingIT extends OpenSearchIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final String SYSTEM_INDEX_NAME = ".test-system-index";
private static final String SYSTEM_INDEX_NAME = ".test-system-index";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

Expand All @@ -57,7 +57,10 @@ protected boolean addMockInternalEngine() {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT.getKey(), true).build();
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

public static class TestPlugin extends Plugin implements SystemIndexPlugin {
Expand All @@ -71,7 +74,7 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ClusterInfoServiceIT.TestPlugin.class, MockTransportService.TestPlugin.class);
return Arrays.asList(SegmentReplicationDefaultClusterSettingIT.TestPlugin.class, MockTransportService.TestPlugin.class);
}

public void testReplicationWithDefaultSegmentReplicationSetting() throws Exception {
Expand Down Expand Up @@ -109,4 +112,36 @@ public void testReplicationWithDefaultSegmentReplicationSetting() throws Excepti
assertFalse(segmentReplicationStatsResponse.getReplicationStats().get(indexName).get(0).getReplicaStats().isEmpty());
}
}

public void testIndexReplicationSettingsOverridesClusterSettings() throws Exception {
// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to override cluster replication setting by passing a index replication setting
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(20, 30);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

refresh(INDEX_NAME);
assertBusy(() -> {
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
});

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify that Segment Replication did not happen on the replica shard.
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
import static org.opensearch.common.settings.Settings.readSettingsFromStream;
import static org.opensearch.common.settings.Settings.writeSettingsToStream;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_REPLICATION_TYPE_SETTING;

/**
* Index metadata information
Expand Down Expand Up @@ -282,16 +283,6 @@ public Iterator<Setting<?>> settings() {
Property.IndexScope
);

/**
* Used to specify SEGMENT replication type as default for all indices. By default, this is false.
*/
public static final Setting<Boolean> DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT = Setting.boolSetting(
"opensearch.index.default.replication.type.segment",
false,
Property.NodeScope,
Property.Final
);

/**
* Used to specify the replication type for the index. By default, document replication is used.
*/
Expand Down Expand Up @@ -327,9 +318,11 @@ public void validate(final Boolean value) {}

@Override
public void validate(final Boolean value, final Map<Setting<?>, Object> settings) {
final Object isDefaultReplicationTypeEnabled = settings.get(DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT);
final Object defaultClusterSettingReplicationType = settings.get(CLUSTER_DEFAULT_REPLICATION_TYPE_SETTING);
final Object replicationType = settings.get(INDEX_REPLICATION_TYPE_SETTING);
if (replicationType != ReplicationType.SEGMENT && isDefaultReplicationTypeEnabled == Boolean.FALSE && value == true) {
if (replicationType != ReplicationType.SEGMENT
&& defaultClusterSettingReplicationType != ReplicationType.SEGMENT
&& value == true) {
throw new IllegalArgumentException(
"To enable "
+ INDEX_REMOTE_STORE_ENABLED_SETTING.getKey()
Expand All @@ -338,7 +331,7 @@ public void validate(final Boolean value, final Map<Setting<?>, Object> settings
+ " should be set to "
+ ReplicationType.SEGMENT
+ " or "
+ DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT.getKey()
+ CLUSTER_DEFAULT_REPLICATION_TYPE_SETTING.getKey()
+ " should be set to "
+ Boolean.TRUE
);
Expand All @@ -347,7 +340,7 @@ public void validate(final Boolean value, final Map<Setting<?>, Object> settings

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING, DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT);
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING, CLUSTER_DEFAULT_REPLICATION_TYPE_SETTING);
return settings.iterator();
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
package org.opensearch.common.settings;

import org.apache.logging.log4j.LogManager;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
Expand Down Expand Up @@ -647,7 +646,7 @@ public void apply(Settings value, Settings current, Settings previous) {
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING),
FeatureFlags.REPLICATION_TYPE,
List.of(IndexMetadata.DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT)
List.of(IndicesService.CLUSTER_DEFAULT_REPLICATION_TYPE_SETTING)
);

}
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.Node;
Expand Down Expand Up @@ -752,9 +753,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)
&& indexMetadata.isSystem() == false
&& settings.get(IndexMetadata.SETTING_REPLICATION_TYPE) == null
&& IndexMetadata.DEFAULT_REPLICATION_TYPE_SETTING_SEGMENT.get(nodeSettings)) {
replicationType = ReplicationType.SEGMENT;
&& settings.get(IndexMetadata.SETTING_REPLICATION_TYPE) == null) {
replicationType = ReplicationType.parseString(settings.get(IndicesService.CLUSTER_SETTING_REPLICATION_TYPE));
} else {
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
}
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.extensions.ExtensionsManager;
Expand Down Expand Up @@ -230,6 +231,19 @@ public class IndicesService extends AbstractLifecycleComponent
Setting.Property.NodeScope
);

/**
* Used to specify SEGMENT replication type as the default replication strategy for all indices in a cluster. By default, this is false.
*/
public static final String CLUSTER_SETTING_REPLICATION_TYPE = "indices.replication.strategy";

public static final Setting<ReplicationType> CLUSTER_DEFAULT_REPLICATION_TYPE_SETTING = new Setting<>(
CLUSTER_SETTING_REPLICATION_TYPE,
ReplicationType.DOCUMENT.toString(),
ReplicationType::parseString,
Property.NodeScope,
Property.Final
);

/**
* The node's settings.
*/
Expand Down

0 comments on commit e0243ff

Please sign in to comment.