diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index bbb01167fd8..12e13bc1949 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -53,7 +53,7 @@ public class Constant { public static final String IMAP_OWNED_SLOT_PROFILES = "engine_ownedSlotProfilesIMap"; - public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d"; + public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map"; public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics"; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 971266e091a..48bc9a45433 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -42,7 +42,6 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; -import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.extern.slf4j.Slf4j; @@ -53,8 +52,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID; - /** * Used to manage all checkpoints for a job. * @@ -99,8 +96,6 @@ public CheckpointManager( CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage()) .create(checkpointConfig.getStorage().getStoragePluginConfig()); - IMap checkpointIdMap = - nodeEngine.getHazelcastInstance().getMap(String.format(IMAP_CHECKPOINT_ID, jobId)); this.coordinatorMap = checkpointPlanMap .values() @@ -109,7 +104,7 @@ public CheckpointManager( plan -> { IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter( - plan.getPipelineId(), checkpointIdMap); + jobId, plan.getPipelineId(), nodeEngine); try { idCounter.start(); PipelineState pipelineState = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java index 153e7b652bd..ffbcea0e115 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java @@ -24,25 +24,34 @@ import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.map.IMap; +import com.hazelcast.spi.impl.NodeEngine; +import java.nio.ByteBuffer; +import java.util.Base64; import java.util.concurrent.CompletableFuture; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID; public class IMapCheckpointIDCounter implements CheckpointIDCounter { + + private final Long jobID; private final Integer pipelineId; - private final IMap checkpointIdMap; + private final String key; + private final IMap checkpointIdMap; - public IMapCheckpointIDCounter(Integer pipelineId, IMap checkpointIdMap) { + public IMapCheckpointIDCounter(Long jobID, Integer pipelineId, NodeEngine nodeEngine) { + this.jobID = jobID; this.pipelineId = pipelineId; - this.checkpointIdMap = checkpointIdMap; + this.key = convertLongIntToBase64(jobID, pipelineId); + this.checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(IMAP_CHECKPOINT_ID); } @Override public void start() throws Exception { RetryUtils.retryWithException( () -> { - return checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID); + return checkpointIdMap.putIfAbsent(key, INITIAL_CHECKPOINT_ID); }, new RetryUtils.RetryMaterial( Constant.OPERATION_RETRY_TIME, @@ -54,25 +63,41 @@ public void start() throws Exception { @Override public CompletableFuture shutdown(PipelineStatus pipelineStatus) { if (pipelineStatus.isEndState()) { - checkpointIdMap.remove(pipelineId); + checkpointIdMap.remove(key); } return CompletableFuture.completedFuture(null); } @Override public long getAndIncrement() throws Exception { - Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1); + Long nextId = checkpointIdMap.compute(key, (k, v) -> v == null ? null : v + 1); checkNotNull(nextId); return nextId - 1; } @Override public long get() { - return checkpointIdMap.get(pipelineId); + return checkpointIdMap.get(key); } @Override public void setCount(long newId) throws Exception { - checkpointIdMap.put(pipelineId, newId); + checkpointIdMap.put(key, newId); + } + + public static String convertLongIntToBase64(long longValue, int intValue) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); + buffer.putLong(longValue); + buffer.putInt(intValue); + byte[] bytes = buffer.array(); + return Base64.getEncoder().encodeToString(bytes); + } + + public static long[] convertBase64ToLongInt(String encodedStr) { + byte[] decodedBytes = Base64.getDecoder().decode(encodedStr); + ByteBuffer buffer = ByteBuffer.wrap(decodedBytes); + long longValue = buffer.getLong(); + int intValue = buffer.getInt(); + return new long[] {longValue, intValue}; } }