Skip to content

Commit

Permalink
Merge branch 'asf-develop' into refactor-0615
Browse files Browse the repository at this point in the history
  • Loading branch information
RongtongJin committed Jun 23, 2023
2 parents 7303977 + 4e09a9c commit a4f8372
Show file tree
Hide file tree
Showing 28 changed files with 1,756 additions and 635 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,9 @@ public void registerProcessor() {
*/
this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);

this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
/**
* ChangeInvisibleTimeProcessor
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -538,12 +538,23 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
return false;
}

int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
if (indexOfAck > -1) {
markBitCAS(pointWrapper.getBits(), indexOfAck);
if (ackMsg instanceof BatchAckMsg) {
for (Long ackOffset : ((BatchAckMsg) ackMsg).getAckOffsetList()) {
int indexOfAck = point.indexOfAck(ackOffset);
if (indexOfAck > -1) {
markBitCAS(pointWrapper.getBits(), indexOfAck);
} else {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
}
}
} else {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
return true;
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
if (indexOfAck > -1) {
markBitCAS(pointWrapper.getBits(), indexOfAck);
} else {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
return true;
}
}

if (brokerController.getBrokerConfig().isEnablePopLog()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
*/
package org.apache.rocketmq.broker.processor;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.store.pop.PopCheckPoint;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PopInflightMessageCounter {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

Expand Down Expand Up @@ -61,26 +61,24 @@ public void incrementInFlightMessageNum(String topic, String group, int queueId,
});
}

public void decrementInFlightMessageNum(String topic, String group, String ckInfo) {
String[] ckInfoList = ExtraInfoUtil.split(ckInfo);
long popTime = ExtraInfoUtil.getPopTime(ckInfoList);
public void decrementInFlightMessageNum(String topic, String group, long popTime, int qId, int delta) {
if (popTime < this.brokerController.getShouldStartTime()) {
return;
}
decrementInFlightMessageNum(topic, group, ExtraInfoUtil.getQueueId(ckInfoList));
decrementInFlightMessageNum(topic, group, qId, delta);
}

public void decrementInFlightMessageNum(PopCheckPoint checkPoint) {
if (checkPoint.getPopTime() < this.brokerController.getShouldStartTime()) {
return;
}
decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId());
decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId(), 1);
}

public void decrementInFlightMessageNum(String topic, String group, int queueId) {
private void decrementInFlightMessageNum(String topic, String group, int queueId, int delta) {
topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key, queueNum) -> {
queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> {
if (counter.decrementAndGet() <= 0) {
if (counter.addAndGet(-delta) <= 0) {
return null;
}
return counter;
Expand Down
Loading

0 comments on commit a4f8372

Please sign in to comment.