Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): optimize s3stream metrics name #545

Merged
merged 4 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.19-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.20-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.19-SNAPSHOT</version>
<version>0.1.20-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.stream.s3.metrics;

public interface Counter {
String SUFFIX = "_total";
void inc();
void inc(long n);
long count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

public class NoopS3StreamMetricsGroup implements S3StreamMetricsGroup {
@Override
public Counter newCounter(String type, String name, Map<String, String> tags) {
public Counter newCounter(String name, Map<String, String> tags) {
return new Counter() {
@Override
public void inc() {
Expand All @@ -41,7 +41,7 @@ public long count() {
}

@Override
public Histogram newHistogram(String type, String name, Map<String, String> tags) {
public Histogram newHistogram(String name, Map<String, String> tags) {
return new Histogram() {
@Override
public void update(long value) {
Expand All @@ -61,7 +61,7 @@ public double mean() {
}

@Override
public void newGauge(String type, String name, Map<String, String> tags, Gauge gauge) {
public void newGauge(String name, Map<String, String> tags, Gauge gauge) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import java.util.Map;

public interface S3StreamMetricsGroup {
Counter newCounter(String type, String name, Map<String, String> tags);
Counter newCounter(String name, Map<String, String> tags);

Histogram newHistogram(String type, String name, Map<String, String> tags);
Histogram newHistogram(String name, Map<String, String> tags);

void newGauge(String type, String name, Map<String, String> tags, Gauge gauge);
void newGauge(String name, Map<String, String> tags, Gauge gauge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3MetricsType;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -30,8 +29,7 @@ public class ByteBufMetricsStats {
public static Histogram getHistogram(String source) {
return SOURCE_TO_HISTOGRAM.computeIfAbsent(source, k -> {
Map<String, String> tags = Map.of("source", k);
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Storage.getName(),
S3MetricsType.S3Storage.getName() + "size", tags);
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_stream_byte_buf_size", tags);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,25 @@
import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Gauge;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3MetricsType;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;

import java.util.Collections;

public class NetworkMetricsStats {

public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + "InboundUsage", Collections.emptyMap());
public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap());

public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + "OutboundUsage", Collections.emptyMap());
public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap());

public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + type.getName() + "AvailableBandwidth", Collections.emptyMap(), gauge);
String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase());
S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
}

public static void registerNetworkLimiterQueueSize(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
S3StreamMetricsRegistry.getMetricsGroup().newGauge(S3MetricsType.S3Network.getName(),
S3MetricsType.S3Network.getName() + type.getName() + "QueueSize", Collections.emptyMap(), gauge);
String metricName = String.format("network_%s_limiter_queue_size", type.getName().toLowerCase());
S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public static class OperationMetrics {
public final Histogram operationTime;

public OperationMetrics(S3Operation s3Operation) {
Map<String, String> tags = Map.of("operation", s3Operation.getName());
operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter(s3Operation.getType().getName(),
s3Operation.getType().getName() + "OperationCount", tags);
operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(s3Operation.getType().getName(),
s3Operation.getType().getName() + "OperationTime", tags);
Map<String, String> tags = Map.of(
"operation", s3Operation.getName(),
"type", s3Operation.getType().getName());
operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count" + Counter.SUFFIX, tags);
operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3MetricsType;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;

import java.util.Collections;
Expand All @@ -29,16 +28,14 @@

public class S3ObjectMetricsStats {
private static final Map<String, Histogram> S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>();
public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter(S3MetricsType.S3Object.getName(),
S3MetricsType.S3Object.getName() + "Count", Collections.emptyMap());
public static final Histogram S3_OBJECT_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(),
S3MetricsType.S3Object.getName() + "Size", Collections.emptyMap());
public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap());
public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());

public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) {
return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
Map<String, String> tags = Map.of("stage", stage.getName());
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Object.getName(),
S3MetricsType.S3Object.getName() + "Time", tags);
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.metrics.stats.S3ObjectMetricsStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.utils.FutureUtil;
Expand Down Expand Up @@ -249,7 +250,9 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
.thenAccept(responsePublisher -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed());
ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) (end - start + 1), "merge_read");
long size = end - start + 1;
S3ObjectMetricsStats.S3_OBJECT_DOWNLOAD_SIZE.update(size);
ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) size, "merge_read");
responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf));
})
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public CompletableFuture<Void> close() {
closeCf.whenComplete((nil, ex) -> {
S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.TOTAL).update(timerUtil.elapsed());
S3ObjectMetricsStats.S3_OBJECT_COUNT.inc();
S3ObjectMetricsStats.S3_OBJECT_SIZE.update(totalWriteSize.get());
S3ObjectMetricsStats.S3_OBJECT_UPLOAD_SIZE.update(totalWriteSize.get());
});
return closeCf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class BaseStreamMetrics {
protected final Map<String, String> tags;
protected final String metricsName;

protected BaseStreamMetrics(String type, String name, Map<String, String> tags,
protected BaseStreamMetrics(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
this.metricsName = metricsName(type, name);
this.metricsName = metricsName(name);
this.tags = tags;
this.meter = meter;
this.attributesBuilderSupplier = attributesBuilderSupplier;
Expand All @@ -51,10 +51,8 @@ protected AttributesBuilder newAttributesBuilder() {
return builder;
}

protected String metricsName(String type, String name) {
name = name.replace(type, "");
protected String metricsName(String name) {
name = name.toLowerCase();
type = type.toLowerCase();
return STREAM_METRICS_PREFIX + type + "_" + name;
return STREAM_METRICS_PREFIX + "_" + name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
import java.util.function.Supplier;

public class StreamMetricsCounter extends BaseStreamMetrics implements Counter {
private static final String STREAM_METRICS_COUNTER_SUFFIX = "_total";
private final LongCounter counter;

public StreamMetricsCounter(String type, String name, Map<String, String> tags,
public StreamMetricsCounter(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
super(type, name, tags, meter, attributesBuilderSupplier);
this.counter = this.meter.counterBuilder(this.metricsName + STREAM_METRICS_COUNTER_SUFFIX)
super(name, tags, meter, attributesBuilderSupplier);
this.counter = this.meter.counterBuilder(this.metricsName)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

public class StreamMetricsGauge extends BaseStreamMetrics {

public StreamMetricsGauge(String type, String name, Map<String, String> tags,
public StreamMetricsGauge(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier, Gauge gauge) {
super(type, name, tags, meter, attributesBuilderSupplier);
super(name, tags, meter, attributesBuilderSupplier);
this.meter.gaugeBuilder(this.metricsName)
.ofLongs()
.buildWithCallback(measurement -> measurement.record(gauge.value(), newAttributesBuilder().build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
public class StreamMetricsHistogram extends BaseStreamMetrics implements Histogram {
private final LongHistogram histogram;

public StreamMetricsHistogram(String type, String name, Map<String, String> tags,
public StreamMetricsHistogram(String name, Map<String, String> tags,
Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
super(type, name, tags, meter, attributesBuilderSupplier);
super(name, tags, meter, attributesBuilderSupplier);
histogram = this.meter.histogramBuilder(this.metricsName)
.ofLongs()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ public void initDynamicMetrics(Meter meter) {
}

@Override
public Counter newCounter(String type, String name, Map<String, String> tags) {
return new StreamMetricsCounter(type, name, tags, meter, attributesBuilderSupplier);
public Counter newCounter(String name, Map<String, String> tags) {
return new StreamMetricsCounter(name, tags, meter, attributesBuilderSupplier);
}

@Override
public Histogram newHistogram(String type, String name, Map<String, String> tags) {
return new StreamMetricsHistogram(type, name, tags, meter, attributesBuilderSupplier);
public Histogram newHistogram(String name, Map<String, String> tags) {
return new StreamMetricsHistogram(name, tags, meter, attributesBuilderSupplier);
}

@Override
public void newGauge(String type, String name, Map<String, String> tags, Gauge gauge) {
new StreamMetricsGauge(type, name, tags, meter, attributesBuilderSupplier, gauge);
public void newGauge(String name, Map<String, String> tags, Gauge gauge) {
new StreamMetricsGauge(name, tags, meter, attributesBuilderSupplier, gauge);
}
}