diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index e00cfb402e..515b708c0a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -740,19 +740,37 @@ public static SchemaRefreshMode parse(String value) { .withDisplayName("Slot names for parallel consumption") .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple slot names") - .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + .withValidation((config, field, output) -> { + if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + output.accept(field, "", "slot.names is only valid with streaming.mode 'parallel'"); + return 1; + } + return 0; + }); public static final Field PUBLICATION_NAMES = Field.create("publication.names") .withDisplayName("Publication names for parallel consumption") .withImportance(Importance.LOW) .withDescription("Comma separated values for multiple publication names") - .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + .withValidation((config, field, output) -> { + if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + output.accept(field, "", "publication.names is only valid with streaming.mode 'parallel'"); + return 1; + } + return 0; + }); public static final Field SLOT_RANGES = Field.create("slot.ranges") .withDisplayName("Ranges on which a slot is supposed to operate") .withImportance(Importance.LOW) .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.") - .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + .withValidation((config, field, output) -> { + if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { + output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'"); + return 1; + } + return 0; + }); public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections") .withDisplayName("YB load balance connections") @@ -1580,16 +1598,4 @@ protected static int validateYBHostname(Configuration config, Field field, Field return problemCount; } - - protected static int validateUsageWithParallelStreamingMode(Configuration config, Field field, Field.ValidationOutput problems) { - String mode = config.getString(STREAMING_MODE); - int problemCount = 0; - - if (!StreamingMode.parse(mode).isParallel()) { - problems.accept(field, config.getString(field), "Configuration is only valid with parallel streaming mode"); - ++problemCount; - } - - return problemCount; - } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index f614984273..835ec35418 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -144,6 +144,13 @@ public List> taskConfigs(int maxTasks) { // YB Note: Only applicable when snapshot mode is not parallel. // this will always have just one task with the given list of properties + + // This is to ensure that whenever the connector runs with a single task model, + // the default task ID is populated as this is not done by debezium-core. + if (props != null) { + props.put(PostgresConnectorConfig.TASK_ID, "0"); + } + return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props)); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java index a450a94711..db1b581fed 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java @@ -103,10 +103,22 @@ public void shouldFailIfSlotRangesSpecifiedWithoutParallelStreamingMode() { .with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.DEFAULT) .with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536"); - int problemCount = PostgresConnectorConfig.validateUsageWithParallelStreamingMode( - configBuilder.build(), PostgresConnectorConfig.SLOT_RANGES, (field, value, problemMessage) -> System.out.println(problemMessage)); + boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate( + configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage)); - assertThat(problemCount == 1).isTrue(); + assertThat(valid).isFalse(); + } + + @Test + public void ensureNoErrorWhenProperParallelStreamingConfigSpecified() { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.PARALLEL) + .with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536"); + + boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate( + configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage)); + + assertThat(valid).isTrue(); } public void validateCorrectHostname(boolean multiNode) {