Skip to content

Commit

Permalink
Telemetry metrics for OTel
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen Belanger committed Jun 20, 2023
1 parent 5aaa178 commit f6d3fcd
Show file tree
Hide file tree
Showing 11 changed files with 850 additions and 7 deletions.
50 changes: 50 additions & 0 deletions integration-tests/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class FakeAgent extends EventEmitter {
async start () {
const app = express()
app.use(bodyParser.raw({ limit: Infinity, type: 'application/msgpack' }))
app.use(bodyParser.json({ limit: Infinity, type: 'application/json' }))
app.put('/v0.4/traces', (req, res) => {
if (req.body.length === 0) return res.status(200).send()
res.status(200).send({ rate_by_service: { 'service:,env:': 1 } })
Expand All @@ -43,6 +44,13 @@ class FakeAgent extends EventEmitter {
files: req.files
})
})
app.post('/telemetry/proxy/api/v2/apmtelemetry', (req, res) => {
res.status(200).send()
this.emit('telemetry', {
headers: req.headers,
payload: req.body
})
})

return new Promise((resolve, reject) => {
const timeoutObj = setTimeout(() => {
Expand Down Expand Up @@ -103,6 +111,48 @@ class FakeAgent extends EventEmitter {

return resultPromise
}

assertTelemetryReceived (fn, timeout, requestType, expectedMessageCount = 1) {
timeout = timeout || 5000
let resultResolve
let resultReject
let msgCount = 0
const errors = []

const timeoutObj = setTimeout(() => {
resultReject([...errors, new Error('timeout')])
}, timeout)

const resultPromise = new Promise((resolve, reject) => {
resultResolve = () => {
clearTimeout(timeoutObj)
resolve()
}
resultReject = (e) => {
clearTimeout(timeoutObj)
reject(e)
}
})

const messageHandler = msg => {
if (msg.payload.request_type !== requestType) return
msgCount += 1
try {
fn(msg)
if (msgCount === expectedMessageCount) {
resultResolve()
}
} catch (e) {
errors.push(e)
}
if (msgCount === expectedMessageCount) {
this.removeListener('telemetry', messageHandler)
}
}
this.on('telemetry', messageHandler)

return resultPromise
}
}

function spawnProc (filename, options = {}) {
Expand Down
56 changes: 54 additions & 2 deletions integration-tests/opentelemetry.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ const { fork } = require('child_process')
const { join } = require('path')
const { assert } = require('chai')

function check (agent, proc, timeout, onMessage = () => { }) {
function check (agent, proc, timeout, onMessage = () => { }, isMetrics) {
const messageReceiver = isMetrics
? agent.assertTelemetryReceived(onMessage, timeout, 'generate-metrics')
: agent.assertMessageReceived(onMessage, timeout)

return Promise.all([
agent.assertMessageReceived(onMessage, timeout),
messageReceiver,
new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error('Process timed out'))
Expand Down Expand Up @@ -37,6 +41,11 @@ function eachEqual (spans, expected, fn) {
return spans.every((span, i) => fn(span) === expected[i])
}

/**
 * Reports whether a timestamp lies strictly within `range` of a reference time.
 *
 * @param {number} ts Timestamp to check.
 * @param {number} [now=Date.now()] Reference time, in the same unit as `ts`.
 * @param {number} [range=1000] Exclusive maximum distance between the two.
 * @returns {boolean} True when |now - ts| is less than `range`.
 */
function nearNow (ts, now = Date.now(), range = 1000) {
  // An absolute distance is never negative, so only the upper bound needs
  // checking; a NaN distance compares false.
  return Math.abs(now - ts) < range
}

describe('opentelemetry', () => {
let agent
let proc
Expand Down Expand Up @@ -75,6 +84,49 @@ describe('opentelemetry', () => {
})
})

// Forks the basic OpenTelemetry fixture and validates the 'generate-metrics'
// telemetry payload that the fake agent receives from it.
it('should capture telemetry', () => {
proc = fork(join(cwd, 'opentelemetry/basic.js'), {
cwd,
env: {
DD_TRACE_AGENT_PORT: agent.port,
DD_TRACE_OTEL_ENABLED: 1,
DD_TELEMETRY_HEARTBEAT_INTERVAL: 1,
// Read by the fixture, which schedules a timer of this length after ending its span
TIMEOUT: 1500
}
})

// The trailing `true` makes check() wait for telemetry rather than trace messages
return check(agent, proc, timeout, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

// The metrics body is nested one level down, under the telemetry envelope's `payload` key
const metrics = payload.payload
assert.strictEqual(metrics.namespace, 'tracers')
assert.strictEqual(metrics.lib_language, 'nodejs')
assert.strictEqual(metrics.lib_version, process.version)

const spanCreated = metrics.series.find(({ metric }) => metric === 'span_created')
const spanFinished = metrics.series.find(({ metric }) => metric === 'span_finished')

// Validate common fields between start and finish
for (const series of [spanCreated, spanFinished]) {
assert.ok(series)

// Each series is expected to carry exactly one [timestamp, value] point
assert.strictEqual(series.points.length, 1)
assert.strictEqual(series.points[0].length, 2)

const [ts, value] = series.points[0]
// NOTE(review): `now` is passed in seconds here while nearNow's default range
// stays at 1000, so the accepted window is 1000 seconds - confirm that is intended
assert.ok(nearNow(ts, Date.now() / 1e3))
assert.strictEqual(value, 1)

assert.strictEqual(series.type, 'count')
assert.strictEqual(series.common, true)
assert.deepStrictEqual(series.tags, [
'integration_name:otel',
'otel_enabled:true'
])
}
}, true)
})

it('should work within existing datadog-traced http request', async () => {
proc = fork(join(cwd, 'opentelemetry/server.js'), {
cwd,
Expand Down
5 changes: 5 additions & 0 deletions integration-tests/opentelemetry/basic.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'

const TIMEOUT = Number(process.env.TIMEOUT || 0)

const tracer = require('dd-trace').init()

const { TracerProvider } = tracer
Expand All @@ -16,5 +18,8 @@ const otelTracer = ot.trace.getTracer(
otelTracer.startActiveSpan('otel-sub', otelSpan => {
setImmediate(() => {
otelSpan.end()

// Allow the process to be held open to gather telemetry metrics
setTimeout(() => {}, TIMEOUT)
})
})
1 change: 1 addition & 0 deletions packages/dd-trace/src/opentelemetry/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Span {
context: spanContext._ddContext,
startTime,
hostname: _tracer._hostname,
integrationName: 'otel',
tags: {
'service.name': _tracer._service
}
Expand Down
16 changes: 16 additions & 0 deletions packages/dd-trace/src/opentracing/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const tagger = require('../tagger')
const metrics = require('../metrics')
const log = require('../log')
const { storage } = require('../../../datadog-core')
const telemetryMetrics = require('../telemetry/metrics')

const tracerMetrics = telemetryMetrics.manager.namespace('tracers')

const {
DD_TRACE_EXPERIMENTAL_STATE_TRACKING,
Expand All @@ -20,6 +23,8 @@ const {
const unfinishedRegistry = createRegistry('unfinished')
const finishedRegistry = createRegistry('finished')

const otelEnabled = !!process.env.DD_TRACE_OTEL_ENABLED

class DatadogSpan {
constructor (tracer, processor, prioritySampler, fields, debug) {
const operationName = fields.operationName
Expand All @@ -38,6 +43,12 @@ class DatadogSpan {
// This name property is not updated when the span name changes.
// This is necessary for span count metrics.
this._name = operationName
this._integrationName = fields.integrationName || 'opentracing'

tracerMetrics.count('span_created', {
integration_name: this._integrationName,
otel_enabled: otelEnabled
}).inc()

this._spanContext = this._createContext(parent, fields)
this._spanContext._name = operationName
Expand Down Expand Up @@ -126,6 +137,11 @@ class DatadogSpan {
}
}

tracerMetrics.count('span_finished', {
integration_name: this._integrationName,
otel_enabled: otelEnabled
}).inc()

if (DD_TRACE_EXPERIMENTAL_SPAN_COUNTS && finishedRegistry) {
metrics.decrement('runtime.node.spans.unfinished')
metrics.decrement('runtime.node.spans.unfinished.by.name', `span_name:${this._name}`)
Expand Down
3 changes: 2 additions & 1 deletion packages/dd-trace/src/opentracing/tracer.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class DatadogTracer {
tags,
startTime: options.startTime,
hostname: this._hostname,
traceId128BitGenerationEnabled: this._traceId128BitGenerationEnabled
traceId128BitGenerationEnabled: this._traceId128BitGenerationEnabled,
integrationName: options.integrationName
}, this._debug)

span.addTags(this._tags)
Expand Down
3 changes: 2 additions & 1 deletion packages/dd-trace/src/plugins/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class TracingPlugin extends Plugin {
'span.type': type,
...meta,
...metrics
}
},
integrationName: type
})

analyticsSampler.sample(span, this.config.measured)
Expand Down
3 changes: 3 additions & 0 deletions packages/dd-trace/src/telemetry/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const os = require('os')
const dependencies = require('./dependencies')
const { sendData } = require('./send-data')

const { manager: metricsManager } = require('./metrics')

const telemetryStartChannel = dc.channel('datadog:telemetry:start')
const telemetryStopChannel = dc.channel('datadog:telemetry:stop')

Expand Down Expand Up @@ -121,6 +123,7 @@ function start (aConfig, thePluginManager) {
dependencies.start(config, application, host)
sendData(config, application, host, 'app-started', appStarted())
interval = setInterval(() => {
metricsManager.send(config, application, host)
sendData(config, application, host, 'app-heartbeat')
}, heartbeatInterval)
interval.unref()
Expand Down
Loading

0 comments on commit f6d3fcd

Please sign in to comment.