Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Apr 1, 2024
1 parent c7b2dab commit 04fc5cb
Show file tree
Hide file tree
Showing 22 changed files with 241 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected Path segmentRepoPath;
protected Path translogRepoPath;
boolean addRemote = false;
Settings extraSettings = Settings.EMPTY;

private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
Expand All @@ -59,6 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
logger.info("Adding remote store node");
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(extraSettings)
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.indices.stats.IndexStatsIT.persistGlobalCheckpoint;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand All @@ -40,7 +44,14 @@ public class RemoteDualMigrationIT extends MigrationBaseTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(InternalSettingsPlugin.class);
/* Adding the following mock plugins:
- InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync
- MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated
*/
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class)
).collect(Collectors.toList());
}

/*
Expand All @@ -62,7 +73,11 @@ public void testRemotePrimaryDocRepReplica() throws Exception {
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

logger.info("---> Creating index with 1 replica");
Settings oneReplica = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
Settings oneReplica = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();
createIndex(REMOTE_PRI_DOCREP_REP, oneReplica);
ensureGreen(REMOTE_PRI_DOCREP_REP);

Expand Down Expand Up @@ -137,14 +152,16 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception {
logger.info("---> Creating index with 0 replica");
Settings zeroReplicas = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();
createIndex(REMOTE_PRI_DOCREP_REMOTE_REP, zeroReplicas);
ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP);
initDocRepToRemoteMigration();

logger.info("---> Starting 1 remote enabled data node");
addRemote = true;

String remoteNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(
Expand Down Expand Up @@ -210,9 +227,10 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception {
Checks if retention leases are published on primary shard and it's docrep copies, but not on remote copies
*/
public void testRetentionLeasePresentOnDocrepReplicaButNotRemote() throws Exception {
// Reducing indices.memory.shard_inactive_time to force a flush and trigger translog sync,
// instead of relying on Global CKP Sync action which doesn't run on remote enabled copies
extraSettings = Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build();
testRemotePrimaryDocRepAndRemoteReplica();
// Ensuring global checkpoint is synced across all the shard copies
persistGlobalCheckpoint(REMOTE_PRI_DOCREP_REMOTE_REP);
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
assertBusy(() -> {
for (ShardStats shardStats : internalCluster().client()
Expand All @@ -227,13 +245,15 @@ public void testRetentionLeasePresentOnDocrepReplicaButNotRemote() throws Except
if (shardRouting.primary()) {
// Primary copy should be on remote node and should have retention leases
assertTrue(discoveryNode.isRemoteStoreNode());
assertCheckpointsConsistency(shardStats);
assertRetentionLeaseConsistency(shardStats, retentionLeases);
} else {
// Checkpoints and Retention Leases are not synced to remote replicas
if (discoveryNode.isRemoteStoreNode()) {
// Replica copy on remote node should not have retention leases
assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().isEmpty());
} else {
// Replica copy on docrep node should have retention leases
assertCheckpointsConsistency(shardStats);
assertRetentionLeaseConsistency(shardStats, retentionLeases);
}
}
Expand Down Expand Up @@ -478,6 +498,23 @@ private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch
*/
private static void assertRetentionLeaseConsistency(ShardStats shardStats, RetentionLeases retentionLeases) {
long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(retentionLeases.leases().stream().allMatch(l -> l.retainingSequenceNumber() == maxSeqNo + 1));
for (RetentionLease rl : retentionLeases.leases()) {
assertEquals(maxSeqNo + 1, rl.retainingSequenceNumber());
}
}

/**
* For a docrep enabled shard copy or a primary shard copy,
* asserts that local and global checkpoints are up-to-date with maxSeqNo of doc operations
*
* @param shardStats ShardStats object from NodesStats API
*/
private static void assertCheckpointsConsistency(ShardStats shardStats) {
long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
long localCkp = shardStats.getSeqNoStats().getLocalCheckpoint();
long globalCkp = shardStats.getSeqNoStats().getGlobalCheckpoint();

assertEquals(maxSeqNo, localCkp);
assertEquals(maxSeqNo, globalCkp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ protected ReplicationOperation.Replicas<BulkShardRequest> primaryTermValidationR
* Visible for tests
* @opensearch.internal
*/
public final class PrimaryTermValidationProxy extends WriteActionReplicasProxy {
private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy {

@Override
public void performOn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.action.support.replication;

import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionListener;
Expand All @@ -34,16 +35,20 @@ public class ReplicationModeAwareProxy<ReplicaRequest extends ReplicationRequest

private final DiscoveryNodes discoveryNodes;

private final boolean isRemoteStoreIndexSettingEnabled;

public ReplicationModeAwareProxy(
ReplicationMode replicationModeOverride,
DiscoveryNodes discoveryNodes,
ReplicationOperation.Replicas<ReplicaRequest> replicasProxy,
ReplicationOperation.Replicas<ReplicaRequest> primaryTermValidationProxy
ReplicationOperation.Replicas<ReplicaRequest> primaryTermValidationProxy,
boolean isRemoteStoreIndexSettingEnabled
) {
super(replicasProxy);
this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride);
this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy);
this.discoveryNodes = discoveryNodes;
this.isRemoteStoreIndexSettingEnabled = isRemoteStoreIndexSettingEnabled;
}

@Override
Expand Down Expand Up @@ -74,11 +79,16 @@ ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting
return ReplicationMode.FULL_REPLICATION;
}
/*
Perform full replication if replica is hosted on a non-remote node.
Only applicable during remote migration
Only applicable during remote store migration.
During the migration process, remote based index settings will not be enabled,
thus we will rely on node attributes to figure out the replication mode
*/
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() == false) {
return ReplicationMode.FULL_REPLICATION;
if (isRemoteStoreIndexSettingEnabled == false) {
DiscoveryNode targetNode = discoveryNodes.get(shardRouting.currentNodeId());
if (targetNode != null && targetNode.isRemoteStoreNode() == false) {
// Perform full replication if replica is hosted on a non-remote node.
return ReplicationMode.FULL_REPLICATION;
}
}
return replicationModeOverride;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ public void handleException(TransportException exp) {
getReplicationMode(indexShard),
clusterState.getNodes(),
replicasProxy,
termValidationProxy
termValidationProxy,
indexShard.isRemoteTranslogEnabled()
)
: new FanoutReplicationProxy<>(replicasProxy)
).execute();
Expand Down
6 changes: 1 addition & 5 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1233,11 +1233,7 @@ public boolean isSegRepEnabledOrRemoteNode() {
}

public boolean isSegRepLocalEnabled() {
return isSegRepEnabledOrRemoteNode() && !isRemoteStoreEnabled();
}

public boolean isSegRepWithRemoteEnabled() {
return isSegRepEnabledOrRemoteNode() && isRemoteNode();
return ReplicationType.SEGMENT.equals(replicationType) && !isRemoteStoreEnabled();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica,
private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST
&& indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()
&& indexShard.isRemoteTranslogEnabled() == false) {
&& indexShard.indexSettings().isRemoteNode() == false) {
indexShard.sync();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ private ReplicationGroup calculateReplicationGroup() {
} else {
newVersion = replicationGroup.getVersion() + 1;
}
assert indexSettings.isRemoteNode()
assert indexSettings.isRemoteTranslogStoreEnabled()
// Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
|| (replicationGroup != null
&& replicationGroup.getReplicationTargets()
Expand Down Expand Up @@ -1457,9 +1457,8 @@ public synchronized void updateFromClusterManager(
+ " as in-sync but it does not exist locally";
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode()
|| (routingTable.getByAllocationId(initializingId) != null
&& isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId()));
final boolean assignedToRemoteStoreNode = indexSettings.isRemoteStoreEnabled()
|| onRemoteEnabledNode(routingTable, initializingId);
checkpoints.put(
initializingId,
new CheckpointState(
Expand All @@ -1479,9 +1478,8 @@ public synchronized void updateFromClusterManager(
for (String initializingId : initializingAllocationIds) {
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode()
|| (routingTable.getByAllocationId(initializingId) != null
&& isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId()));
final boolean assignedToRemoteStoreNode = indexSettings().isRemoteStoreEnabled()
|| onRemoteEnabledNode(routingTable, initializingId);
checkpoints.put(
initializingId,
new CheckpointState(
Expand All @@ -1496,10 +1494,8 @@ public synchronized void updateFromClusterManager(
for (String inSyncId : inSyncAllocationIds) {
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
// Handling cases where node leaves the cluster but the insyncAids are not yet updated
final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode()
|| (routingTable.getByAllocationId(inSyncId) != null
&& isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(inSyncId).currentNodeId()));
final boolean assignedToRemoteStoreNode = indexSettings().isRemoteStoreEnabled()
|| onRemoteEnabledNode(routingTable, inSyncId);
checkpoints.put(
inSyncId,
new CheckpointState(
Expand All @@ -1524,6 +1520,11 @@ public synchronized void updateFromClusterManager(
assert invariant();
}

private boolean onRemoteEnabledNode(IndexShardRoutingTable routingTable, String initializingId) {
return routingTable.getByAllocationId(initializingId) != null
&& isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId());
}

/**
* Returns whether the requests are replicated considering the remote translog existence, current/primary/primary target allocation ids.
*
Expand All @@ -1541,8 +1542,7 @@ private boolean isReplicated(
// If assigned to a remote node, returns true if given allocation id matches the primary or it's relocation target allocation
// primary and primary target allocation id.
if (assignedToRemoteStoreNode == true) {
boolean toReturn = allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId);
return toReturn;
return allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId);
}
// For other case which is local translog, return true as the requests are replicated to all shards in the replication group.
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
&& shard.indexSettings.isRemoteNode() == false) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
return true;
Expand Down
18 changes: 4 additions & 14 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -520,25 +520,15 @@ public boolean shouldSeedRemoteStore() {
* checks does not fail during a cluster manager state update when the latest replication group
* calculation is not yet done and the cached replication group details are available
*/
public Function<String, Boolean> isShardOnRemoteEnabledNode = (shardId) -> {
DiscoveryNode discoveryNode = this.discoveryNodes.get(shardId);
public Function<String, Boolean> isShardOnRemoteEnabledNode = (nodeId) -> {
DiscoveryNode discoveryNode = this.discoveryNodes.get(nodeId);
if (discoveryNode != null) {
logger.trace(
"ShardID {} is assigned to Node {} which has remote_enabled as {}",
shardId,
discoveryNode,
discoveryNode.isRemoteStoreNode()
);
logger.trace("Node {} has remote_enabled as {}", nodeId, discoveryNode.isRemoteStoreNode());
return discoveryNode.isRemoteStoreNode();
}
return false;
};

// Only to be used for Unit Tests
public void setDiscoveryNodes(DiscoveryNodes discoveryNodes) {
this.discoveryNodes = discoveryNodes;
}

public boolean isRemoteSeeded() {
return shardMigrationState == REMOTE_MIGRATING_SEEDED;
}
Expand Down Expand Up @@ -4028,7 +4018,7 @@ private boolean isRemoteStoreEnabled() {
}

public boolean isRemoteTranslogEnabled() {
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
return indexSettings() != null && (indexSettings().isRemoteTranslogStoreEnabled());
}

/**
Expand Down
Loading

0 comments on commit 04fc5cb

Please sign in to comment.