Skip to content

Commit

Permalink
More info about events in logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
flexable777 committed Jan 17, 2024
1 parent f328555 commit 6d37483
Showing 1 changed file with 62 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.springframework.http.codec.ServerSentEvent
import org.springframework.web.bind.annotation.*
import reactor.core.publisher.Flux
import java.time.Duration
import java.time.LocalDateTime
import java.util.*
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -61,29 +62,34 @@ class EventController(
@RequestParam("test-id", required = false) testId: String? = null,
request: HttpServletRequest,
): Flux<ServerSentEvent<JsonNode>?> {
val listenerStarted = LocalDateTime.now()
logger.debug(
"new events called with testId: {}, for behandlingId: {} and protocol: {}",
"events called with testId: {}, for behandlingId: {} and protocol: {}, with start time: {}",
testId,
behandlingId,
request.protocol
request.protocol,
listenerStarted,
)

val behandling = behandlingService.getBehandlingAndCheckLeseTilgangForPerson(behandlingId = behandlingId)

//https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-ann-async-disconnects
val heartbeatStream: Flux<ServerSentEvent<JsonNode>> = getHeartbeatStream(
behandlingId = behandlingId,
testId = testId
testId = testId,
listenerStartTime = listenerStarted,
)

val behandlingEventPublisher = getBehandlingEventPublisher(
behandlingId = behandlingId,
testId = testId,
listenerStartTime = listenerStarted,
)

val identityEventPublisher = getIdentityEventPublisher(
behandling = behandling,
testId = testId,
listenerStartTime = listenerStarted,
)

val emitFirstHeartbeat = getFirstHeartbeat(
Expand All @@ -99,7 +105,8 @@ class EventController(

private fun getBehandlingEventPublisher(
behandlingId: UUID,
testId: String?
testId: String?,
listenerStartTime: LocalDateTime,
): Flux<ServerSentEvent<JsonNode>?> {
val flux = aivenKafkaClientCreator.getNewKafkaInternalBehandlingEventReceiver().receive()
.mapNotNull { consumerRecord ->
Expand All @@ -113,17 +120,28 @@ class EventController(
} else null
}
.doOnCancel {
logger.debug("events cancel for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug(
"behandling events cancel for testId: {}, behandlingId: {}, with start time: {}",
testId,
behandlingId,
listenerStartTime,
)
}
.doOnTerminate {
logger.debug("events terminate for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug(
"behandling events terminate for testId: {}, behandlingId: {}, with start time: {}",
testId,
behandlingId,
listenerStartTime,
)
}
.doFinally { signalType ->
logger.debug(
"events closed for testId: {}, behandlingId: {}. SignalType: {}",
"behandling events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}",
testId,
behandlingId,
signalType,
listenerStartTime,
)
gaugeBehandling.decrementAndGet()
}
Expand All @@ -135,7 +153,8 @@ class EventController(

private fun getIdentityEventPublisher(
behandling: Behandling,
testId: String?
testId: String?,
listenerStartTime: LocalDateTime,
): Flux<ServerSentEvent<JsonNode>?> {
val flux = aivenKafkaClientCreator.getNewKafkaInternalIdentityEventReceiver().receive()
.mapNotNull { consumerRecord ->
Expand All @@ -149,17 +168,28 @@ class EventController(
} else null
}
.doOnCancel {
logger.debug("events cancel for testId: {}, behandlingId: {}", testId, behandling.id)
logger.debug(
"identity events cancel for testId: {}, behandlingId: {}, with start time: {}",
testId,
behandling.id,
listenerStartTime,
)
}
.doOnTerminate {
logger.debug("events terminate for testId: {}, behandlingId: {}", testId, behandling.id)
logger.debug(
"identity events terminate for testId: {}, behandlingId: {}, with start time: {}",
testId,
behandling.id,
listenerStartTime,
)
}
.doFinally { signalType ->
logger.debug(
"events closed for testId: {}, behandlingId: {}. SignalType: {}",
"identity events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}",
testId,
behandling.id,
signalType,
listenerStartTime,
)
gaugeIdentity.decrementAndGet()
}
Expand Down Expand Up @@ -197,26 +227,43 @@ class EventController(
private fun getHeartbeatStream(
behandlingId: UUID,
testId: String?,
listenerStartTime: LocalDateTime,
): Flux<ServerSentEvent<JsonNode>> {
val heartbeatStream: Flux<ServerSentEvent<JsonNode>> = Flux.interval(Duration.ofSeconds(10))
.map {
logger.debug("creating heartbeat event for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug(
"creating heartbeat event for testId: {}, behandlingId: {}, with start time: {}",
testId,
behandlingId,
listenerStartTime,
)
toHeartBeatServerSentEvent()
}
.doFinally { signalType ->
logger.debug(
"heartbeat closed for testId: {}, behandlingId: {}. SignalType: {}",
"heartbeat events closed for testId: {}, behandlingId: {}. SignalType: {}, with start time: {}",
testId,
behandlingId,
signalType,
listenerStartTime,
)
gaugeBehandlingHeartbeat.decrementAndGet()
}
.doOnCancel {
logger.debug("heartbeat cancel for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug(
"heartbeat cancel for testId: {}, behandlingId: {}, with start time: {}",
testId,
behandlingId,
listenerStartTime,
)
}
.doOnTerminate {
logger.debug("heartbeat terminate for testId: {}, behandlingId: {}", testId, behandlingId)
logger.debug(
"heartbeat terminate for testId: {}, behandlingId: {}, with start time: {}",
testId,
behandlingId,
listenerStartTime,
)
}

gaugeBehandlingHeartbeat.incrementAndGet()
Expand Down

0 comments on commit 6d37483

Please sign in to comment.