Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC: Fix validation logic for configs related to parallel streaming #173

Merged
merged 4 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -740,19 +740,37 @@ public static SchemaRefreshMode parse(String value) {
.withDisplayName("Slot names for parallel consumption")
.withImportance(Importance.LOW)
.withDescription("Comma separated values for multiple slot names")
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);
.withValidation((config, field, output) -> {
if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
output.accept(field, "", "slot.names is only valid with streaming.mode 'parallel'");
return 1;
}
return 0;
});

public static final Field PUBLICATION_NAMES = Field.create("publication.names")
.withDisplayName("Publication names for parallel consumption")
.withImportance(Importance.LOW)
.withDescription("Comma separated values for multiple publication names")
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);
.withValidation((config, field, output) -> {
if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
output.accept(field, "", "publication.names is only valid with streaming.mode 'parallel'");
return 1;
}
return 0;
});

public static final Field SLOT_RANGES = Field.create("slot.ranges")
.withDisplayName("Ranges on which a slot is supposed to operate")
.withImportance(Importance.LOW)
.withDescription("Semi-colon separated values for hash ranges to be polled by tasks.")
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);
.withValidation((config, field, output) -> {
if (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'");
return 1;
}
return 0;
});

public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections")
.withDisplayName("YB load balance connections")
Expand Down Expand Up @@ -1580,16 +1598,4 @@ protected static int validateYBHostname(Configuration config, Field field, Field

return problemCount;
}

protected static int validateUsageWithParallelStreamingMode(Configuration config, Field field, Field.ValidationOutput problems) {
String mode = config.getString(STREAMING_MODE);
int problemCount = 0;

if (!StreamingMode.parse(mode).isParallel()) {
problems.accept(field, config.getString(field), "Configuration is only valid with parallel streaming mode");
++problemCount;
}

return problemCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {

// YB Note: Only applicable when snapshot mode is not parallel.
// this will always have just one task with the given list of properties

// This is to ensure that whenever the connector runs with a single task model,
// the default task ID is populated as this is not done by debezium-core.
if (props != null) {
props.put(PostgresConnectorConfig.TASK_ID, "0");
}

return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.debezium.config.Field;
import org.junit.Test;

import io.debezium.config.ConfigDefinitionMetadataTest;
Expand Down Expand Up @@ -103,10 +104,22 @@ public void shouldFailIfSlotRangesSpecifiedWithoutParallelStreamingMode() {
.with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.DEFAULT)
.with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536");

int problemCount = PostgresConnectorConfig.validateUsageWithParallelStreamingMode(
configBuilder.build(), PostgresConnectorConfig.SLOT_RANGES, (field, value, problemMessage) -> System.out.println(problemMessage));
boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate(
configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage));

assertThat(problemCount == 1).isTrue();
assertThat(valid).isFalse();
}

@Test
public void ensureNoErrorWhenProperParallelStreamingConfigSpecified() {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.PARALLEL)
.with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536");

boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate(
configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage));

assertThat(valid).isTrue();
}

public void validateCorrectHostname(boolean multiNode) {
Expand Down