[yugabyte/yugabyte-db#26107] Parallel streaming changes (#172)
This PR introduces support for streaming changes in parallel, using
multiple tasks for a single table, provided the user supplies the hash_code
ranges to stream. The following changes have been introduced in this PR:
1. New configurations:
a. `streaming.mode`: Accepts `default` or `parallel` and determines
whether the parallel streaming mode should be used.
b. `slot.names`: A comma-separated list of the slot names to be used, one
per task.
c. `publication.names`: A comma-separated list of the publication names to
be used, one per task.
d. `slot.ranges`: A list of **semi-colon** separated slot ranges in the
format `a,b;b,c;c,d`.
2. Validations introduced in the class `YBValidate`:
a. To validate that the complete hash range is provided by the user and
nothing is missing.
b. To validate that the number of slot names equals the number of
publication names as well as the number of slot ranges.
c. To ensure that only one table is provided in the `table.include.list`,
as parallel streaming will not work with multiple tables.
3. Support for snapshot with `streaming.mode` parallel.
a. This requires providing the hash part of the primary key columns in the
configuration parameter `primary.key.hash.columns`; see the sketch after
this list.
4. The `PostgresPartition` object now also uses the slot name to uniquely
identify the source partition.
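
As a minimal sketch, a snapshot-enabled parallel configuration could
combine these properties as follows (the table `public.orders`, the hash
column `id`, and the `snapshot.mode` value are hypothetical placeholders;
the slot, publication, and range properties from the usage example below
still apply):

```
{
  ...
  "snapshot.mode":"initial",
  "streaming.mode":"parallel",
  "table.include.list":"public.orders",
  "primary.key.hash.columns":"id",
  ...
}
```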

### Usage example

If the connector configuration contains the following properties:

```
{
  ...
  "streaming.mode":"parallel",
  "slot.names":"rs1,rs1",
  "publication.names":"pb1,pb2",
  "slot.ranges":"0,32768;32768,65536"
  ...
}
```

then 2 tasks will be created:
1. `task 0`: `slot=rs1 publication=pb1 hash_range=0,32768`
2. `task 1`: `slot=rs2 publication=pb2 hash_range=32768,65536`
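
Under the hood, each task passes its range to the server through the
`hash_range` stream parameter. When a snapshot is requested, each task is
also assigned a per-task snapshot override query built from
`primary.key.hash.columns`; for `task 0` above, assuming a hypothetical
table `public.orders` with hash column `id`, the generated query has the
following shape:

```
SELECT * FROM public.orders WHERE yb_hash_code(id) >= 0 AND yb_hash_code(id) < 32768
```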

### Note:
It is currently the user's responsibility to provide the full hash range
and to keep the ordering consistent across the `slot.names`,
`publication.names`, and `slot.ranges` configs, since the values are picked
up sequentially and divided among tasks. To ensure that the task owning a
given slot gets the same hash_range on every restart, the order must be
preserved.
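
For example, if the configuration above provided only
`"slot.ranges":"0,32768"` for a single slot, validation would fail at
startup with an error along the lines of `Incomplete ranges provided. Range
starting from hash_code 32768 is missing`.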

This closes yugabyte/yugabyte-db#26107.
vaibhav-yb authored Feb 25, 2025
1 parent 615451f commit 13c3b13
Showing 10 changed files with 340 additions and 17 deletions.
@@ -381,6 +381,39 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
}
}

public enum StreamingMode implements EnumeratedValue {
DEFAULT("DEFAULT") {
@Override
public boolean isParallel() {
return false;
}
},
PARALLEL("PARALLEL") {
@Override
public boolean isParallel() {
return true;
}
};


private final String streamingMode;

StreamingMode(String streamingMode) {
this.streamingMode = streamingMode;
}

public static StreamingMode parse(String s) {
return valueOf(s.trim().toUpperCase());
}

@Override
public String getValue() {
return streamingMode;
}

public abstract boolean isParallel();
}

public enum LsnType implements EnumeratedValue {
SEQUENCE("SEQUENCE") {
@Override
@@ -696,6 +729,31 @@ public static SchemaRefreshMode parse(String value) {
.withDescription("Whether or not to take a consistent snapshot of the tables." +
"Disabling this option may result in duplication of some already snapshot data in the streaming phase.");

public static final Field STREAMING_MODE = Field.create("streaming.mode")
.withDisplayName("Streaming mode")
.withType(Type.STRING)
.withImportance(Importance.LOW)
.withEnum(StreamingMode.class, StreamingMode.DEFAULT)
.withDescription("Streaming mode the connector should follow");

public static final Field SLOT_NAMES = Field.create("slot.names")
.withDisplayName("Slot names for parallel consumption")
.withImportance(Importance.LOW)
.withDescription("Comma separated values for multiple slot names")
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);

public static final Field PUBLICATION_NAMES = Field.create("publication.names")
.withDisplayName("Publication names for parallel consumption")
.withImportance(Importance.LOW)
.withDescription("Comma separated values for multiple publication names")
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);

public static final Field SLOT_RANGES = Field.create("slot.ranges")
.withDisplayName("Ranges on which a slot is supposed to operate")
.withImportance(Importance.LOW)
.withDescription("Semi-colon separated values for hash ranges to be polled by tasks.")
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);

public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections")
.withDisplayName("YB load balance connections")
.withType(Type.BOOLEAN)
@@ -1153,6 +1211,10 @@ public LsnType slotLsnType() {
return LsnType.parse(getConfig().getString(SLOT_LSN_TYPE));
}

public StreamingMode streamingMode() {
return StreamingMode.parse(getConfig().getString(STREAMING_MODE));
}

protected boolean dropSlotOnStop() {
if (getConfig().hasKey(DROP_SLOT_ON_STOP.name())) {
return getConfig().getBoolean(DROP_SLOT_ON_STOP);
@@ -1242,6 +1304,18 @@ public String primaryKeyHashColumns() {
return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS);
}

public List<String> getSlotNames() {
return List.of(getConfig().getString(SLOT_NAMES).trim().split(","));
}

public List<String> getPublicationNames() {
return List.of(getConfig().getString(PUBLICATION_NAMES).trim().split(","));
}

public List<String> getSlotRanges() {
return List.of(getConfig().getString(SLOT_RANGES).trim().split(";"));
}

@Override
public byte[] getUnavailableValuePlaceholder() {
String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER);
@@ -1326,7 +1400,11 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
UNAVAILABLE_VALUE_PLACEHOLDER,
LOGICAL_DECODING_MESSAGE_PREFIX_INCLUDE_LIST,
LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST)
LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST,
STREAMING_MODE,
SLOT_NAMES,
PUBLICATION_NAMES,
SLOT_RANGES)
.excluding(INCLUDE_SCHEMA_CHANGES)
.create();

@@ -1503,5 +1581,15 @@ protected static int validateYBHostname(Configuration config, Field field, Field
return problemCount;
}

protected static int validateUsageWithParallelStreamingMode(Configuration config, Field field, Field.ValidationOutput problems) {
String mode = config.getString(STREAMING_MODE);
int problemCount = 0;

if (!StreamingMode.parse(mode).isParallel()) {
problems.accept(field, config.getString(field), "Configuration is only valid with parallel streaming mode");
++problemCount;
}

return problemCount;
}
}
@@ -165,6 +165,26 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
else {
LOGGER.info("Found previous offset {}", previousOffset);
snapshotter.init(connectorConfig, previousOffset.asOffsetState(), slotInfo);

// If previous offset is present that means that the connector is being restarted.
if (snapshotter.shouldSnapshot() && connectorConfig.streamingMode().isParallel()) {
// Drop existing slot so that a new slot can be created.
LOGGER.info("Dropping existing replication slot '{}' since task restarted before snapshot was completed", connectorConfig.slotName());
jdbcConnection.execute(String.format("SELECT * FROM pg_drop_replication_slot('%s')", connectorConfig.slotName()));

// Set slotInfo to null so that slot can be created again.
slotInfo = null;
}
}

// TODO Vaibhav: Read more in https://issues.redhat.com/browse/DBZ-2118
if (connectorConfig.streamingMode().isParallel()) {
try {
jdbcConnection.commit();
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}

SlotCreationResult slotCreatedInfo = null;
@@ -191,11 +211,14 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
}
}

try {
jdbcConnection.commit();
}
catch (SQLException e) {
throw new DebeziumException(e);
// TODO Vaibhav: Read more in https://issues.redhat.com/browse/DBZ-2118
if (!connectorConfig.streamingMode().isParallel()) {
try {
jdbcConnection.commit();
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}

final PostgresEventMetadataProvider metadataProvider = new PostgresEventMetadataProvider();
@@ -22,11 +22,13 @@ public class PostgresPartition extends AbstractPartition implements Partition {

private final String serverName;
private final String taskId;
private final String slotName;

public PostgresPartition(String serverName, String databaseName, String taskId) {
public PostgresPartition(String serverName, String databaseName, String taskId, String slotName) {
super(databaseName);
this.serverName = serverName;
this.taskId = taskId;
this.slotName = slotName;
}

@Override
@@ -57,7 +59,7 @@ public String toString() {
}

public String getPartitionIdentificationKey() {
return String.format("%s_%s", serverName, taskId);
return String.format("%s_%s_%s", serverName, taskId, slotName);
}

static class Provider implements Partition.Provider<PostgresPartition> {
@@ -73,7 +75,7 @@ static class Provider implements Partition.Provider<PostgresPartition> {
public Set<PostgresPartition> getPartitions() {
return Collections.singleton(new PostgresPartition(
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
connectorConfig.getTaskId()));
connectorConfig.getTaskId(), connectorConfig.slotName()));
}
}
}
@@ -0,0 +1,66 @@
package io.debezium.connector.postgresql;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.transforms.yugabytedb.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
* Class to store all the validation methods.
*
* @author Vaibhav Kushwaha ([email protected])
*/
public class YBValidate {
private static final Logger LOGGER = LoggerFactory.getLogger(YBValidate.class);
private static final String RANGE_BEGIN = "0";
private static final String RANGE_END = "65536";

public static void completeRangesProvided(List<String> slotRanges) {
List<Pair<Integer, Integer>> pairList = slotRanges.stream()
.map(entry -> {
String[] parts = entry.split(",");
return new Pair<>(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]));
})
.sorted(Comparator.comparing(Pair::getFirst))
.collect(Collectors.toList());

int rangeBegin = Integer.valueOf(RANGE_BEGIN);

for (Pair<Integer, Integer> pair : pairList) {
if (rangeBegin != pair.getFirst()) {
LOGGER.error("Error while validating ranges: {}", pairList);
throw new DebeziumException(
String.format("Tablet range starting from hash_code %d is missing", rangeBegin));
}

rangeBegin = pair.getSecond();
}

// At this point, if the range is complete, rangeBegin will be pointing to the RANGE_END value.
if (rangeBegin != Integer.valueOf(RANGE_END)) {
LOGGER.error("Error while validating ranges: {}", pairList);
throw new DebeziumException(
String.format("Incomplete ranges provided. Range starting from hash_code %d is missing", rangeBegin));
}
}

public static void slotAndPublicationsAreEqual(List<String> slotNames, List<String> publicationNames) {
if (slotNames.size() != publicationNames.size()) {
throw new DebeziumException(
String.format("Number of provided slots does not match the number of provided " +
"publications. Slots: %s, Publications: %s", slotNames, publicationNames));
}
}

public static void slotRangesMatchSlotNames(List<String> slotNames, List<String> slotRanges) {
if (slotNames.size() != slotRanges.size()) {
throw new DebeziumException(
String.format("Number of provided slots does not match the number of provided " +
"slot ranges. Slots: %s, Slot ranges: %s", slotNames, slotRanges));
}
}
}
@@ -41,6 +41,7 @@ public class YugabyteDBConnector extends RelationalBaseSourceConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBConnector.class);
private Map<String, String> props;
private PostgresConnectorConfig connectorConfig;

public YugabyteDBConnector() {
}
@@ -58,6 +59,39 @@ public Class<? extends Task> taskClass() {
@Override
public void start(Map<String, String> props) {
this.props = props;
this.connectorConfig = new PostgresConnectorConfig(Configuration.from(props));
}

protected List<Map<String, String>> getTaskConfigsForParallelStreaming(List<String> slotNames,
List<String> publicationNames,
List<String> slotRanges) {
List<Map<String, String>> taskConfigs = new ArrayList<>();

if (connectorConfig.getSnapshotter().shouldSnapshot()) {
props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()));
}

for (int i = 0; i < slotNames.size(); ++i) {
Map<String, String> taskProps = new HashMap<>(this.props);

taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i));
taskProps.put(PostgresConnectorConfig.SLOT_NAME.name(), slotNames.get(i));
taskProps.put(PostgresConnectorConfig.PUBLICATION_NAME.name(), publicationNames.get(i));
taskProps.put(PostgresConnectorConfig.STREAM_PARAMS.name(), "hash_range=" + slotRanges.get(i));

if (connectorConfig.getSnapshotter().shouldSnapshot()) {
String[] splitRange = slotRanges.get(i).split(",");
String query = getParallelSnapshotQuery(splitRange[0], splitRange[1]);
taskProps.put(
PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." + taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()),
query
);
}

taskConfigs.add(taskProps);
}

return taskConfigs;
}

@Override
Expand All @@ -66,14 +100,33 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.emptyList();
}

final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name());

if (connectorConfig.streamingMode().isParallel()) {
LOGGER.info("Initialising parallel streaming mode");

// Validate for a single table.
validateSingleTableProvided(tableIncludeList, false /* isSnapshot */);

List<String> slotNames = connectorConfig.getSlotNames();
List<String> publicationNames = connectorConfig.getPublicationNames();
List<String> slotRanges = connectorConfig.getSlotRanges();

YBValidate.slotAndPublicationsAreEqual(slotNames, publicationNames);
YBValidate.slotRangesMatchSlotNames(slotNames, slotRanges);
YBValidate.completeRangesProvided(slotRanges);

return getTaskConfigsForParallelStreaming(slotNames, publicationNames, slotRanges);
}

// TODO Vaibhav (#26106): The following code block is not needed now, remove in a separate PR.
if (props.containsKey(PostgresConnectorConfig.SNAPSHOT_MODE.name())
&& props.get(PostgresConnectorConfig.SNAPSHOT_MODE.name())
.equalsIgnoreCase(PostgresConnectorConfig.SnapshotMode.PARALLEL.getValue())) {
LOGGER.info("Initialising parallel snapshot consumption");

final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name());
// Perform basic validations.
validateSingleTableProvidedForParallelSnapshot(tableIncludeList);
validateSingleTableProvided(tableIncludeList, true);

// Publication auto create mode should not be for all tables.
if (props.containsKey(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name())
@@ -94,15 +147,16 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props));
}

protected void validateSingleTableProvidedForParallelSnapshot(String tableIncludeList) throws DebeziumException {
protected void validateSingleTableProvided(String tableIncludeList, boolean isSnapshot) throws DebeziumException {
if (tableIncludeList == null) {
throw new DebeziumException("No table provided, provide a table in the table.include.list");
} else if (tableIncludeList.contains(",")) {
// This might indicate the presence of multiple tables in the include list, we do not want that.
throw new DebeziumException("parallel snapshot consumption is only supported with one table at a time");
throw new DebeziumException("parallel " + (isSnapshot ? "snapshot" : "streaming") + " consumption is only supported with one table at a time");
}
}

// TODO Vaibhav (#26106): This method needs to be removed.
protected List<Map<String, String>> getConfigForParallelSnapshotConsumption(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();

@@ -137,6 +191,14 @@ protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBo
props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound);
}

// TODO Vaibhav (#26106): This is a copy of existing method, remove the older method in a separate PR.
protected String getParallelSnapshotQuery(String lowerBound, String upperBound) {
return String.format("SELECT * FROM %s WHERE yb_hash_code(%s) >= %s AND yb_hash_code(%s) < %s",
props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()),
props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), lowerBound,
props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound);
}

@Override
public void stop() {
this.props = null;