Skip to content

Commit

Permalink
feat(store): add s3 stream compaction config
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Nov 3, 2023
1 parent 95d846e commit fe02aa6
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ public class S3StreamConfig {
private long networkBaselineBandwidth = 0;
private int refillPeriodMs = 1000;

private int streamObjectCompactionIntervalMinutes = 60;
private long streamObjectCompactionMaxSizeBytes = 10737418240L;
private int streamObjectCompactionLivingTimeMinutes = 60;

private int walObjectCompactionInterval = 20;
private long walObjectCompactionCacheSize = 200 * 1024 * 1024;
private int walObjectCompactionUploadConcurrency = 8;
private long walObjectCompactionStreamSplitSize = 16 * 1024 * 1024;
private int walObjectCompactionForceSplitPeriod = 120;
private int walObjectCompactionMaxObjectNum = 500;

public String s3Endpoint() {
return s3Endpoint;
}
Expand Down Expand Up @@ -65,4 +76,40 @@ public long networkBaselineBandwidth() {
public int refillPeriodMs() {
return refillPeriodMs;
}

public int streamObjectCompactionIntervalMinutes() {
return streamObjectCompactionIntervalMinutes;
}

public long streamObjectCompactionMaxSizeBytes() {
return streamObjectCompactionMaxSizeBytes;
}

public int streamObjectCompactionLivingTimeMinutes() {
return streamObjectCompactionLivingTimeMinutes;
}

public int walObjectCompactionInterval() {
return walObjectCompactionInterval;
}

public long walObjectCompactionCacheSize() {
return walObjectCompactionCacheSize;
}

public int walObjectCompactionUploadConcurrency() {
return walObjectCompactionUploadConcurrency;
}

public long walObjectCompactionStreamSplitSize() {
return walObjectCompactionStreamSplitSize;
}

public int walObjectCompactionForceSplitPeriod() {
return walObjectCompactionForceSplitPeriod;
}

public int walObjectCompactionMaxObjectNum() {
return walObjectCompactionMaxObjectNum;
}
}
11 changes: 11 additions & 0 deletions store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ private Config configFrom(S3StreamConfig streamConfig) {
config.s3SecretKey(streamConfig.s3SecretKey());
config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth());
config.refillPeriodMs(streamConfig.refillPeriodMs());

// Compaction config
config.s3StreamObjectCompactionIntervalMinutes(streamConfig.streamObjectCompactionIntervalMinutes());
config.s3StreamObjectCompactionMaxSizeBytes(streamConfig.streamObjectCompactionMaxSizeBytes());
config.s3StreamObjectCompactionLivingTimeMinutes(streamConfig.streamObjectCompactionLivingTimeMinutes());
config.s3WALObjectCompactionInterval(streamConfig.walObjectCompactionInterval());
config.s3WALObjectCompactionCacheSize(streamConfig.walObjectCompactionCacheSize());
config.s3WALObjectCompactionUploadConcurrency(streamConfig.walObjectCompactionUploadConcurrency());
config.s3WALObjectCompactionMaxObjectNum(streamConfig.walObjectCompactionMaxObjectNum());
config.s3WALObjectCompactionForceSplitPeriod(streamConfig.walObjectCompactionForceSplitPeriod());
config.s3WALObjectCompactionStreamSplitSize(streamConfig.walObjectCompactionStreamSplitSize());
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public static Operation decodeOperation(ByteBuffer buffer,
operationStreamId, snapshotStreamId, stateMachine,
resetConsumeOffsetOperation.operationTimestamp(), resetConsumeOffsetOperation.consumerGroupId(), resetConsumeOffsetOperation.offset());
}
default -> throw new IllegalStateException("Unexpected value: " + operationLogItem.operationType());
default ->
throw new IllegalStateException("Unexpected operation type: " + operationLogItem.operationType());
}
}

Expand Down

0 comments on commit fe02aa6

Please sign in to comment.