diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java index 6efdf0ecf995..89ff4abb0c6d 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java @@ -20,7 +20,7 @@ public final class ContextPropagationDebug { // locations where the context was propagated to another thread (tracking multiple steps is // helpful in akka where there is so much recursive async spawning of new work) - private static final ContextKey> THREAD_PROPAGATION_LOCATIONS = + private static final ContextKey> THREAD_PROPAGATION_LOCATIONS = ContextKey.named("thread-propagation-locations"); private static final boolean THREAD_PROPAGATION_DEBUGGER = @@ -36,13 +36,14 @@ public static boolean isThreadPropagationDebuggerEnabled() { return THREAD_PROPAGATION_DEBUGGER; } - public static Context appendLocations(Context context, StackTraceElement[] locations) { - List currentLocations = ContextPropagationDebug.getLocations(context); + public static Context appendLocations( + Context context, StackTraceElement[] locations, Object carrier) { + List currentLocations = ContextPropagationDebug.getPropagations(context); if (currentLocations == null) { currentLocations = new CopyOnWriteArrayList<>(); context = context.with(THREAD_PROPAGATION_LOCATIONS, currentLocations); } - currentLocations.add(0, locations); + currentLocations.add(0, new Propagation(carrier.getClass().getName(), locations)); return context; } @@ -67,18 +68,20 @@ public static void debugContextLeakIfEnabled() { } } - private static List getLocations(Context context) { + private static List getPropagations(Context context) { return context.get(THREAD_PROPAGATION_LOCATIONS); } private static void debugContextPropagation(Context context) { - List locations = getLocations(context); - if (locations != null) { + List propagations = getPropagations(context); + if (propagations != null) { StringBuilder sb = new StringBuilder(); - Iterator i = locations.iterator(); + Iterator i = propagations.iterator(); while (i.hasNext()) { - for (StackTraceElement ste : i.next()) { - sb.append("\n"); + Propagation entry = i.next(); + sb.append("\ncarrier of type: ").append(entry.carrierClassName); + for (StackTraceElement ste : entry.location) { + sb.append("\n "); sb.append(ste); } if (i.hasNext()) { @@ -89,5 +92,15 @@ private static void debugContextPropagation(Context context) { } } + private static class Propagation { + public final String carrierClassName; + public final StackTraceElement[] location; + + public Propagation(String carrierClassName, StackTraceElement[] location) { + this.carrierClassName = carrierClassName; + this.location = location; + } + } + private ContextPropagationDebug() {} } diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy index a04ef5bca7b9..42aa0d26fd3e 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy +++ b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy @@ -14,6 +14,7 @@ import akka.http.javadsl.model.headers.RawHeader import akka.stream.ActorMaterializer import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.base.HttpClientTest +import io.opentelemetry.instrumentation.test.base.SingleConnection import spock.lang.Shared class AkkaHttpClientInstrumentationTest extends HttpClientTest implements AgentTestTrait { @@ -67,8 +68,10 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest impl } @Override - boolean testCausality() { - false + SingleConnection createSingleConnection(String host, int port) { + // singleConnection test would require instrumentation to support requests made through pools + // (newHostConnectionPool, superPool, etc), which is currently not supported. + return null } def "singleRequest exception trace"() { diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy index 77dac38131bb..044c43c906bd 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy +++ b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy @@ -23,6 +23,11 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest String expectedServerSpanName(ServerEndpoint endpoint) { return "akka.request" } + + @Override + boolean testConcurrency() { + return true + } } class AkkaHttpServerInstrumentationTestSync extends AkkaHttpServerInstrumentationTest { diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala index 9b7e695142c3..bdebd6b470f3 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala +++ b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala @@ -31,7 +31,13 @@ object AkkaHttpTestAsyncWebServer { def doCall(): HttpResponse = { val resp = HttpResponse(status = endpoint.getStatus) //.withHeaders(headers.Type)resp.contentType = "text/plain" endpoint match { - case SUCCESS => resp.withEntity(endpoint.getBody) + case SUCCESS => resp.withEntity(endpoint.getBody) + case INDEXED_CHILD => + INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider { + override def getParameter(name: String): String = + uri.query().get(name).orNull + }) + resp.withEntity("") case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) case REDIRECT => resp.withHeaders(headers.Location(endpoint.getBody)) diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala index 010a820d50b9..bae57cd00add 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala +++ b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala @@ -29,7 +29,13 @@ object AkkaHttpTestSyncWebServer { def doCall(): HttpResponse = { val resp = HttpResponse(status = endpoint.getStatus) endpoint match { - case SUCCESS => resp.withEntity(endpoint.getBody) + case SUCCESS => resp.withEntity(endpoint.getBody) + case INDEXED_CHILD => + INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider { + override def getParameter(name: String): String = + uri.query().get(name).orNull + }) + resp.withEntity("") case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) case REDIRECT => resp.withHeaders(headers.Location(endpoint.getBody)) diff --git a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java index b7ea1014af01..3c7db807988b 100644 --- a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java +++ b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java @@ -52,6 +52,13 @@ protected Boolean computeValue(Class taskClass) { return false; } + // This is a Mailbox created by akka.dispatch.Dispatcher#createMailbox. We must not add + // a context to it as context should only be carried by individual envelopes in the queue + // of this mailbox. + if (taskClass.getName().equals("akka.dispatch.Dispatcher$$anon$1")) { + return false; + } + Class enclosingClass = taskClass.getEnclosingClass(); if (enclosingClass != null) { // Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to @@ -144,7 +151,8 @@ public static boolean shouldAttachStateToTask(Object task) { public static State setupState(ContextStore contextStore, T task, Context context) { State state = contextStore.putIfAbsent(task, State.FACTORY); if (ContextPropagationDebug.isThreadPropagationDebuggerEnabled()) { - context = ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace()); + context = + ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace(), task); } state.setParentContext(context); return state;