Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extract-jdbc: refactor constants #44482

Merged
Merged
1 change: 0 additions & 1 deletion airbyte-cdk/bulk/core/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,5 @@ dependencies {
}
testFixturesApi 'io.micronaut.test:micronaut-test-core:4.5.0'
testFixturesApi 'io.micronaut.test:micronaut-test-junit5:4.5.0'
testFixturesApi 'com.h2database:h2:2.2.224'
testFixturesApi 'io.github.deblockt:json-diff:1.0.1'
}
2 changes: 0 additions & 2 deletions airbyte-cdk/bulk/core/extract/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ dependencies {
implementation 'hu.webarticum:tree-printer:3.2.1'

testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base'))

testImplementation project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-extract-jdbc')
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.discover

import io.airbyte.protocol.models.Field as AirbyteField
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.CatalogHelpers

/** Stateless object for building an [AirbyteStream] during DISCOVER. */
interface AirbyteStreamFactory {
/** Connector-specific [AirbyteStream] creation logic for GLOBAL-state streams. */
fun createGlobal(discoveredStream: DiscoveredStream): AirbyteStream

/** Connector-specific [AirbyteStream] creation logic for STREAM-state streams. */
fun createNonGlobal(discoveredStream: DiscoveredStream): AirbyteStream

companion object {

fun createAirbyteStream(discoveredStream: DiscoveredStream): AirbyteStream =
CatalogHelpers.createAirbyteStream(
discoveredStream.name,
discoveredStream.namespace,
discoveredStream.columns.map {
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType())
},
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ package io.airbyte.cdk.discover
import io.airbyte.cdk.Operation
import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.Field as AirbyteField
import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
Expand All @@ -18,7 +16,7 @@ import jakarta.inject.Singleton
class DiscoverOperation(
val config: SourceConfiguration,
val metadataQuerierFactory: MetadataQuerier.Factory<SourceConfiguration>,
val airbyteStreamDecorator: AirbyteStreamDecorator,
val airbyteStreamFactory: AirbyteStreamFactory,
val outputConsumer: OutputConsumer,
) : Operation {
private val log = KotlinLogging.logger {}
Expand All @@ -39,50 +37,16 @@ class DiscoverOperation(
}
val primaryKey: List<List<String>> = metadataQuerier.primaryKey(name, namespace)
val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKey)
airbyteStreams.add(toAirbyteStream(discoveredStream))
val airbyteStream: AirbyteStream =
if (config.global) {
airbyteStreamFactory.createGlobal(discoveredStream)
} else {
airbyteStreamFactory.createNonGlobal(discoveredStream)
}
airbyteStreams.add(airbyteStream)
}
}
}
outputConsumer.accept(AirbyteCatalog().withStreams(airbyteStreams))
}

fun toAirbyteStream(discoveredStream: DiscoveredStream): AirbyteStream {
val allColumnsByID: Map<String, Field> = discoveredStream.columns.associateBy { it.id }
val airbyteStream: AirbyteStream =
CatalogHelpers.createAirbyteStream(
discoveredStream.name,
discoveredStream.namespace,
discoveredStream.columns.map {
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType())
},
)
val isValidPK: Boolean =
discoveredStream.primaryKeyColumnIDs.all { idComponents: List<String> ->
val id: String = idComponents.joinToString(separator = ".")
val field: Field? = allColumnsByID[id]
field != null && airbyteStreamDecorator.isPossiblePrimaryKeyElement(field)
}
airbyteStream.withSourceDefinedPrimaryKey(
if (isValidPK) discoveredStream.primaryKeyColumnIDs else listOf(),
)
airbyteStream.isResumable = airbyteStream.sourceDefinedPrimaryKey.isNotEmpty()
if (config.global) {
// There is a global feed of incremental records, like CDC.
airbyteStreamDecorator.decorateGlobal(airbyteStream)
} else if (discoveredStream.columns.any { airbyteStreamDecorator.isPossibleCursor(it) }) {
// There is one field whose values can be round-tripped and aggregated by MAX.
airbyteStreamDecorator.decorateNonGlobal(airbyteStream)
} else {
// There is no such field.
airbyteStreamDecorator.decorateNonGlobalNoCursor(airbyteStream)
}
return airbyteStream
}

data class DiscoveredStream(
val name: String,
val namespace: String?,
val columns: List<Field>,
val primaryKeyColumnIDs: List<List<String>>,
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.discover

data class DiscoveredStream(
val name: String,
val namespace: String?,
val columns: List<Field>,
val primaryKeyColumnIDs: List<List<String>>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ package io.airbyte.cdk.discover

import io.airbyte.cdk.command.SourceConfiguration

/** A very thin abstraction around JDBC metadata queries. */
/** An abstraction for a catalog discovery session. */
interface MetadataQuerier : AutoCloseable {
/**
* Queries the information_schema for all table names in the schemas specified by the connector
* configuration.
*/

/** Returns all available namespaces. */
fun streamNamespaces(): List<String>

/** Returns all available stream names in the given namespace. */
fun streamNames(streamNamespace: String?): List<String>

/** Executes a SELECT * on the table, discards the results, and extracts all column metadata. */
/** Returns all available fields in the given stream. */
fun fields(
streamName: String,
streamNamespace: String?,
): List<Field>

/** Queries the information_schema for any primary key on the given table. */
/** Returns the primary key for the given stream, if it exists; empty list otherwise. */
fun primaryKey(
streamName: String,
streamNamespace: String?,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.fakesource
package io.airbyte.cdk.check

import io.airbyte.cdk.Operation
import io.airbyte.cdk.check.CheckOperation
import io.airbyte.cdk.fakesource.FakeSourceConfigurationJsonObject
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.micronaut.context.annotation.Property
Expand All @@ -13,7 +13,7 @@ import org.junit.jupiter.api.Test

@MicronautTest(environments = ["source"], rebuildContext = true)
@Property(name = Operation.PROPERTY, value = "check")
class FakeSourceCheckTest {
class CheckTest {
@Inject lateinit var checkOperation: CheckOperation<FakeSourceConfigurationJsonObject>

@Inject lateinit var outputConsumer: BufferingOutputConsumer
Expand All @@ -22,30 +22,31 @@ class FakeSourceCheckTest {
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.port", value = "-1")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testConfigBadPort() {
assertFailed(" must have a minimum value of 0".toRegex())
}

@Test
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "fakesource/metadata-valid.json")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testSuccess() {
assertSucceeded()
}

@Test
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "fakesource/metadata-empty.json")
@Property(name = "metadata.resource", value = "discover/metadata-empty.json")
fun testBadSchema() {
assertFailed("Discovered zero tables".toRegex())
}

@Test
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "metadata.resource", value = "fakesource/metadata-column-query-fails.json")
@Property(name = "metadata.resource", value = "discover/metadata-column-query-fails.json")
fun testBadTables() {
assertFailed("Unable to query any of the [0-9]+ discovered table".toRegex())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.fakesource
package io.airbyte.cdk.command

import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.fakesource.FakeSourceConfiguration
import io.airbyte.cdk.fakesource.UserDefinedCursor
import io.airbyte.cdk.ssh.SshConnectionOptions
import io.airbyte.cdk.ssh.SshPasswordAuthTunnelMethod
import io.micronaut.context.annotation.Property
Expand All @@ -13,7 +14,7 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@MicronautTest(rebuildContext = true)
class FakeSourceConfigurationTest {
class ConfigurationFactoryTest {
@Inject lateinit var actual: SourceConfiguration

@Test
Expand All @@ -39,10 +40,7 @@ class FakeSourceConfigurationTest {
sshTunnel = SshPasswordAuthTunnelMethod("localhost", 22, "sshuser", "secret"),
sshConnectionOptions =
SshConnectionOptions(1_000.milliseconds, 2_000.milliseconds, Duration.ZERO),
jdbcUrlFmt = "jdbc:h2:tcp://%s:%d/mem:testdb",
schemas = setOf("PUBLIC", "TESTSCHEMA"),
cursor = UserDefinedCursor,
resumablePreferred = true,
maxConcurrency = 1,
checkpointTargetInterval = java.time.Duration.ofDays(100L),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConfigurationJsonObjectSupplierTest {
@Test
fun testSchema() {
Assertions.assertEquals(FakeSourceConfigurationJsonObject::class.java, supplier.javaClass)
val expected: String = ResourceUtils.readResource("command/expected-schema.json")
val expected: String = ResourceUtils.readResource("fakesource/expected-schema.json")
Assertions.assertEquals(Jsons.readTree(expected), supplier.jsonSchema)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.command

import org.junit.jupiter.api.Test

class SyncsTestFixtureTest {

@Test
fun testSpec() {
SyncsTestFixture.testSpec("fakesource/expected-spec.json")
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.fakesource
package io.airbyte.cdk.discover

import io.airbyte.cdk.Operation
import io.airbyte.cdk.discover.DiscoverOperation
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteCatalog
Expand All @@ -16,7 +15,7 @@ import org.junit.jupiter.api.Test

@MicronautTest(environments = ["source"], rebuildContext = true)
@Property(name = Operation.PROPERTY, value = "discover")
class FakeSourceDiscoverTest {
class DiscoverTest {
@Inject lateinit var discoverOperation: DiscoverOperation

@Inject lateinit var outputConsumer: BufferingOutputConsumer
Expand All @@ -25,14 +24,15 @@ class FakeSourceDiscoverTest {
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "airbyte.connector.config.cursor", value = "user_defined")
@Property(name = "metadata.resource", value = "fakesource/metadata-valid.json")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testCursorBasedIncremental() {
val events =
AirbyteStream()
.withName("EVENTS")
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
.withIsResumable(true)
val kv =
Expand All @@ -41,6 +41,7 @@ class FakeSourceDiscoverTest {
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
.withIsResumable(true)
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
Expand All @@ -52,14 +53,15 @@ class FakeSourceDiscoverTest {
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.database", value = "testdb")
@Property(name = "airbyte.connector.config.cursor", value = "cdc")
@Property(name = "metadata.resource", value = "fakesource/metadata-valid.json")
@Property(name = "metadata.resource", value = "discover/metadata-valid.json")
fun testCdcIncremental() {
val events =
AirbyteStream()
.withName("EVENTS")
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
.withIsResumable(true)
val kv =
Expand All @@ -68,6 +70,7 @@ class FakeSourceDiscoverTest {
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
.withIsResumable(true)
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
Expand Down
Loading
Loading