Skip to content

Commit

Permalink
chore(stream): add network limit config for stream module (#549)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 2, 2023
1 parent c82e456 commit a4bece7
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class S3StreamConfig {
private String s3AccessKey;
private String s3SecretKey;

// Max bandwidth in bytes per refillPeriodMs, default to 0 which means no limit.
private long networkBaselineBandwidth = 0;
private int refillPeriodMs = 1000;

public String s3Endpoint() {
return s3Endpoint;
}
Expand Down Expand Up @@ -53,4 +57,12 @@ public String s3AccessKey() {
public String s3SecretKey() {
return s3SecretKey;
}

public long networkBaselineBandwidth() {
return networkBaselineBandwidth;
}

public int refillPeriodMs() {
return refillPeriodMs;
}
}
10 changes: 6 additions & 4 deletions distribution/conf/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ innerAccessKey: ""
innerSecretKey: ""
s3Stream:
s3WALPath: "/tmp/s3rocketmq/wal"
s3Endpoint: "http://minio.hellocorp.test"
s3Bucket: "lzh"
s3Endpoint: "http://minio.local:9000"
s3Bucket: "bucket-name"
s3Region: "us-east-1"
s3ForcePathStyle: true
s3AccessKey: "kicaSWtNBPf7XCCsdW8Z"
s3SecretKey: "dgWxasHBEQBBXxAxBpFmp4VWIoQ0XyDU3gGdFnaT"
s3AccessKey: "access-key"
s3SecretKey: "secret-key"
networkBaselineBandwidth: 15
refillPeriodMs: 100
db:
url: "jdbc:mysql://mysql-server:3306/metadata"
userName: "root"
Expand Down
30 changes: 22 additions & 8 deletions store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,13 @@

public class S3StreamStore implements StreamStore {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class);
private final Config s3Config;
private final StreamClient streamClient;
private final Storage storage;
private final CompactionManager compactionManager;
private final ThreadPoolExecutor storeWorkingThreadPool;

public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, StoreMetadataService metadataService) {
this(storeConfig, streamConfig, metadataService, null, null);
}

public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, StoreMetadataService metadataService,
AsyncNetworkBandwidthLimiter networkInboundBucket, AsyncNetworkBandwidthLimiter networkOutboundBucket) {
this.s3Config = configFrom(streamConfig);
Config s3Config = configFrom(streamConfig);

// Build meta service and related manager
StreamManager streamManager = new S3StreamManager(metadataService);
Expand All @@ -85,7 +79,25 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey());
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundBucket, networkOutboundBucket);
AsyncNetworkBandwidthLimiter networkInboundLimiter = null;
AsyncNetworkBandwidthLimiter networkOutboundLimiter = null;

if (s3Config.networkBaselineBandwidth() > 0 && s3Config.refillPeriodMs() > 0) {
networkInboundLimiter = new AsyncNetworkBandwidthLimiter(
AsyncNetworkBandwidthLimiter.Type.INBOUND,
s3Config.networkBaselineBandwidth(),
s3Config.refillPeriodMs(),
s3Config.networkBaselineBandwidth()
);
networkOutboundLimiter = new AsyncNetworkBandwidthLimiter(
AsyncNetworkBandwidthLimiter.Type.OUTBOUND,
s3Config.networkBaselineBandwidth(),
s3Config.refillPeriodMs(),
s3Config.networkBaselineBandwidth()
);
}

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);
this.storeWorkingThreadPool = ThreadPoolMonitor.createAndMonitor(
storeConfig.workingThreadPoolNums(),
storeConfig.workingThreadQueueCapacity(),
Expand Down Expand Up @@ -205,6 +217,8 @@ private Config configFrom(S3StreamConfig streamConfig) {
config.s3WALPath(streamConfig.s3WALPath());
config.s3AccessKey(streamConfig.s3AccessKey());
config.s3SecretKey(streamConfig.s3SecretKey());
config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth());
config.refillPeriodMs(streamConfig.refillPeriodMs());
return config;
}
}

0 comments on commit a4bece7

Please sign in to comment.