From 9af49269292bff9cff70d5642c18aca66fb18e98 Mon Sep 17 00:00:00 2001 From: Dinh Anh Tuan Tran Date: Fri, 5 Jan 2024 08:29:14 +0100 Subject: [PATCH] remove usage of prometheus client, replace with micrometer --- .../no/nav/helse/rapids_rivers/KafkaRapid.kt | 5 +-- .../no/nav/helse/rapids_rivers/PingPong.kt | 2 ++ .../{KafkaRapidMetrics.kt => RapidMetrics.kt} | 2 +- .../no/nav/helse/rapids_rivers/River.kt | 36 ++++++++++++++----- .../nav/helse/rapids_rivers/RiverMetrics.kt | 36 +++++++++++++++++++ .../nav/helse/rapids_rivers/PingPongTest.kt | 2 +- .../no/nav/helse/rapids_rivers/KtorBuilder.kt | 13 ++----- .../helse/rapids_rivers/RapidApplication.kt | 5 +-- .../RapidApplicationComponentTest.kt | 4 +-- .../micronaut/RapidsRiversFactory.kt | 11 +++--- .../hm/rapids_rivers/micronaut/RiverHead.kt | 6 ++-- 11 files changed, 84 insertions(+), 38 deletions(-) rename hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/{KafkaRapidMetrics.kt => RapidMetrics.kt} (90%) create mode 100644 hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/RiverMetrics.kt diff --git a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/KafkaRapid.kt b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/KafkaRapid.kt index 2d11fa6..d1cf4f3 100644 --- a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/KafkaRapid.kt +++ b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/KafkaRapid.kt @@ -41,7 +41,8 @@ class KafkaRapid( // metric definitions private val consumerMetric = KafkaClientMetrics(consumer) private val producerMetric = KafkaClientMetrics(producer) - private val rapidMetric = KafkaRapidMetrics(this) + private val rapidMetric = RapidMetrics(this) + private val riverMetrics = RiverMetrics() private val topics = listOf(rapidTopic) + extraTopics @@ -251,7 +252,7 @@ class KafkaRapid( } } - fun getMetrics() = listOf(consumerMetric, producerMetric, rapidMetric) + fun getMetrics() = listOf(consumerMetric, producerMetric, rapidMetric, riverMetrics) fun getConsumerMetric() = consumerMetric fun getProducerMetric() = producerMetric diff --git a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/PingPong.kt b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/PingPong.kt index 351f294..20dac13 100644 --- a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/PingPong.kt +++ b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/PingPong.kt @@ -1,6 +1,8 @@ package no.nav.helse.rapids_rivers import com.fasterxml.jackson.databind.JsonNode +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.slf4j.LoggerFactory import java.time.Duration import java.time.LocalDateTime diff --git a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/KafkaRapidMetrics.kt b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/RapidMetrics.kt similarity index 90% rename from hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/KafkaRapidMetrics.kt rename to hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/RapidMetrics.kt index e175fa8..76dae49 100644 --- a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/KafkaRapidMetrics.kt +++ b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/RapidMetrics.kt @@ -4,7 +4,7 @@ import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.binder.MeterBinder -class KafkaRapidMetrics(private val rapid: KafkaRapid): MeterBinder { +class RapidMetrics(private val rapid: KafkaRapid): MeterBinder { override fun bindTo(registry: MeterRegistry) { Gauge.builder("rapids_rivers_consumer_active", this) {consumerActive()}.register(registry) diff --git a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/River.kt b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/River.kt index 580847a..100cbd1 100644 --- a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/River.kt +++ b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/River.kt @@ -1,5 +1,13 @@ package no.nav.helse.rapids_rivers +import io.micrometer.core.instrument.Clock +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import io.prometheus.client.CollectorRegistry import no.nav.helse.rapids_rivers.River.PacketListener.Companion.Name import java.util.* @@ -10,7 +18,21 @@ fun interface RandomIdGenerator { fun generateId(): String } -class River(rapidsConnection: RapidsConnection, private val randomIdGenerator: RandomIdGenerator = RandomIdGenerator.Default) : RapidsConnection.MessageListener { +class DefaultMeterRegistry { + companion object { + val collectorRegistry = CollectorRegistry.defaultRegistry + val Default = PrometheusMeterRegistry( + PrometheusConfig.DEFAULT, + collectorRegistry, + Clock.SYSTEM + ) + } + + +} + +class River(rapidsConnection: RapidsConnection, private val riverMetrics: RiverMetrics = RiverMetrics(), + private val randomIdGenerator: RandomIdGenerator = RandomIdGenerator.Default) : RapidsConnection.MessageListener{ private val validations = mutableListOf() private val listeners = mutableListOf() @@ -57,27 +79,23 @@ class River(rapidsConnection: RapidsConnection, private val randomIdGenerator: R packet.interestedIn("@event_name") listeners.forEach { val eventName = packet["@event_name"].textValue() ?: "ukjent" - Metrics.onPacketHistorgram.labels( - context.rapidName(), - it.name(), - eventName - ).time { + riverMetrics.timer(context.rapidName(), it.name(), eventName) { it.onPacket(packet, context) } - Metrics.onMessageCounter.labels(context.rapidName(), it.name(), "ok").inc() + riverMetrics.messageCounter(context.rapidName(), it.name(), "success") } } private fun onSevere(error: MessageProblems.MessageException, context: MessageContext) { listeners.forEach { - Metrics.onMessageCounter.labels(context.rapidName(), it.name(), "severe").inc() + riverMetrics.messageCounter(context.rapidName(), it.name(), "severe") it.onSevere(error, context) } } private fun onError(problems: MessageProblems, context: MessageContext) { listeners.forEach { - Metrics.onMessageCounter.labels(context.rapidName(), it.name(), "error").inc() + riverMetrics.messageCounter(context.rapidName(), it.name(), "error") it.onError(problems, context) } } diff --git a/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/RiverMetrics.kt b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/RiverMetrics.kt new file mode 100644 index 0000000..908557f --- /dev/null +++ b/hm-rapids-and-rivers-v2-core/src/main/kotlin/no/nav/helse/rapids_rivers/RiverMetrics.kt @@ -0,0 +1,36 @@ +package no.nav.helse.rapids_rivers + +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.binder.MeterBinder +import org.slf4j.LoggerFactory + +class RiverMetrics: MeterBinder { + + var meterRegistry: MeterRegistry = DefaultMeterRegistry.Default + + companion object { + private val LOG = LoggerFactory.getLogger(RiverMetrics::class.java) + } + override fun bindTo(registry: MeterRegistry) { + LOG.debug("Binding to meter registry {}", registry) + this.meterRegistry = registry + } + + fun messageCounter(rapidName:String, riverName: String, status: String) { + LOG.debug("Incrementing message counter for rapid {} and river {} with status {}", rapidName, riverName, status) + Counter.builder("message_counter").tags("rapid",rapidName, "river", riverName, "status", "severe") + .register(meterRegistry) + .increment() + } + + fun timer(rapidName: String, riverName: String, eventName: String, runnable: Runnable) { + LOG.debug("Recording timer for rapid {} and river {} and event {}", rapidName, riverName, eventName) + Timer.builder("on_packet_seconds") + .tags("rapid", rapidName, "river", riverName, "event_name", eventName) + .register(meterRegistry) + .record(runnable) + } +} \ No newline at end of file diff --git a/hm-rapids-and-rivers-v2-core/src/test/kotlin/no/nav/helse/rapids_rivers/PingPongTest.kt b/hm-rapids-and-rivers-v2-core/src/test/kotlin/no/nav/helse/rapids_rivers/PingPongTest.kt index 6764bbe..969fb94 100644 --- a/hm-rapids-and-rivers-v2-core/src/test/kotlin/no/nav/helse/rapids_rivers/PingPongTest.kt +++ b/hm-rapids-and-rivers-v2-core/src/test/kotlin/no/nav/helse/rapids_rivers/PingPongTest.kt @@ -9,7 +9,7 @@ import java.util.UUID class PingPongTest { val rapid = TestRapid() - val river = PingPong(rapid, "pingerino", "pongaroonie") + val river = PingPong(rapidsConnection = rapid, appName = "pingerino", instanceId = "pongaroonie") @BeforeEach fun reset() { diff --git a/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/KtorBuilder.kt b/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/KtorBuilder.kt index e3b83db..9828445 100644 --- a/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/KtorBuilder.kt +++ b/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/KtorBuilder.kt @@ -34,7 +34,6 @@ import org.slf4j.Logger class KtorBuilder { private val builder = ApplicationEngineEnvironmentBuilder() - private var collectorRegistry = CollectorRegistry.defaultRegistry private val extraMeterBinders = mutableListOf() fun port(port: Int) = apply { @@ -54,11 +53,7 @@ class KtorBuilder { fun build(factory: ApplicationEngineFactory): ApplicationEngine = embeddedServer(factory, applicationEngineEnvironment { module { install(MicrometerMetrics) { - registry = PrometheusMeterRegistry( - PrometheusConfig.DEFAULT, - collectorRegistry, - Clock.SYSTEM - ) + registry = DefaultMeterRegistry.Default meterBinders = listOf( ClassLoaderMetrics(), JvmMemoryMetrics(), @@ -74,7 +69,7 @@ class KtorBuilder { val names = call.request.queryParameters.getAll("name[]")?.toSet() ?: emptySet() call.respondTextWriter(ContentType.parse(TextFormat.CONTENT_TYPE_004)) { - TextFormat.write004(this, collectorRegistry.filteredMetricFamilySamples(names)) + TextFormat.write004(this, DefaultMeterRegistry.collectorRegistry.filteredMetricFamilySamples(names)) } } } @@ -129,10 +124,6 @@ class KtorBuilder { } } - fun withCollectorRegistry(registry: CollectorRegistry) = apply { - this.collectorRegistry = registry - } - fun metrics(metrics: List) = apply { extraMeterBinders.addAll(metrics) } diff --git a/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt b/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt index aeadb97..d33b0c8 100644 --- a/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt +++ b/hm-rapids-and-rivers-v2-ktor/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt @@ -3,6 +3,7 @@ package no.nav.helse.rapids_rivers import io.ktor.server.application.Application import io.ktor.server.cio.CIO import io.ktor.server.engine.* +import io.micrometer.core.instrument.MeterRegistry import io.prometheus.client.CollectorRegistry import kotlinx.coroutines.delay import org.slf4j.LoggerFactory @@ -147,10 +148,6 @@ class RapidApplication internal constructor( .readiness(rapid::isReady) .metrics(rapid.getMetrics()) - fun withCollectorRegistry(registry: CollectorRegistry = CollectorRegistry.defaultRegistry) = apply { - ktor.withCollectorRegistry(registry) - } - fun withKtorModule(module: Application.() -> Unit) = apply { ktor.module(module) } diff --git a/hm-rapids-and-rivers-v2-ktor/src/test/kotlin/no/nav/helse/rapids_rivers/RapidApplicationComponentTest.kt b/hm-rapids-and-rivers-v2-ktor/src/test/kotlin/no/nav/helse/rapids_rivers/RapidApplicationComponentTest.kt index e86e059..7e1575c 100644 --- a/hm-rapids-and-rivers-v2-ktor/src/test/kotlin/no/nav/helse/rapids_rivers/RapidApplicationComponentTest.kt +++ b/hm-rapids-and-rivers-v2-ktor/src/test/kotlin/no/nav/helse/rapids_rivers/RapidApplicationComponentTest.kt @@ -171,7 +171,7 @@ internal class RapidApplicationComponentTest { @DelicateCoroutinesApi @Test fun `metric values`() { - withRapid(collectorRegistry = CollectorRegistry.defaultRegistry) { rapid -> + withRapid { rapid -> waitForEvent("application_ready") rapid.publish("""{"@event_name":"ping","@id":"${UUID.randomUUID()}","ping_time":"${LocalDateTime.now()}"}""") waitForEvent("ping") @@ -230,12 +230,10 @@ internal class RapidApplicationComponentTest { @DelicateCoroutinesApi private fun withRapid( builder: RapidApplication.Builder? = null, - collectorRegistry: CollectorRegistry = CollectorRegistry(), block: (RapidsConnection) -> Unit ) { val rapidsConnection = (builder ?: RapidApplication.Builder(RapidApplication.RapidApplicationConfig.fromEnv(createConfig()))) - .withCollectorRegistry(collectorRegistry) .build() val job = GlobalScope.launch { rapidsConnection.start() } try { diff --git a/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RapidsRiversFactory.kt b/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RapidsRiversFactory.kt index 7e856ef..76d9546 100644 --- a/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RapidsRiversFactory.kt +++ b/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RapidsRiversFactory.kt @@ -2,16 +2,14 @@ package no.nav.hm.rapids_rivers.micronaut import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics -import io.micronaut.context.annotation.Bean import io.micronaut.context.annotation.Factory -import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Requires -import io.micronaut.context.env.Environment import jakarta.inject.Named import jakarta.inject.Singleton import no.nav.helse.rapids_rivers.KafkaConfig import no.nav.helse.rapids_rivers.KafkaRapid -import no.nav.helse.rapids_rivers.KafkaRapidMetrics +import no.nav.helse.rapids_rivers.RapidMetrics +import no.nav.helse.rapids_rivers.RiverMetrics import org.slf4j.LoggerFactory @Factory @@ -36,7 +34,7 @@ class RapidsRiversFactory { } @Singleton - fun rapidMetrics(kafkaRapid: KafkaRapid): KafkaRapidMetrics = kafkaRapid.getRapidMetric() + fun rapidMetrics(kafkaRapid: KafkaRapid): RapidMetrics = kafkaRapid.getRapidMetric() @Singleton @Named("ConsumerMetric") @@ -46,5 +44,8 @@ class RapidsRiversFactory { @Named("ProducerMetric") fun producerMetric(kafkaRapid: KafkaRapid): KafkaClientMetrics = kafkaRapid.getProducerMetric() + @Singleton + fun riverMetrics(): RiverMetrics = RiverMetrics() + } diff --git a/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RiverHead.kt b/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RiverHead.kt index 3543920..7a62166 100644 --- a/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RiverHead.kt +++ b/hm-rapids-and-rivers-v2-micronaut/src/main/kotlin/no/nav/hm/rapids_rivers/micronaut/RiverHead.kt @@ -1,15 +1,17 @@ package no.nav.hm.rapids_rivers.micronaut +import io.micrometer.core.instrument.MeterRegistry import io.micronaut.context.annotation.Prototype import no.nav.helse.rapids_rivers.RapidsConnection import no.nav.helse.rapids_rivers.River +import no.nav.helse.rapids_rivers.RiverMetrics import org.slf4j.LoggerFactory @Prototype -class RiverHead(rapidsConnection: RapidsConnection) { +class RiverHead(rapidsConnection: RapidsConnection, riverMetrics: RiverMetrics) { - private val river = River(rapidsConnection) + private val river = River(rapidsConnection, riverMetrics) companion object { private val LOG = LoggerFactory.getLogger(River::class.java)