Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply service naming flow to messaging integrations #2961

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions packages/datadog-plugin-amqp10/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ class Amqp10ConsumerPlugin extends ConsumerPlugin {
const source = getShortName(link)
const address = getAddress(link)

this.startSpan('amqp.receive', {
service: this.config.service || `${this.tracer._service}-amqp`,
this.startSpan({
resource: ['receive', source].filter(v => v).join(' '),
type: 'worker',
kind: 'consumer',
meta: {
'amqp.link.source.address': source,
'amqp.link.role': 'receiver',
Expand Down
4 changes: 1 addition & 3 deletions packages/datadog-plugin-amqp10/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ class Amqp10ProducerPlugin extends ProducerPlugin {
const address = getAddress(link)
const target = getShortName(link)

this.startSpan('amqp.send', {
service: this.config.service || `${this.tracer._service}-amqp`,
this.startSpan({
resource: ['send', target].filter(v => v).join(' '),
kind: 'producer',
meta: {
'amqp.link.target.address': target,
'amqp.link.role': 'sender',
Expand Down
33 changes: 26 additions & 7 deletions packages/datadog-plugin-amqp10/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const agent = require('../../dd-trace/test/plugins/agent')
const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants')

const namingSchema = require('./naming')

describe('Plugin', () => {
let tracer
let client
Expand Down Expand Up @@ -63,8 +65,8 @@ describe('Plugin', () => {
.use(traces => {
const span = traces[0][0]

expect(span).to.have.property('name', 'amqp.send')
expect(span).to.have.property('service', 'test-amqp')
expect(span).to.have.property('name', namingSchema.send.opName)
expect(span).to.have.property('service', namingSchema.send.serviceName)
expect(span).to.have.property('resource', 'send amq.topic')
expect(span).to.not.have.property('type')
expect(span.meta).to.have.property('span.kind', 'producer')
Expand All @@ -84,7 +86,6 @@ describe('Plugin', () => {

sender.send({ key: 'value' })
})

it('should handle errors', done => {
let error

Expand Down Expand Up @@ -123,15 +124,21 @@ describe('Plugin', () => {
expect(promise).to.have.property('value')
})
})

withNamingSchema(
() => sender.send({ key: 'value' }),
() => namingSchema.send.opName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these need to be functions? Accepting an object could make this easier also when an override is not needed, for example withNamingSchema(() => {}, namingSchema.send).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the way the tests are performed, they do need to be functions. I'm open to other ideas since I'm not a huge fan either 😅

namingSchema is a proxy that resolves the v0/v1 part of the naming schema, so it must be accessed inside the test case, because the configuration change that controls v0/v1 is kept at the test case level.

() => namingSchema.send.serviceName
)
})

describe('when consuming messages', () => {
it('should do automatic instrumentation', done => {
agent
.use(traces => {
const span = traces[0][0]
expect(span).to.have.property('name', 'amqp.receive')
expect(span).to.have.property('service', 'test-amqp')
expect(span).to.have.property('name', namingSchema.receive.opName)
expect(span).to.have.property('service', namingSchema.receive.serviceName)
expect(span).to.have.property('resource', 'receive amq.topic')
expect(span).to.have.property('type', 'worker')
expect(span.meta).to.have.property('span.kind', 'consumer')
Expand Down Expand Up @@ -163,12 +170,18 @@ describe('Plugin', () => {

sender.send({ key: 'value' })
})

withNamingSchema(
() => sender.send({ key: 'value' }),
() => namingSchema.receive.opName,
() => namingSchema.receive.serviceName
)
})
})

describe('with configuration', () => {
beforeEach(() => {
agent.reload('amqp10', { service: 'test' })
agent.reload('amqp10', { service: 'test-custom-name' })

const amqp = require(`../../../versions/amqp10@${version}`).get()

Expand All @@ -192,13 +205,19 @@ describe('Plugin', () => {
.use(traces => {
const span = traces[0][0]

expect(span).to.have.property('service', 'test')
expect(span).to.have.property('service', 'test-custom-name')
}, 2)
.then(done)
.catch(done)

sender.send({ key: 'value' })
})

withNamingSchema(
() => sender.send({ key: 'value' }),
() => namingSchema.receive.opName,
() => 'test-custom-name'
)
})
})
})
Expand Down
24 changes: 24 additions & 0 deletions packages/datadog-plugin-amqp10/test/naming.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
const { resolveNaming } = require('../../dd-trace/test/plugins/helpers')

module.exports = resolveNaming({
send: {
v0: {
opName: 'amqp.send',
serviceName: 'test-amqp'
},
v1: {
opName: 'amqp.send',
serviceName: 'test'
}
},
receive: {
v0: {
opName: 'amqp.receive',
serviceName: 'test-amqp'
},
v1: {
opName: 'amqp.process',
serviceName: 'test'
}
}
})
7 changes: 4 additions & 3 deletions packages/datadog-plugin-amqplib/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ const { getResourceName } = require('./util')

class AmqplibClientPlugin extends ClientPlugin {
static get id () { return 'amqplib' }
static get type () { return 'messaging' }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant with both the span kind and the IO direction, and could be set in the base class for most cases. Can we settle on just 1 and make sure that everything is normalized accordingly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Span kind can be integrated in startSpan for consumer and producer plugins. For this specific one, since it's a client we can't bake it in until we've dealt with all clients, and we're in the rare case where the client is specifically a messaging client (most will be HTTP or database), so setting it here seems like the best we can do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 05ad286

static get operation () { return 'command' }

start ({ channel = {}, method, fields }) {
if (method === 'basic.deliver' || method === 'basic.get') return
if (method === 'basic.publish') return

const stream = (channel.connection && channel.connection.stream) || {}
const span = this.startSpan('amqp.command', {
service: this.config.service || `${this.tracer._service}-amqp`,
const span = this.startSpan(this.operationName(), {
service: this.config.service || this.serviceName(),
resource: getResourceName(method, fields),
kind: 'client',
kind: this.constructor.kind,
meta: {
'out.host': stream._host,
[CLIENT_PORT_KEY]: stream.remotePort,
Expand Down
4 changes: 1 addition & 3 deletions packages/datadog-plugin-amqplib/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {

const childOf = extract(this.tracer, message)

this.startSpan('amqp.command', {
this.startSpan({
childOf,
service: this.config.service || `${this.tracer._service}-amqp`,
resource: getResourceName(method, fields),
kind: 'consumer',
type: 'worker',
meta: {
'amqp.queue': fields.queue,
Expand Down
4 changes: 1 addition & 3 deletions packages/datadog-plugin-amqplib/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ class AmqplibProducerPlugin extends ProducerPlugin {
if (method !== 'basic.publish') return

const stream = (channel.connection && channel.connection.stream) || {}
const span = this.startSpan('amqp.command', {
service: this.config.service || `${this.tracer._service}-amqp`,
const span = this.startSpan({
resource: getResourceName(method, fields),
kind: 'producer',
meta: {
'out.host': stream._host,
[CLIENT_PORT_KEY]: stream.remotePort,
Expand Down
55 changes: 45 additions & 10 deletions packages/datadog-plugin-amqplib/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const agent = require('../../dd-trace/test/plugins/agent')
const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants')

const namingSchema = require('./naming')

describe('Plugin', () => {
let tracer
let connection
Expand Down Expand Up @@ -55,8 +57,8 @@ describe('Plugin', () => {
agent
.use(traces => {
const span = traces[0][0]
expect(span).to.have.property('name', 'amqp.command')
expect(span).to.have.property('service', 'test-amqp')
expect(span).to.have.property('name', namingSchema.controlPlane.opName)
expect(span).to.have.property('service', namingSchema.controlPlane.serviceName)
expect(span).to.have.property('resource', 'queue.declare test')
expect(span).to.not.have.property('type')
expect(span.meta).to.have.property('span.kind', 'client')
Expand All @@ -75,8 +77,8 @@ describe('Plugin', () => {
.use(traces => {
const span = traces[0][0]

expect(span).to.have.property('name', 'amqp.command')
expect(span).to.have.property('service', 'test-amqp')
expect(span).to.have.property('name', namingSchema.controlPlane.opName)
expect(span).to.have.property('service', namingSchema.controlPlane.serviceName)
expect(span).to.have.property('resource', 'queue.delete test')
expect(span).to.not.have.property('type')
expect(span.meta).to.have.property('span.kind', 'client')
Expand Down Expand Up @@ -113,6 +115,12 @@ describe('Plugin', () => {
error = e
}
})

withNamingSchema(
() => channel.assertQueue('test', {}, () => {}),
() => namingSchema.controlPlane.opName,
() => namingSchema.controlPlane.serviceName
)
})

describe('when publishing messages', () => {
Expand All @@ -121,8 +129,8 @@ describe('Plugin', () => {
.use(traces => {
const span = traces[0][0]

expect(span).to.have.property('name', 'amqp.command')
expect(span).to.have.property('service', 'test-amqp')
expect(span).to.have.property('name', namingSchema.send.opName)
expect(span).to.have.property('service', namingSchema.send.serviceName)
expect(span).to.have.property('resource', 'basic.publish exchange routingKey')
expect(span).to.not.have.property('type')
expect(span.meta).to.have.property('out.host', 'localhost')
Expand Down Expand Up @@ -160,6 +168,15 @@ describe('Plugin', () => {
error = e
}
})

withNamingSchema(
() => {
channel.assertExchange('exchange', 'direct', {}, () => {})
channel.publish('exchange', 'routingKey', Buffer.from('content'))
},
() => namingSchema.send.opName,
() => namingSchema.send.serviceName
)
})

describe('when consuming messages', () => {
Expand All @@ -170,8 +187,8 @@ describe('Plugin', () => {
agent
.use(traces => {
const span = traces[0][0]
expect(span).to.have.property('name', 'amqp.command')
expect(span).to.have.property('service', 'test-amqp')
expect(span).to.have.property('name', namingSchema.receive.opName)
expect(span).to.have.property('service', namingSchema.receive.serviceName)
expect(span).to.have.property('resource', `basic.deliver ${queue}`)
expect(span).to.have.property('type', 'worker')
expect(span.meta).to.have.property('span.kind', 'consumer')
Expand Down Expand Up @@ -236,6 +253,18 @@ describe('Plugin', () => {
})
})
})

withNamingSchema(
() => {
channel.assertQueue('', {}, (err, ok) => {
if (err) return
channel.sendToQueue(ok.queue, Buffer.from('content'))
channel.consume(ok.queue, () => {}, {}, (err, ok) => {})
})
},
() => namingSchema.receive.opName,
() => namingSchema.receive.serviceName
)
})
})

Expand All @@ -260,7 +289,7 @@ describe('Plugin', () => {

describe('with configuration', () => {
before(() => {
return agent.load('amqplib', { service: 'test' })
return agent.load('amqplib', { service: 'test-custom-service' })
})

after(() => {
Expand All @@ -286,14 +315,20 @@ describe('Plugin', () => {
it('should be configured with the correct values', done => {
agent
.use(traces => {
expect(traces[0][0]).to.have.property('service', 'test')
expect(traces[0][0]).to.have.property('service', 'test-custom-service')
expect(traces[0][0]).to.have.property('resource', 'queue.declare test')
}, 2)
.then(done)
.catch(done)

channel.assertQueue('test', {}, () => {})
})

withNamingSchema(
() => channel.assertQueue('test', {}, () => {}),
() => namingSchema.controlPlane.opName,
() => 'test-custom-service'
)
})
})
})
Expand Down
34 changes: 34 additions & 0 deletions packages/datadog-plugin-amqplib/test/naming.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const { resolveNaming } = require('../../dd-trace/test/plugins/helpers')

module.exports = resolveNaming({
send: {
v0: {
opName: 'amqp.command',
serviceName: 'test-amqp'
},
v1: {
opName: 'amqp.send',
serviceName: 'test'
}
},
receive: {
v0: {
opName: 'amqp.command',
serviceName: 'test-amqp'
},
v1: {
opName: 'amqp.process',
serviceName: 'test'
}
},
controlPlane: {
v0: {
opName: 'amqp.command',
serviceName: 'test-amqp'
},
v1: {
opName: 'amqp.command',
serviceName: 'test'
}
}
})
7 changes: 4 additions & 3 deletions packages/datadog-plugin-google-cloud-pubsub/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ const ClientPlugin = require('../../dd-trace/src/plugins/client')

class GoogleCloudPubsubClientPlugin extends ClientPlugin {
static get id () { return 'google-cloud-pubsub' }
static get type () { return 'messaging' }
static get operation () { return 'request' }

start ({ request, api, projectId }) {
if (api === 'publish') return

this.startSpan('pubsub.request', {
service: this.config.service || `${this.tracer._service}-pubsub`,
this.startSpan(this.operationName(), {
service: this.config.service || this.serviceName(),
resource: [api, request.name].filter(x => x).join(' '),
kind: 'client',
kind: this.constructor.kind,
meta: {
'pubsub.method': api,
'gcloud.project_id': projectId
Expand Down
4 changes: 1 addition & 3 deletions packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
const topic = subscription.metadata && subscription.metadata.topic
const childOf = this.tracer.extract('text_map', message.attributes) || null

this.startSpan('pubsub.receive', {
this.startSpan({
childOf,
service: this.config.service,
resource: topic,
kind: 'consumer',
type: 'worker',
meta: {
'gcloud.project_id': subscription.pubsub.projectId,
Expand Down
Loading