Skip to content

Commit

Permalink
[Destination MSSQL] v2 rc5 (#53236)
Browse files Browse the repository at this point in the history
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
gosusnp and octavia-squidington-iii authored Feb 11, 2025
1 parent 0ba9e3a commit 60fb9be
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ abstract class BasicPerformanceTest(
val configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
val dataValidator: DataValidator? = null,
val micronautProperties: Map<Property, String> = emptyMap(),
namespaceOverride: String? = null,
) {

protected val destinationProcessFactory = DestinationProcessFactory.get(emptyList())
Expand All @@ -117,6 +118,9 @@ abstract class BasicPerformanceTest(
private lateinit var testPrettyName: String

val randomizedNamespace = run {
if (namespaceOverride != null) {
return@run namespaceOverride
}
val randomSuffix = RandomStringUtils.secure().nextAlphabetic(4)
val randomizedNamespaceDateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd")
val timestampString =
Expand Down Expand Up @@ -154,6 +158,43 @@ abstract class BasicPerformanceTest(
)
}

@Test
open fun testRefreshingRecords() {
testRefreshingRecords(validation = null)
}

protected fun testRefreshingRecords(
recordsToInsert: Long? = null,
validation: ValidationFunction?,
) {
runSync(
testScenario =
SingleStreamInsert(
idColumn = idColumn,
columns = twoStringColumns,
recordsToInsert = recordsToInsert ?: defaultRecordsToInsert,
randomizedNamespace = randomizedNamespace,
streamName = testInfo.testMethod.get().name,
generationId = 0,
minGenerationId = 0,
),
validation = validation,
)
runSync(
testScenario =
SingleStreamInsert(
idColumn = idColumn,
columns = twoStringColumns,
recordsToInsert = recordsToInsert ?: defaultRecordsToInsert,
randomizedNamespace = randomizedNamespace,
streamName = testInfo.testMethod.get().name,
generationId = 1,
minGenerationId = 1,
),
validation = validation,
)
}

@Test
open fun testInsertRecordsComplexTypes() {
testInsertRecordsComplexTypes(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class SingleStreamInsert(
duplicateChance: Double = 0.0,
randomizedNamespace: String,
streamName: String,
generationId: Long = 0,
minGenerationId: Long = 0,
) : PerformanceTestScenario {

init {
Expand All @@ -54,8 +56,8 @@ class SingleStreamInsert(
descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName),
importType = importType,
schema = ObjectType(linkedMapOf(*schema.toTypedArray())),
generationId = 0,
minimumGenerationId = 0,
generationId = generationId,
minimumGenerationId = minGenerationId,
syncId = 1,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data:
type: GSM
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
dockerRepository: airbyte/destination-mssql-v2
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2
githubIssueLabel: destination-mssql-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ const val DROP_TABLE_QUERY = """
const val INSERT_INTO_QUERY =
"""
SET NOCOUNT ON;
INSERT INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (TABLOCK) (?$COLUMNS_KEY)
INSERT INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (ROWLOCK) (?$COLUMNS_KEY)
SELECT table_value.*
FROM (VALUES (?$TEMPLATE_COLUMNS_KEY)) table_value(?$COLUMNS_KEY)
"""

const val MERGE_INTO_QUERY =
"""
SET NOCOUNT ON;
MERGE INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (TABLOCK) AS Target
MERGE INTO [?$SCHEMA_KEY].[?$TABLE_KEY] WITH (ROWLOCK) AS Target
USING (VALUES (?$TEMPLATE_COLUMNS_KEY)) AS Source (?$COLUMNS_KEY)
ON ?$UNIQUENESS_CONSTRAINT_KEY
WHEN MATCHED THEN
Expand All @@ -157,12 +157,15 @@ const val ALTER_TABLE_MODIFY =

const val DELETE_WHERE_COL_IS_NOT_NULL =
"""
DELETE FROM [?].[?]
SET NOCOUNT ON;
DELETE FROM [?].[?] WITH (ROWLOCK)
WHERE [?] is not NULL
"""

const val DELETE_WHERE_COL_LESS_THAN = """
DELETE FROM [?].[?]
const val DELETE_WHERE_COL_LESS_THAN =
"""
SET NOCOUNT ON;
DELETE FROM [?].[?] WITH (ROWLOCK)
WHERE [?] < ?
"""

Expand Down Expand Up @@ -329,7 +332,7 @@ class MSSQLQueryBuilder(
outputSchema,
tableName,
COLUMN_NAME_AB_GENERATION_ID,
minGenerationId.toString()
minGenerationId.toString(),
)
.executeUpdate(connection)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DataSourceFactory {
dataSource.maximumPoolSize = 10
dataSource.minimumIdle = 0
dataSource.idleTimeout = 60000
dataSource.leakDetectionThreshold = dataSource.connectionTimeout + 30000
dataSource.leakDetectionThreshold = 0
return dataSource
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,21 @@ class MSSQLPerformanceTest :
dataValidator = MSSQLDataValidator(),
defaultRecordsToInsert = 10000,
) {

@Test
override fun testInsertRecords() {
testInsertRecords(recordsToInsert = 100000) {}
}

@Test
override fun testRefreshingRecords() {
testRefreshingRecords { perfSummary ->
perfSummary.forEach { streamSummary ->
assertEquals(streamSummary.expectedRecordCount, streamSummary.recordCount)
}
}
}

@Test
override fun testInsertRecordsWithDedup() {
testInsertRecordsWithDedup { perfSummary ->
Expand Down
19 changes: 10 additions & 9 deletions docs/integrations/destinations/mssql-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ This connector is in early access, and SHOULD NOT be used for production workloa
<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------|
| 0.1.6 | 2025-02-06 | [53192](https://github.com/airbytehq/airbyte/pull/53192) | RC4: Fix config, timehandling and performance tweak. |
| 0.1.5 | 2025-02-04 | [53174](https://github.com/airbytehq/airbyte/pull/53174) | RC3: Fix metadata.yaml for publish |
| 0.1.4 | 2025-02-04 | [52704](https://github.com/airbytehq/airbyte/pull/52704) | RC2: Performance improvement |
| 0.1.3 | 2025-01-24 | [52096](https://github.com/airbytehq/airbyte/pull/52096) | Release candidate |
| 0.1.2 | 2025-01-10 | [51508](https://github.com/airbytehq/airbyte/pull/51508) | Use a non root base image |
| 0.1.1 | 2024-12-18 | [49870](https://github.com/airbytehq/airbyte/pull/49870) | Use a base image: airbyte/java-connector-base:1.0.0 |
| 0.1.0 | 2024-12-16 | [\#49460](https://github.com/airbytehq/airbyte/pull/49460) | Initial commit |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:-----------------------------------------------------|
| 0.1.7 | 2025-02-07 | [53236](https://github.com/airbytehq/airbyte/pull/53236) | RC5: Use rowlock hint. |
| 0.1.6 | 2025-02-06 | [53192](https://github.com/airbytehq/airbyte/pull/53192) | RC4: Fix config, timehandling and performance tweak. |
| 0.1.5 | 2025-02-04 | [53174](https://github.com/airbytehq/airbyte/pull/53174) | RC3: Fix metadata.yaml for publish |
| 0.1.4 | 2025-02-04 | [52704](https://github.com/airbytehq/airbyte/pull/52704) | RC2: Performance improvement |
| 0.1.3 | 2025-01-24 | [52096](https://github.com/airbytehq/airbyte/pull/52096) | Release candidate |
| 0.1.2 | 2025-01-10 | [51508](https://github.com/airbytehq/airbyte/pull/51508) | Use a non root base image |
| 0.1.1 | 2024-12-18 | [49870](https://github.com/airbytehq/airbyte/pull/49870) | Use a base image: airbyte/java-connector-base:1.0.0 |
| 0.1.0 | 2024-12-16 | [\#49460](https://github.com/airbytehq/airbyte/pull/49460) | Initial commit |

</details>

0 comments on commit 60fb9be

Please sign in to comment.