Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry metrics for OTel #3259

Merged
merged 2 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions integration-tests/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class FakeAgent extends EventEmitter {
async start () {
const app = express()
app.use(bodyParser.raw({ limit: Infinity, type: 'application/msgpack' }))
app.use(bodyParser.json({ limit: Infinity, type: 'application/json' }))
app.put('/v0.4/traces', (req, res) => {
if (req.body.length === 0) return res.status(200).send()
res.status(200).send({ rate_by_service: { 'service:,env:': 1 } })
Expand All @@ -43,6 +44,13 @@ class FakeAgent extends EventEmitter {
files: req.files
})
})
app.post('/telemetry/proxy/api/v2/apmtelemetry', (req, res) => {
res.status(200).send()
this.emit('telemetry', {
headers: req.headers,
payload: req.body
})
})

return new Promise((resolve, reject) => {
const timeoutObj = setTimeout(() => {
Expand Down Expand Up @@ -103,6 +111,48 @@ class FakeAgent extends EventEmitter {

return resultPromise
}

assertTelemetryReceived (fn, timeout, requestType, expectedMessageCount = 1) {
timeout = timeout || 5000
let resultResolve
let resultReject
let msgCount = 0
const errors = []

const timeoutObj = setTimeout(() => {
resultReject([...errors, new Error('timeout')])
}, timeout)

const resultPromise = new Promise((resolve, reject) => {
resultResolve = () => {
clearTimeout(timeoutObj)
resolve()
}
resultReject = (e) => {
clearTimeout(timeoutObj)
reject(e)
}
})

const messageHandler = msg => {
if (msg.payload.request_type !== requestType) return
msgCount += 1
try {
fn(msg)
if (msgCount === expectedMessageCount) {
resultResolve()
}
} catch (e) {
errors.push(e)
}
if (msgCount === expectedMessageCount) {
this.removeListener('telemetry', messageHandler)
}
}
this.on('telemetry', messageHandler)

return resultPromise
}
}

function spawnProc (filename, options = {}) {
Expand Down
56 changes: 54 additions & 2 deletions integration-tests/opentelemetry.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ const { join } = require('path')
const { assert } = require('chai')
const { satisfies } = require('semver')

function check (agent, proc, timeout, onMessage = () => { }) {
function check (agent, proc, timeout, onMessage = () => { }, isMetrics) {
const messageReceiver = isMetrics
? agent.assertTelemetryReceived(onMessage, timeout, 'generate-metrics')
: agent.assertMessageReceived(onMessage, timeout)

return Promise.all([
agent.assertMessageReceived(onMessage, timeout),
messageReceiver,
new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error('Process timed out'))
Expand Down Expand Up @@ -38,6 +42,11 @@ function eachEqual (spans, expected, fn) {
return spans.every((span, i) => fn(span) === expected[i])
}

/**
 * Tell whether a timestamp lies strictly within `range` of a reference time.
 *
 * @param {number} ts - timestamp to check
 * @param {number} [now=Date.now()] - reference time, in the same unit as `ts`
 * @param {number} [range=1000] - exclusive upper bound on the absolute difference
 * @returns {boolean} true when |now - ts| < range
 */
function nearNow (ts, now = Date.now(), range = 1000) {
  // Math.abs never yields a negative number, so an upper-bound check suffices;
  // a NaN distance compares false and is rejected.
  return Math.abs(now - ts) < range
}

describe('opentelemetry', () => {
let agent
let proc
Expand Down Expand Up @@ -84,6 +93,49 @@ describe('opentelemetry', () => {
})
})

// Forks the basic OpenTelemetry fixture and asserts on the 'generate-metrics'
// telemetry payload it sends to the fake agent.
it('should capture telemetry', () => {
proc = fork(join(cwd, 'opentelemetry/basic.js'), {
cwd,
env: {
DD_TRACE_AGENT_PORT: agent.port,
DD_TRACE_OTEL_ENABLED: 1,
// NOTE(review): presumably a 1-second heartbeat so metrics flush quickly — confirm the unit.
DD_TELEMETRY_HEARTBEAT_INTERVAL: 1,
// Read by opentelemetry/basic.js, which keeps a timer alive for this long
// so the process stays open while telemetry metrics are gathered.
TIMEOUT: 1500
}
})

// The trailing `true` is check()'s isMetrics flag: it waits on
// agent.assertTelemetryReceived(..., 'generate-metrics') instead of a trace message.
return check(agent, proc, timeout, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

const metrics = payload.payload
assert.strictEqual(metrics.namespace, 'tracers')

const spanCreated = metrics.series.find(({ metric }) => metric === 'span_created')
const spanFinished = metrics.series.find(({ metric }) => metric === 'span_finished')

// Validate common fields between start and finish
for (const series of [spanCreated, spanFinished]) {
assert.ok(series)

// Exactly one [timestamp, value] point is expected per series.
assert.strictEqual(series.points.length, 1)
assert.strictEqual(series.points[0].length, 2)

const [ts, value] = series.points[0]
// NOTE(review): `now` is passed as Date.now() / 1e3, so ts is presumably in
// epoch seconds; nearNow's default range of 1000 would then allow a drift of
// 1000 seconds — confirm this tolerance is intended.
assert.ok(nearNow(ts, Date.now() / 1e3))
assert.strictEqual(value, 1)

assert.strictEqual(series.type, 'count')
assert.strictEqual(series.common, true)
assert.deepStrictEqual(series.tags, [
'integration_name:otel',
'otel_enabled:true',
'lib_language:nodejs',
`version:${process.version}`
])
}
}, true)
})

it('should work within existing datadog-traced http request', async () => {
proc = fork(join(cwd, 'opentelemetry/server.js'), {
cwd,
Expand Down
5 changes: 5 additions & 0 deletions integration-tests/opentelemetry/basic.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'

const TIMEOUT = Number(process.env.TIMEOUT || 0)

const tracer = require('dd-trace').init()

const { TracerProvider } = tracer
Expand All @@ -16,5 +18,8 @@ const otelTracer = ot.trace.getTracer(
otelTracer.startActiveSpan('otel-sub', otelSpan => {
setImmediate(() => {
otelSpan.end()

// Allow the process to be held open to gather telemetry metrics
setTimeout(() => {}, TIMEOUT)
})
})
1 change: 1 addition & 0 deletions packages/dd-trace/src/opentelemetry/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Span {
context: spanContext._ddContext,
startTime,
hostname: _tracer._hostname,
integrationName: 'otel',
tags: {
'service.name': _tracer._service
}
Expand Down
32 changes: 32 additions & 0 deletions packages/dd-trace/src/opentracing/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const tagger = require('../tagger')
const metrics = require('../metrics')
const log = require('../log')
const { storage } = require('../../../datadog-core')
const telemetryMetrics = require('../telemetry/metrics')

const tracerMetrics = telemetryMetrics.manager.namespace('tracers')

const {
DD_TRACE_EXPERIMENTAL_STATE_TRACKING,
Expand All @@ -20,6 +23,30 @@ const {
const unfinishedRegistry = createRegistry('unfinished')
const finishedRegistry = createRegistry('finished')

/**
 * Interpret an environment flag value as a boolean.
 *
 * Only "true" and "1" (case-insensitive) enable the flag. Plain truthiness
 * would treat every non-empty string, including "false" and "0", as enabled.
 *
 * @param {string|undefined} value - raw environment variable value
 * @returns {boolean}
 */
function isOtelEnabled (value) {
  const normalized = String(value).toLowerCase()
  return normalized === 'true' || normalized === '1'
}

// Whether the OpenTelemetry bridge was requested; reported as the
// `otel_enabled` tag on the span counters.
const OTEL_ENABLED = isOtelEnabled(process.env.DD_TRACE_OTEL_ENABLED)

// Cache of telemetry counters, keyed by event name and then by integration name.
const integrationCounters = {
  span_created: {},
  span_finished: {}
}

/**
 * Return the telemetry counter for a span lifecycle event and integration,
 * creating and caching it on first use.
 *
 * @param {string} event - key of `integrationCounters` ('span_created' or
 *   'span_finished'); also used as the metric name
 * @param {string} integration - integration name; lower-cased for the
 *   `integration_name` tag
 * @returns {object} the counter returned by `tracerMetrics.count`
 */
function getIntegrationCounter (event, integration) {
  const counters = integrationCounters[event]

  // Own-property check only: the `in` operator also matches keys inherited
  // from Object.prototype (e.g. "constructor", "toString"), which would hand
  // back a non-counter value for integrations with such names.
  if (Object.prototype.hasOwnProperty.call(counters, integration)) {
    return counters[integration]
  }

  const counter = tracerMetrics.count(event, [
    `integration_name:${integration.toLowerCase()}`,
    `otel_enabled:${OTEL_ENABLED}`
  ])

  counters[integration] = counter

  return counter
}

class DatadogSpan {
constructor (tracer, processor, prioritySampler, fields, debug) {
const operationName = fields.operationName
Expand All @@ -38,6 +65,9 @@ class DatadogSpan {
// This name property is not updated when the span name changes.
// This is necessary for span count metrics.
this._name = operationName
this._integrationName = fields.integrationName || 'opentracing'

getIntegrationCounter('span_created', this._integrationName).inc()

this._spanContext = this._createContext(parent, fields)
this._spanContext._name = operationName
Expand Down Expand Up @@ -126,6 +156,8 @@ class DatadogSpan {
}
}

getIntegrationCounter('span_finished', this._integrationName).inc()

if (DD_TRACE_EXPERIMENTAL_SPAN_COUNTS && finishedRegistry) {
metrics.decrement('runtime.node.spans.unfinished')
metrics.decrement('runtime.node.spans.unfinished.by.name', `span_name:${this._name}`)
Expand Down
3 changes: 2 additions & 1 deletion packages/dd-trace/src/opentracing/tracer.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class DatadogTracer {
tags,
startTime: options.startTime,
hostname: this._hostname,
traceId128BitGenerationEnabled: this._traceId128BitGenerationEnabled
traceId128BitGenerationEnabled: this._traceId128BitGenerationEnabled,
integrationName: options.integrationName
}, this._debug)

span.addTags(this._tags)
Expand Down
3 changes: 2 additions & 1 deletion packages/dd-trace/src/plugins/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class TracingPlugin extends Plugin {
'span.type': type,
...meta,
...metrics
}
},
integrationName: type
})

analyticsSampler.sample(span, this.config.measured)
Expand Down
3 changes: 3 additions & 0 deletions packages/dd-trace/src/telemetry/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const os = require('os')
const dependencies = require('./dependencies')
const { sendData } = require('./send-data')

const { manager: metricsManager } = require('./metrics')

const telemetryStartChannel = dc.channel('datadog:telemetry:start')
const telemetryStopChannel = dc.channel('datadog:telemetry:stop')

Expand Down Expand Up @@ -121,6 +123,7 @@ function start (aConfig, thePluginManager) {
dependencies.start(config, application, host)
sendData(config, application, host, 'app-started', appStarted())
interval = setInterval(() => {
metricsManager.send(config, application, host)
sendData(config, application, host, 'app-heartbeat')
}, heartbeatInterval)
interval.unref()
Expand Down
Loading