Skip to content

Commit

Permalink
[ISSUE apache#7878] Performance Improvement and Bug Fixes for the Tie…
Browse files Browse the repository at this point in the history
…red Storage Module (apache#7899)

Performance Improvement and Bug Fixes for the Tiered Storage Module
  • Loading branch information
lizhimins authored Mar 18, 2024
1 parent a32eca0 commit 026a910
Show file tree
Hide file tree
Showing 99 changed files with 5,489 additions and 7,391 deletions.
28 changes: 14 additions & 14 deletions tieredstore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,27 @@ This article is a cookbook for RocketMQ tiered storage.

Use the following steps to easily use tiered storage

1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.TieredMessageStore` in your `broker.conf`.
1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.MessageStoreExtend` in your `broker.conf`.
2. Configure your backend service provider. change `tieredBackendServiceProvider` to your storage medium implement. We give a default implement: POSIX provider, and you need to change `tieredStoreFilepath` to the mount point of storage medium for tiered storage.
3. Start the broker and enjoy!

## Configuration

The following are some core configurations, for more details, see [TieredMessageStoreConfig](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java)

| Configuration | Default value | Unit | Function |
| ------------------------------- | --------------------------------------------------------------- | ----------- | ------------------------------------------------------------------------------- |
| messageStorePlugIn | | | Set to org.apache.rocketmq.tieredstore.TieredMessageStore to use tiered storage |
| tieredMetadataServiceProvider | org.apache.rocketmq.tieredstore.metadata.TieredMetadataManager | | Select your metadata provider |
| tieredBackendServiceProvider | org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment | | Select your backend service provider |
| tieredStoreFilepath | | | Select the directory using for tiered storage, only for POSIX provider. |
| tieredStorageLevel | NOT_IN_DISK | | The options are DISABLE, NOT_IN_DISK, NOT_IN_MEM, FORCE |
| tieredStoreFileReservedTime | 72 | hour | Default topic TTL in tiered storage |
| tieredStoreGroupCommitCount | 2500 | | The number of messages that trigger one batch transfer |
| tieredStoreGroupCommitSize | 33554432 | byte | The size of messages that trigger one batch transfer, 32M by default |
| tieredStoreMaxGroupCommitCount | 10000 | | The maximum number of messages waiting to be transfered per queue |
| readAheadCacheExpireDuration | 1000 | millisecond | Read-ahead cache expiration time |
| readAheadCacheSizeThresholdRate | 0.3 | | The maximum heap space occupied by the read-ahead cache |
| Configuration | Default value | Unit | Function |
| ------------------------------- |---------------------------------------------------------------| ----------- | ------------------------------------------------------------------------------- |
| messageStorePlugIn | | | Set to org.apache.rocketmq.tieredstore.MessageStoreExtend to use tiered storage |
| tieredMetadataServiceProvider | org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore | | Select your metadata provider |
| tieredBackendServiceProvider | org.apache.rocketmq.tieredstore.provider.PosixFileSegment | | Select your backend service provider |
| tieredStoreFilepath | | | Select the directory using for tiered storage, only for POSIX provider. |
| tieredStorageLevel | NOT_IN_DISK | | The options are DISABLE, NOT_IN_DISK, NOT_IN_MEM, FORCE |
| tieredStoreFileReservedTime | 72 | hour | Default topic TTL in tiered storage |
| tieredStoreGroupCommitCount | 2500 | | The number of messages that trigger one batch transfer |
| tieredStoreGroupCommitSize | 33554432 | byte | The size of messages that trigger one batch transfer, 32M by default |
| tieredStoreMaxGroupCommitCount | 10000 | | The maximum number of messages waiting to be transfered per queue |
| readAheadCacheExpireDuration | 1000 | millisecond | Read-ahead cache expiration time |
| readAheadCacheSizeThresholdRate | 0.3 | | The maximum heap space occupied by the read-ahead cache |

## Metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tieredstore.common;
package org.apache.rocketmq.tieredstore;

import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class TieredMessageStoreConfig {
public class MessageStoreConfig {

private String brokerName = localHostName();
private String brokerClusterName = "DefaultCluster";
private TieredStorageLevel tieredStorageLevel = TieredStorageLevel.NOT_IN_DISK;
Expand Down Expand Up @@ -92,38 +93,40 @@ public boolean check(TieredStorageLevel targetLevel) {
private int tieredStoreIndexFileMaxIndexNum = 5000000 * 4;
// index file will force rolling to next file after idle specified time, default is 3h
private int tieredStoreIndexFileRollingIdleInterval = 3 * 60 * 60 * 1000;
private String tieredMetadataServiceProvider = "org.apache.rocketmq.tieredstore.metadata.TieredMetadataManager";
private String tieredBackendServiceProvider = "org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment";
private String tieredMetadataServiceProvider = "org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore";
private String tieredBackendServiceProvider = "org.apache.rocketmq.tieredstore.provider.MemoryFileSegment";
// file reserved time, default is 72 hour
private int tieredStoreFileReservedTime = 72;
// time of forcing commitLog to roll to next file, default is 24 hour
private int commitLogRollingInterval = 24;
// rolling will only happen if file segment size is larger than commitLogRollingMinimumSize, default is 128M
// rolling will only happen if file segment size is larger than commitcp b LogRollingMinimumSize, default is 128M
private int commitLogRollingMinimumSize = 128 * 1024 * 1024;
// default is 100, unit is millisecond
private int maxCommitJitter = 100;
// Cached message count larger than this value will trigger async commit. default is 1000
private int tieredStoreGroupCommitCount = 2500;
// Cached message size larger than this value will trigger async commit. default is 32M
private int tieredStoreGroupCommitSize = 32 * 1024 * 1024;
// Cached message count larger than this value will suspend append. default is 2000

private boolean tieredStoreGroupCommit = true;
private int tieredStoreGroupCommitTimeout = 30 * 1000;
// Cached message count larger than this value will trigger async commit. default is 4096
private int tieredStoreGroupCommitCount = 4 * 1024;
// Cached message size larger than this value will trigger async commit. default is 4M
private int tieredStoreGroupCommitSize = 4 * 1024 * 1024;
// Cached message count larger than this value will suspend append. default is 10000
private int tieredStoreMaxGroupCommitCount = 10000;
private int readAheadMinFactor = 2;
private int readAheadMaxFactor = 24;
private int readAheadBatchSizeFactorThreshold = 8;
private int readAheadMessageCountThreshold = 2048;
private int readAheadMessageSizeThreshold = 128 * 1024 * 1024;
private long readAheadCacheExpireDuration = 10 * 1000;
private long tieredStoreMaxFallBehindSize = 128 * 1024 * 1024;

private boolean readAheadCacheEnable = true;
private int readAheadMessageCountThreshold = 4096;
private int readAheadMessageSizeThreshold = 16 * 1024 * 1024;
private long readAheadCacheExpireDuration = 15 * 1000;
private double readAheadCacheSizeThresholdRate = 0.3;

private String tieredStoreFilePath = "";
private int tieredStoreMaxPendingLimit = 10000;
private boolean tieredStoreCrcCheckEnable = false;

private String tieredStoreFilePath = "";
private String objectStoreEndpoint = "";

private String objectStoreBucket = "";

private String objectStoreAccessKey = "";

private String objectStoreSecretKey = "";

public static String localHostName() {
Expand Down Expand Up @@ -279,6 +282,22 @@ public void setMaxCommitJitter(int maxCommitJitter) {
this.maxCommitJitter = maxCommitJitter;
}

public boolean isTieredStoreGroupCommit() {
return tieredStoreGroupCommit;
}

public void setTieredStoreGroupCommit(boolean tieredStoreGroupCommit) {
this.tieredStoreGroupCommit = tieredStoreGroupCommit;
}

public int getTieredStoreGroupCommitTimeout() {
return tieredStoreGroupCommitTimeout;
}

public void setTieredStoreGroupCommitTimeout(int tieredStoreGroupCommitTimeout) {
this.tieredStoreGroupCommitTimeout = tieredStoreGroupCommitTimeout;
}

public int getTieredStoreGroupCommitCount() {
return tieredStoreGroupCommitCount;
}
Expand All @@ -303,28 +322,20 @@ public void setTieredStoreMaxGroupCommitCount(int tieredStoreMaxGroupCommitCount
this.tieredStoreMaxGroupCommitCount = tieredStoreMaxGroupCommitCount;
}

public int getReadAheadMinFactor() {
return readAheadMinFactor;
}

public void setReadAheadMinFactor(int readAheadMinFactor) {
this.readAheadMinFactor = readAheadMinFactor;
public long getTieredStoreMaxFallBehindSize() {
return tieredStoreMaxFallBehindSize;
}

public int getReadAheadMaxFactor() {
return readAheadMaxFactor;
public void setTieredStoreMaxFallBehindSize(long tieredStoreMaxFallBehindSize) {
this.tieredStoreMaxFallBehindSize = tieredStoreMaxFallBehindSize;
}

public int getReadAheadBatchSizeFactorThreshold() {
return readAheadBatchSizeFactorThreshold;
public boolean isReadAheadCacheEnable() {
return readAheadCacheEnable;
}

public void setReadAheadBatchSizeFactorThreshold(int readAheadBatchSizeFactorThreshold) {
this.readAheadBatchSizeFactorThreshold = readAheadBatchSizeFactorThreshold;
}

public void setReadAheadMaxFactor(int readAheadMaxFactor) {
this.readAheadMaxFactor = readAheadMaxFactor;
public void setReadAheadCacheEnable(boolean readAheadCacheEnable) {
this.readAheadCacheEnable = readAheadCacheEnable;
}

public int getReadAheadMessageCountThreshold() {
Expand Down Expand Up @@ -359,6 +370,22 @@ public void setReadAheadCacheSizeThresholdRate(double rate) {
this.readAheadCacheSizeThresholdRate = rate;
}

public int getTieredStoreMaxPendingLimit() {
return tieredStoreMaxPendingLimit;
}

public void setTieredStoreMaxPendingLimit(int tieredStoreMaxPendingLimit) {
this.tieredStoreMaxPendingLimit = tieredStoreMaxPendingLimit;
}

public boolean isTieredStoreCrcCheckEnable() {
return tieredStoreCrcCheckEnable;
}

public void setTieredStoreCrcCheckEnable(boolean tieredStoreCrcCheckEnable) {
this.tieredStoreCrcCheckEnable = tieredStoreCrcCheckEnable;
}

public String getTieredStoreFilePath() {
return tieredStoreFilePath;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tieredstore;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.ThreadUtils;

public class MessageStoreExecutor {

public final BlockingQueue<Runnable> bufferCommitThreadPoolQueue;
public final BlockingQueue<Runnable> bufferFetchThreadPoolQueue;
public final BlockingQueue<Runnable> fileRecyclingThreadPoolQueue;

public final ScheduledExecutorService commonExecutor;
public final ExecutorService bufferCommitExecutor;
public final ExecutorService bufferFetchExecutor;
public final ExecutorService fileRecyclingExecutor;

private static class SingletonHolder {
private static final MessageStoreExecutor INSTANCE = new MessageStoreExecutor();
}

public static MessageStoreExecutor getInstance() {
return SingletonHolder.INSTANCE;
}

public MessageStoreExecutor() {
this(10000);
}

public MessageStoreExecutor(int maxQueueCapacity) {

this.commonExecutor = ThreadUtils.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()),
new ThreadFactoryImpl("TieredCommonExecutor_"));

this.bufferCommitThreadPoolQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
this.bufferCommitExecutor = ThreadUtils.newThreadPoolExecutor(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
TimeUnit.MINUTES.toMillis(1), TimeUnit.MILLISECONDS,
this.bufferCommitThreadPoolQueue,
new ThreadFactoryImpl("BufferCommitExecutor_"));

this.bufferFetchThreadPoolQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
this.bufferFetchExecutor = ThreadUtils.newThreadPoolExecutor(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
TimeUnit.MINUTES.toMillis(1), TimeUnit.MILLISECONDS,
this.bufferFetchThreadPoolQueue,
new ThreadFactoryImpl("BufferFetchExecutor_"));

this.fileRecyclingThreadPoolQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
this.fileRecyclingExecutor = ThreadUtils.newThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
Math.max(4, Runtime.getRuntime().availableProcessors()),
TimeUnit.MINUTES.toMillis(1), TimeUnit.MILLISECONDS,
this.fileRecyclingThreadPoolQueue,
new ThreadFactoryImpl("BufferFetchExecutor_"));
}

private void shutdownExecutor(ExecutorService executor) {
if (executor != null) {
executor.shutdown();
}
}

public void shutdown() {
this.shutdownExecutor(this.commonExecutor);
this.shutdownExecutor(this.bufferCommitExecutor);
this.shutdownExecutor(this.bufferFetchExecutor);
this.shutdownExecutor(this.fileRecyclingExecutor);
}
}
Loading

0 comments on commit 026a910

Please sign in to comment.