diff --git a/packages/datadog-plugin-amqp10/src/consumer.js b/packages/datadog-plugin-amqp10/src/consumer.js index 06c9bb65aaa..46309c924fa 100644 --- a/packages/datadog-plugin-amqp10/src/consumer.js +++ b/packages/datadog-plugin-amqp10/src/consumer.js @@ -11,11 +11,9 @@ class Amqp10ConsumerPlugin extends ConsumerPlugin { const source = getShortName(link) const address = getAddress(link) - this.startSpan('amqp.receive', { - service: this.config.service || `${this.tracer._service}-amqp`, + this.startSpan({ resource: ['receive', source].filter(v => v).join(' '), type: 'worker', - kind: 'consumer', meta: { 'amqp.link.source.address': source, 'amqp.link.role': 'receiver', diff --git a/packages/datadog-plugin-amqp10/src/producer.js b/packages/datadog-plugin-amqp10/src/producer.js index 80426202b32..4163ac34e7b 100644 --- a/packages/datadog-plugin-amqp10/src/producer.js +++ b/packages/datadog-plugin-amqp10/src/producer.js @@ -13,10 +13,8 @@ class Amqp10ProducerPlugin extends ProducerPlugin { const address = getAddress(link) const target = getShortName(link) - this.startSpan('amqp.send', { - service: this.config.service || `${this.tracer._service}-amqp`, + this.startSpan({ resource: ['send', target].filter(v => v).join(' '), - kind: 'producer', meta: { 'amqp.link.target.address': target, 'amqp.link.role': 'sender', diff --git a/packages/datadog-plugin-amqp10/test/index.spec.js b/packages/datadog-plugin-amqp10/test/index.spec.js index 9fc181f960e..9652274434e 100644 --- a/packages/datadog-plugin-amqp10/test/index.spec.js +++ b/packages/datadog-plugin-amqp10/test/index.spec.js @@ -3,6 +3,8 @@ const agent = require('../../dd-trace/test/plugins/agent') const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants') +const namingSchema = require('./naming') + describe('Plugin', () => { let tracer let client @@ -63,8 +65,8 @@ describe('Plugin', () => { .use(traces => { const span = traces[0][0] - expect(span).to.have.property('name', 'amqp.send') - expect(span).to.have.property('service', 'test-amqp') + expect(span).to.have.property('name', namingSchema.send.opName) + expect(span).to.have.property('service', namingSchema.send.serviceName) expect(span).to.have.property('resource', 'send amq.topic') expect(span).to.not.have.property('type') expect(span.meta).to.have.property('span.kind', 'producer') @@ -84,7 +86,6 @@ describe('Plugin', () => { sender.send({ key: 'value' }) }) - it('should handle errors', done => { let error @@ -123,6 +124,12 @@ describe('Plugin', () => { expect(promise).to.have.property('value') }) }) + + withNamingSchema( + () => sender.send({ key: 'value' }), + () => namingSchema.send.opName, + () => namingSchema.send.serviceName + ) }) describe('when consuming messages', () => { @@ -130,8 +137,8 @@ describe('Plugin', () => { agent .use(traces => { const span = traces[0][0] - expect(span).to.have.property('name', 'amqp.receive') - expect(span).to.have.property('service', 'test-amqp') + expect(span).to.have.property('name', namingSchema.receive.opName) + expect(span).to.have.property('service', namingSchema.receive.serviceName) expect(span).to.have.property('resource', 'receive amq.topic') expect(span).to.have.property('type', 'worker') expect(span.meta).to.have.property('span.kind', 'consumer') @@ -163,12 +170,18 @@ describe('Plugin', () => { sender.send({ key: 'value' }) }) + + withNamingSchema( + () => sender.send({ key: 'value' }), + () => namingSchema.receive.opName, + () => namingSchema.receive.serviceName + ) }) }) describe('with configuration', () => { beforeEach(() => { - agent.reload('amqp10', { service: 'test' }) + agent.reload('amqp10', { service: 'test-custom-name' }) const amqp = require(`../../../versions/amqp10@${version}`).get() @@ -192,13 +205,19 @@ describe('Plugin', () => { .use(traces => { const span = traces[0][0] - expect(span).to.have.property('service', 'test') + expect(span).to.have.property('service', 'test-custom-name') }, 2) .then(done) .catch(done) sender.send({ key: 'value' }) }) + + withNamingSchema( + () => sender.send({ key: 'value' }), + () => namingSchema.receive.opName, + () => 'test-custom-name' + ) }) }) }) diff --git a/packages/datadog-plugin-amqp10/test/naming.js b/packages/datadog-plugin-amqp10/test/naming.js new file mode 100644 index 00000000000..02e147dfeee --- /dev/null +++ b/packages/datadog-plugin-amqp10/test/naming.js @@ -0,0 +1,24 @@ +const { resolveNaming } = require('../../dd-trace/test/plugins/helpers') + +module.exports = resolveNaming({ + send: { + v0: { + opName: 'amqp.send', + serviceName: 'test-amqp' + }, + v1: { + opName: 'amqp.send', + serviceName: 'test' + } + }, + receive: { + v0: { + opName: 'amqp.receive', + serviceName: 'test-amqp' + }, + v1: { + opName: 'amqp.process', + serviceName: 'test' + } + } +}) diff --git a/packages/datadog-plugin-amqplib/src/client.js b/packages/datadog-plugin-amqplib/src/client.js index 3e567610402..415808e904d 100644 --- a/packages/datadog-plugin-amqplib/src/client.js +++ b/packages/datadog-plugin-amqplib/src/client.js @@ -7,6 +7,7 @@ const { getResourceName } = require('./util') class AmqplibClientPlugin extends ClientPlugin { static get id () { return 'amqplib' } + static get type () { return 'messaging' } static get operation () { return 'command' } start ({ channel = {}, method, fields }) { @@ -14,10 +15,10 @@ class AmqplibClientPlugin extends ClientPlugin { if (method === 'basic.publish') return const stream = (channel.connection && channel.connection.stream) || {} - const span = this.startSpan('amqp.command', { - service: this.config.service || `${this.tracer._service}-amqp`, + const span = this.startSpan(this.operationName(), { + service: this.config.service || this.serviceName(), resource: getResourceName(method, fields), - kind: 'client', + kind: this.constructor.kind, meta: { 'out.host': stream._host, [CLIENT_PORT_KEY]: stream.remotePort, diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js index fb359556bff..0aed1696507 100644 --- a/packages/datadog-plugin-amqplib/src/consumer.js +++ b/packages/datadog-plugin-amqplib/src/consumer.js @@ -13,11 +13,9 @@ class AmqplibConsumerPlugin extends ConsumerPlugin { const childOf = extract(this.tracer, message) - this.startSpan('amqp.command', { + this.startSpan({ childOf, - service: this.config.service || `${this.tracer._service}-amqp`, resource: getResourceName(method, fields), - kind: 'consumer', type: 'worker', meta: { 'amqp.queue': fields.queue, diff --git a/packages/datadog-plugin-amqplib/src/producer.js b/packages/datadog-plugin-amqplib/src/producer.js index 927992470d2..9c3d1da8d53 100644 --- a/packages/datadog-plugin-amqplib/src/producer.js +++ b/packages/datadog-plugin-amqplib/src/producer.js @@ -13,10 +13,8 @@ class AmqplibProducerPlugin extends ProducerPlugin { if (method !== 'basic.publish') return const stream = (channel.connection && channel.connection.stream) || {} - const span = this.startSpan('amqp.command', { - service: this.config.service || `${this.tracer._service}-amqp`, + const span = this.startSpan({ resource: getResourceName(method, fields), - kind: 'producer', meta: { 'out.host': stream._host, [CLIENT_PORT_KEY]: stream.remotePort, diff --git a/packages/datadog-plugin-amqplib/test/index.spec.js b/packages/datadog-plugin-amqplib/test/index.spec.js index c4a5da95373..db6799939ed 100644 --- a/packages/datadog-plugin-amqplib/test/index.spec.js +++ b/packages/datadog-plugin-amqplib/test/index.spec.js @@ -3,6 +3,8 @@ const agent = require('../../dd-trace/test/plugins/agent') const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants') +const namingSchema = require('./naming') + describe('Plugin', () => { let tracer let connection @@ -55,8 +57,8 @@ describe('Plugin', () => { agent .use(traces => { const span = traces[0][0] - expect(span).to.have.property('name', 'amqp.command') - expect(span).to.have.property('service', 'test-amqp') + expect(span).to.have.property('name', namingSchema.controlPlane.opName) + expect(span).to.have.property('service', namingSchema.controlPlane.serviceName) expect(span).to.have.property('resource', 'queue.declare test') expect(span).to.not.have.property('type') expect(span.meta).to.have.property('span.kind', 'client') @@ -75,8 +77,8 @@ describe('Plugin', () => { .use(traces => { const span = traces[0][0] - expect(span).to.have.property('name', 'amqp.command') - expect(span).to.have.property('service', 'test-amqp') + expect(span).to.have.property('name', namingSchema.controlPlane.opName) + expect(span).to.have.property('service', namingSchema.controlPlane.serviceName) expect(span).to.have.property('resource', 'queue.delete test') expect(span).to.not.have.property('type') expect(span.meta).to.have.property('span.kind', 'client') @@ -113,6 +115,12 @@ describe('Plugin', () => { error = e } }) + + withNamingSchema( + () => channel.assertQueue('test', {}, () => {}), + () => namingSchema.controlPlane.opName, + () => namingSchema.controlPlane.serviceName + ) }) describe('when publishing messages', () => { @@ -121,8 +129,8 @@ describe('Plugin', () => { .use(traces => { const span = traces[0][0] - expect(span).to.have.property('name', 'amqp.command') - expect(span).to.have.property('service', 'test-amqp') + expect(span).to.have.property('name', namingSchema.send.opName) + expect(span).to.have.property('service', namingSchema.send.serviceName) expect(span).to.have.property('resource', 'basic.publish exchange routingKey') expect(span).to.not.have.property('type') expect(span.meta).to.have.property('out.host', 'localhost') @@ -160,6 +168,15 @@ describe('Plugin', () => { error = e } }) + + withNamingSchema( + () => { + channel.assertExchange('exchange', 'direct', {}, () => {}) + channel.publish('exchange', 'routingKey', Buffer.from('content')) + }, + () => namingSchema.send.opName, + () => namingSchema.send.serviceName + ) }) describe('when consuming messages', () => { @@ -170,8 +187,8 @@ describe('Plugin', () => { agent .use(traces => { const span = traces[0][0] - expect(span).to.have.property('name', 'amqp.command') - expect(span).to.have.property('service', 'test-amqp') + expect(span).to.have.property('name', namingSchema.receive.opName) + expect(span).to.have.property('service', namingSchema.receive.serviceName) expect(span).to.have.property('resource', `basic.deliver ${queue}`) expect(span).to.have.property('type', 'worker') expect(span.meta).to.have.property('span.kind', 'consumer') @@ -236,6 +253,18 @@ describe('Plugin', () => { }) }) }) + + withNamingSchema( + () => { + channel.assertQueue('', {}, (err, ok) => { + if (err) return + channel.sendToQueue(ok.queue, Buffer.from('content')) + channel.consume(ok.queue, () => {}, {}, (err, ok) => {}) + }) + }, + () => namingSchema.receive.opName, + () => namingSchema.receive.serviceName + ) }) }) @@ -260,7 +289,7 @@ describe('Plugin', () => { describe('with configuration', () => { before(() => { - return agent.load('amqplib', { service: 'test' }) + return agent.load('amqplib', { service: 'test-custom-service' }) }) after(() => { @@ -286,7 +315,7 @@ describe('Plugin', () => { it('should be configured with the correct values', done => { agent .use(traces => { - expect(traces[0][0]).to.have.property('service', 'test') + expect(traces[0][0]).to.have.property('service', 'test-custom-service') expect(traces[0][0]).to.have.property('resource', 'queue.declare test') }, 2) .then(done) @@ -294,6 +323,12 @@ describe('Plugin', () => { channel.assertQueue('test', {}, () => {}) }) + + withNamingSchema( + () => channel.assertQueue('test', {}, () => {}), + () => namingSchema.controlPlane.opName, + () => 'test-custom-service' + ) }) }) }) diff --git a/packages/datadog-plugin-amqplib/test/naming.js b/packages/datadog-plugin-amqplib/test/naming.js new file mode 100644 index 00000000000..d0ed9b94866 --- /dev/null +++ b/packages/datadog-plugin-amqplib/test/naming.js @@ -0,0 +1,34 @@ +const { resolveNaming } = require('../../dd-trace/test/plugins/helpers') + +module.exports = resolveNaming({ + send: { + v0: { + opName: 'amqp.command', + serviceName: 'test-amqp' + }, + v1: { + opName: 'amqp.send', + serviceName: 'test' + } + }, + receive: { + v0: { + opName: 'amqp.command', + serviceName: 'test-amqp' + }, + v1: { + opName: 'amqp.process', + serviceName: 'test' + } + }, + controlPlane: { + v0: { + opName: 'amqp.command', + serviceName: 'test-amqp' + }, + v1: { + opName: 'amqp.command', + serviceName: 'test' + } + } +}) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/client.js b/packages/datadog-plugin-google-cloud-pubsub/src/client.js index e71f53ea8bc..8911717f1e5 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/client.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/client.js @@ -4,15 +4,16 @@ const ClientPlugin = require('../../dd-trace/src/plugins/client') class GoogleCloudPubsubClientPlugin extends ClientPlugin { static get id () { return 'google-cloud-pubsub' } + static get type () { return 'messaging' } static get operation () { return 'request' } start ({ request, api, projectId }) { if (api === 'publish') return - this.startSpan('pubsub.request', { - service: this.config.service || `${this.tracer._service}-pubsub`, + this.startSpan(this.operationName(), { + service: this.config.service || this.serviceName(), resource: [api, request.name].filter(x => x).join(' '), - kind: 'client', + kind: this.constructor.kind, meta: { 'pubsub.method': api, 'gcloud.project_id': projectId diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js index c4e380bb4c8..0b157f5447b 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js @@ -11,11 +11,9 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { const topic = subscription.metadata && subscription.metadata.topic const childOf = this.tracer.extract('text_map', message.attributes) || null - this.startSpan('pubsub.receive', { + this.startSpan({ childOf, - service: this.config.service, resource: topic, - kind: 'consumer', type: 'worker', meta: { 'gcloud.project_id': subscription.pubsub.projectId, diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index fb3295f382c..a34d6bfacd8 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -11,10 +11,8 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { const messages = request.messages || [] const topic = request.topic - const span = this.startSpan('pubsub.request', { // TODO: rename - service: this.config.service || `${this.tracer._service}-pubsub`, + const span = this.startSpan({ // TODO: rename resource: `${api} ${topic}`, - kind: 'producer', meta: { 'gcloud.project_id': projectId, 'pubsub.method': api, // TODO: remove diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js index d566dde7f77..95aadfe63a0 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js @@ -5,6 +5,8 @@ const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/he const id = require('../../dd-trace/src/id') const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') +const namingSchema = require('./naming') + // The roundtrip to the pubsub emulator takes time. Sometimes a *long* time. const TIMEOUT = 30000 @@ -46,8 +48,16 @@ describe('Plugin', () => { pubsub = new lib.PubSub({ projectId: project }) }) describe('createTopic', () => { + withNamingSchema( + async () => pubsub.createTopic(topicName), + () => namingSchema.controlPlane.opName, + () => namingSchema.controlPlane.serviceName + ) + it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ + name: namingSchema.controlPlane.opName, + service: namingSchema.controlPlane.serviceName, meta: { 'pubsub.method': 'createTopic', 'span.kind': 'client', @@ -68,6 +78,8 @@ describe('Plugin', () => { }, gax) const expectedSpanPromise = expectSpanWithDefaults({ + name: namingSchema.controlPlane.opName, + service: namingSchema.controlPlane.serviceName, meta: { 'pubsub.method': 'createTopic', 'span.kind': 'client', @@ -83,6 +95,8 @@ describe('Plugin', () => { it('should be instrumented w/ error', async () => { const expectedSpanPromise = expectSpanWithDefaults({ + name: namingSchema.controlPlane.opName, + service: namingSchema.controlPlane.serviceName, error: 1, meta: { 'pubsub.method': 'createTopic', @@ -94,7 +108,7 @@ describe('Plugin', () => { try { await publisher.createTopic({ name }) } catch (e) { - // this is just to prevent mocha from crashing + // this is just to prevent mocha from crashing } return expectedSpanPromise }) @@ -111,6 +125,8 @@ describe('Plugin', () => { describe('publish', () => { it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ + name: namingSchema.send.opName, + service: namingSchema.send.serviceName, meta: { 'pubsub.topic': resource, 'pubsub.method': 'publish', @@ -133,12 +149,22 @@ describe('Plugin', () => { expect(tracer.scope().active()).to.equal(firstSpan) }) }) + + withNamingSchema( + async () => { + const [topic] = await pubsub.createTopic(topicName) + await publish(topic, { data: Buffer.from('hello') }) + }, + () => namingSchema.send.opName, + () => namingSchema.send.serviceName + ) }) describe('onmessage', () => { it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ - name: 'pubsub.receive', + name: namingSchema.receive.opName, + service: namingSchema.receive.serviceName, type: 'worker', meta: { 'component': 'google-cloud-pubsub', @@ -158,7 +184,8 @@ describe('Plugin', () => { it('should give the current span a parentId from the sender', async () => { const expectedSpanPromise = expectSpanWithDefaults({ - name: 'pubsub.receive', + name: namingSchema.receive.opName, + service: namingSchema.receive.serviceName, meta: { 'span.kind': 'consumer' } }) const [topic] = await pubsub.createTopic(topicName) @@ -183,7 +210,8 @@ describe('Plugin', () => { it('should be instrumented w/ error', async () => { const error = new Error('bad') const expectedSpanPromise = expectSpanWithDefaults({ - name: 'pubsub.receive', + name: namingSchema.receive.opName, + service: namingSchema.receive.serviceName, error: 1, meta: { [ERROR_MESSAGE]: error.message, @@ -218,6 +246,17 @@ describe('Plugin', () => { await publish(topic, { data: Buffer.from('hello') }) return expectedSpanPromise }) + + withNamingSchema( + async () => { + const [topic] = await pubsub.createTopic(topicName) + const [sub] = await topic.createSubscription('foo') + sub.on('message', msg => msg.ack()) + await publish(topic, { data: Buffer.from('hello') }) + }, + () => namingSchema.receive.opName, + () => namingSchema.receive.serviceName + ) }) describe('when disabled', () => { @@ -254,6 +293,7 @@ describe('Plugin', () => { describe('createTopic', () => { it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ + name: namingSchema.controlPlane.opName, service: 'a_test_service', meta: { 'pubsub.method': 'createTopic' } }) diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/naming.js b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js new file mode 100644 index 00000000000..24908870d2a --- /dev/null +++ b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js @@ -0,0 +1,34 @@ +const { resolveNaming } = require('../../dd-trace/test/plugins/helpers') + +module.exports = resolveNaming({ + send: { + v0: { + opName: 'pubsub.request', + serviceName: 'test-pubsub' + }, + v1: { + opName: 'gcp.pubsub.send', + serviceName: 'test' + } + }, + receive: { + v0: { + opName: 'pubsub.receive', + serviceName: 'test' + }, + v1: { + opName: 'gcp.pubsub.process', + serviceName: 'test' + } + }, + controlPlane: { + v0: { + opName: 'pubsub.request', + serviceName: 'test-pubsub' + }, + v1: { + opName: 'gcp.pubsub.request', + serviceName: 'test' + } + } +}) diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index c3777a36b00..4bd1367b957 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -8,12 +8,9 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { start ({ topic, partition, message }) { const childOf = extract(this.tracer, message.headers) - - this.startSpan('kafka.consume', { + this.startSpan({ childOf, - service: this.config.service || `${this.tracer._service}-kafka`, resource: topic, - kind: 'consumer', type: 'worker', meta: { 'component': 'kafkajs', diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index 2108f7cd9b5..0a08fc5ba02 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -7,10 +7,8 @@ class KafkajsProducerPlugin extends ProducerPlugin { static get operation () { return 'produce' } start ({ topic, messages }) { - const span = this.startSpan('kafka.produce', { - service: this.config.service || `${this.tracer._service}-kafka`, + const span = this.startSpan({ resource: topic, - kind: 'producer', meta: { 'component': 'kafkajs', 'kafka.topic': topic diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index 503171d9258..2c7edf48220 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -5,6 +5,8 @@ const agent = require('../../dd-trace/test/plugins/agent') const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/helpers') const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') +const namingSchema = require('./naming') + describe('Plugin', () => { describe('kafkajs', function () { this.timeout(10000) // TODO: remove when new internal trace has landed @@ -31,8 +33,8 @@ describe('Plugin', () => { describe('producer', () => { it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ - name: 'kafka.produce', - service: 'test-kafka', + name: namingSchema.send.opName, + service: namingSchema.send.serviceName, meta: { 'span.kind': 'producer', 'component': 'kafkajs' @@ -52,7 +54,7 @@ describe('Plugin', () => { it('should be instrumented w/ error', async () => { const producer = kafka.producer() - const resourceName = 'kafka.produce' + const resourceName = namingSchema.send.opName let error @@ -61,7 +63,7 @@ describe('Plugin', () => { expect(span).to.include({ name: resourceName, - service: 'test-kafka', + service: namingSchema.send.serviceName, resource: resourceName, error: 1 }) @@ -86,6 +88,12 @@ describe('Plugin', () => { return expectedSpanPromise } }) + + withNamingSchema( + async () => sendMessages(kafka, testTopic, messages), + () => namingSchema.send.opName, + () => namingSchema.send.serviceName + ) }) describe('consumer', () => { let consumer @@ -98,10 +106,11 @@ describe('Plugin', () => { afterEach(async () => { await consumer.disconnect() }) + it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ - name: 'kafka.consume', - service: 'test-kafka', + name: namingSchema.receive.opName, + service: namingSchema.receive.serviceName, meta: { 'span.kind': 'consumer', 'component': 'kafkajs' @@ -118,6 +127,7 @@ describe('Plugin', () => { return expectedSpanPromise }) + it('should run the consumer in the context of the consumer span', done => { const firstSpan = tracer.scope().active() @@ -126,7 +136,7 @@ describe('Plugin', () => { try { expect(currentSpan).to.not.equal(firstSpan) - expect(currentSpan.context()._name).to.equal('kafka.consume') + expect(currentSpan.context()._name).to.equal(namingSchema.receive.opName) done() } catch (e) { done(e) @@ -161,8 +171,8 @@ describe('Plugin', () => { it('should be instrumented w/ error', async () => { const fakeError = new Error('Oh No!') const expectedSpanPromise = expectSpanWithDefaults({ - name: 'kafka.consume', - service: 'test-kafka', + name: namingSchema.receive.opName, + service: namingSchema.receive.serviceName, meta: { [ERROR_TYPE]: fakeError.name, [ERROR_MESSAGE]: fakeError.message, @@ -208,6 +218,15 @@ describe('Plugin', () => { .then(() => sendMessages(kafka, testTopic, messages)) .catch(done) }) + + withNamingSchema( + async () => { + await consumer.run({ eachMessage: () => {} }) + await sendMessages(kafka, testTopic, messages) + }, + () => namingSchema.send.opName, + () => namingSchema.send.serviceName + ) }) }) }) diff --git a/packages/datadog-plugin-kafkajs/test/naming.js b/packages/datadog-plugin-kafkajs/test/naming.js new file mode 100644 index 00000000000..fdcb0e03b5a --- /dev/null +++ b/packages/datadog-plugin-kafkajs/test/naming.js @@ -0,0 +1,24 @@ +const { resolveNaming } = require('../../dd-trace/test/plugins/helpers') + +module.exports = resolveNaming({ + send: { + v0: { + opName: 'kafka.produce', + serviceName: 'test-kafka' + }, + v1: { + opName: 'kafka.send', + serviceName: 'test' + } + }, + receive: { + v0: { + opName: 'kafka.consume', + serviceName: 'test-kafka' + }, + v1: { + opName: 'kafka.process', + serviceName: 'test' + } + } +}) diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js index 949cb66e6ca..1adece16fbd 100644 --- a/packages/datadog-plugin-rhea/src/consumer.js +++ b/packages/datadog-plugin-rhea/src/consumer.js @@ -19,12 +19,10 @@ class RheaConsumerPlugin extends ConsumerPlugin { const name = getResourceNameFromMessage(msgObj) const childOf = extractTextMap(msgObj, this.tracer) - this.startSpan(this.operationName(), { + this.startSpan({ childOf, - service: this.config.service || this.serviceName(), resource: name, type: 'worker', - kind: 'consumer', meta: { 'component': 'rhea', 'amqp.link.source.address': name, diff --git a/packages/datadog-plugin-rhea/src/producer.js b/packages/datadog-plugin-rhea/src/producer.js index bdaf88ded15..332aff1276d 100644 --- a/packages/datadog-plugin-rhea/src/producer.js +++ b/packages/datadog-plugin-rhea/src/producer.js @@ -14,11 +14,8 @@ class RheaProducerPlugin extends ProducerPlugin { start ({ targetAddress, host, port }) { const name = targetAddress || 'amq.topic' - - this.startSpan(this.operationName(), { - service: this.config.service || this.serviceName(), + this.startSpan({ resource: name, - kind: 'producer', meta: { 'component': 'rhea', 'amqp.link.target.address': name, diff --git a/packages/datadog-plugin-tedious/test/naming.js b/packages/datadog-plugin-tedious/test/naming.js new file mode 100644 index 00000000000..ab3d95a4e53 --- /dev/null +++ b/packages/datadog-plugin-tedious/test/naming.js @@ -0,0 +1,14 @@ +const { namingResolver } = require('../../dd-trace/test/plugins/helpers') + +module.exports = namingResolver({ + outbound: { + v0: { + opName: 'tedious.request', + serviceName: 'test-mssql' + }, + v1: { + opName: 'sqlserver.query', + serviceName: 'test' + } + } +}) diff --git a/packages/dd-trace/src/plugins/client.js b/packages/dd-trace/src/plugins/client.js index 976377e1e62..e0fe539ca4a 100644 --- a/packages/dd-trace/src/plugins/client.js +++ b/packages/dd-trace/src/plugins/client.js @@ -4,6 +4,7 @@ const OutboundPlugin = require('./outbound') class ClientPlugin extends OutboundPlugin { static get operation () { return 'request' } + static get kind () { return 'client' } } module.exports = ClientPlugin diff --git a/packages/dd-trace/src/plugins/consumer.js b/packages/dd-trace/src/plugins/consumer.js index b1b5dedfeb1..88128bff1f6 100644 --- a/packages/dd-trace/src/plugins/consumer.js +++ b/packages/dd-trace/src/plugins/consumer.js @@ -4,7 +4,21 @@ const InboundPlugin = require('./inbound') class ConsumerPlugin extends InboundPlugin { static get operation () { return 'receive' } + static get kind () { return 'consumer' } static get type () { return 'messaging' } + + startSpan (options) { + const spanDefaults = { + service: this.config.service || this.serviceName(), + kind: this.constructor.kind + } + Object.keys(spanDefaults).forEach( + key => { + if (!options[key]) options[key] = spanDefaults[key] + } + ) + return super.startSpan(this.operationName(), options) + } } module.exports = ConsumerPlugin diff --git a/packages/dd-trace/src/plugins/inbound.js b/packages/dd-trace/src/plugins/inbound.js index 8fac2974a0d..49a533d92d0 100644 --- a/packages/dd-trace/src/plugins/inbound.js +++ b/packages/dd-trace/src/plugins/inbound.js @@ -2,8 +2,6 @@ const TracingPlugin = require('./tracing') -class InboundPlugin extends TracingPlugin { - static get ioDirection () { return 'inbound' } -} +class InboundPlugin extends TracingPlugin {} module.exports = InboundPlugin diff --git a/packages/dd-trace/src/plugins/outbound.js b/packages/dd-trace/src/plugins/outbound.js index d8c4f179b98..ee9202b4d51 100644 --- a/packages/dd-trace/src/plugins/outbound.js +++ b/packages/dd-trace/src/plugins/outbound.js @@ -5,8 +5,6 @@ const TracingPlugin = require('./tracing') // TODO: Exit span on finish when AsyncResource instances are removed. class OutboundPlugin extends TracingPlugin { - static get ioDirection () { return 'outbound' } - constructor (...args) { super(...args) diff --git a/packages/dd-trace/src/plugins/producer.js b/packages/dd-trace/src/plugins/producer.js index d2c8b5be007..99f914edabe 100644 --- a/packages/dd-trace/src/plugins/producer.js +++ b/packages/dd-trace/src/plugins/producer.js @@ -4,7 +4,21 @@ const OutboundPlugin = require('./outbound') class ProducerPlugin extends OutboundPlugin { static get operation () { return 'publish' } + static get kind () { return 'producer' } static get type () { return 'messaging' } + + startSpan (options) { + const spanDefaults = { + service: this.config.service || this.serviceName(), + kind: this.constructor.kind + } + Object.keys(spanDefaults).forEach( + key => { + if (!options[key]) options[key] = spanDefaults[key] + } + ) + return super.startSpan(this.operationName(), options) + } } module.exports = ProducerPlugin diff --git a/packages/dd-trace/src/plugins/tracing.js b/packages/dd-trace/src/plugins/tracing.js index 39825e5a868..0f2ecc28547 100644 --- a/packages/dd-trace/src/plugins/tracing.js +++ b/packages/dd-trace/src/plugins/tracing.js @@ -33,13 +33,13 @@ class TracingPlugin extends Plugin { } serviceName (serviceArgs) { - const { type, ioDirection, id } = this.constructor - return Nomenclature.serviceName(type, ioDirection, id, serviceArgs) + const { type, id, kind } = this.constructor + return Nomenclature.serviceName(type, kind, id, serviceArgs) } operationName (opNameArgs) { - const { type, ioDirection, id } = this.constructor - return Nomenclature.opName(type, ioDirection, id, opNameArgs) + const { type, id, kind } = this.constructor + return Nomenclature.opName(type, kind, id, opNameArgs) } configure (config) { diff --git a/packages/dd-trace/src/service-naming/index.js b/packages/dd-trace/src/service-naming/index.js index 90704b98881..eab4d12c9c4 100644 --- a/packages/dd-trace/src/service-naming/index.js +++ b/packages/dd-trace/src/service-naming/index.js @@ -1,5 +1,13 @@ const { schemaDefinitions } = require('./schemas') +const kindMap = { + messaging: { + client: 'controlPlane', + consumer: 'inbound', + producer: 'outbound' + } +} + class SchemaManager { constructor () { this.schemas = schemaDefinitions @@ -14,12 +22,12 @@ class SchemaManager { return this.config.spanAttributeSchema } - opName (type, ioDirection, plugin, opNameArgs) { - return this.schema.getOpName(type, ioDirection, plugin, opNameArgs) + opName (type, kind, plugin, opNameArgs) { + return this.schema.getOpName(type, kindMap[type][kind], plugin, opNameArgs) } - serviceName (type, ioDirection, plugin, serviceNameArgs) { - return this.schema.getServiceName(type, ioDirection, plugin, serviceNameArgs) + serviceName (type, kind, plugin, serviceNameArgs) { + return this.schema.getServiceName(type, kindMap[type][kind], plugin, serviceNameArgs) } configure (config = {}) { diff --git a/packages/dd-trace/src/service-naming/schemas/definition.js b/packages/dd-trace/src/service-naming/schemas/definition.js index cd1553b693e..0022cc0ab53 100644 --- a/packages/dd-trace/src/service-naming/schemas/definition.js +++ b/packages/dd-trace/src/service-naming/schemas/definition.js @@ -3,20 +3,20 @@ class SchemaDefinition { this.schema = schema } - getSchemaItem (type, ioDirection, plugin) { + getSchemaItem (type, subType, plugin) { const schema = this.schema - if (schema && schema[type] && schema[type][ioDirection] && schema[type][ioDirection][plugin]) { - return schema[type][ioDirection][plugin] + if (schema && schema[type] && schema[type][subType] && schema[type][subType][plugin]) { + return schema[type][subType][plugin] } } - getOpName (type, ioDirection, plugin, opNameArgs) { - const item = this.getSchemaItem(type, ioDirection, plugin) + getOpName (type, subType, plugin, opNameArgs) { + const item = this.getSchemaItem(type, subType, plugin) return item.opName(opNameArgs) } - getServiceName (type, ioDirection, plugin, serviceNameArgs) { - const item = this.getSchemaItem(type, ioDirection, plugin) + getServiceName (type, subType, plugin, serviceNameArgs) { + const item = this.getSchemaItem(type, subType, plugin) return item.serviceName(this.service, serviceNameArgs) } diff --git a/packages/dd-trace/src/service-naming/schemas/v0.js b/packages/dd-trace/src/service-naming/schemas/v0.js index 84b44a22abc..aa5eb6fc147 100644 --- a/packages/dd-trace/src/service-naming/schemas/v0.js +++ b/packages/dd-trace/src/service-naming/schemas/v0.js @@ -1,17 +1,63 @@ const SchemaDefinition = require('./definition') +function amqpServiceName (service) { + return `${service}-amqp` +} + const schema = { messaging: { outbound: { + amqplib: { + opName: () => 'amqp.command', + serviceName: amqpServiceName + }, + amqp10: { + opName: () => 'amqp.send', + serviceName: amqpServiceName + }, + 'google-cloud-pubsub': { + opName: () => 'pubsub.request', + serviceName: service => `${service}-pubsub` + }, + kafkajs: { + opName: () => 'kafka.produce', + serviceName: service => `${service}-kafka` + }, rhea: { opName: () => 'amqp.send', - serviceName: (service) => `${service}-amqp-producer` + serviceName: service => `${service}-amqp-producer` } }, inbound: { + amqplib: { + opName: () => 'amqp.command', + serviceName: amqpServiceName + }, + amqp10: { + opName: () => 'amqp.receive', + serviceName: amqpServiceName + }, + 'google-cloud-pubsub': { + opName: () => 'pubsub.receive', + serviceName: service => service + }, + kafkajs: { + opName: () => 'kafka.consume', + serviceName: service => `${service}-kafka` + }, rhea: { opName: () => 'amqp.receive', - serviceName: (service) => service + serviceName: service => service + } + }, + controlPlane: { + amqplib: { + opName: () => 'amqp.command', + serviceName: amqpServiceName + }, + 'google-cloud-pubsub': { + opName: () => 'pubsub.request', + serviceName: service => `${service}-pubsub` } } } diff --git a/packages/dd-trace/src/service-naming/schemas/v1.js b/packages/dd-trace/src/service-naming/schemas/v1.js index fa9f0f38bfa..709e16aeb0f 100644 --- a/packages/dd-trace/src/service-naming/schemas/v1.js +++ b/packages/dd-trace/src/service-naming/schemas/v1.js @@ -4,17 +4,51 @@ function identityService (service) { return service } +const amqpInbound = { + opName: () => 'amqp.process', + serviceName: identityService +} + +const amqpOutbound = { + opName: () => 'amqp.send', + serviceName: identityService +} + const schema = { messaging: { outbound: { - rhea: { - opName: () => 'amqp.send', + amqplib: amqpOutbound, + amqp10: amqpOutbound, + 'google-cloud-pubsub': { + opName: () => 'gcp.pubsub.send', serviceName: identityService - } + }, + kafkajs: { + opName: () => 'kafka.send', + serviceName: identityService + }, + rhea: amqpOutbound }, inbound: { - rhea: { - opName: () => 'amqp.process', + amqplib: amqpInbound, + amqp10: amqpInbound, + 'google-cloud-pubsub': { + opName: () => 'gcp.pubsub.process', + serviceName: identityService + }, + kafkajs: { + opName: () => 'kafka.process', + serviceName: identityService + }, + rhea: amqpInbound + }, + controlPlane: { + amqplib: { + opName: () => 'amqp.command', + serviceName: identityService + }, + 'google-cloud-pubsub': { + opName: () => 'gcp.pubsub.request', serviceName: identityService } }