Skip to content

Commit

Permalink
exports more NATS metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
derhuerst committed Nov 19, 2024
1 parent 0d9c783 commit ec16ce1
Showing 1 changed file with 66 additions and 1 deletion.
67 changes: 66 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,45 @@ const serveGtfsRtDataFromNats = async (cfg, opt = {}) => {
process.exit(1)
}

// NATS-related metrics
// Note: We mirror OpenDataVBB/gtfs-rt-feed's & vdv-453-nats-adapter's metrics here.
const natsNrOfMessagesReceivedTotal = new Counter({
name: 'nats_nr_of_msgs_received_total',
help: 'number of messages received from NATS',
registers: [metricsRegister],
labelNames: [
'stream', // name of the JetStream stream
'consumer', // name of the JetStream consumer
'topic_root', // first "segment" of the topic, e.g. `AUS` with `aus.istfahrt.foo.bar`
'redelivered', // 1/0
],
})
const natsLatestMessageReceivedTimestampSeconds = new Gauge({
name: 'nats_latest_msg_received_timestamp_seconds',
help: 'when the latest message has been received from NATS',
registers: [metricsRegister],
labelNames: [
'stream', // name of the JetStream stream
'consumer', // name of the JetStream consumer
'topic_root', // first "segment" of the topic, e.g. `AUS` with `aus.istfahrt.foo.bar`
'redelivered', // 1/0
],
})
// NATS gives separate sequence numbers to both a) messages in a stream and b) messages as (re-)received by a consumer.
// We currently use `msg.seq`, which is the stream sequence (not the consumer sequence) of the message.
const natsMsgSeq = new Gauge({
// todo [breaking]: rename to e.g. nats_latest_msg_received_seq for consistency
name: 'nats_msg_seq',
help: 'sequence number of the latest NATS message being processed',
registers: [metricsRegister],
})
// todo [breaking]: remove in favor of `nats_nr_of_msgs_received_total`
const receivedFromNatsTotal = new Counter({
name: 'received_from_nats_total',
help: 'no. of TripUpdates received from NATS',
registers: [metricsRegister],
})

const digestTime = new Summary({
name: 'digest_time_seconds',
help: 'time needed to add a TripUpdate into the GTFS-RT feed',
Expand Down Expand Up @@ -213,8 +247,39 @@ const serveGtfsRtDataFromNats = async (cfg, opt = {}) => {
// todo: warn-log publish failures?

const onNatsMsg = (msg) => {
const tReceived = Date.now()
// todo: trace-log msg
receivedFromNatsTotal.inc()

// update NATS metrics
{
const {
// todo: "subject" or "topic"? what is the canonical terminology?
subject: topic,
} = msg
const {
stream,
consumer,
seq, // stream sequence, not consumer sequence
} = msg.info
// We slice() to keep the cardinality low in case of a bug.
const topic_root = (topic.split('.')[0] || '').slice(0, 7)
const redelivered = msg.info.redelivered ? '1' : '0'
natsNrOfMessagesReceivedTotal.inc({
stream, // name of the JetStream stream
consumer, // name of the JetStream consumer
topic_root,
redelivered,
})
natsLatestMessageReceivedTimestampSeconds.set({
stream, // name of the JetStream stream
consumer, // name of the JetStream consumer
topic_root,
redelivered,
}, tReceived / 1000)
natsMsgSeq.set(seq)
receivedFromNatsTotal.inc()
}

const t0 = performance.now()

const tripUpdate = msg.json(msg.data)
Expand Down

0 comments on commit ec16ce1

Please sign in to comment.