-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DBZ-PGYB][yugabyte/yugabyte-db#24200] Execute snapshot in chunks #161
Conversation
...stgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java
Outdated
Show resolved
Hide resolved
debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java
Outdated
Show resolved
Hide resolved
...nnector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Outdated
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Outdated
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Outdated
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Outdated
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Outdated
Show resolved
Hide resolved
Also could you add some unit tests for this:
|
In the run with custom image it was observed that the task-id was not getting printed in the snapshot logs. Can it be added to those logs or will it require larger effort? |
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Outdated
Show resolved
Hide resolved
...m-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBound) { | ||
return String.format("SELECT * FROM %s WHERE yb_hash_code(%s) >= %d AND yb_hash_code(%s) <= %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if a user only has range partitioned primary key and the parallel snapshot mode is deployed. What if the primary key is a composite like (id HASH, v1 ASC)? Will this still work or should we add another validation that only hash partitioned columns should be provided in primary.keys
field?
.withDisplayName("Comma separated primary key fields") | ||
.withType(Type.STRING) | ||
.withImportance(Importance.LOW) | ||
.withDescription("A comma separated value having all the primary key components") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we update this description to say HASH components of the primary key?
// Perform basic validations. | ||
validateSingleTableProvidedForParallelSnapshot(tableIncludeList); | ||
|
||
// Publication auto create mode should not be for all tables. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can also move this into a separate function - validatePublicationForParallelSnapshot
taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i)); | ||
|
||
long lowerBound = i * rangeSize; | ||
long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can add a comment explaining this special handling for the last task.
@@ -1116,6 +1116,46 @@ public void shouldHaveBeforeImageOfUpdatedRow() throws InterruptedException { | |||
assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(404); | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test for primary.keys config?
Problem
For very large tables, the default
SELECT *
query can take a really long time to complete leading to longer time for snapshots.Solution
This PR aims to implement snapshotting the table in parallel using an inbuilt method
yb_hash_code
to only run the query for a given hash range. The following 2 configuration properties are introduced with this PR:snapshot.mode
calledparallel
- this will behave exactly likeinitial_only
but we will have the ability to launch multiple tasks.primary.key.hash.columns
- this config takes in a comma separated values of the primary key hash component of the table.