From 2cda9b77544ab63f71335df37d6116d945815d45 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha <34186745+vaibhav-yb@users.noreply.github.com> Date: Thu, 17 Oct 2024 21:56:22 +0530 Subject: [PATCH] [DBZ-PGYB][yugabyte/yugabyte-db#24200] Execute snapshot in chunks (#161) ## Problem For very large tables, the default `SELECT *` query can take a really long time to complete leading to longer time for snapshots. ## Solution This PR aims to implement snapshotting the table in parallel using an inbuilt method `yb_hash_code` to only run the query for a given hash range. The following 2 configuration properties are introduced with this PR: 1. A new `snapshot.mode` called `parallel` - this will behave exactly like `initial_only` but we will have the ability to launch multiple tasks. 2. `primary.key.hash.columns` - this config takes in a comma separated values of the primary key hash component of the table. > **Note:** When `snapshot.mode` is set to `parallel`, we will not support providing regex in the property `table.include.list` and the user will need to specify the full name of the table in the property. Additionally, we will only allow one table in the `table.include.list` if `snapshot.mode` is `parallel`. --- .../PostgresChangeEventSourceCoordinator.java | 6 +- .../postgresql/PostgresConnectorConfig.java | 36 ++++++++++ .../postgresql/PostgresConnectorTask.java | 2 +- .../postgresql/PostgresTaskContext.java | 12 ++++ .../postgresql/YugabyteDBConnector.java | 72 +++++++++++++++++++ .../snapshot/ParallelSnapshotter.java | 46 ++++++++++++ .../postgresql/PostgresConnectorIT.java | 56 +++++++++++++++ 7 files changed, 227 insertions(+), 3 deletions(-) create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java index 7d25a68fa42..708a6b234fb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java @@ -73,7 +73,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps final PostgresPartition partition = previousOffsets.getTheOnlyPartition(); final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset(); - previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition)); + previousLogContext.set(taskContext.configureLoggingContext( + String.format("snapshot|%s", taskContext.getTaskId()), partition)); SnapshotResult snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset); getSignalProcessor(previousOffsets).ifPresent(s -> s.setContext(snapshotResult.getOffset())); @@ -94,7 +95,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps } } LOGGER.info("Transitioning to streaming"); - previousLogContext.set(taskContext.configureLoggingContext("streaming", partition)); + previousLogContext.set(taskContext.configureLoggingContext( + String.format("streaming|%s", taskContext.getTaskId()), partition)); streamEvents(context, partition, snapshotResult.getOffset()); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index f342426dce3..7d70a05260a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -14,6 +14,7 @@ import java.util.regex.Pattern; import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.snapshot.ParallelSnapshotter; import io.debezium.data.Envelope; import io.debezium.heartbeat.Heartbeat; import io.debezium.heartbeat.HeartbeatConnectionProvider; @@ -212,6 +213,11 @@ public enum SnapshotMode implements EnumeratedValue { */ INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()), + /** + * Perform a snapshot using parallel tasks. + */ + PARALLEL("parallel", (c) -> new ParallelSnapshotter()), + /** * Inject a custom snapshotter, which allows for more control over snapshots. */ @@ -983,6 +989,27 @@ public static AutoCreateMode parse(String value, String defaultValue) { public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(PostgresSourceInfoStructMaker.class.getName()); + public static final Field TASK_ID = Field.create("task.id") + .withDisplayName("ID of the connector task") + .withType(Type.INT) + .withDefault(0) + .withImportance(Importance.LOW) + .withDescription("Internal use only"); + + public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns") + .withDisplayName("Comma separated primary key fields") + .withType(Type.STRING) + .withImportance(Importance.LOW) + .withDescription("A comma separated value having all the hash components of the primary key") + .withValidation((config, field, output) -> { + if (config.getString(SNAPSHOT_MODE).equalsIgnoreCase("parallel") && config.getString(field, "").isEmpty()) { + output.accept(field, "", "primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'"); + return 1; + } + + return 0; + }); + private final LogicalDecodingMessageFilter logicalDecodingMessageFilter; private final HStoreHandlingMode hStoreHandlingMode; private final IntervalHandlingMode intervalHandlingMode; @@ -1108,6 +1135,14 @@ public boolean isFlushLsnOnSource() { return flushLsnOnSource; } + public int taskId() { + return getConfig().getInteger(TASK_ID); + } + + public String primaryKeyHashColumns() { + return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS); + } + @Override public byte[] getUnavailableValuePlaceholder() { String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER); @@ -1181,6 +1216,7 @@ protected SourceInfoStructMaker getSourceInfoStruc SNAPSHOT_MODE, SNAPSHOT_MODE_CLASS, YB_CONSISTENT_SNAPSHOT, + PRIMARY_KEY_HASH_COLUMNS, HSTORE_HANDLING_MODE, BINARY_HANDLING_MODE, SCHEMA_NAME_ADJUSTMENT_MODE, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 38d961b8b0d..99eaca22b00 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -109,7 +109,7 @@ public ChangeEventSourceCoordinator st final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry); schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter); - this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy); + this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId()); final Offsets previousOffsets = getPreviousOffsets( new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index f9b1f711202..721c408681d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -52,6 +52,18 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch this.schema = schema; } + protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy topicNamingStrategy, int taskId) { + super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet); + + this.config = config; + if (config.xminFetchInterval().toMillis() > 0) { + this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis()); + } + this.topicNamingStrategy = topicNamingStrategy; + assert schema != null; + this.schema = schema; + } + protected TopicNamingStrategy topicNamingStrategy() { return topicNamingStrategy; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index 335d73bfb53..2491e98d8c0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -7,6 +7,7 @@ package io.debezium.connector.postgresql; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -61,10 +62,81 @@ public void start(Map props) { @Override public List> taskConfigs(int maxTasks) { + if (props == null) { + return Collections.emptyList(); + } + + if (props.containsKey(PostgresConnectorConfig.SNAPSHOT_MODE.name()) + && props.get(PostgresConnectorConfig.SNAPSHOT_MODE.name()) + .equalsIgnoreCase(PostgresConnectorConfig.SnapshotMode.PARALLEL.getValue())) { + LOGGER.info("Initialising parallel snapshot consumption"); + + final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()); + // Perform basic validations. + validateSingleTableProvidedForParallelSnapshot(tableIncludeList); + + // Publication auto create mode should not be for all tables. + if (props.containsKey(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name()) + && props.get(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name()) + .equalsIgnoreCase(PostgresConnectorConfig.AutoCreateMode.ALL_TABLES.getValue())) { + throw new DebeziumException("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables, " + + "use publication.autocreate.mode=filtered"); + } + + // Add configuration for select override. + props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), tableIncludeList); + + return getConfigForParallelSnapshotConsumption(maxTasks); + } + + // YB Note: Only applicable when snapshot mode is not parallel. // this will always have just one task with the given list of properties return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props)); } + protected void validateSingleTableProvidedForParallelSnapshot(String tableIncludeList) throws DebeziumException { + if (tableIncludeList == null) { + throw new DebeziumException("No table provided, provide a table in the table.include.list"); + } else if (tableIncludeList.contains(",")) { + // This might indicate the presence of multiple tables in the include list, we do not want that. + throw new DebeziumException("parallel snapshot consumption is only supported with one table at a time"); + } + } + + protected List> getConfigForParallelSnapshotConsumption(int maxTasks) { + List> taskConfigs = new ArrayList<>(); + + final long upperBoundExclusive = 64 * 1024; + final long rangeSize = upperBoundExclusive / maxTasks; + + for (int i = 0; i < maxTasks; ++i) { + Map taskProps = new HashMap<>(this.props); + + taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i)); + + long lowerBound = i * rangeSize; + long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1); + + LOGGER.info("Using query for task {}: {}", i, getQueryForParallelSnapshotSelect(lowerBound, upperBound)); + + taskProps.put( + PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." + taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + getQueryForParallelSnapshotSelect(lowerBound, upperBound) + ); + + taskConfigs.add(taskProps); + } + + return taskConfigs; + } + + protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBound) { + return String.format("SELECT * FROM %s WHERE yb_hash_code(%s) >= %d AND yb_hash_code(%s) <= %d", + props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), lowerBound, + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound); + } + @Override public void stop() { this.props = null; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java new file mode 100644 index 00000000000..e4fdabf23a4 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java @@ -0,0 +1,46 @@ +package io.debezium.connector.postgresql.snapshot; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.spi.OffsetState; +import io.debezium.connector.postgresql.spi.SlotState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Snapshotter class to take snapshot using parallel tasks. + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class ParallelSnapshotter extends QueryingSnapshotter { + private final static Logger LOGGER = LoggerFactory.getLogger(ParallelSnapshotter.class); + private OffsetState sourceInfo; + + @Override + public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { + super.init(config, sourceInfo, slotState); + this.sourceInfo = sourceInfo; + + LOGGER.info("Initialised ParallelSnapshotter for task {}", config.taskId()); + } + + @Override + public boolean shouldStream() { + return false; + } + + @Override + public boolean shouldSnapshot() { + if (sourceInfo == null) { + LOGGER.info("Taking parallel snapshot for new datasource"); + return true; + } + else if (sourceInfo.snapshotInEffect()) { + LOGGER.info("Found previous incomplete snapshot"); + return true; + } + else { + LOGGER.info("Previous snapshot completed, no snapshot will be performed"); + return false; + } + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 0a743c99fd0..1cf09d41eb7 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1116,6 +1116,62 @@ public void shouldHaveBeforeImageOfUpdatedRow() throws InterruptedException { assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(404); } + @Test + public void shouldFailIfNoPrimaryKeyHashColumnSpecifiedWithSnapshotModeParallel() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, ""); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + assertThat(message.contains("primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'")).isTrue(); + }); + } + + @Test + public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test,public.test2") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id"); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("parallel snapshot consumption is only supported with one table at a time")).isTrue(); + }); + } + + @Test + public void shouldFailWithSnapshotModeParallelIfNoTableIncludeListProvided() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id"); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("No table provided, provide a table in the table.include.list")).isTrue(); + }); + } + + @Test + public void shouldFailIfSnapshotModeParallelHasPublicationAutoCreateModeAllTables() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test") + .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.ALL_TABLES) + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");; + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables")).isTrue(); + }); + } + @Test public void shouldResumeSnapshotIfFailingMidstream() throws Exception { // insert another set of rows so we can stop at certain point