diff --git a/.github/workflows/platform_build_and_deploy.yaml b/.github/workflows/platform_build_and_deploy.yaml index 06beca527..f5ccb0882 100644 --- a/.github/workflows/platform_build_and_deploy.yaml +++ b/.github/workflows/platform_build_and_deploy.yaml @@ -101,6 +101,7 @@ jobs: - skedulert-harddelete - skedulert-paaminnelse - dataprodukt + - kafka-bq #- manuelt-vedlikehold steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/platform_deploy_dev_manual.yaml b/.github/workflows/platform_deploy_dev_manual.yaml index 2e98e4fe3..9c58f3ab3 100644 --- a/.github/workflows/platform_deploy_dev_manual.yaml +++ b/.github/workflows/platform_deploy_dev_manual.yaml @@ -25,6 +25,7 @@ jobs: - skedulert-harddelete - skedulert-paaminnelse - dataprodukt + - kafka-bq #- manuelt-vedlikehold steps: - uses: actions/checkout@v4 @@ -41,4 +42,4 @@ jobs: VAR: image=${{ steps.login.outputs.registry }}/arbeidsgiver-notifikasjon-produsent-api:${{ github.sha }} REF: ${{ github.sha }} CLUSTER: ${{ matrix.cluster }} - RESOURCE: app/nais/${{ matrix.cluster }}-${{ matrix.app }}.yaml \ No newline at end of file + RESOURCE: app/nais/${{ matrix.cluster }}-${{ matrix.app }}.yaml diff --git a/app/nais/dev-gcp-kafka-bq.yaml b/app/nais/dev-gcp-kafka-bq.yaml new file mode 100644 index 000000000..14cc396f5 --- /dev/null +++ b/app/nais/dev-gcp-kafka-bq.yaml @@ -0,0 +1,36 @@ +apiVersion: nais.io/v1alpha1 +kind: Application +metadata: + name: notifikasjon-kafka-bq + namespace: fager + labels: + team: fager +spec: + image: {{image}} + resources: + requests: + cpu: 200m + memory: 256Mi + limits: + memory: 512Mi + liveness: + path: /internal/alive + readiness: + path: /internal/ready + replicas: + min: 1 + max: 1 + prometheus: + enabled: true + path: /internal/metrics + kafka: + pool: nav-dev + gcp: + bigQueryDatasets: + - name: debughendelse + permission: READWRITE + env: + - name: BIGQUERY_DATASET_ID + value: debughendelse + - name: BIGQUERY_TABLE_NAME + value: notifikasjon diff --git a/app/nais/prod-gcp-kafka-bq.yaml b/app/nais/prod-gcp-kafka-bq.yaml new file mode 100644 index 000000000..69a449c75 --- /dev/null +++ b/app/nais/prod-gcp-kafka-bq.yaml @@ -0,0 +1,36 @@ +apiVersion: nais.io/v1alpha1 +kind: Application +metadata: + name: notifikasjon-kafka-bq + namespace: fager + labels: + team: fager +spec: + image: {{image}} + resources: + requests: + cpu: 200m + memory: 256Mi + limits: + memory: 512Mi + liveness: + path: /internal/alive + readiness: + path: /internal/ready + replicas: + min: 1 + max: 1 + prometheus: + enabled: true + path: /internal/metrics + kafka: + pool: nav-prod + gcp: + bigQueryDatasets: + - name: debughendelse + permission: READWRITE + env: + - name: BIGQUERY_DATASET_ID + value: debughendelse + - name: BIGQUERY_TABLE_NAME + value: notifikasjon diff --git a/app/pom.xml b/app/pom.xml index 423717784..b73890e6a 100644 --- a/app/pom.xml +++ b/app/pom.xml @@ -200,6 +200,13 @@ 3.7.1 + + + com.google.cloud + google-cloud-bigquery + 2.47.0 + + org.postgresql diff --git a/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/Main.kt b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/Main.kt index 7978d9611..39d80563f 100644 --- a/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/Main.kt +++ b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/Main.kt @@ -11,6 +11,7 @@ import no.nav.arbeidsgiver.notifikasjon.kafka_backup.KafkaBackup import no.nav.arbeidsgiver.notifikasjon.kafka_reaper.KafkaReaper import no.nav.arbeidsgiver.notifikasjon.manuelt_vedlikehold.ManueltVedlikehold import no.nav.arbeidsgiver.notifikasjon.produsent.Produsent +import no.nav.arbeidsgiver.notifikasjon.kafka_bq.KafkaBQ import no.nav.arbeidsgiver.notifikasjon.replay_validator.ReplayValidator import no.nav.arbeidsgiver.notifikasjon.skedulert_påminnelse.SkedulertPåminnelse import no.nav.arbeidsgiver.notifikasjon.skedulert_utgått.SkedulertUtgått @@ -38,6 +39,7 @@ fun main(@Suppress("UNUSED_PARAMETER") args: Array) { "notifikasjon-hendelse-transformer" -> HendelseTransformer.main() "notifikasjon-dataprodukt" -> Dataprodukt.main() "notifikasjon-manuelt-vedlikehold" -> ManueltVedlikehold.main() + "notifikasjon-kafka-bq" -> KafkaBQ.main() else -> Main.log.error("ukjent \$NAIS_APP_NAME '$navn'") } } catch (e: Exception) { diff --git a/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/infrastruktur/BigQueryClient.kt b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/infrastruktur/BigQueryClient.kt new file mode 100644 index 000000000..014c8d961 --- /dev/null +++ b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/infrastruktur/BigQueryClient.kt @@ -0,0 +1,45 @@ +package no.nav.arbeidsgiver.notifikasjon.infrastruktur + +import com.google.cloud.bigquery.* + +interface BigQueryClient { + fun getOrCreateTable(tableName: String, schema: Schema): TableId + fun insert(tableId: TableId, insertId: String, row: Map) +} + +class BigQueryClientImpl( + projectId: String, + private val datasetId: String, +) : BigQueryClient { + private val bigQuery = BigQueryOptions.newBuilder() + .setProjectId(projectId) + .build() + .service + + override fun getOrCreateTable(tableName: String, schema: Schema): TableId { + val tableId = TableId.of(datasetId, tableName) + val existingTable = bigQuery.getTable(tableId) + + return if (existingTable != null) { + tableId + } else { + createTable(tableId, schema) + } + } + + private fun createTable(tableId: TableId, schema: Schema): TableId { + val tableDefinition = StandardTableDefinition.of(schema) + val tableInfo = TableInfo.of(tableId, tableDefinition) + return bigQuery.create(tableInfo).tableId + } + + override fun insert(tableId: TableId, insertId: String, row: Map) { + val insertResponse = bigQuery.getTable(tableId).insert( + listOf(InsertAllRequest.RowToInsert.of(insertId, row)) + ) + + if (insertResponse.hasErrors()) { + throw RuntimeException("Lagring i BigQuery feilet: '${insertResponse.insertErrors}'") + } + } +} \ No newline at end of file diff --git a/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/BigQueryHendelseService.kt b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/BigQueryHendelseService.kt new file mode 100644 index 000000000..69efada36 --- /dev/null +++ b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/BigQueryHendelseService.kt @@ -0,0 +1,35 @@ +package no.nav.arbeidsgiver.notifikasjon.kafka_bq + +import com.google.cloud.bigquery.Field +import com.google.cloud.bigquery.Schema +import com.google.cloud.bigquery.StandardSQLTypeName +import com.google.cloud.bigquery.TableId +import no.nav.arbeidsgiver.notifikasjon.hendelse.HendelseModel +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.BigQueryClient +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.json.laxObjectMapper + +class BigQueryHendelseService( + private val bigQueryClient: BigQueryClient, + private val tableName: String, +) { + private val tableId: TableId by lazy { + bigQueryClient.getOrCreateTable(tableName, schema) + } + + private val schema = Schema.of( + Field.of("hendelseId", StandardSQLTypeName.STRING), + Field.of("aggregateId", StandardSQLTypeName.STRING), + Field.of("virksomhetsnummer", StandardSQLTypeName.STRING), + Field.of("data", StandardSQLTypeName.JSON) + ) + + fun insertHendelse(hendelse: HendelseModel.Hendelse) { + val row = mapOf( + "hendelseId" to hendelse.hendelseId.toString(), + "aggregateId" to hendelse.aggregateId.toString(), + "virksomhetsnummer" to hendelse.virksomhetsnummer, + "data" to laxObjectMapper.writeValueAsString(hendelse) + ) + bigQueryClient.insert(tableId, hendelse.hendelseId.toString(), row) + } +} \ No newline at end of file diff --git a/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/KafkaBQ.kt b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/KafkaBQ.kt new file mode 100644 index 000000000..c151ec0f4 --- /dev/null +++ b/app/src/main/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/KafkaBQ.kt @@ -0,0 +1,44 @@ +package no.nav.arbeidsgiver.notifikasjon.kafka_bq + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.BigQueryClientImpl +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.Health +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.Subsystem +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.http.launchHttpServer +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.kafka.HendelsesstrømKafkaImpl + +object KafkaBQ { + private val hendelsesstrøm by lazy { + HendelsesstrømKafkaImpl(groupId = "kafka-bq-v1") + } + private val projectId = System.getenv("GCP_TEAM_PROJECT_ID") + ?: error("Missing required environment variable: GCP_TEAM_PROJECT_ID") + private val datasetId = System.getenv("BIGQUERY_DATASET_ID") + ?: error("Missing required environment variable: BIGQUERY_DATASET_ID") + private val tableName = System.getenv("BIGQUERY_TABLE_NAME") + ?: error("Missing required environment variable: BIGQUERY_TABLE_NAME") + + private val bigQueryHendelseService = BigQueryHendelseService( + tableName = tableName, + bigQueryClient = BigQueryClientImpl( + projectId = projectId, + datasetId = datasetId + ), + ) + + fun main(httpPort: Int = 8080) { + runBlocking(Dispatchers.Default) { + Health.subsystemReady[Subsystem.DATABASE] = true + + launch { + hendelsesstrøm.forEach { hendelse -> + bigQueryHendelseService.insertHendelse(hendelse) + } + } + + launchHttpServer(httpPort = httpPort) + } + } +} \ No newline at end of file diff --git a/app/src/test/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/BigQueryHendelseServiceTests.kt b/app/src/test/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/BigQueryHendelseServiceTests.kt new file mode 100644 index 000000000..f3b3cc922 --- /dev/null +++ b/app/src/test/kotlin/no/nav/arbeidsgiver/notifikasjon/kafka_bq/BigQueryHendelseServiceTests.kt @@ -0,0 +1,52 @@ +package no.nav.arbeidsgiver.notifikasjon.kafka_bq + +import com.google.cloud.bigquery.Schema +import com.google.cloud.bigquery.TableId +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import no.nav.arbeidsgiver.notifikasjon.infrastruktur.BigQueryClient +import no.nav.arbeidsgiver.notifikasjon.util.EksempelHendelse + +class BigQueryHendelseServiceTest : FunSpec({ + + test("insertHendelse should store the correct row in the BigQuery Local Client") { + val datasetId = "myLocalDataset" + val tableName = "myLocalTable" + + val localBigQueryClient = LocalBigQueryClient(datasetId) + + val bigQueryHendelseService = BigQueryHendelseService( + bigQueryClient = localBigQueryClient, + tableName = tableName + ) + + val testHendelse = EksempelHendelse.SakOpprettet + + bigQueryHendelseService.insertHendelse(testHendelse) + + val tableId = TableId.of(datasetId, tableName) + val rows = localBigQueryClient.tableData[tableId] + + rows?.size shouldBe 1 + + val insertedRow = rows?.first() + insertedRow?.get("hendelseId") shouldBe testHendelse.hendelseId.toString() + + val jsonData = insertedRow?.get("data") as String + jsonData.contains(testHendelse.hendelseId.toString()) shouldBe true + } +}) + +class LocalBigQueryClient(private val datasetId: String) : BigQueryClient { + val tableData = mutableMapOf>>() + + override fun getOrCreateTable(tableName: String, schema: Schema): TableId { + val tableId = TableId.of(datasetId, tableName) + tableData.putIfAbsent(tableId, mutableListOf()) + return tableId + } + + override fun insert(tableId: TableId, insertId: String, row: Map) { + tableData[tableId]?.add(row) + } +} \ No newline at end of file