diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java index 38a67d0656..23425c66d8 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java @@ -15,7 +15,6 @@ */ package io.aeron.logbuffer; -import org.agrona.BitUtil; import org.agrona.concurrent.UnsafeBuffer; import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; @@ -31,8 +30,6 @@ */ public class TermGapScanner { - private static final int ALIGNED_HEADER_LENGTH = BitUtil.align(HEADER_LENGTH, FRAME_ALIGNMENT); - /** * Handler for notifying of gaps in the log. */ @@ -82,19 +79,17 @@ public static int scanForGap( final int gapBeginOffset = offset; if (offset < limitOffset) { - final int limit = limitOffset - ALIGNED_HEADER_LENGTH; - while (offset < limit) + offset += HEADER_LENGTH; + while (offset < limitOffset) { - offset += FRAME_ALIGNMENT; - - if (0 != termBuffer.getIntVolatile(offset)) + if (0 != frameLengthVolatile(termBuffer, offset)) { - offset -= ALIGNED_HEADER_LENGTH; break; } + offset += HEADER_LENGTH; } - final int gapLength = (offset - gapBeginOffset) + ALIGNED_HEADER_LENGTH; + final int gapLength = offset - gapBeginOffset; handler.onGap(termId, gapBeginOffset, gapLength); } diff --git a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java index f8e3c40a04..8b0e6adfd5 100644 --- a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java +++ b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java @@ -24,6 +24,7 @@ import static org.agrona.BitUtil.align; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.verifyNoMoreInteractions; class TermGapScannerTest { @@ -51,6 +52,7 @@ void shouldReportGapAtBeginningOfBuffer() assertEquals(0, TermGapScanner.scanForGap(termBuffer, TERM_ID, 0, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, 0, frameOffset); + verifyNoMoreInteractions(gapHandler); } @Test @@ -67,6 +69,7 @@ void shouldReportSingleGapWhenBufferNotFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -83,6 +86,7 @@ void shouldReportSingleGapWhenBufferIsFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -100,4 +104,37 @@ void shouldReportNoGapWhenHwmIsInPadding() verifyNoInteractions(gapHandler); } + + @Test + void shouldReportSingleHeaderGap() + { + final int offset = 8192 + 384; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + when(termBuffer.getIntVolatile(offset + HEADER_LENGTH)).thenReturn(128); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, HEADER_LENGTH); + verifyNoMoreInteractions(gapHandler, termBuffer); + } + + @Test + void shouldReportGapAtTheEndOfTheBuffer() + { + final int offset = LOG_BUFFER_CAPACITY - 128; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 2 * HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 3 * HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, 128); + verifyNoMoreInteractions(gapHandler, termBuffer); + } }