From becb0446f0509c2dfa0c05b640718be054c28829 Mon Sep 17 00:00:00 2001 From: Mikael Bjerga Date: Tue, 19 Dec 2023 17:01:20 +0100 Subject: [PATCH] Bruk tilstandsfunksjoner i servicer --- .../aktiveorgnrservice/AktiveOrgnrService.kt | 183 ++++++------ .../inntektsmelding/brospinn/SpinnService.kt | 66 ++--- .../felles/test/mock/MockRedis.kt | 14 +- .../felles/rapidsrivers/DataKanal.kt | 8 +- .../rapidsrivers/DelegatingFailKanal.kt | 16 -- .../felles/rapidsrivers/FailKanal.kt | 17 +- .../felles/rapidsrivers/StatefullDataKanal.kt | 37 +-- .../rapidsrivers/StatefullEventListener.kt | 19 +- .../composite/CompositeEventListener.kt | 82 ++---- .../rapidsrivers/composite/Transaction.kt | 5 - .../felles/rapidsrivers/redis/RedisStore.kt | 4 +- .../felles/rapidsrivers/DataKanalTest.kt | 18 +- ...atingFailKanalTest.kt => FailKanalTest.kt} | 4 +- gradle.properties | 2 +- .../innsending/InnsendingService.kt | 224 +++++++-------- .../innsending/KvitteringService.kt | 76 ++--- .../GenericDataPackageListenerTest.kt | 8 +- .../inntektservice/InntektService.kt | 235 ++++++++-------- .../inntektservice/InntektServiceTest.kt | 65 ++++- .../notifikasjon/ManuellOpprettSakService.kt | 201 +++++++------- .../notifikasjon/OpprettOppgaveService.kt | 184 ++++++------ .../notifikasjon/OpprettSakService.kt | 233 ++++++++-------- .../notifikasjon/OpprettOppgaveServiceTest.kt | 2 - .../tilgangservice/TilgangService.kt | 185 ++++++------- .../inntektservice/TilgangServiceTest.kt | 53 +++- .../trengerservice/TrengerService.kt | 261 +++++++++--------- 26 files changed, 1060 insertions(+), 1142 deletions(-) delete mode 100644 felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DelegatingFailKanal.kt delete mode 100644 felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/Transaction.kt rename felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/{DelegatingFailKanalTest.kt => FailKanalTest.kt} (94%) diff --git a/aktiveorgnrservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/aktiveorgnrservice/AktiveOrgnrService.kt b/aktiveorgnrservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/aktiveorgnrservice/AktiveOrgnrService.kt index 16f3cb6caf..349faad2c7 100644 --- a/aktiveorgnrservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/aktiveorgnrservice/AktiveOrgnrService.kt +++ b/aktiveorgnrservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/aktiveorgnrservice/AktiveOrgnrService.kt @@ -18,7 +18,6 @@ import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey @@ -39,120 +38,98 @@ import java.util.UUID class AktiveOrgnrService( private val rapid: RapidsConnection, override val redisStore: RedisStore -) : CompositeEventListener(redisStore) { +) : CompositeEventListener() { + private val sikkerLogger = sikkerLogger() private val logger = logger() - override val event: EventName = EventName.AKTIVE_ORGNR_REQUESTED + + override val event = EventName.AKTIVE_ORGNR_REQUESTED + override val startKeys = listOf( + Key.FNR, + Key.ARBEIDSGIVER_FNR + ) + override val dataKeys = listOf( + Key.ARBEIDSFORHOLD, + Key.ORG_RETTIGHETER, + Key.ARBEIDSTAKER_INFORMASJON, + Key.VIRKSOMHETER + ) init { - withEventListener { - StatefullEventListener( - redisStore = redisStore, - event = event, - dataFelter = arrayOf(Key.FNR, Key.ARBEIDSGIVER_FNR), - mainListener = it, - rapidsConnection = rapid + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + } + + override fun new(message: JsonMessage) { + val json = message.toJsonMap() + val transaksjonId = Key.UUID.les(UuidSerializer, json) + + val innloggetFnr = json[Key.ARBEIDSGIVER_FNR]?.fromJson(String.serializer()) + val sykemeldtFnr = json[Key.FNR]?.fromJson(String.serializer()) + if (innloggetFnr != null && sykemeldtFnr != null) { + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.ARBEIDSGIVERE.toJson(), + Key.IDENTITETSNUMMER to innloggetFnr.toJson(), + Key.UUID to transaksjonId.toJson() ) - } - withDataKanal { - StatefullDataKanal( - dataFelter = arrayOf( - Key.ARBEIDSFORHOLD, - Key.ORG_RETTIGHETER, - Key.ARBEIDSTAKER_INFORMASJON, - Key.VIRKSOMHETER - ), - eventName = event, - mainListener = it, - rapidsConnection = rapid, - redisStore = redisStore + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.ARBEIDSFORHOLD.toJson(), + Key.IDENTITETSNUMMER to sykemeldtFnr.toJson(), + Key.UUID to transaksjonId.toJson() + ) + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.FULLT_NAVN.toJson(), + Key.IDENTITETSNUMMER to sykemeldtFnr.toJson(), + Key.UUID to transaksjonId.toJson() ) + } else { + MdcUtils.withLogFields( + Log.klasse(this), + Log.event(event), + Log.transaksjonId(transaksjonId) + ) { + "Mangler arbeidsgiverFnr eller arbeidstakerFnr." + .also { + sikkerLogger.error(it) + logger.error(it) + } + + onError(message, message.createFail("Ukjent feil oppstod", transaksjonId)) + } } } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { + override fun inProgress(message: JsonMessage) { val json = message.toJsonMap() val transaksjonId = Key.UUID.les(UuidSerializer, json) - when (transaction) { - Transaction.NEW -> { - val innloggetFnr = json[Key.ARBEIDSGIVER_FNR]?.fromJson(String.serializer()) - val sykemeldtFnr = json[Key.FNR]?.fromJson(String.serializer()) - if (innloggetFnr != null && sykemeldtFnr != null) { - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.ARBEIDSGIVERE.toJson(), - Key.IDENTITETSNUMMER to innloggetFnr.toJson(), - Key.UUID to transaksjonId.toJson() - ) - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.ARBEIDSFORHOLD.toJson(), - Key.IDENTITETSNUMMER to sykemeldtFnr.toJson(), - Key.UUID to transaksjonId.toJson() - ) - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.FULLT_NAVN.toJson(), - Key.IDENTITETSNUMMER to sykemeldtFnr.toJson(), - Key.UUID to transaksjonId.toJson() - ) - } else { - MdcUtils.withLogFields( - Log.klasse(this), - Log.event(event), - Log.transaksjonId(transaksjonId) - ) { - "Mangler arbeidsgiverFnr eller arbeidstakerFnr." - .also { - sikkerLogger.error(it) - logger.error(it) - } - - terminate(message.createFail("Ukjent feil oppstod", transaksjonId)) - } - } + if (isDataCollected(step1data(transaksjonId))) { + val arbeidsforholdListe = RedisKey.of(transaksjonId, Key.ARBEIDSFORHOLD).read()?.fromJson(Arbeidsforhold.serializer().list()) + val orgrettigheter = RedisKey.of(transaksjonId, Key.ORG_RETTIGHETER).read()?.fromJson(String.serializer().set()) + val result = trekkUtArbeidsforhold(arbeidsforholdListe, orgrettigheter) + result.onSuccess { arbeidsgivere -> + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.VIRKSOMHET.toJson(), + Key.UUID to transaksjonId.toJson(), + Key.ORGNRUNDERENHETER to arbeidsgivere.toJson(String.serializer()) + ) } - - Transaction.IN_PROGRESS -> { - if (isDataCollected(*step1data(transaksjonId))) { - val arbeidsforholdListe = RedisKey.of(transaksjonId, Key.ARBEIDSFORHOLD).read()?.fromJson(Arbeidsforhold.serializer().list()) - val orgrettigheter = RedisKey.of(transaksjonId, Key.ORG_RETTIGHETER).read()?.fromJson(String.serializer().set()) - val result = trekkUtArbeidsforhold(arbeidsforholdListe, orgrettigheter) - result.onSuccess { arbeidsgivere -> - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.VIRKSOMHET.toJson(), - Key.UUID to transaksjonId.toJson(), - Key.ORGNRUNDERENHETER to arbeidsgivere.toJson(String.serializer()) - ) - } - result.onFailure { - val feilmelding = it.message ?: "Ukjent feil oppstod" - MdcUtils.withLogFields( - Log.klasse(this), - Log.event(event), - Log.transaksjonId(transaksjonId) - ) { - sikkerLogger.error(feilmelding) - logger.error(feilmelding) - } - terminate(message.createFail(feilmelding, transaksjonId)) - } + result.onFailure { + val feilmelding = it.message ?: "Ukjent feil oppstod" + MdcUtils.withLogFields( + Log.klasse(this), + Log.event(event), + Log.transaksjonId(transaksjonId) + ) { + sikkerLogger.error(feilmelding) + logger.error(feilmelding) } + onError(message, message.createFail(feilmelding, transaksjonId)) } - - else -> { - logger.info("Transaksjon $transaction er ikke støttet.") - } - } - - MdcUtils.withLogFields( - Log.klasse(this), - Log.event(event), - Log.transaksjonId(transaksjonId) - ) { - sikkerLogger.info("Prosesserer transaksjon $transaction.") } } @@ -207,11 +184,11 @@ class AktiveOrgnrService( } } } - terminate(message.createFail("Ukjent feil oppstod", transaksjonId)) + onError(message, message.createFail("Ukjent feil oppstod", transaksjonId)) } } - override fun terminate(fail: Fail) { + override fun onError(message: JsonMessage, fail: Fail) { val transaksjonId = fail.transaksjonId val clientId = RedisKey.of(transaksjonId, event) @@ -262,7 +239,7 @@ class AktiveOrgnrService( } } - private fun step1data(uuid: UUID): Array = arrayOf( + private fun step1data(uuid: UUID): List = listOf( RedisKey.of(uuid, Key.ARBEIDSFORHOLD), RedisKey.of(uuid, Key.ORG_RETTIGHETER) ) diff --git a/bro-spinn/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/brospinn/SpinnService.kt b/bro-spinn/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/brospinn/SpinnService.kt index dd13ff5722..e7940b8550 100644 --- a/bro-spinn/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/brospinn/SpinnService.kt +++ b/bro-spinn/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/brospinn/SpinnService.kt @@ -9,11 +9,10 @@ import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.json.les import no.nav.helsearbeidsgiver.felles.json.lesOrNull import no.nav.helsearbeidsgiver.felles.json.toJson -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey @@ -34,30 +33,27 @@ private const val AVSENDER_NAV_NO = "NAV_NO" class SpinnService( private val rapid: RapidsConnection, override val redisStore: RedisStore -) : CompositeEventListener(redisStore) { +) : CompositeEventListener() { private val logger = logger() private val sikkerLogger = sikkerLogger() - override val event: EventName = EventName.EKSTERN_INNTEKTSMELDING_REQUESTED + override val event = EventName.EKSTERN_INNTEKTSMELDING_REQUESTED + override val startKeys = listOf( + Key.FORESPOERSEL_ID, + Key.SPINN_INNTEKTSMELDING_ID + ) + override val dataKeys = listOf( + Key.EKSTERN_INNTEKTSMELDING + ) init { - withFailKanal { DelegatingFailKanal(event, it, rapid) } - withDataKanal { - StatefullDataKanal( - dataFelter = arrayOf( - Key.EKSTERN_INNTEKTSMELDING - ), - eventName = event, - mainListener = it, - rapidsConnection = rapid, - redisStore = redisStore - ) - } - withEventListener { StatefullEventListener(redisStore, event, arrayOf(Key.FORESPOERSEL_ID, Key.SPINN_INNTEKTSMELDING_ID), it, rapid) } + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { + override fun new(message: JsonMessage) { val json = message.toJsonMap() val transaksjonId = Key.UUID.les(UuidSerializer, json) @@ -78,20 +74,24 @@ class SpinnService( Log.event(event), Log.forespoerselId(forespoerselId) ) { - sikkerLogger.info("Prosesserer transaksjon $transaction.") - if (transaction == Transaction.NEW) { - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.HENT_EKSTERN_INNTEKTSMELDING.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.SPINN_INNTEKTSMELDING_ID to spinnImId.toJson(), - Key.UUID to transaksjonId.toJson() - ) - .also { - logger.info("Publiserte melding om ${BehovType.HENT_EKSTERN_INNTEKTSMELDING.name} for transaksjonId $transaksjonId.") - sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") - } - } + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.HENT_EKSTERN_INNTEKTSMELDING.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.SPINN_INNTEKTSMELDING_ID to spinnImId.toJson(), + Key.UUID to transaksjonId.toJson() + ) + .also { + logger.info("Publiserte melding om ${BehovType.HENT_EKSTERN_INNTEKTSMELDING.name} for transaksjonId $transaksjonId.") + sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") + } + } + } + + override fun inProgress(message: JsonMessage) { + "Service skal aldri være \"underveis\".".also { + logger.error(it) + sikkerLogger.error(it) } } @@ -132,7 +132,7 @@ class SpinnService( } } - override fun terminate(fail: Fail) { + override fun onError(message: JsonMessage, fail: Fail) { MdcUtils.withLogFields( Log.transaksjonId(fail.transaksjonId) ) { diff --git a/felles-test/src/main/kotlin/no/nav/helsearbeidsgiver/felles/test/mock/MockRedis.kt b/felles-test/src/main/kotlin/no/nav/helsearbeidsgiver/felles/test/mock/MockRedis.kt index 9bd1ac821e..146d747b6b 100644 --- a/felles-test/src/main/kotlin/no/nav/helsearbeidsgiver/felles/test/mock/MockRedis.kt +++ b/felles-test/src/main/kotlin/no/nav/helsearbeidsgiver/felles/test/mock/MockRedis.kt @@ -12,11 +12,9 @@ class MockRedis { private val mockStorage = mutableMapOf() private val redisKey = slot() + private val redisKeys = slot>() private val newValue = slot() - // Fungerer som en capture slot for vararg - private val keysToCheck = mutableListOf() - init { setup() } @@ -32,14 +30,8 @@ class MockRedis { mockStorage[redisKey.captured] } - every { store.exist(*varargAll { keysToCheck.add(it) }) } answers { - val allKeys = mockStorage.keys.toSet() - - val keysExist = keysToCheck.intersect(allKeys).size.toLong() - - keysToCheck.clear() - - keysExist + every { store.exist(capture(redisKeys)) } answers { + mockStorage.keys.intersect(redisKeys.captured.toSet()).size.toLong() } } } diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanal.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanal.kt index a118aa6bb3..3ca211b4b5 100644 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanal.kt +++ b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanal.kt @@ -9,14 +9,14 @@ import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger -abstract class DataKanal(val rapidsConnection: RapidsConnection) : River.PacketListener { - abstract val eventName: EventName +abstract class DataKanal(val rapid: RapidsConnection) : River.PacketListener { + abstract val event: EventName private val logger = logger() private val sikkerLogger = sikkerLogger() init { configure( - River(rapidsConnection).apply { + River(rapid).apply { validate(accept()) } ).register(this) @@ -26,7 +26,7 @@ abstract class DataKanal(val rapidsConnection: RapidsConnection) : River.PacketL private fun configure(river: River): River { return river.validate { - it.demandValue(Key.EVENT_NAME.str, eventName.name) + it.demandValue(Key.EVENT_NAME.str, event.name) it.demandKey(Key.DATA.str) it.rejectKey(Key.BEHOV.str) it.rejectKey(Key.LØSNING.str) diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DelegatingFailKanal.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DelegatingFailKanal.kt deleted file mode 100644 index 81b898c825..0000000000 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DelegatingFailKanal.kt +++ /dev/null @@ -1,16 +0,0 @@ -package no.nav.helsearbeidsgiver.felles.rapidsrivers - -import no.nav.helse.rapids_rivers.JsonMessage -import no.nav.helse.rapids_rivers.RapidsConnection -import no.nav.helse.rapids_rivers.River -import no.nav.helsearbeidsgiver.felles.EventName - -class DelegatingFailKanal( - override val eventName: EventName, - private val mainListener: River.PacketListener, - rapidsConnection: RapidsConnection -) : FailKanal(rapidsConnection) { - override fun onFail(packet: JsonMessage) { - mainListener.onPacket(packet, rapidsConnection) - } -} diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/FailKanal.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/FailKanal.kt index 0062628d48..a72d4a4373 100644 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/FailKanal.kt +++ b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/FailKanal.kt @@ -9,18 +9,19 @@ import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.utils.json.fromJson -// vi kan vurdere å bruke event feltet og dispatche event istedenfor Fail. -abstract class FailKanal(val rapidsConnection: RapidsConnection) : River.PacketListener { - abstract val eventName: EventName - +class FailKanal( + val rapid: RapidsConnection, + private val event: EventName, + private val onFail: (JsonMessage, MessageContext) -> Unit +) : River.PacketListener { init { - River(rapidsConnection).apply { + River(rapid).apply { validate { msg -> msg.demand( Key.FAIL to { it.fromJson(Fail.serializer()) } ) msg.demandValues( - Key.EVENT_NAME to eventName.name + Key.EVENT_NAME to event.name ) msg.requireKeys(Key.UUID) msg.interestedIn(Key.FORESPOERSEL_ID) @@ -35,8 +36,6 @@ abstract class FailKanal(val rapidsConnection: RapidsConnection) : River.PacketL } override fun onPacket(packet: JsonMessage, context: MessageContext) { - onFail(packet) + onFail(packet, context) } - - abstract fun onFail(packet: JsonMessage) } diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullDataKanal.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullDataKanal.kt index c816ae047f..ec8f3efb62 100644 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullDataKanal.kt +++ b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullDataKanal.kt @@ -1,6 +1,7 @@ package no.nav.helsearbeidsgiver.felles.rapidsrivers import no.nav.helse.rapids_rivers.JsonMessage +import no.nav.helse.rapids_rivers.MessageContext import no.nav.helse.rapids_rivers.RapidsConnection import no.nav.helse.rapids_rivers.River import no.nav.helsearbeidsgiver.felles.EventName @@ -12,23 +13,21 @@ import no.nav.helsearbeidsgiver.utils.log.sikkerLogger import java.util.UUID class StatefullDataKanal( - private val dataFelter: Array, - override val eventName: EventName, - private val mainListener: River.PacketListener, - rapidsConnection: RapidsConnection, - val redisStore: RedisStore -) : DataKanal( - rapidsConnection -) { + rapid: RapidsConnection, + override val event: EventName, + private val redisStore: RedisStore, + private val dataKeys: List, + private val onDataCollected: (JsonMessage, MessageContext) -> Unit +) : DataKanal(rapid) { private val logger = logger() private val sikkerLogger = sikkerLogger() override fun accept(): River.PacketValidation { return River.PacketValidation { - it.demandValue(Key.EVENT_NAME.str, eventName.name) + it.demandValue(Key.EVENT_NAME.str, event.name) it.demandKey(Key.DATA.str) - dataFelter.forEach { datafelt -> - it.interestedIn(datafelt) + dataKeys.forEach { dataKey -> + it.interestedIn(dataKey) } } } @@ -41,8 +40,8 @@ class StatefullDataKanal( if (packet[Key.UUID.str].asText().isNullOrEmpty()) { sikkerLogger.error("Transaksjon-ID er ikke initialisert for\n${packet.toPretty()}") } else if (collectData(packet)) { - sikkerLogger.info("data collected for event ${eventName.name} med packet\n${packet.toPretty()}") - mainListener.onPacket(packet, rapidsConnection) + sikkerLogger.info("data collected for event ${event.name} med packet\n${packet.toPretty()}") + onDataCollected(packet, rapid) } else { sikkerLogger.warn("Mangler data for ${packet.toPretty()}") } @@ -50,7 +49,7 @@ class StatefullDataKanal( private fun collectData(message: JsonMessage): Boolean { // putt alle mottatte datafelter fra pakke i redis - val dataMap = dataFelter.filter { dataFelt -> + val dataMap = dataKeys.filter { dataFelt -> !message[dataFelt.str].isMissingNode } .associateWith { @@ -74,14 +73,4 @@ class StatefullDataKanal( return dataMap.isNotEmpty() } - - fun isAllDataCollected(transaksjonId: UUID): Boolean { - val allKeys = dataFelter.map { RedisKey.of(transaksjonId, it) }.toTypedArray() - val numKeysInRedis = redisStore.exist(*allKeys) - logger.info("found " + numKeysInRedis) - return numKeysInRedis == dataFelter.size.toLong() - } - fun isDataCollected(vararg keys: RedisKey): Boolean { - return redisStore.exist(*keys) == keys.size.toLong() - } } diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullEventListener.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullEventListener.kt index b39400aa05..33e8111b66 100644 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullEventListener.kt +++ b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/StatefullEventListener.kt @@ -1,6 +1,7 @@ package no.nav.helsearbeidsgiver.felles.rapidsrivers import no.nav.helse.rapids_rivers.JsonMessage +import no.nav.helse.rapids_rivers.MessageContext import no.nav.helse.rapids_rivers.RapidsConnection import no.nav.helse.rapids_rivers.River import no.nav.helsearbeidsgiver.felles.EventName @@ -11,19 +12,17 @@ import no.nav.helsearbeidsgiver.felles.utils.randomUuid import no.nav.helsearbeidsgiver.utils.log.sikkerLogger class StatefullEventListener( - val redisStore: RedisStore, + rapid: RapidsConnection, override val event: EventName, - private val dataFelter: Array, - private val mainListener: River.PacketListener, - rapidsConnection: RapidsConnection -) : EventListener( - rapidsConnection -) { + private val redisStore: RedisStore, + private val dataKeys: List, + private val onEventProcessed: (JsonMessage, MessageContext) -> Unit +) : EventListener(rapid) { private val sikkerLogger = sikkerLogger() override fun accept(): River.PacketValidation { return River.PacketValidation { - it.interestedIn(*dataFelter) + it.interestedIn(*dataKeys.toTypedArray()) } } @@ -31,7 +30,7 @@ class StatefullEventListener( val transactionId = randomUuid() packet[Key.UUID.str] = transactionId.toString() - dataFelter.associateWith { + dataKeys.associateWith { packet[it.str] } .onEach { (dataFelt, data) -> @@ -50,6 +49,6 @@ class StatefullEventListener( override fun onEvent(packet: JsonMessage) { sikkerLogger.info("Statefull event listener for event ${event.name} med packet \n${packet.toPretty()}") collectData(packet) - mainListener.onPacket(packet, rapidsConnection) + onEventProcessed(packet, rapidsConnection) } } diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/CompositeEventListener.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/CompositeEventListener.kt index f2adafd36b..69c38dd7a1 100644 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/CompositeEventListener.kt +++ b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/CompositeEventListener.kt @@ -10,9 +10,6 @@ import no.nav.helsearbeidsgiver.felles.IKey import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.json.lesOrNull import no.nav.helsearbeidsgiver.felles.json.toMap -import no.nav.helsearbeidsgiver.felles.rapidsrivers.EventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal -import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisStore @@ -24,15 +21,23 @@ import no.nav.helsearbeidsgiver.utils.json.toPretty import no.nav.helsearbeidsgiver.utils.log.MdcUtils import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger +import no.nav.helsearbeidsgiver.utils.pipe.orDefault import java.util.UUID -abstract class CompositeEventListener(open val redisStore: RedisStore) : River.PacketListener { +abstract class CompositeEventListener : River.PacketListener { private val logger = logger() private val sikkerLogger = sikkerLogger() + abstract val redisStore: RedisStore abstract val event: EventName - private lateinit var dataKanal: StatefullDataKanal + abstract val startKeys: List + abstract val dataKeys: List + + abstract fun new(message: JsonMessage) + abstract fun inProgress(message: JsonMessage) + abstract fun finalize(message: JsonMessage) + abstract fun onError(message: JsonMessage, fail: Fail) override fun onPacket(packet: JsonMessage, context: MessageContext) { val json = packet.toJson().parseJson().toMap() @@ -42,17 +47,6 @@ abstract class CompositeEventListener(open val redisStore: RedisStore) : River.P sikkerLogger.warn("Mangler forespørselId!") } - val transaction = determineTransactionState(json) - when (transaction) { - Transaction.NEW, - Transaction.IN_PROGRESS -> dispatchBehov(packet, transaction) - Transaction.FINALIZE -> finalize(packet) - Transaction.TERMINATE -> terminate(json[Key.FAIL]!!.fromJson(Fail.serializer())) - Transaction.NOT_ACTIVE -> return - } - } - - fun determineTransactionState(json: Map): Transaction { val transaksjonId = json[Key.UUID]?.fromJson(UuidSerializer) if (transaksjonId == null) { @@ -60,7 +54,7 @@ abstract class CompositeEventListener(open val redisStore: RedisStore) : River.P logger.error(it) sikkerLogger.error(it) } - return Transaction.NOT_ACTIVE + return } MdcUtils.withLogFields( @@ -69,7 +63,7 @@ abstract class CompositeEventListener(open val redisStore: RedisStore) : River.P val fail = toFailOrNull(json) if (fail != null) { sikkerLogger.error("Feilmelding er '${fail.feilmelding}'. Utløsende melding er \n${fail.utloesendeMelding.toPretty()}") - return onError(fail) + return onError(packet, fail) } val clientIdRedisKey = RedisKey.of(transaksjonId, event) @@ -82,29 +76,25 @@ abstract class CompositeEventListener(open val redisStore: RedisStore) : River.P logger.error(it) sikkerLogger.error(it) } - Transaction.NOT_ACTIVE + Unit } else { val clientId = json[Key.CLIENT_ID]?.fromJson(UuidSerializer) - .let { clientId -> - if (clientId != null) { - clientId - } else { - "Client-ID mangler. Bruker transaksjon-ID som backup.".also { - logger.error(it) - sikkerLogger.error(it) - } - transaksjonId + .orDefault { + "Client-ID mangler. Bruker transaksjon-ID som backup.".also { + logger.error(it) + sikkerLogger.error(it) } + transaksjonId } redisStore.set(clientIdRedisKey, clientId.toString()) - Transaction.NEW + new(packet) } } - isDataCollected(transaksjonId) -> Transaction.FINALIZE - else -> Transaction.IN_PROGRESS + isAllDataCollected(transaksjonId) -> finalize(packet) + else -> inProgress(packet) } } } @@ -120,29 +110,13 @@ abstract class CompositeEventListener(open val redisStore: RedisStore) : River.P json[Key.EVENT_NAME] != null && json.keys.intersect(setOf(Key.BEHOV, Key.DATA, Key.FAIL)).isEmpty() - abstract fun dispatchBehov(message: JsonMessage, transaction: Transaction) - abstract fun finalize(message: JsonMessage) - abstract fun terminate(fail: Fail) + fun isDataCollected(keys: List): Boolean = + redisStore.exist(keys) == keys.size.toLong() - open fun onError(feil: Fail): Transaction { - return Transaction.TERMINATE + private fun isAllDataCollected(transaksjonId: UUID): Boolean { + val allKeys = dataKeys.map { RedisKey.of(transaksjonId, it) } + val numKeysInRedis = redisStore.exist(allKeys) + logger.info("found " + numKeysInRedis) + return numKeysInRedis == dataKeys.size.toLong() } - - fun withFailKanal(failKanalSupplier: (t: CompositeEventListener) -> FailKanal): CompositeEventListener { - failKanalSupplier.invoke(this) - return this - } - - fun withEventListener(eventListenerSupplier: (t: CompositeEventListener) -> EventListener): CompositeEventListener { - eventListenerSupplier.invoke(this) - return this - } - - fun withDataKanal(dataKanalSupplier: (t: CompositeEventListener) -> StatefullDataKanal): CompositeEventListener { - dataKanal = dataKanalSupplier.invoke(this) - return this - } - - open fun isDataCollected(uuid: UUID): Boolean = dataKanal.isAllDataCollected(uuid) - open fun isDataCollected(vararg keys: RedisKey): Boolean = dataKanal.isDataCollected(*keys) } diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/Transaction.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/Transaction.kt deleted file mode 100644 index 8da8dd4760..0000000000 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/composite/Transaction.kt +++ /dev/null @@ -1,5 +0,0 @@ -package no.nav.helsearbeidsgiver.felles.rapidsrivers.composite - -enum class Transaction { - NEW, IN_PROGRESS, FINALIZE, TERMINATE, NOT_ACTIVE -} diff --git a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/redis/RedisStore.kt b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/redis/RedisStore.kt index e33330af7f..7d05434054 100644 --- a/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/redis/RedisStore.kt +++ b/felles/src/main/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/redis/RedisStore.kt @@ -22,10 +22,10 @@ class RedisStore(redisUrl: String) { return value } - fun exist(vararg keys: RedisKey): Long { + fun exist(keys: List): Long { val keysAsString = keys.map { it.toString() }.toTypedArray() val count = syncCommands.exists(*keysAsString) - sikkerLogger.debug("Checking exist in redis: ${keys.contentToString()} -> $count") + sikkerLogger.debug("Checking exist in redis: $keys -> $count") return count } diff --git a/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanalTest.kt b/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanalTest.kt index 4162b542d2..eb50fdd334 100644 --- a/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanalTest.kt +++ b/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DataKanalTest.kt @@ -3,7 +3,6 @@ package no.nav.helsearbeidsgiver.felles.rapidsrivers import io.mockk.clearAllMocks import io.mockk.verify import no.nav.helse.rapids_rivers.JsonMessage -import no.nav.helse.rapids_rivers.MessageContext import no.nav.helse.rapids_rivers.RapidsConnection import no.nav.helse.rapids_rivers.River import no.nav.helse.rapids_rivers.testsupport.TestRapid @@ -22,14 +21,9 @@ import java.util.UUID class DataKanalTest { - private val testRapid: TestRapid = TestRapid() - + private val testRapid = TestRapid() private val mockRedis = MockRedis() - private val dummyListener = object : River.PacketListener { - override fun onPacket(packet: JsonMessage, context: MessageContext) {} - } - @BeforeEach fun setup() { testRapid.reset() @@ -80,8 +74,8 @@ class DataKanalTest { @Test fun `Test DATA collection, Primitive`() { - val testFelter = arrayOf(Key.FNR, Key.INNTEKT) - StatefullDataKanal(testFelter, EventName.INNTEKTSMELDING_MOTTATT, dummyListener, testRapid, mockRedis.store) + val testFelter = listOf(Key.FNR, Key.INNTEKT) + StatefullDataKanal(testRapid, EventName.INNTEKTSMELDING_MOTTATT, mockRedis.store, testFelter) { _, _ -> } val uuid = UUID.randomUUID() testRapid.sendTestMessage( JsonMessage.newMessage( @@ -120,8 +114,8 @@ class DataKanalTest { @Test fun `Test DATA collection, Object`() { - val testFelter = arrayOf(Key.ARBEIDSGIVER_INFORMASJON) - StatefullDataKanal(testFelter, EventName.INNTEKTSMELDING_MOTTATT, dummyListener, testRapid, mockRedis.store) + val testFelter = listOf(Key.ARBEIDSGIVER_INFORMASJON) + StatefullDataKanal(testRapid, EventName.INNTEKTSMELDING_MOTTATT, mockRedis.store, testFelter) { _, _ -> } val uuid = UUID.randomUUID() val personDato = PersonDato("X", null, "") @@ -139,7 +133,7 @@ class DataKanalTest { } class TestDataKanal(rapidsConnection: RapidsConnection) : DataKanal(rapidsConnection) { - override val eventName: EventName = EventName.INNTEKTSMELDING_MOTTATT + override val event: EventName = EventName.INNTEKTSMELDING_MOTTATT var invocations = 0 override fun accept(): River.PacketValidation = River.PacketValidation { } diff --git a/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DelegatingFailKanalTest.kt b/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/FailKanalTest.kt similarity index 94% rename from felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DelegatingFailKanalTest.kt rename to felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/FailKanalTest.kt index 650f8e6632..f5439879eb 100644 --- a/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/DelegatingFailKanalTest.kt +++ b/felles/src/test/kotlin/no/nav/helsearbeidsgiver/felles/rapidsrivers/FailKanalTest.kt @@ -16,13 +16,13 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import java.util.UUID -class DelegatingFailKanalTest { +class FailKanalTest { private val testRapid = TestRapid() private val mockPacketListener = mockk(relaxed = true) init { - DelegatingFailKanal(EventName.INSENDING_STARTED, mockPacketListener, testRapid) + FailKanal(testRapid, EventName.INSENDING_STARTED, mockPacketListener::onPacket) } @BeforeEach diff --git a/gradle.properties b/gradle.properties index f8ae24b988..d65d421f3f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ ktorVersion=2.3.6 lettuceVersion=6.3.0.RELEASE mockkVersion=1.13.8 rapidsAndRiversVersion=2023120410321701682379.55cc1a24d803 -utilsVersion=0.7.0 +utilsVersion=0.8.0 # Client dependency versions aaregClientVersion=0.6.0 diff --git a/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/InnsendingService.kt b/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/InnsendingService.kt index a6ca954c39..d5cdc415f7 100644 --- a/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/InnsendingService.kt +++ b/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/InnsendingService.kt @@ -10,11 +10,10 @@ import no.nav.helsearbeidsgiver.felles.json.lesOrNull import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.json.toJsonElement import no.nav.helsearbeidsgiver.felles.json.toMap -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey @@ -33,138 +32,88 @@ import java.util.UUID class InnsendingService( private val rapid: RapidsConnection, override val redisStore: RedisStore -) : CompositeEventListener(redisStore) { - - override val event: EventName = EventName.INSENDING_STARTED +) : CompositeEventListener() { private val logger = logger() - init { - withFailKanal { DelegatingFailKanal(event, it, rapid) } - withDataKanal { - StatefullDataKanal( - dataFelter = arrayOf( - Key.VIRKSOMHET, - Key.ARBEIDSFORHOLD, - Key.INNTEKTSMELDING_DOKUMENT, - Key.ARBEIDSGIVER_INFORMASJON, - Key.ARBEIDSTAKER_INFORMASJON, - Key.ER_DUPLIKAT_IM - ), - event, - it, - rapid, - redisStore - ) - } - withEventListener { - StatefullEventListener( - redisStore, - event, - arrayOf( - Key.FORESPOERSEL_ID, - Key.ORGNRUNDERENHET, - Key.INNTEKTSMELDING, - Key.ARBEIDSGIVER_ID, - Key.IDENTITETSNUMMER - ), - it, - rapid - ) - } - } - - override fun onError(feil: Fail): Transaction { - val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), feil.utloesendeMelding.toMap()) + override val event = EventName.INSENDING_STARTED + override val startKeys = listOf( + Key.FORESPOERSEL_ID, + Key.ORGNRUNDERENHET, + Key.INNTEKTSMELDING, + Key.ARBEIDSGIVER_ID, + Key.IDENTITETSNUMMER + ) + override val dataKeys = listOf( + Key.VIRKSOMHET, + Key.ARBEIDSFORHOLD, + Key.INNTEKTSMELDING_DOKUMENT, + Key.ARBEIDSGIVER_INFORMASJON, + Key.ARBEIDSTAKER_INFORMASJON, + Key.ER_DUPLIKAT_IM + ) - if (utloesendeBehov == BehovType.VIRKSOMHET) { - val virksomhetKey = RedisKey.of(feil.transaksjonId, Key.VIRKSOMHET) - redisStore.set(virksomhetKey, "Ukjent virksomhet") - return Transaction.IN_PROGRESS - } else if (utloesendeBehov == BehovType.FULLT_NAVN) { - val arbeidstakerFulltnavnKey = RedisKey.of(feil.transaksjonId, Key.ARBEIDSTAKER_INFORMASJON) - val arbeidsgiverFulltnavnKey = RedisKey.of(feil.transaksjonId, Key.ARBEIDSGIVER_INFORMASJON) - redisStore.set(arbeidstakerFulltnavnKey, personIkkeFunnet().toJsonStr(PersonDato.serializer())) - redisStore.set(arbeidsgiverFulltnavnKey, personIkkeFunnet().toJsonStr(PersonDato.serializer())) - return Transaction.IN_PROGRESS - } - return Transaction.TERMINATE + init { + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun terminate(fail: Fail) { - val clientId = redisStore.get(RedisKey.of(fail.transaksjonId, event)) - ?.let(UUID::fromString) + override fun new(message: JsonMessage) { + val transaksjonId = message[Key.UUID.str].asText().let(UUID::fromString) + val forespoerselId = message[Key.FORESPOERSEL_ID.str].asText().let(UUID::fromString) - if (clientId == null) { - MdcUtils.withLogFields( - Log.transaksjonId(fail.transaksjonId) - ) { - sikkerLogger.error("Forsøkte å terminere, men clientId mangler i Redis. forespoerselId=${fail.forespoerselId}") - } - } else { - redisStore.set(RedisKey.of(clientId), fail.feilmelding) - } + logger.info("InnsendingService: emitting behov Virksomhet") + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.VIRKSOMHET.toJson(), + Key.ORGNRUNDERENHET to message[Key.ORGNRUNDERENHET.str].asText().toJson(), + Key.UUID to transaksjonId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson() + ) + + logger.info("InnsendingService: emitting behov ARBEIDSFORHOLD") + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.ARBEIDSFORHOLD.toJson(), + Key.IDENTITETSNUMMER to message[Key.IDENTITETSNUMMER.str].asText().toJson(), + Key.UUID to transaksjonId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson() + ) + + logger.info("InnsendingService: emitting behov FULLT_NAVN") + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.FULLT_NAVN.toJson(), + Key.IDENTITETSNUMMER to message[Key.IDENTITETSNUMMER.str].asText().toJson(), + Key.ARBEIDSGIVER_ID to message[Key.ARBEIDSGIVER_ID.str].asText().toJson(), + Key.UUID to transaksjonId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson() + ) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { + override fun inProgress(message: JsonMessage) { val transaksjonId = message[Key.UUID.str].asText().let(UUID::fromString) val forespoerselId = message[Key.FORESPOERSEL_ID.str].asText().let(UUID::fromString) - when (transaction) { - Transaction.NEW -> { - logger.info("InnsendingService: emitting behov Virksomhet") - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.VIRKSOMHET.toJson(), - Key.ORGNRUNDERENHET to message[Key.ORGNRUNDERENHET.str].asText().toJson(), - Key.UUID to transaksjonId.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson() - ) - - logger.info("InnsendingService: emitting behov ARBEIDSFORHOLD") - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.ARBEIDSFORHOLD.toJson(), - Key.IDENTITETSNUMMER to message[Key.IDENTITETSNUMMER.str].asText().toJson(), - Key.UUID to transaksjonId.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson() - ) - - logger.info("InnsendingService: emitting behov FULLT_NAVN") - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.FULLT_NAVN.toJson(), - Key.IDENTITETSNUMMER to message[Key.IDENTITETSNUMMER.str].asText().toJson(), - Key.ARBEIDSGIVER_ID to message[Key.ARBEIDSGIVER_ID.str].asText().toJson(), - Key.UUID to transaksjonId.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson() - ) - } - - Transaction.IN_PROGRESS -> { - if (isDataCollected(*step1data(transaksjonId))) { - val arbeidstakerRedis = redisStore.get(RedisKey.of(transaksjonId, Key.ARBEIDSTAKER_INFORMASJON))?.fromJson(PersonDato.serializer()) - val arbeidsgiverRedis = redisStore.get(RedisKey.of(transaksjonId, Key.ARBEIDSGIVER_INFORMASJON))?.fromJson(PersonDato.serializer()) - logger.info("InnsendingService: emitting behov PERSISTER_IM") - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.PERSISTER_IM.toJson(), - Key.VIRKSOMHET to redisStore.get(RedisKey.of(transaksjonId, Key.VIRKSOMHET)).orDefault("Ukjent virksomhet").toJson(), - Key.ARBEIDSTAKER_INFORMASJON to ( - arbeidstakerRedis ?: personIkkeFunnet(message[Key.IDENTITETSNUMMER.str].asText()) - ).toJson(PersonDato.serializer()), - Key.ARBEIDSGIVER_INFORMASJON to ( - arbeidsgiverRedis ?: personIkkeFunnet(message[Key.ARBEIDSGIVER_ID.str].asText()) - ).toJson(PersonDato.serializer()), - Key.INNTEKTSMELDING to redisStore.get(RedisKey.of(transaksjonId, Key.INNTEKTSMELDING))!!.parseJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.UUID to transaksjonId.toJson() - ) - } - } - else -> { - logger.error("Illegal transaction type ecountered in dispatchBehov $transaction for uuid=$transaksjonId") - } + if (isDataCollected(step1data(transaksjonId))) { + val arbeidstakerRedis = redisStore.get(RedisKey.of(transaksjonId, Key.ARBEIDSTAKER_INFORMASJON))?.fromJson(PersonDato.serializer()) + val arbeidsgiverRedis = redisStore.get(RedisKey.of(transaksjonId, Key.ARBEIDSGIVER_INFORMASJON))?.fromJson(PersonDato.serializer()) + logger.info("InnsendingService: emitting behov PERSISTER_IM") + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.PERSISTER_IM.toJson(), + Key.VIRKSOMHET to redisStore.get(RedisKey.of(transaksjonId, Key.VIRKSOMHET)).orDefault("Ukjent virksomhet").toJson(), + Key.ARBEIDSTAKER_INFORMASJON to ( + arbeidstakerRedis ?: personIkkeFunnet(message[Key.IDENTITETSNUMMER.str].asText()) + ).toJson(PersonDato.serializer()), + Key.ARBEIDSGIVER_INFORMASJON to ( + arbeidsgiverRedis ?: personIkkeFunnet(message[Key.ARBEIDSGIVER_ID.str].asText()) + ).toJson(PersonDato.serializer()), + Key.INNTEKTSMELDING to redisStore.get(RedisKey.of(transaksjonId, Key.INNTEKTSMELDING))!!.parseJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.UUID to transaksjonId.toJson() + ) } } @@ -191,7 +140,36 @@ class InnsendingService( } } - private fun step1data(uuid: UUID): Array = arrayOf( + override fun onError(message: JsonMessage, fail: Fail) { + val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding.toMap()) + + if (utloesendeBehov == BehovType.VIRKSOMHET) { + val virksomhetKey = RedisKey.of(fail.transaksjonId, Key.VIRKSOMHET) + redisStore.set(virksomhetKey, "Ukjent virksomhet") + return inProgress(message) + } else if (utloesendeBehov == BehovType.FULLT_NAVN) { + val arbeidstakerFulltnavnKey = RedisKey.of(fail.transaksjonId, Key.ARBEIDSTAKER_INFORMASJON) + val arbeidsgiverFulltnavnKey = RedisKey.of(fail.transaksjonId, Key.ARBEIDSGIVER_INFORMASJON) + redisStore.set(arbeidstakerFulltnavnKey, personIkkeFunnet().toJsonStr(PersonDato.serializer())) + redisStore.set(arbeidsgiverFulltnavnKey, personIkkeFunnet().toJsonStr(PersonDato.serializer())) + return inProgress(message) + } + + val clientId = redisStore.get(RedisKey.of(fail.transaksjonId, event)) + ?.let(UUID::fromString) + + if (clientId == null) { + MdcUtils.withLogFields( + Log.transaksjonId(fail.transaksjonId) + ) { + sikkerLogger.error("Forsøkte å terminere, men clientId mangler i Redis. forespoerselId=${fail.forespoerselId}") + } + } else { + redisStore.set(RedisKey.of(clientId), fail.feilmelding) + } + } + + private fun step1data(uuid: UUID): List = listOf( RedisKey.of(uuid, Key.VIRKSOMHET), RedisKey.of(uuid, Key.ARBEIDSFORHOLD), RedisKey.of(uuid, Key.ARBEIDSTAKER_INFORMASJON), diff --git a/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/KvitteringService.kt b/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/KvitteringService.kt index 290751ac26..e7c35503a4 100644 --- a/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/KvitteringService.kt +++ b/innsending/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/innsending/KvitteringService.kt @@ -8,62 +8,68 @@ import no.nav.helsearbeidsgiver.felles.EksternInntektsmelding import no.nav.helsearbeidsgiver.felles.EventName import no.nav.helsearbeidsgiver.felles.InnsendtInntektsmelding import no.nav.helsearbeidsgiver.felles.Key -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.json.toJson +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail +import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisStore import no.nav.helsearbeidsgiver.felles.utils.Log import no.nav.helsearbeidsgiver.utils.json.fromJson +import no.nav.helsearbeidsgiver.utils.json.toJson import no.nav.helsearbeidsgiver.utils.json.toJsonStr +import no.nav.helsearbeidsgiver.utils.json.toPretty import no.nav.helsearbeidsgiver.utils.log.MdcUtils import no.nav.helsearbeidsgiver.utils.log.logger import java.util.UUID // TODO : Duplisert mesteparten av InnsendingService, skal trekke ut i super / generisk løsning. class KvitteringService( - private val rapidsConnection: RapidsConnection, + private val rapid: RapidsConnection, override val redisStore: RedisStore -) : CompositeEventListener(redisStore) { - - override val event: EventName = EventName.KVITTERING_REQUESTED +) : CompositeEventListener() { private val logger = logger() + override val event = EventName.KVITTERING_REQUESTED + override val startKeys = listOf( + Key.FORESPOERSEL_ID + ) + override val dataKeys = listOf( + Key.INNTEKTSMELDING_DOKUMENT, + Key.EKSTERN_INNTEKTSMELDING + ) + init { - withEventListener { StatefullEventListener(redisStore, event, arrayOf(Key.FORESPOERSEL_ID), this, rapidsConnection) } - withFailKanal { DelegatingFailKanal(event, this, rapidsConnection) } - withDataKanal { - StatefullDataKanal( - arrayOf(Key.INNTEKTSMELDING_DOKUMENT, Key.EKSTERN_INNTEKTSMELDING), - event, - this, - rapidsConnection, - redisStore - ) - } + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { - val transactionId: String = message[Key.UUID.str].asText() - if (transaction == Transaction.NEW) { - val forespoerselId: String = message[Key.FORESPOERSEL_ID.str].asText() - logger.info("Sender event: ${event.name} for forespørsel $forespoerselId") - val msg = JsonMessage.newMessage( - mapOf( - Key.BEHOV.str to BehovType.HENT_PERSISTERT_IM.name, - Key.EVENT_NAME.str to event.name, - Key.UUID.str to transactionId, - Key.FORESPOERSEL_ID.str to forespoerselId - ) - ).toJson() - logger.info("Publiserer melding: $msg") - rapidsConnection.publish(msg) - } else { - logger.error("Illegal transaction type ecountered in dispatchBehov $transaction for uuid= $transactionId") + override fun new(message: JsonMessage) { + val transaksjonId: String = message[Key.UUID.str].asText() + val forespoerselId: String = message[Key.FORESPOERSEL_ID.str].asText() + + logger.info("Sender event: ${event.name} for forespørsel $forespoerselId") + + rapid.publish( + Key.BEHOV to BehovType.HENT_PERSISTERT_IM.toJson(), + Key.EVENT_NAME to event.toJson(), + Key.UUID to transaksjonId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson() + ) + .also { + logger.info("Publiserte melding: ${it.toPretty()}") + } + } + + override fun inProgress(message: JsonMessage) { + "Service skal aldri være \"underveis\".".also { + logger.error(it) + sikkerLogger.error(it) } } @@ -80,7 +86,7 @@ class KvitteringService( redisStore.set(RedisKey.of(clientId), im) } - override fun terminate(fail: Fail) { + override fun onError(message: JsonMessage, fail: Fail) { val clientId = redisStore.get(RedisKey.of(fail.transaksjonId, event)) ?.let(UUID::fromString) diff --git a/innsending/src/test/kotlin/no.nav.helsearbeidsgiver.inntektsmelding.innsending/GenericDataPackageListenerTest.kt b/innsending/src/test/kotlin/no.nav.helsearbeidsgiver.inntektsmelding.innsending/GenericDataPackageListenerTest.kt index c6edbeed73..23ee96dbac 100644 --- a/innsending/src/test/kotlin/no.nav.helsearbeidsgiver.inntektsmelding.innsending/GenericDataPackageListenerTest.kt +++ b/innsending/src/test/kotlin/no.nav.helsearbeidsgiver.inntektsmelding.innsending/GenericDataPackageListenerTest.kt @@ -30,11 +30,11 @@ class GenericDataPackageListenerTest { init { StatefullDataKanal( - arrayOf(Key.FNR, Key.INNTEKT), - EventName.INSENDING_STARTED, - mockListener, testRapid, - mockRedis.store + EventName.INSENDING_STARTED, + mockRedis.store, + listOf(Key.FNR, Key.INNTEKT), + mockListener::onPacket ) } diff --git a/inntektservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektService.kt b/inntektservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektService.kt index b3ff5452a3..4ec0c75d34 100644 --- a/inntektservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektService.kt +++ b/inntektservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektService.kt @@ -16,11 +16,10 @@ import no.nav.helsearbeidsgiver.felles.json.les import no.nav.helsearbeidsgiver.felles.json.lesOrNull import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.json.toMap -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey @@ -34,37 +33,69 @@ import no.nav.helsearbeidsgiver.utils.json.toPretty import no.nav.helsearbeidsgiver.utils.log.MdcUtils import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger -import no.nav.helsearbeidsgiver.utils.pipe.orDefault import java.util.UUID class InntektService( private val rapid: RapidsConnection, override val redisStore: RedisStore -) : CompositeEventListener(redisStore) { +) : CompositeEventListener() { - private val sikkerLogger = sikkerLogger() private val logger = logger() + private val sikkerLogger = sikkerLogger() - override val event: EventName = EventName.INNTEKT_REQUESTED + override val event = EventName.INNTEKT_REQUESTED + override val startKeys = listOf( + Key.FORESPOERSEL_ID, + Key.SKJAERINGSTIDSPUNKT + ) + override val dataKeys = listOf( + Key.FORESPOERSEL_SVAR, + Key.INNTEKT + ) init { - withFailKanal { DelegatingFailKanal(event, it, rapid) } - withDataKanal { - StatefullDataKanal( - dataFelter = arrayOf( - Key.FORESPOERSEL_SVAR, - Key.INNTEKT - ), - eventName = event, - mainListener = it, - rapidsConnection = rapid, - redisStore = redisStore + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) + } + + override fun new(message: JsonMessage) { + val json = message.toJsonMap() + + val transaksjonId = Key.UUID.les(UuidSerializer, json) + + // TODO Les fra melding + val forespoerselId = RedisKey.of(transaksjonId, Key.FORESPOERSEL_ID) + .read() + ?.let(UUID::fromString) + if (forespoerselId == null) { + sikkerLogger.error("kunne ikke finne forespørselId for transaksjon $transaksjonId i Redis!") + logger.error("kunne ikke finne forespørselId for transaksjon $transaksjonId i Redis!") + return + } + MdcUtils.withLogFields( + Log.klasse(this), + Log.event(event), + Log.transaksjonId(transaksjonId), + Log.forespoerselId(forespoerselId) + ) { + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.HENT_TRENGER_IM.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.UUID to transaksjonId.toJson() ) + .also { + MdcUtils.withLogFields( + Log.behov(BehovType.HENT_TRENGER_IM) + ) { + sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") + } + } } - withEventListener { StatefullEventListener(redisStore, event, arrayOf(Key.FORESPOERSEL_ID, Key.SKJAERINGSTIDSPUNKT), it, rapid) } } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { + override fun inProgress(message: JsonMessage) { val json = message.toJsonMap() val transaksjonId = Key.UUID.les(UuidSerializer, json) @@ -83,61 +114,35 @@ class InntektService( Log.transaksjonId(transaksjonId), Log.forespoerselId(forespoerselId) ) { - sikkerLogger.info("Prosesserer transaksjon $transaction.") - - when (transaction) { - Transaction.NEW -> { - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.HENT_TRENGER_IM.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.UUID to transaksjonId.toJson() - ) - .also { - MdcUtils.withLogFields( - Log.behov(BehovType.HENT_TRENGER_IM) - ) { - sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") - } - } + val forspoerselKey = RedisKey.of(transaksjonId, Key.FORESPOERSEL_SVAR) + + if (isDataCollected(listOf(forspoerselKey))) { + val forespoersel = forspoerselKey.read()?.fromJson(TrengerInntekt.serializer()) + val skjaeringstidspunkt = RedisKey.of(transaksjonId, Key.SKJAERINGSTIDSPUNKT).read() + if (forespoersel == null || skjaeringstidspunkt == null) { + logger.error("Klarte ikke å finne forespørsel eller skjæringstidspunkt i Redis!") + sikkerLogger.error("Klarte ikke å finne data i Redis - forespørsel: $forespoersel og skjæringstidspunkt $skjaeringstidspunkt") + return } - Transaction.IN_PROGRESS -> { - val forspoerselKey = RedisKey.of(transaksjonId, Key.FORESPOERSEL_SVAR) - - if (isDataCollected(forspoerselKey)) { - val forespoersel = forspoerselKey.read()?.fromJson(TrengerInntekt.serializer()) - val skjaeringstidspunkt = RedisKey.of(transaksjonId, Key.SKJAERINGSTIDSPUNKT).read() - if (forespoersel == null || skjaeringstidspunkt == null) { - logger.error("Klarte ikke å finne forespørsel eller skjæringstidspunkt i Redis!") - sikkerLogger.error("Klarte ikke å finne data i Redis - forespørsel: $forespoersel og skjæringstidspunkt $skjaeringstidspunkt") - return + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.INNTEKT.toJson(), + Key.ORGNRUNDERENHET to forespoersel.orgnr.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.FNR to forespoersel.fnr.toJson(), + Key.SKJAERINGSTIDSPUNKT to skjaeringstidspunkt.toJson(), + Key.UUID to transaksjonId.toJson() + ) + .also { + MdcUtils.withLogFields( + Log.behov(BehovType.INNTEKT) + ) { + sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") } - - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.INNTEKT.toJson(), - Key.ORGNRUNDERENHET to forespoersel.orgnr.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.FNR to forespoersel.fnr.toJson(), - Key.SKJAERINGSTIDSPUNKT to skjaeringstidspunkt.toJson(), - Key.UUID to transaksjonId.toJson() - ) - .also { - MdcUtils.withLogFields( - Log.behov(BehovType.INNTEKT) - ) { - sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") - } - } - } else { - sikkerLogger.error("Transaksjon er underveis, men mangler data. Dette bør aldri skje, ettersom vi kun venter på én datapakke.") } - } - - else -> { - sikkerLogger.error("Støtte på forutsett transaksjonstype: $transaction") - } + } else { + sikkerLogger.error("Transaksjon er underveis, men mangler data. Dette bør aldri skje, ettersom vi kun venter på én datapakke.") } } } @@ -156,6 +161,7 @@ class InntektService( logger.error("Kunne ikke finne clientId for transaksjonId $transaksjonId i Redis!") } else { val inntekt = RedisKey.of(transaksjonId, Key.INNTEKT).read() + val feil = RedisKey.of(transaksjonId, Feilmelding("")).read() val inntektJson = InntektData( @@ -175,7 +181,47 @@ class InntektService( } } - override fun terminate(fail: Fail) { + override fun onError(message: JsonMessage, fail: Fail) { + val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding.toMap()) + + if (utloesendeBehov == BehovType.INNTEKT) { + val feilmelding = Feilmelding( + "Vi har problemer med å hente inntektsopplysninger. Du kan legge inn beregnet månedsinntekt manuelt, eller prøv igjen senere.", + datafelt = Key.INNTEKT + ) + + "Legger til feilmelding: '${feilmelding.melding}'".also { + logger.error(it) + sikkerLogger.error(it) + } + + RedisKey.of(fail.transaksjonId, feilmelding).write( + FeilReport( + mutableListOf(feilmelding) + ).toJson(FeilReport.serializer()) + ) + + RedisKey.of(fail.transaksjonId, Key.INNTEKT).write(JsonObject(emptyMap())) + + // TODO bruk finalize (sjekk andre servicer for tilsvarende feil) + return inProgress(message) + } + + val feilReport = if (utloesendeBehov == BehovType.HENT_TRENGER_IM) { + val feilmelding = Feilmelding("Teknisk feil, prøv igjen senere.", -1, Key.FORESPOERSEL_SVAR) + + "Returnerer feilmelding: '${feilmelding.melding}'".also { + logger.error(it) + sikkerLogger.error(it) + } + + FeilReport( + mutableListOf(feilmelding) + ) + } else { + FeilReport() + } + val clientId = RedisKey.of(fail.transaksjonId, event) .read() ?.let(UUID::fromString) @@ -188,10 +234,8 @@ class InntektService( logger.error("Forsøkte å terminere, men fant ikke clientID for transaksjon ${fail.transaksjonId} i Redis") } } else { - val feil = RedisKey.of(fail.transaksjonId, Feilmelding("")).read() - val feilResponse = InntektData( - feil = feil?.fromJson(FeilReport.serializer()) + feil = feilReport ) .toJson(InntektData.serializer()) @@ -206,49 +250,6 @@ class InntektService( } } - override fun onError(feil: Fail): Transaction { - val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), feil.utloesendeMelding.toMap()) - - val (feilmelding, transaction) = when (utloesendeBehov) { - BehovType.HENT_TRENGER_IM -> { - val feilmelding = Feilmelding("Teknisk feil, prøv igjen senere.", -1, Key.FORESPOERSEL_SVAR) - - feilmelding to Transaction.TERMINATE - } - - BehovType.INNTEKT -> { - val feilmelding = Feilmelding( - "Vi har problemer med å hente inntektsopplysninger. Du kan legge inn beregnet månedsinntekt manuelt, eller prøv igjen senere.", - datafelt = Key.INNTEKT - ) - - RedisKey.of(feil.transaksjonId, Key.INNTEKT).write(JsonObject(emptyMap())) - - feilmelding to null - } - - else -> null to null - } - - if (feilmelding != null) { - sikkerLogger.error("Mottok feilmelding: '${feilmelding.melding}'") - - val feilKey = RedisKey.of(feil.transaksjonId, feilmelding) - - val feilReport = feilKey.read() - ?.fromJson(FeilReport.serializer()) - .orDefault(FeilReport()) - .also { - it.feil.add(feilmelding) - } - .toJson(FeilReport.serializer()) - - feilKey.write(feilReport) - } - - return transaction ?: Transaction.IN_PROGRESS - } - private fun RedisKey.write(json: JsonElement) { redisStore.set(this, json.toString()) } diff --git a/inntektservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektServiceTest.kt b/inntektservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektServiceTest.kt index d8b62ed5d3..bfa4a7784c 100644 --- a/inntektservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektServiceTest.kt +++ b/inntektservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/InntektServiceTest.kt @@ -1,15 +1,26 @@ package no.nav.helsearbeidsgiver.inntektsmelding.inntektservice +import io.kotest.assertions.throwables.shouldNotThrowAny +import io.kotest.matchers.nulls.shouldBeNull +import io.mockk.Runs +import io.mockk.clearAllMocks +import io.mockk.every +import io.mockk.just +import io.mockk.spyk +import io.mockk.verify import kotlinx.serialization.json.JsonObject +import no.nav.helse.rapids_rivers.JsonMessage import no.nav.helse.rapids_rivers.testsupport.TestRapid import no.nav.helsearbeidsgiver.felles.BehovType import no.nav.helsearbeidsgiver.felles.EventName +import no.nav.helsearbeidsgiver.felles.FeilReport import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.json.toJson -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail +import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey import no.nav.helsearbeidsgiver.felles.test.mock.MockRedis -import org.junit.jupiter.api.Assertions.assertEquals +import no.nav.helsearbeidsgiver.utils.json.fromJson +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import java.util.UUID @@ -18,13 +29,30 @@ class InntektServiceTest { private val testRapid = TestRapid() private val mockRedis = MockRedis() + private val service = spyk( + InntektService(testRapid, mockRedis.store) + ) + + @BeforeEach + fun setup() { + testRapid.reset() + clearAllMocks() + mockRedis.setup() + } + @Test - fun terminate() { - val service = InntektService(testRapid, mockRedis.store) - val feil = Fail( + fun `ukritisk feil stopper ikke flyten`() { + val event = EventName.INNTEKT_REQUESTED + val clientId = UUID.randomUUID() + val transaksjonId = UUID.randomUUID() + + every { mockRedis.store.get(RedisKey.of(transaksjonId, event)) } returns clientId.toString() + every { service.inProgress(any()) } just Runs + + val fail = Fail( feilmelding = "ikkeno", - event = EventName.INNTEKT_REQUESTED, - transaksjonId = UUID.randomUUID(), + event = event, + transaksjonId = transaksjonId, forespoerselId = null, utloesendeMelding = JsonObject( mapOf( @@ -32,9 +60,26 @@ class InntektServiceTest { ) ) ) - val transaction = service.onError(feil) - assertEquals(Transaction.IN_PROGRESS, transaction) - service.terminate(feil) // skal ikke kaste exception.. + shouldNotThrowAny { + service.onError(JsonMessage.newMessage(), fail) + } + + verify { + service.inProgress(any()) + } + + verify(exactly = 0) { + mockRedis.store.set( + RedisKey.of(clientId), + withArg { + runCatching { + it.fromJson(FeilReport.serializer()) + } + .getOrNull() + .shouldBeNull() + } + ) + } } } diff --git a/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/ManuellOpprettSakService.kt b/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/ManuellOpprettSakService.kt index 209c04e3ba..d67b9fffa9 100644 --- a/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/ManuellOpprettSakService.kt +++ b/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/ManuellOpprettSakService.kt @@ -7,136 +7,134 @@ import no.nav.helsearbeidsgiver.felles.EventName import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.PersonDato import no.nav.helsearbeidsgiver.felles.TrengerInntekt -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.json.toJson +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail +import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisStore import no.nav.helsearbeidsgiver.utils.json.fromJson +import no.nav.helsearbeidsgiver.utils.json.toJson import no.nav.helsearbeidsgiver.utils.log.sikkerLogger import java.util.UUID -class ManuellOpprettSakService(private val rapidsConnection: RapidsConnection, override val redisStore: RedisStore) : CompositeEventListener(redisStore) { - override val event: EventName = EventName.MANUELL_OPPRETT_SAK_REQUESTED +class ManuellOpprettSakService( + private val rapid: RapidsConnection, + override val redisStore: RedisStore +) : CompositeEventListener() { private val sikkerLogger = sikkerLogger() + override val event = EventName.MANUELL_OPPRETT_SAK_REQUESTED + override val startKeys = listOf( + Key.FORESPOERSEL_ID, + Key.UUID + ) + override val dataKeys = listOf( + Key.FORESPOERSEL_SVAR, + Key.ARBEIDSTAKER_INFORMASJON, + Key.SAK_ID, + Key.PERSISTERT_SAK_ID + ) + init { - withEventListener { - StatefullEventListener( - redisStore, - event, - arrayOf(Key.FORESPOERSEL_ID, Key.UUID), - this, - rapidsConnection - ) - } - withDataKanal { - StatefullDataKanal( - arrayOf( - Key.FORESPOERSEL_SVAR, - Key.ARBEIDSTAKER_INFORMASJON, - Key.SAK_ID, - Key.PERSISTERT_SAK_ID - ), - event, - this, - rapidsConnection, - redisStore - ) - } - withFailKanal { DelegatingFailKanal(event, this, rapidsConnection) } + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { + override fun new(message: JsonMessage) { val transaksjonsId = message[Key.UUID.str].asText().let(UUID::fromString) + + // TODO Les fra melding val forespoerselId = redisStore.get(RedisKey.of(transaksjonsId, Key.FORESPOERSEL_ID))!! - if (transaction == Transaction.NEW) { - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to event.name, - Key.BEHOV.str to BehovType.HENT_TRENGER_IM.name, - Key.FORESPOERSEL_ID.str to forespoerselId, - Key.UUID.str to transaksjonsId - ) - ).toJson() - ) - } else if (transaction == Transaction.IN_PROGRESS) { - val forespoersel = redisStore.get(RedisKey.of(transaksjonsId, Key.FORESPOERSEL_SVAR))?.fromJson(TrengerInntekt.serializer()) - - if (forespoersel == null) { - sikkerLogger.error("Fant ikke forespørsel '$forespoerselId' i redis-cache. transaksjonId='$transaksjonsId'") - return - } - when { - isDataCollected(*steg4(transaksjonsId)) -> { - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to event.name, - Key.UUID.str to transaksjonsId, - Key.BEHOV.str to BehovType.PERSISTER_SAK_ID.name, - Key.FORESPOERSEL_ID.str to forespoerselId, - Key.SAK_ID.str to redisStore.get(RedisKey.of(transaksjonsId, Key.SAK_ID))!! - ) - ).toJson() - ) + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.HENT_TRENGER_IM.toJson(), + Key.UUID to transaksjonsId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson() + ) + } + + override fun inProgress(message: JsonMessage) { + val transaksjonsId = message[Key.UUID.str].asText().let(UUID::fromString) + val forespoerselId = redisStore.get(RedisKey.of(transaksjonsId, Key.FORESPOERSEL_ID))!! + val forespoersel = redisStore.get(RedisKey.of(transaksjonsId, Key.FORESPOERSEL_SVAR))?.fromJson(TrengerInntekt.serializer()) + + if (forespoersel == null) { + sikkerLogger.error("Fant ikke forespørsel '$forespoerselId' i redis-cache. transaksjonId='$transaksjonsId'") + return + } - if (forespoersel.erBesvart) { - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to EventName.FORESPOERSEL_BESVART, - Key.UUID.str to transaksjonsId, - Key.FORESPOERSEL_ID.str to forespoerselId, - Key.SAK_ID.str to redisStore.get(RedisKey.of(transaksjonsId, Key.SAK_ID))!! - ) - ).toJson() + when { + isDataCollected(steg4(transaksjonsId)) -> { + rapid.publish( + JsonMessage.newMessage( + mapOf( + Key.EVENT_NAME.str to event.name, + Key.UUID.str to transaksjonsId, + Key.BEHOV.str to BehovType.PERSISTER_SAK_ID.name, + Key.FORESPOERSEL_ID.str to forespoerselId, + Key.SAK_ID.str to redisStore.get(RedisKey.of(transaksjonsId, Key.SAK_ID))!! ) - } - } - isDataCollected(*steg3(transaksjonsId)) -> { - val arbeidstakerRedis = redisStore.get(RedisKey.of(transaksjonsId, Key.ARBEIDSTAKER_INFORMASJON))?.fromJson(PersonDato.serializer()) - rapidsConnection.publish( + ).toJson() + ) + + if (forespoersel.erBesvart) { + rapid.publish( JsonMessage.newMessage( mapOf( - Key.EVENT_NAME.str to event.name, + Key.EVENT_NAME.str to EventName.FORESPOERSEL_BESVART, Key.UUID.str to transaksjonsId, - Key.BEHOV.str to BehovType.OPPRETT_SAK, Key.FORESPOERSEL_ID.str to forespoerselId, - Key.ORGNRUNDERENHET.str to forespoersel.orgnr, - // @TODO this transformation is not nessesary. StatefullDataKanal should be fixed to use Tree - Key.ARBEIDSTAKER_INFORMASJON.str to arbeidstakerRedis!! + Key.SAK_ID.str to redisStore.get(RedisKey.of(transaksjonsId, Key.SAK_ID))!! ) ).toJson() ) } - isDataCollected(*steg2(transaksjonsId)) -> { - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to event.name, - Key.UUID.str to transaksjonsId, - Key.BEHOV.str to BehovType.FULLT_NAVN.name, - Key.IDENTITETSNUMMER.str to forespoersel.fnr, - Key.FORESPOERSEL_ID.str to forespoerselId + } - ) - ).toJson() - ) - } + isDataCollected(steg3(transaksjonsId)) -> { + val arbeidstakerRedis = redisStore.get(RedisKey.of(transaksjonsId, Key.ARBEIDSTAKER_INFORMASJON))?.fromJson(PersonDato.serializer()) + rapid.publish( + JsonMessage.newMessage( + mapOf( + Key.EVENT_NAME.str to event.name, + Key.UUID.str to transaksjonsId, + Key.BEHOV.str to BehovType.OPPRETT_SAK, + Key.FORESPOERSEL_ID.str to forespoerselId, + Key.ORGNRUNDERENHET.str to forespoersel.orgnr, + // @TODO this transformation is not nessesary. StatefullDataKanal should be fixed to use Tree + Key.ARBEIDSTAKER_INFORMASJON.str to arbeidstakerRedis!! + ) + ).toJson() + ) + } + + isDataCollected(steg2(transaksjonsId)) -> { + rapid.publish( + JsonMessage.newMessage( + mapOf( + Key.EVENT_NAME.str to event.name, + Key.UUID.str to transaksjonsId, + Key.BEHOV.str to BehovType.FULLT_NAVN.name, + Key.IDENTITETSNUMMER.str to forespoersel.fnr, + Key.FORESPOERSEL_ID.str to forespoerselId + + ) + ).toJson() + ) } } } override fun finalize(message: JsonMessage) { val transaksjonsId = message[Key.UUID.str].asText().let(UUID::fromString) - rapidsConnection.publish( + rapid.publish( JsonMessage.newMessage( mapOf( Key.EVENT_NAME.str to EventName.SAK_OPPRETTET.name, @@ -147,16 +145,11 @@ class ManuellOpprettSakService(private val rapidsConnection: RapidsConnection, o ) } - override fun terminate(fail: Fail) { - sikkerLogger.error("Terminerer flyt med transaksjon-ID '${fail.transaksjonId}'") - } - - override fun onError(feil: Fail): Transaction { - sikkerLogger.error("Mottok feil:\n$feil") - return Transaction.TERMINATE + override fun onError(message: JsonMessage, fail: Fail) { + sikkerLogger.error("Mottok feil:\n$fail") } - private fun steg2(transactionId: UUID) = arrayOf(RedisKey.of(transactionId, Key.FORESPOERSEL_SVAR)) - private fun steg3(transactionId: UUID) = arrayOf(RedisKey.of(transactionId, Key.ARBEIDSTAKER_INFORMASJON)) - private fun steg4(transactionId: UUID) = arrayOf(RedisKey.of(transactionId, Key.SAK_ID)) + private fun steg2(transactionId: UUID): List = listOf(RedisKey.of(transactionId, Key.FORESPOERSEL_SVAR)) + private fun steg3(transactionId: UUID): List = listOf(RedisKey.of(transactionId, Key.ARBEIDSTAKER_INFORMASJON)) + private fun steg4(transactionId: UUID): List = listOf(RedisKey.of(transactionId, Key.SAK_ID)) } diff --git a/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveService.kt b/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveService.kt index a0d53ae638..ebc226d81b 100644 --- a/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveService.kt +++ b/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveService.kt @@ -6,119 +6,84 @@ import no.nav.helsearbeidsgiver.felles.BehovType import no.nav.helsearbeidsgiver.felles.EventName import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.json.lesOrNull +import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.json.toMap -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail +import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisStore import no.nav.helsearbeidsgiver.felles.utils.Log import no.nav.helsearbeidsgiver.utils.json.fromJson import no.nav.helsearbeidsgiver.utils.json.parseJson import no.nav.helsearbeidsgiver.utils.json.serializer.UuidSerializer +import no.nav.helsearbeidsgiver.utils.json.toJson import no.nav.helsearbeidsgiver.utils.log.MdcUtils import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger import java.util.UUID class OpprettOppgaveService( - private val rapidsConnection: RapidsConnection, + private val rapid: RapidsConnection, override val redisStore: RedisStore -) : CompositeEventListener(redisStore) { +) : CompositeEventListener() { private val logger = logger() private val sikkerLogger = sikkerLogger() - override val event: EventName = EventName.OPPGAVE_OPPRETT_REQUESTED + override val event = EventName.OPPGAVE_OPPRETT_REQUESTED + override val startKeys = listOf( + Key.ORGNRUNDERENHET, + Key.FORESPOERSEL_ID, + Key.UUID + ) + override val dataKeys = listOf( + Key.VIRKSOMHET + ) init { - withEventListener { - StatefullEventListener( - redisStore, - event, - arrayOf(Key.ORGNRUNDERENHET, Key.FORESPOERSEL_ID, Key.UUID), - this, - rapidsConnection - ) - } - withDataKanal { - StatefullDataKanal( - arrayOf(Key.VIRKSOMHET), - event, - this, - rapidsConnection, - redisStore - ) - } - withFailKanal { DelegatingFailKanal(event, this, rapidsConnection) } + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { - MdcUtils.withLogFields( - Log.klasse(this), - Log.event(EventName.OPPGAVE_OPPRETT_REQUESTED) - ) { - val json = message.toJson().parseJson().toMap() + override fun new(message: JsonMessage) { + medTransaksjonIdOgForespoerselId(message) { transaksjonId, forespoerselId -> + val orgnr = message[Key.ORGNRUNDERENHET.str].asText() - val transaksjonsId = json[Key.UUID]?.fromJson(UuidSerializer) - if (transaksjonsId == null) { - "Mangler transaksjonId. Klarer ikke opprette oppgave.".also { - logger.error(it) - sikkerLogger.error(it) - } - return - } - - val forespoerselId = redisStore.get(RedisKey.of(transaksjonsId, Key.FORESPOERSEL_ID))?.let(UUID::fromString) - ?: json[Key.FORESPOERSEL_ID]?.fromJson(UuidSerializer) + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.VIRKSOMHET.toJson(), + Key.UUID to transaksjonId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.ORGNRUNDERENHET to orgnr.toJson() + ) + } + } - if (forespoerselId == null) { - MdcUtils.withLogFields( - Log.transaksjonId(transaksjonsId) - ) { - "Mangler forespoerselId. Klarer ikke opprette oppgave.".also { - logger.error(it) - sikkerLogger.error(it) - } - } - return - } - - MdcUtils.withLogFields( - Log.transaksjonId(transaksjonsId), - Log.forespoerselId(forespoerselId) - ) { - if (transaction == Transaction.NEW) { - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to event.name, - Key.UUID.str to transaksjonsId, - Key.BEHOV.str to BehovType.VIRKSOMHET.name, - Key.FORESPOERSEL_ID.str to forespoerselId, - Key.ORGNRUNDERENHET.str to message[Key.ORGNRUNDERENHET.str] - ) - ).toJson() - ) - } + override fun inProgress(message: JsonMessage) { + medTransaksjonIdOgForespoerselId(message) { _, _ -> + "Service skal aldri være \"underveis\".".also { + logger.error(it) + sikkerLogger.error(it) } } } override fun finalize(message: JsonMessage) { - val transaksjonsId = message[Key.UUID.str].asText().let(UUID::fromString) + val transaksjonId = message[Key.UUID.str].asText().let(UUID::fromString) val forespoerselId = message[Key.FORESPOERSEL_ID.str].asText().let(UUID::fromString) MdcUtils.withLogFields( Log.klasse(this), Log.event(EventName.OPPGAVE_OPPRETT_REQUESTED), - Log.transaksjonId(transaksjonsId), + Log.transaksjonId(transaksjonId), Log.forespoerselId(forespoerselId) ) { - val orgnr = redisStore.get(RedisKey.of(transaksjonsId, Key.ORGNRUNDERENHET)) + val orgnr = redisStore.get(RedisKey.of(transaksjonId, Key.ORGNRUNDERENHET)) if (orgnr == null) { "Mangler orgnr i redis. Klarer ikke opprette oppgave.".also { logger.error(it) @@ -127,30 +92,33 @@ class OpprettOppgaveService( return } - val virksomhetNavn = redisStore.get(RedisKey.of(transaksjonsId, Key.VIRKSOMHET)) + val virksomhetNavn = redisStore.get(RedisKey.of(transaksjonId, Key.VIRKSOMHET)) ?: defaultVirksomhetNavn() - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to event.name, - Key.BEHOV.str to BehovType.OPPRETT_OPPGAVE, - Key.UUID.str to transaksjonsId, - Key.FORESPOERSEL_ID.str to forespoerselId, - Key.VIRKSOMHET.str to virksomhetNavn, - Key.ORGNRUNDERENHET.str to orgnr - ) - ).toJson() + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.OPPRETT_OPPGAVE.toJson(), + Key.UUID to transaksjonId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.VIRKSOMHET to virksomhetNavn.toJson(), + Key.ORGNRUNDERENHET to orgnr.toJson() ) } } - override fun terminate(fail: Fail) { + override fun onError(message: JsonMessage, fail: Fail) { MdcUtils.withLogFields( Log.klasse(this), Log.event(EventName.OPPGAVE_OPPRETT_REQUESTED), Log.transaksjonId(fail.transaksjonId) ) { + val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding.toMap()) + if (utloesendeBehov == BehovType.VIRKSOMHET) { + val virksomhetKey = RedisKey.of(fail.transaksjonId, Key.VIRKSOMHET) + redisStore.set(virksomhetKey, defaultVirksomhetNavn()) + return finalize(message) + } + val clientId = redisStore.get(RedisKey.of(fail.transaksjonId, event)) ?.let(UUID::fromString) @@ -162,14 +130,44 @@ class OpprettOppgaveService( } } - override fun onError(feil: Fail): Transaction { - val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), feil.utloesendeMelding.toMap()) - if (utloesendeBehov == BehovType.VIRKSOMHET) { - val virksomhetKey = RedisKey.of(feil.transaksjonId, Key.VIRKSOMHET) - redisStore.set(virksomhetKey, defaultVirksomhetNavn()) - return Transaction.FINALIZE + private inline fun medTransaksjonIdOgForespoerselId(message: JsonMessage, block: (UUID, UUID) -> Unit) { + MdcUtils.withLogFields( + Log.klasse(this), + Log.event(EventName.OPPGAVE_OPPRETT_REQUESTED) + ) { + val json = message.toJson().parseJson().toMap() + + val transaksjonId = json[Key.UUID]?.fromJson(UuidSerializer) + if (transaksjonId == null) { + "Mangler transaksjonId. Klarer ikke opprette oppgave.".also { + logger.error(it) + sikkerLogger.error(it) + } + return + } + + val forespoerselId = redisStore.get(RedisKey.of(transaksjonId, Key.FORESPOERSEL_ID))?.let(UUID::fromString) + ?: json[Key.FORESPOERSEL_ID]?.fromJson(UuidSerializer) + + if (forespoerselId == null) { + MdcUtils.withLogFields( + Log.transaksjonId(transaksjonId) + ) { + "Mangler forespoerselId. Klarer ikke opprette oppgave.".also { + logger.error(it) + sikkerLogger.error(it) + } + } + return + } + + MdcUtils.withLogFields( + Log.transaksjonId(transaksjonId), + Log.forespoerselId(forespoerselId) + ) { + block(transaksjonId, forespoerselId) + } } - return Transaction.TERMINATE } private fun defaultVirksomhetNavn(): String = diff --git a/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettSakService.kt b/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettSakService.kt index 88f204f86c..3ecdb189b8 100644 --- a/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettSakService.kt +++ b/notifikasjon/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettSakService.kt @@ -7,97 +7,57 @@ import no.nav.helsearbeidsgiver.felles.EventName import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.PersonDato import no.nav.helsearbeidsgiver.felles.json.lesOrNull +import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.json.toMap -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail +import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisStore import no.nav.helsearbeidsgiver.felles.utils.Log import no.nav.helsearbeidsgiver.utils.json.fromJson import no.nav.helsearbeidsgiver.utils.json.parseJson import no.nav.helsearbeidsgiver.utils.json.serializer.UuidSerializer +import no.nav.helsearbeidsgiver.utils.json.toJson import no.nav.helsearbeidsgiver.utils.json.toJsonStr import no.nav.helsearbeidsgiver.utils.log.MdcUtils import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger import java.util.UUID -class OpprettSakService(private val rapidsConnection: RapidsConnection, override val redisStore: RedisStore) : CompositeEventListener(redisStore) { +class OpprettSakService( + private val rapid: RapidsConnection, + override val redisStore: RedisStore +) : CompositeEventListener() { private val logger = logger() private val sikkerLogger = sikkerLogger() - override val event: EventName = EventName.SAK_OPPRETT_REQUESTED + override val event = EventName.SAK_OPPRETT_REQUESTED + override val startKeys = listOf( + Key.ORGNRUNDERENHET, + Key.IDENTITETSNUMMER, + Key.FORESPOERSEL_ID, + Key.UUID + ) + override val dataKeys = listOf( + Key.ARBEIDSTAKER_INFORMASJON, + Key.SAK_ID, + Key.PERSISTERT_SAK_ID + ) init { - withEventListener { - StatefullEventListener( - redisStore, - event, - arrayOf(Key.ORGNRUNDERENHET, Key.IDENTITETSNUMMER, Key.FORESPOERSEL_ID, Key.UUID), - this, - rapidsConnection - ) - } - withDataKanal { - StatefullDataKanal( - arrayOf(Key.ARBEIDSTAKER_INFORMASJON, Key.SAK_ID, Key.PERSISTERT_SAK_ID), - event, - this, - rapidsConnection, - redisStore - ) - } - withFailKanal { DelegatingFailKanal(event, this, rapidsConnection) } + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { - MdcUtils.withLogFields( - Log.klasse(this), - Log.event(EventName.SAK_OPPRETT_REQUESTED) - ) { - val json = message.toJson().parseJson().toMap() - - val transaksjonsId = json[Key.UUID]?.fromJson(UuidSerializer) - if (transaksjonsId == null) { - "Mangler transaksjonId. Klarer ikke opprette sak.".also { - logger.error(it) - sikkerLogger.error(it) - } - return - } - - val forespoerselId = redisStore.get(RedisKey.of(transaksjonsId, Key.FORESPOERSEL_ID))?.let(UUID::fromString) - ?: json[Key.FORESPOERSEL_ID]?.fromJson(UuidSerializer) - - if (forespoerselId == null) { - MdcUtils.withLogFields( - Log.transaksjonId(transaksjonsId) - ) { - "Mangler forespoerselId. Klarer ikke opprette sak.".also { - logger.error(it) - sikkerLogger.error(it) - } - } - return - } - - MdcUtils.withLogFields( - Log.transaksjonId(transaksjonsId), - Log.forespoerselId(forespoerselId) - ) { - dispatch(transaction, transaksjonsId, forespoerselId) - } - } - } - - private fun dispatch(transaction: Transaction, transaksjonsId: UUID, forespoerselId: UUID) { - if (transaction == Transaction.NEW) { - val fnr = redisStore.get(RedisKey.of(transaksjonsId, Key.IDENTITETSNUMMER)) + override fun new(message: JsonMessage) { + medTransaksjonIdOgForespoerselId(message) { transaksjonId, forespoerselId -> + val fnr = redisStore.get(RedisKey.of(transaksjonId, Key.IDENTITETSNUMMER)) if (fnr == null) { "Mangler fnr i redis. Klarer ikke opprette sak.".also { logger.error(it) @@ -106,21 +66,20 @@ class OpprettSakService(private val rapidsConnection: RapidsConnection, override return } - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to event.name, - Key.UUID.str to transaksjonsId, - Key.BEHOV.str to BehovType.FULLT_NAVN.name, - Key.IDENTITETSNUMMER.str to fnr, - Key.FORESPOERSEL_ID.str to forespoerselId - - ) - ).toJson() + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.FULLT_NAVN.toJson(), + Key.UUID to transaksjonId.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.IDENTITETSNUMMER to fnr.toJson() ) - } else if (transaction == Transaction.IN_PROGRESS) { - if (isDataCollected(*steg3(transaksjonsId))) { - val sakId = redisStore.get(RedisKey.of(transaksjonsId, Key.SAK_ID)) + } + } + + override fun inProgress(message: JsonMessage) { + medTransaksjonIdOgForespoerselId(message) { transaksjonId, forespoerselId -> + if (isDataCollected(steg3(transaksjonId))) { + val sakId = redisStore.get(RedisKey.of(transaksjonId, Key.SAK_ID)) if (sakId == null) { "Mangler sakId i redis. Klarer ikke opprette sak.".also { logger.error(it) @@ -129,19 +88,19 @@ class OpprettSakService(private val rapidsConnection: RapidsConnection, override return } - rapidsConnection.publish( + rapid.publish( JsonMessage.newMessage( mapOf( Key.EVENT_NAME.str to event.name, - Key.UUID.str to transaksjonsId, + Key.UUID.str to transaksjonId, Key.BEHOV.str to BehovType.PERSISTER_SAK_ID.name, Key.FORESPOERSEL_ID.str to forespoerselId, Key.SAK_ID.str to sakId ) ).toJson() ) - } else if (isDataCollected(*steg2(transaksjonsId))) { - val orgnr = redisStore.get(RedisKey.of(transaksjonsId, Key.ORGNRUNDERENHET)) + } else if (isDataCollected(steg2(transaksjonId))) { + val orgnr = redisStore.get(RedisKey.of(transaksjonId, Key.ORGNRUNDERENHET)) if (orgnr == null) { "Mangler orgnr i redis. Klarer ikke opprette sak.".also { logger.error(it) @@ -150,15 +109,15 @@ class OpprettSakService(private val rapidsConnection: RapidsConnection, override return } - val arbeidstaker = redisStore.get(RedisKey.of(transaksjonsId, Key.ARBEIDSTAKER_INFORMASJON)) + val arbeidstaker = redisStore.get(RedisKey.of(transaksjonId, Key.ARBEIDSTAKER_INFORMASJON)) ?.fromJson(PersonDato.serializer()) ?: ukjentArbeidstaker() - rapidsConnection.publish( + rapid.publish( JsonMessage.newMessage( mapOf( Key.EVENT_NAME.str to event.name, - Key.UUID.str to transaksjonsId, + Key.UUID.str to transaksjonId, Key.BEHOV.str to BehovType.OPPRETT_SAK, Key.FORESPOERSEL_ID.str to forespoerselId, Key.ORGNRUNDERENHET.str to orgnr, @@ -171,40 +130,44 @@ class OpprettSakService(private val rapidsConnection: RapidsConnection, override } override fun finalize(message: JsonMessage) { - val transaksjonsId = message[Key.UUID.str].asText().let(UUID::fromString) - - MdcUtils.withLogFields( - Log.klasse(this), - Log.event(EventName.SAK_OPPRETT_REQUESTED), - Log.transaksjonId(transaksjonsId) - ) { - val sakId = redisStore.get(RedisKey.of(transaksjonsId, Key.SAK_ID)) - if (sakId == null) { - "Mangler sakId i redis. Klarer ikke publisere event om opprettet sak.".also { - logger.error(it) - sikkerLogger.error(it) + medTransaksjonIdOgForespoerselId(message) { transaksjonId, forespoerselId -> + MdcUtils.withLogFields( + Log.klasse(this), + Log.event(EventName.SAK_OPPRETT_REQUESTED), + Log.transaksjonId(transaksjonId), + Log.forespoerselId(forespoerselId) + ) { + val sakId = redisStore.get(RedisKey.of(transaksjonId, Key.SAK_ID)) + if (sakId == null) { + "Mangler sakId i redis. Klarer ikke publisere event om opprettet sak.".also { + logger.error(it) + sikkerLogger.error(it) + } + return } - return - } - rapidsConnection.publish( - JsonMessage.newMessage( - mapOf( - Key.EVENT_NAME.str to EventName.SAK_OPPRETTET.name, - Key.FORESPOERSEL_ID.str to message[Key.FORESPOERSEL_ID.str], - Key.SAK_ID.str to sakId - ) - ).toJson() - ) + rapid.publish( + Key.EVENT_NAME to EventName.SAK_OPPRETTET.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.SAK_ID to sakId.toJson() + ) + } } } - override fun terminate(fail: Fail) { + override fun onError(message: JsonMessage, fail: Fail) { MdcUtils.withLogFields( Log.klasse(this), Log.event(EventName.SAK_OPPRETT_REQUESTED), Log.transaksjonId(fail.transaksjonId) ) { + val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding.toMap()) + if (utloesendeBehov == BehovType.FULLT_NAVN) { + val arbeidstakerKey = RedisKey.of(fail.transaksjonId, Key.ARBEIDSTAKER_INFORMASJON) + redisStore.set(arbeidstakerKey, ukjentArbeidstaker().toJsonStr(PersonDato.serializer())) + return inProgress(message) + } + val clientId = redisStore.get(RedisKey.of(fail.transaksjonId, event)) ?.let(UUID::fromString) @@ -216,19 +179,49 @@ class OpprettSakService(private val rapidsConnection: RapidsConnection, override } } - override fun onError(feil: Fail): Transaction { - val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), feil.utloesendeMelding.toMap()) - if (utloesendeBehov == BehovType.FULLT_NAVN) { - val arbeidstakerKey = RedisKey.of(feil.transaksjonId, Key.ARBEIDSTAKER_INFORMASJON) - redisStore.set(arbeidstakerKey, ukjentArbeidstaker().toJsonStr(PersonDato.serializer())) - return Transaction.IN_PROGRESS + private inline fun medTransaksjonIdOgForespoerselId(message: JsonMessage, block: (UUID, UUID) -> Unit) { + MdcUtils.withLogFields( + Log.klasse(this), + Log.event(EventName.SAK_OPPRETT_REQUESTED) + ) { + val json = message.toJson().parseJson().toMap() + + val transaksjonsId = json[Key.UUID]?.fromJson(UuidSerializer) + if (transaksjonsId == null) { + "Mangler transaksjonId. Klarer ikke opprette sak.".also { + logger.error(it) + sikkerLogger.error(it) + } + return + } + + val forespoerselId = redisStore.get(RedisKey.of(transaksjonsId, Key.FORESPOERSEL_ID))?.let(UUID::fromString) + ?: json[Key.FORESPOERSEL_ID]?.fromJson(UuidSerializer) + + if (forespoerselId == null) { + MdcUtils.withLogFields( + Log.transaksjonId(transaksjonsId) + ) { + "Mangler forespoerselId. Klarer ikke opprette sak.".also { + logger.error(it) + sikkerLogger.error(it) + } + } + return + } + + MdcUtils.withLogFields( + Log.transaksjonId(transaksjonsId), + Log.forespoerselId(forespoerselId) + ) { + block(transaksjonsId, forespoerselId) + } } - return Transaction.TERMINATE } private fun ukjentArbeidstaker(): PersonDato = PersonDato("Ukjent person", null, "") - private fun steg2(transactionId: UUID) = arrayOf(RedisKey.of(transactionId, Key.ARBEIDSTAKER_INFORMASJON)) - private fun steg3(transactionId: UUID) = arrayOf(RedisKey.of(transactionId, Key.SAK_ID)) + private fun steg2(transactionId: UUID): List = listOf(RedisKey.of(transactionId, Key.ARBEIDSTAKER_INFORMASJON)) + private fun steg3(transactionId: UUID): List = listOf(RedisKey.of(transactionId, Key.SAK_ID)) } diff --git a/notifikasjon/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveServiceTest.kt b/notifikasjon/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveServiceTest.kt index 867fb2f3be..8dc4731ba2 100644 --- a/notifikasjon/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveServiceTest.kt +++ b/notifikasjon/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/notifikasjon/OpprettOppgaveServiceTest.kt @@ -9,7 +9,6 @@ import no.nav.helsearbeidsgiver.felles.EventName import no.nav.helsearbeidsgiver.felles.IKey import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.json.toJson -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.test.mock.MockRedis import no.nav.helsearbeidsgiver.felles.test.rapidsrivers.sendJson @@ -55,7 +54,6 @@ class OpprettOppgaveServiceTest { Key.FORESPOERSEL_ID to fail.forespoerselId!!.toJson() ) - assertEquals(Transaction.TERMINATE, service.determineTransactionState(failMap)) assertNotNull(service.toFailOrNull(failMap)) } diff --git a/tilgangservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/tilgangservice/TilgangService.kt b/tilgangservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/tilgangservice/TilgangService.kt index 84dc169fd4..6db631d18a 100644 --- a/tilgangservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/tilgangservice/TilgangService.kt +++ b/tilgangservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/tilgangservice/TilgangService.kt @@ -14,11 +14,10 @@ import no.nav.helsearbeidsgiver.felles.json.les import no.nav.helsearbeidsgiver.felles.json.lesOrNull import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.json.toMap -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey @@ -32,37 +31,33 @@ import no.nav.helsearbeidsgiver.utils.json.toPretty import no.nav.helsearbeidsgiver.utils.log.MdcUtils import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger -import no.nav.helsearbeidsgiver.utils.pipe.orDefault import java.util.UUID class TilgangService( private val rapid: RapidsConnection, override val redisStore: RedisStore -) : CompositeEventListener(redisStore) { +) : CompositeEventListener() { private val logger = logger() private val sikkerLogger = sikkerLogger() - override val event: EventName = EventName.TILGANG_REQUESTED + override val event = EventName.TILGANG_REQUESTED + override val startKeys = listOf( + Key.FORESPOERSEL_ID, + Key.FNR + ) + override val dataKeys = listOf( + Key.ORGNRUNDERENHET, + Key.TILGANG + ) init { - withFailKanal { DelegatingFailKanal(event, it, rapid) } - withDataKanal { - StatefullDataKanal( - dataFelter = arrayOf( - Key.ORGNRUNDERENHET, - Key.TILGANG - ), - eventName = event, - mainListener = it, - rapidsConnection = rapid, - redisStore = redisStore - ) - } - withEventListener { StatefullEventListener(redisStore, event, arrayOf(Key.FORESPOERSEL_ID, Key.FNR), it, rapid) } + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { + override fun new(message: JsonMessage) { val json = message.toJsonMap() val transaksjonId = Key.UUID.les(UuidSerializer, json) @@ -72,67 +67,63 @@ class TilgangService( Log.transaksjonId(transaksjonId), Log.forespoerselId(forespoerselId) ) { - dispatch(transaction, transaksjonId, forespoerselId) + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.HENT_IM_ORGNR.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.UUID to transaksjonId.toJson() + ) + .also { + MdcUtils.withLogFields( + Log.behov(BehovType.HENT_IM_ORGNR) + ) { + sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") + } + } } } - private fun dispatch(transaction: Transaction, transaksjonId: UUID, forespoerselId: UUID) { - sikkerLogger.info("Prosesserer transaksjon $transaction.") + override fun inProgress(message: JsonMessage) { + val json = message.toJsonMap() + + val transaksjonId = Key.UUID.les(UuidSerializer, json) + val forespoerselId = Key.FORESPOERSEL_ID.les(UuidSerializer, json) + + MdcUtils.withLogFields( + Log.transaksjonId(transaksjonId), + Log.forespoerselId(forespoerselId) + ) { + val orgnrKey = RedisKey.of(transaksjonId, Key.ORGNRUNDERENHET) + + if (isDataCollected(listOf(orgnrKey))) { + val orgnr = orgnrKey.read() - when (transaction) { - Transaction.NEW -> { + val fnr = RedisKey.of(transaksjonId, Key.FNR) + .read() + if (orgnr == null || fnr == null) { + "Klarte ikke lese orgnr og / eller fnr fra Redis.".also { + logger.error(it) + sikkerLogger.error(it) + } + return + } rapid.publish( Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.HENT_IM_ORGNR.toJson(), + Key.BEHOV to BehovType.TILGANGSKONTROLL.toJson(), Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.ORGNRUNDERENHET to orgnr.toJson(), + Key.FNR to fnr.toJson(), Key.UUID to transaksjonId.toJson() ) .also { MdcUtils.withLogFields( - Log.behov(BehovType.HENT_IM_ORGNR) + Log.behov(BehovType.TILGANGSKONTROLL) ) { sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") } } - } - - Transaction.IN_PROGRESS -> { - val orgnrKey = RedisKey.of(transaksjonId, Key.ORGNRUNDERENHET) - - if (isDataCollected(orgnrKey)) { - val orgnr = orgnrKey.read() - - val fnr = RedisKey.of(transaksjonId, Key.FNR) - .read() - if (orgnr == null || fnr == null) { - "Klarte ikke lese orgnr og / eller fnr fra Redis.".also { - logger.error(it) - sikkerLogger.error(it) - } - return - } - rapid.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.TILGANGSKONTROLL.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.ORGNRUNDERENHET to orgnr.toJson(), - Key.FNR to fnr.toJson(), - Key.UUID to transaksjonId.toJson() - ) - .also { - MdcUtils.withLogFields( - Log.behov(BehovType.TILGANGSKONTROLL) - ) { - sikkerLogger.info("Publiserte melding:\n${it.toPretty()}.") - } - } - } else { - sikkerLogger.error("Transaksjon er underveis, men mangler data. Dette bør aldri skje, ettersom vi kun venter på én datapakke.") - } - } - - else -> { - sikkerLogger.error("Støtte på forutsett transaksjonstype: $transaction") + } else { + sikkerLogger.error("Transaksjon er underveis, men mangler data. Dette bør aldri skje, ettersom vi kun venter på én datapakke.") } } } @@ -168,23 +159,35 @@ class TilgangService( } } - override fun terminate(fail: Fail) { + override fun onError(message: JsonMessage, fail: Fail) { + val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding.toMap()) + + val manglendeDatafelt = when (utloesendeBehov) { + BehovType.HENT_IM_ORGNR -> Key.ORGNRUNDERENHET + BehovType.TILGANGSKONTROLL -> Key.TILGANG + else -> null + } + + val feilReport = if (manglendeDatafelt != null) { + val feilmelding = Feilmelding("Teknisk feil, prøv igjen senere.", -1, manglendeDatafelt) + + sikkerLogger.error("Returnerer feilmelding: '${feilmelding.melding}'") + + FeilReport( + mutableListOf(feilmelding) + ) + } else { + FeilReport() + } + val clientId = RedisKey.of(fail.transaksjonId, event) .read() ?.let(UUID::fromString) - val feil = RedisKey.of(fail.transaksjonId, Feilmelding("")) - .read() - - val feilResponse = TilgangData( - feil = feil?.fromJson(FeilReport.serializer()) - ) - .toJson(TilgangData.serializer()) - if (clientId == null) { sikkerLogger.error("$event forsøkt terminert, kunne ikke finne ${fail.transaksjonId} i redis!") } else { - RedisKey.of(clientId).write(feilResponse) + RedisKey.of(clientId).write(feilReport.toJson(FeilReport.serializer())) MdcUtils.withLogFields( Log.clientId(clientId), @@ -195,36 +198,6 @@ class TilgangService( } } - override fun onError(feil: Fail): Transaction { - val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), feil.utloesendeMelding.toMap()) - - val manglendeDatafelt = when (utloesendeBehov) { - BehovType.HENT_IM_ORGNR -> Key.ORGNRUNDERENHET - BehovType.TILGANGSKONTROLL -> Key.TILGANG - else -> null - } - - if (manglendeDatafelt != null) { - val feilmelding = Feilmelding("Teknisk feil, prøv igjen senere.", -1, manglendeDatafelt) - - sikkerLogger.error("Mottok feilmelding: '${feilmelding.melding}'") - - val feilKey = RedisKey.of(feil.transaksjonId, feilmelding) - - val feilReport = feilKey.read() - ?.fromJson(FeilReport.serializer()) - .orDefault(FeilReport()) - .also { - it.feil.add(feilmelding) - } - .toJson(FeilReport.serializer()) - - feilKey.write(feilReport) - } - - return Transaction.TERMINATE - } - private fun RedisKey.write(json: JsonElement) { redisStore.set(this, json.toString()) } diff --git a/tilgangservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/TilgangServiceTest.kt b/tilgangservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/TilgangServiceTest.kt index 1ce20cb54d..929068315e 100644 --- a/tilgangservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/TilgangServiceTest.kt +++ b/tilgangservice/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/inntektservice/TilgangServiceTest.kt @@ -1,16 +1,24 @@ package no.nav.helsearbeidsgiver.inntektsmelding.inntektservice +import io.kotest.assertions.throwables.shouldNotThrowAny +import io.mockk.clearAllMocks +import io.mockk.every +import io.mockk.verify import kotlinx.serialization.json.JsonObject +import no.nav.helse.rapids_rivers.JsonMessage import no.nav.helse.rapids_rivers.testsupport.TestRapid import no.nav.helsearbeidsgiver.felles.BehovType import no.nav.helsearbeidsgiver.felles.EventName +import no.nav.helsearbeidsgiver.felles.FeilReport +import no.nav.helsearbeidsgiver.felles.Feilmelding import no.nav.helsearbeidsgiver.felles.Key import no.nav.helsearbeidsgiver.felles.json.toJson -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail +import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey import no.nav.helsearbeidsgiver.felles.test.mock.MockRedis import no.nav.helsearbeidsgiver.inntektsmelding.tilgangservice.TilgangService -import org.junit.jupiter.api.Assertions.assertEquals +import no.nav.helsearbeidsgiver.utils.json.toJsonStr +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import java.util.UUID @@ -19,18 +27,32 @@ class TilgangServiceTest { private val testRapid = TestRapid() private val mockRedis = MockRedis() + private val service = TilgangService(testRapid, mockRedis.store) + + @BeforeEach + fun setup() { + testRapid.reset() + clearAllMocks() + mockRedis.setup() + } + @Test - fun testOnError() { + fun `kritisk feil stopper flyten`() { // TODO: uuid *må* være satt i Fail - ellers kastes IllegalStateException ved onError() // Kunne endret dette til å returnere Transaction.TERMINATE, men da gjenoppstår // problemet som en IllegalArgumentException i .terminate() // Kanskje bør uuid enforces til ikke-null i Fail? - val service = TilgangService(testRapid, mockRedis.store) - val feil = Fail( + val event = EventName.TILGANG_REQUESTED + val clientId = UUID.randomUUID() + val transaksjonId = UUID.randomUUID() + + every { mockRedis.store.get(RedisKey.of(transaksjonId, event)) } returns clientId.toString() + + val fail = Fail( feilmelding = "ikkeno", - event = EventName.TILGANG_REQUESTED, - transaksjonId = UUID.randomUUID(), + event = event, + transaksjonId = transaksjonId, forespoerselId = null, utloesendeMelding = JsonObject( mapOf( @@ -38,9 +60,20 @@ class TilgangServiceTest { ) ) ) - val transaction = service.onError(feil) - assertEquals(Transaction.TERMINATE, transaction) - service.terminate(feil) + shouldNotThrowAny { + service.onError(JsonMessage.newMessage(), fail) + } + + val expectedFeilReport = FeilReport( + mutableListOf( + Feilmelding("Teknisk feil, prøv igjen senere.", -1, Key.TILGANG) + ) + ) + .toJsonStr(FeilReport.serializer()) + + verify { + mockRedis.store.set(RedisKey.of(clientId), expectedFeilReport) + } } } diff --git a/trengerservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/trengerservice/TrengerService.kt b/trengerservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/trengerservice/TrengerService.kt index 2c9369b84b..9549e47cf3 100644 --- a/trengerservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/trengerservice/TrengerService.kt +++ b/trengerservice/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/trengerservice/TrengerService.kt @@ -16,11 +16,10 @@ import no.nav.helsearbeidsgiver.felles.json.les import no.nav.helsearbeidsgiver.felles.json.lesOrNull import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.json.toMap -import no.nav.helsearbeidsgiver.felles.rapidsrivers.DelegatingFailKanal +import no.nav.helsearbeidsgiver.felles.rapidsrivers.FailKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullDataKanal import no.nav.helsearbeidsgiver.felles.rapidsrivers.StatefullEventListener import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.CompositeEventListener -import no.nav.helsearbeidsgiver.felles.rapidsrivers.composite.Transaction import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisKey @@ -38,147 +37,107 @@ import no.nav.helsearbeidsgiver.utils.log.sikkerLogger import java.util.UUID const val UNDEFINED_FELT: String = "{}" -class TrengerService(private val rapidsConnection: RapidsConnection, override val redisStore: RedisStore) : CompositeEventListener(redisStore) { + +class TrengerService( + private val rapid: RapidsConnection, + override val redisStore: RedisStore +) : CompositeEventListener() { private val sikkerLogger = sikkerLogger() - override val event: EventName = EventName.TRENGER_REQUESTED + override val event = EventName.TRENGER_REQUESTED + override val startKeys = listOf( + Key.FORESPOERSEL_ID, + Key.ARBEIDSGIVER_ID + ) + override val dataKeys = listOf( + Key.FORESPOERSEL_SVAR, + Key.ARBEIDSTAKER_INFORMASJON, + Key.ARBEIDSGIVER_INFORMASJON, + Key.VIRKSOMHET, + Key.INNTEKT + ) init { - withFailKanal { DelegatingFailKanal(event, it, rapidsConnection) } - withDataKanal { - StatefullDataKanal( - listOf( - Key.FORESPOERSEL_SVAR, - Key.ARBEIDSTAKER_INFORMASJON, - Key.ARBEIDSGIVER_INFORMASJON, - Key.VIRKSOMHET, - Key.INNTEKT - ).toTypedArray(), - event, - it, - rapidsConnection, - redisStore - ) - } - withEventListener { - StatefullEventListener( - redisStore, - event, - listOf(Key.FORESPOERSEL_ID, Key.ARBEIDSGIVER_ID).toTypedArray(), - it, - rapidsConnection - ) - } + StatefullEventListener(rapid, event, redisStore, startKeys, ::onPacket) + StatefullDataKanal(rapid, event, redisStore, dataKeys, ::onPacket) + FailKanal(rapid, event, ::onPacket) } - override fun onError(feil: Fail): Transaction { - val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), feil.utloesendeMelding.toMap()) + override fun new(message: JsonMessage) { + val uuid = message[Key.UUID.str].asText().let(UUID::fromString) + val forespoerselId = message[Key.FORESPOERSEL_ID.str].asText().let(UUID::fromString) - var feilmelding: Feilmelding? = null - if (utloesendeBehov == BehovType.HENT_TRENGER_IM) { - feilmelding = Feilmelding("Teknisk feil, prøv igjen senere.", -1, datafelt = Key.FORESPOERSEL_SVAR) - val feilKey = RedisKey.of(feil.transaksjonId, feilmelding) - val feilReport: FeilReport = redisStore.get(feilKey)?.fromJson(FeilReport.serializer()) ?: FeilReport() - feilReport.feil.add(feilmelding) - redisStore.set(feilKey, feilReport.toJsonStr(FeilReport.serializer())) - return Transaction.TERMINATE - } else if (utloesendeBehov == BehovType.VIRKSOMHET) { - feilmelding = Feilmelding("Vi klarte ikke å hente virksomhet navn.", datafelt = Key.VIRKSOMHET) - redisStore.set(RedisKey.of(feil.transaksjonId, Key.VIRKSOMHET), "Ukjent navn") - } else if (utloesendeBehov == BehovType.FULLT_NAVN) { - feilmelding = Feilmelding("Vi klarte ikke å hente arbeidstaker informasjon.", datafelt = Key.ARBEIDSTAKER_INFORMASJON) - redisStore.set( - RedisKey.of(feil.transaksjonId, Key.ARBEIDSTAKER_INFORMASJON), - PersonDato("Ukjent navn", null, "").toJsonStr(PersonDato.serializer()) - ) - redisStore.set( - RedisKey.of(feil.transaksjonId, Key.ARBEIDSGIVER_INFORMASJON), - PersonDato("Ukjent navn", null, "").toJsonStr(PersonDato.serializer()) - ) - } else if (utloesendeBehov == BehovType.INNTEKT) { - feilmelding = Feilmelding( - "Vi har problemer med å hente inntektsopplysninger. Du kan legge inn beregnet månedsinntekt manuelt, eller prøv igjen senere.", - datafelt = Key.INNTEKT - ) - redisStore.set(RedisKey.of(feil.transaksjonId, Key.INNTEKT), UNDEFINED_FELT) - } - if (feilmelding != null) { - val feilKey = RedisKey.of(feil.transaksjonId, feilmelding) - val feilReport: FeilReport = redisStore.get(feilKey)?.fromJson(FeilReport.serializer()) ?: FeilReport() - feilReport.feil.add(feilmelding) - redisStore.set(feilKey, feilReport.toJsonStr(FeilReport.serializer())) - } - return Transaction.IN_PROGRESS + sikkerLogger.info("Dispatcher HENT_TRENGER_IM for $uuid") + sikkerLogger.info("${simpleName()} Dispatcher HENT_TRENGER_IM for $uuid") + + val agFnr = message[Key.ARBEIDSGIVER_ID.str].asText() + + redisStore.set(RedisKey.of(uuid, Key.ARBEIDSGIVER_FNR), agFnr) // ta vare på denne til vi slår opp fullt navn + + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.HENT_TRENGER_IM.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.UUID to uuid.toJson() + ) } - override fun dispatchBehov(message: JsonMessage, transaction: Transaction) { + override fun inProgress(message: JsonMessage) { val uuid = message[Key.UUID.str].asText().let(UUID::fromString) val forespoerselId = message[Key.FORESPOERSEL_ID.str].asText().let(UUID::fromString) - sikkerLogger.info("Dispatcher for $uuid with trans state $transaction") - if (transaction == Transaction.NEW) { - sikkerLogger.info("Dispatcher HENT_TRENGER_IM for $uuid") - sikkerLogger.info("${simpleName()} Dispatcher HENT_TRENGER_IM for $uuid") - val agFnr = message[Key.ARBEIDSGIVER_ID.str].asText() - redisStore.set(RedisKey.of(uuid, Key.ARBEIDSGIVER_FNR), agFnr) // ta vare på denne til vi slår opp fullt navn - rapidsConnection.publish( + + sikkerLogger.info("Dispatcher for $uuid with trans state 'in progress'") + + message.interestedIn(Key.FORESPOERSEL_SVAR.str) + + if (isDataCollected(step1data(uuid)) && !message[Key.FORESPOERSEL_SVAR.str].isMissingNode) { + val forespoersel = redisStore.get(RedisKey.of(uuid, Key.FORESPOERSEL_SVAR))!!.fromJson(TrengerInntekt.serializer()) + sikkerLogger.info("${simpleName()} Dispatcher VIRKSOMHET for $uuid") + rapid.publish( Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.HENT_TRENGER_IM.toJson(), + Key.BEHOV to BehovType.VIRKSOMHET.toJson(), Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.UUID to uuid.toJson() + Key.UUID to uuid.toJson(), + Key.ORGNRUNDERENHET to forespoersel.orgnr.toJson() ) - } else if (transaction == Transaction.IN_PROGRESS) { - message.interestedIn(Key.FORESPOERSEL_SVAR.str) - if (isDataCollected(*step1data(uuid)) && !message[Key.FORESPOERSEL_SVAR.str].isMissingNode) { - val forespoersel = redisStore.get(RedisKey.of(uuid, Key.FORESPOERSEL_SVAR))!!.fromJson(TrengerInntekt.serializer()) - sikkerLogger.info("${simpleName()} Dispatcher VIRKSOMHET for $uuid") - rapidsConnection.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.VIRKSOMHET.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.UUID to uuid.toJson(), - Key.ORGNRUNDERENHET to forespoersel.orgnr.toJson() - ) - sikkerLogger.info("${simpleName()} dispatcher FULLT_NAVN for $uuid") - rapidsConnection.publish( + sikkerLogger.info("${simpleName()} dispatcher FULLT_NAVN for $uuid") + rapid.publish( + Key.EVENT_NAME to event.toJson(), + Key.BEHOV to BehovType.FULLT_NAVN.toJson(), + Key.FORESPOERSEL_ID to forespoerselId.toJson(), + Key.UUID to uuid.toJson(), + Key.IDENTITETSNUMMER to forespoersel.fnr.toJson(), + Key.ARBEIDSGIVER_ID to redisStore.get(RedisKey.of(uuid, Key.ARBEIDSGIVER_FNR)).orEmpty().toJson() + ) + + val skjaeringstidspunkt = forespoersel.skjaeringstidspunkt + ?: finnSkjaeringstidspunkt(forespoersel.egenmeldingsperioder + forespoersel.sykmeldingsperioder) + + if (skjaeringstidspunkt != null) { + sikkerLogger.info("${simpleName()} Dispatcher INNTEKT for $uuid") + rapid.publish( Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.FULLT_NAVN.toJson(), + Key.BEHOV to BehovType.INNTEKT.toJson(), Key.FORESPOERSEL_ID to forespoerselId.toJson(), Key.UUID to uuid.toJson(), - Key.IDENTITETSNUMMER to forespoersel.fnr.toJson(), - Key.ARBEIDSGIVER_ID to redisStore.get(RedisKey.of(uuid, Key.ARBEIDSGIVER_FNR)).orEmpty().toJson() + Key.ORGNRUNDERENHET to forespoersel.orgnr.toJson(), + Key.FNR to forespoersel.fnr.toJson(), + Key.SKJAERINGSTIDSPUNKT to skjaeringstidspunkt.toJson() ) - - val skjaeringstidspunkt = forespoersel.skjaeringstidspunkt - ?: finnSkjaeringstidspunkt(forespoersel.egenmeldingsperioder + forespoersel.sykmeldingsperioder) - - if (skjaeringstidspunkt != null) { - sikkerLogger.info("${simpleName()} Dispatcher INNTEKT for $uuid") - rapidsConnection.publish( - Key.EVENT_NAME to event.toJson(), - Key.BEHOV to BehovType.INNTEKT.toJson(), - Key.FORESPOERSEL_ID to forespoerselId.toJson(), - Key.UUID to uuid.toJson(), - Key.ORGNRUNDERENHET to forespoersel.orgnr.toJson(), - Key.FNR to forespoersel.fnr.toJson(), - Key.SKJAERINGSTIDSPUNKT to skjaeringstidspunkt.toJson() + } else { + "Fant ikke skjaeringstidspunkt å hente inntekt for.".also { + sikkerLogger.error("$it forespoersel=$forespoersel") + val fail = Fail( + feilmelding = it, + event = event, + transaksjonId = uuid, + forespoerselId = forespoerselId, + utloesendeMelding = message.toJson().parseJson() ) - } else { - "Fant ikke skjaeringstidspunkt å hente inntekt for.".also { - sikkerLogger.error("$it forespoersel=$forespoersel") - val fail = Fail( - feilmelding = it, - event = event, - transaksjonId = uuid, - forespoerselId = forespoerselId, - utloesendeMelding = message.toJson().parseJson() - ) - onError(fail) - } + onError(message, fail) } } - } else { - sikkerLogger.error("Illegal transaction type ecountered in dispatchBehov $transaction for uuid= $uuid") } } @@ -221,21 +180,59 @@ class TrengerService(private val rapidsConnection: RapidsConnection, override va } } - override fun terminate(fail: Fail) { - sikkerLogger.info("terminate transaction id ${fail.transaksjonId} with evenname ${fail.event}") - val clientId = redisStore.get(RedisKey.of(fail.transaksjonId, fail.event))?.let(UUID::fromString) - // @TODO kan vare smartere her. Kan definere feilmeldingen i Feil message istedenfor å hardkode det i TrengerService. Vi også ikke trenger å sende alle andre ikke kritiske feilmeldinger hvis vi har noe kritisk - val feilReport: FeilReport = redisStore.get(RedisKey.of(fail.transaksjonId, Feilmelding("")))!!.fromJson(FeilReport.serializer()) - if (clientId != null) { - redisStore.set(RedisKey.of(clientId), TrengerData(feilReport = feilReport).toJsonStr(TrengerData.serializer())) + override fun onError(message: JsonMessage, fail: Fail) { + val utloesendeBehov = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding.toMap()) + + var feilmelding: Feilmelding? = null + if (utloesendeBehov == BehovType.HENT_TRENGER_IM) { + val feilReport = FeilReport( + mutableListOf( + Feilmelding("Teknisk feil, prøv igjen senere.", -1, datafelt = Key.FORESPOERSEL_SVAR) + ) + ) + + sikkerLogger.info("terminate transaction id ${fail.transaksjonId} with evenname ${fail.event}") + + val clientId = redisStore.get(RedisKey.of(fail.transaksjonId, fail.event))?.let(UUID::fromString) + if (clientId != null) { + redisStore.set(RedisKey.of(clientId), TrengerData(feilReport = feilReport).toJsonStr(TrengerData.serializer())) + } + return + } else if (utloesendeBehov == BehovType.VIRKSOMHET) { + feilmelding = Feilmelding("Vi klarte ikke å hente virksomhet navn.", datafelt = Key.VIRKSOMHET) + redisStore.set(RedisKey.of(fail.transaksjonId, Key.VIRKSOMHET), "Ukjent navn") + } else if (utloesendeBehov == BehovType.FULLT_NAVN) { + feilmelding = Feilmelding("Vi klarte ikke å hente arbeidstaker informasjon.", datafelt = Key.ARBEIDSTAKER_INFORMASJON) + redisStore.set( + RedisKey.of(fail.transaksjonId, Key.ARBEIDSTAKER_INFORMASJON), + PersonDato("Ukjent navn", null, "").toJsonStr(PersonDato.serializer()) + ) + redisStore.set( + RedisKey.of(fail.transaksjonId, Key.ARBEIDSGIVER_INFORMASJON), + PersonDato("Ukjent navn", null, "").toJsonStr(PersonDato.serializer()) + ) + } else if (utloesendeBehov == BehovType.INNTEKT) { + feilmelding = Feilmelding( + "Vi har problemer med å hente inntektsopplysninger. Du kan legge inn beregnet månedsinntekt manuelt, eller prøv igjen senere.", + datafelt = Key.INNTEKT + ) + redisStore.set(RedisKey.of(fail.transaksjonId, Key.INNTEKT), UNDEFINED_FELT) + } + if (feilmelding != null) { + val feilKey = RedisKey.of(fail.transaksjonId, feilmelding) + val feilReport: FeilReport = redisStore.get(feilKey)?.fromJson(FeilReport.serializer()) ?: FeilReport() + feilReport.feil.add(feilmelding) + redisStore.set(feilKey, feilReport.toJsonStr(FeilReport.serializer())) } + return inProgress(message) } - private fun step1data(uuid: UUID): Array = arrayOf( - RedisKey.of(uuid, Key.FORESPOERSEL_SVAR) - ) + private fun step1data(uuid: UUID): List = + listOf( + RedisKey.of(uuid, Key.FORESPOERSEL_SVAR) + ) - fun String.fromJsonWithUndefined(serializer: KSerializer): T? { + private fun String.fromJsonWithUndefined(serializer: KSerializer): T? { if (this == UNDEFINED_FELT) return null return this.fromJson(serializer) }