From 1a1252e4fda76ff9cadcdee63dfbc6758d54defc Mon Sep 17 00:00:00 2001 From: wanghuaiyuan Date: Mon, 20 Jan 2025 18:55:24 +0800 Subject: [PATCH] Move the new class to TimerMessageStore --- .../apache/rocketmq/broker/BrokerController.java | 4 ++-- .../rocketmq/broker/slave/SlaveSynchronize.java | 4 +--- .../header/SendMessageResponseHeader.java | 15 --------------- .../rocketmq/store/config/MessageStoreConfig.java | 5 +---- 4 files changed, 4 insertions(+), 24 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index db4a3545f06..fa60d87e624 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -831,12 +831,12 @@ public boolean initializeMessageStore() { messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration); this.messageStore = MessageStoreFactory.build(context, defaultMessageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); - if (messageStoreConfig.isTimerWheelEnable()) { this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir())); TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir())); this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager); - this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));this.messageStore.setTimerMessageStore(this.timerMessageStore); + this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg)); + this.messageStore.setTimerMessageStore(this.timerMessageStore); } } catch (Exception e) { result = false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 4dc87fc1989..bfb5c9dcd03 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -238,9 +238,7 @@ private void syncTimerMetrics() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { - if (null != brokerController.getMessageStore().getTimerMessageRocksDBStore()) { - // TODO metric sync - } else if (null != brokerController.getMessageStore().getTimerMessageStore()) { + if (null != brokerController.getMessageStore().getTimerMessageStore()) { TimerMetrics.TimerMetricsSerializeWrapper metricsSerializeWrapper = this.brokerController.getBrokerOuterAPI().getTimerMetrics(masterAddrBak); if (!brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getDataVersion().equals(metricsSerializeWrapper.getDataVersion())) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java index 7351172f84a..7563b910331 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageResponseHeader.java @@ -37,7 +37,6 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes private String transactionId; private String batchUniqId; private String recallHandle; - private Long delayTime; @Override public void checkFields() throws RemotingCommandException { @@ -51,7 +50,6 @@ public void encode(ByteBuf out) { writeIfNotNull(out, "transactionId", transactionId); writeIfNotNull(out, "batchUniqId", batchUniqId); writeIfNotNull(out, "recallHandle", recallHandle); - writeIfNotNull(out, "delayTime", delayTime); } @Override @@ -85,11 +83,6 @@ public void decode(HashMap fields) throws RemotingCommandExcepti if (str != null) { this.recallHandle = str; } - - str = fields.get("delayTime"); - if (str != null) { - this.delayTime = Long.parseLong(str); - } } public String getMsgId() { @@ -139,12 +132,4 @@ public String getRecallHandle() { public void setRecallHandle(String recallHandle) { this.recallHandle = recallHandle; } - - public Long getDelayTime() { - return delayTime; - } - - public void setDelayTime(Long delayTime) { - this.delayTime = delayTime; - } } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 73e90a0d055..a6d15cdd61f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -479,10 +479,7 @@ public void setRocksdbCompressionType(String compressionType) { private int readCountTimerOnRocksDB = -1; /** - * When enabled, the scheduled task is started. - * if time wheel is enabled, the time wheel only the correct ones are supported - * The message will be written to rocksdb. - * Close the time wheel when the file timing message is 0 + * When enabled, the scheduled message is using rocksdb */ private boolean enableTimerMessageOnRocksDB = false;