Skip to content

Commit

Permalink
[ISSUE apache#7872] Fix Tiered Store Query Message Async return diffe…
Browse files Browse the repository at this point in the history
…rent view each time
  • Loading branch information
AYue-94 committed Feb 29, 2024
1 parent eed303d commit 557e33e
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,7 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
CompletableFuture<List<IndexItem>> future = indexStoreService.queryAsync(topic, key, maxCount, begin, end);

return future.thenCompose(indexItemList -> {
QueryMessageResult result = new QueryMessageResult();
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
List<CompletableFuture<SelectMappedBufferResult>> futureList = new ArrayList<>(maxCount);
for (IndexItem indexItem : indexItemList) {
if (topicId != indexItem.getTopicId()) {
continue;
Expand All @@ -563,17 +562,19 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
if (flatFile == null) {
continue;
}
CompletableFuture<Void> getMessageFuture = flatFile
CompletableFuture<SelectMappedBufferResult> getMessageFuture = flatFile
.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
.thenAccept(messageBuffer -> result.addMessage(
new SelectMappedBufferResult(
indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
.thenApply(messageBuffer -> new SelectMappedBufferResult(indexItem.getOffset(), messageBuffer, indexItem.getSize(), null));
futureList.add(getMessageFuture);
if (futureList.size() >= maxCount) {
break;
}
}
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> {
QueryMessageResult result = new QueryMessageResult();
futureList.forEach(f -> f.thenAccept(result::addMessage));
return result;
});
}).whenComplete((result, throwable) -> {
if (result != null) {
LOGGER.info("MessageFetcher#queryMessageAsync, " +
Expand Down

0 comments on commit 557e33e

Please sign in to comment.