Skip to content

Commit

Permalink
benytter tbd-sql i jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsteinsland committed Feb 25, 2025
1 parent 6fc3c26 commit 740cdc4
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 65 deletions.
1 change: 1 addition & 0 deletions jobs/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation(libs.postgresql)
implementation(libs.hikari)
implementation(libs.cloudsql)
implementation(libs.tbd.sql)
}

tasks {
Expand Down
94 changes: 29 additions & 65 deletions jobs/src/main/kotlin/no/nav/helse/spleis/jobs/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package no.nav.helse.spleis.jobs

import com.github.navikt.tbd_libs.kafka.AivenConfig
import com.github.navikt.tbd_libs.kafka.ConsumerProducerFactory
import com.github.navikt.tbd_libs.sql_dsl.connection
import com.github.navikt.tbd_libs.sql_dsl.firstOrNull
import com.github.navikt.tbd_libs.sql_dsl.long
import com.github.navikt.tbd_libs.sql_dsl.mapNotNull
import com.github.navikt.tbd_libs.sql_dsl.prepareStatementWithNamedParameters
import com.github.navikt.tbd_libs.sql_dsl.single
import com.github.navikt.tbd_libs.sql_dsl.string
import com.github.navikt.tbd_libs.sql_dsl.transaction
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import java.sql.Connection
import java.sql.ResultSet
import java.time.LocalDate
import java.time.LocalDateTime
import java.util.*
Expand Down Expand Up @@ -58,8 +65,8 @@ private fun vacuumTask() {
val ds = DataSourceConfiguration(DbUser.SPLEIS).dataSource()
log.info("Commencing VACUUM FULL")
val duration = measureTime {
ds.connection.use { connection ->
connection.createStatement().execute("VACUUM FULL person")
ds.connection {
createStatement().execute("VACUUM FULL person")
}
}
log.info(
Expand All @@ -81,7 +88,7 @@ private fun migrateV2Task(arbeidId: String, size: Int) {
// låser ned person-raden slik at spleis ikke tar inn meldinger og overskriver mens denne podden holder på
val data = prepareStatement(query).use { stmt ->
stmt.setLong(1, fnr)
stmt.executeQuery().singleOrNull { it.getString("data") }
stmt.executeQuery().firstOrNull { it.string("data") }
}
if (data != null) {
migreringCounter += 1
Expand Down Expand Up @@ -194,18 +201,18 @@ private fun klargjørEllerVentPåTilgjengeligArbeid(connection: Connection, arbe

fun opprettOgUtførArbeid(arbeidId: String, size: Int = 1, arbeider: (connection: Connection, fnr: Long) -> Unit) {
DataSourceConfiguration(DbUser.MIGRATE).dataSource(maximumPoolSize = 1).use { ds ->
ds.connection.use { connection ->
klargjørEllerVentPåTilgjengeligArbeid(connection, arbeidId)
ds.connection {
klargjørEllerVentPåTilgjengeligArbeid(this, arbeidId)
do {
log.info("Forsøker å hente arbeid")
val arbeidsliste = hentArbeid(connection, arbeidId, size)
val arbeidsliste = hentArbeid(this, arbeidId, size)
.also {
if (it.isNotEmpty()) log.info("Fikk ${it.size} stk")
}
.onEach { fnr ->
try {
arbeider(connection, fnr)
arbeidFullført(connection, arbeidId, fnr)
arbeider(this, fnr)
arbeidFullført(this, arbeidId, fnr)
} catch (e: Exception) {
log.error("feil ved arbeidId=$arbeidId: ${e.message}", e)
sikkerlogg.error("feil ved arbeidId=$arbeidId, fnr=$fnr: ${e.message}", e)
Expand Down Expand Up @@ -233,19 +240,20 @@ private fun testSpeilJsonTask(arbeidId: String) {
}

fun hentPerson(connection: Connection, fnr: Long) =
connection.prepareStatement("SELECT data FROM person WHERE fnr = ? ORDER BY id DESC LIMIT 1").use { stmt ->
stmt.setLong(1, fnr)
stmt.executeQuery().single { rs -> rs.getString("data") }
connection.prepareStatementWithNamedParameters("SELECT data FROM person WHERE fnr = :fnr ORDER BY id DESC LIMIT 1") {
withParameter("fnr", fnr)
}.use { stmt ->
stmt.executeQuery().single { rs -> rs.string("data") }
}

private fun migrateTask(factory: ConsumerProducerFactory) {
DataSourceConfiguration(DbUser.MIGRATE).dataSource().use { ds ->
var count = 0L
factory.createProducer().use { producer ->
ds.connection.use { connection ->
connection.prepareStatement("SELECT fnr FROM person").use { stmt ->
ds.connection {
prepareStatement("SELECT fnr FROM person").use { stmt ->
stmt.executeQuery().mapNotNull { row ->
row.getLong("fnr").toString().padStart(11, '0')
row.long("fnr").toString().padStart(11, '0')
}
}
}.forEach { fnr ->
Expand Down Expand Up @@ -274,16 +282,17 @@ private fun avstemmingTask(factory: ConsumerProducerFactory, customDayOfMonth: I
// hvor én bøtte tilsvarer én dag i måneden. Tallet 28 (pga februar) ble valgt slik at vi sikrer oss at vi avstemmer
// alle personer hver måned. Dag 29, 30, 31 avstemmes 0 personer siden det er umulig å ha disse rest-verdiene

ds.connection.use { connection ->
ds.connection {
@Language("PostgreSQL")
val statement = """
SELECT fnr
FROM person
WHERE (1 + mod(fnr, 28)) = ? AND (sist_avstemt is null or sist_avstemt < now() - interval '1 day')
WHERE (1 + mod(fnr, 28)) = :dag AND (sist_avstemt is null or sist_avstemt < now() - interval '1 day')
"""
connection.prepareStatement(statement).use { stmt ->
stmt.setInt(1, dayOfMonth)
stmt.executeQuery().mapNotNull { it.getLong(1) }
prepareStatementWithNamedParameters(statement) {
withParameter("dag", dayOfMonth)
}.use { stmt ->
stmt.executeQuery().mapNotNull { it.long(1) }
}.forEach { fnr ->
val fnrStr = fnr.toString().padStart(11, '0')
producer.send(ProducerRecord("tbd.rapid.v1", fnrStr, lagAvstemming(fnrStr)))
Expand Down Expand Up @@ -347,48 +356,3 @@ private enum class DbUser(private val dbUserPrefix: String) {

override fun toString() = dbUserPrefix
}

// krever minst én rad og at mapping-funksjonen ikke returnerer null
fun <R> ResultSet.single(map: (ResultSet) -> R?): R {
check(next()) { "forventet én rad" }
return checkNotNull(map(this)) { "forventet ikke en null-verdi" }
}

// returnerer null hvis result-settet er tomt eller at mapping-funksjonen gir null
fun <R> ResultSet.singleOrNull(map: (ResultSet) -> R?): R? {
if (!next()) return null
return map(this)
}

fun <R> ResultSet.mapNotNull(map: (ResultSet) -> R?): List<R> =
map(map).filterNotNull()

// siden flere av ResultSet-funksjonene returnerer potensielt null
// så føles det mer riktig å anta at map-funksjonen kan gi en nullable R.
// f.eks. vil ResultSet.getString() returnere `null` hvis kolonnen er lagret som `null` i databasen.
// i kotlin vil typen bli seende som `String!`, som kan godtas både som `String` og `String?` i kotlin.
// Det kan dessuten være legitimt bruksområde å hente ut rader, men bevare `null`-verdien. derfor foretas det ingen filtrering her.
// bruk `mapNotNull()` for å fjerne null-rader / gjøre listen not-null
fun <R> ResultSet.map(map: (ResultSet) -> R?): List<R?> {
return buildList {
while (next()) {
add(map(this@map))
}
}
}

fun <R> Connection.transaction(block: Connection.() -> R): R {
return try {
autoCommit = false
block().also { commit() }
} catch (err: Exception) {
try {
rollback()
} catch (suppressed: Exception) {
err.addSuppressed(suppressed)
}
throw err
} finally {
autoCommit = true
}
}

0 comments on commit 740cdc4

Please sign in to comment.