Skip to content

Commit

Permalink
[ISSUE #7601] Fix slave acting master bug (#7603)
Browse files Browse the repository at this point in the history
* fix NullPointerException when message escape to remote

* fix NumberFormatException when message retry to escape to remote

* fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted

* Use properties copies instead of referencing the same map when converting message
  • Loading branch information
gaoyf authored Dec 7, 2023
1 parent c2c29c2 commit faae647
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,7 @@ public synchronized void changeScheduleServiceStatus(boolean shouldStart) {
isScheduleServiceStart = shouldStart;

if (timerMessageStore != null) {
timerMessageStore.syncLastReadTimeMs();
timerMessageStore.setShouldRunningDequeue(shouldStart);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,13 @@ public void syncTimerCheckPoint() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
if (null != brokerController.getMessageStore().getTimerMessageStore()) {
if (null != brokerController.getMessageStore().getTimerMessageStore() &&
!brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
if (null != this.brokerController.getTimerCheckpoint()) {
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.common.message;

import java.util.HashMap;
import java.util.Map;

public class MessageAccessor {
Expand Down Expand Up @@ -96,4 +97,10 @@ public static Message cloneMessage(final Message msg) {
return newMsg;
}

public static Map<String, String> deepCopyProperties(Map<String, String> properties) {
if (properties == null) {
return null;
}
return new HashMap<>(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ public void setShouldRunningDequeue(final boolean shouldRunningDequeue) {
this.shouldRunningDequeue = shouldRunningDequeue;
}

public boolean isShouldRunningDequeue() {
return shouldRunningDequeue;
}

public void addMetric(MessageExt msg, int value) {
try {
if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
Expand Down Expand Up @@ -1084,8 +1088,10 @@ public int doPut(MessageExtBrokerInner message, boolean roll) throws Exception {
case PUT_OK:
if (brokerStatsManager != null) {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
if (putMessageResult.getAppendMessageResult() != null) {
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
}
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
}
return PUT_OK;
Expand Down Expand Up @@ -1119,7 +1125,7 @@ public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll)
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties()));
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
Expand Down

0 comments on commit faae647

Please sign in to comment.