Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenTelemetry tracing to the Publisher and Subscriber #2086

Merged
merged 51 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
903895a
feat: Initial publish side Open Telemetry support
michaelpri10 Jun 17, 2024
469f220
feat: Publish-side trace context injection
michaelpri10 Jun 20, 2024
ad31b53
feat: Tests and improvements to publish side OTel tracing
michaelpri10 Jun 24, 2024
9246689
feat: More tests and refactoring for publish-side OpenTelemetry
michaelpri10 Jun 24, 2024
34594cb
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
901e135
feat: Formatting files
michaelpri10 Jun 24, 2024
6339e52
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
c954ae1
feat: Publisher test changes
michaelpri10 Jun 24, 2024
fc163ab
test: Fix OpenTelemetry test
michaelpri10 Jun 24, 2024
6ab4cfe
Feat: Use OpenTelemetry semconv
michaelpri10 Jun 24, 2024
bc7530a
test: Fix some dependency issues
michaelpri10 Jun 24, 2024
77d56df
feat: Test fix
michaelpri10 Jun 24, 2024
e275efa
feat: Add comment for setter in builder
michaelpri10 Jun 25, 2024
456ac83
Opentelemetry subscribe (#2100)
michaelpri10 Jul 2, 2024
8bbd688
Merge branch 'main' into opentelemetry
michaelpri10 Jul 2, 2024
620e6b5
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
8668b92
Merge branch 'opentelemetry' of https://github.com/googleapis/java-pu…
gcf-owl-bot[bot] Jul 2, 2024
61257b8
Opentelemetry subscribe (#2101)
michaelpri10 Jul 2, 2024
b0e0424
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
e21a9f0
fix: Fix build errors in Publisher
michaelpri10 Jul 2, 2024
150ab74
test: Ignore org.assertj:assertj-core which is required for OTel test…
michaelpri10 Jul 8, 2024
f558305
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 8, 2024
e7b05de
test: Add tests for subscriber OTel functions
michaelpri10 Jul 9, 2024
2ab460c
Merge branch 'main' into opentelemetry
michaelpri10 Aug 28, 2024
6c5e03c
feat: Changes to OpenTelemetry implementation to add links earlier an…
michaelpri10 Aug 29, 2024
d257b6b
feat: Refactor OpenTelemetry implementation to use a context aware wr…
michaelpri10 Sep 12, 2024
ca4680f
feat: Initialize default no-op PubsubTracer in Publisher and Subscriber
michaelpri10 Sep 12, 2024
d07b493
feat: Ensure SubscriberStreamingConnection and MessageDispatcher have…
michaelpri10 Sep 12, 2024
22d95d4
samples: Add OpenTelemetry publisher and subscriber samples
michaelpri10 Sep 19, 2024
43d489f
feat: Add additional sampling checks to the Otel implementation
michaelpri10 Sep 19, 2024
79dd118
samples: Update pom.xml for samples with Cloud Trace exporter
michaelpri10 Sep 19, 2024
fe4753f
feat: Make OTel classes/methods package-private and remove non-generi…
michaelpri10 Sep 25, 2024
a08d169
feat: Lint fixes for Pub/Sub
michaelpri10 Sep 25, 2024
305610e
feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names
michaelpri10 Sep 26, 2024
e8dce75
feat: Format OTel changes
michaelpri10 Sep 26, 2024
acd208c
Revert "feat: Use MessagingIncubatingAttributes for gcp_pubsub attrib…
michaelpri10 Sep 30, 2024
462dd8e
feat: trigger build
michaelpri10 Sep 30, 2024
d763421
Merge branch 'main' into opentelemetry
michaelpri10 Sep 30, 2024
c4ff119
chore: generate libraries at Mon Sep 30 20:37:03 UTC 2024
cloud-java-bot Sep 30, 2024
c711ea7
feat: trigger build
michaelpri10 Sep 30, 2024
2f2ab5d
Merge branch 'opentelemetry' of https://github.com/googleapis/java-pu…
michaelpri10 Sep 30, 2024
8490bdf
feat: Fix file overwrite from bad merge
michaelpri10 Sep 30, 2024
5ebbbf9
chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024
cloud-java-bot Sep 30, 2024
30d1c20
Revert "chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024"
michaelpri10 Sep 30, 2024
23f3a70
chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024
cloud-java-bot Sep 30, 2024
ea88412
Revert "chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024"
michaelpri10 Sep 30, 2024
d00b4aa
chore: generate libraries at Mon Sep 30 21:14:11 UTC 2024
cloud-java-bot Sep 30, 2024
c776056
feat: Prevent new files for OpenTelemetry from being overwritten
michaelpri10 Sep 30, 2024
110b46c
feat: Revert automated file deletion for OpenTelemetry changes
michaelpri10 Sep 30, 2024
149f3e7
feat: Remove OpenTelemetry samples as the samples use a released libr…
michaelpri10 Sep 30, 2024
4259646
chore: generate libraries at Mon Sep 30 22:11:14 UTC 2024
cloud-java-bot Sep 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/.OwlBot-hermetic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ deep-preserve-regex:
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java"
- "/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StatusUtilTest.java"
Expand All @@ -51,8 +52,10 @@ deep-preserve-regex:
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PublisherInterface.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java"
- "/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java"
Expand Down
28 changes: 28 additions & 0 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@
<artifactId>google-http-client</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -142,6 +154,21 @@
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
michaelpri10 marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Need testing utility classes for generated gRPC clients tests -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down Expand Up @@ -174,6 +201,7 @@
<ignoredUnusedDeclaredDependency>com.google.auth:google-auth-library-oauth2-http:jar</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opencensus:opencensus-impl</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>org.assertj:assertj-core</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
public class AckRequestData {
private final String ackId;
private final Optional<SettableApiFuture<AckResponse>> messageFuture;
private PubsubMessageWrapper messageWrapper;

protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
this.messageWrapper = builder.messageWrapper;
}

public String getAckId() {
Expand All @@ -36,6 +38,17 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}

/**
* Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods
* that use this method to be unit tested.
*/
public PubsubMessageWrapper getMessageWrapper() {
if (this.messageWrapper == null) {
return PubsubMessageWrapper.newBuilder(null, null).build();
}
return messageWrapper;
}

public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
Expand Down Expand Up @@ -68,6 +81,7 @@ public static Builder newBuilder(String ackId) {
protected static final class Builder {
private final String ackId;
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty();
private PubsubMessageWrapper messageWrapper;

protected Builder(String ackId) {
this.ackId = ackId;
Expand All @@ -78,6 +92,11 @@ public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) {
return this;
}

public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
this.messageWrapper = messageWrapper;
return this;
}

public AckRequestData build() {
return new AckRequestData(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
Expand Down Expand Up @@ -157,6 +161,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
forget();
}

Expand All @@ -169,9 +174,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -217,6 +224,12 @@ private MessageDispatcher(Builder builder) {
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);

subscriptionName = builder.subscriptionName;
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
if (builder.tracer != null) {
tracer = builder.tracer;
}
}

private boolean shouldSetMessageFuture() {
Expand Down Expand Up @@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
}

private static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
private OutstandingMessage(AckHandler ackHandler) {
this.ackHandler = ackHandler;
}

public PubsubMessageWrapper messageWrapper() {
return this.ackHandler.ackRequestData.getMessageWrapper();
}
}

private static class ReceiptCompleteData {
Expand Down Expand Up @@ -390,10 +405,20 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
if (shouldSetMessageFuture()) {
builder.setMessageFuture(SettableApiFuture.create());
}
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
subscriptionName,
message.getAckId(),
message.getDeliveryAttempt())
.build();
builder.setMessageWrapper(messageWrapper);
tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get());

AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);

if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
Expand Down Expand Up @@ -457,30 +482,40 @@ private void processBatch(List<OutstandingMessage> batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
tracer.setSubscribeConcurrencyControlSpanException(
message.messageWrapper(), unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
addDeliveryInfoCount(message.messageWrapper());
processOutstandingMessage(message.ackHandler);
}
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
PubsubMessage originalMessage = receivedMessage.getMessage();
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
int deliveryAttempt = messageWrapper.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
return PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
messageWrapper.setPubsubMessage(
PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build());
}
return originalMessage;
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
private void processOutstandingMessage(final AckHandler ackHandler) {
// Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
// AckHandler object.
PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
PubsubMessage message = messageWrapper.getPubsubMessage();

// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
// use below in the consumers
SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
Expand All @@ -499,8 +534,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
tracer.setSubscriberSpanExpirationResult(messageWrapper);
return;
}
tracer.startSubscribeProcessSpan(messageWrapper);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand All @@ -521,7 +558,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
tracer.startSubscribeSchedulerSpan(messageWrapper);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
tracer.endSubscribeSchedulerSpan(messageWrapper);
}
}

Expand Down Expand Up @@ -607,8 +646,10 @@ void processOutstandingOperations() {
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
modackRequestData.add(
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
ModackRequestData receiptModack =
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
receiptModack.setIsReceiptModack(true);
modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());

Expand Down Expand Up @@ -645,6 +686,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
Expand Down Expand Up @@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) {
return this;
}

public Builder setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
return this;
}

public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
return this;
}

public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
this.tracer = tracer;
return this;
}

public MessageDispatcher build() {
return new MessageDispatcher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List<AckRequestData> ackRequestData;
private boolean isReceiptModack;

ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
Expand All @@ -45,8 +46,17 @@ public List<AckRequestData> getAckRequestData() {
return ackRequestData;
}

public boolean getIsReceiptModack() {
return isReceiptModack;
}

public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}

public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
this.isReceiptModack = isReceiptModack;
return this;
}
}
Loading
Loading