Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move async propagation API from scope to tracer #8231

Merged
merged 9 commits into from
Jan 21, 2025
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand All @@ -26,7 +27,7 @@ public static AgentScope startTaskScope(State state) {
final AgentScope.Continuation continuation = state.getAndResetContinuation();
if (continuation != null) {
final AgentScope scope = continuation.activate();
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
// important - stop timing after the scope has been activated so the time in the queue can
// be attributed to the correct context without duplicating the propagated information
state.stopTiming();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

/**
* Test executor instrumentation for Akka specific classes.
* This is to large extent a copy of ExecutorInstrumentationTest.
Expand Down Expand Up @@ -45,7 +46,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new AkkaAsyncChild())
// this child won't
Expand Down Expand Up @@ -101,7 +102,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
dispatcher.execute(new AkkaAsyncChild())
// this child won't
Expand Down Expand Up @@ -132,7 +133,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
try {
for (int i = 0; i < 20; ++i) {
// Our current instrumentation instrumentation does not behave very well
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.apachehttpasyncclient;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static datadog.trace.instrumentation.apachehttpasyncclient.ApacheHttpAsyncClientDecorator.DECORATE;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void completed(final T result) {
completeDelegate(result);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
completeDelegate(result);
}
}
Expand All @@ -56,7 +57,7 @@ public void failed(final Exception ex) {
failDelegate(ex);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
failDelegate(ex);
}
}
Expand All @@ -72,7 +73,7 @@ public void cancelled() {
cancelDelegate();
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
cancelDelegate();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.apachehttpclient5;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.DECORATE;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand Down Expand Up @@ -42,7 +43,7 @@ public void completed(final T result) {
completeDelegate(result);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
completeDelegate(result);
}
}
Expand All @@ -59,7 +60,7 @@ public void failed(final Exception ex) {
failDelegate(ex);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
failDelegate(ex);
}
}
Expand All @@ -75,7 +76,7 @@ public void cancelled() {
cancelDelegate();
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
cancelDelegate();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package datadog.trace.instrumentation.hystrix;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
Expand All @@ -35,23 +35,17 @@ public static class EnableAsyncAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean enableAsyncTracking() {
final AgentScope scope = activeScope();
if (scope != null) {
if (!scope.isAsyncPropagating()) {
scope.setAsyncPropagation(true);
return true;
}
if (!isAsyncPropagationEnabled()) {
setAsyncPropagationEnabled(true);
return true;
}
return false;
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void disableAsyncTracking(@Advice.Enter final boolean wasEnabled) {
if (wasEnabled) {
final AgentScope scope = activeScope();
if (scope != null) {
scope.setAsyncPropagation(false);
}
setAsyncPropagationEnabled(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.function.Supplier

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

/**
* Note: ideally this should live with the rest of ExecutorInstrumentationTest,
Expand Down Expand Up @@ -45,7 +45,7 @@ class CompletableFutureTest extends AgentTestRunner {
@Trace(operationName = "parent")
CompletableFuture<String> get() {
try {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
return CompletableFuture.supplyAsync(supplier, pool)
.thenCompose({ s -> CompletableFuture.supplyAsync(new AppendingSupplier(s), differentPool) })
.thenApply(function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

class VirtualThreadTest extends AgentTestRunner {
@Shared
Expand Down Expand Up @@ -37,7 +37,7 @@ class VirtualThreadTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -177,19 +177,18 @@ public void methodAdvice(MethodTransformer transformer) {
public static class DisableAsyncAdvice {

@Advice.OnMethodEnter
public static AgentScope before() {
AgentScope scope = activeScope();
if (null != scope && scope.isAsyncPropagating()) {
scope.setAsyncPropagation(false);
return scope;
public static boolean before() {
if (isAsyncPropagationEnabled()) {
setAsyncPropagationEnabled(false);
return true;
}
return null;
return false;
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(@Advice.Enter AgentScope scope) {
if (null != scope) {
scope.setAsyncPropagation(true);
public static void after(@Advice.Enter boolean wasDisabled) {
if (wasDisabled) {
setAsyncPropagationEnabled(true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled
import static org.junit.Assume.assumeTrue

abstract class ExecutorInstrumentationTest extends AgentTestRunner {
Expand Down Expand Up @@ -82,7 +82,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down Expand Up @@ -256,7 +256,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
def future = m(pool, task)
sleep(500)
future.cancel(true)
Expand Down Expand Up @@ -317,7 +317,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
pool.execute(new JavaAsyncChild())
// this child won't
Expand Down Expand Up @@ -368,7 +368,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
m(pool, w(child))
}
}.run()
Expand Down Expand Up @@ -406,7 +406,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
try {
for (int i = 0; i < 20; ++i) {
final JavaAsyncChild child = new JavaAsyncChild(false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled
import static org.junit.Assume.assumeTrue

class NettyExecutorInstrumentationTest extends AgentTestRunner {
Expand Down Expand Up @@ -65,7 +65,7 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down Expand Up @@ -213,7 +213,7 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
try {
for (int i = 0; i < 20; ++i) {
final JavaAsyncChild child = new JavaAsyncChild(false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

class RejectedExecutionTest extends AgentTestRunner {

Expand Down Expand Up @@ -193,7 +193,7 @@ class RejectedExecutionTest extends AgentTestRunner {

when:
runUnderTrace("parent") {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// must be rejected because the queue will be full until some
// time after the first task is released
executor.submit((Runnable) new JavaAsyncChild(true, false))
Expand Down Expand Up @@ -237,7 +237,7 @@ class RejectedExecutionTest extends AgentTestRunner {

return {
runUnderTrace("parent") {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
pool.submit({})
}
}
Expand All @@ -261,7 +261,7 @@ class RejectedExecutionTest extends AgentTestRunner {

return {
runUnderTrace("parent") {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// must be rejected because the queue will be full until some
// time after the first task is released
def testTask = new JavaAsyncChild(true, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled
import static org.junit.Assume.assumeTrue

class JettyExecutorInstrumentationTest extends AgentTestRunner {
Expand Down Expand Up @@ -46,7 +46,7 @@ class JettyExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down
Loading
Loading