diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 29eb1466056e8..1d58e0be65a2a 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -76,7 +76,6 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.mockito.ArgumentCaptor; import java.io.IOException; import java.io.OutputStream; @@ -108,7 +107,6 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RecoverySourceHandlerTests extends ESTestCase { @@ -205,9 +203,6 @@ public void testSendSnapshotSendsOps() throws IOException { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); - final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class); - final RecoverySourceHandler handler = - new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); final List operations = new ArrayList<>(); final int initialNumberOfDocs = randomIntBetween(16, 64); for (int i = 0; i < initialNumberOfDocs; i++) { @@ -219,38 +214,23 @@ public void testSendSnapshotSendsOps() throws IOException { final Engine.Index index = getIndex(Integer.toString(i)); operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true))); } - operations.add(null); final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); - RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo, - endingSeqNo, new Translog.Snapshot() { - @Override - public void close() { - - } - - private int counter = 0; - @Override - public int totalOperations() { - return operations.size() - 1; - } - - @Override - public Translog.Operation next() throws IOException { - return operations.get(counter++); - } - }, randomNonNegativeLong(), randomNonNegativeLong()); + final List shippedOps = new ArrayList<>(); + RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { + @Override + public long indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu) { + shippedOps.addAll(operations); + return SequenceNumbers.NO_OPS_PERFORMED; + } + }; + RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); + RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo, + endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong()); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); assertThat(result.totalOperations, equalTo(expectedOps)); - final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); - verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(), - ArgumentCaptor.forClass(Long.class).capture(), ArgumentCaptor.forClass(Long.class).capture()); - List shippedOps = new ArrayList<>(); - for (List list: shippedOpsCaptor.getAllValues()) { - shippedOps.addAll(list); - } shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo)); assertThat(shippedOps.size(), equalTo(expectedOps)); for (int i = 0; i < shippedOps.size(); i++) { @@ -261,30 +241,8 @@ public Translog.Operation next() throws IOException { List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); - expectThrows(IllegalStateException.class, () -> - handler.phase2(startingSeqNo, requiredStartingSeqNo, - endingSeqNo, new Translog.Snapshot() { - @Override - public void close() { - - } - - private int counter = 0; - - @Override - public int totalOperations() { - return operations.size() - 1 - opsToSkip.size(); - } - - @Override - public Translog.Operation next() throws IOException { - Translog.Operation op; - do { - op = operations.get(counter++); - } while (op != null && opsToSkip.contains(op)); - return op; - } - }, randomNonNegativeLong(), randomNonNegativeLong())); + expectThrows(IllegalStateException.class, () -> handler.phase2(startingSeqNo, requiredStartingSeqNo, + endingSeqNo, newTranslogSnapshot(operations, opsToSkip), randomNonNegativeLong(), randomNonNegativeLong())); } } @@ -716,4 +674,39 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR int totalTranslogOps, ActionListener listener) { } } + + private Translog.Snapshot newTranslogSnapshot(List operations, List operationsToSkip) { + return new Translog.Snapshot() { + int index = 0; + int skippedCount = 0; + + @Override + public int totalOperations() { + return operations.size(); + } + + @Override + public int skippedOperations() { + return skippedCount; + } + + @Override + public Translog.Operation next() { + while (index < operations.size()) { + Translog.Operation op = operations.get(index++); + if (operationsToSkip.contains(op)) { + skippedCount++; + } else { + return op; + } + } + return null; + } + + @Override + public void close() { + + } + }; + } }