Skip to content

Commit

Permalink
[Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter (
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y authored May 29, 2023
1 parent 7a81fd3 commit ea44042
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class Constant {

public static final String IMAP_OWNED_SLOT_PROFILES = "engine_ownedSlotProfilesIMap";

public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d";
public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map";

public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -53,8 +52,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;

/**
* Used to manage all checkpoints for a job.
*
Expand Down Expand Up @@ -99,8 +96,6 @@ public CheckpointManager(
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
IMap<Integer, Long> checkpointIdMap =
nodeEngine.getHazelcastInstance().getMap(String.format(IMAP_CHECKPOINT_ID, jobId));
this.coordinatorMap =
checkpointPlanMap
.values()
Expand All @@ -109,7 +104,7 @@ public CheckpointManager(
plan -> {
IMapCheckpointIDCounter idCounter =
new IMapCheckpointIDCounter(
plan.getPipelineId(), checkpointIdMap);
jobId, plan.getPipelineId(), nodeEngine);
try {
idCounter.start();
PipelineState pipelineState =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,34 @@

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;

public class IMapCheckpointIDCounter implements CheckpointIDCounter {

private final Long jobID;
private final Integer pipelineId;
private final IMap<Integer, Long> checkpointIdMap;
private final String key;
private final IMap<String, Long> checkpointIdMap;

public IMapCheckpointIDCounter(Integer pipelineId, IMap<Integer, Long> checkpointIdMap) {
public IMapCheckpointIDCounter(Long jobID, Integer pipelineId, NodeEngine nodeEngine) {
this.jobID = jobID;
this.pipelineId = pipelineId;
this.checkpointIdMap = checkpointIdMap;
this.key = convertLongIntToBase64(jobID, pipelineId);
this.checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(IMAP_CHECKPOINT_ID);
}

@Override
public void start() throws Exception {
RetryUtils.retryWithException(
() -> {
return checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
return checkpointIdMap.putIfAbsent(key, INITIAL_CHECKPOINT_ID);
},
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
Expand All @@ -54,25 +63,41 @@ public void start() throws Exception {
@Override
public CompletableFuture<Void> shutdown(PipelineStatus pipelineStatus) {
if (pipelineStatus.isEndState()) {
checkpointIdMap.remove(pipelineId);
checkpointIdMap.remove(key);
}
return CompletableFuture.completedFuture(null);
}

@Override
public long getAndIncrement() throws Exception {
Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1);
Long nextId = checkpointIdMap.compute(key, (k, v) -> v == null ? null : v + 1);
checkNotNull(nextId);
return nextId - 1;
}

@Override
public long get() {
return checkpointIdMap.get(pipelineId);
return checkpointIdMap.get(key);
}

@Override
public void setCount(long newId) throws Exception {
checkpointIdMap.put(pipelineId, newId);
checkpointIdMap.put(key, newId);
}

public static String convertLongIntToBase64(long longValue, int intValue) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
buffer.putLong(longValue);
buffer.putInt(intValue);
byte[] bytes = buffer.array();
return Base64.getEncoder().encodeToString(bytes);
}

public static long[] convertBase64ToLongInt(String encodedStr) {
byte[] decodedBytes = Base64.getDecoder().decode(encodedStr);
ByteBuffer buffer = ByteBuffer.wrap(decodedBytes);
long longValue = buffer.getLong();
int intValue = buffer.getInt();
return new long[] {longValue, intValue};
}
}

0 comments on commit ea44042

Please sign in to comment.