Skip to content

Commit

Permalink
Load CDK: Expanded namespace coverage (#52074)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Feb 4, 2025
1 parent 2556cc2 commit 51a07bb
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.test.util

interface DefaultNamespaceProvider {
fun get(randomNamespace: String): String?
}

class DefaultDefaultNamespaceProvider : DefaultNamespaceProvider {
override fun get(randomNamespace: String): String? = null
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import io.airbyte.cdk.load.message.InputStreamCheckpoint
import io.airbyte.cdk.load.message.Meta.Change
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.test.util.ConfigurationUpdater
import io.airbyte.cdk.load.test.util.DefaultDefaultNamespaceProvider
import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
Expand Down Expand Up @@ -151,6 +153,7 @@ abstract class BasicFunctionalityIntegrationTest(
nameMapper: NameMapper = NoopNameMapper,
additionalMicronautEnvs: List<String> = emptyList(),
micronautProperties: Map<Property, String> = emptyMap(),
val defaultNamespaceProvider: DefaultNamespaceProvider = DefaultDefaultNamespaceProvider(),
/**
* Whether to actually verify that the connector wrote data to the destination. This should only
* ever be disabled for test destinations (dev-null, etc.).
Expand Down Expand Up @@ -455,9 +458,9 @@ abstract class BasicFunctionalityIntegrationTest(
@Test
open fun testNamespaces() {
assumeTrue(verifyDataWriting)
fun makeStream(namespace: String) =
fun makeStream(namespace: String?, name: String = "test_stream") =
DestinationStream(
DestinationStream.Descriptor(namespace, "test_stream"),
DestinationStream.Descriptor(namespace, name),
Append,
ObjectType(linkedMapOf("id" to intType)),
generationId = 0,
Expand All @@ -466,12 +469,14 @@ abstract class BasicFunctionalityIntegrationTest(
)
val stream1 = makeStream(randomizedNamespace + "_1")
val stream2 = makeStream(randomizedNamespace + "_2")
val streamWithDefaultNamespace = makeStream(null, randomizedNamespace + "_stream")
runSync(
updatedConfig,
DestinationCatalog(
listOf(
stream1,
stream2,
streamWithDefaultNamespace,
)
),
listOf(
Expand All @@ -487,6 +492,12 @@ abstract class BasicFunctionalityIntegrationTest(
data = """{"id": 5678}""",
emittedAtMs = 1234,
),
InputRecord(
namespace = streamWithDefaultNamespace.descriptor.namespace,
name = streamWithDefaultNamespace.descriptor.name,
data = """{"id": 91011}""",
emittedAtMs = 1234,
),
)
)
assertAll(
Expand Down Expand Up @@ -521,6 +532,27 @@ abstract class BasicFunctionalityIntegrationTest(
listOf(listOf("id")),
cursor = null
)
},
{
dumpAndDiffRecords(
parsedConfig,
listOf(
OutputRecord(
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 91011),
airbyteMeta = OutputRecord.Meta(syncId = 42)
)
),
streamWithDefaultNamespace.copy(
descriptor =
streamWithDefaultNamespace.descriptor.copy(
namespace = defaultNamespaceProvider.get(randomizedNamespace)
)
),
listOf(listOf("id")),
cursor = null
)
}
)
}
Expand All @@ -531,9 +563,10 @@ abstract class BasicFunctionalityIntegrationTest(
fun makeStream(
name: String,
schema: LinkedHashMap<String, FieldType> = linkedMapOf("id" to intType),
namespaceSuffix: String = "",
) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, name),
DestinationStream.Descriptor(randomizedNamespace + namespaceSuffix, name),
Append,
ObjectType(schema),
generationId = 0,
Expand Down Expand Up @@ -575,6 +608,11 @@ abstract class BasicFunctionalityIntegrationTest(
"groups",
linkedMapOf("id" to intType, "authorization" to stringType)
),
makeStream(
"streamWithSpecialCharactersInNamespace",
namespaceSuffix = "_spøcial"
),
makeStream("streamWithOperatorInNamespace", namespaceSuffix = "_operator-1"),
)
)
// For each stream, generate a record containing every field in the schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ object AvroExpectedRecordMapper : ExpectedRecordMapper {
): DestinationStream.Descriptor {
// Map the special character but not the '+', because only the former is replaced in file
// paths.
return descriptor.copy(name = descriptor.name.replace("é", "e"))
return descriptor.copy(
namespace = descriptor.namespace?.replace("ø", "o"),
name = descriptor.name.replace("é", "e")
)
}

private fun fieldNameMangler(value: AirbyteValue): AirbyteValue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ class GlueWriteTest :
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}

@Test
@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439")
override fun testNamespaces() {
super.testNamespaces()
}
}

class GlueAssumeRoleWriteTest :
Expand All @@ -179,6 +185,12 @@ class GlueAssumeRoleWriteTest :
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}

@Test
@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439")
override fun testNamespaces() {
super.testNamespaces()
}
}

@Disabled(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.aws.asMicronautProperties
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.data.avro.AvroExpectedRecordMapper
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.OutputRecord
Expand Down Expand Up @@ -53,6 +54,10 @@ abstract class S3V2WriteTest(
expectedRecordMapper,
additionalMicronautEnvs = S3V2Destination.additionalMicronautEnvs,
micronautProperties = S3V2TestUtils.assumeRoleCredentials.asMicronautProperties(),
defaultNamespaceProvider =
object : DefaultNamespaceProvider {
override fun get(randomNamespace: String): String? = null
},
isStreamSchemaRetroactive = false,
supportsDedup = false,
stringifySchemalessObjects = stringifySchemalessObjects,
Expand Down

0 comments on commit 51a07bb

Please sign in to comment.