diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java index ce2373bc622..a1e6729bff9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java @@ -32,12 +32,17 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; /** Assigner for snapshot split. */ public class SnapshotSplitAssigner implements SplitAssigner { @@ -47,12 +52,12 @@ public class SnapshotSplitAssigner implements SplitAssig private final C sourceConfig; private final List alreadyProcessedTables; - private final List remainingSplits; + private final Queue remainingSplits; private final Map assignedSplits; private final Map splitCompletedOffsets; private boolean assignerCompleted; private final int currentParallelism; - private final LinkedList remainingTables; + private final Deque remainingTables; private final boolean isRemainingTablesCheckpointed; private ChunkSplitter chunkSplitter; @@ -115,12 +120,12 @@ private SnapshotSplitAssigner( this.context = context; this.sourceConfig = context.getSourceConfig(); this.currentParallelism = currentParallelism; - this.alreadyProcessedTables = alreadyProcessedTables; - this.remainingSplits = remainingSplits; - this.assignedSplits = assignedSplits; - this.splitCompletedOffsets = splitCompletedOffsets; + this.alreadyProcessedTables = Collections.synchronizedList(alreadyProcessedTables); + this.remainingSplits = new ConcurrentLinkedQueue(remainingSplits); + this.assignedSplits = new ConcurrentHashMap<>(assignedSplits); + this.splitCompletedOffsets = new ConcurrentHashMap<>(splitCompletedOffsets); this.assignerCompleted = assignerCompleted; - this.remainingTables = new LinkedList<>(remainingTables); + this.remainingTables = new ConcurrentLinkedDeque<>(remainingTables); this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; this.isTableIdCaseSensitive = isTableIdCaseSensitive; this.dialect = dialect; @@ -211,11 +216,15 @@ public SnapshotPhaseState snapshotState(long checkpointId) { SnapshotPhaseState state = new SnapshotPhaseState( alreadyProcessedTables, - remainingSplits, + remainingSplits.isEmpty() + ? Collections.emptyList() + : new ArrayList<>(remainingSplits), assignedSplits, splitCompletedOffsets, assignerCompleted, - remainingTables, + remainingTables.isEmpty() + ? Collections.emptyList() + : new ArrayList<>(remainingTables), isTableIdCaseSensitive, true); // we need a complete checkpoint before mark this assigner to be completed, to wait for all