Skip to content

Commit

Permalink
[ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in De…
Browse files Browse the repository at this point in the history
…faultLitePullConsumer (#6197)

* Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer

* Add lastConsumeTimestamp and lastPullTimestamp in consumerRunningInfo for DefaultLitePullConsumer

* Pass the check style
  • Loading branch information
RongtongJin authored Feb 28, 2023
1 parent 33371e5 commit 88000da
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
this.rebalanceImpl = rebalanceImpl;
}

public Set<MessageQueue> messageQueues() {
return assignedMessageQueueState.keySet();
}

public boolean isPaused(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
Expand Down Expand Up @@ -158,7 +159,8 @@ private enum SubscriptionType {
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();

// only for test purpose, will be modified by reflection in unit test.
@SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
@SuppressWarnings("FieldMayBeFinal")
private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;

public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
Expand Down Expand Up @@ -394,7 +396,7 @@ private void operateAfterRunning() throws MQClientException {
}
// If assign function invoke before start function, then update pull task after initialization.
if (subscriptionType == SubscriptionType.ASSIGN) {
updateAssignPullTask(assignedMessageQueue.messageQueues());
updateAssignPullTask(assignedMessageQueue.getAssignedMessageQueues());
}

for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
Expand Down Expand Up @@ -484,12 +486,14 @@ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {

/**
* subscribe data by customizing messageQueueListener
*
* @param topic
* @param subExpression
* @param messageQueueListener
* @throws MQClientException
*/
public synchronized void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException {
public synchronized void subscribe(String topic, String subExpression,
MessageQueueListener messageQueueListener) throws MQClientException {
try {
if (StringUtils.isEmpty(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
Expand All @@ -516,7 +520,6 @@ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<Messa
}
}


public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || "".equals(topic)) {
Expand Down Expand Up @@ -637,6 +640,7 @@ public synchronized List<MessageExt> poll(long timeout) {
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
return messages;
}
} catch (InterruptedException ignore) {
Expand All @@ -655,7 +659,7 @@ public void resume(Collection<MessageQueue> messageQueues) {
}

public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
if (!assignedMessageQueue.getAssignedMessageQueues().contains(messageQueue)) {
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null);
} else {
Expand Down Expand Up @@ -721,7 +725,7 @@ private void removePullTask(final String topic) {
}

public synchronized void commitAll() {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
for (MessageQueue messageQueue : assignedMessageQueue.getAssignedMessageQueues()) {
try {
commit(messageQueue);
} catch (Exception e) {
Expand All @@ -732,6 +736,7 @@ public synchronized void commitAll() {

/**
* Specify offset commit
*
* @param messageQueues
* @param persist
*/
Expand Down Expand Up @@ -760,6 +765,7 @@ public synchronized void commit(final Map<MessageQueue, Long> messageQueues, boo

/**
* Get the queue assigned in subscribe mode
*
* @return
*/
public synchronized Set<MessageQueue> assignment() {
Expand Down Expand Up @@ -895,6 +901,8 @@ public void run() {
return;
}

processQueue.setLastPullTimestamp(System.currentTimeMillis());

if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
Expand Down Expand Up @@ -1172,6 +1180,15 @@ public ConsumerRunningInfo consumerRunningInfo() {
info.setProperties(prop);

info.getSubscriptionSet().addAll(this.subscriptions());

for (MessageQueue mq : this.assignedMessageQueue.getAssignedMessageQueues()) {
ProcessQueue pq = this.assignedMessageQueue.getProcessQueue(mq);
ProcessQueueInfo pqInfo = new ProcessQueueInfo();
pqInfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
pq.fillProcessQueueInfo(pqInfo);
info.getMqTable().put(mq, pqInfo);
}

return info;
}

Expand Down Expand Up @@ -1234,7 +1251,7 @@ public AssignedMessageQueue getAssignedMessageQueue() {
}

public synchronized void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener listener) throws MQClientException {
TopicMessageQueueChangeListener listener) throws MQClientException {
if (topic == null || listener == null) {
throw new MQClientException("Topic or listener is null", null);
}
Expand Down

0 comments on commit 88000da

Please sign in to comment.