Skip to content

Commit

Permalink
More info about events in logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
flexable777 committed Jan 18, 2024
1 parent e89179a commit 886fc4c
Showing 1 changed file with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,22 @@ class EventController(
request: HttpServletRequest,
): Flux<ServerSentEvent<JsonNode>?> {
val listenerStarted = LocalDateTime.now()

var traceId = "unknown"
try {
//[00-b37d69e0b7574e7c8ea89df62c9bab4c-ce0c8200a58042dd-00]
traceId = request.getHeader("traceparent").first().toString().split("-")[1]
} catch (e: Exception) {
logger.warn("could not extract traceId")
}

logger.debug(
"events called with testId: {}, for behandlingId: {} and protocol: {}, with start time: {}",
"events called with testId: {}, for behandlingId: {} and protocol: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
request.protocol,
listenerStarted,
traceId,
)

val behandling = behandlingService.getBehandlingAndCheckLeseTilgangForPerson(behandlingId = behandlingId)
Expand All @@ -78,23 +88,27 @@ class EventController(
behandlingId = behandlingId,
testId = testId,
listenerStartTime = listenerStarted,
traceId = traceId,
)

val behandlingEventPublisher = getBehandlingEventPublisher(
behandlingId = behandlingId,
testId = testId,
listenerStartTime = listenerStarted,
traceId = traceId,
)

val identityEventPublisher = getIdentityEventPublisher(
behandling = behandling,
testId = testId,
listenerStartTime = listenerStarted,
traceId = traceId,
)

val emitFirstHeartbeat = getFirstHeartbeat(
behandlingId = behandlingId,
testId = testId,
traceId = traceId,
)

return behandlingEventPublisher
Expand All @@ -107,6 +121,7 @@ class EventController(
behandlingId: UUID,
testId: String?,
listenerStartTime: LocalDateTime,
traceId: String,
): Flux<ServerSentEvent<JsonNode>?> {
val flux = aivenKafkaClientCreator.getNewKafkaInternalBehandlingEventReceiver().receive()
.mapNotNull { consumerRecord ->
Expand All @@ -121,27 +136,30 @@ class EventController(
}
.doOnCancel {
logger.debug(
"behandling events cancel for testId: {}, behandlingId: {}, with start time: {}",
"behandling events cancel for testId: {}, behandlingId: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
listenerStartTime,
traceId,
)
}
.doOnTerminate {
logger.debug(
"behandling events terminate for testId: {}, behandlingId: {}, with start time: {}",
"behandling events terminate for testId: {}, behandlingId: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
listenerStartTime,
traceId,
)
}
.doFinally { signalType ->
logger.debug(
"behandling events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}",
"behandling events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
signalType,
listenerStartTime,
traceId,
)
gaugeBehandling.decrementAndGet()
}
Expand All @@ -155,6 +173,7 @@ class EventController(
behandling: Behandling,
testId: String?,
listenerStartTime: LocalDateTime,
traceId: String,
): Flux<ServerSentEvent<JsonNode>?> {
val flux = aivenKafkaClientCreator.getNewKafkaInternalIdentityEventReceiver().receive()
.mapNotNull { consumerRecord ->
Expand All @@ -169,27 +188,30 @@ class EventController(
}
.doOnCancel {
logger.debug(
"identity events cancel for testId: {}, behandlingId: {}, with start time: {}",
"identity events cancel for testId: {}, behandlingId: {}, with start time: {}, traceId: {}",
testId,
behandling.id,
listenerStartTime,
traceId,
)
}
.doOnTerminate {
logger.debug(
"identity events terminate for testId: {}, behandlingId: {}, with start time: {}",
"identity events terminate for testId: {}, behandlingId: {}, with start time: {}, traceId: {}",
testId,
behandling.id,
listenerStartTime,
traceId,
)
}
.doFinally { signalType ->
logger.debug(
"identity events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}",
"identity events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}, traceId: {}",
testId,
behandling.id,
signalType,
listenerStartTime,
traceId,
)
gaugeIdentity.decrementAndGet()
}
Expand All @@ -202,24 +224,36 @@ class EventController(
private fun getFirstHeartbeat(
behandlingId: UUID,
testId: String?,
traceId: String,
): Flux<ServerSentEvent<JsonNode>> {
val emitFirstHeartbeat = Flux.generate<ServerSentEvent<JsonNode>> {
it.next(toHeartBeatServerSentEvent())
it.complete()
}
.doFinally { signalType ->
logger.debug(
"emitFirstHeartbeat closed for testId: {}, behandlingId: {}. SignalType: {}",
"emitFirstHeartbeat closed for testId: {}, behandlingId: {}. SignalType: {}, traceId: {}",
testId,
behandlingId,
signalType
signalType,
traceId,
)
}
.doOnCancel {
logger.debug("emitFirstHeartbeat cancel for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug(
"emitFirstHeartbeat cancel for testId: {}, behandlingId: {}, traceId: {}",
testId,
behandlingId,
traceId,
)
}
.doOnTerminate {
logger.debug("emitFirstHeartbeat terminate for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug(
"emitFirstHeartbeat terminate for testId: {}, behandlingId: {}, traceId: {}",
testId,
behandlingId,
traceId,
)
}
return emitFirstHeartbeat
}
Expand All @@ -228,41 +262,46 @@ class EventController(
behandlingId: UUID,
testId: String?,
listenerStartTime: LocalDateTime,
traceId: String,
): Flux<ServerSentEvent<JsonNode>> {
val heartbeatStream: Flux<ServerSentEvent<JsonNode>> = Flux.interval(Duration.ofSeconds(10))
.map {
logger.debug(
"creating heartbeat event for testId: {}, behandlingId: {}, with start time: {}",
"creating heartbeat event for testId: {}, behandlingId: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
listenerStartTime,
traceId,
)
toHeartBeatServerSentEvent()
}
.doFinally { signalType ->
logger.debug(
"heartbeat events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}",
"heartbeat events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
signalType,
listenerStartTime,
traceId,
)
gaugeBehandlingHeartbeat.decrementAndGet()
}
.doOnCancel {
logger.debug(
"heartbeat cancel for testId: {}, behandlingId: {}, with start time: {}",
"heartbeat cancel for testId: {}, behandlingId: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
listenerStartTime,
traceId,
)
}
.doOnTerminate {
logger.debug(
"heartbeat terminate for testId: {}, behandlingId: {}, with start time: {}",
"heartbeat terminate for testId: {}, behandlingId: {}, with start time: {}, traceId: {}",
testId,
behandlingId,
listenerStartTime,
traceId,
)
}

Expand Down

0 comments on commit 886fc4c

Please sign in to comment.