diff --git a/pom.xml b/pom.xml
index 11ed55e39..a1b7a318c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.0.1-jre
2.0.9
2.2
- 0.1.19-SNAPSHOT
+ 0.1.20-SNAPSHOT
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index cea038e4d..18dab23d8 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.automq.elasticstream
s3stream
- 0.1.19-SNAPSHOT
+ 0.1.20-SNAPSHOT
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java
index 338ecfb0a..fc09f9d14 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java
@@ -18,6 +18,7 @@
package com.automq.stream.s3.metrics;
public interface Counter {
+ String SUFFIX = "_total";
void inc();
void inc(long n);
long count();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java
index 537255ecd..03d53c673 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java
@@ -21,7 +21,7 @@
public class NoopS3StreamMetricsGroup implements S3StreamMetricsGroup {
@Override
- public Counter newCounter(String type, String name, Map tags) {
+ public Counter newCounter(String name, Map tags) {
return new Counter() {
@Override
public void inc() {
@@ -41,7 +41,7 @@ public long count() {
}
@Override
- public Histogram newHistogram(String type, String name, Map tags) {
+ public Histogram newHistogram(String name, Map tags) {
return new Histogram() {
@Override
public void update(long value) {
@@ -61,7 +61,7 @@ public double mean() {
}
@Override
- public void newGauge(String type, String name, Map tags, Gauge gauge) {
+ public void newGauge(String name, Map tags, Gauge gauge) {
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java
index 2ac92e0f8..cedf24747 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java
@@ -20,9 +20,9 @@
import java.util.Map;
public interface S3StreamMetricsGroup {
- Counter newCounter(String type, String name, Map tags);
+ Counter newCounter(String name, Map tags);
- Histogram newHistogram(String type, String name, Map tags);
+ Histogram newHistogram(String name, Map tags);
- void newGauge(String type, String name, Map tags, Gauge gauge);
+ void newGauge(String name, Map tags, Gauge gauge);
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java
index 03dc18720..f2b347a9d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java
@@ -19,7 +19,6 @@
import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-import com.automq.stream.s3.metrics.operations.S3MetricsType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,8 +29,7 @@ public class ByteBufMetricsStats {
public static Histogram getHistogram(String source) {
return SOURCE_TO_HISTOGRAM.computeIfAbsent(source, k -> {
Map tags = Map.of("source", k);
- return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Storage.getName(),
- S3MetricsType.S3Storage.getName() + "size", tags);
+ return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_stream_byte_buf_size", tags);
});
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java
index 8ce79ae09..9db95a188 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java
@@ -20,26 +20,25 @@
import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Gauge;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-import com.automq.stream.s3.metrics.operations.S3MetricsType;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import java.util.Collections;
public class NetworkMetricsStats {
- public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(),
- S3MetricsType.S3Network.getName() + "InboundUsage", Collections.emptyMap());
+ public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
+ .newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap());
- public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(),
- S3MetricsType.S3Network.getName() + "OutboundUsage", Collections.emptyMap());
+ public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
+ .newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap());
public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
- S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(),
- S3MetricsType.S3Network.getName() + type.getName() + "AvailableBandwidth", Collections.emptyMap(), gauge);
+ String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase());
+ S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
}
public static void registerNetworkLimiterQueueSize(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
- S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(),
- S3MetricsType.S3Network.getName() + type.getName() + "QueueSize", Collections.emptyMap(), gauge);
+ String metricName = String.format("network_%s_limiter_queue_size", type.getName().toLowerCase());
+ S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
index 878337889..86138117e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
@@ -38,11 +38,11 @@ public static class OperationMetrics {
public final Histogram operationTime;
public OperationMetrics(S3Operation s3Operation) {
- Map tags = Map.of("operation", s3Operation.getName());
- operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter(s3Operation.getType().getName(),
- s3Operation.getType().getName() + "OperationCount", tags);
- operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(s3Operation.getType().getName(),
- s3Operation.getType().getName() + "OperationTime", tags);
+ Map tags = Map.of(
+ "operation", s3Operation.getName(),
+ "type", s3Operation.getType().getName());
+ operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count" + Counter.SUFFIX, tags);
+ operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags);
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
index 6ed739dc8..8a4d72a27 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
@@ -20,7 +20,6 @@
import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-import com.automq.stream.s3.metrics.operations.S3MetricsType;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import java.util.Collections;
@@ -29,16 +28,14 @@
public class S3ObjectMetricsStats {
private static final Map S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>();
- public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Object.getName(),
- S3MetricsType.S3Object.getName() + "Count", Collections.emptyMap());
- public static final Histogram S3_OBJECT_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(),
- S3MetricsType.S3Object.getName() + "Size", Collections.emptyMap());
+ public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap());
+ public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
+ public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());
public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) {
return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
Map tags = Map.of("stage", stage.getName());
- return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(),
- S3MetricsType.S3Object.getName() + "Time", tags);
+ return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags);
});
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index 69c036555..ed63f43a6 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -21,6 +21,7 @@
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
+import com.automq.stream.s3.metrics.stats.S3ObjectMetricsStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.utils.FutureUtil;
@@ -249,7 +250,9 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed());
- ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) (end - start + 1), "merge_read");
+ long size = end - start + 1;
+ S3ObjectMetricsStats.S3_OBJECT_DOWNLOAD_SIZE.update(size);
+ ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) size, "merge_read");
responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf));
})
.exceptionally(ex -> {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index 8c3e662d5..6b0e74edb 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -146,7 +146,7 @@ public CompletableFuture close() {
closeCf.whenComplete((nil, ex) -> {
S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.TOTAL).update(timerUtil.elapsed());
S3ObjectMetricsStats.S3_OBJECT_COUNT.inc();
- S3ObjectMetricsStats.S3_OBJECT_SIZE.update(totalWriteSize.get());
+ S3ObjectMetricsStats.S3_OBJECT_UPLOAD_SIZE.update(totalWriteSize.get());
});
return closeCf;
}
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java b/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java
index bebeec395..cc3cb439a 100644
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java
+++ b/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java
@@ -31,9 +31,9 @@ public class BaseStreamMetrics {
protected final Map tags;
protected final String metricsName;
- protected BaseStreamMetrics(String type, String name, Map tags,
+ protected BaseStreamMetrics(String name, Map tags,
Meter meter, Supplier attributesBuilderSupplier) {
- this.metricsName = metricsName(type, name);
+ this.metricsName = metricsName(name);
this.tags = tags;
this.meter = meter;
this.attributesBuilderSupplier = attributesBuilderSupplier;
@@ -51,10 +51,8 @@ protected AttributesBuilder newAttributesBuilder() {
return builder;
}
- protected String metricsName(String type, String name) {
- name = name.replace(type, "");
+ protected String metricsName(String name) {
name = name.toLowerCase();
- type = type.toLowerCase();
- return STREAM_METRICS_PREFIX + type + "_" + name;
+ return STREAM_METRICS_PREFIX + "_" + name;
}
}
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java
index 30f35faf7..988123845 100644
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java
+++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java
@@ -25,13 +25,12 @@
import java.util.function.Supplier;
public class StreamMetricsCounter extends BaseStreamMetrics implements Counter {
- private static final String STREAM_METRICS_COUNTER_SUFFIX = "_total";
private final LongCounter counter;
- public StreamMetricsCounter(String type, String name, Map tags,
+ public StreamMetricsCounter(String name, Map tags,
Meter meter, Supplier attributesBuilderSupplier) {
- super(type, name, tags, meter, attributesBuilderSupplier);
- this.counter = this.meter.counterBuilder(this.metricsName + STREAM_METRICS_COUNTER_SUFFIX)
+ super(name, tags, meter, attributesBuilderSupplier);
+ this.counter = this.meter.counterBuilder(this.metricsName)
.build();
}
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java
index d918d50d5..e0c89b37f 100644
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java
+++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java
@@ -25,9 +25,9 @@
public class StreamMetricsGauge extends BaseStreamMetrics {
- public StreamMetricsGauge(String type, String name, Map tags,
+ public StreamMetricsGauge(String name, Map tags,
Meter meter, Supplier attributesBuilderSupplier, Gauge gauge) {
- super(type, name, tags, meter, attributesBuilderSupplier);
+ super(name, tags, meter, attributesBuilderSupplier);
this.meter.gaugeBuilder(this.metricsName)
.ofLongs()
.buildWithCallback(measurement -> measurement.record(gauge.value(), newAttributesBuilder().build()));
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java
index 7b90228fc..5013ec54c 100644
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java
+++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java
@@ -27,9 +27,9 @@
public class StreamMetricsHistogram extends BaseStreamMetrics implements Histogram {
private final LongHistogram histogram;
- public StreamMetricsHistogram(String type, String name, Map tags,
+ public StreamMetricsHistogram(String name, Map tags,
Meter meter, Supplier attributesBuilderSupplier) {
- super(type, name, tags, meter, attributesBuilderSupplier);
+ super(name, tags, meter, attributesBuilderSupplier);
histogram = this.meter.histogramBuilder(this.metricsName)
.ofLongs()
.build();
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java
index e3d28664e..14cdcde90 100644
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java
+++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java
@@ -48,17 +48,17 @@ public void initDynamicMetrics(Meter meter) {
}
@Override
- public Counter newCounter(String type, String name, Map tags) {
- return new StreamMetricsCounter(type, name, tags, meter, attributesBuilderSupplier);
+ public Counter newCounter(String name, Map tags) {
+ return new StreamMetricsCounter(name, tags, meter, attributesBuilderSupplier);
}
@Override
- public Histogram newHistogram(String type, String name, Map tags) {
- return new StreamMetricsHistogram(type, name, tags, meter, attributesBuilderSupplier);
+ public Histogram newHistogram(String name, Map tags) {
+ return new StreamMetricsHistogram(name, tags, meter, attributesBuilderSupplier);
}
@Override
- public void newGauge(String type, String name, Map tags, Gauge gauge) {
- new StreamMetricsGauge(type, name, tags, meter, attributesBuilderSupplier, gauge);
+ public void newGauge(String name, Map tags, Gauge gauge) {
+ new StreamMetricsGauge(name, tags, meter, attributesBuilderSupplier, gauge);
}
}