Skip to content

Commit

Permalink
Akka propagation fix and concurrency tests
Browse files Browse the repository at this point in the history
  • Loading branch information
agoallikmaa committed May 27, 2021
1 parent df078f9 commit 0fa4f97
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public final class ContextPropagationDebug {

// locations where the context was propagated to another thread (tracking multiple steps is
// helpful in akka where there is so much recursive async spawning of new work)
private static final ContextKey<List<StackTraceElement[]>> THREAD_PROPAGATION_LOCATIONS =
private static final ContextKey<List<Propagation>> THREAD_PROPAGATION_LOCATIONS =
ContextKey.named("thread-propagation-locations");

private static final boolean THREAD_PROPAGATION_DEBUGGER =
Expand All @@ -36,13 +36,14 @@ public static boolean isThreadPropagationDebuggerEnabled() {
return THREAD_PROPAGATION_DEBUGGER;
}

public static Context appendLocations(Context context, StackTraceElement[] locations) {
List<StackTraceElement[]> currentLocations = ContextPropagationDebug.getLocations(context);
public static Context appendLocations(
Context context, StackTraceElement[] locations, Object carrier) {
List<Propagation> currentLocations = ContextPropagationDebug.getLocations(context);
if (currentLocations == null) {
currentLocations = new CopyOnWriteArrayList<>();
context = context.with(THREAD_PROPAGATION_LOCATIONS, currentLocations);
}
currentLocations.add(0, locations);
currentLocations.add(0, new Propagation(carrier.getClass().getName(), locations));
return context;
}

Expand All @@ -67,18 +68,20 @@ public static void debugContextLeakIfEnabled() {
}
}

private static List<StackTraceElement[]> getLocations(Context context) {
private static List<Propagation> getLocations(Context context) {
return context.get(THREAD_PROPAGATION_LOCATIONS);
}

private static void debugContextPropagation(Context context) {
List<StackTraceElement[]> locations = getLocations(context);
List<Propagation> locations = getLocations(context);
if (locations != null) {
StringBuilder sb = new StringBuilder();
Iterator<StackTraceElement[]> i = locations.iterator();
Iterator<Propagation> i = locations.iterator();
while (i.hasNext()) {
for (StackTraceElement ste : i.next()) {
sb.append("\n");
Propagation entry = i.next();
sb.append("\ncarrier of type: ").append(entry.carrierClassName);
for (StackTraceElement ste : entry.location) {
sb.append("\n ");
sb.append(ste);
}
if (i.hasNext()) {
Expand All @@ -89,5 +92,15 @@ private static void debugContextPropagation(Context context) {
}
}

private static class Propagation {
public final String carrierClassName;
public final StackTraceElement[] location;

public Propagation(String carrierClassName, StackTraceElement[] location) {
this.carrierClassName = carrierClassName;
this.location = location;
}
}

private ContextPropagationDebug() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import akka.http.javadsl.model.headers.RawHeader
import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import spock.lang.Shared

class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> implements AgentTestTrait {
Expand Down Expand Up @@ -67,8 +68,10 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> impl
}

@Override
boolean testCausality() {
false
SingleConnection createSingleConnection(String host, int port) {
// singleConnection test would require instrumentation to support requests made through pools
// (newHostConnectionPool, superPool, etc), which is currently not supported.
return null
}

def "singleRequest exception trace"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest<Object>
String expectedServerSpanName(ServerEndpoint endpoint) {
return "akka.request"
}

@Override
boolean testConcurrency() {
return true
}
}

class AkkaHttpServerInstrumentationTestSync extends AkkaHttpServerInstrumentationTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer
import groovy.lang.Closure
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint._
import io.opentelemetry.javaagent.testing.common.Java8BytecodeBridge

import scala.concurrent.{Await, ExecutionContextExecutor, Future}

Expand All @@ -31,7 +32,15 @@ object AkkaHttpTestAsyncWebServer {
def doCall(): HttpResponse = {
val resp = HttpResponse(status = endpoint.getStatus) //.withHeaders(headers.Type)resp.contentType = "text/plain"
endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody)
case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
Java8BytecodeBridge
.currentSpan()
.setAttribute(
"test.request.id",
uri.query().get("id").orNull.toLong
)
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer
import groovy.lang.Closure
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint._
import io.opentelemetry.javaagent.testing.common.Java8BytecodeBridge

import scala.concurrent.Await

Expand All @@ -29,7 +30,15 @@ object AkkaHttpTestSyncWebServer {
def doCall(): HttpResponse = {
val resp = HttpResponse(status = endpoint.getStatus)
endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody)
case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
Java8BytecodeBridge
.currentSpan()
.setAttribute(
"test.request.id",
uri.query().get("id").orNull.toLong
)
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ protected Boolean computeValue(Class<?> taskClass) {
return false;
}

// This is a Mailbox created by akka.dispatch.Dispatcher#createMailbox. We must not add
// a context to it as context should only be carried by individual envelopes in the queue
// of this mailbox.
if (taskClass.getName().equals("akka.dispatch.Dispatcher$$anon$1")) {
return false;
}

Class<?> enclosingClass = taskClass.getEnclosingClass();
if (enclosingClass != null) {
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
Expand Down Expand Up @@ -144,7 +151,8 @@ public static boolean shouldAttachStateToTask(Object task) {
public static <T> State setupState(ContextStore<T, State> contextStore, T task, Context context) {
State state = contextStore.putIfAbsent(task, State.FACTORY);
if (ContextPropagationDebug.isThreadPropagationDebuggerEnabled()) {
context = ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace());
context =
ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace(), task);
}
state.setParentContext(context);
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.javaagent.testing.common;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;

/**
Expand All @@ -18,4 +19,9 @@ public final class Java8BytecodeBridge {
public static Context currentContext() {
return Context.current();
}

/** Calls {@link Span#current()}. */
public static Span currentSpan() {
return Span.current();
}
}

0 comments on commit 0fa4f97

Please sign in to comment.