diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java similarity index 99% rename from server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java index 4c26b9946ab90..34b60d5f3e9b3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java @@ -37,7 +37,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteDualMigrationIT extends MigrationBaseTestCase { +public class RemoteDualReplicationIT extends MigrationBaseTestCase { private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica"; private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica"; private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep"; diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index fc7738673852e..c1d13128c18b1 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -363,8 +363,7 @@ protected ReplicationOperation.Replicas primaryTermValidationR /** * This {@link org.opensearch.action.support.replication.TransportReplicationAction.ReplicasProxy} implementation is * used for primary term validation and is only relevant for TransportShardBulkAction replication action. - *

- * Visible for tests + * * @opensearch.internal */ private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy { diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java index a897549a20c50..9f5e31a9c6926 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java @@ -42,13 +42,13 @@ public ReplicationModeAwareProxy( DiscoveryNodes discoveryNodes, ReplicationOperation.Replicas replicasProxy, ReplicationOperation.Replicas primaryTermValidationProxy, - boolean isRemoteStoreIndexSettingEnabled + boolean remoteIndexSettingsEnabled ) { super(replicasProxy); this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride); this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy); this.discoveryNodes = discoveryNodes; - this.isRemoteEnabled = isRemoteStoreIndexSettingEnabled; + this.isRemoteEnabled = remoteIndexSettingsEnabled; } @Override diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index d55766c75f862..ec5fc1d19e40d 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; @@ -443,7 +444,7 @@ public void testReplicationInDualModeWithDocrepReplica() throws Exception { 0, new ReplicationModeAwareProxy<>( ReplicationMode.NO_REPLICATION, - buildMixedModeEnabledDiscoveryNodes(routingTable), + buildDiscoveryNodes(routingTable), replicasProxy, replicasProxy, false @@ -463,6 +464,74 @@ public void testReplicationInDualModeWithDocrepReplica() throws Exception { assertEquals(initializingIds.size() + activeIds.size(), shardInfo.getTotal()); } + public void testReplicationInDualModeWithMixedReplicasSomeInDocrepOthersOnRemote() throws Exception { + Set initializingIds = new HashSet<>(); + IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> initializingIds.add(AllocationId.newInitializing())); + Set activeIds = new HashSet<>(); + IntStream.range(0, randomIntBetween(2, 5)).forEach(x -> activeIds.add(AllocationId.newInitializing())); + + AllocationId primaryId = activeIds.iterator().next(); + + ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId); + final ShardRouting primaryShard = newShardRouting( + shardId, + nodeIdFromAllocationId(primaryId), + null, + true, + ShardRoutingState.STARTED, + primaryId + ); + initializingIds.forEach(aId -> { + ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId); + builder.addShard(routing); + }); + activeIds.stream().filter(aId -> !aId.equals(primaryId)).forEach(aId -> { + ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId); + builder.addShard(routing); + }); + builder.addShard(primaryShard); + IndexShardRoutingTable routingTable = builder.build(); + + Set inSyncAllocationIds = activeIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); + ReplicationGroup replicationGroup = new ReplicationGroup(routingTable, inSyncAllocationIds, inSyncAllocationIds, 0); + List replicationTargets = replicationGroup.getReplicationTargets(); + assertEquals(inSyncAllocationIds.size(), replicationTargets.size()); + assertTrue( + replicationTargets.stream().map(sh -> sh.allocationId().getId()).collect(Collectors.toSet()).containsAll(inSyncAllocationIds) + ); + + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + Map simulatedFailures = new HashMap<>(); + TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures); + TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool); + // Generating data nodes in mixed mode wherein some of the allocated replicas + // are in docrep nodes whereas others are on remote enabled ones + Tuple discoveryNodesDetails = buildMixedModeDiscoveryNodes(routingTable); + int docRepNodes = discoveryNodesDetails.v1(); + final TestReplicationOperation op = new TestReplicationOperation( + request, + primary, + listener, + replicasProxy, + 0, + new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, discoveryNodesDetails.v2(), replicasProxy, replicasProxy, false) + ); + op.execute(); + assertTrue("request was not processed on primary", request.processedOnPrimary.get()); + // Only docrep nodes should have the request fanned out to + assertEquals(docRepNodes, request.processedOnReplicas.size()); + assertEquals(0, replicasProxy.failedReplicas.size()); + assertEquals(0, replicasProxy.markedAsStaleCopies.size()); + assertTrue("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get()); + assertTrue("listener is not marked as done", listener.isDone()); + + ShardInfo shardInfo = listener.actionGet().getShardInfo(); + // Listener should be invoked for initializing Ids, primary and the operations on docrep nodes + assertEquals(1 + docRepNodes + initializingIds.size(), shardInfo.getTotal()); + } + static String nodeIdFromAllocationId(final AllocationId allocationId) { return "n-" + allocationId.getId().substring(0, 8); } @@ -907,7 +976,7 @@ private DiscoveryNodes buildRemoteStoreEnabledDiscoveryNodes(IndexShardRoutingTa return builder.build(); } - private DiscoveryNodes buildMixedModeEnabledDiscoveryNodes(IndexShardRoutingTable routingTable) { + private DiscoveryNodes buildDiscoveryNodes(IndexShardRoutingTable routingTable) { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); for (ShardRouting shardRouting : routingTable) { if (shardRouting.primary()) { @@ -919,6 +988,26 @@ private DiscoveryNodes buildMixedModeEnabledDiscoveryNodes(IndexShardRoutingTabl return builder.build(); } + private Tuple buildMixedModeDiscoveryNodes(IndexShardRoutingTable routingTable) { + int docrepNodes = 0; + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (ShardRouting shardRouting : routingTable) { + if (shardRouting.primary()) { + builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); + } else { + // Only add docrep nodes for allocationIds that are active + // since the test cases creates replication group with active allocationIds only + if (shardRouting.active() && randomBoolean()) { + builder.add(IndexShardTestUtils.getFakeDiscoNode(shardRouting.currentNodeId())); + docrepNodes += 1; + } else { + builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); + } + } + } + return new Tuple<>(docrepNodes, builder.build()); + } + public static class Request extends ReplicationRequest { public AtomicBoolean processedOnPrimary = new AtomicBoolean(); public AtomicBoolean runPostReplicationActionsOnPrimary = new AtomicBoolean();