diff --git a/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java b/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java index e67a689cb..231669464 100644 --- a/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java +++ b/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java @@ -30,6 +30,17 @@ public class S3StreamConfig { private long networkBaselineBandwidth = 0; private int refillPeriodMs = 1000; + private int streamObjectCompactionIntervalMinutes = 60; + private long streamObjectCompactionMaxSizeBytes = 10737418240L; + private int streamObjectCompactionLivingTimeMinutes = 60; + + private int walObjectCompactionInterval = 20; + private long walObjectCompactionCacheSize = 200 * 1024 * 1024; + private int walObjectCompactionUploadConcurrency = 8; + private long walObjectCompactionStreamSplitSize = 16 * 1024 * 1024; + private int walObjectCompactionForceSplitPeriod = 120; + private int walObjectCompactionMaxObjectNum = 500; + public String s3Endpoint() { return s3Endpoint; } @@ -65,4 +76,40 @@ public long networkBaselineBandwidth() { public int refillPeriodMs() { return refillPeriodMs; } + + public int streamObjectCompactionIntervalMinutes() { + return streamObjectCompactionIntervalMinutes; + } + + public long streamObjectCompactionMaxSizeBytes() { + return streamObjectCompactionMaxSizeBytes; + } + + public int streamObjectCompactionLivingTimeMinutes() { + return streamObjectCompactionLivingTimeMinutes; + } + + public int walObjectCompactionInterval() { + return walObjectCompactionInterval; + } + + public long walObjectCompactionCacheSize() { + return walObjectCompactionCacheSize; + } + + public int walObjectCompactionUploadConcurrency() { + return walObjectCompactionUploadConcurrency; + } + + public long walObjectCompactionStreamSplitSize() { + return walObjectCompactionStreamSplitSize; + } + + public int walObjectCompactionForceSplitPeriod() { + return walObjectCompactionForceSplitPeriod; + } + + public int walObjectCompactionMaxObjectNum() { + return walObjectCompactionMaxObjectNum; + } } diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java index 08f6a8b38..d67cf9529 100644 --- a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java @@ -219,6 +219,17 @@ private Config configFrom(S3StreamConfig streamConfig) { config.s3SecretKey(streamConfig.s3SecretKey()); config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth()); config.refillPeriodMs(streamConfig.refillPeriodMs()); + + // Compaction config + config.s3StreamObjectCompactionIntervalMinutes(streamConfig.streamObjectCompactionIntervalMinutes()); + config.s3StreamObjectCompactionMaxSizeBytes(streamConfig.streamObjectCompactionMaxSizeBytes()); + config.s3StreamObjectCompactionLivingTimeMinutes(streamConfig.streamObjectCompactionLivingTimeMinutes()); + config.s3WALObjectCompactionInterval(streamConfig.walObjectCompactionInterval()); + config.s3WALObjectCompactionCacheSize(streamConfig.walObjectCompactionCacheSize()); + config.s3WALObjectCompactionUploadConcurrency(streamConfig.walObjectCompactionUploadConcurrency()); + config.s3WALObjectCompactionMaxObjectNum(streamConfig.walObjectCompactionMaxObjectNum()); + config.s3WALObjectCompactionForceSplitPeriod(streamConfig.walObjectCompactionForceSplitPeriod()); + config.s3WALObjectCompactionStreamSplitSize(streamConfig.walObjectCompactionStreamSplitSize()); return config; } } diff --git a/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java b/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java index 1e18b720a..13e99fb6b 100644 --- a/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java +++ b/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java @@ -167,7 +167,8 @@ public static Operation decodeOperation(ByteBuffer buffer, operationStreamId, snapshotStreamId, stateMachine, resetConsumeOffsetOperation.operationTimestamp(), resetConsumeOffsetOperation.consumerGroupId(), resetConsumeOffsetOperation.offset()); } - default -> throw new IllegalStateException("Unexpected value: " + operationLogItem.operationType()); + default -> + throw new IllegalStateException("Unexpected operation type: " + operationLogItem.operationType()); } }