Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Apr 2, 2024
1 parent 19e7221 commit 77798c7
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteDualMigrationIT extends MigrationBaseTestCase {
public class RemoteDualReplicationIT extends MigrationBaseTestCase {
private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";
private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica";
private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ protected ReplicationOperation.Replicas<BulkShardRequest> primaryTermValidationR
/**
* This {@link org.opensearch.action.support.replication.TransportReplicationAction.ReplicasProxy} implementation is
* used for primary term validation and is only relevant for TransportShardBulkAction replication action.
* <p>
* Visible for tests
*
* @opensearch.internal
*/
private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public ReplicationModeAwareProxy(
DiscoveryNodes discoveryNodes,
ReplicationOperation.Replicas<ReplicaRequest> replicasProxy,
ReplicationOperation.Replicas<ReplicaRequest> primaryTermValidationProxy,
boolean isRemoteStoreIndexSettingEnabled
boolean remoteIndexSettingsEnabled
) {
super(replicasProxy);
this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride);
this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy);
this.discoveryNodes = discoveryNodes;
this.isRemoteEnabled = isRemoteStoreIndexSettingEnabled;
this.isRemoteEnabled = remoteIndexSettingsEnabled;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.set.Sets;
Expand Down Expand Up @@ -443,7 +444,7 @@ public void testReplicationInDualModeWithDocrepReplica() throws Exception {
0,
new ReplicationModeAwareProxy<>(
ReplicationMode.NO_REPLICATION,
buildMixedModeEnabledDiscoveryNodes(routingTable),
buildDiscoveryNodes(routingTable),
replicasProxy,
replicasProxy,
false
Expand All @@ -463,6 +464,74 @@ public void testReplicationInDualModeWithDocrepReplica() throws Exception {
assertEquals(initializingIds.size() + activeIds.size(), shardInfo.getTotal());
}

public void testReplicationInDualModeWithMixedReplicasSomeInDocrepOthersOnRemote() throws Exception {
Set<AllocationId> initializingIds = new HashSet<>();
IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> initializingIds.add(AllocationId.newInitializing()));
Set<AllocationId> activeIds = new HashSet<>();
IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> activeIds.add(AllocationId.newInitializing()));

AllocationId primaryId = activeIds.iterator().next();

ShardId shardId = new ShardId("test", "_na_", 0);
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
final ShardRouting primaryShard = newShardRouting(
shardId,
nodeIdFromAllocationId(primaryId),
null,
true,
ShardRoutingState.STARTED,
primaryId
);
initializingIds.forEach(aId -> {
ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId);
builder.addShard(routing);
});
activeIds.stream().filter(aId -> !aId.equals(primaryId)).forEach(aId -> {
ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId);
builder.addShard(routing);
});
builder.addShard(primaryShard);
IndexShardRoutingTable routingTable = builder.build();

Set<String> inSyncAllocationIds = activeIds.stream().map(AllocationId::getId).collect(Collectors.toSet());
ReplicationGroup replicationGroup = new ReplicationGroup(routingTable, inSyncAllocationIds, inSyncAllocationIds, 0);
List<ShardRouting> replicationTargets = replicationGroup.getReplicationTargets();
assertEquals(inSyncAllocationIds.size(), replicationTargets.size());
assertTrue(
replicationTargets.stream().map(sh -> sh.allocationId().getId()).collect(Collectors.toSet()).containsAll(inSyncAllocationIds)
);

Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
Map<ShardRouting, Exception> simulatedFailures = new HashMap<>();
TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures);
TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool);
// Generating data nodes in mixed mode wherein some of the allocated replicas
// are in docrep nodes whereas others are on remote enabled ones
Tuple<Integer, DiscoveryNodes> discoveryNodesDetails = buildMixedModeDiscoveryNodes(routingTable);
int docRepNodes = discoveryNodesDetails.v1();
final TestReplicationOperation op = new TestReplicationOperation(
request,
primary,
listener,
replicasProxy,
0,
new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, discoveryNodesDetails.v2(), replicasProxy, replicasProxy, false)
);
op.execute();
assertTrue("request was not processed on primary", request.processedOnPrimary.get());
// Only docrep nodes should have the request fanned out to
assertEquals(docRepNodes, request.processedOnReplicas.size());
assertEquals(0, replicasProxy.failedReplicas.size());
assertEquals(0, replicasProxy.markedAsStaleCopies.size());
assertTrue("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get());
assertTrue("listener is not marked as done", listener.isDone());

ShardInfo shardInfo = listener.actionGet().getShardInfo();
// Listener should be invoked for initializing Ids, primary and the operations on docrep nodes
assertEquals(1 + docRepNodes + initializingIds.size(), shardInfo.getTotal());
}

static String nodeIdFromAllocationId(final AllocationId allocationId) {
return "n-" + allocationId.getId().substring(0, 8);
}
Expand Down Expand Up @@ -907,7 +976,7 @@ private DiscoveryNodes buildRemoteStoreEnabledDiscoveryNodes(IndexShardRoutingTa
return builder.build();
}

private DiscoveryNodes buildMixedModeEnabledDiscoveryNodes(IndexShardRoutingTable routingTable) {
private DiscoveryNodes buildDiscoveryNodes(IndexShardRoutingTable routingTable) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (ShardRouting shardRouting : routingTable) {
if (shardRouting.primary()) {
Expand All @@ -919,6 +988,26 @@ private DiscoveryNodes buildMixedModeEnabledDiscoveryNodes(IndexShardRoutingTabl
return builder.build();
}

private Tuple<Integer, DiscoveryNodes> buildMixedModeDiscoveryNodes(IndexShardRoutingTable routingTable) {
int docrepNodes = 0;
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (ShardRouting shardRouting : routingTable) {
if (shardRouting.primary()) {
builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId()));
} else {
// Only add docrep nodes for allocationIds that are active
// since the test cases creates replication group with active allocationIds only
if (shardRouting.active() && randomBoolean()) {
builder.add(IndexShardTestUtils.getFakeDiscoNode(shardRouting.currentNodeId()));
docrepNodes += 1;
} else {
builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId()));
}
}
}
return new Tuple<>(docrepNodes, builder.build());
}

public static class Request extends ReplicationRequest<Request> {
public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public AtomicBoolean runPostReplicationActionsOnPrimary = new AtomicBoolean();
Expand Down

0 comments on commit 77798c7

Please sign in to comment.