Skip to content

Commit

Permalink
feat: 增加对spring-kafka的生产-消费增强trace支持
Browse files Browse the repository at this point in the history
  • Loading branch information
i36lib committed Dec 13, 2024
1 parent 170532d commit 43cc373
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 4 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ $ java -javaagent=/dir/to/autotrace4j.jar=com.your-domain.biz1.pkg1,com.your-dom

### 5、Middleware

支持阿里云ONS和RocketMQ在生产和消费时传递trace,支持Dubbo:
​ 支持Dubbo、Kafka、阿里云ONS、RocketMQ在生产和消费时传递trace

- RocektMQ:`Producer` & `Consumer`
- Aliyun ONS:`Producer` & `Consumer`
- Spring Kafka:`Producer` & `Consumer`
- 如果消息没有trace信息,则生成新的trace信息
- 📢:如果是批量消费消息,traceId取第一条消息的traceId,parentSpanId取第一条消息的spanId
- Spring RocektMQ & Aliyun ONS:`Producer` & `Consumer`
- Dubbo:`org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter`
`org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter`

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.github.artlibs.autotrace4j.transformer.impl;

import io.github.artlibs.autotrace4j.context.MethodWrapper;
import io.github.artlibs.autotrace4j.context.ReflectUtils;
import io.github.artlibs.autotrace4j.context.TraceContext;
import io.github.artlibs.autotrace4j.transformer.abs.AbsVisitorTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.matcher.ElementMatcher;

import java.util.Objects;

import static net.bytebuddy.matcher.ElementMatchers.*;

/**
* Kafka 生产者增强转换器
* <p>
* @author Fury
* @since 2024-03-30
* <p>
* All rights Reserved.
*/
public class KafkaProducerTransformer extends AbsVisitorTransformer {

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return hasSuperType(named("org.apache.kafka.clients.producer.KafkaProducer"));
}

@Override
protected MethodMatcherHolder methodMatchers() {
return ofMatcher(isPrivate().and(named("doSend"))
.and(takesArgument(0, hasSuperType(named("org.apache.kafka.clients.producer.ProducerRecord"))))
.and(takesArgument(1, hasSuperType(named("org.apache.kafka.clients.producer.Callback"))))
);
}

/**
* 发送消息时将当前上下文trace信息放入消息头
* @param producerRecord -
*/
@Advice.OnMethodEnter
public static void adviceOnMethodEnter(
@Advice.Argument(value = 0, typing = Assigner.Typing.DYNAMIC
, readOnly = false) Object producerRecord) {
if (Objects.isNull(producerRecord) || Objects.isNull(TraceContext.getTraceId())) {
return;
}
Object headers = ReflectUtils.getMethodWrapper(producerRecord
, "headers").invoke();
if (Objects.isNull(headers)) {
return;
}
MethodWrapper method = ReflectUtils.getMethodWrapper(headers
, "add", String.class, byte[].class);
method.invoke(TraceContext.TRACE_KEY, TraceContext.getTraceId());
if (Objects.nonNull(TraceContext.getSpanId())) {
method.invoke(TraceContext.SPAN_KEY, TraceContext.getSpanId());
}
if (Objects.nonNull(TraceContext.getParentSpanId())) {
method.invoke(TraceContext.PARENT_SPAN_KEY, TraceContext.getParentSpanId());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.github.artlibs.autotrace4j.transformer.impl;

import io.github.artlibs.autotrace4j.context.MethodWrapper;
import io.github.artlibs.autotrace4j.context.ReflectUtils;
import io.github.artlibs.autotrace4j.context.TraceContext;
import io.github.artlibs.autotrace4j.transformer.abs.AbsVisitorTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

import java.util.Iterator;
import java.util.Objects;

import static net.bytebuddy.matcher.ElementMatchers.*;

/**
* Spring Kafka 消费增强转换器, 参见类
* org.springframework.kafka.listener.KafkaMessageListenerContainer:doStart()
* org.springframework.kafka.listener.KafkaMessageListenerContainer
* .ListenerConsumer:run(),invokeListener(), invokeBatchOnMessageWithRecordsOrList() & doInvokeOnMessage()
* org.apache.kafka.clients.consumer.Consumer:poll()
* <p>
* @author Fury
* @since 2024-03-30
* <p>
* All rights Reserved.
*/
public class KafkaSpringConsumerTransformer extends AbsVisitorTransformer {

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return hasSuperType(named("org.springframework.kafka.listener." +
"KafkaMessageListenerContainer.ListenerConsumer"));
}

@Override
protected MethodMatcherHolder methodMatchers() {
return ofMatcher(isPrivate().and(
// private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> recordsArg,
// @Nullable List<ConsumerRecord<K, V>> recordListArg)
named("invokeBatchOnMessageWithRecordsOrList")
// private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg)
.or(named("doInvokeOnMessage"))
));
}

/**
* 消费消息时将消息头的trace信息放入上下文, 或者生成新的上下文
* @param recOrRecs ConsumerRecord<K, V> or ConsumerRecords<K, V>
*/
@Advice.OnMethodEnter
public static void adviceOnMethodEnter(@Advice.Argument(value = 0) Object recOrRecs) {
Object consumerRecord = recOrRecs;
if (Iterable.class.isAssignableFrom(recOrRecs.getClass())) {
Iterator<?> iterator = ReflectUtils.getMethodWrapper(recOrRecs
, "iterator").invoke();
consumerRecord = ReflectUtils.getMethodWrapper(iterator
, "next").invoke();
}
Object headers = ReflectUtils.getMethodWrapper(consumerRecord
, "headers").invoke();
MethodWrapper method = ReflectUtils.getMethodWrapper(headers
, "lastHeader", String.class);

String traceId = method.invoke(TraceContext.TRACE_KEY);
if (Objects.isNull(traceId)) {
traceId = TraceContext.generate();
}
TraceContext.setTraceId(traceId);
TraceContext.setSpanId(TraceContext.generate());
String spanId = method.invoke(TraceContext.SPAN_KEY);
if (Objects.nonNull(spanId)) {
TraceContext.setParentSpanId(spanId);
}
}

/**
* 方法结束时清空上下文
*/
@Advice.OnMethodExit
public static void adviceOnMethodExit() {
TraceContext.removeAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected MethodMatcherHolder methodMatchers() {
}

/**
* OnMethodExit
* OnMethodEnter
* @param builder -
*/
@Advice.OnMethodEnter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public ElementMatcher<? super TypeDescription> typeMatcher() {
}

/**
* 由于是在转换消息时设置的信息,所以不能在方法结束时清空trace上下文
* {@inheritDoc}
*/
@Override
Expand Down

0 comments on commit 43cc373

Please sign in to comment.