From 8272a9827d746acf99633bd99a1b1151827f6c36 Mon Sep 17 00:00:00 2001 From: liuli Date: Thu, 21 Sep 2023 11:33:46 +0800 Subject: [PATCH 1/4] [bugfix][zeta] Fix the issue of generating a large number of pending checkpoints --- .../server/checkpoint/CheckpointCoordinator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 222f60a5cb5..66ed12aca41 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -406,18 +406,18 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) { shutdown)); return; } - if (checkpointType.isFinalCheckpoint() || checkpointType.isSchemaChangeCheckpoint()) { - if (pendingCounter.get() > 0) { - scheduleTriggerPendingCheckpoint(checkpointType, 500L); - return; - } - } if (schemaChanging.get() && checkpointType.isGeneralCheckpoint()) { LOG.info("skip trigger generic-checkpoint because schema change in progress"); return; } + if (pendingCounter.get() > 0) { + scheduleTriggerPendingCheckpoint(checkpointType, 500L); + LOG.info("skip trigger checkpoint because there is already a pending checkpoint."); + return; + } + CompletableFuture pendingCheckpoint = createPendingCheckpoint(currentTimestamp, checkpointType); startTriggerPendingCheckpoint(pendingCheckpoint); From b5e12c703801843d630680aef4e1c47ce9568f23 Mon Sep 17 00:00:00 2001 From: liuli Date: Thu, 21 Sep 2023 19:19:52 +0800 Subject: [PATCH 2/4] [bugfix][zeta] Avoid memory leak in ScheduledThreadPoolExecutor due to overly long timeout settings causing numerous completed checkpoints to remain --- .../checkpoint/CheckpointCoordinator.java | 41 ++++++++++++------- .../server/checkpoint/PendingCheckpoint.java | 13 ++++++ 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 66ed12aca41..62568b046a9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -64,6 +64,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -178,6 +179,7 @@ public CheckpointCoordinator( "checkpoint-coordinator-%s/%s", pipelineId, jobId)); return thread; }); + ((ScheduledThreadPoolExecutor) this.scheduler).setRemoveOnCancelPolicy(true); this.serializer = new ProtoStuffSerializer(); this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks()); this.pipelineTaskStatus = new ConcurrentHashMap<>(); @@ -522,19 +524,30 @@ private void startTriggerPendingCheckpoint( checkpointTimeout = coordinatorConfig.getSchemaChangeCheckpointTimeout(); } // TODO Need change to polling check until max timeout fails - scheduler.schedule( - () -> { - // If any task is not acked within the checkpoint timeout - if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) - != null - && !pendingCheckpoint.isFullyAcknowledged()) { - LOG.info("timeout checkpoint: " + pendingCheckpoint.getInfo()); - handleCoordinatorError( - CheckpointCloseReason.CHECKPOINT_EXPIRED, null); - } - }, - checkpointTimeout, - TimeUnit.MILLISECONDS); + pendingCheckpoints + .get(pendingCheckpoint.getCheckpointId()) + .setCheckpointTimeOutFuture( + scheduler.schedule( + () -> { + // If any task is not acked within the checkpoint + // timeout + if (pendingCheckpoints.get( + pendingCheckpoint + .getCheckpointId()) + != null + && !pendingCheckpoint + .isFullyAcknowledged()) { + LOG.info( + "timeout checkpoint: " + + pendingCheckpoint.getInfo()); + handleCoordinatorError( + CheckpointCloseReason + .CHECKPOINT_EXPIRED, + null); + } + }, + checkpointTimeout, + TimeUnit.MILLISECONDS)); }); } @@ -734,7 +747,7 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed completedCheckpoint.getJobId()); latestCompletedCheckpoint = completedCheckpoint; notifyCompleted(completedCheckpoint); - pendingCheckpoints.remove(checkpointId); + pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted(); pendingCounter.decrementAndGet(); if (isCompleted()) { cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java index b2a46fee051..1d920963694 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java @@ -27,6 +27,7 @@ import com.beust.jcommander.internal.Nullable; import lombok.Getter; +import lombok.Setter; import java.time.Instant; import java.util.List; @@ -34,6 +35,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; public class PendingCheckpoint implements Checkpoint { private static final Logger LOG = LoggerFactory.getLogger(PendingCheckpoint.class); @@ -57,6 +59,8 @@ public class PendingCheckpoint implements Checkpoint { @Getter private CheckpointException failureCause; + @Setter ScheduledFuture checkpointTimeOutFuture; + public PendingCheckpoint( long jobId, int pipelineId, @@ -174,6 +178,15 @@ public void abortCheckpoint(CheckpointCloseReason closedReason, @Nullable Throwa } } + // Avoid memory leak in ScheduledThreadPoolExecutor due to overly long timeout settings causing + // numerous completed checkpoints to remain + public void abortCheckpointTimeoutFutureWhenIsCompleted() { + if (checkpointTimeOutFuture == null) { + return; + } + checkpointTimeOutFuture.cancel(false); + } + public String getInfo() { return String.format( "%s/%s/%s, %s", From 476d8bad8c119e2a31ab68c37c2daf2d56527f02 Mon Sep 17 00:00:00 2001 From: liuli Date: Thu, 21 Sep 2023 21:44:42 +0800 Subject: [PATCH 3/4] fix --- .../checkpoint/CheckpointCoordinator.java | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 62568b046a9..df3dc6d3116 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -524,30 +524,24 @@ private void startTriggerPendingCheckpoint( checkpointTimeout = coordinatorConfig.getSchemaChangeCheckpointTimeout(); } // TODO Need change to polling check until max timeout fails - pendingCheckpoints - .get(pendingCheckpoint.getCheckpointId()) - .setCheckpointTimeOutFuture( - scheduler.schedule( - () -> { - // If any task is not acked within the checkpoint - // timeout - if (pendingCheckpoints.get( - pendingCheckpoint - .getCheckpointId()) - != null - && !pendingCheckpoint - .isFullyAcknowledged()) { - LOG.info( - "timeout checkpoint: " - + pendingCheckpoint.getInfo()); - handleCoordinatorError( - CheckpointCloseReason - .CHECKPOINT_EXPIRED, - null); - } - }, - checkpointTimeout, - TimeUnit.MILLISECONDS)); + pendingCheckpoint.setCheckpointTimeOutFuture( + scheduler.schedule( + () -> { + // If any task is not acked within the checkpoint + // timeout + if (pendingCheckpoints.get( + pendingCheckpoint.getCheckpointId()) + != null + && !pendingCheckpoint.isFullyAcknowledged()) { + LOG.info( + "timeout checkpoint: " + + pendingCheckpoint.getInfo()); + handleCoordinatorError( + CheckpointCloseReason.CHECKPOINT_EXPIRED, null); + } + }, + checkpointTimeout, + TimeUnit.MILLISECONDS)); }); } From 7456f00cf01e06b994edcf27bd5142743a5059f6 Mon Sep 17 00:00:00 2001 From: liuli Date: Fri, 22 Sep 2023 11:29:14 +0800 Subject: [PATCH 4/4] [Improve][zeta] Optimize memory usage of completedCheckpoints --- .../server/checkpoint/CheckpointCoordinator.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index df3dc6d3116..e3add95cb9d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -112,7 +112,7 @@ public class CheckpointCoordinator { private final Set readyToCloseStartingTask; private final ConcurrentHashMap pendingCheckpoints; - private final ArrayDeque completedCheckpoints; + private final ArrayDeque completedCheckpointIds; private volatile CompletedCheckpoint latestCompletedCheckpoint = null; @@ -166,7 +166,7 @@ public CheckpointCoordinator( this.plan = plan; this.coordinatorConfig = checkpointConfig; this.pendingCheckpoints = new ConcurrentHashMap<>(); - this.completedCheckpoints = + this.completedCheckpointIds = new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1); this.scheduler = Executors.newScheduledThreadPool( @@ -702,7 +702,7 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed completedCheckpoint.getCheckpointTimestamp(), completedCheckpoint.getCompletedTimestamp()); final long checkpointId = completedCheckpoint.getCheckpointId(); - completedCheckpoints.addLast(completedCheckpoint); + completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId())); try { byte[] states = serializer.serialize(completedCheckpoint); checkpointStorage.storeCheckPoint( @@ -712,18 +712,17 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed .pipelineId(pipelineId) .states(states) .build()); - if (completedCheckpoints.size() + if (completedCheckpointIds.size() % coordinatorConfig.getStorage().getMaxRetainedCheckpoints() == 0 - && completedCheckpoints.size() + && completedCheckpointIds.size() / coordinatorConfig.getStorage().getMaxRetainedCheckpoints() > 1) { List needDeleteCheckpointId = new ArrayList<>(); for (int i = 0; i < coordinatorConfig.getStorage().getMaxRetainedCheckpoints(); i++) { - needDeleteCheckpointId.add( - completedCheckpoints.removeFirst().getCheckpointId() + ""); + needDeleteCheckpointId.add(completedCheckpointIds.removeFirst()); } checkpointStorage.deleteCheckpoint( String.valueOf(completedCheckpoint.getJobId()),