Skip to content

Commit

Permalink
[ISSUE #8268] Fix pop orderly commitOffset when NO_MATCHED_MESSAGE (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma authored Jun 6, 2024
1 parent d1974c5 commit d9ebe88
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -638,10 +638,13 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
|| GetMessageStatus.MESSAGE_WAS_REMOVING.equals(result.getStatus())
|| GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(result.getStatus()))
&& result.getNextBeginOffset() > -1) {
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, queueId, finalOffset,
requestHeader.getInvisibleTime(), popTime, reviveQid, result.getNextBeginOffset(), brokerController.getBrokerConfig().getBrokerName());
// this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
// queueId, getMessageTmpResult.getNextBeginOffset());
if (isOrder) {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
queueId, result.getNextBeginOffset());
} else {
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, queueId, finalOffset,
requestHeader.getInvisibleTime(), popTime, reviveQid, result.getNextBeginOffset(), brokerController.getBrokerConfig().getBrokerName());
}
}

atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
Expand Down

0 comments on commit d9ebe88

Please sign in to comment.