Skip to content

Commit

Permalink
KAFKA-14966 extension - Extract more reusable logic from OffsetFetche…
Browse files Browse the repository at this point in the history
…r & additions to utils (apache#9)
  • Loading branch information
lianetm authored and philipnee committed Jun 9, 2023
1 parent d272db2 commit 8a1ef4d
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class OffsetFetcherUtils {

/**
* Callback for the response of the list offset call.
*
* @param listOffsetsResponse The response from the server.
* @return {@link OffsetFetcherUtils.ListOffsetResult} extracted from the response, containing the fetched offsets
* and partitions to retry.
Expand Down Expand Up @@ -274,6 +275,11 @@ static class ListOffsetResult {
this.fetchedOffsets = new HashMap<>();
this.partitionsToRetry = new HashSet<>();
}

Map<TopicPartition, Long> offsetAndMetadataMap() {
return fetchedOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset));
}
}

/**
Expand Down

0 comments on commit 8a1ef4d

Please sign in to comment.