From 16337cdb3cd8c187cb897ebd08acb882bbab3c7a Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 2 Nov 2023 14:48:13 +0800 Subject: [PATCH 1/4] feat(s3stream): optimize s3stream metrics name Signed-off-by: Shichao Nie --- .../s3/metrics/NoopS3StreamMetricsGroup.java | 6 +++--- .../stream/s3/metrics/S3StreamMetricsGroup.java | 6 +++--- .../s3/metrics/stats/ByteBufMetricsStats.java | 4 +--- .../s3/metrics/stats/NetworkMetricsStats.java | 16 ++++++++-------- .../s3/metrics/stats/OperationMetricsStats.java | 10 +++++----- .../s3/metrics/stats/S3ObjectMetricsStats.java | 9 +++------ 6 files changed, 23 insertions(+), 28 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java index 537255ecd..03d53c673 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java @@ -21,7 +21,7 @@ public class NoopS3StreamMetricsGroup implements S3StreamMetricsGroup { @Override - public Counter newCounter(String type, String name, Map tags) { + public Counter newCounter(String name, Map tags) { return new Counter() { @Override public void inc() { @@ -41,7 +41,7 @@ public long count() { } @Override - public Histogram newHistogram(String type, String name, Map tags) { + public Histogram newHistogram(String name, Map tags) { return new Histogram() { @Override public void update(long value) { @@ -61,7 +61,7 @@ public double mean() { } @Override - public void newGauge(String type, String name, Map tags, Gauge gauge) { + public void newGauge(String name, Map tags, Gauge gauge) { } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java index 2ac92e0f8..cedf24747 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java @@ -20,9 +20,9 @@ import java.util.Map; public interface S3StreamMetricsGroup { - Counter newCounter(String type, String name, Map tags); + Counter newCounter(String name, Map tags); - Histogram newHistogram(String type, String name, Map tags); + Histogram newHistogram(String name, Map tags); - void newGauge(String type, String name, Map tags, Gauge gauge); + void newGauge(String name, Map tags, Gauge gauge); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java index 03dc18720..f2b347a9d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java @@ -19,7 +19,6 @@ import com.automq.stream.s3.metrics.Histogram; import com.automq.stream.s3.metrics.S3StreamMetricsRegistry; -import com.automq.stream.s3.metrics.operations.S3MetricsType; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +29,7 @@ public class ByteBufMetricsStats { public static Histogram getHistogram(String source) { return SOURCE_TO_HISTOGRAM.computeIfAbsent(source, k -> { Map tags = Map.of("source", k); - return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Storage.getName(), - S3MetricsType.S3Storage.getName() + "size", tags); + return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_stream_byte_buf_size", tags); }); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java index 8ce79ae09..ed68c3c05 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java @@ -27,19 +27,19 @@ public class NetworkMetricsStats { - public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(), - S3MetricsType.S3Network.getName() + "InboundUsage", Collections.emptyMap()); + public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup() + .newCounter("network_inbound_usage", Collections.emptyMap()); - public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(), - S3MetricsType.S3Network.getName() + "OutboundUsage", Collections.emptyMap()); + public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup() + .newCounter("network_outbound_usage", Collections.emptyMap()); public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) { - S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(), - S3MetricsType.S3Network.getName() + type.getName() + "AvailableBandwidth", Collections.emptyMap(), gauge); + String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase()); + S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge); } public static void registerNetworkLimiterQueueSize(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) { - S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(), - S3MetricsType.S3Network.getName() + type.getName() + "QueueSize", Collections.emptyMap(), gauge); + String metricName = String.format("network_%s_limiter_queue_size", type.getName().toLowerCase()); + S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java index 878337889..03c73ec55 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java @@ -38,11 +38,11 @@ public static class OperationMetrics { public final Histogram operationTime; public OperationMetrics(S3Operation s3Operation) { - Map tags = Map.of("operation", s3Operation.getName()); - operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter(s3Operation.getType().getName(), - s3Operation.getType().getName() + "OperationCount", tags); - operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(s3Operation.getType().getName(), - s3Operation.getType().getName() + "OperationTime", tags); + Map tags = Map.of( + "operation", s3Operation.getName(), + "type", s3Operation.getType().getName()); + operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count", tags); + operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java index 6ed739dc8..65266db28 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java @@ -29,16 +29,13 @@ public class S3ObjectMetricsStats { private static final Map S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>(); - public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Object.getName(), - S3MetricsType.S3Object.getName() + "Count", Collections.emptyMap()); - public static final Histogram S3_OBJECT_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(), - S3MetricsType.S3Object.getName() + "Size", Collections.emptyMap()); + public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count", Collections.emptyMap()); + public static final Histogram S3_OBJECT_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_size", Collections.emptyMap()); public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) { return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> { Map tags = Map.of("stage", stage.getName()); - return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(), - S3MetricsType.S3Object.getName() + "Time", tags); + return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags); }); } } From d52945bb0f124aabf37f99dee7b0b20261c842b4 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 2 Nov 2023 14:52:15 +0800 Subject: [PATCH 2/4] chore(s3stream): upgrade s3stream to 0.1.20-SNAPSHOT Signed-off-by: Shichao Nie --- pom.xml | 2 +- s3stream/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 11ed55e39..a1b7a318c 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 32.0.1-jre 2.0.9 2.2 - 0.1.19-SNAPSHOT + 0.1.20-SNAPSHOT 23.5.26 diff --git a/s3stream/pom.xml b/s3stream/pom.xml index cea038e4d..18dab23d8 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.automq.elasticstream s3stream - 0.1.19-SNAPSHOT + 0.1.20-SNAPSHOT 5.5.0 5.10.0 From 9e5925b20844bd1032a5cd5ba72d4069878a289f Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 2 Nov 2023 15:05:16 +0800 Subject: [PATCH 3/4] chore(stream): refactor metrics builder Signed-off-by: SSpirits --- .../rocketmq/store/metrics/BaseStreamMetrics.java | 10 ++++------ .../rocketmq/store/metrics/StreamMetricsCounter.java | 7 +++---- .../rocketmq/store/metrics/StreamMetricsGauge.java | 4 ++-- .../store/metrics/StreamMetricsHistogram.java | 4 ++-- .../rocketmq/store/metrics/StreamMetricsManager.java | 12 ++++++------ 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java b/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java index bebeec395..cc3cb439a 100644 --- a/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java +++ b/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java @@ -31,9 +31,9 @@ public class BaseStreamMetrics { protected final Map tags; protected final String metricsName; - protected BaseStreamMetrics(String type, String name, Map tags, + protected BaseStreamMetrics(String name, Map tags, Meter meter, Supplier attributesBuilderSupplier) { - this.metricsName = metricsName(type, name); + this.metricsName = metricsName(name); this.tags = tags; this.meter = meter; this.attributesBuilderSupplier = attributesBuilderSupplier; @@ -51,10 +51,8 @@ protected AttributesBuilder newAttributesBuilder() { return builder; } - protected String metricsName(String type, String name) { - name = name.replace(type, ""); + protected String metricsName(String name) { name = name.toLowerCase(); - type = type.toLowerCase(); - return STREAM_METRICS_PREFIX + type + "_" + name; + return STREAM_METRICS_PREFIX + "_" + name; } } diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java index 30f35faf7..988123845 100644 --- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java +++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java @@ -25,13 +25,12 @@ import java.util.function.Supplier; public class StreamMetricsCounter extends BaseStreamMetrics implements Counter { - private static final String STREAM_METRICS_COUNTER_SUFFIX = "_total"; private final LongCounter counter; - public StreamMetricsCounter(String type, String name, Map tags, + public StreamMetricsCounter(String name, Map tags, Meter meter, Supplier attributesBuilderSupplier) { - super(type, name, tags, meter, attributesBuilderSupplier); - this.counter = this.meter.counterBuilder(this.metricsName + STREAM_METRICS_COUNTER_SUFFIX) + super(name, tags, meter, attributesBuilderSupplier); + this.counter = this.meter.counterBuilder(this.metricsName) .build(); } diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java index d918d50d5..e0c89b37f 100644 --- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java +++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java @@ -25,9 +25,9 @@ public class StreamMetricsGauge extends BaseStreamMetrics { - public StreamMetricsGauge(String type, String name, Map tags, + public StreamMetricsGauge(String name, Map tags, Meter meter, Supplier attributesBuilderSupplier, Gauge gauge) { - super(type, name, tags, meter, attributesBuilderSupplier); + super(name, tags, meter, attributesBuilderSupplier); this.meter.gaugeBuilder(this.metricsName) .ofLongs() .buildWithCallback(measurement -> measurement.record(gauge.value(), newAttributesBuilder().build())); diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java index 7b90228fc..5013ec54c 100644 --- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java +++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java @@ -27,9 +27,9 @@ public class StreamMetricsHistogram extends BaseStreamMetrics implements Histogram { private final LongHistogram histogram; - public StreamMetricsHistogram(String type, String name, Map tags, + public StreamMetricsHistogram(String name, Map tags, Meter meter, Supplier attributesBuilderSupplier) { - super(type, name, tags, meter, attributesBuilderSupplier); + super(name, tags, meter, attributesBuilderSupplier); histogram = this.meter.histogramBuilder(this.metricsName) .ofLongs() .build(); diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java index e3d28664e..14cdcde90 100644 --- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java +++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java @@ -48,17 +48,17 @@ public void initDynamicMetrics(Meter meter) { } @Override - public Counter newCounter(String type, String name, Map tags) { - return new StreamMetricsCounter(type, name, tags, meter, attributesBuilderSupplier); + public Counter newCounter(String name, Map tags) { + return new StreamMetricsCounter(name, tags, meter, attributesBuilderSupplier); } @Override - public Histogram newHistogram(String type, String name, Map tags) { - return new StreamMetricsHistogram(type, name, tags, meter, attributesBuilderSupplier); + public Histogram newHistogram(String name, Map tags) { + return new StreamMetricsHistogram(name, tags, meter, attributesBuilderSupplier); } @Override - public void newGauge(String type, String name, Map tags, Gauge gauge) { - new StreamMetricsGauge(type, name, tags, meter, attributesBuilderSupplier, gauge); + public void newGauge(String name, Map tags, Gauge gauge) { + new StreamMetricsGauge(name, tags, meter, attributesBuilderSupplier, gauge); } } From bf693413be359b36eb787b83c30ea08d5780ceee Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 2 Nov 2023 15:40:01 +0800 Subject: [PATCH 4/4] feat(s3stream): counter metrics optimization 1. add _total suffix to metric name for counter type 2. add metrics to record object size for read operation Signed-off-by: Shichao Nie --- .../src/main/java/com/automq/stream/s3/metrics/Counter.java | 1 + .../automq/stream/s3/metrics/stats/NetworkMetricsStats.java | 5 ++--- .../stream/s3/metrics/stats/OperationMetricsStats.java | 2 +- .../stream/s3/metrics/stats/S3ObjectMetricsStats.java | 6 +++--- .../com/automq/stream/s3/operator/DefaultS3Operator.java | 5 ++++- .../java/com/automq/stream/s3/operator/MultiPartWriter.java | 2 +- 6 files changed, 12 insertions(+), 9 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java index 338ecfb0a..fc09f9d14 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java @@ -18,6 +18,7 @@ package com.automq.stream.s3.metrics; public interface Counter { + String SUFFIX = "_total"; void inc(); void inc(long n); long count(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java index ed68c3c05..9db95a188 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java @@ -20,7 +20,6 @@ import com.automq.stream.s3.metrics.Counter; import com.automq.stream.s3.metrics.Gauge; import com.automq.stream.s3.metrics.S3StreamMetricsRegistry; -import com.automq.stream.s3.metrics.operations.S3MetricsType; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import java.util.Collections; @@ -28,10 +27,10 @@ public class NetworkMetricsStats { public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup() - .newCounter("network_inbound_usage", Collections.emptyMap()); + .newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap()); public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup() - .newCounter("network_outbound_usage", Collections.emptyMap()); + .newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap()); public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) { String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase()); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java index 03c73ec55..86138117e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java @@ -41,7 +41,7 @@ public OperationMetrics(S3Operation s3Operation) { Map tags = Map.of( "operation", s3Operation.getName(), "type", s3Operation.getType().getName()); - operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count", tags); + operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count" + Counter.SUFFIX, tags); operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java index 65266db28..8a4d72a27 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java @@ -20,7 +20,6 @@ import com.automq.stream.s3.metrics.Counter; import com.automq.stream.s3.metrics.Histogram; import com.automq.stream.s3.metrics.S3StreamMetricsRegistry; -import com.automq.stream.s3.metrics.operations.S3MetricsType; import com.automq.stream.s3.metrics.operations.S3ObjectStage; import java.util.Collections; @@ -29,8 +28,9 @@ public class S3ObjectMetricsStats { private static final Map S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>(); - public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count", Collections.emptyMap()); - public static final Histogram S3_OBJECT_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_size", Collections.emptyMap()); + public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap()); + public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap()); + public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap()); public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) { return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> { diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 69c036555..ed63f43a6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -21,6 +21,7 @@ import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.operations.S3Operation; import com.automq.stream.s3.metrics.stats.OperationMetricsStats; +import com.automq.stream.s3.metrics.stats.S3ObjectMetricsStats; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import com.automq.stream.s3.network.ThrottleStrategy; import com.automq.stream.utils.FutureUtil; @@ -249,7 +250,9 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture { OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc(); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed()); - ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) (end - start + 1), "merge_read"); + long size = end - start + 1; + S3ObjectMetricsStats.S3_OBJECT_DOWNLOAD_SIZE.update(size); + ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) size, "merge_read"); responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf)); }) .exceptionally(ex -> { diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java index 8c3e662d5..6b0e74edb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java @@ -146,7 +146,7 @@ public CompletableFuture close() { closeCf.whenComplete((nil, ex) -> { S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.TOTAL).update(timerUtil.elapsed()); S3ObjectMetricsStats.S3_OBJECT_COUNT.inc(); - S3ObjectMetricsStats.S3_OBJECT_SIZE.update(totalWriteSize.get()); + S3ObjectMetricsStats.S3_OBJECT_UPLOAD_SIZE.update(totalWriteSize.get()); }); return closeCf; }