Skip to content

Commit

Permalink
feat(s3stream): optimize s3stream metrics name (#545)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
Signed-off-by: SSpirits <[email protected]>
Co-authored-by: SSpirits <[email protected]>
  • Loading branch information
SCNieh and ShadowySpirits authored Nov 2, 2023
1 parent 6ceeed3 commit 8e769fc
Show file tree
Hide file tree
Showing 16 changed files with 49 additions and 54 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.19-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.20-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.19-SNAPSHOT</version>
<version>0.1.20-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.stream.s3.metrics;

public interface Counter {
String SUFFIX = "_total";
void inc();
void inc(long n);
long count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

public class NoopS3StreamMetricsGroup implements S3StreamMetricsGroup {
@Override
public Counter newCounter(String type, String name, Map<String, String> tags) {
public Counter newCounter(String name, Map<String, String> tags) {
return new Counter() {
@Override
public void inc() {
Expand All @@ -41,7 +41,7 @@ public long count() {
}

@Override
public Histogram newHistogram(String type, String name, Map<String, String> tags) {
public Histogram newHistogram(String name, Map<String, String> tags) {
return new Histogram() {
@Override
public void update(long value) {
Expand All @@ -61,7 +61,7 @@ public double mean() {
}

@Override
public void newGauge(String type, String name, Map<String, String> tags, Gauge gauge) {
public void newGauge(String name, Map<String, String> tags, Gauge gauge) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import java.util.Map;

public interface S3StreamMetricsGroup {
Counter newCounter(String type, String name, Map<String, String> tags);
Counter newCounter(String name, Map<String, String> tags);

Histogram newHistogram(String type, String name, Map<String, String> tags);
Histogram newHistogram(String name, Map<String, String> tags);

void newGauge(String type, String name, Map<String, String> tags, Gauge gauge);
void newGauge(String name, Map<String, String> tags, Gauge gauge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3MetricsType;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -30,8 +29,7 @@ public class ByteBufMetricsStats {
public static Histogram getHistogram(String source) {
return SOURCE_TO_HISTOGRAM.computeIfAbsent(source, k -> {
Map<String, String> tags = Map.of("source", k);
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Storage.getName(),
S3MetricsType.S3Storage.getName() + "size", tags);
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_stream_byte_buf_size", tags);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,25 @@
import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Gauge;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3MetricsType;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;

import java.util.Collections;

public class NetworkMetricsStats {

public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + "InboundUsage", Collections.emptyMap());
public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap());

public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + "OutboundUsage", Collections.emptyMap());
public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap());

public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + type.getName() + "AvailableBandwidth", Collections.emptyMap(), gauge);
String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase());
S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
}

public static void registerNetworkLimiterQueueSize(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + type.getName() + "QueueSize", Collections.emptyMap(), gauge);
String metricName = String.format("network_%s_limiter_queue_size", type.getName().toLowerCase());
S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public static class OperationMetrics {
public final Histogram operationTime;

public OperationMetrics(S3Operation s3Operation) {
Map<String, String> tags = Map.of("operation", s3Operation.getName());
operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter(s3Operation.getType().getName(),
s3Operation.getType().getName() + "OperationCount", tags);
operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(s3Operation.getType().getName(),
s3Operation.getType().getName() + "OperationTime", tags);
Map<String, String> tags = Map.of(
"operation", s3Operation.getName(),
"type", s3Operation.getType().getName());
operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count" + Counter.SUFFIX, tags);
operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3MetricsType;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;

import java.util.Collections;
Expand All @@ -29,16 +28,14 @@

public class S3ObjectMetricsStats {
private static final Map<String, Histogram> S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>();
public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Object.getName(),
S3MetricsType.S3Object.getName() + "Count", Collections.emptyMap());
public static final Histogram S3_OBJECT_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(),
S3MetricsType.S3Object.getName() + "Size", Collections.emptyMap());
public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap());
public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());

public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) {
return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
Map<String, String> tags = Map.of("stage", stage.getName());
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(),
S3MetricsType.S3Object.getName() + "Time", tags);
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.metrics.stats.S3ObjectMetricsStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.utils.FutureUtil;
Expand Down Expand Up @@ -249,7 +250,9 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
.thenAccept(responsePublisher -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed());
ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) (end - start + 1), "merge_read");
long size = end - start + 1;
S3ObjectMetricsStats.S3_OBJECT_DOWNLOAD_SIZE.update(size);
ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) size, "merge_read");
responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf));
})
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public CompletableFuture<Void> close() {
closeCf.whenComplete((nil, ex) -> {
S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.TOTAL).update(timerUtil.elapsed());
S3ObjectMetricsStats.S3_OBJECT_COUNT.inc();
S3ObjectMetricsStats.S3_OBJECT_SIZE.update(totalWriteSize.get());
S3ObjectMetricsStats.S3_OBJECT_UPLOAD_SIZE.update(totalWriteSize.get());
});
return closeCf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class BaseStreamMetrics {
protected final Map<String, String> tags;
protected final String metricsName;

protected BaseStreamMetrics(String type, String name, Map<String, String> tags,
protected BaseStreamMetrics(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
this.metricsName = metricsName(type, name);
this.metricsName = metricsName(name);
this.tags = tags;
this.meter = meter;
this.attributesBuilderSupplier = attributesBuilderSupplier;
Expand All @@ -51,10 +51,8 @@ protected AttributesBuilder newAttributesBuilder() {
return builder;
}

protected String metricsName(String type, String name) {
name = name.replace(type, "");
protected String metricsName(String name) {
name = name.toLowerCase();
type = type.toLowerCase();
return STREAM_METRICS_PREFIX + type + "_" + name;
return STREAM_METRICS_PREFIX + "_" + name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
import java.util.function.Supplier;

public class StreamMetricsCounter extends BaseStreamMetrics implements Counter {
private static final String STREAM_METRICS_COUNTER_SUFFIX = "_total";
private final LongCounter counter;

public StreamMetricsCounter(String type, String name, Map<String, String> tags,
public StreamMetricsCounter(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
super(type, name, tags, meter, attributesBuilderSupplier);
this.counter = this.meter.counterBuilder(this.metricsName + STREAM_METRICS_COUNTER_SUFFIX)
super(name, tags, meter, attributesBuilderSupplier);
this.counter = this.meter.counterBuilder(this.metricsName)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

public class StreamMetricsGauge extends BaseStreamMetrics {

public StreamMetricsGauge(String type, String name, Map<String, String> tags,
public StreamMetricsGauge(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier, Gauge gauge) {
super(type, name, tags, meter, attributesBuilderSupplier);
super(name, tags, meter, attributesBuilderSupplier);
this.meter.gaugeBuilder(this.metricsName)
.ofLongs()
.buildWithCallback(measurement -> measurement.record(gauge.value(), newAttributesBuilder().build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
public class StreamMetricsHistogram extends BaseStreamMetrics implements Histogram {
private final LongHistogram histogram;

public StreamMetricsHistogram(String type, String name, Map<String, String> tags,
public StreamMetricsHistogram(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
super(type, name, tags, meter, attributesBuilderSupplier);
super(name, tags, meter, attributesBuilderSupplier);
histogram = this.meter.histogramBuilder(this.metricsName)
.ofLongs()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ public void initDynamicMetrics(Meter meter) {
}

@Override
public Counter newCounter(String type, String name, Map<String, String> tags) {
return new StreamMetricsCounter(type, name, tags, meter, attributesBuilderSupplier);
public Counter newCounter(String name, Map<String, String> tags) {
return new StreamMetricsCounter(name, tags, meter, attributesBuilderSupplier);
}

@Override
public Histogram newHistogram(String type, String name, Map<String, String> tags) {
return new StreamMetricsHistogram(type, name, tags, meter, attributesBuilderSupplier);
public Histogram newHistogram(String name, Map<String, String> tags) {
return new StreamMetricsHistogram(name, tags, meter, attributesBuilderSupplier);
}

@Override
public void newGauge(String type, String name, Map<String, String> tags, Gauge gauge) {
new StreamMetricsGauge(type, name, tags, meter, attributesBuilderSupplier, gauge);
public void newGauge(String name, Map<String, String> tags, Gauge gauge) {
new StreamMetricsGauge(name, tags, meter, attributesBuilderSupplier, gauge);
}
}

0 comments on commit 8e769fc

Please sign in to comment.