From 2748375562d760a443f68774c026320721c554b0 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Tue, 31 Jan 2023 11:00:51 +0800 Subject: [PATCH] [Hotfix][Connector] Fix ConcurrentModificationException when snapshotState based on SourceReaderBase --- .../seatunnel/common/source/reader/SourceReaderBase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java index 23dff321f8f..3f856e46577 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java @@ -36,6 +36,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * An abstract implementation of {@link SourceReader} which provides some synchronization between @@ -55,7 +57,7 @@ public abstract class SourceReaderBase implements SourceReader { private final BlockingQueue> elementsQueue; - private final Map> splitStates; + private final ConcurrentMap> splitStates; protected final RecordEmitter recordEmitter; protected final SplitFetcherManager splitFetcherManager; protected final SourceReaderOptions options; @@ -74,7 +76,7 @@ public SourceReaderBase(BlockingQueue> elementsQueue, this.elementsQueue = elementsQueue; this.splitFetcherManager = splitFetcherManager; this.recordEmitter = recordEmitter; - this.splitStates = new HashMap<>(); + this.splitStates = new ConcurrentHashMap<>(); this.options = options; this.context = context; }