Skip to content

Commit

Permalink
Fix UnsupportedOperationException happening when reactor-rabbitmq is …
Browse files Browse the repository at this point in the history
…used (#3381)
  • Loading branch information
Mateusz Rzeszutek authored Jun 22, 2021
1 parent 0788a03 commit 2c2c19d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ dependencies {
testLibrary ("org.springframework.amqp:spring-rabbit:1.1.0.RELEASE") {
exclude group: 'com.rabbitmq', module: 'amqp-client'
}

testInstrumentation project(':instrumentation:reactor-3.1:javaagent')

testLibrary 'io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.RELEASE'
}

tasks.withType(Test).configureEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public ElementMatcher<ClassLoader> classLoaderOptimization() {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("com.rabbitmq.client.Channel"));
return implementsInterface(named("com.rabbitmq.client.Channel"))
// broken implementation that throws UnsupportedOperationException on getConnection() calls
.and(not(named("reactor.rabbitmq.ChannelProxy")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Consumer
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
Expand All @@ -23,42 +22,24 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.time.Duration
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.testcontainers.containers.GenericContainer
import spock.lang.Shared

class RabbitMQTest extends AgentInstrumentationSpecification {
class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabbitMqTrait {

@Shared
def rabbitMQContainer
@Shared
InetSocketAddress rabbitmqAddress

ConnectionFactory factory = new ConnectionFactory(host: rabbitmqAddress.hostName, port: rabbitmqAddress.port)
Connection conn = factory.newConnection()
Connection conn = connectionFactory.newConnection()
Channel channel = conn.createChannel()

def setupSpec() {
rabbitMQContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(5672)
.withStartupTimeout(Duration.ofSeconds(120))
rabbitMQContainer.start()
rabbitmqAddress = new InetSocketAddress(
rabbitMQContainer.containerIpAddress,
rabbitMQContainer.getMappedPort(5672)
)
startRabbit()
}

def cleanupSpec() {
if (rabbitMQContainer) {
rabbitMQContainer.stop()
}
stopRabbit()
}

def cleanup() {
Expand Down Expand Up @@ -266,7 +247,7 @@ class RabbitMQTest extends AgentInstrumentationSpecification {
def "test spring rabbit"() {
setup:
def connectionFactory = new CachingConnectionFactory(rabbitmqAddress.hostName, rabbitmqAddress.port)
def connectionFactory = new CachingConnectionFactory(connectionFactory)
AmqpAdmin admin = new RabbitAdmin(connectionFactory)
def queue = new Queue("some-routing-queue", false, true, true, null)
admin.declareQueue(queue)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.rabbitmq.ExchangeSpecification
import reactor.rabbitmq.RabbitFlux
import reactor.rabbitmq.SenderOptions

class ReactorRabbitMqTest extends AgentInstrumentationSpecification implements WithRabbitMqTrait {

def setupSpec() {
startRabbit()
}

def cleanupSpec() {
stopRabbit()
}

def "should not fail declaring exchange"() {
given:
def sender = RabbitFlux.createSender(new SenderOptions().connectionFactory(connectionFactory))

when:
sender.declareExchange(ExchangeSpecification.exchange("testExchange"))
.block()

then:
noExceptionThrown()

assertTraces(1) {
trace(0, 1) {
span(0) {
name 'exchange.declare'
kind SpanKind.CLIENT
attributes {
"${SemanticAttributes.NET_PEER_NAME.key}" { it == null || it instanceof String }
"${SemanticAttributes.NET_PEER_IP.key}" String
"${SemanticAttributes.NET_PEER_PORT.key}" { it == null || it instanceof Long }
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue"
"rabbitmq.command" "exchange.declare"
}
}
}
}

cleanup:
sender?.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import com.rabbitmq.client.ConnectionFactory
import java.time.Duration
import org.testcontainers.containers.GenericContainer

trait WithRabbitMqTrait {

static GenericContainer rabbitMqContainer
static ConnectionFactory connectionFactory

def startRabbit() {
rabbitMqContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(5672)
.withStartupTimeout(Duration.ofSeconds(120))
rabbitMqContainer.start()

connectionFactory = new ConnectionFactory(
host: rabbitMqContainer.containerIpAddress,
port: rabbitMqContainer.getMappedPort(5672)
)
}

def stopRabbit() {
rabbitMqContainer?.stop()
}
}

0 comments on commit 2c2c19d

Please sign in to comment.