Skip to content

Commit

Permalink
feat(s3stream): add metrics for read ahead latency and r/w queued time
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Dec 21, 2023
1 parent a27786d commit f90cc00
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
Expand Down Expand Up @@ -125,6 +127,7 @@ public CompletableFuture<List<StreamRecordBatch>> syncReadAhead(long streamId, l
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, true);
});
}

Expand Down Expand Up @@ -279,6 +282,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, false);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public static void setNamespace(String namespace) {
ObjectUtils.namespace = namespace;
}

public static void main(String[] args) {
System.out.printf("%s%n", genKey(0, 11154));
}

public static String genKey(int version, long objectId) {
if (namespace.isEmpty()) {
throw new IllegalStateException("NAMESPACE is not set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class S3StreamMetricsConstant {
public static final String NETWORK_OUTBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_outbound_available_bandwidth";
public static final String NETWORK_INBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_inbound_limiter_queue_size";
public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_outbound_limiter_queue_size";
public static final String NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_inbound_limiter_queue_time";
public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_outbound_limiter_queue_time";
public static final String ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME = "allocate_byte_buf_size";
public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size";
public static final String WAL_START_OFFSET = "wal_start_offset";
Expand All @@ -96,9 +98,9 @@ public class S3StreamMetricsConstant {
public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size_total";
public static final AttributeKey<String> LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type");
public static final AttributeKey<String> LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name");
public static final AttributeKey<String> LABEL_OBJECT_SIZE_NAME = AttributeKey.stringKey("size");
public static final AttributeKey<String> LABEL_SIZE_NAME = AttributeKey.stringKey("size");
public static final AttributeKey<String> LABEL_APPEND_WAL_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_CACHE_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_OBJECT_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_ALLOCATE_BYTE_BUF_SOURCE = AttributeKey.stringKey("source");
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class S3StreamMetricsManager {
private static ObservableLongGauge networkOutboundAvailableBandwidth = new NoopObservableLongGauge();
private static ObservableLongGauge networkInboundLimiterQueueSize = new NoopObservableLongGauge();
private static ObservableLongGauge networkOutboundLimiterQueueSize = new NoopObservableLongGauge();
private static LongHistogram networkInboundLimiterQueueTime = new NoopLongHistogram();
private static LongHistogram networkOutboundLimiterQueueTime = new NoopLongHistogram();
private static LongHistogram allocateByteBufSize = new NoopLongHistogram();
private static LongHistogram readAheadSize = new NoopLongHistogram();
private static ObservableLongGauge deltaWalStartOffset = new NoopObservableLongGauge();
Expand Down Expand Up @@ -140,6 +142,16 @@ public static void initMetrics(Meter meter, String prefix) {
.setDescription("Network outbound limiter queue size")
.ofLongs()
.buildWithCallback(result -> result.record((long) networkOutboundLimiterQueueSizeSupplier.get(), newAttributesBuilder().build()));
networkInboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME)
.setDescription("Network inbound limiter queue time")
.setUnit("nanoseconds")
.ofLongs()
.build();
networkOutboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME)
.setDescription("Network outbound limiter queue time")
.setUnit("nanoseconds")
.ofLongs()
.build();
allocateByteBufSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME)
.setDescription("Allocate byte buf size")
.setUnit("bytes")
Expand Down Expand Up @@ -213,6 +225,7 @@ public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.T
}
}

//TODO: 各broker当前stream数量、各stream流量?、各broker的s3 object number, size
public static void registerDeltaWalOffsetSupplier(Supplier<Long> deltaWalStartOffsetSupplier,
Supplier<Long> deltaWalTrimmedOffsetSupplier) {
S3StreamMetricsManager.deltaWalStartOffsetSupplier = deltaWalStartOffsetSupplier;
Expand Down Expand Up @@ -264,7 +277,7 @@ public static void recordOperationLatency(long value, S3Operation operation, lon
.put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName());
if (operation == S3Operation.GET_OBJECT || operation == S3Operation.PUT_OBJECT || operation == S3Operation.UPLOAD_PART) {
attributesBuilder.put(S3StreamMetricsConstant.LABEL_OBJECT_SIZE_NAME, getObjectBucketLabel(size));
attributesBuilder.put(S3StreamMetricsConstant.LABEL_SIZE_NAME, getObjectBucketLabel(size));
}
operationLatency.record(value, attributesBuilder.build());
}
Expand All @@ -282,7 +295,16 @@ public static void recordReadCacheLatency(long value, S3Operation operation, boo
Attributes attributes = newAttributesBuilder()
.put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName())
.put(S3StreamMetricsConstant.LABEL_CACHE_STATUS, isCacheHit ? "hit" : "miss")
.put(S3StreamMetricsConstant.LABEL_STATUS, isCacheHit ? "hit" : "miss")
.build();
operationLatency.record(value, attributes);
}

public static void recordReadAheadLatency(long value, S3Operation operation, boolean isSync) {
Attributes attributes = newAttributesBuilder()
.put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName())
.put(S3StreamMetricsConstant.LABEL_STATUS, isSync ? "sync" : "async")
.build();
operationLatency.record(value, attributes);
}
Expand Down Expand Up @@ -320,6 +342,13 @@ public static void recordNetworkOutboundUsage(long value) {
networkOutboundUsageInTotal.add(value, newAttributesBuilder().build());
}

public static void recordNetworkLimiterQueueTime(long value, AsyncNetworkBandwidthLimiter.Type type) {
switch (type) {
case INBOUND -> networkInboundLimiterQueueTime.record(value, newAttributesBuilder().build());
case OUTBOUND -> networkOutboundLimiterQueueTime.record(value, newAttributesBuilder().build());
}
}

public static void recordAllocateByteBufSize(long value, String source) {
Attributes attributes = newAttributesBuilder()
.put(S3StreamMetricsConstant.LABEL_ALLOCATE_BYTE_BUF_SOURCE, source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum S3Operation {
READ_STORAGE(S3MetricsType.S3Storage, "read"),
READ_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "read_log_cache"),
READ_STORAGE_BLOCK_CACHE(S3MetricsType.S3Storage, "read_block_cache"),
BLOCK_CACHE_READ_AHEAD(S3MetricsType.S3Storage, "read_ahead"),
/* S3 storage operations end */

/* S3 request operations start */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
return cf;
}
if (networkInboundBandwidthLimiter != null) {
TimerUtil timerUtil = new TimerUtil();
networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> {
S3StreamMetricsManager.recordNetworkLimiterQueueTime(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.INBOUND);
if (ex != null) {
cf.completeExceptionally(ex);
} else {
Expand Down Expand Up @@ -315,7 +317,9 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
return retCf;
}
if (networkOutboundBandwidthLimiter != null) {
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
S3StreamMetricsManager.recordNetworkLimiterQueueTime(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
if (ex != null) {
cf.completeExceptionally(ex);
} else {
Expand Down

0 comments on commit f90cc00

Please sign in to comment.