diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt index 303ba50ed8f84..7c0f18183247d 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicPerformanceTest.kt @@ -109,6 +109,7 @@ abstract class BasicPerformanceTest( val configUpdater: ConfigurationUpdater = FakeConfigurationUpdater, val dataValidator: DataValidator? = null, val micronautProperties: Map = emptyMap(), + namespaceOverride: String? = null, ) { protected val destinationProcessFactory = DestinationProcessFactory.get(emptyList()) @@ -117,6 +118,9 @@ abstract class BasicPerformanceTest( private lateinit var testPrettyName: String val randomizedNamespace = run { + if (namespaceOverride != null) { + return@run namespaceOverride + } val randomSuffix = RandomStringUtils.secure().nextAlphabetic(4) val randomizedNamespaceDateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd") val timestampString = @@ -154,6 +158,43 @@ abstract class BasicPerformanceTest( ) } + @Test + open fun testRefreshingRecords() { + testRefreshingRecords(validation = null) + } + + protected fun testRefreshingRecords( + recordsToInsert: Long? = null, + validation: ValidationFunction?, + ) { + runSync( + testScenario = + SingleStreamInsert( + idColumn = idColumn, + columns = twoStringColumns, + recordsToInsert = recordsToInsert ?: defaultRecordsToInsert, + randomizedNamespace = randomizedNamespace, + streamName = testInfo.testMethod.get().name, + generationId = 0, + minGenerationId = 0, + ), + validation = validation, + ) + runSync( + testScenario = + SingleStreamInsert( + idColumn = idColumn, + columns = twoStringColumns, + recordsToInsert = recordsToInsert ?: defaultRecordsToInsert, + randomizedNamespace = randomizedNamespace, + streamName = testInfo.testMethod.get().name, + generationId = 1, + minGenerationId = 1, + ), + validation = validation, + ) + } + @Test open fun testInsertRecordsComplexTypes() { testInsertRecordsComplexTypes(null) diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt index 3306213b1204b..8884215227fa9 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/PerformanceTestScenarios.kt @@ -31,6 +31,8 @@ class SingleStreamInsert( duplicateChance: Double = 0.0, randomizedNamespace: String, streamName: String, + generationId: Long = 0, + minGenerationId: Long = 0, ) : PerformanceTestScenario { init { @@ -54,8 +56,8 @@ class SingleStreamInsert( descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName), importType = importType, schema = ObjectType(linkedMapOf(*schema.toTypedArray())), - generationId = 0, - minimumGenerationId = 0, + generationId = generationId, + minimumGenerationId = minGenerationId, syncId = 1, ) } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml index dee4daf2595d4..204f21ac670a6 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml @@ -16,7 +16,7 @@ data: type: GSM connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 dockerRepository: airbyte/destination-mssql-v2 documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2 githubIssueLabel: destination-mssql-v2 diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt index 55a5fe78b09c8..12bb757477bd0 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt @@ -122,7 +122,7 @@ const val DROP_TABLE_QUERY = """ const val INSERT_INTO_QUERY = """ SET NOCOUNT ON; - INSERT INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (TABLOCK) (?$COLUMNS_KEY) + INSERT INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (ROWLOCK) (?$COLUMNS_KEY) SELECT table_value.* FROM (VALUES (?$TEMPLATE_COLUMNS_KEY)) table_value(?$COLUMNS_KEY) """ @@ -130,7 +130,7 @@ const val INSERT_INTO_QUERY = const val MERGE_INTO_QUERY = """ SET NOCOUNT ON; - MERGE INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (TABLOCK) AS Target + MERGE INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (ROWLOCK) AS Target USING (VALUES (?$TEMPLATE_COLUMNS_KEY)) AS Source (?$COLUMNS_KEY) ON ?$UNIQUENESS_CONSTRAINT_KEY WHEN MATCHED THEN @@ -157,12 +157,15 @@ const val ALTER_TABLE_MODIFY = const val DELETE_WHERE_COL_IS_NOT_NULL = """ - DELETE FROM [?].[?] + SET NOCOUNT ON; + DELETE FROM [?].[?] WITH (ROWLOCK) WHERE [?] is not NULL """ -const val DELETE_WHERE_COL_LESS_THAN = """ - DELETE FROM [?].[?] +const val DELETE_WHERE_COL_LESS_THAN = + """ + SET NOCOUNT ON; + DELETE FROM [?].[?] WITH (ROWLOCK) WHERE [?] < ? """ @@ -329,7 +332,7 @@ class MSSQLQueryBuilder( outputSchema, tableName, COLUMN_NAME_AB_GENERATION_ID, - minGenerationId.toString() + minGenerationId.toString(), ) .executeUpdate(connection) diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactory.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactory.kt index e3c7b80cdb504..3fd2835bf9c37 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactory.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactory.kt @@ -24,7 +24,7 @@ class DataSourceFactory { dataSource.maximumPoolSize = 10 dataSource.minimumIdle = 0 dataSource.idleTimeout = 60000 - dataSource.leakDetectionThreshold = dataSource.connectionTimeout + 30000 + dataSource.leakDetectionThreshold = 0 return dataSource } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt index 7804535a50d72..7928f6cdb3547 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt @@ -61,10 +61,21 @@ class MSSQLPerformanceTest : dataValidator = MSSQLDataValidator(), defaultRecordsToInsert = 10000, ) { + @Test override fun testInsertRecords() { testInsertRecords(recordsToInsert = 100000) {} } + + @Test + override fun testRefreshingRecords() { + testRefreshingRecords { perfSummary -> + perfSummary.forEach { streamSummary -> + assertEquals(streamSummary.expectedRecordCount, streamSummary.recordCount) + } + } + } + @Test override fun testInsertRecordsWithDedup() { testInsertRecordsWithDedup { perfSummary -> diff --git a/docs/integrations/destinations/mssql-v2.md b/docs/integrations/destinations/mssql-v2.md index af1c9f60a22a5..e531c29f2af8e 100644 --- a/docs/integrations/destinations/mssql-v2.md +++ b/docs/integrations/destinations/mssql-v2.md @@ -11,14 +11,15 @@ This connector is in early access, and SHOULD NOT be used for production workloa
Expand to review -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------| -| 0.1.6 | 2025-02-06 | [53192](https://github.com/airbytehq/airbyte/pull/53192) | RC4: Fix config, timehandling and performance tweak. | -| 0.1.5 | 2025-02-04 | [53174](https://github.com/airbytehq/airbyte/pull/53174) | RC3: Fix metadata.yaml for publish | -| 0.1.4 | 2025-02-04 | [52704](https://github.com/airbytehq/airbyte/pull/52704) | RC2: Performance improvement | -| 0.1.3 | 2025-01-24 | [52096](https://github.com/airbytehq/airbyte/pull/52096) | Release candidate | -| 0.1.2 | 2025-01-10 | [51508](https://github.com/airbytehq/airbyte/pull/51508) | Use a non root base image | -| 0.1.1 | 2024-12-18 | [49870](https://github.com/airbytehq/airbyte/pull/49870) | Use a base image: airbyte/java-connector-base:1.0.0 | -| 0.1.0 | 2024-12-16 | [\#49460](https://github.com/airbytehq/airbyte/pull/49460) | Initial commit | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:------------------------------------------------------------|:-----------------------------------------------------| +| 0.1.7 | 2025-02-07 | [53236](https://github.com/airbytehq/airbyte/pull/53236) | RC5: Use rowlock hint. | +| 0.1.6 | 2025-02-06 | [53192](https://github.com/airbytehq/airbyte/pull/53192) | RC4: Fix config, timehandling and performance tweak. | +| 0.1.5 | 2025-02-04 | [53174](https://github.com/airbytehq/airbyte/pull/53174) | RC3: Fix metadata.yaml for publish | +| 0.1.4 | 2025-02-04 | [52704](https://github.com/airbytehq/airbyte/pull/52704) | RC2: Performance improvement | +| 0.1.3 | 2025-01-24 | [52096](https://github.com/airbytehq/airbyte/pull/52096) | Release candidate | +| 0.1.2 | 2025-01-10 | [51508](https://github.com/airbytehq/airbyte/pull/51508) | Use a non root base image | +| 0.1.1 | 2024-12-18 | [49870](https://github.com/airbytehq/airbyte/pull/49870) | Use a base image: airbyte/java-connector-base:1.0.0 | +| 0.1.0 | 2024-12-16 | [\#49460](https://github.com/airbytehq/airbyte/pull/49460) | Initial commit |