Skip to content

Commit

Permalink
Added some metrics to keep count of event listeners.
Browse files Browse the repository at this point in the history
  • Loading branch information
flexable777 committed Jan 3, 2024
1 parent 2843f30 commit f819a07
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 94 deletions.
2 changes: 1 addition & 1 deletion deploy/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ envs:
- name: JAVA_TOOL_OPTIONS
value: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
- name: JAVA_OPTS
value: "-Xms2048m -Xmx10000m"
value: "-Xms512m -Xmx4096m"
6 changes: 3 additions & 3 deletions deploy/nais.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ spec:
timeout: 1
resources:
limits:
memory: 16000Mi
memory: 5000Mi
requests:
cpu: 4000m
memory: 10000Mi
cpu: 1000m
memory: 3000Mi
ingresses:
{{#each ingresses as |ingress|}}
- {{ingress}}
Expand Down
117 changes: 27 additions & 90 deletions src/main/kotlin/no/nav/klage/oppgave/api/controller/EventController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@ package no.nav.klage.oppgave.api.controller

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.micrometer.core.instrument.MeterRegistry
import io.swagger.v3.oas.annotations.tags.Tag
import jakarta.servlet.http.HttpServletRequest
import no.nav.klage.oppgave.config.SecurityConfiguration
import no.nav.klage.oppgave.config.setCurrentCountForEvents
import no.nav.klage.oppgave.domain.kafka.InternalBehandlingEvent
import no.nav.klage.oppgave.service.AivenKafkaClientCreator
import no.nav.klage.oppgave.util.getLogger
import no.nav.security.token.support.core.api.ProtectedWithClaims
import no.nav.security.token.support.core.api.Unprotected
import org.springframework.http.MediaType
import org.springframework.http.codec.ServerSentEvent
import org.springframework.web.bind.annotation.*
import reactor.core.publisher.Flux
import java.time.Duration
import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger


@RestController
Expand All @@ -24,8 +25,11 @@ import java.time.LocalDateTime
@RequestMapping("/behandlinger/{behandlingId}/events")
class EventController(
private val aivenKafkaClientCreator: AivenKafkaClientCreator,
private val meterRegistry: MeterRegistry,
) {

private val counter = AtomicInteger(0)

companion object {
@Suppress("JAVA_CLASS_ON_COMPANION")
private val logger = getLogger(javaClass.enclosingClass)
Expand All @@ -44,7 +48,12 @@ class EventController(
request.protocol
)

//https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-ann-async-disconnects
meterRegistry.setCurrentCountForEvents(
eventType = "behandling",
currentCount = counter.incrementAndGet(),
)

//https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-ann-async-disconnects
val heartbeatStream: Flux<ServerSentEvent<JsonNode>> = Flux.interval(Duration.ofSeconds(10))
.map {
logger.debug("creating heartbeat event for testId: {}, behandlingId: {}", testId, behandlingId)
Expand All @@ -55,7 +64,7 @@ class EventController(
"heartbeat closed for testId: {}, behandlingId: {}. SignalType: {}",
testId,
behandlingId,
signalType
signalType,
)
}
.doOnCancel {
Expand All @@ -69,23 +78,13 @@ class EventController(

val eventPublisher = kafkaReceiver.receive()
.mapNotNull { consumerRecord ->
logger.debug("timeToPublishMeldingToKafka received: timestamp: {}", LocalDateTime.now())
val internalBehandlingEvent = jsonToEvent(consumerRecord.value())
logger.debug("timeToPublishMeldingToKafka internalBehandlingEvent created: timestamp: {}", LocalDateTime.now())
if (internalBehandlingEvent.behandlingId == behandlingId) {

logger.debug("creating behandling event for testId: {}, behandlingId: {}", testId, behandlingId)

val jsonNode = jacksonObjectMapper().readTree(internalBehandlingEvent.data)
logger.debug("timeToPublishMeldingToKafka jsonNode created: timestamp: {}", LocalDateTime.now())
val sseObject = ServerSentEvent.builder<JsonNode>()
ServerSentEvent.builder<JsonNode>()
.id(consumerRecord.offset().toString())
.event(internalBehandlingEvent.type.name)
.data(jsonNode)
.data(jacksonObjectMapper().readTree(internalBehandlingEvent.data))
.build()

logger.debug("timeToPublishMeldingToKafka returning sse: timestamp: {}", LocalDateTime.now())
sseObject
} else null
}
.doOnCancel {
Expand All @@ -99,36 +98,36 @@ class EventController(
"events closed for testId: {}, behandlingId: {}. SignalType: {}",
testId,
behandlingId,
signalType
signalType,
)
meterRegistry.setCurrentCountForEvents(
eventType = "behandling",
currentCount = counter.decrementAndGet(),
)
}

val emitOnce = Flux.generate<ServerSentEvent<JsonNode>> {
val emitFirstHeartbeat = Flux.generate<ServerSentEvent<JsonNode>> {
it.next(toHeartBeatServerSentEvent())
it.complete()
}
.doFinally { signalType ->
logger.debug(
"emitOnce closed for testId: {}, behandlingId: {}. SignalType: {}",
"emitFirstHeartbeat closed for testId: {}, behandlingId: {}. SignalType: {}",
testId,
behandlingId,
signalType
)
}
.doOnCancel {
logger.debug("emitOnce cancel for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug("emitFirstHeartbeat cancel for testId: {}, behandlingId: {}", testId, behandlingId)
}
.doOnTerminate {
logger.debug("emitOnce terminate for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug("emitFirstHeartbeat terminate for testId: {}, behandlingId: {}", testId, behandlingId)
}

// subscribes to the KafkaReceiver -> starts consumption (without observers attached)
// eventPublisher.connect()

return eventPublisher
.mergeWith(emitOnce)
.mergeWith(emitFirstHeartbeat)
.mergeWith(heartbeatStream)

}

private fun toHeartBeatServerSentEvent(): ServerSentEvent<JsonNode> {
Expand All @@ -137,68 +136,6 @@ class EventController(
.build()
}

private fun jsonToEvent(json: String?): InternalBehandlingEvent {
val internalBehandlingEvent = jacksonObjectMapper().readValue(json, InternalBehandlingEvent::class.java)
logger.debug("New stream, received event from Kafka: {}", internalBehandlingEvent)
return internalBehandlingEvent
}

@Unprotected
@GetMapping(value = ["/test"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun eventsTest(
@RequestParam("test-id", required = true) testId: String,
request: HttpServletRequest,
): Flux<ServerSentEvent<String>> {
logger.debug("test events called with HTTP protocol: " + request.protocol)
val heartbeatStream: Flux<ServerSentEvent<String>> = Flux.interval(Duration.ofSeconds(5))
.map {
logger.debug("heartbeat map: $testId")
toHeartBeatServerSentEventAsString()
}
.doFinally { signalType ->
logger.debug("heartbeat closed for testId: $testId, signalType: $signalType")
}
.doOnCancel {
logger.debug("doOnCancel: $testId")
}
.doOnTerminate {
logger.debug("doOnTerminate: $testId")
}

val fakeMessageStream: Flux<ServerSentEvent<String>> = Flux.interval(Duration.ofSeconds(10))
.map {
logger.debug("fakeMessageStream map: $testId")
ServerSentEvent.builder<String>()
.id("1")
.event("MESSAGE_ADDED")
.data("Hello world!")
.build()
}
.doFinally { signalType ->
logger.debug("fakeMessageStream closed for testId: $testId, signalType: $signalType")
}
.doOnCancel {
logger.debug("fakeMessageStream doOnCancel: $testId")
}
.doOnTerminate {
logger.debug("fakeMessageStream doOnTerminate: $testId")
}

val emitOnce = Flux.generate<ServerSentEvent<String>> {
it.next(toHeartBeatServerSentEventAsString())
it.complete()
}.doFinally { signalType ->
logger.debug("emitOnce closed")
}

return heartbeatStream
.mergeWith(emitOnce)
.mergeWith(fakeMessageStream)
}

private fun toHeartBeatServerSentEventAsString(): ServerSentEvent<String> {
return ServerSentEvent.builder<String>()
.event("HEARTBEAT")
.build()
}
private fun jsonToEvent(json: String?): InternalBehandlingEvent =
jacksonObjectMapper().readValue(json, InternalBehandlingEvent::class.java)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package no.nav.klage.oppgave.config

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import no.nav.klage.oppgave.util.getLogger
import org.springframework.context.annotation.Configuration

Expand All @@ -12,6 +13,7 @@ class MetricsConfiguration {
@Suppress("JAVA_CLASS_ON_COMPANION")
private val logger = getLogger(javaClass.enclosingClass)
const val MOTTATT_KLAGEANKE = "funksjonell.mottattklageanke"
const val CURRENT_EVENT_LISTENERS = "technical.current_event_listeners"
}
}

Expand All @@ -25,4 +27,12 @@ fun MeterRegistry.incrementMottattKlageAnke(kildesystem: String, ytelse: String,
"type",
type
).increment()
}

fun MeterRegistry.setCurrentCountForEvents(eventType: String, currentCount: Int) {
gauge(
/* name = */ MetricsConfiguration.CURRENT_EVENT_LISTENERS,
/* tags = */ listOf(Tag.of("event-type", eventType)),
/* number = */ currentCount
)
}

0 comments on commit f819a07

Please sign in to comment.