From 5f28d228498e2bc2873ec8f91ebba40608c444dc Mon Sep 17 00:00:00 2001
From: Naireen
Date: Tue, 31 Dec 2024 00:26:18 +0000
Subject: [PATCH] Add metadata on Kafka sink metrics

---
 .../beam/model/pipeline/v1/metrics.proto      |  1 +
 .../core/metrics/MetricsContainerImpl.java    |  7 +++
 .../core/metrics/MonitoringInfoConstants.java |  2 +
 .../metrics/MetricsContainerImplTest.java     | 48 +++++++++++++++++++
 .../beam/sdk/io/kafka/KafkaMetrics.java       | 29 ++++++++++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java  | 12 ++++-
 6 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index 33bb5ae729f8..3a7fed923dbe 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -447,6 +447,7 @@ message MonitoringInfo {
     SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
     SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
     SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
+    PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
   }
 
   // A set of key and value labels which define the scope of the metric. For
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 99cf98508505..8760b59c158e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -268,6 +268,13 @@ public MetricUpdates getUpdates() {
           .setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName())
           .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
     }
+
+    // Based on the namespace, add a per-worker metrics label so that the runner can
+    // process these sink metrics separately.
+    if (metricName.getNamespace().equals("BigQuerySink")
+        || metricName.getNamespace().equals("KafkaSink")) {
+      builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
+    }
     return builder;
   }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 2bb935111d38..fcd5db38fddb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -107,6 +107,7 @@ public static final class Labels {
     public static final String SPANNER_DATABASE_ID = "SPANNER_DATABASE_ID";
     public static final String SPANNER_INSTANCE_ID = "SPANNER_INSTANCE_ID";
     public static final String SPANNER_QUERY_NAME = "SPANNER_QUERY_NAME";
+    public static final String PER_WORKER_METRIC = "PER_WORKER_METRIC";
 
     static {
       // Validate that compile time constants match the values stored in the protos.
@@ -148,6 +149,7 @@ public static final class Labels {
           SPANNER_INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.SPANNER_INSTANCE_ID)));
       checkArgument(
           SPANNER_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.SPANNER_QUERY_NAME)));
+      checkArgument(PER_WORKER_METRIC.equals(extractLabel(MonitoringInfoLabels.PER_WORKER_METRIC)));
     }
   }
 
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 5b3d71f4873e..3feb21784b6e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -204,6 +204,54 @@ public void testMonitoringInfosArePopulatedForUserCounters() {
     assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
   }
 
+  @Test
+  public void testMonitoringInfosLabelsArePopulatedForSinkCounter() {
+    MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+    CounterCell c1 = testObject.getCounter(MetricName.named("KafkaSink", "name1"));
+    CounterCell c2 = testObject.getCounter(MetricName.named("BigQuerySink", "name2"));
+    CounterCell c3 = testObject.getCounter(MetricName.named("PS", "name3"));
+
+    c1.inc(2L);
+    c2.inc(4L);
+    c3.inc(5L);
+
+    SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+    builder1
+        .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "KafkaSink")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
+        .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
+        .setInt64SumValue(2)
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+    SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
+    builder2
+        .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
+        .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
+        .setInt64SumValue(4)
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+    // Not in a supported namespace, so the extra metadata isn't added.
+    SimpleMonitoringInfoBuilder builder3 = new SimpleMonitoringInfoBuilder();
+    builder3
+        .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "PS")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name3")
+        .setInt64SumValue(5)
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+    ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
+    for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+      actualMonitoringInfos.add(mi);
+    }
+
+    assertThat(
+        actualMonitoringInfos,
+        containsInAnyOrder(builder1.build(), builder2.build(), builder3.build()));
+  }
+
   @Test
   public void testMonitoringInfosArePopulatedForUserDistributions() {
     MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
index 2cb1b5a376e0..398e627b4fda 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
@@ -27,6 +27,7 @@
 import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ public interface KafkaMetrics {
 
   void updateKafkaMetrics();
 
+  void recordBacklogBytes(String topic, int partitionId, long backlog);
+
   /** No-op implementation of {@code KafkaResults}. */
   class NoOpKafkaMetrics implements KafkaMetrics {
     private NoOpKafkaMetrics() {}
@@ -53,6 +56,9 @@ public void updateBacklogBytes(String topic, int partitionId, long elapsedTime)
     @Override
     public void updateKafkaMetrics() {}
 
+    @Override
+    public void recordBacklogBytes(String topic, int partitionId, long backlog) {}
+
     private static NoOpKafkaMetrics singleton = new NoOpKafkaMetrics();
 
     static NoOpKafkaMetrics getInstance() {
@@ -108,6 +114,8 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
     }
 
     /**
+     * Tracks backlog bytes to be added to the metric container at a later time.
+     *
      * @param topicName topicName
      * @param partitionId partitionId for the topic Only included in the metric key if
      *     'supportsMetricsDeletion' is enabled.
@@ -143,7 +151,7 @@ private void recordRpcLatencyMetrics() {
       }
     }
 
-    private void recordBacklogBytes() {
+    private void recordBacklogBytesInternal() {
       for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
         Gauge gauge =
             KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
@@ -151,6 +159,23 @@ private void recordBacklogBytes() {
       }
     }
 
+    /**
+     * Records backlog bytes on the current thread's metrics container.
+     *
+     * @param topicName topicName
+     * @param partitionId partitionId for the topic. Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     * @param backlogBytes backlog for the topic. Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     */
+    @Override
+    public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
+      Gauge perPartition =
+          Metrics.gauge(
+              "KafkaSink", KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName());
+      perPartition.set(backlogBytes);
+    }
+
     /**
      * Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
     * containers. This function will only report metrics once per instance. Subsequent calls to
@@ -162,7 +187,7 @@ public void updateKafkaMetrics() {
         LOG.warn("Updating stale Kafka metrics container");
         return;
       }
-      recordBacklogBytes();
+      recordBacklogBytesInternal();
       recordRpcLatencyMetrics();
     }
   }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 1cf4aad34e4e..3d6cc910d009 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -549,7 +549,6 @@ public ProcessContinuation processElement(
           }
         }
       }
-
       backlogBytes.set(
           (long)
              (BigDecimal.valueOf(
@@ -558,6 +557,17 @@ public ProcessContinuation processElement(
                  .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
                  .doubleValue()
                  * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
+      KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
+      kafkaResults.recordBacklogBytes(
+          kafkaSourceDescriptor.getTopic(),
+          kafkaSourceDescriptor.getPartition(),
+          (long)
+              (BigDecimal.valueOf(
+                      Preconditions.checkStateNotNull(
+                          offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
+                  .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
+                  .doubleValue()
+                  * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
     }
   }
 }
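
Note on how the new label is meant to be consumed: once MetricsContainerImpl attaches
PER_WORKER_METRIC = "true" to counters in the "KafkaSink" and "BigQuerySink" namespaces,
runner-side code can separate these per-worker sink metrics from ordinary user metrics by
inspecting the MonitoringInfo label map. The sketch below is a minimal illustration, not
part of this patch; the class and method names are hypothetical, and it assumes only the
generated protobuf map accessor getLabelsMap() on MonitoringInfo.

    import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;

    /** Hypothetical runner-side helper, shown for illustration only. */
    public final class PerWorkerMetricFilter {
      // Mirrors MonitoringInfoConstants.Labels.PER_WORKER_METRIC introduced above.
      private static final String PER_WORKER_METRIC = "PER_WORKER_METRIC";

      /** Returns true when the MonitoringInfo carries the per-worker sink label. */
      public static boolean isPerWorkerSinkMetric(MonitoringInfo mi) {
        return "true".equals(mi.getLabelsMap().get(PER_WORKER_METRIC));
      }

      private PerWorkerMetricFilter() {}
    }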