Skip to content

Commit

Permalink
better code so not duplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
McSick committed May 29, 2024
1 parent 3504de9 commit e388cf2
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
SpanStatusCode,
ROOT_CONTEXT,
Link,
Context,
} from '@opentelemetry/api';
import {
hrTime,
Expand Down Expand Up @@ -408,20 +409,28 @@ export class AmqplibInstrumentation extends InstrumentationBase {
}

const headers = msg.properties.headers ?? {};
const parentContext = propagation.extract(ROOT_CONTEXT, headers);
let parentContext: Context | undefined = propagation.extract(
ROOT_CONTEXT,
headers
);
const exchange = msg.fields?.exchange;
let span: Span;
let links: Link[] | undefined;
if (self._config.useLinksForConsume) {
const parentSpanContext = trace.getSpan(parentContext)?.spanContext();
let links: Link[] | undefined;
const parentSpanContext = parentContext
? trace.getSpan(parentContext)?.spanContext()
: undefined;
parentContext = undefined;
if (parentSpanContext) {
links = [
{
context: parentSpanContext,
},
];
}
span = self.tracer.startSpan(`${queue} process`, {
}
const span = self.tracer.startSpan(
`${queue} process`,
{
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
Expand All @@ -438,30 +447,9 @@ export class AmqplibInstrumentation extends InstrumentationBase {
msg?.properties.correlationId,
},
links: links,
});
} else {
span = self.tracer.startSpan(
`${queue} process`,
{
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
[SemanticAttributes.MESSAGING_DESTINATION]: exchange,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.TOPIC,
[SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]:
msg.fields?.routingKey,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
[SemanticAttributes.MESSAGING_MESSAGE_ID]:
msg?.properties.messageId,
[SemanticAttributes.MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
},
},
parentContext
);
}
},
parentContext
);

if (self._config.consumeHook) {
safeExecuteInTheMiddle(
Expand All @@ -485,8 +473,10 @@ export class AmqplibInstrumentation extends InstrumentationBase {
// store the span on the message, so we can end it when user call 'ack' on it
msg[MESSAGE_STORED_SPAN] = span;
}

context.with(trace.setSpan(parentContext, span), () => {
const setContext: Context = parentContext
? parentContext
: ROOT_CONTEXT;
context.with(trace.setSpan(setContext, span), () => {
onMessage.call(this, msg);
});

Expand Down

0 comments on commit e388cf2

Please sign in to comment.