From 484109aec84518ab04eeedcec3a980bcb470c334 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 13 Apr 2021 09:47:27 -0400 Subject: [PATCH] Add instrumentation for RxJava 3 --- .../javaagent/rxjava-3.0-javaagent.gradle | 19 + .../rxjava3/RxJava3InstrumentationModule.java | 58 ++ .../rxjava3/TracingAssemblyActivation.java | 27 + .../groovy/RxJava3SubscriptionTest.groovy | 11 + .../src/test/groovy/RxJava3Test.groovy | 10 + .../RxJava3WithSpanInstrumentationTest.groovy | 841 ++++++++++++++++++ .../rxjava3/TracedWithSpan.java | 53 ++ .../library/rxjava-3.0-library.gradle | 7 + .../rxjava3/RxJava3AsyncSpanEndStrategy.java | 135 +++ .../rxjava3/TracingAssembly.java | 279 ++++++ .../rxjava3/TracingCompletableObserver.java | 74 ++ .../rxjava3/TracingConditionalSubscriber.java | 88 ++ .../rxjava3/TracingMaybeObserver.java | 81 ++ .../rxjava3/TracingObserver.java | 80 ++ .../rxjava3/TracingParallelFlowable.java | 67 ++ .../rxjava3/TracingSingleObserver.java | 74 ++ .../rxjava3/TracingSubscriber.java | 80 ++ .../RxJava3AsyncSpanEndStrategyTest.groovy | 731 +++++++++++++++ .../groovy/RxJava3SubscriptionTest.groovy | 15 + .../src/test/groovy/RxJava3Test.groovy | 15 + .../testing/rxjava-3.0-testing.gradle | 13 + .../AbstractRxJava3SubscriptionTest.groovy | 54 ++ .../rxjava3/AbstractRxJava3Test.groovy | 371 ++++++++ settings.gradle | 3 + 24 files changed, 3186 insertions(+) create mode 100644 instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle create mode 100644 instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java create mode 100644 instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java create mode 100644 instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3Test.groovy create mode 100644 instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/rxjava-3.0-library.gradle create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingConditionalSubscriber.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingObserver.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingParallelFlowable.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSubscriber.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy create mode 100644 instrumentation/rxjava/rxjava-3.0/testing/rxjava-3.0-testing.gradle create mode 100644 instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle b/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle new file mode 100644 index 000000000000..b6f6423781dd --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle @@ -0,0 +1,19 @@ +apply from: "$rootDir/gradle/instrumentation.gradle" + +muzzle { + pass { + group = "io.reactivex.rxjava3" + module = "rxjava" + versions = "[3.0.0,)" + assertInverse true + } +} + +dependencies { + library group: 'io.reactivex.rxjava3', name: 'rxjava', version: "3.0.0" + + implementation project(":instrumentation:rxjava:rxjava-3.0:library") + + testImplementation deps.opentelemetryExtAnnotations + testImplementation project(':instrumentation:rxjava:rxjava-3.0:testing') +} diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java new file mode 100644 index 000000000000..f4419c160505 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.tooling.InstrumentationModule; +import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(InstrumentationModule.class) +public class RxJava3InstrumentationModule extends InstrumentationModule { + + public RxJava3InstrumentationModule() { + super("rxjava3"); + } + + @Override + public List typeInstrumentations() { + return Collections.singletonList(new PluginInstrumentation()); + } + + public static class PluginInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("io.reactivex.rxjava3.plugins.RxJavaPlugins"); + } + + @Override + public Map, String> transformers() { + return Collections.singletonMap( + isMethod(), RxJava3InstrumentationModule.class.getName() + "$RxJavaPluginsAdvice"); + } + } + + public static class RxJavaPluginsAdvice { + + // TODO(anuraaga): Replace with adding a type initializer to RxJavaPlugins + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2685 + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void activateOncePerClassloader() { + TracingAssemblyActivation.activate(RxJavaPlugins.class); + } + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java new file mode 100644 index 000000000000..92c4937e87d1 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import java.util.concurrent.atomic.AtomicBoolean; + +public final class TracingAssemblyActivation { + + private static final ClassValue activated = + new ClassValue() { + @Override + protected AtomicBoolean computeValue(Class type) { + return new AtomicBoolean(); + } + }; + + public static void activate(Class clz) { + if (activated.get(clz).compareAndSet(false, true)) { + TracingAssembly.enable(); + } + } + + private TracingAssemblyActivation() {} +} diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy new file mode 100644 index 000000000000..32b5b607f884 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy @@ -0,0 +1,11 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest +import io.opentelemetry.instrumentation.test.AgentTestTrait + +class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements AgentTestTrait { + +} diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3Test.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3Test.groovy new file mode 100644 index 000000000000..1cd151d9fbd8 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3Test.groovy @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test +import io.opentelemetry.instrumentation.test.AgentTestTrait + +class RxJava3Test extends AbstractRxJava3Test implements AgentTestTrait { +} diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy new file mode 100644 index 000000000000..3a620d0d93ae --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy @@ -0,0 +1,841 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.instrumentation.rxjava3.TracedWithSpan +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.reactivex.rxjava3.core.Completable +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.observers.TestObserver +import io.reactivex.rxjava3.processors.UnicastProcessor +import io.reactivex.rxjava3.subjects.CompletableSubject +import io.reactivex.rxjava3.subjects.MaybeSubject +import io.reactivex.rxjava3.subjects.SingleSubject +import io.reactivex.rxjava3.subjects.UnicastSubject +import io.reactivex.rxjava3.subscribers.TestSubscriber +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription + +class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecification { + + def "should capture span for already completed Completable"() { + setup: + def observer = new TestObserver() + def source = Completable.complete() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Completable"() { + setup: + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored Completable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Completable.error(error) + new TracedWithSpan() + .completable(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Completable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for already completed Maybe"() { + setup: + def observer = new TestObserver() + def source = Maybe.just("Value") + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already empty Maybe"() { + setup: + def observer = new TestObserver() + def source = Maybe.empty() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Maybe"() { + setup: + def source = MaybeSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onSuccess("Value") + observer.assertValue("Value") + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored Maybe"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Maybe.error(error) + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Maybe"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = MaybeSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for already completed Single"() { + setup: + def observer = new TestObserver() + def source = Single.just("Value") + new TracedWithSpan() + .single(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Single"() { + setup: + def source = SingleSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onSuccess("Value") + observer.assertValue("Value") + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored Single"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Single.error(error) + new TracedWithSpan() + .single(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Single"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = SingleSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for already completed Observable"() { + setup: + def observer = new TestObserver() + def source = Observable.just("Value") + new TracedWithSpan() + .observable(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Observable"() { + setup: + def source = UnicastSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored Observable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Observable.error(error) + new TracedWithSpan() + .observable(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Observable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for already completed Flowable"() { + setup: + def observer = new TestSubscriber() + def source = Flowable.just("Value") + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Flowable"() { + setup: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored Flowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestSubscriber() + def source = Flowable.error(error) + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Flowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for already completed ParallelFlowable"() { + setup: + def observer = new TestSubscriber() + def source = Flowable.just("Value") + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually completed ParallelFlowable"() { + setup: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for already errored ParallelFlowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestSubscriber() + def source = Flowable.error(error) + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored ParallelFlowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Publisher"() { + setup: + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind SpanKind.INTERNAL + hasNoParent() + errored false + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Publisher"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind SpanKind.INTERNAL + hasNoParent() + errored true + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + static class CustomPublisher implements Publisher, Subscription { + Subscriber subscriber + + @Override + void subscribe(Subscriber subscriber) { + this.subscriber = subscriber + subscriber.onSubscribe(this) + } + + void onComplete() { + this.subscriber.onComplete() + } + + void onError(Throwable exception) { + this.subscriber.onError(exception) + } + + @Override + void request(long l) { } + + @Override + void cancel() { } + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java new file mode 100644 index 000000000000..65785fb5bf88 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.extension.annotations.WithSpan; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import org.reactivestreams.Publisher; + +public class TracedWithSpan { + + @WithSpan + public Completable completable(Completable source) { + return source; + } + + @WithSpan + public Maybe maybe(Maybe source) { + return source; + } + + @WithSpan + public Single single(Single source) { + return source; + } + + @WithSpan + public Observable observable(Observable source) { + return source; + } + + @WithSpan + public Flowable flowable(Flowable source) { + return source; + } + + @WithSpan + public ParallelFlowable parallelFlowable(ParallelFlowable source) { + return source; + } + + @WithSpan + public Publisher publisher(Publisher source) { + return source; + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/rxjava-3.0-library.gradle b/instrumentation/rxjava/rxjava-3.0/library/rxjava-3.0-library.gradle new file mode 100644 index 000000000000..d90c745145b3 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/rxjava-3.0-library.gradle @@ -0,0 +1,7 @@ +apply from: "$rootDir/gradle/instrumentation-library.gradle" + +dependencies { + library group: 'io.reactivex.rxjava3', name: 'rxjava', version: "3.0.12" + + testImplementation project(':instrumentation:rxjava:rxjava-3.0:testing') +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java new file mode 100644 index 000000000000..b62f638af2b3 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java @@ -0,0 +1,135 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.functions.Action; +import io.reactivex.rxjava3.functions.BiConsumer; +import io.reactivex.rxjava3.functions.Consumer; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import java.util.concurrent.atomic.AtomicBoolean; +import org.reactivestreams.Publisher; + +public enum RxJava3AsyncSpanEndStrategy implements AsyncSpanEndStrategy { + INSTANCE; + + @Override + public boolean supports(Class returnType) { + return returnType == Publisher.class + || returnType == Completable.class + || returnType == Maybe.class + || returnType == Single.class + || returnType == Observable.class + || returnType == Flowable.class + || returnType == ParallelFlowable.class; + } + + @Override + public Object end(BaseTracer tracer, Context context, Object returnValue) { + + EndOnFirstNotificationConsumer notificationConsumer = + new EndOnFirstNotificationConsumer<>(tracer, context); + if (returnValue instanceof Completable) { + return endWhenComplete((Completable) returnValue, notificationConsumer); + } else if (returnValue instanceof Maybe) { + return endWhenMaybeComplete((Maybe) returnValue, notificationConsumer); + } else if (returnValue instanceof Single) { + return endWhenSingleComplete((Single) returnValue, notificationConsumer); + } else if (returnValue instanceof Observable) { + return endWhenObservableComplete((Observable) returnValue, notificationConsumer); + } else if (returnValue instanceof ParallelFlowable) { + return endWhenFirstComplete((ParallelFlowable) returnValue, notificationConsumer); + } + return endWhenPublisherComplete((Publisher) returnValue, notificationConsumer); + } + + private Completable endWhenComplete( + Completable completable, EndOnFirstNotificationConsumer notificationConsumer) { + return completable.doOnEvent(notificationConsumer); + } + + private Maybe endWhenMaybeComplete( + Maybe maybe, EndOnFirstNotificationConsumer notificationConsumer) { + @SuppressWarnings("unchecked") + EndOnFirstNotificationConsumer typedConsumer = + (EndOnFirstNotificationConsumer) notificationConsumer; + return maybe.doOnEvent(typedConsumer); + } + + private Single endWhenSingleComplete( + Single single, EndOnFirstNotificationConsumer notificationConsumer) { + @SuppressWarnings("unchecked") + EndOnFirstNotificationConsumer typedConsumer = + (EndOnFirstNotificationConsumer) notificationConsumer; + return single.doOnEvent(typedConsumer); + } + + private Observable endWhenObservableComplete( + Observable observable, EndOnFirstNotificationConsumer notificationConsumer) { + return observable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + } + + private ParallelFlowable endWhenFirstComplete( + ParallelFlowable parallelFlowable, + EndOnFirstNotificationConsumer notificationConsumer) { + return parallelFlowable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + } + + private Flowable endWhenPublisherComplete( + Publisher publisher, EndOnFirstNotificationConsumer notificationConsumer) { + return Flowable.fromPublisher(publisher) + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer); + } + + /** + * Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or + * OnError notifications are received. Multiple notifications can happen anytime multiple + * subscribers subscribe to the same publisher. + */ + private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + implements Action, Consumer, BiConsumer { + + private final BaseTracer tracer; + private final Context context; + + public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) { + super(false); + this.tracer = tracer; + this.context = context; + } + + @Override + public void run() { + if (compareAndSet(false, true)) { + tracer.end(context); + } + } + + @Override + public void accept(Throwable exception) { + if (compareAndSet(false, true)) { + if (exception != null) { + tracer.endExceptionally(context, exception); + } else { + tracer.end(context); + } + } + } + + @Override + public void accept(T value, Throwable exception) { + accept(exception); + } + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java new file mode 100644 index 000000000000..b090d0b779ba --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java @@ -0,0 +1,279 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.functions.BiFunction; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.reactivestreams.Subscriber; + +/** + * rxjava3 library instrumentation. + * + *

In order to enable rxjava3 instrumentation one has to call the {@link + * TracingAssembly#enable()} method. + * + *

Instrumentation uses on*Assembly and on*Subscribe RxJavaPlugin hooks + * to wrap rxjava3 classes in their tracing equivalents. + * + *

Instrumentation can be disabled by calling the {@link TracingAssembly#disable()} method. + */ +public final class TracingAssembly { + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + private static BiFunction + oldOnObservableSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + private static BiFunction< + ? super Completable, ? super CompletableObserver, ? extends CompletableObserver> + oldOnCompletableSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + private static BiFunction + oldOnSingleSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + private static BiFunction + oldOnMaybeSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + private static BiFunction + oldOnFlowableSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + private static Function + oldOnParallelAssembly; + + @GuardedBy("TracingAssembly.class") + private static boolean enabled; + + private TracingAssembly() {} + + public static synchronized void enable() { + if (enabled) { + return; + } + + enableObservable(); + + enableCompletable(); + + enableSingle(); + + enableMaybe(); + + enableFlowable(); + + enableParallel(); + + enableWithSpanStrategy(); + + enabled = true; + } + + public static synchronized void disable() { + if (!enabled) { + return; + } + + disableObservable(); + + disableCompletable(); + + disableSingle(); + + disableMaybe(); + + disableFlowable(); + + disableParallel(); + + disableWithSpanStrategy(); + + enabled = false; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableParallel() { + oldOnParallelAssembly = RxJavaPlugins.getOnParallelAssembly(); + RxJavaPlugins.setOnParallelAssembly( + compose( + oldOnParallelAssembly, + parallelFlowable -> new TracingParallelFlowable(parallelFlowable, Context.current()))); + } + + private static void enableCompletable() { + oldOnCompletableSubscribe = RxJavaPlugins.getOnCompletableSubscribe(); + RxJavaPlugins.setOnCompletableSubscribe( + biCompose( + oldOnCompletableSubscribe, + (completable, observer) -> { + final Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingCompletableObserver(observer, context); + } + })); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableFlowable() { + oldOnFlowableSubscribe = RxJavaPlugins.getOnFlowableSubscribe(); + RxJavaPlugins.setOnFlowableSubscribe( + biCompose( + oldOnFlowableSubscribe, + (flowable, subscriber) -> { + final Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + if (subscriber instanceof ConditionalSubscriber) { + return new TracingConditionalSubscriber<>( + (ConditionalSubscriber) subscriber, context); + } else { + return new TracingSubscriber<>(subscriber, context); + } + } + })); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableObservable() { + oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe(); + RxJavaPlugins.setOnObservableSubscribe( + biCompose( + oldOnObservableSubscribe, + (observable, observer) -> { + final Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingObserver(observer, context); + } + })); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableSingle() { + oldOnSingleSubscribe = RxJavaPlugins.getOnSingleSubscribe(); + RxJavaPlugins.setOnSingleSubscribe( + biCompose( + oldOnSingleSubscribe, + (single, singleObserver) -> { + final Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingSingleObserver(singleObserver, context); + } + })); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableMaybe() { + oldOnMaybeSubscribe = RxJavaPlugins.getOnMaybeSubscribe(); + RxJavaPlugins.setOnMaybeSubscribe( + (BiFunction) + biCompose( + oldOnMaybeSubscribe, + (BiFunction) + (maybe, maybeObserver) -> { + final Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingMaybeObserver(maybeObserver, context); + } + })); + } + + private static void enableWithSpanStrategy() { + AsyncSpanEndStrategies.getInstance().registerStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE); + } + + private static void disableParallel() { + RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly); + oldOnParallelAssembly = null; + } + + private static void disableObservable() { + RxJavaPlugins.setOnObservableSubscribe(oldOnObservableSubscribe); + oldOnObservableSubscribe = null; + } + + private static void disableCompletable() { + RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe); + oldOnCompletableSubscribe = null; + } + + private static void disableFlowable() { + RxJavaPlugins.setOnFlowableSubscribe(oldOnFlowableSubscribe); + oldOnFlowableSubscribe = null; + } + + private static void disableSingle() { + RxJavaPlugins.setOnSingleSubscribe(oldOnSingleSubscribe); + oldOnSingleSubscribe = null; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void disableMaybe() { + RxJavaPlugins.setOnMaybeSubscribe( + (BiFunction) oldOnMaybeSubscribe); + oldOnMaybeSubscribe = null; + } + + private static void disableWithSpanStrategy() { + AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE); + } + + private static Function compose( + Function before, Function after) { + if (before == null) { + return after; + } + return (T v) -> after.apply(before.apply(v)); + } + + private static BiFunction biCompose( + BiFunction before, + BiFunction after) { + if (before == null) { + return after; + } + return (T v, U u) -> after.apply(v, before.apply(v, u)); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java new file mode 100644 index 000000000000..31f55f91b722 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +class TracingCompletableObserver implements CompletableObserver, Disposable { + + private final CompletableObserver actual; + private final Context context; + private Disposable disposable; + + TracingCompletableObserver(final CompletableObserver actual, final Context context) { + this.actual = actual; + this.context = context; + } + + @Override + public void onSubscribe(final Disposable d) { + if (!DisposableHelper.validate(disposable, d)) { + return; + } + disposable = d; + actual.onSubscribe(this); + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + actual.onComplete(); + } + } + + @Override + public void onError(final Throwable e) { + try (Scope ignored = context.makeCurrent()) { + actual.onError(e); + } + } + + @Override + public void dispose() { + disposable.dispose(); + } + + @Override + public boolean isDisposed() { + return disposable.isDisposed(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingConditionalSubscriber.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingConditionalSubscriber.java new file mode 100644 index 000000000000..f6624df2c941 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingConditionalSubscriber.java @@ -0,0 +1,88 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.internal.fuseable.QueueSubscription; +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber; + +class TracingConditionalSubscriber extends BasicFuseableConditionalSubscriber { + + // BasicFuseableConditionalSubscriber#actual has been renamed to downstream in newer versions, we + // can't use it in this class + private final ConditionalSubscriber wrappedSubscriber; + private final Context context; + + TracingConditionalSubscriber( + final ConditionalSubscriber actual, final Context context) { + super(actual); + this.wrappedSubscriber = actual; + this.context = context; + } + + @Override + public boolean tryOnNext(T t) { + try (Scope ignored = context.makeCurrent()) { + return wrappedSubscriber.tryOnNext(t); + } + } + + @Override + public void onNext(T t) { + try (Scope ignored = context.makeCurrent()) { + wrappedSubscriber.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + try (Scope ignored = context.makeCurrent()) { + wrappedSubscriber.onError(t); + } + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + wrappedSubscriber.onComplete(); + } + } + + @Override + public int requestFusion(int mode) { + final QueueSubscription qs = this.qs; + if (qs != null) { + final int m = qs.requestFusion(mode); + sourceMode = m; + return m; + } + return NONE; + } + + @Override + public T poll() throws Throwable { + return qs.poll(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java new file mode 100644 index 000000000000..dc6845d41bc3 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java @@ -0,0 +1,81 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +class TracingMaybeObserver implements MaybeObserver, Disposable { + + private final MaybeObserver actual; + private final Context context; + private Disposable disposable; + + TracingMaybeObserver(final MaybeObserver actual, final Context context) { + this.actual = actual; + this.context = context; + } + + @Override + public void onSubscribe(final Disposable d) { + if (!DisposableHelper.validate(disposable, d)) { + return; + } + disposable = d; + actual.onSubscribe(this); + } + + @Override + public void onSuccess(final T t) { + try (Scope ignored = context.makeCurrent()) { + actual.onSuccess(t); + } + } + + @Override + public void onError(final Throwable e) { + try (Scope ignored = context.makeCurrent()) { + actual.onError(e); + } + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + actual.onComplete(); + } + } + + @Override + public void dispose() { + disposable.dispose(); + } + + @Override + public boolean isDisposed() { + return disposable.isDisposed(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingObserver.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingObserver.java new file mode 100644 index 000000000000..77ca89594b96 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingObserver.java @@ -0,0 +1,80 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.internal.fuseable.QueueDisposable; +import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver; + +class TracingObserver extends BasicFuseableObserver { + + // BasicFuseableObserver#actual has been renamed to downstream in newer versions, we can't use it + // in this class + private final Observer wrappedObserver; + private final Context context; + + TracingObserver(final Observer actual, final Context context) { + super(actual); + this.wrappedObserver = actual; + this.context = context; + } + + @Override + public void onNext(T t) { + try (Scope ignored = context.makeCurrent()) { + wrappedObserver.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + try (Scope ignored = context.makeCurrent()) { + wrappedObserver.onError(t); + } + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + wrappedObserver.onComplete(); + } + } + + @Override + public int requestFusion(int mode) { + final QueueDisposable qd = this.qd; + if (qd != null) { + final int m = qd.requestFusion(mode); + sourceMode = m; + return m; + } + return NONE; + } + + @Override + public T poll() throws Throwable { + return qd.poll(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingParallelFlowable.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingParallelFlowable.java new file mode 100644 index 000000000000..ce38e09e49dc --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingParallelFlowable.java @@ -0,0 +1,67 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import org.reactivestreams.Subscriber; + +class TracingParallelFlowable extends ParallelFlowable { + + private final ParallelFlowable source; + private final Context context; + + TracingParallelFlowable(final ParallelFlowable source, final Context context) { + this.source = source; + this.context = context; + } + + @SuppressWarnings("unchecked") + @Override + public void subscribe(final Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + final int n = subscribers.length; + final Subscriber[] parents = new Subscriber[n]; + for (int i = 0; i < n; i++) { + final Subscriber z = subscribers[i]; + if (z instanceof ConditionalSubscriber) { + parents[i] = + new TracingConditionalSubscriber<>((ConditionalSubscriber) z, context); + } else { + parents[i] = new TracingSubscriber<>(z, context); + } + } + try (Scope ignored = context.makeCurrent()) { + source.subscribe(parents); + } + } + + @Override + public int parallelism() { + return source.parallelism(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java new file mode 100644 index 000000000000..7014f032101c --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +class TracingSingleObserver implements SingleObserver, Disposable { + + private final SingleObserver actual; + private final Context context; + private Disposable disposable; + + TracingSingleObserver(final SingleObserver actual, final Context context) { + this.actual = actual; + this.context = context; + } + + @Override + public void onSubscribe(final Disposable d) { + if (!DisposableHelper.validate(disposable, d)) { + return; + } + this.disposable = d; + actual.onSubscribe(this); + } + + @Override + public void onSuccess(final T t) { + try (Scope ignored = context.makeCurrent()) { + actual.onSuccess(t); + } + } + + @Override + public void onError(Throwable throwable) { + try (Scope ignored = context.makeCurrent()) { + actual.onError(throwable); + } + } + + @Override + public void dispose() { + disposable.dispose(); + } + + @Override + public boolean isDisposed() { + return disposable.isDisposed(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSubscriber.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSubscriber.java new file mode 100644 index 000000000000..073cea859b38 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSubscriber.java @@ -0,0 +1,80 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.internal.fuseable.QueueSubscription; +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber; +import org.reactivestreams.Subscriber; + +class TracingSubscriber extends BasicFuseableSubscriber { + + // BasicFuseableSubscriber#actual has been renamed to downstream in newer versions, we can't use + // it in this class + private final Subscriber wrappedSubscriber; + private final Context context; + + TracingSubscriber(final Subscriber actual, final Context context) { + super(actual); + this.wrappedSubscriber = actual; + this.context = context; + } + + @Override + public void onNext(T t) { + try (Scope ignored = context.makeCurrent()) { + wrappedSubscriber.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + try (Scope ignored = context.makeCurrent()) { + wrappedSubscriber.onError(t); + } + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + wrappedSubscriber.onComplete(); + } + } + + @Override + public int requestFusion(int mode) { + final QueueSubscription qs = this.qs; + if (qs != null) { + final int m = qs.requestFusion(mode); + sourceMode = m; + return m; + } + return NONE; + } + + @Override + public T poll() throws Throwable { + return qs.poll(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy new file mode 100644 index 000000000000..3df4f6cb8a10 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy @@ -0,0 +1,731 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.context.Context +import io.opentelemetry.instrumentation.api.tracer.BaseTracer +import io.opentelemetry.instrumentation.rxjava3.RxJava3AsyncSpanEndStrategy +import io.reactivex.rxjava3.core.Completable +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.observers.TestObserver +import io.reactivex.rxjava3.parallel.ParallelFlowable +import io.reactivex.rxjava3.processors.ReplayProcessor +import io.reactivex.rxjava3.processors.UnicastProcessor +import io.reactivex.rxjava3.subjects.CompletableSubject +import io.reactivex.rxjava3.subjects.MaybeSubject +import io.reactivex.rxjava3.subjects.ReplaySubject +import io.reactivex.rxjava3.subjects.SingleSubject +import io.reactivex.rxjava3.subjects.UnicastSubject +import io.reactivex.rxjava3.subscribers.TestSubscriber +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription +import spock.lang.Specification + +class RxJava3AsyncSpanEndStrategyTest extends Specification { + BaseTracer tracer + + Context context + + def underTest = RxJava3AsyncSpanEndStrategy.INSTANCE + + void setup() { + tracer = Mock() + context = Mock() + } + + static class CompletableTest extends RxJava3AsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Completable) + } + + def "ends span on already completed"() { + given: + def observer = new TestObserver() + + when: + def result = (Completable) underTest.end(tracer, context, Completable.complete()) + result.subscribe(observer) + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + def observer = new TestObserver() + + when: + def result = (Completable) underTest.end(tracer, context, Completable.error(exception)) + result.subscribe(observer) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span when completed"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + + when: + def result = (Completable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = CompletableSubject.create() + def observer = new TestObserver() + + when: + def result = (Completable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onError(exception) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span once for multiple subscribers"() { + given: + def source = CompletableSubject.create() + def observer1 = new TestObserver() + def observer2 = new TestObserver() + def observer3 = new TestObserver() + + when: + def result = (Completable) underTest.end(tracer, context, source) + result.subscribe(observer1) + result.subscribe(observer2) + result.subscribe(observer3) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer1.assertComplete() + observer2.assertComplete() + observer3.assertComplete() + } + } + + static class MaybeTest extends RxJava3AsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Maybe) + } + + def "ends span on already completed"() { + given: + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, Maybe.just("Value")) + result.subscribe(observer) + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span on already empty"() { + given: + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, Maybe.empty()) + result.subscribe(observer) + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, Maybe.error(exception)) + result.subscribe(observer) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span when completed"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onSuccess("Value") + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span when empty"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = MaybeSubject.create() + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onError(exception) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span once for multiple subscribers"() { + given: + def source = MaybeSubject.create() + def observer1 = new TestObserver() + def observer2 = new TestObserver() + def observer3 = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer1) + result.subscribe(observer2) + result.subscribe(observer3) + + then: + 0 * tracer._ + + when: + source.onSuccess("Value") + + then: + 1 * tracer.end(context) + observer1.assertValue("Value") + observer1.assertComplete() + observer2.assertValue("Value") + observer2.assertComplete() + observer3.assertValue("Value") + observer3.assertComplete() + } + } + + static class SingleTest extends RxJava3AsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Single) + } + + def "ends span on already completed"() { + given: + def observer = new TestObserver() + + when: + def result = (Single) underTest.end(tracer, context, Single.just("Value")) + result.subscribe(observer) + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + def observer = new TestObserver() + + when: + def result = (Single) underTest.end(tracer, context, Single.error(exception)) + result.subscribe(observer) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span when completed"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + + when: + def result = (Single) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onSuccess("Value") + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = SingleSubject.create() + def observer = new TestObserver() + + when: + def result = (Single) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onError(exception) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span once for multiple subscribers"() { + given: + def source = SingleSubject.create() + def observer1 = new TestObserver() + def observer2 = new TestObserver() + def observer3 = new TestObserver() + + when: + def result = (Single) underTest.end(tracer, context, source) + result.subscribe(observer1) + result.subscribe(observer2) + result.subscribe(observer3) + + then: + 0 * tracer._ + + when: + source.onSuccess("Value") + + then: + 1 * tracer.end(context) + observer1.assertValue("Value") + observer1.assertComplete() + observer2.assertValue("Value") + observer2.assertComplete() + observer3.assertValue("Value") + observer3.assertComplete() + } + } + + static class ObservableTest extends RxJava3AsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Observable) + } + + def "ends span on already completed"() { + given: + def observer = new TestObserver() + + when: + def result = (Observable) underTest.end(tracer, context, Observable.just("Value")) + result.subscribe(observer) + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + def observer = new TestObserver() + + when: + def result = (Observable) underTest.end(tracer, context, Observable.error(exception)) + result.subscribe(observer) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span when completed"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + + when: + def result = (Observable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = UnicastSubject.create() + def observer = new TestObserver() + + when: + def result = (Observable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onError(exception) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span once for multiple subscribers"() { + given: + def source = ReplaySubject.create() + def observer1 = new TestObserver() + def observer2 = new TestObserver() + def observer3 = new TestObserver() + + when: + def result = (Observable) underTest.end(tracer, context, source) + result.subscribe(observer1) + result.subscribe(observer2) + result.subscribe(observer3) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer1.assertComplete() + observer2.assertComplete() + observer3.assertComplete() + } + } + + static class FlowableTest extends RxJava3AsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Flowable) + } + + def "ends span on already completed"() { + given: + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, Flowable.just("Value")) + result.subscribe(observer) + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, Flowable.error(exception)) + result.subscribe(observer) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span when completed"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onError(exception) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + + def "ends span once for multiple subscribers"() { + given: + def source = ReplayProcessor.create() + def observer1 = new TestSubscriber() + def observer2 = new TestSubscriber() + def observer3 = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer1) + result.subscribe(observer2) + result.subscribe(observer3) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer1.assertComplete() + observer2.assertComplete() + observer3.assertComplete() + } + } + + static class ParallelFlowableTest extends RxJava3AsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(ParallelFlowable) + } + + def "ends span on already completed"() { + given: + def observer = new TestSubscriber() + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, Flowable.just("Value").parallel()) + result.sequential().subscribe(observer) + + then: + observer.assertComplete() + 1 * tracer.end(context) + } + + def "ends span on already errored"() { + given: + def exception = new IllegalStateException() + def observer = new TestSubscriber() + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, Flowable.error(exception).parallel()) + result.sequential().subscribe(observer) + + then: + observer.assertError(exception) + 1 * tracer.endExceptionally(context, exception) + } + + def "ends span when completed"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + observer.assertComplete() + 1 * tracer.end(context) + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onError(exception) + + then: + observer.assertError(exception) + 1 * tracer.endExceptionally(context, exception) + } + } + + static class PublisherTest extends RxJava3AsyncSpanEndStrategyTest { + def "is supported"() { + expect: + underTest.supports(Publisher) + } + + def "ends span when completed"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onComplete() + + then: + 1 * tracer.end(context) + observer.assertComplete() + } + + def "ends span when errored"() { + given: + def exception = new IllegalStateException() + def source = new CustomPublisher() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + source.onError(exception) + + then: + 1 * tracer.endExceptionally(context, exception) + observer.assertError(exception) + } + } + + static class CustomPublisher implements Publisher, Subscription { + Subscriber subscriber + + @Override + void subscribe(Subscriber subscriber) { + this.subscriber = subscriber + subscriber.onSubscribe(this) + } + + def onComplete() { + this.subscriber.onComplete() + } + + def onError(Throwable exception) { + this.subscriber.onError(exception) + } + + @Override + void request(long l) { } + + @Override + void cancel() { } + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy new file mode 100644 index 000000000000..0351dae24a8c --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy @@ -0,0 +1,15 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest +import io.opentelemetry.instrumentation.rxjava3.TracingAssembly +import io.opentelemetry.instrumentation.test.LibraryTestTrait + +class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements LibraryTestTrait { + + def setupSpec() { + TracingAssembly.enable() + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy new file mode 100644 index 000000000000..4bcf431d69a4 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy @@ -0,0 +1,15 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test +import io.opentelemetry.instrumentation.rxjava3.TracingAssembly +import io.opentelemetry.instrumentation.test.LibraryTestTrait + +class RxJava3Test extends AbstractRxJava3Test implements LibraryTestTrait { + + def setupSpec() { + TracingAssembly.enable() + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/testing/rxjava-3.0-testing.gradle b/instrumentation/rxjava/rxjava-3.0/testing/rxjava-3.0-testing.gradle new file mode 100644 index 000000000000..705f4ad9bbbc --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/testing/rxjava-3.0-testing.gradle @@ -0,0 +1,13 @@ +apply from: "$rootDir/gradle/java.gradle" + +dependencies { + api project(':testing-common') + + api group: 'io.reactivex.rxjava3', name: 'rxjava', version: "3.0.12" + + implementation deps.guava + + implementation deps.groovy + implementation deps.opentelemetryApi + implementation deps.spock +} diff --git a/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy new file mode 100644 index 000000000000..992a613185d2 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3 + +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.functions.Consumer + +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace + +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.instrumentation.test.InstrumentationSpecification + +import java.util.concurrent.CountDownLatch + +abstract class AbstractRxJava3SubscriptionTest extends InstrumentationSpecification { + + def "subscription test"() { + when: + CountDownLatch latch = new CountDownLatch(1) + runUnderTrace("parent") { + Single connection = Single.create { + it.onSuccess(new Connection()) + } + connection.subscribe(new Consumer() { + @Override + void accept(Connection t) { + t.query() + latch.countDown() + } + }) + } + latch.await() + + then: + assertTraces(1) { + trace(0, 2) { + basicSpan(it, 0, "parent") + basicSpan(it, 1, "Connection.query", span(0)) + } + } + } + + static class Connection { + static int query() { + def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan() + span.end() + return new Random().nextInt() + } + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy b/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy new file mode 100644 index 000000000000..5c5f0160d3c4 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy @@ -0,0 +1,371 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3 + +import io.reactivex.rxjava3.core.BackpressureStrategy +import io.reactivex.rxjava3.core.Completable +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.internal.operators.flowable.FlowablePublish +import io.reactivex.rxjava3.internal.operators.observable.ObservablePublish +import io.reactivex.rxjava3.schedulers.Schedulers + +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTraceWithoutExceptionCatch +import static java.util.concurrent.TimeUnit.MILLISECONDS + +import com.google.common.collect.Lists +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription +import spock.lang.Shared + +/** + *

Tests in this class may seem not exhaustive due to the fact that some classes are converted + * into others, ie. {@link Completable#toMaybe()}. Fortunately, RxJava3 uses helper classes like + * {@link io.reactivex.rxjava3.internal.operators.maybe.MaybeFromCompletable} and as a result we + * can test subscriptions and cancellations correctly. + */ +abstract class AbstractRxJava3Test extends InstrumentationSpecification { + + public static final String EXCEPTION_MESSAGE = "test exception" + + @Shared + def addOne = { i -> + addOneFunc(i) + } + + @Shared + def addTwo = { i -> + addTwoFunc(i) + } + + @Shared + def throwException = { + throw new RuntimeException(EXCEPTION_MESSAGE) + } + + static addOneFunc(int i) { + runUnderTrace("addOne") { + return i + 1 + } + } + + static addTwoFunc(int i) { + runUnderTrace("addTwo") { + return i + 2 + } + } + + def "Publisher '#name' test"() { + when: + def result = assemblePublisherUnderTrace(publisherSupplier) + + then: + result == expected + and: + assertTraces(1) { + sortSpansByStartTime() + trace(0, workSpans + 1) { + + basicSpan(it, 0, "publisher-parent") + for (int i = 1; i < workSpans + 1; ++i) { + basicSpan(it, i, "addOne", span(0)) + } + } + } + + where: + name | expected | workSpans | publisherSupplier + "basic maybe" | 2 | 1 | { -> Maybe.just(1).map(addOne) } + "two operations maybe" | 4 | 2 | { -> Maybe.just(2).map(addOne).map(addOne) } + "delayed maybe" | 4 | 1 | { -> + Maybe.just(3).delay(100, MILLISECONDS).map(addOne) + } + "delayed twice maybe" | 6 | 2 | { -> + Maybe.just(4).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne) + } + "basic flowable" | [6, 7] | 2 | { -> + Flowable.fromIterable([5, 6]).map(addOne) + } + "two operations flowable" | [8, 9] | 4 | { -> + Flowable.fromIterable([6, 7]).map(addOne).map(addOne) + } + "delayed flowable" | [8, 9] | 2 | { -> + Flowable.fromIterable([7, 8]).delay(100, MILLISECONDS).map(addOne) + } + "delayed twice flowable" | [10, 11] | 4 | { -> + Flowable.fromIterable([8, 9]).delay(100, MILLISECONDS).map(addOne).delay(100, MILLISECONDS).map(addOne) + } + "maybe from callable" | 12 | 2 | { -> + Maybe.fromCallable({ addOneFunc(10) }).map(addOne) + } + "basic single" | 1 | 1 | { -> Single.just(0).map(addOne) } + "basic observable" | [1] | 1 | { -> Observable.just(0).map(addOne) } + "connectable flowable" | [1] | 1 | { -> + FlowablePublish.just(0).delay(100, MILLISECONDS).map(addOne) + } + "connectable observable" | [1] | 1 | { -> + ObservablePublish.just(0).delay(100, MILLISECONDS).map(addOne) + } + } + + def "Publisher error '#name' test"() { + when: + assemblePublisherUnderTrace(publisherSupplier) + + then: + def thrownException = thrown RuntimeException + thrownException.message == EXCEPTION_MESSAGE + and: + assertTraces(1) { + sortSpansByStartTime() + trace(0, 1) { + // It's important that we don't attach errors at the Reactor level so that we don't + // impact the spans on reactor integrations such as netty and lettuce, as reactor is + // more of a context propagation mechanism than something we would be tracking for + // errors this is ok. + basicSpan(it, 0, "publisher-parent") + } + } + + where: + name | publisherSupplier + "maybe" | { -> Maybe.error(new RuntimeException(EXCEPTION_MESSAGE)) } + "flowable" | { -> Flowable.error(new RuntimeException(EXCEPTION_MESSAGE)) } + "single" | { -> Single.error(new RuntimeException(EXCEPTION_MESSAGE)) } + "observable" | { -> Observable.error(new RuntimeException(EXCEPTION_MESSAGE)) } + "completable" | { -> Completable.error(new RuntimeException(EXCEPTION_MESSAGE)) } + } + + def "Publisher step '#name' test"() { + when: + assemblePublisherUnderTrace(publisherSupplier) + + then: + def exception = thrown RuntimeException + exception.message == EXCEPTION_MESSAGE + and: + assertTraces(1) { + sortSpansByStartTime() + trace(0, workSpans + 1) { + // It's important that we don't attach errors at the Reactor level so that we don't + // impact the spans on reactor integrations such as netty and lettuce, as reactor is + // more of a context propagation mechanism than something we would be tracking for + // errors this is ok. + basicSpan(it, 0, "publisher-parent") + + for (int i = 1; i < workSpans + 1; i++) { + basicSpan(it, i, "addOne", span(0)) + } + } + } + + where: + name | workSpans | publisherSupplier + "basic maybe failure" | 1 | { -> + Maybe.just(1).map(addOne).map({ throwException() }) + } + "basic flowable failure" | 1 | { -> + Flowable.fromIterable([5, 6]).map(addOne).map({ throwException() }) + } + } + + def "Publisher '#name' cancel"() { + when: + cancelUnderTrace(publisherSupplier) + + then: + assertTraces(1) { + trace(0, 1) { + basicSpan(it, 0, "publisher-parent") + } + } + + where: + name | publisherSupplier + "basic maybe" | { -> Maybe.just(1) } + "basic flowable" | { -> Flowable.fromIterable([5, 6]) } + "basic single" | { -> Single.just(1) } + "basic completable" | { -> Completable.fromCallable({ -> 1 }) } + "basic observable" | { -> Observable.just(1) } + } + + def "Publisher chain spans have the correct parent for '#name'"() { + when: + assemblePublisherUnderTrace(publisherSupplier) + + then: + assertTraces(1) { + trace(0, workSpans + 1) { + basicSpan(it, 0, "publisher-parent") + + for (int i = 1; i < workSpans + 1; i++) { + basicSpan(it, i, "addOne", span(0)) + } + } + } + + where: + name | workSpans | publisherSupplier + "basic maybe" | 3 | { -> + Maybe.just(1).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne)) + } + "basic flowable" | 5 | { -> + Flowable.fromIterable([5, 6]).map(addOne).map(addOne).concatWith(Maybe.just(1).map(addOne).toFlowable()) + } + } + + def "Publisher chain spans have the correct parents from subscription time"() { + when: + def maybe = Maybe.just(42) + .map(addOne) + .map(addTwo) + + runUnderTrace("trace-parent") { + maybe.blockingGet() + } + + then: + assertTraces(1) { + trace(0, 3) { + sortSpansByStartTime() + basicSpan(it, 0, "trace-parent") + basicSpan(it, 1, "addOne", span(0)) + basicSpan(it, 2, "addTwo", span(0)) + } + } + } + + def "Publisher chain spans have the correct parents from subscription time '#name'"() { + when: + assemblePublisherUnderTrace { + // The "add one" operations in the publisher created here should be children of the publisher-parent + def publisher = publisherSupplier() + + runUnderTrace("intermediate") { + if (publisher instanceof Maybe) { + return ((Maybe) publisher).map(addTwo) + } else if (publisher instanceof Flowable) { + return ((Flowable) publisher).map(addTwo) + } else if (publisher instanceof Single) { + return ((Single) publisher).map(addTwo) + } else if (publisher instanceof Observable) { + return ((Observable) publisher).map(addTwo) + } else if (publisher instanceof Completable) { + return ((Completable) publisher).toMaybe().map(addTwo) + } + throw new IllegalStateException("Unknown publisher type") + } + } + + then: + assertTraces(1) { + trace(0, 2 + 2 * workItems) { + sortSpansByStartTime() + basicSpan(it, 0, "publisher-parent") + basicSpan(it, 1, "intermediate", span(0)) + + for (int i = 2; i < 2 + 2 * workItems; i = i + 2) { + basicSpan(it, i, "addOne", span(0)) + basicSpan(it, i + 1, "addTwo", span(0)) + } + } + } + + where: + name | workItems | publisherSupplier + "basic maybe" | 1 | { -> Maybe.just(1).map(addOne) } + "basic flowable" | 2 | { -> Flowable.fromIterable([1, 2]).map(addOne) } + "basic single" | 1 | { -> Single.just(1).map(addOne) } + "basic observable" | 1 | { -> Observable.just(1).map(addOne) } + } + + def "Flowables produce the right number of results '#scheduler'"() { + when: + List values = runUnderTrace("flowable root") { + Flowable.fromIterable([1, 2, 3, 4]) + .parallel() + .runOn(scheduler) + .flatMap({ num -> + Maybe.just(num).map(addOne).toFlowable() + }) + .sequential() + .toList() + .blockingGet() + } + + then: + values.size() == 4 + assertTraces(1) { + trace(0, 5) { + basicSpan(it, 0, "flowable root") + for (int i = 1; i < values.size() + 1; i++) { + basicSpan(it, i, "addOne", span(0)) + } + } + } + + where: + scheduler << [Schedulers.newThread(), Schedulers.computation(), Schedulers.single(), Schedulers.trampoline()] + } + + def cancelUnderTrace(def publisherSupplier) { + runUnderTraceWithoutExceptionCatch("publisher-parent") { + def publisher = publisherSupplier() + if (publisher instanceof Maybe) { + publisher = publisher.toFlowable() + } else if (publisher instanceof Single) { + publisher = publisher.toFlowable() + } else if (publisher instanceof Completable) { + publisher = publisher.toFlowable() + } else if (publisher instanceof Observable) { + publisher = publisher.toFlowable(BackpressureStrategy.LATEST) + } + + publisher.subscribe(new Subscriber() { + void onSubscribe(Subscription subscription) { + subscription.cancel() + } + + void onNext(Integer t) { + } + + void onError(Throwable error) { + } + + void onComplete() { + } + }) + } + } + + @SuppressWarnings("unchecked") + def assemblePublisherUnderTrace(def publisherSupplier) { + // The "add two" operations below should be children of this span + runUnderTraceWithoutExceptionCatch("publisher-parent") { + def publisher = publisherSupplier() + + // Read all data from publisher + if (publisher instanceof Maybe) { + return ((Maybe) publisher).blockingGet() + } else if (publisher instanceof Flowable) { + return Lists.newArrayList(((Flowable) publisher).blockingIterable()) + } else if (publisher instanceof Single) { + return ((Single) publisher).blockingGet() + } else if (publisher instanceof Observable) { + return Lists.newArrayList(((Observable) publisher).blockingIterable()) + } else if (publisher instanceof Completable) { + return ((Completable) publisher).toMaybe().blockingGet() + } + + throw new RuntimeException("Unknown publisher: " + publisher) + } + } +} diff --git a/settings.gradle b/settings.gradle index 27b1978f5a37..0bffc5db36d8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -225,6 +225,9 @@ include ':instrumentation:rxjava:rxjava-1.0:library' include ':instrumentation:rxjava:rxjava-2.0:library' include ':instrumentation:rxjava:rxjava-2.0:testing' include ':instrumentation:rxjava:rxjava-2.0:javaagent' +include ':instrumentation:rxjava:rxjava-3.0:library' +include ':instrumentation:rxjava:rxjava-3.0:testing' +include ':instrumentation:rxjava:rxjava-3.0:javaagent' include ':instrumentation:scala-executors:javaagent' include ':instrumentation:servlet:glassfish-testing' include ':instrumentation:servlet:servlet-common:library'