Skip to content

Commit

Permalink
Implement ExtendedTextMapGetter in kafka-clients instrumentation (#13068
Browse files Browse the repository at this point in the history
)
  • Loading branch information
xiepuhuan authored Jan 21, 2025
1 parent 946babb commit ab09fce
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void testKafkaProducerAndConsumerSpan(boolean testHeaders) throws Exception {
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
processAttributes("10", greeting, testHeaders)),
processAttributes("10", greeting, testHeaders, false)),
span -> span.hasName("processing").hasParent(trace.getSpan(1))));
}

Expand Down Expand Up @@ -152,7 +152,8 @@ void testPassThroughTombstone()
.hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, null, false))));
.hasAttributesSatisfyingExactly(
processAttributes(null, null, false, false))));
}

@DisplayName("test records(TopicPartition) kafka consume")
Expand Down Expand Up @@ -203,6 +204,7 @@ void testRecordsWithTopicPartitionKafkaConsume()
.hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, greeting, false))));
.hasAttributesSatisfyingExactly(
processAttributes(null, greeting, false, false))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ void testReadRemoteContextWhenPropagationIsDisabled() throws InterruptedExceptio
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasLinks(Collections.emptyList())
.hasAttributesSatisfyingExactly(processAttributes(null, message, false)),
.hasAttributesSatisfyingExactly(
processAttributes(null, message, false, false)),
span -> span.hasName("processing").hasParent(trace.getSpan(0))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -33,8 +34,19 @@ void testKafkaProduceAndConsume() throws InterruptedException {
testing.runWithSpan(
"parent",
() -> {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(SHARED_TOPIC, 10, greeting);
producerRecord
.headers()
// adding baggage header in w3c baggage format
.add(
"baggage",
"test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8))
.add(
"baggage",
"test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8));
producer.send(
new ProducerRecord<>(SHARED_TOPIC, 10, greeting),
producerRecord,
(meta, ex) -> {
if (ex == null) {
testing.runWithSpan("producer callback", () -> {});
Expand Down Expand Up @@ -70,7 +82,8 @@ void testKafkaProduceAndConsume() throws InterruptedException {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(processAttributes("10", greeting, false)),
.hasAttributesSatisfyingExactly(
processAttributes("10", greeting, false, true)),
span ->
span.hasName("processing")
.hasKind(SpanKind.INTERNAL)
Expand Down Expand Up @@ -108,7 +121,8 @@ void testPassThroughTombstone()
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, null, false))));
.hasAttributesSatisfyingExactly(
processAttributes(null, null, false, false))));
}

@Test
Expand Down Expand Up @@ -146,6 +160,7 @@ void testRecordsWithTopicPartitionKafkaConsume()
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, greeting, false))));
.hasAttributesSatisfyingExactly(
processAttributes(null, greeting, false, false))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)

@SuppressWarnings("deprecation") // using deprecated semconv
protected static List<AttributeAssertion> processAttributes(
String messageKey, String messageValue, boolean testHeaders) {
String messageKey, String messageValue, boolean testHeaders, boolean testMultiBaggage) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Arrays.asList(
Expand Down Expand Up @@ -249,6 +249,11 @@ protected static List<AttributeAssertion> processAttributes(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}

if (testMultiBaggage) {
assertions.add(equalTo(AttributeKey.stringKey("test-baggage-key-1"), "test-baggage-value-1"));
assertions.add(equalTo(AttributeKey.stringKey("test-baggage-key-2"), "test-baggage-value-2"));
}
return assertions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -48,8 +49,19 @@ void testInterceptors() throws InterruptedException {
testing.runWithSpan(
"parent",
() -> {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(SHARED_TOPIC, greeting);
producerRecord
.headers()
// adding baggage header in w3c baggage format
.add(
"baggage",
"test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8))
.add(
"baggage",
"test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8));
producer.send(
new ProducerRecord<>(SHARED_TOPIC, greeting),
producerRecord,
(meta, ex) -> {
if (ex == null) {
testing.runWithSpan("producer callback", () -> {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.AbstractLongAssert;
Expand Down Expand Up @@ -59,7 +60,13 @@ void assertTraces() {
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer"))),
stringAssert -> stringAssert.startsWith("consumer")),
equalTo(
AttributeKey.stringKey("test-baggage-key-1"),
"test-baggage-value-1"),
equalTo(
AttributeKey.stringKey("test-baggage-key-2"),
"test-baggage-value-2")),
span ->
span.hasName("process child")
.hasKind(SpanKind.INTERNAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

package io.opentelemetry.instrumentation.kafka.internal;

import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.internal.ExtendedTextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.kafka.common.header.Header;

enum KafkaConsumerRecordGetter implements TextMapGetter<KafkaProcessRequest> {
enum KafkaConsumerRecordGetter implements ExtendedTextMapGetter<KafkaProcessRequest> {
INSTANCE;

@Override
Expand All @@ -35,4 +36,11 @@ public String get(@Nullable KafkaProcessRequest carrier, String key) {
}
return new String(value, StandardCharsets.UTF_8);
}

@Override
public Iterator<String> getAll(@Nullable KafkaProcessRequest carrier, String key) {
  // Contract mirrors get(): carrier may be null, in which case there are no
  // header values to iterate — returning an empty iterator avoids an NPE here.
  if (carrier == null) {
    return java.util.Collections.emptyIterator();
  }
  // Kafka allows multiple headers with the same key (e.g. repeated "baggage"
  // entries); expose each value decoded as UTF-8, preserving header order.
  return StreamSupport.stream(carrier.getRecord().headers().headers(key).spliterator(), false)
      .map(header -> new String(header.value(), StandardCharsets.UTF_8))
      .iterator();
}
}

0 comments on commit ab09fce

Please sign in to comment.