From 7792525eb5cf2839f9891c4f44f77427b10cd72f Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Tue, 21 Jan 2025 11:13:18 -0800 Subject: [PATCH] Load CDK: Expanded namespace coverage --- .../test/util/DefaultNamespaceProvider.kt | 9 ++++ .../BasicFunctionalityIntegrationTest.kt | 43 +++++++++++++++++-- .../data/avro/AvroExpectedRecordMapper.kt | 5 ++- .../ObjectStoragePathFactoryUTest.kt | 25 +++++------ .../s3_data_lake/S3DataLakeWriteTest.kt | 12 ++++++ .../destination/s3_v2/S3V2WriteTest.kt | 4 ++ 6 files changed, 82 insertions(+), 16 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DefaultNamespaceProvider.kt diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DefaultNamespaceProvider.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DefaultNamespaceProvider.kt new file mode 100644 index 0000000000000..5933f88d2a15d --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DefaultNamespaceProvider.kt @@ -0,0 +1,9 @@ +package io.airbyte.cdk.load.test.util + +interface DefaultNamespaceProvider { + fun get(randomNamespace: String): String? +} + +class DefaultDefaultNamespaceProvider: DefaultNamespaceProvider { + override fun get(randomNamespace: String): String = randomNamespace +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 3906709d2915e..18d6bc3982582 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -42,6 +42,8 @@ import io.airbyte.cdk.load.message.InputStreamCheckpoint import io.airbyte.cdk.load.message.Meta.Change import io.airbyte.cdk.load.message.StreamCheckpoint import io.airbyte.cdk.load.test.util.ConfigurationUpdater +import io.airbyte.cdk.load.test.util.DefaultDefaultNamespaceProvider +import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.ExpectedRecordMapper @@ -151,6 +153,7 @@ abstract class BasicFunctionalityIntegrationTest( nameMapper: NameMapper = NoopNameMapper, additionalMicronautEnvs: List = emptyList(), micronautProperties: Map = emptyMap(), + val defaultNamespaceProvider: DefaultNamespaceProvider = DefaultDefaultNamespaceProvider(), /** * Whether to actually verify that the connector wrote data to the destination. This should only * ever be disabled for test destinations (dev-null, etc.). @@ -455,9 +458,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testNamespaces() { assumeTrue(verifyDataWriting) - fun makeStream(namespace: String) = + fun makeStream(namespace: String?, name: String = "test_stream") = DestinationStream( - DestinationStream.Descriptor(namespace, "test_stream"), + DestinationStream.Descriptor(namespace, name), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -466,12 +469,14 @@ abstract class BasicFunctionalityIntegrationTest( ) val stream1 = makeStream(randomizedNamespace + "_1") val stream2 = makeStream(randomizedNamespace + "_2") + val streamWithDefaultNamespace = makeStream(null, randomizedNamespace + "_stream") runSync( updatedConfig, DestinationCatalog( listOf( stream1, stream2, + streamWithDefaultNamespace, ) ), listOf( @@ -487,6 +492,12 @@ abstract class BasicFunctionalityIntegrationTest( data = """{"id": 5678}""", emittedAtMs = 1234, ), + InputRecord( + namespace = streamWithDefaultNamespace.descriptor.namespace, + name = streamWithDefaultNamespace.descriptor.name, + data = """{"id": 91011}""", + emittedAtMs = 1234, + ), ) ) assertAll( @@ -521,6 +532,26 @@ abstract class BasicFunctionalityIntegrationTest( listOf(listOf("id")), cursor = null ) + }, + { + dumpAndDiffRecords( + parsedConfig, + listOf( + OutputRecord( + extractedAt = 1234, + generationId = 0, + data = mapOf("id" to 91011), + airbyteMeta = OutputRecord.Meta(syncId = 42) + ) + ), + streamWithDefaultNamespace.copy( + descriptor = streamWithDefaultNamespace.descriptor.copy( + namespace = defaultNamespaceProvider.get(randomizedNamespace) + ) + ), + listOf(listOf("id")), + cursor = null + ) } ) } @@ -531,9 +562,10 @@ abstract class BasicFunctionalityIntegrationTest( fun makeStream( name: String, schema: LinkedHashMap = linkedMapOf("id" to intType), + namespaceSuffix: String = "", ) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, name), + DestinationStream.Descriptor(randomizedNamespace + namespaceSuffix, name), Append, ObjectType(schema), generationId = 0, @@ -575,6 +607,11 @@ abstract class BasicFunctionalityIntegrationTest( "groups", linkedMapOf("id" to intType, "authorization" to stringType) ), + makeStream( + "streamWithSpecialCharactersInNamespace", + namespaceSuffix = "_spøcial" + ), + makeStream("streamWithOperatorInNamespace", namespaceSuffix = "_operator-1"), ) ) // For each stream, generate a record containing every field in the schema. diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt index 1f8fb64da99c1..1c4659c8ec3e4 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt @@ -34,7 +34,10 @@ object AvroExpectedRecordMapper : ExpectedRecordMapper { ): DestinationStream.Descriptor { // Map the special character but not the '+', because only the former is replaced in file // paths. - return descriptor.copy(name = descriptor.name.replace("é", "e")) + return descriptor.copy( + namespace = descriptor.namespace?.replace("ø", "o"), + name = descriptor.name.replace("é", "e") + ) } private fun fieldNameMangler(value: AirbyteValue): AirbyteValue = diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt index aff71e2c1d784..03d83d602a504 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt @@ -12,6 +12,7 @@ import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory.Companio import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory.Companion.DEFAULT_PATH_FORMAT import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory.Companion.DEFAULT_STAGING_PREFIX_SUFFIX import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState.Companion.OPTIONAL_ORDINAL_SUFFIX_PATTERN +import io.mockk.coEvery import io.mockk.every import io.mockk.impl.annotations.MockK import io.mockk.mockk @@ -32,14 +33,14 @@ class ObjectStoragePathFactoryUTest { @BeforeEach fun setup() { - every { stream.descriptor } returns DestinationStream.Descriptor("test", "stream") - every { timeProvider.syncTimeMillis() } returns 0 - every { timeProvider.currentTimeMillis() } returns 1 + coEvery { stream.descriptor } returns DestinationStream.Descriptor("test", "stream") + coEvery { timeProvider.syncTimeMillis() } returns 0 + coEvery { timeProvider.currentTimeMillis() } returns 1 } @Test fun `test matcher with suffix`() { - every { pathConfigProvider.objectStoragePathConfiguration } returns + coEvery { pathConfigProvider.objectStoragePathConfiguration } returns ObjectStoragePathConfiguration( "prefix", null, @@ -60,7 +61,7 @@ class ObjectStoragePathFactoryUTest { @Test fun `test file pattern with variable in prefix`() { - every { pathConfigProvider.objectStoragePathConfiguration } returns + coEvery { pathConfigProvider.objectStoragePathConfiguration } returns ObjectStoragePathConfiguration( "prefix-\${NAMESPACE}", "staging-\${NAMESPACE}", @@ -81,7 +82,7 @@ class ObjectStoragePathFactoryUTest { @Test fun `test pattern matcher with variable in prefix`() { - every { pathConfigProvider.objectStoragePathConfiguration } returns + coEvery { pathConfigProvider.objectStoragePathConfiguration } returns ObjectStoragePathConfiguration( "prefix-\${NAMESPACE}", "staging-\${NAMESPACE}", @@ -97,7 +98,7 @@ class ObjectStoragePathFactoryUTest { @Test fun `test pattern from null namespace`() { - every { pathConfigProvider.objectStoragePathConfiguration } returns + coEvery { pathConfigProvider.objectStoragePathConfiguration } returns ObjectStoragePathConfiguration( "prefix", "staging", @@ -106,7 +107,7 @@ class ObjectStoragePathFactoryUTest { true, ) val streamWithNullNamespace = mockk() - every { streamWithNullNamespace.descriptor } returns + coEvery { streamWithNullNamespace.descriptor } returns DestinationStream.Descriptor(null, "stream") val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider) assertEquals( @@ -124,7 +125,7 @@ class ObjectStoragePathFactoryUTest { @Test fun `test stream name contains legal regex`() { - every { pathConfigProvider.objectStoragePathConfiguration } returns + coEvery { pathConfigProvider.objectStoragePathConfiguration } returns ObjectStoragePathConfiguration( "prefix", "staging", @@ -133,7 +134,7 @@ class ObjectStoragePathFactoryUTest { true, ) val streamWithLegalRegex = mockk() - every { streamWithLegalRegex.descriptor } returns + coEvery { streamWithLegalRegex.descriptor } returns DestinationStream.Descriptor("test", "stream+1") val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider) assertEquals( @@ -155,7 +156,7 @@ class ObjectStoragePathFactoryUTest { remoteFileKey: String, expectedPartNumber: Long, ) { - every { pathConfigProvider.objectStoragePathConfiguration } returns + coEvery { pathConfigProvider.objectStoragePathConfiguration } returns ObjectStoragePathConfiguration( bucketPathTemplate, null, @@ -164,7 +165,7 @@ class ObjectStoragePathFactoryUTest { false, ) val stream = mockk() - every { stream.descriptor } returns DestinationStream.Descriptor(namespace, name) + coEvery { stream.descriptor } returns DestinationStream.Descriptor(namespace, name) val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider) val matcher = factory.getPathMatcher(stream, OPTIONAL_ORDINAL_SUFFIX_PATTERN) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt index 853a733c0dd2f..f4388f9755cfc 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt @@ -162,6 +162,12 @@ class GlueWriteTest : override fun testFunkyCharacters() { super.testFunkyCharacters() } + + @Test + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439") + override fun testNamespaces() { + super.testNamespaces() + } } class GlueAssumeRoleWriteTest : @@ -179,6 +185,12 @@ class GlueAssumeRoleWriteTest : override fun testFunkyCharacters() { super.testFunkyCharacters() } + + @Test + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439") + override fun testNamespaces() { + super.testNamespaces() + } } @Disabled( diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 86b0c029d5274..66d8819825ccf 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.aws.asMicronautProperties import io.airbyte.cdk.load.data.* import io.airbyte.cdk.load.data.avro.AvroExpectedRecordMapper import io.airbyte.cdk.load.message.InputRecord +import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider import io.airbyte.cdk.load.test.util.ExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.OutputRecord @@ -53,6 +54,9 @@ abstract class S3V2WriteTest( expectedRecordMapper, additionalMicronautEnvs = S3V2Destination.additionalMicronautEnvs, micronautProperties = S3V2TestUtils.assumeRoleCredentials.asMicronautProperties(), + defaultNamespaceProvider = object: DefaultNamespaceProvider { + override fun get(randomNamespace: String): String? = null + }, isStreamSchemaRetroactive = false, supportsDedup = false, stringifySchemalessObjects = stringifySchemalessObjects,