Skip to content

Commit

Permalink
Merge pull request #830 from navikt/kafka_bq_sink
Browse files Browse the repository at this point in the history
Legger til kafka BigQuery sink app
  • Loading branch information
maccyber authored Feb 20, 2025
2 parents 5a34f67 + d850370 commit e5d10fd
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/platform_build_and_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ jobs:
- skedulert-harddelete
- skedulert-paaminnelse
- dataprodukt
- kafka-bq
#- manuelt-vedlikehold
steps:
- uses: actions/checkout@v4
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/platform_deploy_dev_manual.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
- skedulert-harddelete
- skedulert-paaminnelse
- dataprodukt
- kafka-bq
#- manuelt-vedlikehold
steps:
- uses: actions/checkout@v4
Expand All @@ -41,4 +42,4 @@ jobs:
VAR: image=${{ steps.login.outputs.registry }}/arbeidsgiver-notifikasjon-produsent-api:${{ github.sha }}
REF: ${{ github.sha }}
CLUSTER: ${{ matrix.cluster }}
RESOURCE: app/nais/${{ matrix.cluster }}-${{ matrix.app }}.yaml
RESOURCE: app/nais/${{ matrix.cluster }}-${{ matrix.app }}.yaml
36 changes: 36 additions & 0 deletions app/nais/dev-gcp-kafka-bq.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# NAIS manifest for the notifikasjon-kafka-bq sink app in dev-gcp:
# consumes the notification Kafka topic and writes events to BigQuery.
# NOTE(review): nesting indentation appears lost in this extraction — the
# deployed manifest must be valid YAML with indented sub-keys; verify in repo.
apiVersion: nais.io/v1alpha1
kind: Application
metadata:
name: notifikasjon-kafka-bq
namespace: fager
labels:
team: fager
spec:
# {{image}} is substituted by the deploy workflow (see platform_* workflows).
image: {{image}}
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
memory: 512Mi
liveness:
path: /internal/alive
readiness:
path: /internal/ready
# Single replica: presumably to keep a single Kafka consumer — confirm.
replicas:
min: 1
max: 1
prometheus:
enabled: true
path: /internal/metrics
kafka:
pool: nav-dev
gcp:
# Dataset is provisioned by NAIS; app writes via streaming inserts.
bigQueryDatasets:
- name: debughendelse
permission: READWRITE
# Consumed by KafkaBQ at startup (fails fast if missing).
env:
- name: BIGQUERY_DATASET_ID
value: debughendelse
- name: BIGQUERY_TABLE_NAME
value: notifikasjon
36 changes: 36 additions & 0 deletions app/nais/prod-gcp-kafka-bq.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# NAIS manifest for the notifikasjon-kafka-bq sink app in prod-gcp.
# Identical to the dev manifest except for the Kafka pool (nav-prod).
# NOTE(review): nesting indentation appears lost in this extraction — the
# deployed manifest must be valid YAML with indented sub-keys; verify in repo.
apiVersion: nais.io/v1alpha1
kind: Application
metadata:
name: notifikasjon-kafka-bq
namespace: fager
labels:
team: fager
spec:
# {{image}} is substituted by the deploy workflow (see platform_* workflows).
image: {{image}}
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
memory: 512Mi
liveness:
path: /internal/alive
readiness:
path: /internal/ready
# Single replica: presumably to keep a single Kafka consumer — confirm.
replicas:
min: 1
max: 1
prometheus:
enabled: true
path: /internal/metrics
kafka:
pool: nav-prod
gcp:
bigQueryDatasets:
- name: debughendelse
permission: READWRITE
# Consumed by KafkaBQ at startup (fails fast if missing).
env:
- name: BIGQUERY_DATASET_ID
value: debughendelse
- name: BIGQUERY_TABLE_NAME
value: notifikasjon
7 changes: 7 additions & 0 deletions app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@
<version>3.7.1</version>
</dependency>

<!-- big query -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.47.0</version>
</dependency>

<!-- db -->
<dependency>
<groupId>org.postgresql</groupId>
Expand Down
2 changes: 2 additions & 0 deletions app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import no.nav.arbeidsgiver.notifikasjon.kafka_backup.KafkaBackup
import no.nav.arbeidsgiver.notifikasjon.kafka_reaper.KafkaReaper
import no.nav.arbeidsgiver.notifikasjon.manuelt_vedlikehold.ManueltVedlikehold
import no.nav.arbeidsgiver.notifikasjon.produsent.Produsent
import no.nav.arbeidsgiver.notifikasjon.kafka_bq.KafkaBQ
import no.nav.arbeidsgiver.notifikasjon.replay_validator.ReplayValidator
import no.nav.arbeidsgiver.notifikasjon.skedulert_påminnelse.SkedulertPåminnelse
import no.nav.arbeidsgiver.notifikasjon.skedulert_utgått.SkedulertUtgått
Expand Down Expand Up @@ -38,6 +39,7 @@ fun main(@Suppress("UNUSED_PARAMETER") args: Array<String>) {
"notifikasjon-hendelse-transformer" -> HendelseTransformer.main()
"notifikasjon-dataprodukt" -> Dataprodukt.main()
"notifikasjon-manuelt-vedlikehold" -> ManueltVedlikehold.main()
"notifikasjon-kafka-bq" -> KafkaBQ.main()
else -> Main.log.error("ukjent \$NAIS_APP_NAME '$navn'")
}
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package no.nav.arbeidsgiver.notifikasjon.infrastruktur

import com.google.cloud.bigquery.*

/**
 * Thin abstraction over the BigQuery table API so the production client can be
 * swapped for an in-memory fake in tests (see LocalBigQueryClient).
 */
interface BigQueryClient {
    /**
     * Returns the id of [tableName] in the implementation's dataset, creating
     * the table with [schema] first if it does not already exist.
     */
    fun getOrCreateTable(tableName: String, schema: Schema): TableId

    /**
     * Inserts a single [row] into [tableId]. [insertId] is passed as the
     * streaming-insert row id — presumably for BigQuery's best-effort
     * deduplication; confirm against the implementation.
     *
     * May throw if the backend reports insert errors.
     */
    fun insert(tableId: TableId, insertId: String, row: Map<String, Any>)
}

/**
 * [BigQueryClient] backed by the Google Cloud BigQuery service.
 *
 * @param projectId GCP project that owns the dataset (credentials are resolved
 *                  from the environment / application-default credentials).
 * @param datasetId dataset all tables are created and written in.
 */
class BigQueryClientImpl(
    projectId: String,
    private val datasetId: String,
) : BigQueryClient {
    private val bigQuery = BigQueryOptions.newBuilder()
        .setProjectId(projectId)
        .build()
        .service

    override fun getOrCreateTable(tableName: String, schema: Schema): TableId {
        val tableId = TableId.of(datasetId, tableName)
        // getTable returns null when the table does not exist yet.
        return bigQuery.getTable(tableId)?.tableId ?: createTable(tableId, schema)
    }

    // Creates a standard (non-partitioned) table with the given schema.
    private fun createTable(tableId: TableId, schema: Schema): TableId {
        val tableDefinition = StandardTableDefinition.of(schema)
        return bigQuery.create(TableInfo.of(tableId, tableDefinition)).tableId
    }

    override fun insert(tableId: TableId, insertId: String, row: Map<String, Any>) {
        // Stream directly via insertAll instead of re-fetching the table on
        // every insert: saves one API round-trip per row and avoids the NPE
        // the original getTable(tableId).insert(...) would hit if the table
        // had been dropped between calls.
        val insertResponse = bigQuery.insertAll(
            InsertAllRequest.newBuilder(tableId)
                .addRow(insertId, row)
                .build()
        )

        if (insertResponse.hasErrors()) {
            throw RuntimeException("Lagring i BigQuery feilet: '${insertResponse.insertErrors}'")
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package no.nav.arbeidsgiver.notifikasjon.kafka_bq

import com.google.cloud.bigquery.Field
import com.google.cloud.bigquery.Schema
import com.google.cloud.bigquery.StandardSQLTypeName
import com.google.cloud.bigquery.TableId
import no.nav.arbeidsgiver.notifikasjon.hendelse.HendelseModel
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.BigQueryClient
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.json.laxObjectMapper

/**
 * Persists notification events to a BigQuery table.
 *
 * Each event becomes one row: its ids and organization number as plain
 * columns, plus the full event serialized as JSON in the `data` column.
 */
class BigQueryHendelseService(
    private val bigQueryClient: BigQueryClient,
    private val tableName: String,
) {
    private val schema = Schema.of(
        Field.of("hendelseId", StandardSQLTypeName.STRING),
        Field.of("aggregateId", StandardSQLTypeName.STRING),
        Field.of("virksomhetsnummer", StandardSQLTypeName.STRING),
        Field.of("data", StandardSQLTypeName.JSON),
    )

    // Resolved on first insert; creates the table if it is missing.
    private val tableId: TableId by lazy {
        bigQueryClient.getOrCreateTable(tableName, schema)
    }

    /** Writes [hendelse] as a single row, keyed by its hendelseId. */
    fun insertHendelse(hendelse: HendelseModel.Hendelse) {
        val rowKey = hendelse.hendelseId.toString()
        val row = mapOf(
            "hendelseId" to rowKey,
            "aggregateId" to hendelse.aggregateId.toString(),
            "virksomhetsnummer" to hendelse.virksomhetsnummer,
            "data" to laxObjectMapper.writeValueAsString(hendelse),
        )
        bigQueryClient.insert(tableId, rowKey, row)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package no.nav.arbeidsgiver.notifikasjon.kafka_bq

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.BigQueryClientImpl
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.Health
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.Subsystem
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.http.launchHttpServer
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.kafka.HendelsesstrømKafkaImpl

/**
 * Entry point for the kafka-bq app: consumes the notification event stream
 * from Kafka and sinks every event into a BigQuery table.
 *
 * Configuration comes from the environment (set in the NAIS manifests):
 * GCP_TEAM_PROJECT_ID, BIGQUERY_DATASET_ID, BIGQUERY_TABLE_NAME.
 */
object KafkaBQ {
    // Lazy so the Kafka consumer is only created when main() runs,
    // not when the object is merely class-loaded.
    private val hendelsesstrøm by lazy {
        HendelsesstrømKafkaImpl(groupId = "kafka-bq-v1")
    }
    // Required configuration — these initializers run at object init, so a
    // missing variable fails fast with IllegalStateException before any
    // Kafka/BigQuery work starts.
    private val projectId = System.getenv("GCP_TEAM_PROJECT_ID")
        ?: error("Missing required environment variable: GCP_TEAM_PROJECT_ID")
    private val datasetId = System.getenv("BIGQUERY_DATASET_ID")
        ?: error("Missing required environment variable: BIGQUERY_DATASET_ID")
    private val tableName = System.getenv("BIGQUERY_TABLE_NAME")
        ?: error("Missing required environment variable: BIGQUERY_TABLE_NAME")

    private val bigQueryHendelseService = BigQueryHendelseService(
        tableName = tableName,
        bigQueryClient = BigQueryClientImpl(
            projectId = projectId,
            datasetId = datasetId
        ),
    )

    /**
     * Starts the consume-and-insert loop plus the HTTP server for
     * health/metrics endpoints. Blocks for the lifetime of the app.
     */
    fun main(httpPort: Int = 8080) {
        runBlocking(Dispatchers.Default) {
            // NOTE(review): this app has no database; presumably the DATABASE
            // subsystem flag must be set for the readiness probe to pass —
            // confirm against the Health implementation.
            Health.subsystemReady[Subsystem.DATABASE] = true

            launch {
                // Sequential per-event insert; offset handling is inside the
                // hendelsesstrøm implementation (not visible here).
                hendelsesstrøm.forEach { hendelse ->
                    bigQueryHendelseService.insertHendelse(hendelse)
                }
            }

            launchHttpServer(httpPort = httpPort)
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package no.nav.arbeidsgiver.notifikasjon.kafka_bq

import com.google.cloud.bigquery.Schema
import com.google.cloud.bigquery.TableId
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import no.nav.arbeidsgiver.notifikasjon.infrastruktur.BigQueryClient
import no.nav.arbeidsgiver.notifikasjon.util.EksempelHendelse

/**
 * Verifies that BigQueryHendelseService writes the expected row through the
 * in-memory LocalBigQueryClient fake — no real BigQuery involved.
 */
class BigQueryHendelseServiceTest : FunSpec({

    test("insertHendelse should store the correct row in the BigQuery Local Client") {
        // Arrange: fake client plus the service under test.
        val datasetId = "myLocalDataset"
        val tableName = "myLocalTable"
        val fakeClient = LocalBigQueryClient(datasetId)
        val service = BigQueryHendelseService(
            bigQueryClient = fakeClient,
            tableName = tableName,
        )
        val hendelse = EksempelHendelse.SakOpprettet

        // Act
        service.insertHendelse(hendelse)

        // Assert: exactly one row landed in the expected table, with the
        // event id both as its own column and embedded in the JSON payload.
        val rows = fakeClient.tableData[TableId.of(datasetId, tableName)]
        rows?.size shouldBe 1

        val row = rows?.first()
        row?.get("hendelseId") shouldBe hendelse.hendelseId.toString()

        val payload = row?.get("data") as String
        payload.contains(hendelse.hendelseId.toString()) shouldBe true
    }
})

/**
 * In-memory [BigQueryClient] fake for tests.
 *
 * Rows are stored per [TableId] in [tableData], which is public so tests can
 * assert on exactly what was written. The [datasetId] plays the role of the
 * real client's dataset. Note: insertId (deduplication id) is not modeled.
 */
class LocalBigQueryClient(private val datasetId: String) : BigQueryClient {
    val tableData = mutableMapOf<TableId, MutableList<Map<String, Any>>>()

    override fun getOrCreateTable(tableName: String, schema: Schema): TableId {
        val tableId = TableId.of(datasetId, tableName)
        tableData.putIfAbsent(tableId, mutableListOf())
        return tableId
    }

    override fun insert(tableId: TableId, insertId: String, row: Map<String, Any>) {
        // Fix: the original `tableData[tableId]?.add(row)` silently dropped
        // the row when the table was never registered via getOrCreateTable,
        // which could mask missing-setup bugs in tests. Create on demand.
        tableData.getOrPut(tableId) { mutableListOf() }.add(row)
    }
}

0 comments on commit e5d10fd

Please sign in to comment.