[DBZ-PGYB][yugabyte/yugabyte-db#24555] Add task ID to PostgresPartition (#163)

## Problem

With the introduction of the parallel snapshot model, the connector can
run multiple tasks when the snapshot mode is set to `parallel`. This
causes a problem at the underlying layer, where the connector stores the
sourceInfo for its partitions (i.e. `PostgresPartition` objects) in
Kafka.

A `PostgresPartition` is identified by a map of the form
`{"server", topicPrefix}`. When `snapshot.mode` is `parallel`, this map
is currently identical for the `PostgresPartition` objects created by
every task, so they all refer to the same source partition in the Kafka
topic. With 2 tasks (task_0 and task_1), the following can happen:
1. task_0 completes its snapshot while task_1 is yet to start.
   a. After completion, task_0 updates the `sourceInfo` to record that
      its snapshot is completed.
2. When task_1 starts up, it reads the same `sourceInfo` object,
   concludes that the snapshot is completed, and skips its own snapshot.

This causes data loss, since task_1 never actually takes a snapshot.
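
The sequence above can be sketched in isolation. This is a minimal
illustration, not connector code: the in-memory `store` map stands in
for the offset storage, and the `snapshot_completed` key and the
`dbserver1` topic prefix are hypothetical names chosen for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SharedPartitionKeyDemo {

    // Mirrors the pre-fix identifying map: {"server", topicPrefix}.
    public static Map<String, String> sourcePartition(String topicPrefix) {
        return Collections.singletonMap("server", topicPrefix);
    }

    // Hypothetical stand-in for a task recording that its snapshot is done.
    public static void markSnapshotCompleted(
            Map<Map<String, String>, Map<String, Object>> store,
            Map<String, String> partition) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("snapshot_completed", true); // assumed key name
        store.put(partition, offset);
    }

    // Hypothetical stand-in for a task checking stored state on startup.
    public static boolean snapshotAlreadyCompleted(
            Map<Map<String, String>, Map<String, Object>> store,
            Map<String, String> partition) {
        Map<String, Object> offset = store.get(partition);
        return offset != null && Boolean.TRUE.equals(offset.get("snapshot_completed"));
    }

    public static void main(String[] args) {
        Map<Map<String, String>, Map<String, Object>> store = new HashMap<>();

        // Both tasks build the same identifying map.
        Map<String, String> task0 = sourcePartition("dbserver1");
        Map<String, String> task1 = sourcePartition("dbserver1");

        markSnapshotCompleted(store, task0);

        // task_1 sees task_0's state and would skip its own snapshot.
        System.out.println("task_1 skips snapshot: " + snapshotAlreadyCompleted(store, task1));
    }
}
```

Because both maps are equal, the lookup for task_1 returns the entry
written by task_0, which is the collision described above.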

## Solution

This PR implements a short-term solution: we add the task ID to the
partition so that each `PostgresPartition` can identify a
sourcePartition uniquely. The identifying map now becomes
`{"server", topicPrefix_taskId}`.
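
As a rough standalone sketch of the new key (the class name and the
`dbserver1` prefix are made up for illustration; the `%s_%d` format is
the one used by `getPartitionIdentificationKey()` in this change):

```java
import java.util.Collections;
import java.util.Map;

public class TaskScopedPartitionKeyDemo {

    // Same format as getPartitionIdentificationKey() in this change.
    public static String partitionIdentificationKey(String serverName, int taskId) {
        return String.format("%s_%d", serverName, taskId);
    }

    // Post-fix identifying map: {"server", topicPrefix_taskId}.
    public static Map<String, String> sourcePartition(String serverName, int taskId) {
        return Collections.singletonMap("server", partitionIdentificationKey(serverName, taskId));
    }

    public static void main(String[] args) {
        Map<String, String> task0 = sourcePartition("dbserver1", 0);
        Map<String, String> task1 = sourcePartition("dbserver1", 1);

        System.out.println(task0);
        System.out.println(task1);
        // The maps differ, so each task gets its own stored state.
        System.out.println("same partition: " + task0.equals(task1));
    }
}
```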

**Note:**
This solution is a quick fix for the problem; it holds only as long as
the number of tasks in the connector remains the same.
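
One way to read the note, as an interpretation rather than something
this commit spells out: the set of partition keys depends on the task
count, so changing the count changes which keys exist. The sketch below
assumes task IDs run from 0 to `taskCount - 1`; the class name and the
`dbserver1` prefix are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TaskCountChangeDemo {

    // Keys produced for task IDs 0..taskCount-1 (assumed numbering).
    public static Set<String> partitionKeys(String serverName, int taskCount) {
        Set<String> keys = new LinkedHashSet<>();
        for (int taskId = 0; taskId < taskCount; taskId++) {
            keys.add(String.format("%s_%d", serverName, taskId));
        }
        return keys;
    }

    // Keys present after a task-count change that had no entry before it.
    public static Set<String> keysWithoutStoredState(String serverName, int oldCount, int newCount) {
        Set<String> result = new LinkedHashSet<>(partitionKeys(serverName, newCount));
        result.removeAll(partitionKeys(serverName, oldCount));
        return result;
    }

    public static void main(String[] args) {
        // Growing from 2 to 3 tasks introduces a key with no stored state.
        System.out.println(keysWithoutStoredState("dbserver1", 2, 3));
    }
}
```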

This partially fixes yugabyte/yugabyte-db#24555
vaibhav-yb authored Oct 24, 2024
1 parent 2cda9b7 commit 4e55ebd
Showing 2 changed files with 12 additions and 5 deletions.
@@ -21,15 +21,17 @@ public class PostgresPartition extends AbstractPartition implements Partition {
     private static final String SERVER_PARTITION_KEY = "server";

     private final String serverName;
+    private final int taskId;

-    public PostgresPartition(String serverName, String databaseName) {
+    public PostgresPartition(String serverName, String databaseName, int taskId) {
         super(databaseName);
         this.serverName = serverName;
+        this.taskId = taskId;
     }

     @Override
     public Map<String, String> getSourcePartition() {
-        return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
+        return Collect.hashMapOf(SERVER_PARTITION_KEY, getPartitionIdentificationKey());
     }

     @Override
@@ -54,6 +56,10 @@ public String toString() {
         return "PostgresPartition [sourcePartition=" + getSourcePartition() + "]";
     }

+    public String getPartitionIdentificationKey() {
+        return String.format("%s_%d", serverName, taskId);
+    }
+
     static class Provider implements Partition.Provider<PostgresPartition> {
         private final PostgresConnectorConfig connectorConfig;
         private final Configuration taskConfig;
@@ -66,7 +72,8 @@ static class Provider implements Partition.Provider<PostgresPartition> {
         @Override
         public Set<PostgresPartition> getPartitions() {
             return Collections.singleton(new PostgresPartition(
-                    connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name())));
+                    connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
+                    connectorConfig.taskId()));
         }
     }
 }
@@ -11,11 +11,11 @@ public class PostgresPartitionTest extends AbstractPartitionTest<PostgresPartiti

     @Override
     protected PostgresPartition createPartition1() {
-        return new PostgresPartition("server1", "database1");
+        return new PostgresPartition("server1", "database1", 0);
     }

     @Override
     protected PostgresPartition createPartition2() {
-        return new PostgresPartition("server2", "database1");
+        return new PostgresPartition("server2", "database1", 0);
     }
 }
