Skip to content

Commit

Permalink
Lagre åpen inntektsmelding (#403)
Browse files Browse the repository at this point in the history
* Lagre åpen inntektsmelding

* Fjern unødvendige exceptions

* Rett navnefeil i metric

* Slett ubrukt testkode
  • Loading branch information
bjerga authored Jan 23, 2024
1 parent 44fd96a commit bd0c98d
Show file tree
Hide file tree
Showing 24 changed files with 484 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import no.nav.helsearbeidsgiver.felles.rapidsrivers.redis.RedisStore
import no.nav.helsearbeidsgiver.felles.utils.Log
import no.nav.helsearbeidsgiver.utils.json.serializer.UuidSerializer
import no.nav.helsearbeidsgiver.utils.json.toJson
import no.nav.helsearbeidsgiver.utils.json.toJsonStr
import no.nav.helsearbeidsgiver.utils.json.toPretty
import no.nav.helsearbeidsgiver.utils.log.MdcUtils
import no.nav.helsearbeidsgiver.utils.log.logger
Expand All @@ -53,8 +52,7 @@ class AapenImService(
Key.VIRKSOMHET,
Key.ARBEIDSTAKER_INFORMASJON,
Key.ARBEIDSGIVER_INFORMASJON,
Key.AAPEN_INNTEKTMELDING,
Key.ER_DUPLIKAT_IM
Key.AAPEN_INNTEKTMELDING
)

private val step1Data =
Expand Down Expand Up @@ -111,7 +109,7 @@ class AapenImService(
Log.transaksjonId(transaksjonId),
Log.aapenId(aapenId)
) {
if (step1Data.all { it in melding }) {
if (step1Data.all(melding::containsKey)) {
val skjema = Key.SKJEMA_INNTEKTSMELDING.les(SkjemaInntektsmelding.serializer(), melding)
val orgNavn = Key.VIRKSOMHET.les(String.serializer(), melding)
val sykmeldt = Key.ARBEIDSTAKER_INFORMASJON.les(PersonDato.serializer(), melding)
Expand All @@ -125,27 +123,24 @@ class AapenImService(
avsender = avsender
)

logger.debug("Skal sende melding med behov 'BehovType.LAGRE_AAPEN_IM'")
sikkerLogger.debug("Skal sende melding med behov 'BehovType.LAGRE_AAPEN_IM'")
// rapid.publish(
// Key.EVENT_NAME to event.toJson(),
// Key.UUID to transaksjonId.toJson(),
// Key.AAPEN_ID to aapenId.toJson(),
// Key.BEHOV to BehovType.LAGRE_AAPEN_IM.toJson(),
// Key.AAPEN_INNTEKTMELDING to inntektsmelding.toJson(Inntektsmelding.serializer())
// )

// TODO Midlertidig sett svar til im-api
val clientId = redisStore.get(RedisKey.of(transaksjonId, event))!!.let(UUID::fromString)
redisStore.set(RedisKey.of(clientId), inntektsmelding.toJsonStr(Inntektsmelding.serializer()))
rapid.publish(
Key.EVENT_NAME to event.toJson(),
Key.BEHOV to BehovType.LAGRE_AAPEN_IM.toJson(),
Key.UUID to transaksjonId.toJson(),
Key.AAPEN_ID to aapenId.toJson(),
Key.AAPEN_INNTEKTMELDING to inntektsmelding.toJson(Inntektsmelding.serializer())
)
.also {
logger.info("Publiserte melding med behov '${BehovType.LAGRE_AAPEN_IM}'.")
sikkerLogger.info("Publiserte melding:\n${it.toPretty()}")
}
}
}
}

override fun finalize(melding: Map<Key, JsonElement>) {
val transaksjonId = Key.UUID.les(UuidSerializer, melding)
val aapenId = Key.AAPEN_ID.les(UuidSerializer, melding)
val erDuplikat = Key.ER_DUPLIKAT_IM.les(Boolean.serializer(), melding)
val inntektsmeldingJson = Key.AAPEN_INNTEKTMELDING.les(JsonElement.serializer(), melding)

MdcUtils.withLogFields(
Expand All @@ -162,22 +157,20 @@ class AapenImService(
redisStore.set(RedisKey.of(clientId), inntektsmeldingJson.toString())
}

if (!erDuplikat) {
rapid.publish(
Key.EVENT_NAME to EventName.AAPEN_IM_LAGRET.toJson(),
Key.UUID to transaksjonId.toJson(),
Key.AAPEN_ID to aapenId.toJson(),
Key.AAPEN_INNTEKTMELDING to inntektsmeldingJson
)
.also {
MdcUtils.withLogFields(
Log.event(EventName.AAPEN_IM_LAGRET)
) {
logger.info("Publiserte melding.")
sikkerLogger.info("Publiserte melding:\n${it.toPretty()}")
}
rapid.publish(
Key.EVENT_NAME to EventName.AAPEN_IM_LAGRET.toJson(),
Key.UUID to transaksjonId.toJson(),
Key.AAPEN_ID to aapenId.toJson(),
Key.AAPEN_INNTEKTMELDING to inntektsmeldingJson
)
.also {
MdcUtils.withLogFields(
Log.event(EventName.AAPEN_IM_LAGRET)
) {
logger.info("Publiserte melding.")
sikkerLogger.info("Publiserte melding:\n${it.toPretty()}")
}
}
}
}
}

Expand All @@ -194,6 +187,7 @@ class AapenImService(
Key.VIRKSOMHET to "Ukjent virksomhet".toJson()
)
}

BehovType.FULLT_NAVN -> {
val sykmeldtFnr = Key.IDENTITETSNUMMER.les(String.serializer(), melding)
val avsenderFnr = Key.ARBEIDSGIVER_ID.les(String.serializer(), melding)
Expand All @@ -202,6 +196,7 @@ class AapenImService(
Key.ARBEIDSGIVER_INFORMASJON to tomPerson(avsenderFnr).toJson(PersonDato.serializer())
)
}

else -> {
emptyList()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class AltinnLoeser(
identitetsnummer = Key.IDENTITETSNUMMER.les(String.serializer(), json)
)

override fun Melding.haandter(): Map<Key, JsonElement> {
override fun Melding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val rettigheterForenklet =
Metrics.altinnRequest.recordTime {
altinnClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package no.nav.helsearbeidsgiver.inntektsmelding.db

import no.nav.helsearbeidsgiver.domene.inntektsmelding.v1.Inntektsmelding
import no.nav.helsearbeidsgiver.felles.metrics.Metrics
import no.nav.helsearbeidsgiver.felles.metrics.recordTime
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.firstOrNull
import no.nav.helsearbeidsgiver.inntektsmelding.db.tabell.AapenInntektsmeldingEntitet
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Query
import org.jetbrains.exposed.sql.SortOrder
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import org.jetbrains.exposed.sql.update
import java.util.UUID

// TODO test
class AapenImRepo(private val db: Database) {

fun hentNyesteIm(aapenId: UUID): Inntektsmelding? =
Metrics.dbAapenIm.recordTime(::hentNyesteIm.name) {
transaction(db) {
hentNyesteImQuery(aapenId)
.firstOrNull(AapenInntektsmeldingEntitet.inntektsmelding)
}
}

fun lagreIm(aapenId: UUID, im: Inntektsmelding) {
Metrics.dbAapenIm.recordTime(::lagreIm.name) {
transaction(db) {
AapenInntektsmeldingEntitet.insert {
it[this.aapenId] = aapenId
it[inntektsmelding] = im
}
}
}
}

fun oppdaterJournalpostId(aapenId: UUID, journalpostId: String) {
Metrics.dbAapenIm.recordTime(::oppdaterJournalpostId.name) {
transaction(db) {
AapenInntektsmeldingEntitet.update(
where = {
(AapenInntektsmeldingEntitet.id eqSubQuery hentNyesteImQuery(aapenId).adjustSlice { slice(AapenInntektsmeldingEntitet.id) }) and
AapenInntektsmeldingEntitet.journalpostId.isNull()
}
) {
it[this.journalpostId] = journalpostId
}
}
}
}
}

private fun hentNyesteImQuery(aapenId: UUID): Query =
AapenInntektsmeldingEntitet
.select {
AapenInntektsmeldingEntitet.aapenId eq aapenId
}
.orderBy(AapenInntektsmeldingEntitet.opprettet, SortOrder.DESC)
.limit(1)
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package no.nav.helsearbeidsgiver.inntektsmelding.db

import com.zaxxer.hikari.HikariConfig
import no.nav.helse.rapids_rivers.RapidApplication
import no.nav.helse.rapids_rivers.RapidsConnection
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.Database
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.DatabaseConfig
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.mapHikariConfig
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.HentAapenImRiver
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.HentOrgnrLoeser
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.HentPersistertLoeser
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.LagreAapenImRiver
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.LagreEksternInntektsmeldingLoeser
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.LagreForespoerselLoeser
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.LagreJournalpostIdLoeser
Expand All @@ -16,29 +15,32 @@ import no.nav.helsearbeidsgiver.inntektsmelding.db.river.PersisterImLoeser
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.PersisterOppgaveLoeser
import no.nav.helsearbeidsgiver.inntektsmelding.db.river.PersisterSakLoeser
import no.nav.helsearbeidsgiver.utils.log.logger
import no.nav.helsearbeidsgiver.utils.log.sikkerLogger

private val logger = "helsearbeidsgiver-im-db".logger()
private val sikkerLogger = sikkerLogger()

fun main() {
buildApp(mapHikariConfig(DatabaseConfig()), System.getenv()).start()
}
val database = Database(Database.Secrets("NAIS_DATABASE_IM_DB_INNTEKTSMELDING"))

fun buildApp(config: HikariConfig, env: Map<String, String>): RapidsConnection {
val database = Database(config)
sikkerLogger.info("Bruker database url: ${config.jdbcUrl}")
logger.info("Migrering starter...")
database.migrate()
logger.info("Migrering ferdig.")

val imRepo = InntektsmeldingRepository(database.db)
val aapenImRepo = AapenImRepo(database.db)
val forespoerselRepo = ForespoerselRepository(database.db)

return RapidApplication
.create(env)
.createDb(database, imRepo, forespoerselRepo)
.create(System.getenv())
.createDbRivers(imRepo, aapenImRepo, forespoerselRepo)
.registerDbLifecycle(database)
.start()
}

fun RapidsConnection.createDb(database: Database, imRepo: InntektsmeldingRepository, forespoerselRepo: ForespoerselRepository): RapidsConnection =
fun RapidsConnection.createDbRivers(
imRepo: InntektsmeldingRepository,
aapenImRepo: AapenImRepo,
forespoerselRepo: ForespoerselRepository
): RapidsConnection =
also {
logger.info("Starter ${LagreForespoerselLoeser::class.simpleName}...")
LagreForespoerselLoeser(this, forespoerselRepo)
Expand Down Expand Up @@ -67,14 +69,19 @@ fun RapidsConnection.createDb(database: Database, imRepo: InntektsmeldingReposit
logger.info("Starter ${LagreEksternInntektsmeldingLoeser::class.simpleName}...")
LagreEksternInntektsmeldingLoeser(this, imRepo)

registerDbLifecycle(database)
logger.info("Starter ${HentAapenImRiver::class.simpleName}...")
HentAapenImRiver(aapenImRepo).connect(this)

logger.info("Starter ${LagreAapenImRiver::class.simpleName}...")
LagreAapenImRiver(aapenImRepo).connect(this)
}

private fun RapidsConnection.registerDbLifecycle(db: Database) {
register(object : RapidsConnection.StatusListener {
override fun onShutdown(rapidsConnection: RapidsConnection) {
logger.info("Mottatt stoppsignal, lukker databasetilkobling")
db.dataSource.close()
}
})
}
fun RapidsConnection.registerDbLifecycle(db: Database): RapidsConnection =
also {
it.register(object : RapidsConnection.StatusListener {
override fun onShutdown(rapidsConnection: RapidsConnection) {
logger.info("Stoppsignal mottatt, lukker databasetilkobling.")
db.dataSource.close()
}
})
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package no.nav.helsearbeidsgiver.inntektsmelding.db

import io.prometheus.client.Summary
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.ForespoerselEntitet
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.firstOrNull
import no.nav.helsearbeidsgiver.inntektsmelding.db.tabell.ForespoerselEntitet
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Expression
import org.jetbrains.exposed.sql.Query
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.select
Expand Down Expand Up @@ -94,6 +93,3 @@ class ForespoerselRepository(private val db: Database) {
}
}
}

private fun <T> Query.firstOrNull(c: Expression<T>): T? =
firstOrNull()?.getOrNull(c)
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package no.nav.helsearbeidsgiver.inntektsmelding.db
import io.prometheus.client.Summary
import no.nav.helsearbeidsgiver.domene.inntektsmelding.deprecated.Inntektsmelding
import no.nav.helsearbeidsgiver.felles.EksternInntektsmelding
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.InntektsmeldingEntitet
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.InntektsmeldingEntitet.forespoerselId
import no.nav.helsearbeidsgiver.inntektsmelding.db.config.InntektsmeldingEntitet.innsendt
import no.nav.helsearbeidsgiver.inntektsmelding.db.tabell.InntektsmeldingEntitet
import no.nav.helsearbeidsgiver.utils.log.logger
import no.nav.helsearbeidsgiver.utils.log.sikkerLogger
import org.jetbrains.exposed.sql.Database
Expand Down Expand Up @@ -59,16 +57,20 @@ class InntektsmeldingRepository(private val db: Database) {
fun hentNyesteEksternEllerInternInntektsmelding(forespørselId: String): Pair<Inntektsmelding?, EksternInntektsmelding?>? {
val requestTimer = requestLatency.labels("hentNyesteInternEllerEkstern").startTimer()
return transaction(db) {
InntektsmeldingEntitet.slice(InntektsmeldingEntitet.dokument, InntektsmeldingEntitet.eksternInntektsmelding).run {
select { (forespoerselId eq forespørselId) }.orderBy(innsendt, SortOrder.DESC)
}.limit(1).map {
Pair(
it[InntektsmeldingEntitet.dokument],
it[InntektsmeldingEntitet.eksternInntektsmelding]
)
}.firstOrNull().also {
requestTimer.observeDuration()
}
InntektsmeldingEntitet.slice(InntektsmeldingEntitet.dokument, InntektsmeldingEntitet.eksternInntektsmelding)
.select { (InntektsmeldingEntitet.forespoerselId eq forespørselId) }
.orderBy(InntektsmeldingEntitet.innsendt, SortOrder.DESC)
.limit(1)
.map {
Pair(
it[InntektsmeldingEntitet.dokument],
it[InntektsmeldingEntitet.eksternInntektsmelding]
)
}
.firstOrNull()
.also {
requestTimer.observeDuration()
}
}
}

Expand Down Expand Up @@ -110,6 +112,6 @@ class InntektsmeldingRepository(private val db: Database) {
InntektsmeldingEntitet.select {
(InntektsmeldingEntitet.forespoerselId eq forespoerselId.toString()) and InntektsmeldingEntitet.dokument.isNotNull()
}
.orderBy(innsendt, SortOrder.DESC)
.orderBy(InntektsmeldingEntitet.innsendt, SortOrder.DESC)
.limit(1)
}
Loading

0 comments on commit bd0c98d

Please sign in to comment.