Skip to content

Commit

Permalink
[Hotfix][Connector-V2] Fix ConcurrentModificationException when snaps…
Browse files Browse the repository at this point in the history
…hotState based on SourceReaderBase (#4011)
  • Loading branch information
hailin0 authored Feb 1, 2023
1 parent 79b5cdd commit cd2bd6a
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* An abstract implementation of {@link SourceReader} which provides some synchronization between
Expand All @@ -55,7 +57,7 @@
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final Map<String, SplitContext<T, SplitStateT>> splitStates;
private final ConcurrentMap<String, SplitContext<T, SplitStateT>> splitStates;
protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
protected final SourceReaderOptions options;
Expand All @@ -74,7 +76,7 @@ public SourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
this.elementsQueue = elementsQueue;
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
this.splitStates = new HashMap<>();
this.splitStates = new ConcurrentHashMap<>();
this.options = options;
this.context = context;
}
Expand Down

0 comments on commit cd2bd6a

Please sign in to comment.