diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DefaultNamespaceProvider.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DefaultNamespaceProvider.kt new file mode 100644 index 0000000000000..bf993f7581549 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DefaultNamespaceProvider.kt @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.test.util + +interface DefaultNamespaceProvider { + fun get(randomNamespace: String): String? +} + +class DefaultDefaultNamespaceProvider : DefaultNamespaceProvider { + override fun get(randomNamespace: String): String? = null +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 3906709d2915e..bef06a7f120fc 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -42,6 +42,8 @@ import io.airbyte.cdk.load.message.InputStreamCheckpoint import io.airbyte.cdk.load.message.Meta.Change import io.airbyte.cdk.load.message.StreamCheckpoint import io.airbyte.cdk.load.test.util.ConfigurationUpdater +import io.airbyte.cdk.load.test.util.DefaultDefaultNamespaceProvider +import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.ExpectedRecordMapper @@ -151,6 +153,7 @@ abstract class BasicFunctionalityIntegrationTest( nameMapper: NameMapper = NoopNameMapper, additionalMicronautEnvs: List = emptyList(), micronautProperties: Map = emptyMap(), + val defaultNamespaceProvider: DefaultNamespaceProvider = DefaultDefaultNamespaceProvider(), /** * Whether to actually verify that the connector wrote data to the destination. This should only * ever be disabled for test destinations (dev-null, etc.). @@ -455,9 +458,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testNamespaces() { assumeTrue(verifyDataWriting) - fun makeStream(namespace: String) = + fun makeStream(namespace: String?, name: String = "test_stream") = DestinationStream( - DestinationStream.Descriptor(namespace, "test_stream"), + DestinationStream.Descriptor(namespace, name), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -466,12 +469,14 @@ abstract class BasicFunctionalityIntegrationTest( ) val stream1 = makeStream(randomizedNamespace + "_1") val stream2 = makeStream(randomizedNamespace + "_2") + val streamWithDefaultNamespace = makeStream(null, randomizedNamespace + "_stream") runSync( updatedConfig, DestinationCatalog( listOf( stream1, stream2, + streamWithDefaultNamespace, ) ), listOf( @@ -487,6 +492,12 @@ abstract class BasicFunctionalityIntegrationTest( data = """{"id": 5678}""", emittedAtMs = 1234, ), + InputRecord( + namespace = streamWithDefaultNamespace.descriptor.namespace, + name = streamWithDefaultNamespace.descriptor.name, + data = """{"id": 91011}""", + emittedAtMs = 1234, + ), ) ) assertAll( @@ -521,6 +532,27 @@ abstract class BasicFunctionalityIntegrationTest( listOf(listOf("id")), cursor = null ) + }, + { + dumpAndDiffRecords( + parsedConfig, + listOf( + OutputRecord( + extractedAt = 1234, + generationId = 0, + data = mapOf("id" to 91011), + airbyteMeta = OutputRecord.Meta(syncId = 42) + ) + ), + streamWithDefaultNamespace.copy( + descriptor = + streamWithDefaultNamespace.descriptor.copy( + namespace = defaultNamespaceProvider.get(randomizedNamespace) + ) + ), + listOf(listOf("id")), + cursor = null + ) } ) } @@ -531,9 +563,10 @@ abstract class BasicFunctionalityIntegrationTest( fun makeStream( name: String, schema: LinkedHashMap = linkedMapOf("id" to intType), + namespaceSuffix: String = "", ) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, name), + DestinationStream.Descriptor(randomizedNamespace + namespaceSuffix, name), Append, ObjectType(schema), generationId = 0, @@ -575,6 +608,11 @@ abstract class BasicFunctionalityIntegrationTest( "groups", linkedMapOf("id" to intType, "authorization" to stringType) ), + makeStream( + "streamWithSpecialCharactersInNamespace", + namespaceSuffix = "_spøcial" + ), + makeStream("streamWithOperatorInNamespace", namespaceSuffix = "_operator-1"), ) ) // For each stream, generate a record containing every field in the schema. diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt index 1f8fb64da99c1..1c4659c8ec3e4 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt @@ -34,7 +34,10 @@ object AvroExpectedRecordMapper : ExpectedRecordMapper { ): DestinationStream.Descriptor { // Map the special character but not the '+', because only the former is replaced in file // paths. - return descriptor.copy(name = descriptor.name.replace("é", "e")) + return descriptor.copy( + namespace = descriptor.namespace?.replace("ø", "o"), + name = descriptor.name.replace("é", "e") + ) } private fun fieldNameMangler(value: AirbyteValue): AirbyteValue = diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt index 853a733c0dd2f..f4388f9755cfc 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt @@ -162,6 +162,12 @@ class GlueWriteTest : override fun testFunkyCharacters() { super.testFunkyCharacters() } + + @Test + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439") + override fun testNamespaces() { + super.testNamespaces() + } } class GlueAssumeRoleWriteTest : @@ -179,6 +185,12 @@ class GlueAssumeRoleWriteTest : override fun testFunkyCharacters() { super.testFunkyCharacters() } + + @Test + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439") + override fun testNamespaces() { + super.testNamespaces() + } } @Disabled( diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 86b0c029d5274..cb74519f3daa6 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.aws.asMicronautProperties import io.airbyte.cdk.load.data.* import io.airbyte.cdk.load.data.avro.AvroExpectedRecordMapper import io.airbyte.cdk.load.message.InputRecord +import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider import io.airbyte.cdk.load.test.util.ExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.OutputRecord @@ -53,6 +54,10 @@ abstract class S3V2WriteTest( expectedRecordMapper, additionalMicronautEnvs = S3V2Destination.additionalMicronautEnvs, micronautProperties = S3V2TestUtils.assumeRoleCredentials.asMicronautProperties(), + defaultNamespaceProvider = + object : DefaultNamespaceProvider { + override fun get(randomNamespace: String): String? = null + }, isStreamSchemaRetroactive = false, supportsDedup = false, stringifySchemalessObjects = stringifySchemalessObjects,