From 0572a47999c6a388b7542fcbfc81b495892b1ee9 Mon Sep 17 00:00:00 2001
From: Siddharth Agrawal <siddag@google.com>
Date: Thu, 30 May 2024 16:52:53 -0700
Subject: [PATCH] feat: add opentelemetry counters for sent and acked messages

Also add network latency, queue length and error counts.

The metrics (other than error counts) are now reported periodically,
every second.
---
 .../bigquery/storage/v1/ConnectionWorker.java | 149 ++++++++++++++++--
 1 file changed, 134 insertions(+), 15 deletions(-)

diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index af6cbe8c3a..8930d8ad05 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -31,6 +31,7 @@
 import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.Int64Value;
 import io.grpc.Status;
@@ -40,6 +41,7 @@
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.common.AttributesBuilder;
 import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.MeterProvider;
 import java.io.IOException;
@@ -259,6 +261,7 @@ class ConnectionWorker implements AutoCloseable {
   private static Pattern streamPatternTable = Pattern.compile(tableMatching);
   private Meter writeMeter;
   static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
+  static AttributeKey<String> telemetryKeyWriterId = AttributeKey.stringKey("writer_id");
   private static String dataflowPrefix = "dataflow:";
   static List<AttributeKey<String>> telemetryKeysTraceId =
       new ArrayList<AttributeKey<String>>() {
@@ -268,10 +271,25 @@ class ConnectionWorker implements AutoCloseable {
           add(AttributeKey.stringKey("trace_field_3"));
         }
       };
+  static AttributeKey<String> telemetryKeyErrorCode = AttributeKey.stringKey("error_code");
+  static AttributeKey<String> telemetryKeyIsRetry = AttributeKey.stringKey("is_retry");
   private Attributes telemetryAttributes;
-  private LongCounter instrumentIncomingRequestCount;
-  private LongCounter instrumentIncomingRequestSize;
-  private LongCounter instrumentIncomingRequestRows;
+  // Buckets are based on a list of 1.5 ^ n
+  private static final List<Long> METRICS_MILLISECONDS_LATENCY_BUCKETS =
+      ImmutableList.of(
+          1L, 3L, 8L, 17L, 38L, 86L, 195L, 438L, 985L, 2217L, 4988L, 11223L, 25251L, 56815L,
+          127834L, 287627L, 647160L, 1456110L);
+
+  private static final class OpenTelemetryMetrics {
+    private LongCounter instrumentAckedRequestCount;
+    private LongCounter instrumentAckedRequestSize;
+    private LongCounter instrumentAckedRequestRows;
+    private LongHistogram instrumentNetworkResponseLatency;
+    private LongCounter instrumentConnectionStartCount;
+    private LongCounter instrumentConnectionEndCount;
+  }
+
+  private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics();
 
   public static Boolean isDefaultStreamName(String streamName) {
     Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
@@ -327,16 +345,21 @@ private void setTraceIdAttributes(AttributesBuilder builder) {
     }
   }
 
+  // Specify common attributes for all metrics.
+  // For example, table name and writer id.
+  // Metrics dashboards can be filtered on available attributes.
   private Attributes buildOpenTelemetryAttributes() {
     AttributesBuilder builder = Attributes.builder();
     String tableName = getTableName();
     if (!tableName.isEmpty()) {
       builder.put(telemetryKeyTableId, tableName);
     }
+    builder.put(telemetryKeyWriterId, writerId);
     setTraceIdAttributes(builder);
     return builder.build();
   }
 
+  // Refresh the table name attribute when multiplexing switches between tables.
   private void refreshOpenTelemetryTableNameAttributes() {
     String tableName = getTableName();
     if (!tableName.isEmpty()
@@ -347,6 +370,22 @@ private void refreshOpenTelemetryTableNameAttributes() {
     }
   }
 
+  // Build new attributes augmented with an error code string.
+  private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) {
+    AttributesBuilder builder = attributes.toBuilder();
+    if ((errorCode != null) && !errorCode.isEmpty()) {
+      builder.put(telemetryKeyErrorCode, errorCode);
+    }
+    return builder.build();
+  }
+
+  // Build new attributes augmented with a flag indicating this was a retry.
+  private Attributes augmentAttributesWithRetry(Attributes attributes) {
+    AttributesBuilder builder = attributes.toBuilder();
+    builder.put(telemetryKeyIsRetry, "1");
+    return builder.build();
+  }
+
   @VisibleForTesting
   Attributes getTelemetryAttributes() {
     return telemetryAttributes;
@@ -360,20 +399,72 @@ private void registerOpenTelemetryMetrics() {
             .setInstrumentationVersion(
                 ConnectionWorker.class.getPackage().getImplementationVersion())
             .build();
-    instrumentIncomingRequestCount =
+    telemetryMetrics.instrumentAckedRequestCount =
+        writeMeter
+            .counterBuilder("append_requests_acked")
+            .setDescription("Counts number of requests acked by the server")
+            .build();
+    telemetryMetrics.instrumentAckedRequestSize =
+        writeMeter
+            .counterBuilder("append_request_bytes_acked")
+            .setDescription("Counts byte size of requests acked by the server")
+            .build();
+    telemetryMetrics.instrumentAckedRequestRows =
+        writeMeter
+            .counterBuilder("append_rows_acked")
+            .setDescription("Counts number of request rows acked by the server")
+            .build();
+    writeMeter
+        .gaugeBuilder("active_connection_count")
+        .ofLongs()
+        .setDescription("Reports number of active connections")
+        .buildWithCallback(
+            measurement -> {
+              int count = 0;
+              this.lock.lock();
+              try {
+                if (streamConnectionIsConnected) {
+                  count = 1;
+                }
+              } finally {
+                this.lock.unlock();
+              }
+              measurement.record(count, getTelemetryAttributes());
+            });
+    writeMeter
+        .gaugeBuilder("inflight_queue_length")
+        .ofLongs()
+        .setDescription(
+            "Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.")
+        .buildWithCallback(
+            measurement -> {
+              int length = 0;
+              this.lock.lock();
+              try {
+                length = inflightRequestQueue.size();
+              } finally {
+                this.lock.unlock();
+              }
+              measurement.record(length, getTelemetryAttributes());
+            });
+    telemetryMetrics.instrumentNetworkResponseLatency =
         writeMeter
-            .counterBuilder("append_requests")
-            .setDescription("Counts number of incoming requests")
+            .histogramBuilder("network_response_latency")
+            .ofLongs()
+            .setDescription(
+                "Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.")
+            .setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS)
             .build();
-    instrumentIncomingRequestSize =
+    telemetryMetrics.instrumentConnectionStartCount =
         writeMeter
-            .counterBuilder("append_request_bytes")
-            .setDescription("Counts byte size of incoming requests")
+            .counterBuilder("connection_start_count")
+            .setDescription(
+                "Counts number of connection attempts made, regardless of whether these are initial or retry.")
             .build();
-    instrumentIncomingRequestRows =
+    telemetryMetrics.instrumentConnectionEndCount =
         writeMeter
-            .counterBuilder("append_rows")
-            .setDescription("Counts number of incoming request rows")
+            .counterBuilder("connection_end_count")
+            .setDescription("Counts number of connection end events.")
             .build();
   }
 
@@ -465,6 +556,7 @@ public void run() {
 
   private void resetConnection() {
     log.info("Start connecting stream: " + streamName + " id: " + writerId);
+    telemetryMetrics.instrumentConnectionStartCount.add(1, getTelemetryAttributes());
     if (this.streamConnection != null) {
       // It's safe to directly close the previous connection as the in flight messages
       // will be picked up by the next connection.
@@ -618,9 +710,6 @@ private ApiFuture<AppendRowsResponse> appendInternal(
                           + requestWrapper.messageSize)));
       return requestWrapper.appendResult;
     }
-    instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
-    instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
-    instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
     this.lock.lock();
     try {
       if (userClosed) {
@@ -1214,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) {
         connectionRetryStartTime = 0;
       }
       if (!this.inflightRequestQueue.isEmpty()) {
+        Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
+        if (sendInstant != null) {
+          Duration durationLatency = Duration.between(sendInstant, Instant.now());
+          telemetryMetrics.instrumentNetworkResponseLatency.record(
+              durationLatency.toMillis(), getTelemetryAttributes());
+        }
+
         requestWrapper = pollFirstInflightRequestQueue();
         requestProfilerHook.endOperation(
             RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
@@ -1234,6 +1330,22 @@ private void requestCallback(AppendRowsResponse response) {
       this.lock.unlock();
     }
 
+    Attributes augmentedTelemetryAttributes =
+        augmentAttributesWithErrorCode(
+            getTelemetryAttributes(),
+            Code.values()[
+                response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()]
+                .toString());
+    if (requestWrapper.retryCount > 0) {
+      augmentedTelemetryAttributes = augmentAttributesWithRetry(augmentedTelemetryAttributes);
+    }
+    telemetryMetrics.instrumentAckedRequestCount.add(1, augmentedTelemetryAttributes);
+    telemetryMetrics.instrumentAckedRequestSize.add(
+        requestWrapper.messageSize, augmentedTelemetryAttributes);
+    telemetryMetrics.instrumentAckedRequestRows.add(
+        requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(),
+        augmentedTelemetryAttributes);
+
     // Retries need to happen on the same thread as queue locking may occur
     if (response.hasError()) {
       if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) {
@@ -1316,6 +1428,11 @@ private void doneCallback(Throwable finalStatus) {
     this.lock.lock();
     try {
       this.streamConnectionIsConnected = false;
+      this.telemetryMetrics.instrumentConnectionEndCount.add(
+          1,
+          augmentAttributesWithErrorCode(
+              getTelemetryAttributes(),
+              Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString()));
       if (connectionFinalStatus == null) {
         if (connectionRetryStartTime == 0) {
           connectionRetryStartTime = System.currentTimeMillis();
@@ -1327,6 +1444,8 @@ private void doneCallback(Throwable finalStatus) {
                 || System.currentTimeMillis() - connectionRetryStartTime
                     <= maxRetryDuration.toMillis())) {
           this.conectionRetryCountWithoutCallback++;
+          this.telemetryMetrics.instrumentConnectionStartCount.add(
+              1, augmentAttributesWithRetry(getTelemetryAttributes()));
           log.info(
               "Connection is going to be reestablished with the next request. Retriable error "
                   + finalStatus.toString()