Skip to content

Commit

Permalink
Load CDK Tests: Expand coverage of funky characters
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jan 22, 2025
1 parent 93b157d commit 60d2d64
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.test.util

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
Expand All @@ -18,6 +19,7 @@ import java.time.format.DateTimeFormatter

fun interface ExpectedRecordMapper {
fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord
fun mapStreamDescriptor(descriptor: DestinationStream.Descriptor) = descriptor
}

object NoopExpectedRecordMapper : ExpectedRecordMapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ abstract class IntegrationTest(
val actualRecords: List<OutputRecord> = dataDumper.dumpRecords(config, stream)
val expectedRecords: List<OutputRecord> =
canonicalExpectedRecords.map { recordMangler.mapRecord(it, stream.schema) }
val descriptor = recordMangler.mapStreamDescriptor(stream.descriptor)

RecordDiffer(
primaryKey = primaryKey.map { nameMapper.mapFieldName(it) },
Expand All @@ -116,7 +117,7 @@ abstract class IntegrationTest(
.diffRecords(expectedRecords, actualRecords)
?.let {
var message =
"Incorrect records for ${stream.descriptor.namespace}.${stream.descriptor.name}:\n$it"
"Incorrect records for ${descriptor.namespace}.${descriptor.name}:\n$it"
if (reason != null) {
message = reason + "\n" + message
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,10 @@ abstract class BasicFunctionalityIntegrationTest(
makeStream("stream_with_underscores"),
makeStream("STREAM_WITH_ALL_CAPS"),
makeStream("CapitalCase"),
makeStream("stream_with_spécial_character"),
makeStream("stream_name_with_operator+1"),
makeStream("stream_name_with_numbers_123"),
makeStream("1stream_with_a_leading_number"),
makeStream(
"stream_with_edge_case_field_names_and_values",
linkedMapOf(
Expand All @@ -551,6 +555,9 @@ abstract class BasicFunctionalityIntegrationTest(
"field_with_underscore" to stringType,
"FIELD_WITH_ALL_CAPS" to stringType,
"field_with_spécial_character" to stringType,
"field_name_with_operator+1" to stringType,
"field_name_with_numbers_123" to stringType,
"1field_with_a_leading_number" to stringType,
// "order" is a reserved word in many sql engines
"order" to stringType,
"ProperCase" to stringType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.data.avro

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
Expand All @@ -28,13 +29,25 @@ object AvroExpectedRecordMapper : ExpectedRecordMapper {
return expectedRecord.copy(data = withRemappedFieldNames as ObjectValue)
}

override fun mapStreamDescriptor(
descriptor: DestinationStream.Descriptor
): DestinationStream.Descriptor {
// Map the special character but not the '+', because only the former is replaced in file
// paths.
return descriptor.copy(name = descriptor.name.replace("é", "e"))
}

private fun fieldNameMangler(value: AirbyteValue): AirbyteValue =
when (value) {
is ObjectValue ->
ObjectValue(
LinkedHashMap(
value.values
.map { (k, v) -> k.replace("é", "e") to fieldNameMangler(v) }
.map { (k, v) ->
k.replace("é", "e").replace("+", "_").replace(Regex("(^\\d+)")) {
"_${it.groupValues[0]}"
} to fieldNameMangler(v)
}
.toMap()
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,22 @@ class ObjectStoragePathFactory(

private fun getPathVariableToPattern(stream: DestinationStream): Map<String, String> {
return PATH_VARIABLES.associate {
it.variable to (it.pattern ?: it.provider(VariableContext(stream)))
it.variable to
(
// Only escape the pattern if
// A) it's not already provided
// B) the value from context is not blank
// This is to ensure stream names/namespaces with special characters (foo+1) match
// correctly,
// but that blank patterns are ignored completely.
it.pattern
?: (it.provider(VariableContext(stream)).let { s ->
if (s.isNotBlank()) {
Regex.escape(s)
} else {
s
}
}))
} +
FILENAME_VARIABLES.associate {
it.variable to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ class ObjectStoragePathFactoryUTest {
assertNotNull(matcher.match("prefix/stream/any_filename"))
}

@Test
fun `test stream name contains legal regex`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix",
"staging",
"\${STREAM_NAME}/",
"any_filename",
true,
)
val streamWithLegalRegex = mockk<DestinationStream>()
every { streamWithLegalRegex.descriptor } returns
DestinationStream.Descriptor("test", "stream+1")
val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)
assertEquals(
"prefix/stream+1/any_filename",
factory.getPathToFile(streamWithLegalRegex, 0L, isStaging = false)
)
val matcher = factory.getPathMatcher(streamWithLegalRegex, "(-foo)?")
assertNotNull(matcher.match("prefix/stream+1/any_filename"))
}

@ParameterizedTest
@MethodSource("pathTemplateMatrix")
fun `handles duplicate vars in path templates`(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ class GlueWriteTest :
override fun testUnions() {
super.testUnions()
}

@Test
@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11439")
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}
}

class GlueAssumeRoleWriteTest :
Expand All @@ -118,7 +124,15 @@ class GlueAssumeRoleWriteTest :
)
),
S3DataLakeTestUtil.getAWSSystemCredentialsAsMap()
) {
@Test
@Disabled(
"https://github.com/airbytehq/airbyte-internal-issues/issues/11439"
)
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}
}

@Disabled(
"This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ class S3V2WriteTestAvroUncompressed :
override fun testUnknownTypes() {
super.testUnknownTypes()
}

@Test
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}
}

class S3V2WriteTestAvroBzip2 :
Expand Down

0 comments on commit 60d2d64

Please sign in to comment.