Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenTelemetry tracing to the Publisher and Subscriber #2086

Merged
merged 51 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
903895a
feat: Initial publish side Open Telemetry support
michaelpri10 Jun 17, 2024
469f220
feat: Publish-side trace context injection
michaelpri10 Jun 20, 2024
ad31b53
feat: Tests and improvements to publish side OTel tracing
michaelpri10 Jun 24, 2024
9246689
feat: More tests and refactoring for publish-side OpenTelemetry
michaelpri10 Jun 24, 2024
34594cb
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
901e135
feat: Formatting files
michaelpri10 Jun 24, 2024
6339e52
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
c954ae1
feat: Publisher test changes
michaelpri10 Jun 24, 2024
fc163ab
test: Fix OpenTelemetry test
michaelpri10 Jun 24, 2024
6ab4cfe
Feat: Use OpenTelemetry semconv
michaelpri10 Jun 24, 2024
bc7530a
test: Fix some dependency issues
michaelpri10 Jun 24, 2024
77d56df
feat: Test fix
michaelpri10 Jun 24, 2024
e275efa
feat: Add comment for setter in builder
michaelpri10 Jun 25, 2024
456ac83
Opentelemetry subscribe (#2100)
michaelpri10 Jul 2, 2024
8bbd688
Merge branch 'main' into opentelemetry
michaelpri10 Jul 2, 2024
620e6b5
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
8668b92
Merge branch 'opentelemetry' of https://github.com/googleapis/java-pu…
gcf-owl-bot[bot] Jul 2, 2024
61257b8
Opentelemetry subscribe (#2101)
michaelpri10 Jul 2, 2024
b0e0424
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
e21a9f0
fix: Fix build errors in Publisher
michaelpri10 Jul 2, 2024
150ab74
test: Ignore org.assertj:assertj-core which is required for OTel test…
michaelpri10 Jul 8, 2024
f558305
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 8, 2024
e7b05de
test: Add tests for subscriber OTel functions
michaelpri10 Jul 9, 2024
2ab460c
Merge branch 'main' into opentelemetry
michaelpri10 Aug 28, 2024
6c5e03c
feat: Changes to OpenTelemetry implementation to add links earlier an…
michaelpri10 Aug 29, 2024
d257b6b
feat: Refactor OpenTelemetry implementation to use a context aware wr…
michaelpri10 Sep 12, 2024
ca4680f
feat: Initialize default no-op PubsubTracer in Publisher and Subscriber
michaelpri10 Sep 12, 2024
d07b493
feat: Ensure SubscriberStreamingConnection and MessageDispatcher have…
michaelpri10 Sep 12, 2024
22d95d4
samples: Add OpenTelemetry publisher and subscriber samples
michaelpri10 Sep 19, 2024
43d489f
feat: Add additional sampling checks to the Otel implementation
michaelpri10 Sep 19, 2024
79dd118
samples: Update pom.xml for samples with Cloud Trace exporter
michaelpri10 Sep 19, 2024
fe4753f
feat: Make OTel classes/methods package-private and remove non-generi…
michaelpri10 Sep 25, 2024
a08d169
feat: Lint fixes for Pub/Sub
michaelpri10 Sep 25, 2024
305610e
feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names
michaelpri10 Sep 26, 2024
e8dce75
feat: Format OTel changes
michaelpri10 Sep 26, 2024
acd208c
Revert "feat: Use MessagingIncubatingAttributes for gcp_pubsub attrib…
michaelpri10 Sep 30, 2024
462dd8e
feat: trigger build
michaelpri10 Sep 30, 2024
d763421
Merge branch 'main' into opentelemetry
michaelpri10 Sep 30, 2024
c4ff119
chore: generate libraries at Mon Sep 30 20:37:03 UTC 2024
cloud-java-bot Sep 30, 2024
c711ea7
feat: trigger build
michaelpri10 Sep 30, 2024
2f2ab5d
Merge branch 'opentelemetry' of https://github.com/googleapis/java-pu…
michaelpri10 Sep 30, 2024
8490bdf
feat: Fix file overwrite from bad merge
michaelpri10 Sep 30, 2024
5ebbbf9
chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024
cloud-java-bot Sep 30, 2024
30d1c20
Revert "chore: generate libraries at Mon Sep 30 20:49:40 UTC 2024"
michaelpri10 Sep 30, 2024
23f3a70
chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024
cloud-java-bot Sep 30, 2024
ea88412
Revert "chore: generate libraries at Mon Sep 30 21:03:31 UTC 2024"
michaelpri10 Sep 30, 2024
d00b4aa
chore: generate libraries at Mon Sep 30 21:14:11 UTC 2024
cloud-java-bot Sep 30, 2024
c776056
feat: Prevent new files for OpenTelemetry from being overwritten
michaelpri10 Sep 30, 2024
110b46c
feat: Revert automated file deletion for OpenTelemetry changes
michaelpri10 Sep 30, 2024
149f3e7
feat: Remove OpenTelemetry samples as the samples use a released libr…
michaelpri10 Sep 30, 2024
4259646
chore: generate libraries at Mon Sep 30 22:11:14 UTC 2024
cloud-java-bot Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Refactor OpenTelemetry implementation to use a context aware wr…
…apper for the tracer and a PubsubTracer interface
  • Loading branch information
michaelpri10 committed Sep 12, 2024
commit d257b6bb59ecbc607b1333580d615cff79898c85
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
*/
public PubsubMessageWrapper getMessageWrapper() {
if (this.messageWrapper == null) {
return PubsubMessageWrapper.newBuilder(null, null, false).build();
return PubsubMessageWrapper.newBuilder(null, null).build();
}
return messageWrapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -107,7 +106,7 @@ class MessageDispatcher {

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private final Tracer tracer;
private final PubsubTracer tracer;

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
Expand Down Expand Up @@ -162,7 +161,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
forget();
}

Expand All @@ -175,11 +174,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("ack");
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -409,11 +408,10 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
message.getMessage(),
subscriptionName,
message.getAckId(),
message.getDeliveryAttempt(),
enableOpenTelemetryTracing)
message.getDeliveryAttempt())
.build();
builder.setMessageWrapper(messageWrapper);
messageWrapper.startSubscriberSpan(tracer, this.exactlyOnceDeliveryEnabled.get());
tracer.startSubscriberSpan(messageWrapper, this.exactlyOnceDeliveryEnabled.get());

AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
Expand Down Expand Up @@ -482,13 +480,14 @@ private void processBatch(List<OutstandingMessage> batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
message.messageWrapper().startSubscribeConcurrencyControlSpan(tracer);
tracer.startSubscribeConcurrencyControlSpan(message.messageWrapper());
try {
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
message.messageWrapper().endSubscribeConcurrencyControlSpan();
tracer.endSubscribeConcurrencyControlSpan(message.messageWrapper());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
message.messageWrapper().setSubscribeConcurrencyControlSpanException(unexpectedException);
tracer.setSubscribeConcurrencyControlSpanException(
message.messageWrapper(), unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
addDeliveryInfoCount(message.messageWrapper());
Expand Down Expand Up @@ -533,10 +532,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
messageWrapper.setSubscriberSpanExpirationResult();
tracer.setSubscriberSpanExpirationResult(messageWrapper);
return;
}
messageWrapper.startSubscribeProcessSpan(tracer);
tracer.startSubscribeProcessSpan(messageWrapper);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand All @@ -557,9 +556,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
messageWrapper.startSubscribeSchedulerSpan(tracer);
tracer.startSubscribeSchedulerSpan(messageWrapper);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
messageWrapper.endSubscribeSchedulerSpan();
tracer.endSubscribeSchedulerSpan(messageWrapper);
}
}

Expand Down Expand Up @@ -687,7 +686,7 @@ public static final class Builder {

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private Tracer tracer;
private PubsubTracer tracer;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
Expand Down Expand Up @@ -769,7 +768,7 @@ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing)
return this;
}

public Builder setTracer(Tracer tracer) {
public Builder setTracer(PubsubTracer tracer) {
this.tracer = tracer;
return this;
}
Expand Down
Loading
Loading