From aed17cf43f9ddcca08b54864d7d0490dfabc2455 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 27 Feb 2025 13:38:18 +0530 Subject: [PATCH 1/4] fix validator for newly added configs --- .../postgresql/PostgresConnectorConfig.java | 36 +++++++++++-------- .../postgresql/YugabyteDBConnector.java | 7 ++++ 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index e00cfb402e..c8affe0499 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -740,19 +740,37 @@ public static SchemaRefreshMode parse(String value) { .withDisplayName("Slot names for parallel consumption") .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple slot names") - .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + .withValidation((config, field, output) -> { + if (config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + output.accept(field, "", "slot.names is only valid with streaming.mode 'parallel'"); + return 1; + } + return 0; + }); public static final Field PUBLICATION_NAMES = Field.create("publication.names") .withDisplayName("Publication names for parallel consumption") .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple publication names") - .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + .withValidation((config, field, output) -> { + if (config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + output.accept(field, "", "publication.names is only valid with streaming.mode 'parallel'"); + return 1; + } + return 0; + }); public static final Field SLOT_RANGES = Field.create("slot.ranges") .withDisplayName("Ranges on which a slot is supposed to operate") .withImportance(Importance.LOW) .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.") - .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + .withValidation((config, field, output) -> { + if (config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'"); + return 1; + } + return 0; + }); public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections") .withDisplayName("YB load balance connections") @@ -1580,16 +1598,4 @@ protected static int validateYBHostname(Configuration config, Field field, Field return problemCount; } - - protected static int validateUsageWithParallelStreamingMode(Configuration config, Field field, Field.ValidationOutput problems) { - String mode = config.getString(STREAMING_MODE); - int problemCount = 0; - - if (!StreamingMode.parse(mode).isParallel()) { - problems.accept(field, config.getString(field), "Configuration is only valid with parallel streaming mode"); - ++problemCount; - } - - return problemCount; - } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index f614984273..835ec35418 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -144,6 +144,13 @@ public List> taskConfigs(int maxTasks) { // YB Note: Only applicable when snapshot mode is not parallel. // this will always have just one task with the given list of properties + + // This is to ensure that whenever the connector runs with a single task model, + // the default task ID is populated as this is not done by debezium-core. + if (props != null) { + props.put(PostgresConnectorConfig.TASK_ID, "0"); + } + return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props)); } From b1fc4b5e1b075d6a6db33102a178d5be67ef67a1 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 27 Feb 2025 13:57:21 +0530 Subject: [PATCH 2/4] added test --- .../postgresql/PostgresConnectorConfig.java | 6 +++--- .../PostgresConnectorConfigDefTest.java | 19 ++++++++++++++++--- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index c8affe0499..fb1c14fd53 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -741,7 +741,7 @@ public static SchemaRefreshMode parse(String value) { .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple slot names") .withValidation((config, field, output) -> { - if (config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { output.accept(field, "", "slot.names is only valid with streaming.mode 'parallel'"); return 1; } @@ -753,7 +753,7 @@ public static SchemaRefreshMode parse(String value) { .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple publication names") .withValidation((config, field, output) -> { - if (config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { output.accept(field, "", "publication.names is only valid with streaming.mode 'parallel'"); return 1; } @@ -765,7 +765,7 @@ public static SchemaRefreshMode parse(String value) { .withImportance(Importance.LOW) .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.") .withValidation((config, field, output) -> { - if (config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'"); return 1; } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java index a450a94711..69149214ab 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java @@ -7,6 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.debezium.config.Field; import org.junit.Test; import io.debezium.config.ConfigDefinitionMetadataTest; @@ -103,10 +104,22 @@ public void shouldFailIfSlotRangesSpecifiedWithoutParallelStreamingMode() { .with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.DEFAULT) .with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536"); - int problemCount = PostgresConnectorConfig.validateUsageWithParallelStreamingMode( - configBuilder.build(), PostgresConnectorConfig.SLOT_RANGES, (field, value, problemMessage) -> System.out.println(problemMessage)); + boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate( + configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage)); - assertThat(problemCount == 1).isTrue(); + assertThat(valid).isFalse(); + } + + @Test + public void ensureNoErrorWhenProperParallelStreamingConfigSpecified() { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.PARALLEL) + .with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536"); + + boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate( + configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage)); + + assertThat(valid).isTrue(); } public void validateCorrectHostname(boolean multiNode) { From 0ca22e66268fde89d8666655ba088db90b6c50d7 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 27 Feb 2025 13:58:29 +0530 Subject: [PATCH 3/4] removed unused import --- .../connector/postgresql/PostgresConnectorConfigDefTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java index 69149214ab..db1b581fed 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java @@ -7,7 +7,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.debezium.config.Field; import org.junit.Test; import io.debezium.config.ConfigDefinitionMetadataTest; From b0454cc84008f18695b286c835876b8d40c6ef7c Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 27 Feb 2025 14:34:15 +0530 Subject: [PATCH 4/4] format changes --- .../connector/postgresql/PostgresConnectorConfig.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index fb1c14fd53..515b708c0a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -741,7 +741,7 @@ public static SchemaRefreshMode parse(String value) { .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple slot names") .withValidation((config, field, output) -> { - if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { output.accept(field, "", "slot.names is only valid with streaming.mode 'parallel'"); return 1; } @@ -753,7 +753,7 @@ public static SchemaRefreshMode parse(String value) { .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple publication names") .withValidation((config, field, output) -> { - if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { output.accept(field, "", "publication.names is only valid with streaming.mode 'parallel'"); return 1; } @@ -765,7 +765,7 @@ public static SchemaRefreshMode parse(String value) { .withImportance(Importance.LOW) .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.") .withValidation((config, field, output) -> { - if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'"); return 1; }