Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send Kafka-key til behov-rivers som aktiveres i parallell #797

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.domene.Arbeidsforhold
import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.metrics.Metrics
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand All @@ -26,6 +28,7 @@ data class HentArbeidsforholdMelding(
val behovType: BehovType,
val transaksjonId: UUID,
val data: Map<Key, JsonElement>,
val svarKafkaKey: KafkaKey?,
val fnr: Fnr,
)

Expand All @@ -46,6 +49,7 @@ class HentArbeidsforholdRiver(
behovType = Key.BEHOV.krev(BehovType.HENT_ARBEIDSFORHOLD, BehovType.serializer(), json),
transaksjonId = Key.KONTEKST_ID.les(UuidSerializer, json),
data = data,
svarKafkaKey = Key.SVAR_KAFKA_KEY.lesOrNull(KafkaKey.serializer(), data),
fnr = Key.FNR.les(Fnr.serializer(), data),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ private object Mock {
mapOf(
Key.FNR to fnr.toJson(Fnr.serializer()),
),
svarKafkaKey = null,
fnr = fnr,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import no.nav.helsearbeidsgiver.felles.domene.ResultJson
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.personMapSerializer
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish
import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisStore
Expand Down Expand Up @@ -95,13 +96,16 @@ class AktiveOrgnrService(
data: Map<Key, JsonElement>,
steg0: Steg0,
) {
val svarKafkaKey = KafkaKey(steg0.sykmeldtFnr)

rapid.publish(
key = steg0.sykmeldtFnr,
Key.EVENT_NAME to eventName.toJson(),
Key.BEHOV to BehovType.ARBEIDSGIVERE.toJson(),
Key.KONTEKST_ID to steg0.transaksjonId.toJson(),
Key.DATA to
mapOf(
Key.SVAR_KAFKA_KEY to svarKafkaKey.toJson(),
Key.ARBEIDSGIVER_FNR to steg0.avsenderFnr.toJson(),
).toJson(),
)
Expand All @@ -113,6 +117,7 @@ class AktiveOrgnrService(
Key.KONTEKST_ID to steg0.transaksjonId.toJson(),
Key.DATA to
mapOf(
Key.SVAR_KAFKA_KEY to svarKafkaKey.toJson(),
Key.FNR to steg0.sykmeldtFnr.toJson(),
).toJson(),
)
Expand All @@ -124,6 +129,7 @@ class AktiveOrgnrService(
Key.KONTEKST_ID to steg0.transaksjonId.toJson(),
Key.DATA to
mapOf(
Key.SVAR_KAFKA_KEY to svarKafkaKey.toJson(),
Key.FNR_LISTE to
setOf(
steg0.sykmeldtFnr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import no.nav.helsearbeidsgiver.felles.domene.Arbeidsgiver
import no.nav.helsearbeidsgiver.felles.domene.PeriodeNullable
import no.nav.helsearbeidsgiver.felles.domene.Person
import no.nav.helsearbeidsgiver.felles.domene.ResultJson
import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.personMapSerializer
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisPrefix
import no.nav.helsearbeidsgiver.felles.rapidsrivers.service.ServiceRiverStateful
import no.nav.helsearbeidsgiver.felles.test.json.lesBehov
import no.nav.helsearbeidsgiver.felles.test.json.lesData
import no.nav.helsearbeidsgiver.felles.test.json.plusData
import no.nav.helsearbeidsgiver.felles.test.mock.MockRedis
import no.nav.helsearbeidsgiver.felles.test.mock.mockFail
Expand Down Expand Up @@ -62,9 +65,18 @@ class AktiveOrgnrServiceTest :
)

testRapid.inspektør.size shouldBeExactly 3
testRapid.message(0).lesBehov() shouldBe BehovType.ARBEIDSGIVERE
testRapid.message(1).lesBehov() shouldBe BehovType.HENT_ARBEIDSFORHOLD
testRapid.message(2).lesBehov() shouldBe BehovType.HENT_PERSONER
testRapid.message(0).also {
it.lesBehov() shouldBe BehovType.ARBEIDSGIVERE
Key.SVAR_KAFKA_KEY.lesOrNull(KafkaKey.serializer(), it.lesData()) shouldBe KafkaKey(Mock.sykmeldtFnr)
}
testRapid.message(1).also {
it.lesBehov() shouldBe BehovType.HENT_ARBEIDSFORHOLD
Key.SVAR_KAFKA_KEY.lesOrNull(KafkaKey.serializer(), it.lesData()) shouldBe KafkaKey(Mock.sykmeldtFnr)
}
testRapid.message(2).also {
it.lesBehov() shouldBe BehovType.HENT_PERSONER
Key.SVAR_KAFKA_KEY.lesOrNull(KafkaKey.serializer(), it.lesData()) shouldBe KafkaKey(Mock.sykmeldtFnr)
}

testRapid.sendJson(
Mock.steg1Data(transaksjonId, orgnr),
Expand Down Expand Up @@ -180,11 +192,12 @@ class AktiveOrgnrServiceTest :
})

private object Mock {
val sykmeldtFnr = Fnr.genererGyldig()

private const val SYKMELDT_NAVN = "Ole Idole"
private const val AVSENDER_NAVN = "Ole Jacob Evenrud"
private const val ORG_NAVN = "Mexican Standup A/S"

private val sykmeldtFnr = Fnr.genererGyldig()
private val avsenderFnr = Fnr.genererGyldig()

private val personer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import no.nav.helsearbeidsgiver.felles.EventName
import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.metrics.Metrics
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand All @@ -27,6 +29,7 @@ data class Melding(
val behovType: BehovType,
val transaksjonId: UUID,
val data: Map<Key, JsonElement>,
val svarKafkaKey: KafkaKey?,
val fnr: Fnr,
)

Expand All @@ -47,6 +50,7 @@ class AltinnRiver(
behovType = Key.BEHOV.krev(BehovType.ARBEIDSGIVERE, BehovType.serializer(), json),
transaksjonId = Key.KONTEKST_ID.les(UuidSerializer, json),
data = data,
svarKafkaKey = Key.SVAR_KAFKA_KEY.lesOrNull(KafkaKey.serializer(), data),
fnr = Key.ARBEIDSGIVER_FNR.les(Fnr.serializer(), data),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ object Mock {
mapOf(
Key.ARBEIDSGIVER_FNR to fnr.toJson(),
),
svarKafkaKey = null,
fnr = fnr,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,46 +56,42 @@ class BerikInntektsmeldingServiceTest :
}

test("nytt inntektsmeldingskjema berikes, lagres og sendes videre til journalføring") {
testRapid.sendJson(Mock.steg0(Mock.transaksjonId))
val transaksjonId = UUID.randomUUID()

testRapid.sendJson(Mock.steg0(transaksjonId))

// Melding med forventet behov og data sendt for å hente forespørsel
testRapid.inspektør.size shouldBeExactly 1
testRapid.message(0).also {
it.lesBehov() shouldBe BehovType.HENT_TRENGER_IM

Key.FORESPOERSEL_ID.lesOrNull(
serializer = UuidSerializer,
melding = it.lesData(),
) shouldBe Mock.skjema.forespoerselId
val data = it.lesData()
Key.FORESPOERSEL_ID.lesOrNull(UuidSerializer, data) shouldBe Mock.skjema.forespoerselId
}

testRapid.sendJson(Mock.steg1(Mock.transaksjonId))
testRapid.sendJson(Mock.steg1(transaksjonId))

// Melding med forventet behov og data sendt for å hente virksomhetsnavn
testRapid.inspektør.size shouldBeExactly 2
testRapid.message(1).also {
it.lesBehov() shouldBe BehovType.HENT_VIRKSOMHET_NAVN

Key.ORGNR_UNDERENHETER.lesOrNull(
serializer = Orgnr.serializer().set(),
melding = it.lesData(),
) shouldNotBe null
val data = it.lesData()
Key.ORGNR_UNDERENHETER.lesOrNull(Orgnr.serializer().set(), data) shouldNotBe null
}

testRapid.sendJson(Mock.steg2(Mock.transaksjonId))
testRapid.sendJson(Mock.steg2(transaksjonId))

// Melding med forventet behov og data sendt for å hente personnavn
testRapid.inspektør.size shouldBeExactly 3
testRapid.message(2).also {
it.lesBehov() shouldBe BehovType.HENT_PERSONER

Key.FNR_LISTE.lesOrNull(
serializer = Fnr.serializer().set(),
melding = it.lesData(),
) shouldNotBe null
val data = it.lesData()
Key.FNR_LISTE.lesOrNull(Fnr.serializer().set(), data) shouldNotBe null
}

testRapid.sendJson(Mock.steg3(Mock.transaksjonId))
testRapid.sendJson(Mock.steg3(transaksjonId))

// Melding med forventet behov og data sendt for å lagre inntektsmelding
testRapid.inspektør.size shouldBeExactly 4
Expand All @@ -108,7 +104,7 @@ class BerikInntektsmeldingServiceTest :
Key.INNSENDING_ID.lesOrNull(Long.serializer(), data) shouldBe Mock.INNSENDING_ID
}

testRapid.sendJson(Mock.steg4(Mock.transaksjonId))
testRapid.sendJson(Mock.steg4(transaksjonId))

// Inntektsmelding sendt videre til journalføring med forventet data
testRapid.inspektør.size shouldBeExactly 5
Expand All @@ -126,7 +122,7 @@ class BerikInntektsmeldingServiceTest :
test("duplikat IM sendes _ikke_ videre til journalføring") {
testRapid.sendJson(
Mock
.steg4(Mock.transaksjonId)
.steg4(UUID.randomUUID())
.plusData(Key.ER_DUPLIKAT_IM to true.toJson(Boolean.serializer())),
)

Expand All @@ -150,72 +146,63 @@ class BerikInntektsmeldingServiceTest :
private object Mock {
const val INNSENDING_ID = 1L

val transaksjonId: UUID = UUID.randomUUID()
val forespoersel = mockForespoersel()
val skjema = mockSkjemaInntektsmelding()
val inntektsmelding = mockInntektsmeldingV1()

val avsender =
private val forespoersel = mockForespoersel()

private val avsender =
Person(
fnr = Fnr.genererGyldig(),
navn = "Skrue McDuck",
)

val sykmeldt =
private val sykmeldt =
Person(
fnr = Fnr(forespoersel.fnr),
navn = "Dolly Duck",
)

val orgnrMedNavn = mapOf(Orgnr(forespoersel.orgnr) to "Lasses kasserollesjappe")
private val orgnrMedNavn = mapOf(Orgnr(forespoersel.orgnr) to "Lasses kasserollesjappe")

val personer =
private val personer =
mapOf(
avsender.fnr to avsender,
sykmeldt.fnr to sykmeldt,
)

val steg0data =
fun steg0(transaksjonId: UUID): Map<Key, JsonElement> =
mapOf(
Key.ARBEIDSGIVER_FNR to avsender.fnr.toJson(),
Key.SKJEMA_INNTEKTSMELDING to skjema.toJson(SkjemaInntektsmelding.serializer()),
Key.INNSENDING_ID to INNSENDING_ID.toJson(Long.serializer()),
Key.EVENT_NAME to EventName.INNTEKTSMELDING_SKJEMA_LAGRET.toJson(),
Key.KONTEKST_ID to transaksjonId.toJson(),
Key.DATA to
mapOf(
Key.ARBEIDSGIVER_FNR to avsender.fnr.toJson(),
Key.SKJEMA_INNTEKTSMELDING to skjema.toJson(SkjemaInntektsmelding.serializer()),
Key.INNSENDING_ID to INNSENDING_ID.toJson(Long.serializer()),
).toJson(),
)

val steg1data =
mapOf(
fun steg1(transaksjonId: UUID): Map<Key, JsonElement> =
steg0(transaksjonId).plusData(
Key.FORESPOERSEL_SVAR to forespoersel.toJson(Forespoersel.serializer()),
)

val steg2data =
mapOf(
fun steg2(transaksjonId: UUID): Map<Key, JsonElement> =
steg1(transaksjonId).plusData(
Key.VIRKSOMHETER to orgnrMedNavn.toJson(orgMapSerializer),
)

val steg3data =
mapOf(
fun steg3(transaksjonId: UUID): Map<Key, JsonElement> =
steg2(transaksjonId).plusData(
Key.PERSONER to personer.toJson(personMapSerializer),
)

val steg4data =
mapOf(
Key.ER_DUPLIKAT_IM to false.toJson(Boolean.serializer()),
Key.INNTEKTSMELDING to inntektsmelding.toJson(InntektsmeldingV1.serializer()),
Key.BESTEMMENDE_FRAVAERSDAG to 20.oktober.toJson(),
fun steg4(transaksjonId: UUID): Map<Key, JsonElement> =
steg3(transaksjonId).plusData(
mapOf(
Key.ER_DUPLIKAT_IM to false.toJson(Boolean.serializer()),
Key.INNTEKTSMELDING to mockInntektsmeldingV1().toJson(InntektsmeldingV1.serializer()),
Key.BESTEMMENDE_FRAVAERSDAG to 20.oktober.toJson(),
),
)

fun steg0(transaksjonId: UUID): Map<Key, JsonElement> =
mapOf(
Key.EVENT_NAME to EventName.INNTEKTSMELDING_SKJEMA_LAGRET.toJson(),
Key.KONTEKST_ID to transaksjonId.toJson(),
Key.DATA to steg0data.toJson(),
)

fun steg1(transaksjonId: UUID): Map<Key, JsonElement> = steg0(transaksjonId).plusData(steg1data)

fun steg2(transaksjonId: UUID): Map<Key, JsonElement> = steg1(transaksjonId).plusData(steg2data)

fun steg3(transaksjonId: UUID): Map<Key, JsonElement> = steg2(transaksjonId).plusData(steg3data)

fun steg4(transaksjonId: UUID): Map<Key, JsonElement> = steg3(transaksjonId).plusData(steg4data)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import no.nav.helsearbeidsgiver.felles.EventName
import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.metrics.Metrics
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand All @@ -27,6 +29,7 @@ data class HentVirksomhetMelding(
val behovType: BehovType,
val transaksjonId: UUID,
val data: Map<Key, JsonElement>,
val svarKafkaKey: KafkaKey?,
val orgnr: Set<Orgnr>,
)

Expand All @@ -48,6 +51,7 @@ class HentVirksomhetNavnRiver(
behovType = Key.BEHOV.krev(BehovType.HENT_VIRKSOMHET_NAVN, BehovType.serializer(), json),
transaksjonId = Key.KONTEKST_ID.les(UuidSerializer, json),
data = data,
svarKafkaKey = Key.SVAR_KAFKA_KEY.lesOrNull(KafkaKey.serializer(), data),
orgnr = Key.ORGNR_UNDERENHETER.les(Orgnr.serializer().set(), data),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private object Mock {
mapOf(
Key.ORGNR_UNDERENHETER to orgnr.toJson(Orgnr.serializer()),
),
svarKafkaKey = null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Er den null fordi det ikke er noen partisjonering i test og den dermed ikke blir brukt til noe?
Eller er det noen tilfeller denne er null i prod også?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Den kan være null i prod også, foreløpig. Nå er det kun servicene med state som må bruke svarKafkaKey, mens de uten state ikke trenger det. Det er mulig at det er bedre om alle bruker det, men det tenkte jeg å se på etter denne PR-en.

orgnr = orgnr,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ enum class Key : IKey {
SKAL_HA_PAAMINNELSE,
SKJEMA_INNTEKTSMELDING,
SPINN_INNTEKTSMELDING_ID,
SVAR_KAFKA_KEY,
SYKMELDT,
TILGANG,
VEDTAKSPERIODE_ID_LISTE,
Expand Down
Loading
Loading