Skip to content

Commit

Permalink
extract-jdbc: refactor constants (#44482)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta authored Aug 21, 2024
1 parent 8ef222b commit 571a3b6
Show file tree
Hide file tree
Showing 84 changed files with 3,819 additions and 1,534 deletions.
1 change: 0 additions & 1 deletion airbyte-cdk/bulk/core/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,5 @@ dependencies {
}
testFixturesApi 'io.micronaut.test:micronaut-test-core:4.5.0'
testFixturesApi 'io.micronaut.test:micronaut-test-junit5:4.5.0'
testFixturesApi 'com.h2database:h2:2.2.224'
testFixturesApi 'io.github.deblockt:json-diff:1.0.1'
}
2 changes: 0 additions & 2 deletions airbyte-cdk/bulk/core/extract/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ dependencies {
implementation 'hu.webarticum:tree-printer:3.2.1'

testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base'))

testImplementation project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-extract-jdbc')
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.discover

import io.airbyte.protocol.models.Field as AirbyteField
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.CatalogHelpers

/**
 * Factory for the [AirbyteStream] instances which make up the catalog built during DISCOVER.
 *
 * Implementations hold the connector-specific logic and are meant to be stateless.
 */
interface AirbyteStreamFactory {
    /** Builds the connector-specific [AirbyteStream] for a stream with GLOBAL state. */
    fun createGlobal(discoveredStream: DiscoveredStream): AirbyteStream

    /** Builds the connector-specific [AirbyteStream] for a stream with STREAM state. */
    fun createNonGlobal(discoveredStream: DiscoveredStream): AirbyteStream

    companion object {

        /**
         * Builds an [AirbyteStream] from the name, the namespace and the columns of
         * [discoveredStream], using [CatalogHelpers.createAirbyteStream].
         *
         * Each column is mapped to an [AirbyteField] carrying the column ID and the JSON schema
         * type of the column's Airbyte type. The primary key IDs are not read here.
         */
        fun createAirbyteStream(discoveredStream: DiscoveredStream): AirbyteStream {
            val streamName = discoveredStream.name
            val streamNamespace = discoveredStream.namespace
            val airbyteFields =
                discoveredStream.columns.map { column ->
                    AirbyteField.of(column.id, column.type.airbyteType.asJsonSchemaType())
                }
            return CatalogHelpers.createAirbyteStream(streamName, streamNamespace, airbyteFields)
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ package io.airbyte.cdk.discover
import io.airbyte.cdk.Operation
import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.Field as AirbyteField
import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
Expand All @@ -18,7 +16,7 @@ import jakarta.inject.Singleton
class DiscoverOperation(
val config: SourceConfiguration,
val metadataQuerierFactory: MetadataQuerier.Factory<SourceConfiguration>,
val airbyteStreamDecorator: AirbyteStreamDecorator,
val airbyteStreamFactory: AirbyteStreamFactory,
val outputConsumer: OutputConsumer,
) : Operation {
private val log = KotlinLogging.logger {}
Expand All @@ -39,50 +37,16 @@ class DiscoverOperation(
}
val primaryKey: List<List<String>> = metadataQuerier.primaryKey(name, namespace)
val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKey)
airbyteStreams.add(toAirbyteStream(discoveredStream))
val airbyteStream: AirbyteStream =
if (config.global) {
airbyteStreamFactory.createGlobal(discoveredStream)
} else {
airbyteStreamFactory.createNonGlobal(discoveredStream)
}
airbyteStreams.add(airbyteStream)
}
}
}
outputConsumer.accept(AirbyteCatalog().withStreams(airbyteStreams))
}

/**
 * Converts a [DiscoveredStream] into the [AirbyteStream] which gets added to the catalog.
 *
 * The stream is first built from the name, namespace and columns. Its source-defined primary key
 * is then set, and is left empty unless every key column is known and accepted by
 * [airbyteStreamDecorator]. The stream is marked resumable when that key is non-empty. Finally the
 * stream is handed to the decorator callback selected by the configuration and the columns.
 */
fun toAirbyteStream(discoveredStream: DiscoveredStream): AirbyteStream {
// Index the columns by ID so that primary key components can be resolved to fields.
val allColumnsByID: Map<String, Field> = discoveredStream.columns.associateBy { it.id }
val airbyteStream: AirbyteStream =
CatalogHelpers.createAirbyteStream(
discoveredStream.name,
discoveredStream.namespace,
discoveredStream.columns.map {
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType())
},
)
// The primary key is valid only if each of its entries, once its components are joined with
// '.', names an existing column which the decorator accepts as a primary key element.
val isValidPK: Boolean =
discoveredStream.primaryKeyColumnIDs.all { idComponents: List<String> ->
val id: String = idComponents.joinToString(separator = ".")
val field: Field? = allColumnsByID[id]
field != null && airbyteStreamDecorator.isPossiblePrimaryKeyElement(field)
}
// An invalid primary key is replaced by an empty one rather than being partially reported.
airbyteStream.withSourceDefinedPrimaryKey(
if (isValidPK) discoveredStream.primaryKeyColumnIDs else listOf(),
)
// Resumability is tied to the presence of a source-defined primary key.
airbyteStream.isResumable = airbyteStream.sourceDefinedPrimaryKey.isNotEmpty()
if (config.global) {
// There is a global feed of incremental records, like CDC.
airbyteStreamDecorator.decorateGlobal(airbyteStream)
} else if (discoveredStream.columns.any { airbyteStreamDecorator.isPossibleCursor(it) }) {
// There is one field whose values can be round-tripped and aggregated by MAX.
airbyteStreamDecorator.decorateNonGlobal(airbyteStream)
} else {
// There is no such field.
airbyteStreamDecorator.decorateNonGlobalNoCursor(airbyteStream)
}
return airbyteStream
}

/**
 * Value holder for what was discovered about one stream.
 *
 * @property name name of the stream.
 * @property namespace namespace of the stream; may be null.
 * @property columns fields of the stream.
 * @property primaryKeyColumnIDs primary key of the stream, one list of ID components per key
 * column; empty when there is no primary key.
 */
data class DiscoveredStream(
val name: String,
val namespace: String?,
val columns: List<Field>,
val primaryKeyColumnIDs: List<List<String>>,
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.discover

/**
 * Value holder for what was discovered about one stream during DISCOVER.
 *
 * @property name name of the stream.
 * @property namespace namespace of the stream; may be null.
 * @property columns fields of the stream.
 * @property primaryKeyColumnIDs primary key of the stream, as a list of column IDs which are
 * themselves lists of strings; presumably empty when the stream has no primary key (TODO confirm
 * against the producer of this value).
 */
data class DiscoveredStream(
val name: String,
val namespace: String?,
val columns: List<Field>,
val primaryKeyColumnIDs: List<List<String>>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ package io.airbyte.cdk.discover

import io.airbyte.cdk.command.SourceConfiguration

/** A very thin abstraction around JDBC metadata queries. */
/** An abstraction for a catalog discovery session. */
interface MetadataQuerier : AutoCloseable {
/**
* Queries the information_schema for all table names in the schemas specified by the connector
* configuration.
*/

/** Returns all available namespaces. */
fun streamNamespaces(): List<String>

/** Returns all available stream names in the given namespace. */
fun streamNames(streamNamespace: String?): List<String>

/** Executes a SELECT * on the table, discards the results, and extracts all column metadata. */
/** Returns all available fields in the given stream. */
fun fields(
streamName: String,
streamNamespace: String?,
): List<Field>

/** Queries the information_schema for any primary key on the given table. */
/** Returns the primary key for the given stream, if it exists; empty list otherwise. */
fun primaryKey(
streamName: String,
streamNamespace: String?,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.fakesource
package io.airbyte.cdk.check

import io.airbyte.cdk.Operation
import io.airbyte.cdk.check.CheckOperation
import io.airbyte.cdk.fakesource.FakeSourceConfigurationJsonObject
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.micronaut.context.annotation.Property
Expand All @@ -13,7 +13,7 @@ import org.junit.jupiter.api.Test

@MicronautTest(environments = ["source"], rebuildContext = true)
@Property(name = Operation.PROPERTY, value = "check")
class FakeSourceCheckTest {
class CheckTest {
@Inject lateinit var checkOperation: CheckOperation<FakeSourceConfigurationJsonObject>

@Inject lateinit var outputConsumer: BufferingOutputConsumer
Expand All @@ -22,30 +22,31 @@ class FakeSourceCheckTest {
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.port", value = "-1")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testConfigBadPort() {
assertFailed(" must have a minimum value of 0".toRegex())
}

@Test
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "fakesource/metadata-valid.json")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testSuccess() {
assertSucceeded()
}

@Test
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "fakesource/metadata-empty.json")
@Property(name = "metadata.resource", value = "discover/metadata-empty.json")
fun testBadSchema() {
assertFailed("Discovered zero tables".toRegex())
}

@Test
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "fakesource/metadata-column-query-fails.json")
@Property(name = "metadata.resource", value = "discover/metadata-column-query-fails.json")
fun testBadTables() {
assertFailed("Unable to query any of the [0-9]+ discovered table".toRegex())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.fakesource
package io.airbyte.cdk.command

import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.fakesource.FakeSourceConfiguration
import io.airbyte.cdk.fakesource.UserDefinedCursor
import io.airbyte.cdk.ssh.SshConnectionOptions
import io.airbyte.cdk.ssh.SshPasswordAuthTunnelMethod
import io.micronaut.context.annotation.Property
Expand All @@ -13,7 +14,7 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@MicronautTest(rebuildContext = true)
class FakeSourceConfigurationTest {
class ConfigurationFactoryTest {
@Inject lateinit var actual: SourceConfiguration

@Test
Expand All @@ -39,10 +40,7 @@ class FakeSourceConfigurationTest {
sshTunnel = SshPasswordAuthTunnelMethod("localhost", 22, "sshuser", "secret"),
sshConnectionOptions =
SshConnectionOptions(1_000.milliseconds, 2_000.milliseconds, Duration.ZERO),
jdbcUrlFmt = "jdbc:h2:tcp://%s:%d/mem:testdb",
schemas = setOf("PUBLIC", "TESTSCHEMA"),
cursor = UserDefinedCursor,
resumablePreferred = true,
maxConcurrency = 1,
checkpointTargetInterval = java.time.Duration.ofDays(100L),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConfigurationJsonObjectSupplierTest {
@Test
fun testSchema() {
Assertions.assertEquals(FakeSourceConfigurationJsonObject::class.java, supplier.javaClass)
val expected: String = ResourceUtils.readResource("command/expected-schema.json")
val expected: String = ResourceUtils.readResource("fakesource/expected-schema.json")
Assertions.assertEquals(Jsons.readTree(expected), supplier.jsonSchema)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.command

import org.junit.jupiter.api.Test

/** Exercises [SyncsTestFixture] using the fake source's test resources. */
class SyncsTestFixtureTest {

    /** Calls [SyncsTestFixture.testSpec] with the path of the expected spec resource. */
    @Test
    fun testSpec() {
        val expectedSpecResource = "fakesource/expected-spec.json"
        SyncsTestFixture.testSpec(expectedSpecResource)
    }
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.fakesource
package io.airbyte.cdk.discover

import io.airbyte.cdk.Operation
import io.airbyte.cdk.discover.DiscoverOperation
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteCatalog
Expand All @@ -16,7 +15,7 @@ import org.junit.jupiter.api.Test

@MicronautTest(environments = ["source"], rebuildContext = true)
@Property(name = Operation.PROPERTY, value = "discover")
class FakeSourceDiscoverTest {
class DiscoverTest {
@Inject lateinit var discoverOperation: DiscoverOperation

@Inject lateinit var outputConsumer: BufferingOutputConsumer
Expand All @@ -25,14 +24,15 @@ class FakeSourceDiscoverTest {
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "airbyte.connector.config.cursor", value = "user_defined")
@Property(name = "metadata.resource", value = "fakesource/metadata-valid.json")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testCursorBasedIncremental() {
val events =
AirbyteStream()
.withName("EVENTS")
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
.withIsResumable(true)
val kv =
Expand All @@ -41,6 +41,7 @@ class FakeSourceDiscoverTest {
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
.withIsResumable(true)
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
Expand All @@ -52,14 +53,15 @@ class FakeSourceDiscoverTest {
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "airbyte.connector.config.cursor", value = "cdc")
@Property(name = "metadata.resource", value = "fakesource/metadata-valid.json")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testCdcIncremental() {
val events =
AirbyteStream()
.withName("EVENTS")
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
.withIsResumable(true)
val kv =
Expand All @@ -68,6 +70,7 @@ class FakeSourceDiscoverTest {
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
.withIsResumable(true)
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
Expand Down
Loading

0 comments on commit 571a3b6

Please sign in to comment.