diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt index 153221a3aad53..9a0864e42fdae 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -94,6 +94,4 @@ class MockBasicFunctionalityIntegrationTest : override fun testBasicTypes() { super.testBasicTypes() } - - @Test @Disabled override fun testBasicWriteFile() {} } diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt index 85c81a30fd26a..8b1718645da37 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt @@ -56,13 +56,17 @@ object MockDestinationBackend { // Assume that in dedup mode, we don't have duplicates - so we can just find the first // record with the same PK as the incoming record val existingRecord = - file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 } + file.firstOrNull { + RecordDiffer.comparePks(incomingPk, getPk(it), nullEqualsUnset = false) == 0 + } if (existingRecord == null) { file.add(incomingRecord) } else { val incomingCursor = getCursor(incomingRecord) val existingCursor = getCursor(existingRecord) - val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor) + val compare = + RecordDiffer.getValueComparator(nullEqualsUnset = false) + .compare(incomingCursor, existingCursor) // If the incoming record has a later cursor, // or the same cursor but a later extractedAt, // then upsert. (otherwise discard the incoming record.) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt index e6d9c21cdad4a..5f567b00643df 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.test.util import java.time.OffsetDateTime import kotlin.test.assertEquals +import kotlin.test.assertNull import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -155,7 +156,7 @@ class RecordDifferTest { ), ) ) - assertEquals(null, diff) + assertNull(diff) } /** Verify that the differ can sort records which are identical other than the cursor */ @@ -193,7 +194,7 @@ class RecordDifferTest { ), ), ) - assertEquals(null, diff) + assertNull(diff) } /** Verify that the differ can sort records which are identical other than extractedAt */ @@ -231,6 +232,49 @@ class RecordDifferTest { ), ) ) - assertEquals(null, diff) + assertNull(diff) + } + + @Test + fun testNullEqualsUnset() { + val diff = + RecordDiffer(primaryKey = listOf(listOf("id")), cursor = null, nullEqualsUnset = true) + .diffRecords( + listOf( + OutputRecord( + extractedAt = 1, + generationId = 0, + data = + mapOf( + "id" to 1, + "sub_object" to + mapOf( + "foo" to "bar", + "sub_list" to listOf(mapOf()), + ) + ), + airbyteMeta = null, + ), + ), + listOf( + OutputRecord( + extractedAt = 1, + generationId = 0, + data = + mapOf( + "id" to 1, + "name" to null, + "sub_object" to + mapOf( + "foo" to "bar", + "bar" to null, + "sub_list" to listOf(mapOf("foo" to null)), + ) + ), + airbyteMeta = null, + ), + ), + ) + assertNull(diff) } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt index d500e8398b977..a94c9643fff82 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt @@ -4,10 +4,13 @@ package io.airbyte.cdk.load.test.util +import io.airbyte.cdk.load.data.AirbyteType + fun interface ExpectedRecordMapper { - fun mapRecord(expectedRecord: OutputRecord): OutputRecord + fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord } object NoopExpectedRecordMapper : ExpectedRecordMapper { - override fun mapRecord(expectedRecord: OutputRecord): OutputRecord = expectedRecord + override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord = + expectedRecord } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index 1164856770039..9a652ee2fd51d 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -103,7 +103,7 @@ abstract class IntegrationTest( ) { val actualRecords: List = dataDumper.dumpRecords(config, stream) val expectedRecords: List = - canonicalExpectedRecords.map { recordMangler.mapRecord(it) } + canonicalExpectedRecords.map { recordMangler.mapRecord(it, stream.schema) } RecordDiffer( primaryKey = primaryKey.map { nameMapper.mapFieldName(it) }, diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt index 8711168155413..49f275c17ead7 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.load.test.util import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.NullValue @@ -48,6 +49,8 @@ class RecordDiffer( */ val allowUnexpectedRecord: Boolean = false, ) { + private val valueComparator = getValueComparator(nullEqualsUnset) + private fun extract(data: Map, path: List): AirbyteValue { return when (path.size) { 0 -> throw IllegalArgumentException("Empty path") @@ -87,7 +90,7 @@ class RecordDiffer( ) } - comparePks(pk1, pk2) + comparePks(pk1, pk2, nullEqualsUnset) } /** @@ -276,30 +279,39 @@ class RecordDiffer( } companion object { - val valueComparator: Comparator = - Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) } + fun getValueComparator(nullEqualsUnset: Boolean): Comparator = + Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!, nullEqualsUnset) } /** * Compare each PK field in order, until we find a field that the two records differ in. If * all the fields are equal, then these two records have the same PK. */ - fun comparePks(pk1: List, pk2: List) = - (pk1.zip(pk2) - .map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) } + fun comparePks( + pk1: List, + pk2: List, + nullEqualsUnset: Boolean, + ): Int { + return (pk1.zip(pk2) + .map { (pk1Field, pk2Field) -> + getValueComparator(nullEqualsUnset).compare(pk1Field, pk2Field) + } .firstOrNull { it != 0 } ?: 0) + } - private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int { + private fun compare(v1: AirbyteValue, v2: AirbyteValue, nullEqualsUnset: Boolean): Int { if (v1 is UnknownValue) { return compare( JsonToAirbyteValue().fromJson(v1.value), v2, + nullEqualsUnset, ) } if (v2 is UnknownValue) { return compare( v1, JsonToAirbyteValue().fromJson(v2.value), + nullEqualsUnset, ) } @@ -348,6 +360,37 @@ class RecordDiffer( } } } + is ObjectValue -> { + fun objComp(a: ObjectValue, b: ObjectValue): Int { + // objects aren't really comparable, so just do an equality check + return if (a == b) 0 else 1 + } + if (nullEqualsUnset) { + // Walk through the airbyte value, removing any NullValue entries + // from ObjectValues. + fun removeObjectNullValues(value: AirbyteValue): AirbyteValue = + when (value) { + is ObjectValue -> + ObjectValue( + value.values + .filterTo(linkedMapOf()) { (_, v) -> + v !is NullValue + } + .mapValuesTo(linkedMapOf()) { (_, v) -> + removeObjectNullValues(v) + } + ) + is ArrayValue -> + ArrayValue(value.values.map { removeObjectNullValues(it) }) + else -> value + } + val filteredV1 = removeObjectNullValues(v1) as ObjectValue + val filteredV2 = removeObjectNullValues(v2) as ObjectValue + objComp(filteredV1, filteredV2) + } else { + objComp(v1, v2 as ObjectValue) + } + } // otherwise, just be a terrible person. // we know these are the same type, so this is safe to do. is Comparable<*> -> diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt index bbd42ce3c73aa..4ea15d75cd242 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt @@ -20,7 +20,8 @@ class IcebergParquetPipelineFactory : MapperPipelineFactory { MapperPipeline( stream.schema, listOf( - AirbyteSchemaNoopMapper() to SchemalessValuesToJsonString(), + // TODO not sure why base parquet was doing this as a noop + SchemalessTypesToStringType() to SchemalessValuesToJsonString(), AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), MergeUnions() to AirbyteValueNoopMapper(), UnionTypeToDisjointRecord() to UnionValueToDisjointRecord(), diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/SchemalessTypesToStringType.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/SchemalessTypesToStringType.kt new file mode 100644 index 0000000000000..9e48c9e95d42b --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/SchemalessTypesToStringType.kt @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data.iceberg.parquet + +import io.airbyte.cdk.load.data.AirbyteSchemaIdentityMapper +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.UnknownType + +class SchemalessTypesToStringType : AirbyteSchemaIdentityMapper { + override fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema) = StringType + override fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema) = StringType + override fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema) = StringType + override fun mapUnknown(schema: UnknownType) = StringType +} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index cfbf58ba7b357..0125333149390 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -7,12 +7,10 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.* -import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory import io.airbyte.cdk.load.message.Meta import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange -import java.math.BigDecimal import java.time.Instant import java.util.LinkedHashMap import java.util.UUID @@ -21,28 +19,22 @@ import org.apache.iceberg.data.Record object IcebergV2DataDumper : DestinationDataDumper { - private fun convert(value: Any?, type: AirbyteType): AirbyteValue { - return if (value == null) { - NullValue - } else { - when (type) { - StringType -> StringValue(value as String) - is ArrayType -> ArrayValue((value as List<*>).map { convert(it, type.items.type) }) - BooleanType -> BooleanValue(value as Boolean) - IntegerType -> IntegerValue(value as Long) - NumberType -> NumberValue(BigDecimal(value as Double)) - else -> - throw IllegalArgumentException("Object type with empty schema is not supported") - } - } - } - - private fun getCastedData(schema: ObjectType, record: Record): ObjectValue { + private fun toAirbyteValue(record: Record): ObjectValue { return ObjectValue( LinkedHashMap( - schema.properties - .map { (name, field) -> name to convert(record.getField(name), field.type) } - .toMap() + record + .struct() + .fields() + .filterNot { Meta.COLUMN_NAMES.contains(it.name()) } + .associate { field -> + val name = field.name() + val airbyteValue = + when (val value = record.getField(field.name())) { + is Record -> toAirbyteValue(value) + else -> AirbyteValue.from(value) + } + name to airbyteValue + } ) ) } @@ -80,8 +72,6 @@ object IcebergV2DataDumper : DestinationDataDumper { stream: DestinationStream ): List { val config = IcebergV2TestUtil.getConfig(spec) - val pipeline = ParquetMapperPipelineFactory().create(stream) - val schema = pipeline.finalSchema as ObjectType val catalog = IcebergV2TestUtil.getCatalog(config) val table = catalog.loadTable( @@ -101,7 +91,7 @@ object IcebergV2DataDumper : DestinationDataDumper { ), loadedAt = null, generationId = record.getField(Meta.COLUMN_NAME_AB_GENERATION_ID) as Long, - data = getCastedData(schema, record), + data = toAirbyteValue(record), airbyteMeta = getMetaData(record) ) ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 573bd5e59ce7d..d09cb90af109c 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -38,18 +38,17 @@ abstract class IcebergV2WriteTest( commitDataIncrementally = false, supportFileTransfer = false, allTypesBehavior = StronglyTyped(), + nullEqualsUnset = true, ) { @Test - @Disabled( - "Expected because we seem to be mapping timestamps to long when we should be mapping them to an OffsetDateTime" - ) + @Disabled("bad values handling for timestamps is currently broken") override fun testBasicTypes() { super.testBasicTypes() } @Test @Disabled( - "Expected because we seem to be mapping timestamps to long when we should be mapping them to an OffsetDateTime" + "This is currently hanging forever and we should look into why https://github.com/airbytehq/airbyte-internal-issues/issues/11162" ) override fun testInterruptedTruncateWithPriorData() { super.testInterruptedTruncateWithPriorData() @@ -62,30 +61,16 @@ abstract class IcebergV2WriteTest( } @Test - @Disabled - override fun testContainerTypes() { - super.testContainerTypes() - } - - @Test - @Disabled( - "Expected because we seem to be mapping timestamps to long when we should be mapping them to an OffsetDateTime" - ) + @Disabled("This is currently hanging forever and we should look into why") override fun resumeAfterCancelledTruncate() { super.resumeAfterCancelledTruncate() } @Test - @Disabled("This is expected") + @Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)") override fun testAppendSchemaEvolution() { super.testAppendSchemaEvolution() } - - @Test - @Disabled - override fun testUnions() { - super.testUnions() - } } class IcebergGlueWriteTest : @@ -142,15 +127,18 @@ class IcebergNessieMinioWriteTest : val authToken = getToken() return """ { + "catalog_type": { + "catalog_type": "NESSIE", + "server_uri": "http://$nessieEndpoint:19120/api/v1", + "access_token": "$authToken" + }, "s3_bucket_name": "demobucket", "s3_bucket_region": "us-east-1", "access_key_id": "minioadmin", "secret_access_key": "minioadmin", "s3_endpoint": "http://$minioEndpoint:9002", - "server_uri": "http://$nessieEndpoint:19120/api/v1", "warehouse_location": "s3://demobucket/", - "main_branch_name": "main", - "access_token": "$authToken" + "main_branch_name": "main" } """.trimIndent() }