diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 7ebcdf0d836..5cab2dd0b24 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -40,6 +40,8 @@ public class SeaTunnelSourceCollector implements Collector { private final Meter sourceReceivedQPS; + private volatile long rowCountThisPollNext; + public SeaTunnelSourceCollector( Object checkpointLock, List>> outputs, @@ -54,6 +56,7 @@ public SeaTunnelSourceCollector( public void collect(T row) { try { sendRecordToNext(new Record<>(row)); + rowCountThisPollNext++; sourceReceivedCount.inc(); sourceReceivedQPS.markEvent(); } catch (IOException e) { @@ -66,6 +69,14 @@ public Object getCheckpointLock() { return checkpointLock; } + public long getRowCountThisPollNext() { + return this.rowCountThisPollNext; + } + + public void resetRowCountThisPollNext() { + this.rowCountThisPollNext = 0; + } + public void sendRecordToNext(Record record) throws IOException { synchronized (checkpointLock) { for (OneInputFlowLifeCycle> output : outputs) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index 16adec49e00..8430f689833 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -133,6 +133,11 @@ public void close() throws IOException { public void collect() throws Exception { if (!prepareClose) { reader.pollNext(collector); + if (collector.getRowCountThisPollNext() == 0) { + Thread.sleep(100); + } else { + collector.resetRowCountThisPollNext(); + } } }