diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index b67735c4a4d..4063b1d0544 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -1649,6 +1649,22 @@ public void setTimerPrecisionMs(int timerPrecisionMs) { this.timerPrecisionMs = candidates[candidates.length - 1]; } + // visible for test + public void setTimerPrecision(int timerPrecisionMs) { + if (enableTimerMessageOnRocksDB) { + this.timerPrecisionMs = timerPrecisionMs; + return; + } + int[] candidates = {100, 200, 500, 1000}; + for (int i = 1; i < candidates.length; i++) { + if (timerPrecisionMs < candidates[i]) { + this.timerPrecisionMs = candidates[i - 1]; + return; + } + } + this.timerPrecisionMs = candidates[candidates.length - 1]; + } + public int getTimerRollWindowSlot() { return timerRollWindowSlot; } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java index 02948750799..a850ab7067d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java @@ -76,6 +76,7 @@ public interface TimerMessageKVStore { /** * Get the checkpoint of the timer message kv store. + * Key : columnFamily byte[]; Value : checkpoint long. * @param columnFamily the column family of the timer message kv store. * @return the checkpoint of the timer message kv store. */ @@ -86,5 +87,5 @@ public interface TimerMessageKVStore { * @param key the key of the metric. * @param update the value of the metric. */ - void syncMetric(long key, long update); + void syncMetric(long key, int update); } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java index 4fce0b454e5..3bf33be24d0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java @@ -238,7 +238,7 @@ public int getMetricSize(long lowerTime, long upperTime) { try (ReadOptions readOptions = new ReadOptions() .setIterateLowerBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(lowerTime).array())) - .setIterateLowerBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(upperTime).array())); + .setIterateUpperBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(upperTime).array())); RocksIterator iterator = db.newIterator(metricColumnFamilyHandle, readOptions)) { iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lowerTime).array()); while (iterator.isValid()) { @@ -260,9 +260,14 @@ public long getCheckpoint(byte[] columnFamily) { } @Override - public void syncMetric(long key, long update) { + public void syncMetric(long key, int update) { try { - db.put(metricColumnFamilyHandle, ByteBuffer.allocate(8).putLong(key).array(), ByteBuffer.allocate(4).putInt((int) update).array()); + byte[] keyBytes = db.get(metricColumnFamilyHandle, ByteBuffer.allocate(8).putLong(key).array()); + if (keyBytes != null) { + ByteBuffer oldValue = ByteBuffer.wrap(keyBytes); + update = oldValue.getInt() + update; + } + db.put(metricColumnFamilyHandle, ByteBuffer.allocate(8).putLong(key).array(), ByteBuffer.allocate(4).putInt(update).array()); } catch (RocksDBException e) { throw new RuntimeException("Sync metric to RocksDB error", e); } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java index 2e6abb17d42..46ff237ba54 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java @@ -227,7 +227,7 @@ private void calcTimerDistribution() { int slotNumber = precisionMs; int rocksdbNumber = 0; for (int i = 0; i < this.slotSize; i++) { - timerMetrics.updateDistPair(i, timerMessageKVStore.getMetricSize(rocksdbNumber, rocksdbNumber + slotNumber - 1)); + timerMetrics.updateDistPair(i, timerMessageKVStore.getMetricSize(rocksdbNumber, rocksdbNumber + slotNumber)); rocksdbNumber += slotNumber; } } @@ -436,12 +436,16 @@ public void run() { try { List timerMessageRecord = dequeuePutQueue.poll(100L * precisionMs / 1000, TimeUnit.MILLISECONDS); int flag = 0; + long delayTime = -1; if (null == timerMessageRecord || timerMessageRecord.isEmpty()) { continue; } for (TimerMessageRecord record : timerMessageRecord) { MessageExt msg = record.getMessageExt(); MessageExtBrokerInner messageExtBrokerInner = convert(msg, record.isRoll()); + if (delayTime == -1) { + delayTime = Long.parseLong(record.getMessageExt().getProperty(TIMER_OUT_MS)); + } flag = record.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG) == null ? 0 : Integer.parseInt(record.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG)); boolean processed = false; @@ -470,6 +474,7 @@ public void run() { addMetric(msg, -1); addMetric((int) (Long.parseLong(msg.getProperty(TIMER_OUT_MS)) / precisionMs % slotSize), -1); } + timerMessageKVStore.syncMetric(delayTime % metricsIntervalMs, -timerMessageRecord.size()); timerMessageKVStore.deleteAssignRecords(getColumnFamily(flag), timerMessageRecord, timerMessageRecord.get(0).getReadOffset()); } catch (InterruptedException e) { TimerMessageRocksDBStore.log.error("Error occurred in " + getServiceName(), e); @@ -802,4 +807,12 @@ public long getEnqueueBehindMillis() { public long getDequeueBehindMillis() { return System.currentTimeMillis() - timerGetMessageServices.get(0).checkpoint; } + + public TimerMessageKVStore getTimerMessageKVStore() { + return timerMessageKVStore; + } + + public long getMetricsIntervalMs() { + return metricsIntervalMs; + } } \ No newline at end of file diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java index 696eb723bb4..767433f1c55 100644 --- a/store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java @@ -66,7 +66,7 @@ public class TimerMessageRocksDBStoreTest { MessageStore messageStore; MessageStoreConfig storeConfig; - int precisionMs = 500; + int precisionMs = 1; AtomicInteger counter = new AtomicInteger(0); private SocketAddress bornHost; private SocketAddress storeHost; @@ -80,6 +80,8 @@ public void setUp() throws Exception { storeConfig.setEnableTimerMessageOnRocksDB(true); storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog"); + storeConfig.setTimerPrecision(1); + precisionMs = storeConfig.getTimerPrecisionMs(); messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest", false), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>()); @@ -158,9 +160,7 @@ private PutMessageResult transformTimerMessage(TimerMessageRocksDBStore timerMes } int timerPrecisionMs = storeConfig.getTimerPrecisionMs(); - if (deliverMs % timerPrecisionMs == 0) { - deliverMs -= timerPrecisionMs; - } else { + if (deliverMs % timerPrecisionMs != 0) { deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs; } @@ -333,6 +333,31 @@ public void testExtractUniqueKey() { assertEquals("123456", TimerMessageStore.extractUniqueKey(deleteKey)); } + @Test + public void testGetTimerMetrics() { + String topic = "TimerRocksdbTest_testGetTimerMetrics"; + TimerMessageRocksDBStore timerMessageStore = createTimerMessageRocksDBStore(null); + timerMessageStore.load(); + timerMessageStore.start(); + storeConfig.setTimerStopDequeue(true); + long delayMs = System.currentTimeMillis() + 3000; + + for (int i = 0; i < 10; i++) { + MessageExtBrokerInner inner = buildMessage(delayMs, topic, false); + transformTimerMessage(timerMessageStore, inner); + assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(inner).getPutMessageStatus()); + } + await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> timerMessageStore.getTimerMessageKVStore(). + getMetricSize(delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs(), + delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs() + precisionMs) == 10); + + storeConfig.setTimerStopDequeue(false); + await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> timerMessageStore.getTimerMessageKVStore(). + getMetricSize(delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs(), + delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs() + precisionMs) == 0); + timerMessageStore.shutdown(); + } + private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,