From 23f3a70d64f0f72cf18dd3a7640125ff9027dec7 Mon Sep 17 00:00:00 2001
From: cloud-java-bot <cloud-java-bot@google.com>
Date: Mon, 30 Sep 2024 21:06:05 +0000
Subject: [PATCH] chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024

---
 .../pubsub/v1/OpenTelemetryPubsubTracer.java  | 460 ------------
 .../cloud/pubsub/v1/PubsubMessageWrapper.java | 430 -----------
 .../cloud/pubsub/v1/OpenTelemetryTest.java    | 669 ------------------
 3 files changed, 1559 deletions(-)
 delete mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java
 delete mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
 delete mode 100644 google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java

diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java
deleted file mode 100644
index b946f44bf..000000000
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Copyright 2024 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsub.v1;
-
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.SubscriptionName;
-import com.google.pubsub.v1.TopicName;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanBuilder;
-import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.api.trace.StatusCode;
-import io.opentelemetry.api.trace.Tracer;
-import io.opentelemetry.context.Context;
-import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
-import java.util.List;
-
-public class OpenTelemetryPubsubTracer {
-  private final Tracer tracer;
-  private boolean enabled = false;
-
-  private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control";
-  private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching";
-  private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME =
-      "subscriber concurrency control";
-  private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";
-
-  private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
-  private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
-  private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
-  private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
-      "messaging.gcp_pubsub.message.exactly_once_delivery";
-  private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
-      "messaging.gcp_pubsub.message.delivery_attempt";
-  private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
-  private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
-  private static final String PROJECT_ATTR_KEY = "gcp.project_id";
-  private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";
-
-  private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
-
-  OpenTelemetryPubsubTracer(Tracer tracer, boolean enableOpenTelemetry) {
-    this.tracer = tracer;
-    if (this.tracer != null && enableOpenTelemetry) {
-      this.enabled = true;
-    }
-  }
-
-  /** Populates attributes that are common the publisher parent span and publish RPC span. */
-  private static final AttributesBuilder createCommonSpanAttributesBuilder(
-      String destinationName, String projectName, String codeFunction, String operation) {
-    AttributesBuilder attributesBuilder =
-        Attributes.builder()
-            .put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
-            .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName)
-            .put(PROJECT_ATTR_KEY, projectName)
-            .put(SemanticAttributes.CODE_FUNCTION, codeFunction);
-    if (operation != null) {
-      attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation);
-    }
-
-    return attributesBuilder;
-  }
-
-  private Span startChildSpan(String name, Span parent) {
-    return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan();
-  }
-
-  /**
-   * Creates and starts the parent span with the appropriate span attributes and injects the span
-   * context into the {@link PubsubMessage} attributes.
-   */
-  void startPublisherSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    AttributesBuilder attributesBuilder =
-        createCommonSpanAttributesBuilder(
-            message.getTopicName(), message.getTopicProject(), "publish", "create");
-
-    attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize());
-    if (!message.getOrderingKey().isEmpty()) {
-      attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
-    }
-
-    Span publisherSpan =
-        tracer
-            .spanBuilder(message.getTopicName() + " create")
-            .setSpanKind(SpanKind.PRODUCER)
-            .setAllAttributes(attributesBuilder.build())
-            .startSpan();
-
-    message.setPublisherSpan(publisherSpan);
-    if (publisherSpan.getSpanContext().isValid()) {
-      message.injectSpanContext();
-    }
-  }
-
-  void endPublisherSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    message.endPublisherSpan();
-  }
-
-  void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) {
-    if (!enabled) {
-      return;
-    }
-    message.setPublisherMessageIdSpanAttribute(messageId);
-  }
-
-  /** Creates a span for publish-side flow control as a child of the parent publisher span. */
-  void startPublishFlowControlSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    Span publisherSpan = message.getPublisherSpan();
-    if (publisherSpan != null)
-      message.setPublishFlowControlSpan(
-          startChildSpan(PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan));
-  }
-
-  void endPublishFlowControlSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    message.endPublishFlowControlSpan();
-  }
-
-  void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) {
-    if (!enabled) {
-      return;
-    }
-    message.setPublishFlowControlSpanException(t);
-  }
-
-  /** Creates a span for publish message batching as a child of the parent publisher span. */
-  void startPublishBatchingSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    Span publisherSpan = message.getPublisherSpan();
-    if (publisherSpan != null) {
-      message.setPublishBatchingSpan(startChildSpan(PUBLISH_BATCHING_SPAN_NAME, publisherSpan));
-    }
-  }
-
-  void endPublishBatchingSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    message.endPublishBatchingSpan();
-  }
-
-  /**
-   * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional
-   * links with the publisher parent span are created for sampled messages in the batch.
-   */
-  Span startPublishRpcSpan(String topic, List<PubsubMessageWrapper> messages) {
-    if (!enabled) {
-      return null;
-    }
-    TopicName topicName = TopicName.parse(topic);
-    Attributes attributes =
-        createCommonSpanAttributesBuilder(
-                topicName.getTopic(), topicName.getProject(), "publishCall", "publish")
-            .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size())
-            .build();
-    SpanBuilder publishRpcSpanBuilder =
-        tracer
-            .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX)
-            .setSpanKind(SpanKind.CLIENT)
-            .setAllAttributes(attributes);
-    Attributes linkAttributes =
-        Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build();
-    for (PubsubMessageWrapper message : messages) {
-      if (message.getPublisherSpan().getSpanContext().isSampled())
-        publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes);
-    }
-    Span publishRpcSpan = publishRpcSpanBuilder.startSpan();
-
-    for (PubsubMessageWrapper message : messages) {
-      if (publishRpcSpan.getSpanContext().isSampled()) {
-        message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes);
-        message.addPublishStartEvent();
-      }
-    }
-    return publishRpcSpan;
-  }
-
-  /** Ends the given publish RPC span if it exists. */
-  void endPublishRpcSpan(Span publishRpcSpan) {
-    if (!enabled) {
-      return;
-    }
-    if (publishRpcSpan != null) {
-      publishRpcSpan.end();
-    }
-  }
-
-  /**
-   * Sets an error status and records an exception when an exception is thrown when publishing the
-   * message batch.
-   */
-  void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) {
-    if (!enabled) {
-      return;
-    }
-    if (publishRpcSpan != null) {
-      publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC.");
-      publishRpcSpan.recordException(t);
-      publishRpcSpan.end();
-    }
-  }
-
-  void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) {
-    if (!enabled) {
-      return;
-    }
-    AttributesBuilder attributesBuilder =
-        createCommonSpanAttributesBuilder(
-            message.getSubscriptionName(), message.getSubscriptionProject(), "onResponse", null);
-
-    attributesBuilder
-        .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId())
-        .put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize())
-        .put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId())
-        .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled);
-    if (!message.getOrderingKey().isEmpty()) {
-      attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
-    }
-    if (message.getDeliveryAttempt() > 0) {
-      attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt());
-    }
-    Attributes attributes = attributesBuilder.build();
-    Context publisherSpanContext = message.extractSpanContext(attributes);
-    message.setPublisherSpan(Span.fromContextOrNull(publisherSpanContext));
-    message.setSubscriberSpan(
-        tracer
-            .spanBuilder(message.getSubscriptionName() + " subscribe")
-            .setSpanKind(SpanKind.CONSUMER)
-            .setParent(publisherSpanContext)
-            .setAllAttributes(attributes)
-            .startSpan());
-  }
-
-  void endSubscriberSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    message.endSubscriberSpan();
-  }
-
-  void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    message.setSubscriberSpanExpirationResult();
-  }
-
-  void setSubscriberSpanException(PubsubMessageWrapper message, Throwable t, String exception) {
-    if (!enabled) {
-      return;
-    }
-    message.setSubscriberSpanException(t, exception);
-  }
-
-  /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */
-  void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    Span subscriberSpan = message.getSubscriberSpan();
-    if (subscriberSpan != null) {
-      message.setSubscribeConcurrencyControlSpan(
-          startChildSpan(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan));
-    }
-  }
-
-  void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    message.endSubscribeConcurrencyControlSpan();
-  }
-
-  void setSubscribeConcurrencyControlSpanException(PubsubMessageWrapper message, Throwable t) {
-    if (!enabled) {
-      return;
-    }
-    message.setSubscribeConcurrencyControlSpanException(t);
-  }
-
-  /**
-   * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span.
-   */
-  void startSubscribeSchedulerSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    Span subscriberSpan = message.getSubscriberSpan();
-    if (subscriberSpan != null) {
-      message.setSubscribeSchedulerSpan(
-          startChildSpan(SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan));
-    }
-  }
-
-  void endSubscribeSchedulerSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    message.endSubscribeSchedulerSpan();
-  }
-
-  /** Creates a span for subscribe message processing as a child of the parent subscriber span. */
-  void startSubscribeProcessSpan(PubsubMessageWrapper message) {
-    if (!enabled) {
-      return;
-    }
-    Span subscriberSpan = message.getSubscriberSpan();
-    if (subscriberSpan != null) {
-      Span subscribeProcessSpan =
-          startChildSpan(message.getSubscriptionName() + " process", subscriberSpan);
-      subscribeProcessSpan.setAttribute(
-          SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE);
-      Span publisherSpan = message.getPublisherSpan();
-      if (publisherSpan != null) {
-        subscribeProcessSpan.addLink(publisherSpan.getSpanContext());
-      }
-      message.setSubscribeProcessSpan(subscribeProcessSpan);
-    }
-  }
-
-  void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) {
-    if (!enabled) {
-      return;
-    }
-    message.endSubscribeProcessSpan(action);
-  }
-
-  /**
-   * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links
-   * to parent subscribe span for sampled messages are added.
-   */
-  Span startSubscribeRpcSpan(
-      String subscription,
-      String rpcOperation,
-      List<PubsubMessageWrapper> messages,
-      int ackDeadline,
-      boolean isReceiptModack) {
-    if (!enabled) {
-      return null;
-    }
-    String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations";
-    SubscriptionName subscriptionName = SubscriptionName.parse(subscription);
-    AttributesBuilder attributesBuilder =
-        createCommonSpanAttributesBuilder(
-                subscriptionName.getSubscription(),
-                subscriptionName.getProject(),
-                codeFunction,
-                rpcOperation)
-            .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size());
-
-    // Ack deadline and receipt modack are specific to the modack operation
-    if (rpcOperation == "modack") {
-      attributesBuilder
-          .put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
-          .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
-    }
-
-    SpanBuilder rpcSpanBuilder =
-        tracer
-            .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation)
-            .setSpanKind(SpanKind.CLIENT)
-            .setAllAttributes(attributesBuilder.build());
-    Attributes linkAttributes =
-        Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation).build();
-    for (PubsubMessageWrapper message : messages) {
-      if (message.getSubscriberSpan().getSpanContext().isSampled()) {
-        rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes);
-      }
-    }
-    Span rpcSpan = rpcSpanBuilder.startSpan();
-
-    for (PubsubMessageWrapper message : messages) {
-      if (rpcSpan.getSpanContext().isSampled()) {
-        message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes);
-        switch (rpcOperation) {
-          case "ack":
-            message.addAckStartEvent();
-            break;
-          case "modack":
-            message.addModAckStartEvent();
-            break;
-          case "nack":
-            message.addNackStartEvent();
-            break;
-        }
-      }
-    }
-    return rpcSpan;
-  }
-
-  /** Ends the given subscribe RPC span if it exists. */
-  void endSubscribeRpcSpan(Span rpcSpan) {
-    if (!enabled) {
-      return;
-    }
-    if (rpcSpan != null) {
-      rpcSpan.end();
-    }
-  }
-
-  /**
-   * Sets an error status and records an exception when an exception is thrown when handling a
-   * subscribe-side RPC.
-   */
-  void setSubscribeRpcSpanException(Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) {
-    if (!enabled) {
-      return;
-    }
-    if (rpcSpan != null) {
-      String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack");
-      rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC.");
-      rpcSpan.recordException(t);
-      rpcSpan.end();
-    }
-  }
-
-  /** Adds the appropriate subscribe-side RPC end event. */
-  void addEndRpcEvent(
-      PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) {
-    if (!enabled || !rpcSampled) {
-      return;
-    }
-    if (!isModack) {
-      message.addAckEndEvent();
-    } else if (ackDeadline == 0) {
-      message.addNackEndEvent();
-    } else {
-      message.addModAckEndEvent();
-    }
-  }
-}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
deleted file mode 100644
index 94fd13085..000000000
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Copyright 2024 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsub.v1;
-
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.SubscriptionName;
-import com.google.pubsub.v1.TopicName;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.StatusCode;
-import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
-import io.opentelemetry.context.Context;
-import io.opentelemetry.context.propagation.TextMapGetter;
-import io.opentelemetry.context.propagation.TextMapSetter;
-import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
-
-/**
- * A wrapper class for a {@link PubsubMessage} object that handles creation and tracking of
- * OpenTelemetry {@link Span} objects for different operations that occur during publishing.
- */
-public class PubsubMessageWrapper {
-  private PubsubMessage message;
-
-  private final TopicName topicName;
-  private final SubscriptionName subscriptionName;
-
-  // Attributes set only for messages received from a streaming pull response.
-  private final String ackId;
-  private final int deliveryAttempt;
-
-  private static final String PUBLISH_START_EVENT = "publish start";
-  private static final String PUBLISH_END_EVENT = "publish end";
-
-  private static final String MODACK_START_EVENT = "modack start";
-  private static final String MODACK_END_EVENT = "modack end";
-  private static final String NACK_START_EVENT = "nack start";
-  private static final String NACK_END_EVENT = "nack end";
-  private static final String ACK_START_EVENT = "ack start";
-  private static final String ACK_END_EVENT = "ack end";
-
-  private static final String GOOGCLIENT_PREFIX = "googclient_";
-
-  private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
-
-  private Span publisherSpan;
-  private Span publishFlowControlSpan;
-  private Span publishBatchingSpan;
-
-  private Span subscriberSpan;
-  private Span subscribeConcurrencyControlSpan;
-  private Span subscribeSchedulerSpan;
-  private Span subscribeProcessSpan;
-
-  private PubsubMessageWrapper(Builder builder) {
-    this.message = builder.message;
-    this.topicName = builder.topicName;
-    this.subscriptionName = builder.subscriptionName;
-    this.ackId = builder.ackId;
-    this.deliveryAttempt = builder.deliveryAttempt;
-  }
-
-  static Builder newBuilder(PubsubMessage message, String topicName) {
-    return new Builder(message, topicName);
-  }
-
-  static Builder newBuilder(
-      PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
-    return new Builder(message, subscriptionName, ackId, deliveryAttempt);
-  }
-
-  /** Returns the PubsubMessage associated with this wrapper. */
-  PubsubMessage getPubsubMessage() {
-    return message;
-  }
-
-  void setPubsubMessage(PubsubMessage message) {
-    this.message = message;
-  }
-
-  /** Returns the TopicName for this wrapper as a string. */
-  String getTopicName() {
-    if (topicName != null) {
-      return topicName.getTopic();
-    }
-    return "";
-  }
-
-  String getTopicProject() {
-    if (topicName != null) {
-      return topicName.getProject();
-    }
-    return "";
-  }
-
-  /** Returns the SubscriptionName for this wrapper as a string. */
-  String getSubscriptionName() {
-    if (subscriptionName != null) {
-      return subscriptionName.getSubscription();
-    }
-    return "";
-  }
-
-  String getSubscriptionProject() {
-    if (subscriptionName != null) {
-      return subscriptionName.getProject();
-    }
-    return "";
-  }
-
-  String getMessageId() {
-    return message.getMessageId();
-  }
-
-  String getAckId() {
-    return ackId;
-  }
-
-  int getDataSize() {
-    return message.getData().size();
-  }
-
-  String getOrderingKey() {
-    return message.getOrderingKey();
-  }
-
-  int getDeliveryAttempt() {
-    return deliveryAttempt;
-  }
-
-  Span getPublisherSpan() {
-    return publisherSpan;
-  }
-
-  void setPublisherSpan(Span span) {
-    this.publisherSpan = span;
-  }
-
-  void setPublishFlowControlSpan(Span span) {
-    this.publishFlowControlSpan = span;
-  }
-
-  void setPublishBatchingSpan(Span span) {
-    this.publishBatchingSpan = span;
-  }
-
-  Span getSubscriberSpan() {
-    return subscriberSpan;
-  }
-
-  void setSubscriberSpan(Span span) {
-    this.subscriberSpan = span;
-  }
-
-  void setSubscribeConcurrencyControlSpan(Span span) {
-    this.subscribeConcurrencyControlSpan = span;
-  }
-
-  void setSubscribeSchedulerSpan(Span span) {
-    this.subscribeSchedulerSpan = span;
-  }
-
-  void setSubscribeProcessSpan(Span span) {
-    this.subscribeProcessSpan = span;
-  }
-
-  /** Creates a publish start event that is tied to the publish RPC span time. */
-  void addPublishStartEvent() {
-    if (publisherSpan != null) {
-      publisherSpan.addEvent(PUBLISH_START_EVENT);
-    }
-  }
-
-  /**
-   * Sets the message ID attribute in the publisher parent span. This is called after the publish
-   * RPC returns with a message ID.
-   */
-  void setPublisherMessageIdSpanAttribute(String messageId) {
-    if (publisherSpan != null) {
-      publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId);
-    }
-  }
-
-  /** Ends the publisher parent span if it exists. */
-  void endPublisherSpan() {
-    if (publisherSpan != null) {
-      publisherSpan.addEvent(PUBLISH_END_EVENT);
-      publisherSpan.end();
-    }
-  }
-
-  /** Ends the publish flow control span if it exists. */
-  void endPublishFlowControlSpan() {
-    if (publishFlowControlSpan != null) {
-      publishFlowControlSpan.end();
-    }
-  }
-
-  /** Ends the publish batching span if it exists. */
-  void endPublishBatchingSpan() {
-    if (publishBatchingSpan != null) {
-      publishBatchingSpan.end();
-    }
-  }
-
-  /**
-   * Sets an error status and records an exception when an exception is thrown during flow control.
-   */
-  void setPublishFlowControlSpanException(Throwable t) {
-    if (publishFlowControlSpan != null) {
-      publishFlowControlSpan.setStatus(
-          StatusCode.ERROR, "Exception thrown during publish flow control.");
-      publishFlowControlSpan.recordException(t);
-      endAllPublishSpans();
-    }
-  }
-
-  /**
-   * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding
-   * RPC span start and end times.
-   */
-  void addModAckStartEvent() {
-    if (subscriberSpan != null) {
-      subscriberSpan.addEvent(MODACK_START_EVENT);
-    }
-  }
-
-  void addModAckEndEvent() {
-    if (subscriberSpan != null) {
-      subscriberSpan.addEvent(MODACK_END_EVENT);
-    }
-  }
-
-  void addNackStartEvent() {
-    if (subscriberSpan != null) {
-      subscriberSpan.addEvent(NACK_START_EVENT);
-    }
-  }
-
-  void addNackEndEvent() {
-    if (subscriberSpan != null) {
-      subscriberSpan.addEvent(NACK_END_EVENT);
-    }
-  }
-
-  void addAckStartEvent() {
-    if (subscriberSpan != null) {
-      subscriberSpan.addEvent(ACK_START_EVENT);
-    }
-  }
-
-  void addAckEndEvent() {
-    if (subscriberSpan != null) {
-      subscriberSpan.addEvent(ACK_END_EVENT);
-    }
-  }
-
-  /** Ends the subscriber parent span if exists. */
-  void endSubscriberSpan() {
-    if (subscriberSpan != null) {
-      subscriberSpan.end();
-    }
-  }
-
-  /** Ends the subscribe concurreny control span if exists. */
-  void endSubscribeConcurrencyControlSpan() {
-    if (subscribeConcurrencyControlSpan != null) {
-      subscribeConcurrencyControlSpan.end();
-    }
-  }
-
-  /** Ends the subscribe scheduler span if exists. */
-  void endSubscribeSchedulerSpan() {
-    if (subscribeSchedulerSpan != null) {
-      subscribeSchedulerSpan.end();
-    }
-  }
-
-  /**
-   * Ends the subscribe process span if it exists, creates an event with the appropriate result, and
-   * sets the result on the parent subscriber span.
-   */
-  void endSubscribeProcessSpan(String action) {
-    if (subscribeProcessSpan != null) {
-      subscribeProcessSpan.addEvent(action + " called");
-      subscribeProcessSpan.end();
-      subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, action);
-    }
-  }
-
-  /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */
-  void setSubscriberSpanException(Throwable t, String exception) {
-    if (subscriberSpan != null) {
-      subscriberSpan.setStatus(StatusCode.ERROR, exception);
-      subscriberSpan.recordException(t);
-      endAllSubscribeSpans();
-    }
-  }
-
-  /** Sets result of the parent subscriber span to expired and ends its. */
-  void setSubscriberSpanExpirationResult() {
-    if (subscriberSpan != null) {
-      subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired");
-      endSubscriberSpan();
-    }
-  }
-
-  /**
-   * Sets an error status and records an exception when an exception is thrown subscriber
-   * concurrency control.
-   */
-  void setSubscribeConcurrencyControlSpanException(Throwable t) {
-    if (subscribeConcurrencyControlSpan != null) {
-      subscribeConcurrencyControlSpan.setStatus(
-          StatusCode.ERROR, "Exception thrown during subscribe concurrency control.");
-      subscribeConcurrencyControlSpan.recordException(t);
-      endAllSubscribeSpans();
-    }
-  }
-
-  /** Ends all publisher-side spans associated with this message wrapper. */
-  private void endAllPublishSpans() {
-    endPublishFlowControlSpan();
-    endPublishBatchingSpan();
-    endPublisherSpan();
-  }
-
-  /** Ends all subscriber-side spans associated with this message wrapper. */
-  private void endAllSubscribeSpans() {
-    endSubscribeConcurrencyControlSpan();
-    endSubscribeSchedulerSpan();
-    endSubscriberSpan();
-  }
-
-  /**
-   * Injects the span context into the attributes of a Pub/Sub message for propagation to the
-   * subscriber client.
-   */
-  void injectSpanContext() {
-    TextMapSetter<PubsubMessageWrapper> injectMessageAttributes =
-        new TextMapSetter<PubsubMessageWrapper>() {
-          @Override
-          public void set(PubsubMessageWrapper carrier, String key, String value) {
-            PubsubMessage newMessage =
-                PubsubMessage.newBuilder(carrier.message)
-                    .putAttributes(GOOGCLIENT_PREFIX + key, value)
-                    .build();
-            carrier.message = newMessage;
-          }
-        };
-    W3CTraceContextPropagator.getInstance()
-        .inject(Context.current().with(publisherSpan), this, injectMessageAttributes);
-  }
-
-  /**
-   * Extracts the span context from the attributes of a Pub/Sub message and creates the parent
-   * subscriber span using that context.
-   */
-  Context extractSpanContext(Attributes attributes) {
-    TextMapGetter<PubsubMessageWrapper> extractMessageAttributes =
-        new TextMapGetter<PubsubMessageWrapper>() {
-          @Override
-          public String get(PubsubMessageWrapper carrier, String key) {
-            return carrier.message.getAttributesOrDefault(GOOGCLIENT_PREFIX + key, "");
-          }
-
-          public Iterable<String> keys(PubsubMessageWrapper carrier) {
-            return carrier.message.getAttributesMap().keySet();
-          }
-        };
-    Context context =
-        W3CTraceContextPropagator.getInstance()
-            .extract(Context.current(), this, extractMessageAttributes);
-    return context;
-  }
-
-  /** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */
-  static final class Builder {
-    private PubsubMessage message = null;
-    private TopicName topicName = null;
-    private SubscriptionName subscriptionName = null;
-    private String ackId = null;
-    private int deliveryAttempt = 0;
-
-    public Builder(PubsubMessage message, String topicName) {
-      this.message = message;
-      if (topicName != null) {
-        this.topicName = TopicName.parse(topicName);
-      }
-    }
-
-    public Builder(
-        PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
-      this.message = message;
-      if (subscriptionName != null) {
-        this.subscriptionName = SubscriptionName.parse(subscriptionName);
-      }
-      this.ackId = ackId;
-      this.deliveryAttempt = deliveryAttempt;
-    }
-
-    public Builder(
-        PubsubMessage message,
-        SubscriptionName subscriptionName,
-        String ackId,
-        int deliveryAttempt) {
-      this.message = message;
-      this.subscriptionName = subscriptionName;
-      this.ackId = ackId;
-      this.deliveryAttempt = deliveryAttempt;
-    }
-
-    public PubsubMessageWrapper build() {
-      return new PubsubMessageWrapper(this);
-    }
-  }
-}
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java
deleted file mode 100644
index b4433f41e..000000000
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java
+++ /dev/null
@@ -1,669 +0,0 @@
-/*
- * Copyright 2024 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsub.v1;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.protobuf.ByteString;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.SubscriptionName;
-import com.google.pubsub.v1.TopicName;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.api.trace.StatusCode;
-import io.opentelemetry.api.trace.Tracer;
-import io.opentelemetry.sdk.testing.assertj.AttributesAssert;
-import io.opentelemetry.sdk.testing.assertj.EventDataAssert;
-import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
-import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
-import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
-import io.opentelemetry.sdk.trace.data.LinkData;
-import io.opentelemetry.sdk.trace.data.SpanData;
-import io.opentelemetry.sdk.trace.data.StatusData;
-import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
-import java.util.Arrays;
-import java.util.List;
-import org.junit.Test;
-
-public class OpenTelemetryTest {
-  private static final TopicName FULL_TOPIC_NAME =
-      TopicName.parse("projects/test-project/topics/test-topic");
-  private static final SubscriptionName FULL_SUBSCRIPTION_NAME =
-      SubscriptionName.parse("projects/test-project/subscriptions/test-sub");
-  private static final String PROJECT_NAME = "test-project";
-  private static final String ORDERING_KEY = "abc";
-  private static final String MESSAGE_ID = "m0";
-  private static final String ACK_ID = "def";
-  private static final int DELIVERY_ATTEMPT = 1;
-  private static final int ACK_DEADLINE = 10;
-  private static final boolean EXACTLY_ONCE_ENABLED = true;
-
-  private static final String PUBLISHER_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " create";
-  private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control";
-  private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching";
-  private static final String PUBLISH_RPC_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " publish";
-  private static final String PUBLISH_START_EVENT = "publish start";
-  private static final String PUBLISH_END_EVENT = "publish end";
-
-  private static final String SUBSCRIBER_SPAN_NAME =
-      FULL_SUBSCRIPTION_NAME.getSubscription() + " subscribe";
-  private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME =
-      "subscriber concurrency control";
-  private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";
-  private static final String SUBSCRIBE_PROCESS_SPAN_NAME =
-      FULL_SUBSCRIPTION_NAME.getSubscription() + " process";
-  private static final String SUBSCRIBE_MODACK_RPC_SPAN_NAME =
-      FULL_SUBSCRIPTION_NAME.getSubscription() + " modack";
-  private static final String SUBSCRIBE_ACK_RPC_SPAN_NAME =
-      FULL_SUBSCRIPTION_NAME.getSubscription() + " ack";
-  private static final String SUBSCRIBE_NACK_RPC_SPAN_NAME =
-      FULL_SUBSCRIPTION_NAME.getSubscription() + " nack";
-
-  private static final String PROCESS_ACTION = "ack";
-  private static final String MODACK_START_EVENT = "modack start";
-  private static final String MODACK_END_EVENT = "modack end";
-  private static final String NACK_START_EVENT = "nack start";
-  private static final String NACK_END_EVENT = "nack end";
-  private static final String ACK_START_EVENT = "ack start";
-  private static final String ACK_END_EVENT = "ack end";
-
-  private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
-  private static final String PROJECT_ATTR_KEY = "gcp.project_id";
-  private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
-  private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
-  private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
-  private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
-  private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
-  private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
-      "messaging.gcp_pubsub.message.exactly_once_delivery";
-  private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
-  private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
-      "messaging.gcp_pubsub.message.delivery_attempt";
-
-  private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent";
-
-  private static final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create();
-
-  @Test
-  public void testPublishSpansSuccess() {
-    openTelemetryTesting.clearSpans();
-
-    PubsubMessageWrapper messageWrapper =
-        PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
-    List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);
-
-    long messageSize = messageWrapper.getPubsubMessage().getData().size();
-    Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
-    OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
-
-    // Call all span start/end methods in the expected order
-    tracer.startPublisherSpan(messageWrapper);
-    tracer.startPublishFlowControlSpan(messageWrapper);
-    tracer.endPublishFlowControlSpan(messageWrapper);
-    tracer.startPublishBatchingSpan(messageWrapper);
-    tracer.endPublishBatchingSpan(messageWrapper);
-    Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers);
-    tracer.endPublishRpcSpan(publishRpcSpan);
-    tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID);
-    tracer.endPublisherSpan(messageWrapper);
-
-    List<SpanData> allSpans = openTelemetryTesting.getSpans();
-    assertEquals(4, allSpans.size());
-    SpanData flowControlSpanData = allSpans.get(0);
-    SpanData batchingSpanData = allSpans.get(1);
-    SpanData publishRpcSpanData = allSpans.get(2);
-    SpanData publisherSpanData = allSpans.get(3);
-
-    SpanDataAssert flowControlSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(flowControlSpanData);
-    flowControlSpanDataAssert
-        .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME)
-        .hasParent(publisherSpanData)
-        .hasEnded();
-
-    SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData);
-    batchingSpanDataAssert
-        .hasName(PUBLISH_BATCHING_SPAN_NAME)
-        .hasParent(publisherSpanData)
-        .hasEnded();
-
-    // Check span data, links, and attributes for the publish RPC span
-    SpanDataAssert publishRpcSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(publishRpcSpanData);
-    publishRpcSpanDataAssert
-        .hasName(PUBLISH_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasNoParent()
-        .hasEnded();
-
-    List<LinkData> publishRpcLinks = publishRpcSpanData.getLinks();
-    assertEquals(messageWrappers.size(), publishRpcLinks.size());
-    assertEquals(publisherSpanData.getSpanContext(), publishRpcLinks.get(0).getSpanContext());
-
-    assertEquals(6, publishRpcSpanData.getAttributes().size());
-    AttributesAssert publishRpcSpanAttributesAssert =
-        OpenTelemetryAssertions.assertThat(publishRpcSpanData.getAttributes());
-    publishRpcSpanAttributesAssert
-        .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
-        .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic())
-        .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
-        .containsEntry(SemanticAttributes.CODE_FUNCTION, "publishCall")
-        .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "publish")
-        .containsEntry(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messageWrappers.size());
-
-    // Check span data, events, links, and attributes for the publisher create span
-    SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
-    publisherSpanDataAssert
-        .hasName(PUBLISHER_SPAN_NAME)
-        .hasKind(SpanKind.PRODUCER)
-        .hasNoParent()
-        .hasEnded();
-
-    assertEquals(2, publisherSpanData.getEvents().size());
-    EventDataAssert startEventAssert =
-        OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(0));
-    startEventAssert.hasName(PUBLISH_START_EVENT);
-    EventDataAssert endEventAssert =
-        OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(1));
-    endEventAssert.hasName(PUBLISH_END_EVENT);
-
-    List<LinkData> publisherLinks = publisherSpanData.getLinks();
-    assertEquals(1, publisherLinks.size());
-    assertEquals(publishRpcSpanData.getSpanContext(), publisherLinks.get(0).getSpanContext());
-
-    assertEquals(8, publisherSpanData.getAttributes().size());
-    AttributesAssert publisherSpanAttributesAssert =
-        OpenTelemetryAssertions.assertThat(publisherSpanData.getAttributes());
-    publisherSpanAttributesAssert
-        .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
-        .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic())
-        .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
-        .containsEntry(SemanticAttributes.CODE_FUNCTION, "publish")
-        .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create")
-        .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
-        .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
-        .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);
-
-    // Check that the message has the attribute containing the trace context.
-    PubsubMessage message = messageWrapper.getPubsubMessage();
-    assertEquals(1, message.getAttributesMap().size());
-    assertTrue(message.containsAttributes(TRACEPARENT_ATTRIBUTE));
-    assertTrue(
-        message
-            .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "")
-            .contains(publisherSpanData.getTraceId()));
-    assertTrue(
-        message
-            .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "")
-            .contains(publisherSpanData.getSpanId()));
-  }
-
-  @Test
-  public void testPublishFlowControlSpanFailure() {
-    openTelemetryTesting.clearSpans();
-
-    PubsubMessageWrapper messageWrapper =
-        PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
-
-    Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
-    OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
-
-    tracer.startPublisherSpan(messageWrapper);
-    tracer.startPublishFlowControlSpan(messageWrapper);
-
-    Exception e = new Exception("test-exception");
-    tracer.setPublishFlowControlSpanException(messageWrapper, e);
-
-    List<SpanData> allSpans = openTelemetryTesting.getSpans();
-    assertEquals(2, allSpans.size());
-    SpanData flowControlSpanData = allSpans.get(0);
-    SpanData publisherSpanData = allSpans.get(1);
-
-    SpanDataAssert flowControlSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(flowControlSpanData);
-    StatusData expectedStatus =
-        StatusData.create(StatusCode.ERROR, "Exception thrown during publish flow control.");
-    flowControlSpanDataAssert
-        .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME)
-        .hasParent(publisherSpanData)
-        .hasStatus(expectedStatus)
-        .hasException(e)
-        .hasEnded();
-
-    SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
-    publisherSpanDataAssert
-        .hasName(PUBLISHER_SPAN_NAME)
-        .hasKind(SpanKind.PRODUCER)
-        .hasNoParent()
-        .hasEnded();
-  }
-
-  @Test
-  public void testPublishRpcSpanFailure() {
-    openTelemetryTesting.clearSpans();
-
-    PubsubMessageWrapper messageWrapper =
-        PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
-
-    List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);
-    Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
-    OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
-
-    tracer.startPublisherSpan(messageWrapper);
-    Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers);
-
-    Exception e = new Exception("test-exception");
-    tracer.setPublishRpcSpanException(publishRpcSpan, e);
-    tracer.endPublisherSpan(messageWrapper);
-
-    List<SpanData> allSpans = openTelemetryTesting.getSpans();
-    assertEquals(2, allSpans.size());
-    SpanData rpcSpanData = allSpans.get(0);
-    SpanData publisherSpanData = allSpans.get(1);
-
-    SpanDataAssert rpcSpanDataAssert = OpenTelemetryAssertions.assertThat(rpcSpanData);
-    StatusData expectedStatus =
-        StatusData.create(StatusCode.ERROR, "Exception thrown on publish RPC.");
-    rpcSpanDataAssert
-        .hasName(PUBLISH_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasStatus(expectedStatus)
-        .hasException(e)
-        .hasEnded();
-
-    SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
-    publisherSpanDataAssert
-        .hasName(PUBLISHER_SPAN_NAME)
-        .hasKind(SpanKind.PRODUCER)
-        .hasNoParent()
-        .hasEnded();
-  }
-
-  @Test
-  public void testSubscribeSpansSuccess() {
-    openTelemetryTesting.clearSpans();
-
-    Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
-    OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
-
-    PubsubMessageWrapper publishMessageWrapper =
-        PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
-    // Initialize the Publisher span to inject the context in the message
-    tracer.startPublisherSpan(publishMessageWrapper);
-    tracer.endPublisherSpan(publishMessageWrapper);
-
-    PubsubMessage publishedMessage =
-        publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build();
-    PubsubMessageWrapper subscribeMessageWrapper =
-        PubsubMessageWrapper.newBuilder(
-                publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1)
-            .build();
-    List<PubsubMessageWrapper> subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper);
-
-    long messageSize = subscribeMessageWrapper.getPubsubMessage().getData().size();
-
-    // Call all span start/end methods in the expected order
-    tracer.startSubscriberSpan(subscribeMessageWrapper, EXACTLY_ONCE_ENABLED);
-    tracer.startSubscribeConcurrencyControlSpan(subscribeMessageWrapper);
-    tracer.endSubscribeConcurrencyControlSpan(subscribeMessageWrapper);
-    tracer.startSubscribeSchedulerSpan(subscribeMessageWrapper);
-    tracer.endSubscribeSchedulerSpan(subscribeMessageWrapper);
-    tracer.startSubscribeProcessSpan(subscribeMessageWrapper);
-    tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION);
-    Span subscribeModackRpcSpan =
-        tracer.startSubscribeRpcSpan(
-            FULL_SUBSCRIPTION_NAME.toString(),
-            "modack",
-            subscribeMessageWrappers,
-            ACK_DEADLINE,
-            true);
-    tracer.endSubscribeRpcSpan(subscribeModackRpcSpan);
-    tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE);
-    Span subscribeAckRpcSpan =
-        tracer.startSubscribeRpcSpan(
-            FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false);
-    tracer.endSubscribeRpcSpan(subscribeAckRpcSpan);
-    tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0);
-    Span subscribeNackRpcSpan =
-        tracer.startSubscribeRpcSpan(
-            FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false);
-    tracer.endSubscribeRpcSpan(subscribeNackRpcSpan);
-    tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0);
-    tracer.endSubscriberSpan(subscribeMessageWrapper);
-
-    List<SpanData> allSpans = openTelemetryTesting.getSpans();
-    assertEquals(8, allSpans.size());
-
-    SpanData publisherSpanData = allSpans.get(0);
-    SpanData concurrencyControlSpanData = allSpans.get(1);
-    SpanData schedulerSpanData = allSpans.get(2);
-    SpanData processSpanData = allSpans.get(3);
-    SpanData modackRpcSpanData = allSpans.get(4);
-    SpanData ackRpcSpanData = allSpans.get(5);
-    SpanData nackRpcSpanData = allSpans.get(6);
-    SpanData subscriberSpanData = allSpans.get(7);
-
-    SpanDataAssert concurrencyControlSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(concurrencyControlSpanData);
-    concurrencyControlSpanDataAssert
-        .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME)
-        .hasParent(subscriberSpanData)
-        .hasEnded();
-
-    SpanDataAssert schedulerSpanDataAssert = OpenTelemetryAssertions.assertThat(schedulerSpanData);
-    schedulerSpanDataAssert
-        .hasName(SUBSCRIBE_SCHEDULER_SPAN_NAME)
-        .hasParent(subscriberSpanData)
-        .hasEnded();
-
-    SpanDataAssert processSpanDataAssert = OpenTelemetryAssertions.assertThat(processSpanData);
-    processSpanDataAssert
-        .hasName(SUBSCRIBE_PROCESS_SPAN_NAME)
-        .hasParent(subscriberSpanData)
-        .hasEnded();
-
-    assertEquals(1, processSpanData.getEvents().size());
-    EventDataAssert actionCalledEventAssert =
-        OpenTelemetryAssertions.assertThat(processSpanData.getEvents().get(0));
-    actionCalledEventAssert.hasName(PROCESS_ACTION + " called");
-
-    // Check span data, links, and attributes for the modack RPC span
-    SpanDataAssert modackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(modackRpcSpanData);
-    modackRpcSpanDataAssert
-        .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasNoParent()
-        .hasEnded();
-
-    List<LinkData> modackRpcLinks = modackRpcSpanData.getLinks();
-    assertEquals(subscribeMessageWrappers.size(), modackRpcLinks.size());
-    assertEquals(subscriberSpanData.getSpanContext(), modackRpcLinks.get(0).getSpanContext());
-
-    assertEquals(8, modackRpcSpanData.getAttributes().size());
-    AttributesAssert modackRpcSpanAttributesAssert =
-        OpenTelemetryAssertions.assertThat(modackRpcSpanData.getAttributes());
-    modackRpcSpanAttributesAssert
-        .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
-        .containsEntry(
-            SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
-        .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
-        .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations")
-        .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack")
-        .containsEntry(
-            SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size())
-        .containsEntry(ACK_DEADLINE_ATTR_KEY, 10)
-        .containsEntry(RECEIPT_MODACK_ATTR_KEY, true);
-
-    // Check span data, links, and attributes for the ack RPC span
-    SpanDataAssert ackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(ackRpcSpanData);
-    ackRpcSpanDataAssert
-        .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasNoParent()
-        .hasEnded();
-
-    List<LinkData> ackRpcLinks = ackRpcSpanData.getLinks();
-    assertEquals(subscribeMessageWrappers.size(), ackRpcLinks.size());
-    assertEquals(subscriberSpanData.getSpanContext(), ackRpcLinks.get(0).getSpanContext());
-
-    assertEquals(6, ackRpcSpanData.getAttributes().size());
-    AttributesAssert ackRpcSpanAttributesAssert =
-        OpenTelemetryAssertions.assertThat(ackRpcSpanData.getAttributes());
-    ackRpcSpanAttributesAssert
-        .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
-        .containsEntry(
-            SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
-        .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
-        .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendAckOperations")
-        .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack")
-        .containsEntry(
-            SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size());
-
-    // Check span data, links, and attributes for the nack RPC span
-    SpanDataAssert nackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(nackRpcSpanData);
-    nackRpcSpanDataAssert
-        .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasNoParent()
-        .hasEnded();
-
-    List<LinkData> nackRpcLinks = nackRpcSpanData.getLinks();
-    assertEquals(subscribeMessageWrappers.size(), nackRpcLinks.size());
-    assertEquals(subscriberSpanData.getSpanContext(), nackRpcLinks.get(0).getSpanContext());
-
-    assertEquals(6, nackRpcSpanData.getAttributes().size());
-    AttributesAssert nackRpcSpanAttributesAssert =
-        OpenTelemetryAssertions.assertThat(nackRpcSpanData.getAttributes());
-    nackRpcSpanAttributesAssert
-        .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
-        .containsEntry(
-            SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
-        .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject())
-        .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations")
-        .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack")
-        .containsEntry(
-            SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size());
-
-    // Check span data, events, links, and attributes for the publisher create span
-    SpanDataAssert subscriberSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData);
-    subscriberSpanDataAssert
-        .hasName(SUBSCRIBER_SPAN_NAME)
-        .hasKind(SpanKind.CONSUMER)
-        .hasParent(publisherSpanData)
-        .hasEnded();
-
-    assertEquals(6, subscriberSpanData.getEvents().size());
-    EventDataAssert startModackEventAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(0));
-    startModackEventAssert.hasName(MODACK_START_EVENT);
-    EventDataAssert endModackEventAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(1));
-    endModackEventAssert.hasName(MODACK_END_EVENT);
-    EventDataAssert startAckEventAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(2));
-    startAckEventAssert.hasName(ACK_START_EVENT);
-    EventDataAssert endAckEventAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(3));
-    endAckEventAssert.hasName(ACK_END_EVENT);
-    EventDataAssert startNackEventAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(4));
-    startNackEventAssert.hasName(NACK_START_EVENT);
-    EventDataAssert endNackEventAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(5));
-    endNackEventAssert.hasName(NACK_END_EVENT);
-
-    List<LinkData> subscriberLinks = subscriberSpanData.getLinks();
-    assertEquals(3, subscriberLinks.size());
-    assertEquals(modackRpcSpanData.getSpanContext(), subscriberLinks.get(0).getSpanContext());
-    assertEquals(ackRpcSpanData.getSpanContext(), subscriberLinks.get(1).getSpanContext());
-    assertEquals(nackRpcSpanData.getSpanContext(), subscriberLinks.get(2).getSpanContext());
-
-    assertEquals(11, subscriberSpanData.getAttributes().size());
-    AttributesAssert subscriberSpanAttributesAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData.getAttributes());
-    subscriberSpanAttributesAssert
-        .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE)
-        .containsEntry(
-            SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
-        .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
-        .containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse")
-        .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
-        .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
-        .containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID)
-        .containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT)
-        .containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED)
-        .containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION)
-        .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);
-  }
-
-  @Test
-  public void testSubscribeConcurrencyControlSpanFailure() {
-    openTelemetryTesting.clearSpans();
-
-    PubsubMessageWrapper messageWrapper =
-        PubsubMessageWrapper.newBuilder(
-                getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
-            .build();
-
-    Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
-    OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
-
-    tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED);
-    tracer.startSubscribeConcurrencyControlSpan(messageWrapper);
-
-    Exception e = new Exception("test-exception");
-    tracer.setSubscribeConcurrencyControlSpanException(messageWrapper, e);
-
-    List<SpanData> allSpans = openTelemetryTesting.getSpans();
-    assertEquals(2, allSpans.size());
-    SpanData concurrencyControlSpanData = allSpans.get(0);
-    SpanData subscriberSpanData = allSpans.get(1);
-
-    SpanDataAssert concurrencyControlSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(concurrencyControlSpanData);
-    StatusData expectedStatus =
-        StatusData.create(
-            StatusCode.ERROR, "Exception thrown during subscribe concurrency control.");
-    concurrencyControlSpanDataAssert
-        .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME)
-        .hasParent(subscriberSpanData)
-        .hasStatus(expectedStatus)
-        .hasException(e)
-        .hasEnded();
-
-    SpanDataAssert subscriberSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData);
-    subscriberSpanDataAssert
-        .hasName(SUBSCRIBER_SPAN_NAME)
-        .hasKind(SpanKind.CONSUMER)
-        .hasNoParent()
-        .hasEnded();
-  }
-
-  @Test
-  public void testSubscriberSpanFailure() {
-    openTelemetryTesting.clearSpans();
-
-    PubsubMessageWrapper messageWrapper =
-        PubsubMessageWrapper.newBuilder(
-                getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
-            .build();
-
-    Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
-    OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
-
-    tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED);
-
-    Exception e = new Exception("test-exception");
-    tracer.setSubscriberSpanException(messageWrapper, e, "Test exception");
-
-    List<SpanData> allSpans = openTelemetryTesting.getSpans();
-    assertEquals(1, allSpans.size());
-    SpanData subscriberSpanData = allSpans.get(0);
-
-    StatusData expectedStatus = StatusData.create(StatusCode.ERROR, "Test exception");
-    SpanDataAssert subscriberSpanDataAssert =
-        OpenTelemetryAssertions.assertThat(subscriberSpanData);
-    subscriberSpanDataAssert
-        .hasName(SUBSCRIBER_SPAN_NAME)
-        .hasKind(SpanKind.CONSUMER)
-        .hasNoParent()
-        .hasStatus(expectedStatus)
-        .hasException(e)
-        .hasEnded();
-  }
-
-  @Test
-  public void testSubscribeRpcSpanFailures() {
-    openTelemetryTesting.clearSpans();
-
-    PubsubMessageWrapper messageWrapper =
-        PubsubMessageWrapper.newBuilder(
-                getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
-            .build();
-    List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);
-
-    Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
-    OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
-
-    tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED);
-    Span subscribeModackRpcSpan =
-        tracer.startSubscribeRpcSpan(
-            FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true);
-    Span subscribeAckRpcSpan =
-        tracer.startSubscribeRpcSpan(
-            FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false);
-    Span subscribeNackRpcSpan =
-        tracer.startSubscribeRpcSpan(
-            FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false);
-
-    Exception e = new Exception("test-exception");
-    tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e);
-    tracer.setSubscribeRpcSpanException(subscribeAckRpcSpan, false, 0, e);
-    tracer.setSubscribeRpcSpanException(subscribeNackRpcSpan, true, 0, e);
-    tracer.endSubscriberSpan(messageWrapper);
-
-    List<SpanData> allSpans = openTelemetryTesting.getSpans();
-    assertEquals(4, allSpans.size());
-    SpanData modackSpanData = allSpans.get(0);
-    SpanData ackSpanData = allSpans.get(1);
-    SpanData nackSpanData = allSpans.get(2);
-    SpanData subscriberSpanData = allSpans.get(3);
-
-    StatusData expectedModackStatus =
-        StatusData.create(StatusCode.ERROR, "Exception thrown on modack RPC.");
-    SpanDataAssert modackSpanDataAssert = OpenTelemetryAssertions.assertThat(modackSpanData);
-    modackSpanDataAssert
-        .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasNoParent()
-        .hasStatus(expectedModackStatus)
-        .hasException(e)
-        .hasEnded();
-
-    StatusData expectedAckStatus =
-        StatusData.create(StatusCode.ERROR, "Exception thrown on ack RPC.");
-    SpanDataAssert ackSpanDataAssert = OpenTelemetryAssertions.assertThat(ackSpanData);
-    ackSpanDataAssert
-        .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasNoParent()
-        .hasStatus(expectedAckStatus)
-        .hasException(e)
-        .hasEnded();
-
-    StatusData expectedNackStatus =
-        StatusData.create(StatusCode.ERROR, "Exception thrown on nack RPC.");
-    SpanDataAssert nackSpanDataAssert = OpenTelemetryAssertions.assertThat(nackSpanData);
-    nackSpanDataAssert
-        .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME)
-        .hasKind(SpanKind.CLIENT)
-        .hasNoParent()
-        .hasStatus(expectedNackStatus)
-        .hasException(e)
-        .hasEnded();
-  }
-
-  private PubsubMessage getPubsubMessage() {
-    return PubsubMessage.newBuilder()
-        .setData(ByteString.copyFromUtf8("test-data"))
-        .setOrderingKey(ORDERING_KEY)
-        .build();
-  }
-}