From aaa4ba19c46a912c21a54ef229905687beda1056 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Tue, 23 Jan 2024 20:32:53 +0530 Subject: [PATCH 1/7] Force update operation primary term in replication checkout post failover Signed-off-by: bansvaru --- .../src/main/java/org/opensearch/index/shard/IndexShard.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index cbb246219546b..748c2947cd36b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1697,7 +1697,8 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th } final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint(); if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion() - && latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) { + && latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration() + && latestReplicationCheckpoint.getPrimaryTerm() == getOperationPrimaryTerm()) { return latestReplicationCheckpoint; } final Map metadataMap = store.getSegmentMetadataMap(segmentInfos); From 3a768180c857ceed1c5ab10f0fb188540e36fa54 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Mon, 29 Jan 2024 18:20:50 +0530 Subject: [PATCH 2/7] add Integ Test and primary term validation during upload Signed-off-by: bansvaru --- .../remotestore/RemoteStoreRestoreIT.java | 59 +++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 2 +- .../shard/RemoteStoreRefreshListener.java | 11 ++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 72296a1c01a24..6fbf04ff15ea6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -17,13 +17,20 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import org.opensearch.indices.IndicesService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.junit.annotations.TestIssueLogging; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.HashMap; import java.util.Locale; import java.util.Map; @@ -135,6 +142,58 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, boolea restoreAndVerify(shardCount, 0, indexStats); } + @TestIssueLogging(value = "org.opensearch:TRACE", issueUrl = "to get trace level logs for debug") + public void testMultipleWriters() throws Exception { + prepareCluster(1, 2, INDEX_NAME, 1, 1); + Map indexStats = indexData(randomIntBetween(2, 5), true, true, INDEX_NAME); + assertEquals(2, getNumShards(INDEX_NAME).totalNumShards); + + // ensure replica has latest checkpoint + flushAndRefresh(INDEX_NAME); + flushAndRefresh(INDEX_NAME); + + Index indexObj = clusterService().state().metadata().indices().get(INDEX_NAME).getIndex(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNodeName(INDEX_NAME)); + IndexService indexService = indicesService.indexService(indexObj); + IndexShard indexShard = indexService.getShard(0); + RemoteSegmentMetadata remoteSegmentMetadataBeforeFailover = indexShard.getRemoteDirectory().readLatestMetadataFile(); + + // ensure all segments synced to replica + assertBusy( + () -> assertHitCount( + client(primaryNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(), + indexStats.get(TOTAL_OPERATIONS) + ), + 30, + TimeUnit.SECONDS + ); + assertBusy( + () -> assertHitCount( + client(replicaNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(), + indexStats.get(TOTAL_OPERATIONS) + ), + 30, + TimeUnit.SECONDS + ); + + String newPrimaryNodeName = replicaNodeName(INDEX_NAME); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + ensureYellow(INDEX_NAME); + + indicesService = internalCluster().getInstance(IndicesService.class, newPrimaryNodeName); + indexService = indicesService.indexService(indexObj); + indexShard = indexService.getShard(0); + IndexShard finalIndexShard = indexShard; + assertBusy(() -> assertTrue(finalIndexShard.isPrimaryMode())); + RemoteSegmentMetadata remoteSegmentMetadataAfterFailover = indexShard.getRemoteDirectory().readLatestMetadataFile(); + + assertNotEquals( + new ArrayList<>(remoteSegmentMetadataAfterFailover.toMapOfStrings().values()), + new ArrayList<>(remoteSegmentMetadataBeforeFailover.toMapOfStrings().values()) + ); + assertEquals(remoteSegmentMetadataAfterFailover.getPrimaryTerm(), remoteSegmentMetadataBeforeFailover.getPrimaryTerm() + 1); + } + /** * Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop. * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 748c2947cd36b..5834eabfa9af0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2015,7 +2015,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO /* ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003 */ - private RemoteSegmentStoreDirectory getRemoteDirectory() { + public RemoteSegmentStoreDirectory getRemoteDirectory() { assert indexSettings.isRemoteStoreEnabled(); assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index d96a7e7c95ecf..e96a4bb19a960 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -41,6 +41,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -209,6 +210,16 @@ private boolean syncSegments() { try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos); + if (checkpoint.getPrimaryTerm() != indexShard.getOperationPrimaryTerm()) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "primaryTerm mismatch during segments upload to remote store [%s] != [%s]", + checkpoint.getPrimaryTerm(), + indexShard.getOperationPrimaryTerm() + ) + ); + } // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); From ed00efd458a54ca5bb582704b6ba86e11195a948 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Tue, 30 Jan 2024 14:58:17 +0530 Subject: [PATCH 3/7] fix failing UTs Signed-off-by: bansvaru --- .../opensearch/index/shard/RemoteStoreRefreshListenerTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 555284e55a0fa..8e692b32c3857 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -500,6 +500,7 @@ private Tuple mockIn when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); // Mock indexShard.getOperationPrimaryTerm() + when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getLatestReplicationCheckpoint().getPrimaryTerm()); when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint()); // Mock indexShard.routingEntry().primary() From a586345161cfc5e669f3bff4913abcfcd1188aea Mon Sep 17 00:00:00 2001 From: bansvaru Date: Tue, 30 Jan 2024 14:59:21 +0530 Subject: [PATCH 4/7] better fix for UT Signed-off-by: bansvaru --- .../opensearch/index/shard/RemoteStoreRefreshListenerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 8e692b32c3857..b8d4070a655b7 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -500,7 +500,7 @@ private Tuple mockIn when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); // Mock indexShard.getOperationPrimaryTerm() - when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getLatestReplicationCheckpoint().getPrimaryTerm()); + when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint()); // Mock indexShard.routingEntry().primary() From 1b90a1ba871317203e98f09a6b2806a86371bc89 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Tue, 30 Jan 2024 15:31:10 +0530 Subject: [PATCH 5/7] add new failure test Signed-off-by: bansvaru --- .../RemoteStoreRefreshListenerTests.java | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index b8d4070a655b7..6567cb03f3dc6 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -425,6 +425,33 @@ public void testTrackerData() throws Exception { assertBusy(() -> assertNoLag(tracker)); } + /** + * Tests segments upload fails with replication checkpoint and replication tracker primary term mismatch + */ + public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception { + int totalAttempt = 1; + int checkpointPublishSucceedOnAttempt = 0; + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(totalAttempt); + + // success latch should change as we would be failed primary term latest validation. + CountDownLatch successLatch = new CountDownLatch(1); + CountDownLatch reachedCheckpointPublishLatch = new CountDownLatch(0); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + totalAttempt, + refreshCountLatch, + successLatch, + checkpointPublishSucceedOnAttempt, + reachedCheckpointPublishLatch, + false + ); + + assertBusy(() -> assertEquals(1, tuple.v2().getRemoteSegmentTransferTracker(indexShard.shardId()).getTotalUploadsFailed())); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(1, successLatch.getCount())); + assertBusy(() -> assertEquals(0, reachedCheckpointPublishLatch.getCount())); + } + private void assertNoLag(RemoteSegmentTransferTracker tracker) { assertEquals(0, tracker.getRefreshSeqNoLag()); assertEquals(0, tracker.getBytesLag()); @@ -460,6 +487,24 @@ private Tuple mockIn CountDownLatch successLatch, int succeedCheckpointPublishOnAttempt, CountDownLatch reachedCheckpointPublishLatch + ) throws IOException { + return mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch, + succeedCheckpointPublishOnAttempt, + reachedCheckpointPublishLatch, + true + ); + } + + private Tuple mockIndexShardWithRetryAndScheduleRefresh( + int succeedOnAttempt, + CountDownLatch refreshCountLatch, + CountDownLatch successLatch, + int succeedCheckpointPublishOnAttempt, + CountDownLatch reachedCheckpointPublishLatch, + boolean mockPrimaryTerm ) throws IOException { // Create index shard that we will be using to mock different methods in IndexShard for the unit test indexShard = newStartedShard( @@ -500,7 +545,9 @@ private Tuple mockIn when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); // Mock indexShard.getOperationPrimaryTerm() - when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); + if (mockPrimaryTerm) { + when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); + } when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint()); // Mock indexShard.routingEntry().primary() From d39fc54b5f80c2e2d32f0494481b3dddec8471dc Mon Sep 17 00:00:00 2001 From: bansvaru Date: Tue, 30 Jan 2024 16:31:10 +0530 Subject: [PATCH 6/7] remove debug logging from integ test Signed-off-by: bansvaru --- .../java/org/opensearch/remotestore/RemoteStoreRestoreIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 6fbf04ff15ea6..9a9aa5ada3a38 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -26,7 +26,6 @@ import org.opensearch.repositories.Repository; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.junit.annotations.TestIssueLogging; import java.io.IOException; import java.nio.file.Path; @@ -142,7 +141,6 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, boolea restoreAndVerify(shardCount, 0, indexStats); } - @TestIssueLogging(value = "org.opensearch:TRACE", issueUrl = "to get trace level logs for debug") public void testMultipleWriters() throws Exception { prepareCluster(1, 2, INDEX_NAME, 1, 1); Map indexStats = indexData(randomIntBetween(2, 5), true, true, INDEX_NAME); From ebb5b4a0b7341ccf41bc9ffe3fd7469e6aa896a0 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Tue, 30 Jan 2024 16:45:11 +0530 Subject: [PATCH 7/7] update integ tests to remote flaky behaviour Signed-off-by: bansvaru --- .../opensearch/remotestore/RemoteStoreRestoreIT.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 9a9aa5ada3a38..dfa5528eafcf2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.HashMap; import java.util.Locale; import java.util.Map; @@ -182,14 +181,11 @@ public void testMultipleWriters() throws Exception { indexService = indicesService.indexService(indexObj); indexShard = indexService.getShard(0); IndexShard finalIndexShard = indexShard; - assertBusy(() -> assertTrue(finalIndexShard.isPrimaryMode())); - RemoteSegmentMetadata remoteSegmentMetadataAfterFailover = indexShard.getRemoteDirectory().readLatestMetadataFile(); - - assertNotEquals( - new ArrayList<>(remoteSegmentMetadataAfterFailover.toMapOfStrings().values()), - new ArrayList<>(remoteSegmentMetadataBeforeFailover.toMapOfStrings().values()) + assertBusy(() -> assertTrue(finalIndexShard.isStartedPrimary() && finalIndexShard.isPrimaryMode())); + assertEquals( + finalIndexShard.getLatestSegmentInfosAndCheckpoint().v2().getPrimaryTerm(), + remoteSegmentMetadataBeforeFailover.getPrimaryTerm() + 1 ); - assertEquals(remoteSegmentMetadataAfterFailover.getPrimaryTerm(), remoteSegmentMetadataBeforeFailover.getPrimaryTerm() + 1); } /**