diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 222f60a5cb5..e3add95cb9d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -64,6 +64,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -111,7 +112,7 @@ public class CheckpointCoordinator { private final Set readyToCloseStartingTask; private final ConcurrentHashMap pendingCheckpoints; - private final ArrayDeque completedCheckpoints; + private final ArrayDeque completedCheckpointIds; private volatile CompletedCheckpoint latestCompletedCheckpoint = null; @@ -165,7 +166,7 @@ public CheckpointCoordinator( this.plan = plan; this.coordinatorConfig = checkpointConfig; this.pendingCheckpoints = new ConcurrentHashMap<>(); - this.completedCheckpoints = + this.completedCheckpointIds = new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1); this.scheduler = Executors.newScheduledThreadPool( @@ -178,6 +179,7 @@ public CheckpointCoordinator( "checkpoint-coordinator-%s/%s", pipelineId, jobId)); return thread; }); + ((ScheduledThreadPoolExecutor) this.scheduler).setRemoveOnCancelPolicy(true); this.serializer = new ProtoStuffSerializer(); this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks()); this.pipelineTaskStatus = new ConcurrentHashMap<>(); @@ -406,18 +408,18 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) { shutdown)); return; } - if (checkpointType.isFinalCheckpoint() || checkpointType.isSchemaChangeCheckpoint()) { - if (pendingCounter.get() > 0) { - scheduleTriggerPendingCheckpoint(checkpointType, 500L); - return; - } - } if (schemaChanging.get() && checkpointType.isGeneralCheckpoint()) { LOG.info("skip trigger generic-checkpoint because schema change in progress"); return; } + if (pendingCounter.get() > 0) { + scheduleTriggerPendingCheckpoint(checkpointType, 500L); + LOG.info("skip trigger checkpoint because there is already a pending checkpoint."); + return; + } + CompletableFuture pendingCheckpoint = createPendingCheckpoint(currentTimestamp, checkpointType); startTriggerPendingCheckpoint(pendingCheckpoint); @@ -522,19 +524,24 @@ private void startTriggerPendingCheckpoint( checkpointTimeout = coordinatorConfig.getSchemaChangeCheckpointTimeout(); } // TODO Need change to polling check until max timeout fails - scheduler.schedule( - () -> { - // If any task is not acked within the checkpoint timeout - if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) - != null - && !pendingCheckpoint.isFullyAcknowledged()) { - LOG.info("timeout checkpoint: " + pendingCheckpoint.getInfo()); - handleCoordinatorError( - CheckpointCloseReason.CHECKPOINT_EXPIRED, null); - } - }, - checkpointTimeout, - TimeUnit.MILLISECONDS); + pendingCheckpoint.setCheckpointTimeOutFuture( + scheduler.schedule( + () -> { + // If any task is not acked within the checkpoint + // timeout + if (pendingCheckpoints.get( + pendingCheckpoint.getCheckpointId()) + != null + && !pendingCheckpoint.isFullyAcknowledged()) { + LOG.info( + "timeout checkpoint: " + + pendingCheckpoint.getInfo()); + handleCoordinatorError( + CheckpointCloseReason.CHECKPOINT_EXPIRED, null); + } + }, + checkpointTimeout, + TimeUnit.MILLISECONDS)); }); } @@ -695,7 +702,7 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed completedCheckpoint.getCheckpointTimestamp(), completedCheckpoint.getCompletedTimestamp()); final long checkpointId = completedCheckpoint.getCheckpointId(); - completedCheckpoints.addLast(completedCheckpoint); + completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId())); try { byte[] states = serializer.serialize(completedCheckpoint); checkpointStorage.storeCheckPoint( @@ -705,18 +712,17 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed .pipelineId(pipelineId) .states(states) .build()); - if (completedCheckpoints.size() + if (completedCheckpointIds.size() % coordinatorConfig.getStorage().getMaxRetainedCheckpoints() == 0 - && completedCheckpoints.size() + && completedCheckpointIds.size() / coordinatorConfig.getStorage().getMaxRetainedCheckpoints() > 1) { List needDeleteCheckpointId = new ArrayList<>(); for (int i = 0; i < coordinatorConfig.getStorage().getMaxRetainedCheckpoints(); i++) { - needDeleteCheckpointId.add( - completedCheckpoints.removeFirst().getCheckpointId() + ""); + needDeleteCheckpointId.add(completedCheckpointIds.removeFirst()); } checkpointStorage.deleteCheckpoint( String.valueOf(completedCheckpoint.getJobId()), @@ -734,7 +740,7 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed completedCheckpoint.getJobId()); latestCompletedCheckpoint = completedCheckpoint; notifyCompleted(completedCheckpoint); - pendingCheckpoints.remove(checkpointId); + pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted(); pendingCounter.decrementAndGet(); if (isCompleted()) { cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java index b2a46fee051..1d920963694 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java @@ -27,6 +27,7 @@ import com.beust.jcommander.internal.Nullable; import lombok.Getter; +import lombok.Setter; import java.time.Instant; import java.util.List; @@ -34,6 +35,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; public class PendingCheckpoint implements Checkpoint { private static final Logger LOG = LoggerFactory.getLogger(PendingCheckpoint.class); @@ -57,6 +59,8 @@ public class PendingCheckpoint implements Checkpoint { @Getter private CheckpointException failureCause; + @Setter ScheduledFuture checkpointTimeOutFuture; + public PendingCheckpoint( long jobId, int pipelineId, @@ -174,6 +178,15 @@ public void abortCheckpoint(CheckpointCloseReason closedReason, @Nullable Throwa } } + // Avoid memory leak in ScheduledThreadPoolExecutor due to overly long timeout settings causing + // numerous completed checkpoints to remain + public void abortCheckpointTimeoutFutureWhenIsCompleted() { + if (checkpointTimeOutFuture == null) { + return; + } + checkpointTimeOutFuture.cancel(false); + } + public String getInfo() { return String.format( "%s/%s/%s, %s",