Skip to content

Commit

Permalink
Load CDK: Expanded namespace coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Feb 3, 2025
1 parent 18798f6 commit 7792525
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.airbyte.cdk.load.test.util

/**
 * Supplies the namespace that a test should expect for a stream whose descriptor carries no
 * namespace of its own.
 *
 * Declared as a `fun interface` so an implementation can be written as a lambda, e.g.
 * `DefaultNamespaceProvider { null }`; object-expression and class implementations keep working.
 */
fun interface DefaultNamespaceProvider {
    /**
     * @param randomNamespace the randomized namespace generated for the current test run
     * @return the namespace to expect for a stream with a `null` namespace, or `null` if no
     * namespace is expected
     */
    fun get(randomNamespace: String): String?
}

/** Default [DefaultNamespaceProvider]: hands the randomized namespace back unchanged. */
class DefaultDefaultNamespaceProvider : DefaultNamespaceProvider {
    /** Returns [randomNamespace] as-is; never `null`. */
    override fun get(randomNamespace: String): String {
        return randomNamespace
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import io.airbyte.cdk.load.message.InputStreamCheckpoint
import io.airbyte.cdk.load.message.Meta.Change
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.test.util.ConfigurationUpdater
import io.airbyte.cdk.load.test.util.DefaultDefaultNamespaceProvider
import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
Expand Down Expand Up @@ -151,6 +153,7 @@ abstract class BasicFunctionalityIntegrationTest(
nameMapper: NameMapper = NoopNameMapper,
additionalMicronautEnvs: List<String> = emptyList(),
micronautProperties: Map<Property, String> = emptyMap(),
val defaultNamespaceProvider: DefaultNamespaceProvider = DefaultDefaultNamespaceProvider(),
/**
* Whether to actually verify that the connector wrote data to the destination. This should only
* ever be disabled for test destinations (dev-null, etc.).
Expand Down Expand Up @@ -455,9 +458,9 @@ abstract class BasicFunctionalityIntegrationTest(
@Test
open fun testNamespaces() {
assumeTrue(verifyDataWriting)
fun makeStream(namespace: String) =
fun makeStream(namespace: String?, name: String = "test_stream") =
DestinationStream(
DestinationStream.Descriptor(namespace, "test_stream"),
DestinationStream.Descriptor(namespace, name),
Append,
ObjectType(linkedMapOf("id" to intType)),
generationId = 0,
Expand All @@ -466,12 +469,14 @@ abstract class BasicFunctionalityIntegrationTest(
)
val stream1 = makeStream(randomizedNamespace + "_1")
val stream2 = makeStream(randomizedNamespace + "_2")
val streamWithDefaultNamespace = makeStream(null, randomizedNamespace + "_stream")
runSync(
updatedConfig,
DestinationCatalog(
listOf(
stream1,
stream2,
streamWithDefaultNamespace,
)
),
listOf(
Expand All @@ -487,6 +492,12 @@ abstract class BasicFunctionalityIntegrationTest(
data = """{"id": 5678}""",
emittedAtMs = 1234,
),
InputRecord(
namespace = streamWithDefaultNamespace.descriptor.namespace,
name = streamWithDefaultNamespace.descriptor.name,
data = """{"id": 91011}""",
emittedAtMs = 1234,
),
)
)
assertAll(
Expand Down Expand Up @@ -521,6 +532,26 @@ abstract class BasicFunctionalityIntegrationTest(
listOf(listOf("id")),
cursor = null
)
},
{
dumpAndDiffRecords(
parsedConfig,
listOf(
OutputRecord(
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 91011),
airbyteMeta = OutputRecord.Meta(syncId = 42)
)
),
streamWithDefaultNamespace.copy(
descriptor = streamWithDefaultNamespace.descriptor.copy(
namespace = defaultNamespaceProvider.get(randomizedNamespace)
)
),
listOf(listOf("id")),
cursor = null
)
}
)
}
Expand All @@ -531,9 +562,10 @@ abstract class BasicFunctionalityIntegrationTest(
fun makeStream(
name: String,
schema: LinkedHashMap<String, FieldType> = linkedMapOf("id" to intType),
namespaceSuffix: String = "",
) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, name),
DestinationStream.Descriptor(randomizedNamespace + namespaceSuffix, name),
Append,
ObjectType(schema),
generationId = 0,
Expand Down Expand Up @@ -575,6 +607,11 @@ abstract class BasicFunctionalityIntegrationTest(
"groups",
linkedMapOf("id" to intType, "authorization" to stringType)
),
makeStream(
"streamWithSpecialCharactersInNamespace",
namespaceSuffix = "_spøcial"
),
makeStream("streamWithOperatorInNamespace", namespaceSuffix = "_operator-1"),
)
)
// For each stream, generate a record containing every field in the schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ object AvroExpectedRecordMapper : ExpectedRecordMapper {
): DestinationStream.Descriptor {
// Map the special characters ('ø' in the namespace, 'é' in the name) but not the '+',
// because only the former are replaced in file paths.
return descriptor.copy(name = descriptor.name.replace("é", "e"))
return descriptor.copy(
namespace = descriptor.namespace?.replace("ø", "o"),
name = descriptor.name.replace("é", "e")
)
}

private fun fieldNameMangler(value: AirbyteValue): AirbyteValue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory.Companio
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory.Companion.DEFAULT_PATH_FORMAT
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory.Companion.DEFAULT_STAGING_PREFIX_SUFFIX
import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState.Companion.OPTIONAL_ORDINAL_SUFFIX_PATTERN
import io.mockk.coEvery
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
Expand All @@ -32,14 +33,14 @@ class ObjectStoragePathFactoryUTest {

// Stubs shared by every test in this class: the stream descriptor is ("test", "stream"),
// the sync time is 0 and the current time is 1.
@BeforeEach
fun setup() {
every { stream.descriptor } returns DestinationStream.Descriptor("test", "stream")
every { timeProvider.syncTimeMillis() } returns 0
every { timeProvider.currentTimeMillis() } returns 1
coEvery { stream.descriptor } returns DestinationStream.Descriptor("test", "stream")
coEvery { timeProvider.syncTimeMillis() } returns 0
coEvery { timeProvider.currentTimeMillis() } returns 1
}

@Test
fun `test matcher with suffix`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
coEvery { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix",
null,
Expand All @@ -60,7 +61,7 @@ class ObjectStoragePathFactoryUTest {

@Test
fun `test file pattern with variable in prefix`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
coEvery { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix-\${NAMESPACE}",
"staging-\${NAMESPACE}",
Expand All @@ -81,7 +82,7 @@ class ObjectStoragePathFactoryUTest {

@Test
fun `test pattern matcher with variable in prefix`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
coEvery { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix-\${NAMESPACE}",
"staging-\${NAMESPACE}",
Expand All @@ -97,7 +98,7 @@ class ObjectStoragePathFactoryUTest {

@Test
fun `test pattern from null namespace`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
coEvery { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix",
"staging",
Expand All @@ -106,7 +107,7 @@ class ObjectStoragePathFactoryUTest {
true,
)
val streamWithNullNamespace = mockk<DestinationStream>()
every { streamWithNullNamespace.descriptor } returns
coEvery { streamWithNullNamespace.descriptor } returns
DestinationStream.Descriptor(null, "stream")
val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)
assertEquals(
Expand All @@ -124,7 +125,7 @@ class ObjectStoragePathFactoryUTest {

@Test
fun `test stream name contains legal regex`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
coEvery { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix",
"staging",
Expand All @@ -133,7 +134,7 @@ class ObjectStoragePathFactoryUTest {
true,
)
val streamWithLegalRegex = mockk<DestinationStream>()
every { streamWithLegalRegex.descriptor } returns
coEvery { streamWithLegalRegex.descriptor } returns
DestinationStream.Descriptor("test", "stream+1")
val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)
assertEquals(
Expand All @@ -155,7 +156,7 @@ class ObjectStoragePathFactoryUTest {
remoteFileKey: String,
expectedPartNumber: Long,
) {
every { pathConfigProvider.objectStoragePathConfiguration } returns
coEvery { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
bucketPathTemplate,
null,
Expand All @@ -164,7 +165,7 @@ class ObjectStoragePathFactoryUTest {
false,
)
val stream = mockk<DestinationStream>()
every { stream.descriptor } returns DestinationStream.Descriptor(namespace, name)
coEvery { stream.descriptor } returns DestinationStream.Descriptor(namespace, name)
val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)

val matcher = factory.getPathMatcher(stream, OPTIONAL_ORDINAL_SUFFIX_PATTERN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ class GlueWriteTest :
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}

// Disabled for this test class; see the linked issue. The override only delegates to the
// inherited test so that the annotation can be attached here.
@Test
@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439")
override fun testNamespaces() = super.testNamespaces()
}

class GlueAssumeRoleWriteTest :
Expand All @@ -179,6 +185,12 @@ class GlueAssumeRoleWriteTest :
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}

// Disabled for this test class; see the linked issue. The override only delegates to the
// inherited test so that the annotation can be attached here.
@Test
@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439")
override fun testNamespaces() = super.testNamespaces()
}

@Disabled(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.aws.asMicronautProperties
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.data.avro.AvroExpectedRecordMapper
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.test.util.DefaultNamespaceProvider
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.OutputRecord
Expand Down Expand Up @@ -53,6 +54,9 @@ abstract class S3V2WriteTest(
expectedRecordMapper,
additionalMicronautEnvs = S3V2Destination.additionalMicronautEnvs,
micronautProperties = S3V2TestUtils.assumeRoleCredentials.asMicronautProperties(),
defaultNamespaceProvider = object: DefaultNamespaceProvider {
override fun get(randomNamespace: String): String? = null
},
isStreamSchemaRetroactive = false,
supportsDedup = false,
stringifySchemalessObjects = stringifySchemalessObjects,
Expand Down

0 comments on commit 7792525

Please sign in to comment.