Skip to content

Commit

Permalink
add new failure test
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Jan 30, 2024
1 parent a586345 commit 1b90a1b
Showing 1 changed file with 48 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,33 @@ public void testTrackerData() throws Exception {
assertBusy(() -> assertNoLag(tracker));
}

/**
* Tests segments upload fails with replication checkpoint and replication tracker primary term mismatch
*/
public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception {
int totalAttempt = 1;
int checkpointPublishSucceedOnAttempt = 0;
// We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(totalAttempt);

// success latch should change as we would be failed primary term latest validation.
CountDownLatch successLatch = new CountDownLatch(1);
CountDownLatch reachedCheckpointPublishLatch = new CountDownLatch(0);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
totalAttempt,
refreshCountLatch,
successLatch,
checkpointPublishSucceedOnAttempt,
reachedCheckpointPublishLatch,
false
);

assertBusy(() -> assertEquals(1, tuple.v2().getRemoteSegmentTransferTracker(indexShard.shardId()).getTotalUploadsFailed()));
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(1, successLatch.getCount()));
assertBusy(() -> assertEquals(0, reachedCheckpointPublishLatch.getCount()));
}

private void assertNoLag(RemoteSegmentTransferTracker tracker) {
assertEquals(0, tracker.getRefreshSeqNoLag());
assertEquals(0, tracker.getBytesLag());
Expand Down Expand Up @@ -460,6 +487,24 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch
) throws IOException {
return mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch,
succeedCheckpointPublishOnAttempt,
reachedCheckpointPublishLatch,
true
);
}

private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch,
boolean mockPrimaryTerm
) throws IOException {
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
Expand Down Expand Up @@ -500,7 +545,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

// Mock indexShard.getOperationPrimaryTerm()
when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm());
if (mockPrimaryTerm) {
when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm());
}
when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint());

// Mock indexShard.routingEntry().primary()
Expand Down

0 comments on commit 1b90a1b

Please sign in to comment.