KAFKA-16452: Don't throw OOORE when converting the offset to metadata #15825

Merged · 27 commits · May 27, 2024
Changes shown are from 25 of the 27 commits.

Commits
33bf18b
KAFKA-16452: Don't throw OOORE when converting the offset to metadata
kamalcph Mar 31, 2024
5fc04f6
Addressed the review comments
kamalcph May 7, 2024
9143e45
Addressed the review comments
kamalcph May 10, 2024
5345bd3
fix the condition in KafkaMetadataLog#highWatermark method
kamalcph May 10, 2024
d23f426
Rename convertToOffsetMetadata to maybeConvertToOffsetMetadata
kamalcph May 13, 2024
a4e31e0
Update UNKNOWN_OFFSET_METADATA constant to be messageOffsetOnly
kamalcph May 20, 2024
afca50e
Accumulate 1 byte only when fetchOffset < endOffset
kamalcph May 20, 2024
ff3ab0e
Address the case when maxOffsetMetadata can be messageOffsetOnly
kamalcph May 20, 2024
5945dec
Add unit tests
kamalcph May 20, 2024
9f22047
Address luke's review comments.
kamalcph May 21, 2024
f26d381
fix typo
kamalcph May 21, 2024
14efe65
Expose filename filter in the test builder
kamalcph May 21, 2024
503732a
Addressed some of the review comments
kamalcph May 21, 2024
06ee554
add filename filter to the output
kamalcph May 21, 2024
56604d6
Address the pending comments.
kamalcph May 22, 2024
ce09bcd
cover with unit tests
kamalcph May 22, 2024
7e97904
extract variable in unit tests for clarity
kamalcph May 22, 2024
eb75573
Addressed the review comments
kamalcph May 22, 2024
f5ce2a6
Addressed the review comments
kamalcph May 22, 2024
6e3f113
Use Optional for maxPosition
kamalcph May 22, 2024
7ce211e
update comments
kamalcph May 22, 2024
5d080ad
Add debug log
kamalcph May 23, 2024
34d8ca0
Add explicit check in the LogOffsetMetadata
kamalcph May 23, 2024
59e6a7b
update LogOffsetMetadata
kamalcph May 23, 2024
21a6d27
Addressed the review comments
kamalcph May 23, 2024
91bd96b
Refactor the DelayedFetch condition to complete the request
kamalcph May 23, 2024
d4668be
Addressed the review comment
kamalcph May 24, 2024
19 changes: 13 additions & 6 deletions core/src/main/scala/kafka/log/LocalLog.scala
@@ -383,12 +383,19 @@ class LocalLog(@volatile private var _dir: File,
val segment = segmentOpt.get
val baseOffset = segment.baseOffset

-    val maxPosition =
-      // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
-      if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
-      else segment.size
-
-    fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
+    // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.
+    // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
+    // 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so
+    //    return maxPosition as empty to avoid reading beyond the max-offset
+    val maxPositionOpt: Optional[java.lang.Long] =
+      if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset)
+        Optional.of(segment.size)
+      else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
+        Optional.of(maxOffsetMetadata.relativePositionInSegment)
+      else
+        Optional.empty()
+
+    fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage)
if (fetchDataInfo != null) {
if (includeAbortedTxns)
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
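Read on its own, the new maxPositionOpt logic is a three-way decision. A minimal stand-alone sketch of it, with hypothetical parameters that mirror segment.baseOffset, segment.size and the fields of maxOffsetMetadata (none of the real storage classes are used):

import java.util.Optional

// Hypothetical restatement of the maxPositionOpt decision above.
def maxPositionFor(segmentBaseOffset: Long,
                   segmentSize: Int,
                   maxSegmentBaseOffset: Long,
                   maxRelativePosition: Int,
                   maxIsMessageOffsetOnly: Boolean): Optional[java.lang.Long] =
  if (segmentBaseOffset < maxSegmentBaseOffset)
    Optional.of(segmentSize.toLong)         // max offset is on a later segment: the segment size is the limit
  else if (segmentBaseOffset == maxSegmentBaseOffset && !maxIsMessageOffsetOnly)
    Optional.of(maxRelativePosition.toLong) // max offset is on this segment: cap the read at its byte position
  else
    Optional.empty()                        // older segment, or position unknown: read nothing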
21 changes: 14 additions & 7 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -361,7 +361,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val offsetMetadata = highWatermarkMetadata
if (offsetMetadata.messageOffsetOnly) {
lock.synchronized {
-        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
+        val fullOffset = maybeConvertToOffsetMetadata(highWatermark)
updateHighWatermarkMetadata(fullOffset)
fullOffset
}
@@ -405,7 +405,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
if (offsetMetadata.messageOffsetOnly) {
lock synchronized {
-          val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
+          val fullOffset = maybeConvertToOffsetMetadata(offsetMetadata.messageOffset)
if (firstUnstableOffsetMetadata.contains(offsetMetadata))
firstUnstableOffsetMetadata = Some(fullOffset)
fullOffset
@@ -964,7 +964,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val updatedFirstUnstableOffset = producerStateManager.firstUnstableOffset.asScala match {
case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
-        Some(convertToOffsetMetadataOrThrow(offset))
+        Some(maybeConvertToOffsetMetadata(offset))
case other => other
}

@@ -1425,11 +1425,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,

/**
* Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is less than the log-start-offset (or) local-log-start-offset, then it returns the
+   *    message-only metadata.
+   * 2. If the message offset is beyond the log-end-offset, then it returns the message-only metadata.
+   * 3. For all other cases, it returns the offset metadata from the log.
   */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
-    checkLogStartOffset(offset)
-    localLog.convertToOffsetMetadataOrThrow(offset)
+  private[log] def maybeConvertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+    try {
+      localLog.convertToOffsetMetadataOrThrow(offset)
+    } catch {
+      case _: OffsetOutOfRangeException =>
+        new LogOffsetMetadata(offset)
+    }
}

/**
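The rename reflects the behavioural change: an out-of-range offset no longer throws to callers such as updateHighWatermark and is instead kept as message-offset-only metadata. A minimal sketch of that fallback, using simplified, hypothetical stand-ins for the storage classes:

// Simplified stand-ins; the real types live in org.apache.kafka.storage.internals.log.
final case class LogOffsetMetadata(messageOffset: Long,
                                   segmentBaseOffset: Long = -1L,
                                   relativePositionInSegment: Int = -1) {
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0 && relativePositionInSegment < 0
}

class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

// `lookup` stands in for localLog.convertToOffsetMetadataOrThrow.
def maybeConvertToOffsetMetadata(offset: Long)(lookup: Long => LogOffsetMetadata): LogOffsetMetadata =
  try lookup(offset)
  catch {
    case _: OffsetOutOfRangeException =>
      LogOffsetMetadata(offset) // fall back to message-offset-only metadata instead of throwing
  }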
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -223,7 +223,7 @@ final class KafkaMetadataLog private (

override def highWatermark: LogOffsetMetadata = {
val hwm = log.fetchOffsetSnapshot.highWatermark
-    val segmentPosition: Optional[OffsetMetadata] = if (hwm.messageOffsetOnly) {
+    val segmentPosition: Optional[OffsetMetadata] = if (!hwm.messageOffsetOnly) {
Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
} else {
Optional.empty()
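The added negation is the whole fix: a segment position should be exposed only when the high watermark carries full metadata, whereas the old condition did the opposite and would surface the placeholder (-1) values. A sketch of the corrected mapping, with SegmentPosition reduced to a hypothetical case class:

import java.util.Optional

final case class SegmentPosition(baseOffset: Long, relativePosition: Int) // hypothetical stand-in

def segmentPositionOf(messageOffsetOnly: Boolean,
                      segmentBaseOffset: Long,
                      relativePositionInSegment: Int): Optional[SegmentPosition] =
  if (!messageOffsetOnly) // expose a position only when the metadata actually carries one
    Optional.of(SegmentPosition(segmentBaseOffset, relativePositionInSegment))
  else
    Optional.empty()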
18 changes: 12 additions & 6 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -91,19 +91,25 @@ class DelayedFetch(
// Go directly to the check for Case G if the message offsets are the same. If the log segment
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
// which would incorrectly be seen as an instance of Case F.
-      if (endOffset.messageOffset != fetchOffset.messageOffset) {
-        if (endOffset.onOlderSegment(fetchOffset)) {
-          // Case F, this can happen when the new fetch operation is on a truncated leader
-          debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-          return forceComplete()
+      if (fetchOffset.messageOffset > endOffset.messageOffset) {
+        // Case F, this can happen when the new fetch operation is on a truncated leader
+        debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+        return forceComplete()
+      } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+        if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {
[Review thread on the messageOffsetOnly check:]

Contributor: Could we fold this into the condition above?

Contributor Author (kamalcph): Not clear on this one. The current if checks are easy to read; we can add one debug log to avoid the empty if block.

Contributor Author (kamalcph): With the latest changes in LogOffsetMetadata, it seems we only have to add one check in DelayedFetch.scala#106 compared to trunk, and that would suffice:

else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset)

Contributor: Yes, since onOlderSegment() and onSameSegment() do the messageOffsetOnly() check already. We could just get rid of this condition.

+          // If fetchOffset or endOffset is message only, we return empty records when reading from the log.
+          // So, to be consistent, we want to avoid accumulating new bytes in this case. This can happen when the
+          // high-watermark is stale, but should be rare.
+          debug(s"Not satisfying fetch $this since the fetchOffset (or) endOffset is message-offset only " +
+            s"for partition $topicIdPartition.")
} else if (fetchOffset.onOlderSegment(endOffset)) {
// Case F, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
// We will not force complete the fetch request if a replica should be throttled.
if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
return forceComplete()
-      } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+        } else if (fetchOffset.onSameSegment(endOffset)) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
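As a summary of the refactored ordering, here is a hypothetical, self-contained model of the decision; it omits throttling and the remaining cases, and Off is a simplified stand-in for LogOffsetMetadata:

// Hypothetical model; segmentBaseOffset = -1 marks message-offset-only metadata.
final case class Off(messageOffset: Long, segmentBaseOffset: Long = -1L) {
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0
  def onOlderSegment(that: Off): Boolean =
    !messageOffsetOnly && !that.messageOffsetOnly && segmentBaseOffset < that.segmentBaseOffset
  def onSameSegment(that: Off): Boolean =
    !messageOffsetOnly && !that.messageOffsetOnly && segmentBaseOffset == that.segmentBaseOffset
}

def decide(fetchOffset: Off, endOffset: Off): String =
  if (fetchOffset.messageOffset > endOffset.messageOffset)
    "force-complete (Case F: truncated leader)"
  else if (fetchOffset.messageOffset < endOffset.messageOffset) {
    if (fetchOffset.messageOffsetOnly || endOffset.messageOffsetOnly)
      "wait (byte position unknown, accumulate no bytes)"
    else if (fetchOffset.onOlderSegment(endOffset))
      "force-complete (fetching older segments)"
    else if (fetchOffset.onSameSegment(endOffset))
      "accumulate positionDiff bytes toward minBytes"
    else
      "wait"
  } else
    "fall through to the remaining cases (offsets equal)"

Under this model, the parameterized test added below completes only for endOffset = 0 (450 > 0, the truncation case), while a message-only endOffset = 500 accumulates no bytes and the fetch stays pending.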
core/src/test/scala/unit/kafka/server/DelayedFetchTest.scala
@@ -28,6 +28,8 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata, LogOffsetSnapshot}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, when}

@@ -46,7 +48,7 @@ class DelayedFetchTest {

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

var fetchResultOpt: Option[FetchPartitionData] = None
@@ -92,7 +94,7 @@ class DelayedFetchTest {

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

var fetchResultOpt: Option[FetchPartitionData] = None
@@ -116,6 +118,9 @@
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)

val fetchResult = fetchResultOpt.get
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.error)
}

@Test
@@ -164,18 +169,71 @@
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)

val fetchResult = fetchResultOpt.get
assertEquals(Errors.NONE, fetchResult.error)
}

@ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
@ValueSource(longs = Array(0, 500))
def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val fetchOffset = 450L
val logStartOffset = 5L
val currentLeaderEpoch = Optional.of[Integer](10)
val replicaId = 1

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

var fetchResultOpt: Option[FetchPartitionData] = None
def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
fetchResultOpt = Some(responses.head._2)
}

val delayedFetch = new DelayedFetch(
params = fetchParams,
fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
replicaManager = replicaManager,
quota = replicaQuota,
responseCallback = callback
)

val partition: Partition = mock(classOf[Partition])
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
// Note that the high-watermark does not contain the complete metadata
val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
when(partition.fetchOffsetSnapshot(
currentLeaderEpoch,
fetchOnlyFromLeader = true))
.thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)

// 1. When `endOffset` is 0, it refers to the truncation case
// 2. When `endOffset` is 500, it refers to the normal case
val expected = endOffset == 0
[Review thread on this assertion:]

Contributor: Hmm, in both cases, we are not forcing the completion of the delayed fetch, right?

Contributor Author (kamalcph, May 23, 2024): When fetchOffset > endOffset, forceComplete gets called. In this case, 450 > 0, so it triggers force completion.

This is the only behavior change post-refactor. Previously, when fetchOffset > endOffset:

  1. If the offsets lie on the same segment, then we wait for the end-offset to move (or) timeout the request.
  2. If the endOffset lies on an older segment than the fetch-offset, then we complete the request by calling force-complete.

We can retain the same behavior if required:

if (fetchOffset.messageOffset > endOffset.messageOffset) {
  if (endOffset.onOlderSegment(fetchOffset)) {
    // Case F, this can happen when the new fetch operation is on a truncated leader
    debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
    return forceComplete()
  }
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
               ...

Contributor: Got it. We won't complete when endOffset is 500 because it doesn't contain offset metadata. But this is not the normal case, so we should adjust "it refers to the normal case" accordingly.

assertEquals(expected, delayedFetch.tryComplete())
assertEquals(expected, delayedFetch.isCompleted)
assertEquals(expected, fetchResultOpt.isDefined)
if (fetchResultOpt.isDefined) {
assertEquals(Errors.NONE, fetchResultOpt.get.error)
}
}

private def buildFollowerFetchParams(
replicaId: Int,
-    maxWaitMs: Int
+    maxWaitMs: Int,
+    minBytes: Int = 1,
): FetchParams = {
new FetchParams(
ApiKeys.FETCH.latestVersion,
replicaId,
1,
maxWaitMs,
-      1,
+      minBytes,
maxBytes,
FetchIsolation.LOG_END,
Optional.empty()
18 changes: 18 additions & 0 deletions core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -184,6 +184,24 @@ final class KafkaMetadataLogTest {
)
}

@Test
def testHighWatermarkOffsetMetadata(): Unit = {
val numberOfRecords = 10
val epoch = 1
val log = buildMetadataLog(tempDir, mockTime)

append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))

val highWatermarkMetadata = log.highWatermark
assertEquals(numberOfRecords, highWatermarkMetadata.offset)
assertTrue(highWatermarkMetadata.metadata.isPresent)

val segmentPosition = highWatermarkMetadata.metadata.get().asInstanceOf[SegmentPosition]
assertEquals(0, segmentPosition.baseOffset)
assertTrue(segmentPosition.relativePosition > 0)
}

@Test
def testCreateSnapshotBeforeLogStartOffset(): Unit = {
val numberOfRecords = 10
37 changes: 37 additions & 0 deletions core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -372,6 +372,43 @@ class LocalLogTest {
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
}

@Test
def testWhenFetchOffsetHigherThanMaxOffset(): Unit = {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
for (offset <- 0 to 4) {
appendRecords(List(record), initialOffset = offset)
if (offset % 2 != 0)
log.roll()
}
assertEquals(3, log.segments.numberOfSegments)

// case-0: valid case, `startOffset` < `maxOffsetMetadata.offset`
var fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
assertEquals(1, fetchDataInfo.records.records.asScala.size)
assertEquals(new LogOffsetMetadata(3, 2L, 69), fetchDataInfo.fetchOffsetMetadata)

// case-1: `startOffset` == `maxOffsetMetadata.offset`
fetchDataInfo = readRecords(startOffset = 4L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(4L, 4L, 0), fetchDataInfo.fetchOffsetMetadata)

// case-2: `startOffset` > `maxOffsetMetadata.offset`
fetchDataInfo = readRecords(startOffset = 5L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(5L, 4L, 69), fetchDataInfo.fetchOffsetMetadata)

// case-3: `startOffset` < `maxOffsetMetadata.messageOffset` but `maxOffsetMetadata.messageOffsetOnly` is true
fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, -1L, -1))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata)

// case-4: `startOffset` < `maxOffsetMetadata.messageOffset`, `maxOffsetMetadata.messageOffsetOnly` is false, but
// `maxOffsetMetadata.segmentBaseOffset` is smaller than the base offset of the segment holding `startOffset`
fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 0L, 40))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata)
}

@Test
def testTruncateTo(): Unit = {
for (offset <- 0 to 11) {
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -358,7 +358,7 @@ class LogLoaderTest {
val wrapper = Mockito.spy(segment)
Mockito.doAnswer { in =>
segmentsWithReads += wrapper
-        segment.read(in.getArgument(0, classOf[java.lang.Long]), in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2, classOf[java.lang.Long]), in.getArgument(3, classOf[java.lang.Boolean]))
+        segment.read(in.getArgument(0, classOf[java.lang.Long]), in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2, classOf[java.util.Optional[java.lang.Long]]), in.getArgument(3, classOf[java.lang.Boolean]))
}.when(wrapper).read(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())
Mockito.doAnswer { in =>
recoveredSegments += wrapper
32 changes: 30 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.storage.internals.log._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

import java.io.{File, RandomAccessFile}
import java.util
@@ -144,6 +144,34 @@ class LogSegmentTest {
checkEquals(ms2.records.iterator, read.records.records.iterator)
}

@ParameterizedTest(name = "testReadWhenNoMaxPosition minOneMessage = {0}")
@ValueSource(booleans = Array(true, false))
def testReadWhenNoMaxPosition(minOneMessage: Boolean): Unit = {
val maxPosition: Optional[java.lang.Long] = Optional.empty()
val maxSize = 1
val seg = createSegment(40)
val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
// read before first offset
var read = seg.read(48, maxSize, maxPosition, minOneMessage)
assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
assertTrue(read.records.records().iterator().asScala.isEmpty)
// read at first offset
read = seg.read(50, maxSize, maxPosition, minOneMessage)
assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata)
assertTrue(read.records.records().iterator().asScala.isEmpty)
// read at last offset
read = seg.read(51, maxSize, maxPosition, minOneMessage)
assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata)
assertTrue(read.records.records().iterator().asScala.isEmpty)
// read at log-end-offset
read = seg.read(52, maxSize, maxPosition, minOneMessage)
assertNull(read)
// read beyond log-end-offset
read = seg.read(53, maxSize, maxPosition, minOneMessage)
assertNull(read)
}

/**
* In a loop append two messages then truncate off the second of those messages and check that we can read
* the first but not the second message.
@@ -331,7 +359,7 @@
writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(newProducerStateManager(), Optional.empty())
for (i <- 0 until 100) {
-      val records = seg.read(i, 1, seg.size(), true).records.records
+      val records = seg.read(i, 1, Optional.of(seg.size()), true).records.records
assertEquals(i, records.iterator.next().offset)
}
}