Skip to content

Commit

Permalink
Move the new class to TimerMessageStore
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghuaiyuan committed Jan 20, 2025
1 parent 3c4e922 commit 1a1252e
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -831,12 +831,12 @@ public boolean initializeMessageStore() {
messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));

if (messageStoreConfig.isTimerWheelEnable()) {
this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));this.messageStore.setTimerMessageStore(this.timerMessageStore);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
} catch (Exception e) {
result = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,7 @@ private void syncTimerMetrics() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
if (null != brokerController.getMessageStore().getTimerMessageRocksDBStore()) {
// TODO metric sync
} else if (null != brokerController.getMessageStore().getTimerMessageStore()) {
if (null != brokerController.getMessageStore().getTimerMessageStore()) {
TimerMetrics.TimerMetricsSerializeWrapper metricsSerializeWrapper =
this.brokerController.getBrokerOuterAPI().getTimerMetrics(masterAddrBak);
if (!brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getDataVersion().equals(metricsSerializeWrapper.getDataVersion())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes
private String transactionId;
private String batchUniqId;
private String recallHandle;
private Long delayTime;

@Override
public void checkFields() throws RemotingCommandException {
Expand All @@ -51,7 +50,6 @@ public void encode(ByteBuf out) {
writeIfNotNull(out, "transactionId", transactionId);
writeIfNotNull(out, "batchUniqId", batchUniqId);
writeIfNotNull(out, "recallHandle", recallHandle);
writeIfNotNull(out, "delayTime", delayTime);
}

@Override
Expand Down Expand Up @@ -85,11 +83,6 @@ public void decode(HashMap<String, String> fields) throws RemotingCommandExcepti
if (str != null) {
this.recallHandle = str;
}

str = fields.get("delayTime");
if (str != null) {
this.delayTime = Long.parseLong(str);
}
}

public String getMsgId() {
Expand Down Expand Up @@ -139,12 +132,4 @@ public String getRecallHandle() {
public void setRecallHandle(String recallHandle) {
this.recallHandle = recallHandle;
}

public Long getDelayTime() {
return delayTime;
}

public void setDelayTime(Long delayTime) {
this.delayTime = delayTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,7 @@ public void setRocksdbCompressionType(String compressionType) {
private int readCountTimerOnRocksDB = -1;

/**
* When enabled, the scheduled task is started.
* if time wheel is enabled, the time wheel only the correct ones are supported
* The message will be written to rocksdb.
* Close the time wheel when the file timing message is 0
* When enabled, the scheduled message is using rocksdb
*/
private boolean enableTimerMessageOnRocksDB = false;

Expand Down

0 comments on commit 1a1252e

Please sign in to comment.