diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index c5382eb2ed9b0..603d83ca5a2d8 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -1597,6 +1597,148 @@ abstract class BasicFunctionalityIntegrationTest( ) } + @Test + open fun testDedupWithStringKey() { + assumeTrue(supportsDedup) + fun makeStream(syncId: Long) = + DestinationStream( + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + importType = + Dedupe( + primaryKey = listOf(listOf("id1"), listOf("id2")), + cursor = listOf("updated_at"), + ), + schema = + ObjectType( + properties = + linkedMapOf( + "id1" to stringType, + "id2" to intType, + "updated_at" to timestamptzType, + "name" to stringType, + "_ab_cdc_deleted_at" to timestamptzType, + ) + ), + generationId = 42, + minimumGenerationId = 0, + syncId = syncId, + ) + fun makeRecord(data: String, extractedAt: Long) = + InputRecord( + randomizedNamespace, + "test_stream", + data, + emittedAtMs = extractedAt, + ) + + val sync1Stream = makeStream(syncId = 42) + runSync( + updatedConfig, + sync1Stream, + listOf( + // emitted_at:1000 is equal to 1970-01-01 00:00:01Z. + // This obviously makes no sense in relation to updated_at being in the year 2000, + // but that's OK because (from destinations POV) updated_at has no relation to + // extractedAt. + makeRecord( + """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice1", "_ab_cdc_deleted_at": null}""", + extractedAt = 1000, + ), + // Emit a second record for the same (id1, id2) with a different updated_at. 
+ makeRecord( + """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice2", "_ab_cdc_deleted_at": null}""", + extractedAt = 1000, + ), + // Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an + // explicit null, but we should handle both cases. + makeRecord( + """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob1"}""", + extractedAt = 1000, + ), + ), + ) + dumpAndDiffRecords( + parsedConfig, + listOf( + // Alice has only the newer record, and Bob also exists + OutputRecord( + extractedAt = 1000, + generationId = 42, + data = + mapOf( + "id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84", + "id2" to 200, + "updated_at" to TimestampWithTimezoneValue("2000-01-01T00:01:00Z"), + "name" to "Alice2", + "_ab_cdc_deleted_at" to null + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + OutputRecord( + extractedAt = 1000, + generationId = 42, + data = + mapOf( + "id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84", + "id2" to 201, + "updated_at" to TimestampWithTimezoneValue("2000-01-01T00:02:00Z"), + "name" to "Bob1" + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + ), + sync1Stream, + primaryKey = listOf(listOf("id1"), listOf("id2")), + cursor = listOf("updated_at"), + ) + + val sync2Stream = makeStream(syncId = 43) + runSync( + updatedConfig, + sync2Stream, + listOf( + // Update both Alice and Bob + makeRecord( + """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice3", "_ab_cdc_deleted_at": null}""", + extractedAt = 2000, + ), + makeRecord( + """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob2"}""", + extractedAt = 2000, + ), + // And delete Bob. Again, T+D doesn't check the actual _value_ of deleted_at (i.e. + // the fact that it's in the past is irrelevant). 
It only cares whether deleted_at + // is non-null. So the destination should delete Bob. + makeRecord( + """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}""", + extractedAt = 2000, + ), + ), + ) + dumpAndDiffRecords( + parsedConfig, + listOf( + // Alice still exists (and has been updated to the latest version), but Bob is gone + OutputRecord( + extractedAt = 2000, + generationId = 42, + data = + mapOf( + "id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84", + "id2" to 200, + "updated_at" to TimestampWithTimezoneValue("2000-01-02T00:00:00Z"), + "name" to "Alice3", + "_ab_cdc_deleted_at" to null + ), + airbyteMeta = OutputRecord.Meta(syncId = 43), + ) + ), + sync2Stream, + primaryKey = listOf(listOf("id1"), listOf("id2")), + cursor = listOf("updated_at"), + ) + } + /** * Change the cursor column in the second sync to a column that doesn't exist in the first sync. * Verify that we overwrite everything correctly. 
diff --git a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml index af7eefe18c4c8..bc08143e6578c 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml @@ -16,7 +16,7 @@ data: type: GSM connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.9 + dockerImageTag: 0.1.10 dockerRepository: airbyte/destination-mssql-v2 documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2 githubIssueLabel: destination-mssql-v2 diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt index 9c81eb773add6..cf8c656bec202 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt @@ -23,12 +23,11 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration -import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToSqlType +import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToMssqlType import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setAsNullValue import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setValue import 
io.airbyte.integrations.destination.mssql.v2.convert.MssqlType import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.getAirbyteNamedValue -import io.airbyte.integrations.destination.mssql.v2.convert.SqlTypeToMssqlType import io.airbyte.protocol.models.Jsons import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange @@ -230,9 +229,9 @@ class MSSQLQueryBuilder( Append -> emptyList() Overwrite -> emptyList() } + private val indexedColumns: Set = uniquenessKey.toSet() - private val toSqlType = AirbyteTypeToSqlType() - private val toMssqlType = SqlTypeToMssqlType() + private val toMssqlType = AirbyteTypeToMssqlType() val finalTableSchema: List = airbyteFinalTableFields + extractFinalTableSchema(stream.schema) @@ -251,9 +250,7 @@ class MSSQLQueryBuilder( } private fun getSchema(): List = - finalTableSchema.map { - NamedSqlField(it.name, toMssqlType.convert(toSqlType.convert(it.type.type))) - } + finalTableSchema.map { NamedSqlField(it.name, toMssqlType.convert(it.type.type)) } fun updateSchema(connection: Connection) { val existingSchema = getExistingSchema(connection) @@ -486,7 +483,12 @@ class MSSQLQueryBuilder( separator: String = DEFAULT_SEPARATOR ): String { return schema.joinToString(separator = separator) { - "[${it.name}] ${toMssqlType.convert(toSqlType.convert(it.type.type)).sqlString} NULL" + val mssqlType = + toMssqlType.convert( + it.type.type, + isIndexed = indexedColumns.contains(it.name), + ) + "[${it.name}] ${mssqlType.sqlString} NULL" } } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToMssqlType.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToMssqlType.kt new file mode 100644 index 0000000000000..429e038008e9b --- /dev/null +++ 
b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToMssqlType.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import java.sql.Types + +enum class MssqlType(val sqlType: Int, val sqlStringOverride: String? 
= null) { + TEXT(Types.LONGVARCHAR), + BIT(Types.BOOLEAN), + DATE(Types.DATE), + BIGINT(Types.BIGINT), + DECIMAL(Types.DECIMAL, sqlStringOverride = "DECIMAL(18, 8)"), + VARCHAR(Types.VARCHAR, sqlStringOverride = "VARCHAR(MAX)"), + VARCHAR_INDEX(Types.VARCHAR, sqlStringOverride = "VARCHAR(200)"), + DATETIMEOFFSET(Types.TIMESTAMP_WITH_TIMEZONE), + TIME(Types.TIME), + DATETIME(Types.TIMESTAMP); + + val sqlString: String = sqlStringOverride ?: name +} + +class AirbyteTypeToMssqlType { + fun convert(airbyteSchema: AirbyteType, isIndexed: Boolean = false): MssqlType { + return when (airbyteSchema) { + is ObjectType -> MssqlType.TEXT + is ArrayType -> MssqlType.TEXT + is ArrayTypeWithoutSchema -> MssqlType.TEXT + is BooleanType -> MssqlType.BIT + is DateType -> MssqlType.DATE + is IntegerType -> MssqlType.BIGINT + is NumberType -> MssqlType.DECIMAL + is ObjectTypeWithEmptySchema -> MssqlType.TEXT + is ObjectTypeWithoutSchema -> MssqlType.TEXT + is StringType -> if (isIndexed) MssqlType.VARCHAR_INDEX else MssqlType.VARCHAR + is TimeTypeWithTimezone -> MssqlType.DATETIMEOFFSET + is TimeTypeWithoutTimezone -> MssqlType.TIME + is TimestampTypeWithTimezone -> MssqlType.DATETIMEOFFSET + is TimestampTypeWithoutTimezone -> MssqlType.DATETIME + is UnionType -> MssqlType.TEXT + is UnknownType -> MssqlType.TEXT + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt deleted file mode 100644 index e09ae2b86d109..0000000000000 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.mssql.v2.convert - -import io.airbyte.cdk.load.data.AirbyteType -import io.airbyte.cdk.load.data.ArrayType -import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema -import io.airbyte.cdk.load.data.BooleanType -import io.airbyte.cdk.load.data.DateType -import io.airbyte.cdk.load.data.IntegerType -import io.airbyte.cdk.load.data.NumberType -import io.airbyte.cdk.load.data.ObjectType -import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema -import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema -import io.airbyte.cdk.load.data.StringType -import io.airbyte.cdk.load.data.TimeTypeWithTimezone -import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimestampTypeWithTimezone -import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone -import io.airbyte.cdk.load.data.UnionType -import io.airbyte.cdk.load.data.UnknownType -import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn -import io.airbyte.integrations.destination.mssql.v2.model.SqlTable -import java.sql.Types - -/** CDK pipeline [AirbyteType] to SQL [Types] converter. */ -class AirbyteTypeToSqlType { - - /** - * Converts an [AirbyteType] to the associated SQL [Types] value. - * - * @param airbyteSchema The stream's Airbyte schema, represented as an [AirbyteType] - * @return The associated SQL [Types] value. - * @throws IllegalArgumentException if the [AirbyteType] is not supported. 
- */ - fun convert(airbyteSchema: AirbyteType): Int { - return when (airbyteSchema) { - is ObjectType -> Types.LONGVARCHAR - is ArrayType -> Types.LONGVARCHAR - is ArrayTypeWithoutSchema -> Types.LONGVARCHAR - is BooleanType -> Types.BOOLEAN - is DateType -> Types.DATE - is IntegerType -> Types.BIGINT - is NumberType -> Types.DECIMAL - is ObjectTypeWithEmptySchema -> Types.LONGVARCHAR - is ObjectTypeWithoutSchema -> Types.LONGVARCHAR - is StringType -> Types.VARCHAR - is TimeTypeWithTimezone -> Types.TIME_WITH_TIMEZONE - is TimeTypeWithoutTimezone -> Types.TIME - is TimestampTypeWithTimezone -> Types.TIMESTAMP_WITH_TIMEZONE - is TimestampTypeWithoutTimezone -> Types.TIMESTAMP - is UnionType -> Types.LONGVARCHAR - is UnknownType -> Types.LONGVARCHAR - } - } -} - -/** - * Extension function that converts an [ObjectType] into a [SqlTable] that can be used to define a - * SQL table. - * - * @param primaryKeys The list of configured primary key properties that should be treated as - * primary keys in the generated [SqlTable] - * @return The [SqlTable] that represents the table to be mapped to the stream represented by the - * [ObjectType]. 
- */ -fun ObjectType.toSqlTable(primaryKeys: List>): SqlTable { - val identifierFieldNames = primaryKeys.flatten().toSet() - val sqlTypeConverter = AirbyteTypeToSqlType() - val columns = - this.properties.entries.map { (name, field) -> - val isPrimaryKey = identifierFieldNames.contains(name) - val isNullable = !isPrimaryKey && field.nullable - SqlColumn( - name = name, - type = sqlTypeConverter.convert(field.type), - isPrimaryKey = isPrimaryKey, - isNullable = isNullable - ) - } - return SqlTable(columns = columns) -} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt index 4104ae81df5f1..d671c58c9507a 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt @@ -19,9 +19,6 @@ import io.airbyte.cdk.load.data.TimestampWithTimezoneValue import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.util.serializeToJsonBytes -import io.airbyte.integrations.destination.mssql.v2.model.SqlTable -import io.airbyte.integrations.destination.mssql.v2.model.SqlTableRow -import io.airbyte.integrations.destination.mssql.v2.model.SqlTableRowValue import java.sql.Date import java.sql.Time import java.sql.Timestamp @@ -60,33 +57,3 @@ class AirbyteValueToSqlValue { } } } - -/** - * Extension function that converts an [ObjectValue] into a row of SQL values. - * - * @param sqlTable The [SqlTable] that contains data type information for each column. 
This is used - * to filter the [ObjectValue]'s values to only those that exist in the table. - * @return A [SqlTableRow] that contains values converted to their SQL data type equivalents from - * the provided [ObjectValue]. - */ -fun ObjectValue.toSqlValue(sqlTable: SqlTable): SqlTableRow { - val converter = AirbyteValueToSqlValue() - return SqlTableRow( - values = - this.values - .filter { (name, _) -> sqlTable.columns.find { it.name == name } != null } - .map { (name, value) -> - val dataType = sqlTable.columns.find { it.name == name }!!.type - val converted = - when (value) { - is ObjectValue -> - (converter.convert(value) as LinkedHashMap<*, *>) - .serializeToJsonBytes() - is ArrayValue -> - (converter.convert(value) as List<*>).serializeToJsonBytes() - else -> converter.convert(value) - } - SqlTableRowValue(name = name, value = converted, type = dataType) - } - ) -} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToStatement.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToStatement.kt index 2255ab91b6de1..bdf7f7e5fcba1 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToStatement.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToStatement.kt @@ -29,7 +29,7 @@ import java.sql.Types class AirbyteValueToStatement { companion object { - private val toSqlType = AirbyteTypeToSqlType() + private val toSqlType = AirbyteTypeToMssqlType() private val toSqlValue = AirbyteValueToSqlValue() private val valueCoercingMapper = AirbyteValueDeepCoercingMapper( @@ -67,7 +67,7 @@ class AirbyteValueToStatement { fun PreparedStatement.setAsNullValue(idx: Int, type: AirbyteType) { val sqlType = 
toSqlType.convert(type) - setNull(idx, sqlType) + setNull(idx, sqlType.sqlType) } private fun PreparedStatement.setAsBooleanValue(idx: Int, value: BooleanValue) { @@ -95,7 +95,7 @@ class AirbyteValueToStatement { value: StringValue, type: AirbyteType ) { - val sqlType = toSqlType.convert(type) + val sqlType = toSqlType.convert(type).sqlType if (sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR) { setString(idx, value.value) } else { diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/SqlTypeToMssqlType.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/SqlTypeToMssqlType.kt deleted file mode 100644 index 1d0d2bed5d71a..0000000000000 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/SqlTypeToMssqlType.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.mssql.v2.convert - -import java.sql.Types - -enum class MssqlType(val sqlType: Int, val sqlStringOverride: String? = null) { - TEXT(Types.LONGVARCHAR), - BIT(Types.BOOLEAN), - DATE(Types.DATE), - BIGINT(Types.BIGINT), - DECIMAL(Types.DECIMAL, sqlStringOverride = "DECIMAL(18, 8)"), - VARCHAR(Types.VARCHAR, sqlStringOverride = "VARCHAR(MAX)"), - DATETIMEOFFSET(Types.TIMESTAMP_WITH_TIMEZONE), - TIME(Types.TIME), - DATETIME(Types.TIMESTAMP); - - val sqlString: String = sqlStringOverride ?: name - - companion object { - val fromSqlType: Map = - entries - .associateByTo(mutableMapOf()) { it.sqlType } - // Manually adding an extra mapping because we since represent both - // sqlTypes TIMESTAMP_WITH_TIMEZONE and TIME_WITH_TIMEZONE as DATETIMEOFFSET - // the auto generated reverse map is missing the nuance. 
- .apply { this[Types.TIME_WITH_TIMEZONE] = DATETIMEOFFSET } - .toMap() - } -} - -class SqlTypeToMssqlType { - fun convert(type: Int): MssqlType = - MssqlType.fromSqlType.get(type) ?: throw IllegalArgumentException("type $type not found") -} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt deleted file mode 100644 index 59f4457b61e13..0000000000000 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.mssql.v2.model - -import java.sql.Types - -/** - * Representation of a colum in a SQL table. - * - * @param name The name of the column - * @param type The data type of the column (see [Types] for values). - * @param isPrimaryKey Whether the column represents a primary key. - * @param isNullable Whether the column's value supports null values. - */ -data class SqlColumn( - val name: String, - val type: Int, - val isPrimaryKey: Boolean = false, - val isNullable: Boolean = false -) - -/** - * Representation of a SQL table. - * - * @param columns The list of columns in the table. - */ -data class SqlTable(val columns: List) - -/** - * Representation of a value in a SQL row/column cell. - * - * @param name The name of the column. - * @param value The value of the row/column cell. - * @param type The SQL type of the row/column cell (see [Types] for values). - */ -data class SqlTableRowValue(val name: String, val value: Any?, val type: Int) - -/** - * Representation of a row of values in a SQL table. - * - * @param values A list of values stored in the row. 
- */ -data class SqlTableRow(val values: List) diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToMsqlTypeTest.kt similarity index 67% rename from airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt rename to airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToMsqlTypeTest.kt index 8df04c36f6646..43dd6a0645e8d 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToMsqlTypeTest.kt @@ -23,14 +23,12 @@ import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone import io.airbyte.cdk.load.data.UnionType import io.airbyte.cdk.load.data.UnknownType import io.mockk.mockk -import java.sql.Types import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Test -class AirbyteTypeToSqlTypeTest { +class AirbyteTypeToMsqlTypeTest { - private val converter = AirbyteTypeToSqlType() + private val converter = AirbyteTypeToMssqlType() @Test fun testConvertObjectType() { @@ -42,136 +40,118 @@ class AirbyteTypeToSqlTypeTest { ), ) val result = converter.convert(objectType) - assertEquals(Types.LONGVARCHAR, result) + assertEquals(MssqlType.TEXT, result) } @Test fun testConvertArrayType() { val arrayType = ArrayType(FieldType(IntegerType, false)) val result = converter.convert(arrayType) - assertEquals(Types.LONGVARCHAR, result) + 
assertEquals(MssqlType.TEXT, result) } @Test fun testConvertArrayTypeWithoutSchema() { val arrayType = ArrayTypeWithoutSchema val result = converter.convert(arrayType) - assertEquals(Types.LONGVARCHAR, result) + assertEquals(MssqlType.TEXT, result) } @Test fun testConvertBooleanType() { val booleanType = BooleanType val result = converter.convert(booleanType) - assertEquals(Types.BOOLEAN, result) + assertEquals(MssqlType.BIT, result) } @Test fun testConvertDateType() { val dateType = DateType val result = converter.convert(dateType) - assertEquals(Types.DATE, result) + assertEquals(MssqlType.DATE, result) } @Test fun testConvertIntegerType() { val integerType = IntegerType val result = converter.convert(integerType) - assertEquals(Types.BIGINT, result) + assertEquals(MssqlType.BIGINT, result) } @Test fun testConvertNumberType() { val numberType = NumberType val result = converter.convert(numberType) - assertEquals(Types.DECIMAL, result) + assertEquals(MssqlType.DECIMAL, result) } @Test fun testConvertObjectTypeWithEmptySchema() { val objectType = ObjectTypeWithEmptySchema val result = converter.convert(objectType) - assertEquals(Types.LONGVARCHAR, result) + assertEquals(MssqlType.TEXT, result) } @Test fun testConvertObjectTypeWithoutSchema() { val objectType = ObjectTypeWithoutSchema val result = converter.convert(objectType) - assertEquals(Types.LONGVARCHAR, result) + assertEquals(MssqlType.TEXT, result) } @Test fun testConvertStringType() { val stringType = StringType val result = converter.convert(stringType) - assertEquals(Types.VARCHAR, result) + assertEquals(MssqlType.VARCHAR, result) + } + + @Test + fun testConvertIndexedStringType() { + val stringType = StringType + val result = converter.convert(stringType, isIndexed = true) + assertEquals(MssqlType.VARCHAR_INDEX, result) } @Test fun testConvertTimeTypeWithTimezone() { val timeType = TimeTypeWithTimezone val result = converter.convert(timeType) - assertEquals(Types.TIME_WITH_TIMEZONE, result) + 
assertEquals(MssqlType.DATETIMEOFFSET, result) } @Test fun testConvertTimeTypeWithoutTimezone() { val timeType = TimeTypeWithoutTimezone val result = converter.convert(timeType) - assertEquals(Types.TIME, result) + assertEquals(MssqlType.TIME, result) } @Test fun testConvertTimestampTypeWithTimezone() { val timestampType = TimestampTypeWithTimezone val result = converter.convert(timestampType) - assertEquals(Types.TIMESTAMP_WITH_TIMEZONE, result) + assertEquals(MssqlType.DATETIMEOFFSET, result) } @Test fun testConvertTimestampTypeWithoutTimezone() { val timestampType = TimestampTypeWithoutTimezone val result = converter.convert(timestampType) - assertEquals(Types.TIMESTAMP, result) + assertEquals(MssqlType.DATETIME, result) } @Test fun testConvertUnionType() { val unionType = UnionType(setOf(StringType, NumberType)) val result = converter.convert(unionType) - assertEquals(Types.LONGVARCHAR, result) + assertEquals(MssqlType.TEXT, result) } @Test fun testConvertUnknownType() { val unknownType = UnknownType(mockk()) val result = converter.convert(unknownType) - assertEquals(Types.LONGVARCHAR, result) - } - - @Test - fun testToSqlTable() { - val primaryKey = "id" - val nullableColumn = "email" - val objectType = - ObjectType( - linkedMapOf( - primaryKey to FieldType(IntegerType, false), - "age" to FieldType(IntegerType, false), - nullableColumn to FieldType(StringType, true), - ), - ) - val primaryKeys = listOf(listOf(primaryKey)) - val table = objectType.toSqlTable(primaryKeys = primaryKeys) - - assertEquals(objectType.properties.size, table.columns.size) - objectType.properties.forEach { (name, type) -> - val column = table.columns.find { it.name == name } - assertNotNull(column) - assertEquals(converter.convert(type.type), column?.type) - assertEquals(primaryKey == name, column?.isPrimaryKey) - assertEquals(nullableColumn == name, column?.isNullable) - } + assertEquals(MssqlType.TEXT, result) } } diff --git 
a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt index 3b67881316099..36113d4a57c8c 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt @@ -16,14 +16,11 @@ import io.airbyte.cdk.load.data.TimestampWithTimezoneValue import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.util.Jsons import io.airbyte.cdk.load.util.serializeToJsonBytes -import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn -import io.airbyte.integrations.destination.mssql.v2.model.SqlTable import java.math.BigDecimal import java.math.BigInteger import java.sql.Date import java.sql.Time import java.sql.Timestamp -import java.sql.Types import java.time.ZoneOffset import org.junit.jupiter.api.Assertions.assertArrayEquals import org.junit.jupiter.api.Assertions.assertEquals @@ -122,128 +119,6 @@ internal class AirbyteValueToSqlValueTest { assertArrayEquals(Jsons.writeValueAsBytes(unknownValue.value), result as ByteArray) } - @Test - fun testToSqlValue() { - val sqlTable = - SqlTable( - listOf( - SqlColumn( - name = "id", - type = Types.INTEGER, - isPrimaryKey = true, - isNullable = false - ), - SqlColumn( - name = "name", - type = Types.VARCHAR, - isPrimaryKey = false, - isNullable = true - ), - SqlColumn( - name = "meta", - type = Types.BLOB, - isPrimaryKey = false, - isNullable = false - ), - SqlColumn( - name = "items", - type = Types.BLOB, - isPrimaryKey = false, - isNullable = false - ) - ) - ) - val objectValue = - ObjectValue( - linkedMapOf( - "id" 
to IntegerValue(123L), - "name" to StringValue("John Doe"), - "meta" to - ObjectValue( - linkedMapOf( - "sync_id" to IntegerValue(123L), - "changes" to - ObjectValue( - linkedMapOf( - "change" to StringValue("insert"), - "reason" to StringValue("reason"), - ) - ) - ) - ), - "items" to ArrayValue(listOf(StringValue("item1"), StringValue("item2"))) - ) - ) - - val sqlValue = objectValue.toSqlValue(sqlTable) - - assertEquals(sqlTable.columns.size, sqlValue.values.size) - assertEquals( - BigInteger::class.java, - sqlValue.values.find { it.name == "id" }?.value?.javaClass - ) - assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value) - assertEquals( - String::class.java, - sqlValue.values.find { it.name == "name" }?.value?.javaClass - ) - assertEquals("John Doe", sqlValue.values.find { it.name == "name" }?.value) - assertEquals( - ByteArray::class.java, - sqlValue.values.find { it.name == "meta" }?.value?.javaClass - ) - assertArrayEquals( - mapOf( - "sync_id" to 123.toBigInteger(), - "changes" to - mapOf( - "change" to "insert", - "reason" to "reason", - ) - ) - .serializeToJsonBytes(), - sqlValue.values.find { it.name == "meta" }?.value as ByteArray - ) - assertEquals( - ByteArray::class.java, - sqlValue.values.find { it.name == "items" }?.value?.javaClass - ) - assertArrayEquals( - listOf("item1", "item2").serializeToJsonBytes(), - sqlValue.values.find { it.name == "items" }?.value as ByteArray - ) - } - - @Test - fun testToSqlValueIgnoresFieldsNotInTable() { - val sqlTable = - SqlTable( - listOf( - SqlColumn( - name = "id", - type = Types.INTEGER, - isPrimaryKey = true, - isNullable = false - ), - ) - ) - val objectValue = - ObjectValue( - linkedMapOf( - "id" to IntegerValue(123L), - "name" to StringValue("Should be ignored"), - ) - ) - - val sqlValue = objectValue.toSqlValue(sqlTable) - assertEquals(sqlTable.columns.size, sqlValue.values.size) - assertEquals( - BigInteger::class.java, - sqlValue.values.find { it.name == "id" 
}?.value?.javaClass - ) - assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value) - } - @Test fun testObjectMapToJsonBytes() { val objectValue = diff --git a/docs/integrations/destinations/mssql-v2.md b/docs/integrations/destinations/mssql-v2.md index 4cab46a35b255..ba60c812bc67d 100644 --- a/docs/integrations/destinations/mssql-v2.md +++ b/docs/integrations/destinations/mssql-v2.md @@ -13,6 +13,7 @@ This connector is in early access, and SHOULD NOT be used for production workloa | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------| +| 0.1.10 | 2025-02-20 | [54186](https://github.com/airbytehq/airbyte/pull/54186) | RC8: Fix String support. | | 0.1.9 | 2025-02-11 | [53364](https://github.com/airbytehq/airbyte/pull/53364) | RC7: Revert deletion change. | | 0.1.8 | 2025-02-11 | [53364](https://github.com/airbytehq/airbyte/pull/53364) | RC6: Break up deletes into loop to reduce locking. | | 0.1.7 | 2025-02-07 | [53236](https://github.com/airbytehq/airbyte/pull/53236) | RC5: Use rowlock hint. |