diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 56227314c2..e00cfb402e 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -381,6 +381,39 @@ public static SecureConnectionMode parse(String value, String defaultValue) { } } + public enum StreamingMode implements EnumeratedValue { + DEFAULT("DEFAULT") { + @Override + public boolean isParallel() { + return false; + } + }, + PARALLEL("PARALLEL") { + @Override + public boolean isParallel() { + return true; + } + }; + + + private final String streamingMode; + + StreamingMode(String streamingMode) { + this.streamingMode = streamingMode; + } + + public static StreamingMode parse(String s) { + return valueOf(s.trim().toUpperCase()); + } + + @Override + public String getValue() { + return streamingMode; + } + + public abstract boolean isParallel(); + } + public enum LsnType implements EnumeratedValue { SEQUENCE("SEQUENCE") { @Override @@ -696,6 +729,31 @@ public static SchemaRefreshMode parse(String value) { .withDescription("Whether or not to take a consistent snapshot of the tables." + "Disabling this option may result in duplication of some already snapshot data in the streaming phase."); + public static final Field STREAMING_MODE = Field.create("streaming.mode") + .withDisplayName("Streaming mode") + .withType(Type.STRING) + .withImportance(Importance.LOW) + .withEnum(StreamingMode.class, StreamingMode.DEFAULT) + .withDescription("Streaming mode the connector should follow"); + + public static final Field SLOT_NAMES = Field.create("slot.names") + .withDisplayName("Slot names for parallel consumption") + .withImportance(Importance.LOW) + .withDescription("Comma separated values for multiple slot names") + .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + + public static final Field PUBLICATION_NAMES = Field.create("publication.names") + .withDisplayName("Publication names for parallel consumption") + .withImportance(Importance.LOW) + .withDescription("Comma separated values for multiple publication names") + .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + + public static final Field SLOT_RANGES = Field.create("slot.ranges") + .withDisplayName("Ranges on which a slot is supposed to operate") + .withImportance(Importance.LOW) + .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.") + .withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode); + public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections") .withDisplayName("YB load balance connections") .withType(Type.BOOLEAN) @@ -1153,6 +1211,10 @@ public LsnType slotLsnType() { return LsnType.parse(getConfig().getString(SLOT_LSN_TYPE)); } + public StreamingMode streamingMode() { + return StreamingMode.parse(getConfig().getString(STREAMING_MODE)); + } + protected boolean dropSlotOnStop() { if (getConfig().hasKey(DROP_SLOT_ON_STOP.name())) { return getConfig().getBoolean(DROP_SLOT_ON_STOP); @@ -1242,6 +1304,18 @@ public String primaryKeyHashColumns() { return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS); } + public List getSlotNames() { + return List.of(getConfig().getString(SLOT_NAMES).trim().split(",")); + } + + public List getPublicationNames() { + return List.of(getConfig().getString(PUBLICATION_NAMES).trim().split(",")); + } + + public List getSlotRanges() { + return List.of(getConfig().getString(SLOT_RANGES).trim().split(";")); + } + @Override public byte[] getUnavailableValuePlaceholder() { String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER); @@ -1326,7 +1400,11 @@ protected SourceInfoStructMaker getSourceInfoStruc INCREMENTAL_SNAPSHOT_CHUNK_SIZE, UNAVAILABLE_VALUE_PLACEHOLDER, LOGICAL_DECODING_MESSAGE_PREFIX_INCLUDE_LIST, - LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST) + LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST, + STREAMING_MODE, + SLOT_NAMES, + PUBLICATION_NAMES, + SLOT_RANGES) .excluding(INCLUDE_SCHEMA_CHANGES) .create(); @@ -1503,5 +1581,15 @@ protected static int validateYBHostname(Configuration config, Field field, Field return problemCount; } + protected static int validateUsageWithParallelStreamingMode(Configuration config, Field field, Field.ValidationOutput problems) { + String mode = config.getString(STREAMING_MODE); + int problemCount = 0; + + if (!StreamingMode.parse(mode).isParallel()) { + problems.accept(field, config.getString(field), "Configuration is only valid with parallel streaming mode"); + ++problemCount; + } + return problemCount; + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 4d028fa65b..56888d3e71 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -165,6 +165,26 @@ public ChangeEventSourceCoordinator st else { LOGGER.info("Found previous offset {}", previousOffset); snapshotter.init(connectorConfig, previousOffset.asOffsetState(), slotInfo); + + // If previous offset is present that means that the connector is being restarted. + if (snapshotter.shouldSnapshot() && connectorConfig.streamingMode().isParallel()) { + // Drop existing slot so that a new slot can be created. + LOGGER.info("Dropping existing replication slot '{}' since task restarted before snapshot was completed", connectorConfig.slotName()); + jdbcConnection.execute(String.format("SELECT * FROM pg_drop_replication_slot('%s')", connectorConfig.slotName())); + + // Set slotInfo to null so that slot can be created again. + slotInfo = null; + } + } + + // TODO Vaibhav: Read more in https://issues.redhat.com/browse/DBZ-2118 + if (connectorConfig.streamingMode().isParallel()) { + try { + jdbcConnection.commit(); + } + catch (SQLException e) { + throw new DebeziumException(e); + } } SlotCreationResult slotCreatedInfo = null; @@ -191,11 +211,14 @@ public ChangeEventSourceCoordinator st } } - try { - jdbcConnection.commit(); - } - catch (SQLException e) { - throw new DebeziumException(e); + // TODO Vaibhav: Read more in https://issues.redhat.com/browse/DBZ-2118 + if (!connectorConfig.streamingMode().isParallel()) { + try { + jdbcConnection.commit(); + } + catch (SQLException e) { + throw new DebeziumException(e); + } } final PostgresEventMetadataProvider metadataProvider = new PostgresEventMetadataProvider(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index 9b9fd39b23..0166ba7247 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -22,11 +22,13 @@ public class PostgresPartition extends AbstractPartition implements Partition { private final String serverName; private final String taskId; + private final String slotName; - public PostgresPartition(String serverName, String databaseName, String taskId) { + public PostgresPartition(String serverName, String databaseName, String taskId, String slotName) { super(databaseName); this.serverName = serverName; this.taskId = taskId; + this.slotName = slotName; } @Override @@ -57,7 +59,7 @@ public String toString() { } public String getPartitionIdentificationKey() { - return String.format("%s_%s", serverName, taskId); + return String.format("%s_%s_%s", serverName, taskId, slotName); } static class Provider implements Partition.Provider { @@ -73,7 +75,7 @@ static class Provider implements Partition.Provider { public Set getPartitions() { return Collections.singleton(new PostgresPartition( connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), - connectorConfig.getTaskId())); + connectorConfig.getTaskId(), connectorConfig.slotName())); } } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YBValidate.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YBValidate.java new file mode 100644 index 0000000000..0a8b459b39 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YBValidate.java @@ -0,0 +1,66 @@ +package io.debezium.connector.postgresql; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.transforms.yugabytedb.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Class to store all the validation methods. + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class YBValidate { + private static final Logger LOGGER = LoggerFactory.getLogger(YBValidate.class); + private static final String RANGE_BEGIN = "0"; + private static final String RANGE_END = "65536"; + + public static void completeRangesProvided(List slotRanges) { + List> pairList = slotRanges.stream() + .map(entry -> { + String[] parts = entry.split(","); + return new Pair<>(Integer.valueOf(parts[0]), Integer.valueOf(parts[1])); + }) + .sorted(Comparator.comparing(Pair::getFirst)) + .collect(Collectors.toList()); + + int rangeBegin = Integer.valueOf(RANGE_BEGIN); + + for (Pair pair : pairList) { + if (rangeBegin != pair.getFirst()) { + LOGGER.error("Error while validating ranges: {}", pairList); + throw new DebeziumException( + String.format("Tablet range starting from hash_code %d is missing", rangeBegin)); + } + + rangeBegin = pair.getSecond(); + } + + // At this point, if the range is complete, rangeBegin will be pointing to the RANGE_END value. + if (rangeBegin != Integer.valueOf(RANGE_END)) { + LOGGER.error("Error while validating ranges: {}", pairList); + throw new DebeziumException( + String.format("Incomplete ranges provided. Range starting from hash_code %d is missing", rangeBegin)); + } + } + + public static void slotAndPublicationsAreEqual(List slotNames, List publicationNames) { + if (slotNames.size() != publicationNames.size()) { + throw new DebeziumException( + String.format("Number of provided slots does not match the number of provided " + + "publications. Slots: %s, Publications: %s", slotNames, publicationNames)); + } + } + + public static void slotRangesMatchSlotNames(List slotNames, List slotRanges) { + if (slotNames.size() != slotRanges.size()) { + throw new DebeziumException( + String.format("Number of provided slots does not match the number of provided " + + "slot ranges. Slots: %s, Slot ranges: %s", slotNames, slotRanges)); + } + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index 35f2672129..f614984273 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -41,6 +41,7 @@ public class YugabyteDBConnector extends RelationalBaseSourceConnector { private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBConnector.class); private Map props; + private PostgresConnectorConfig connectorConfig; public YugabyteDBConnector() { } @@ -58,6 +59,39 @@ public Class taskClass() { @Override public void start(Map props) { this.props = props; + this.connectorConfig = new PostgresConnectorConfig(Configuration.from(props)); + } + + protected List> getTaskConfigsForParallelStreaming(List slotNames, + List publicationNames, + List slotRanges) { + List> taskConfigs = new ArrayList<>(); + + if (connectorConfig.getSnapshotter().shouldSnapshot()) { + props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name())); + } + + for (int i = 0; i < slotNames.size(); ++i) { + Map taskProps = new HashMap<>(this.props); + + taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i)); + taskProps.put(PostgresConnectorConfig.SLOT_NAME.name(), slotNames.get(i)); + taskProps.put(PostgresConnectorConfig.PUBLICATION_NAME.name(), publicationNames.get(i)); + taskProps.put(PostgresConnectorConfig.STREAM_PARAMS.name(), "hash_range=" + slotRanges.get(i)); + + if (connectorConfig.getSnapshotter().shouldSnapshot()) { + String[] splitRange = slotRanges.get(i).split(","); + String query = getParallelSnapshotQuery(splitRange[0], splitRange[1]); + taskProps.put( + PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." + taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + query + ); + } + + taskConfigs.add(taskProps); + } + + return taskConfigs; } @Override @@ -66,14 +100,33 @@ public List> taskConfigs(int maxTasks) { return Collections.emptyList(); } + final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()); + + if (connectorConfig.streamingMode().isParallel()) { + LOGGER.info("Initialising parallel streaming mode"); + + // Validate for a single table. + validateSingleTableProvided(tableIncludeList, false /* isSnapshot */); + + List slotNames = connectorConfig.getSlotNames(); + List publicationNames = connectorConfig.getPublicationNames(); + List slotRanges = connectorConfig.getSlotRanges(); + + YBValidate.slotAndPublicationsAreEqual(slotNames, publicationNames); + YBValidate.slotRangesMatchSlotNames(slotNames, slotRanges); + YBValidate.completeRangesProvided(slotRanges); + + return getTaskConfigsForParallelStreaming(slotNames, publicationNames, slotRanges); + } + + // TODO Vaibhav (#26106): The following code block is not needed now, remove in a separate PR. if (props.containsKey(PostgresConnectorConfig.SNAPSHOT_MODE.name()) && props.get(PostgresConnectorConfig.SNAPSHOT_MODE.name()) .equalsIgnoreCase(PostgresConnectorConfig.SnapshotMode.PARALLEL.getValue())) { LOGGER.info("Initialising parallel snapshot consumption"); - final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()); // Perform basic validations. - validateSingleTableProvidedForParallelSnapshot(tableIncludeList); + validateSingleTableProvided(tableIncludeList, true); // Publication auto create mode should not be for all tables. if (props.containsKey(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name()) @@ -94,15 +147,16 @@ public List> taskConfigs(int maxTasks) { return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props)); } - protected void validateSingleTableProvidedForParallelSnapshot(String tableIncludeList) throws DebeziumException { + protected void validateSingleTableProvided(String tableIncludeList, boolean isSnapshot) throws DebeziumException { if (tableIncludeList == null) { throw new DebeziumException("No table provided, provide a table in the table.include.list"); } else if (tableIncludeList.contains(",")) { // This might indicate the presence of multiple tables in the include list, we do not want that. - throw new DebeziumException("parallel snapshot consumption is only supported with one table at a time"); + throw new DebeziumException("parallel " + (isSnapshot ? "snapshot" : "streaming") + " consumption is only supported with one table at a time"); } } + // TODO Vaibhav (#26106): This method needs to be removed. protected List> getConfigForParallelSnapshotConsumption(int maxTasks) { List> taskConfigs = new ArrayList<>(); @@ -137,6 +191,14 @@ protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBo props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound); } + // TODO Vaibhav (#26106): This is a copy of existing method, remove the older method in a separate PR. + protected String getParallelSnapshotQuery(String lowerBound, String upperBound) { + return String.format("SELECT * FROM %s WHERE yb_hash_code(%s) >= %s AND yb_hash_code(%s) < %s", + props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), lowerBound, + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound); + } + @Override public void stop() { this.props = null; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 132675fb88..8bd3d0b609 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -65,6 +65,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep private final String slotName; private final PostgresConnectorConfig.LsnType lsnType; + private final PostgresConnectorConfig.StreamingMode streamingMode; private final String publicationName; private final RelationalTableFilters tableFilter; private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; @@ -116,6 +117,7 @@ private PostgresReplicationConnection(PostgresConnectorConfig config, this.connectorConfig = config; this.slotName = slotName; this.lsnType = config.slotLsnType(); + this.streamingMode = config.streamingMode(); this.publicationName = publicationName; this.tableFilter = tableFilter; this.publicationAutocreateMode = publicationAutocreateMode; @@ -525,11 +527,14 @@ public Optional createReplicationSlot() throws SQLException try (Statement stmt = pgConnection().createStatement()) { String createCommand = String.format( - "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s %s", + "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s %s %s", slotName, tempPart, plugin.getPostgresPluginName(), - lsnType.getLsnTypeName()); + lsnType.getLsnTypeName(), + streamingMode.isParallel() ? "USE_SNAPSHOT" : ""); + LOGGER.info("executing: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY"); + stmt.execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY"); LOGGER.info("Creating replication slot with command {}", createCommand); stmt.execute(createCommand); // when we are in Postgres 9.4+, we can parse the slot creation info, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/Pair.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/Pair.java index 8de3f6ceca..7ac710f37c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/Pair.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/yugabytedb/Pair.java @@ -48,6 +48,11 @@ public boolean equals(Object o) { } } + @Override + public String toString() { + return String.format("%s,%s", first.toString(), second.toString()); + } + public int hashCode() { return Objects.hashCode(new Object[]{this.first, this.second}); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java index c4e9453fc1..a450a94711 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java @@ -97,6 +97,18 @@ public void shouldFailIfInvalidMultiHostFormatSpecifiedWithInvalidCharacter() { assertThat((problemCount == 2)).isTrue(); } + @Test + public void shouldFailIfSlotRangesSpecifiedWithoutParallelStreamingMode() { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.DEFAULT) + .with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536"); + + int problemCount = PostgresConnectorConfig.validateUsageWithParallelStreamingMode( + configBuilder.build(), PostgresConnectorConfig.SLOT_RANGES, (field, value, problemMessage) -> System.out.println(problemMessage)); + + assertThat(problemCount == 1).isTrue(); + } + public void validateCorrectHostname(boolean multiNode) { Configuration.Builder configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.HOSTNAME, multiNode ? "127.0.0.1:5433,127.0.0.2:5433,127.0.0.3:5433" : "127.0.0.1"); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java index f66797152a..6969bf56b2 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java @@ -11,11 +11,11 @@ public class PostgresPartitionTest extends AbstractPartitionTest slots = List.of("a", "b"); + List publications = List.of("pub"); + + try { + YBValidate.slotAndPublicationsAreEqual(slots, publications); + } catch (DebeziumException ex) { + assertTrue(ex.getMessage().contains("Number of provided slots does not match the number of provided publications")); + } + } + + @Test + public void shouldThrowExceptionWhenSlotsAndSlotRangesDoNotMatch() { + List slots = List.of("a", "b", "c"); + List slotRanges = List.of("0,10", "10,1000"); + + try { + YBValidate.slotRangesMatchSlotNames(slots, slotRanges); + } catch (DebeziumException ex) { + assertTrue(ex.getMessage().contains("Number of provided slots does not match the number of provided slot ranges")); + } + } + + @Test + public void shouldThrowExceptionWhenEndBoundaryIsMissing() { + List slotRanges = List.of("0,10", "10,1000"); + + try { + YBValidate.completeRangesProvided(slotRanges); + } catch (DebeziumException ex) { + assertTrue(ex.getMessage().contains("Incomplete ranges provided")); + } + } + + @Test + public void shouldThrowExceptionWhenMidRangeIsMissing() { + List slotRanges = List.of("0,6553", "13107,19660", "19660,26214", "26214,32768", "32768,39321", "39321,45875", "45875,52428", "52428,58982", "58982,65536"); + + try { + YBValidate.completeRangesProvided(slotRanges); + } catch (DebeziumException ex) { + assertTrue(ex.getMessage().contains("Tablet range starting from hash_code")); + } + } +}