Skip to content

Commit

Permalink
fix: enable schema evolution tests for s3 datalake (#51569)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 authored Jan 29, 2025
1 parent db9a854 commit 0c72825
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2489,9 +2489,9 @@ abstract class BasicFunctionalityIntegrationTest(
}

companion object {
private val intType = FieldType(IntegerType, nullable = true)
val intType = FieldType(IntegerType, nullable = true)
private val numberType = FieldType(NumberType, nullable = true)
private val stringType = FieldType(StringType, nullable = true)
val stringType = FieldType(StringType, nullable = true)
private val timestamptzType = FieldType(TimestampTypeWithTimezone, nullable = true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ package io.airbyte.integrations.destination.s3_data_lake

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.aws.asMicronautProperties
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.StronglyTyped
Expand All @@ -18,6 +24,7 @@ import java.util.Base64
import okhttp3.FormBody
import okhttp3.OkHttpClient
import okhttp3.Request
import org.junit.jupiter.api.Assumptions
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -61,14 +68,88 @@ abstract class S3DataLakeWriteTest(
super.testDedup()
}

/**
* This test differs from the base test in two critical aspects:
*
* 1. Data Type Conversion:
* ```
* The base test attempts to change a column's data type from INTEGER to STRING,
* which Iceberg does not support and will throw an exception.
* ```
* 2. Result Ordering:
* ```
* While the data content matches exactly, Iceberg returns results in a different
* order than what the base test expects. The base test's ordering assumptions
* need to be adjusted accordingly.
* ```
*/
@Test
@Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)")
override fun testAppendSchemaEvolution() {
super.testAppendSchemaEvolution()
Assumptions.assumeTrue(verifyDataWriting)
fun makeStream(syncId: Long, schema: LinkedHashMap<String, FieldType>) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(schema),
generationId = 0,
minimumGenerationId = 0,
syncId,
)
runSync(
configContents,
makeStream(
syncId = 42,
linkedMapOf("id" to intType, "to_drop" to stringType, "same" to intType)
),
listOf(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "to_drop": "val1", "same": 42}""",
emittedAtMs = 1234L,
)
)
)
val finalStream =
makeStream(
syncId = 43,
linkedMapOf("id" to intType, "same" to intType, "to_add" to stringType)
)
runSync(
configContents,
finalStream,
listOf(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "same": "43", "to_add": "val3"}""",
emittedAtMs = 1234,
)
)
)
dumpAndDiffRecords(
parsedConfig,
listOf(
OutputRecord(
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 42, "same" to 42),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
OutputRecord(
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 42, "same" to 43, "to_add" to "val3"),
airbyteMeta = OutputRecord.Meta(syncId = 43),
)
),
finalStream,
primaryKey = listOf(listOf("id")),
cursor = listOf("same"),
)
}

@Test
@Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)")
override fun testDedupChangeCursor() {
super.testDedupChangeCursor()
}
Expand Down

0 comments on commit 0c72825

Please sign in to comment.