Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter #4832

Merged
merged 1 commit into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class Constant {

public static final String IMAP_OWNED_SLOT_PROFILES = "engine_ownedSlotProfilesIMap";

public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d";
public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map";

public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -53,8 +52,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;

/**
* Used to manage all checkpoints for a job.
*
Expand Down Expand Up @@ -99,8 +96,6 @@ public CheckpointManager(
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
IMap<Integer, Long> checkpointIdMap =
nodeEngine.getHazelcastInstance().getMap(String.format(IMAP_CHECKPOINT_ID, jobId));
this.coordinatorMap =
checkpointPlanMap
.values()
Expand All @@ -109,7 +104,7 @@ public CheckpointManager(
plan -> {
IMapCheckpointIDCounter idCounter =
new IMapCheckpointIDCounter(
plan.getPipelineId(), checkpointIdMap);
jobId, plan.getPipelineId(), nodeEngine);
try {
idCounter.start();
PipelineState pipelineState =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,34 @@

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;

public class IMapCheckpointIDCounter implements CheckpointIDCounter {

private final Long jobID;
private final Integer pipelineId;
private final IMap<Integer, Long> checkpointIdMap;
private final String key;
private final IMap<String, Long> checkpointIdMap;

public IMapCheckpointIDCounter(Integer pipelineId, IMap<Integer, Long> checkpointIdMap) {
public IMapCheckpointIDCounter(Long jobID, Integer pipelineId, NodeEngine nodeEngine) {
this.jobID = jobID;
this.pipelineId = pipelineId;
this.checkpointIdMap = checkpointIdMap;
this.key = convertLongIntToBase64(jobID, pipelineId);
this.checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(IMAP_CHECKPOINT_ID);
}

@Override
public void start() throws Exception {
RetryUtils.retryWithException(
() -> {
return checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
return checkpointIdMap.putIfAbsent(key, INITIAL_CHECKPOINT_ID);
},
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
Expand All @@ -54,25 +63,41 @@ public void start() throws Exception {
@Override
public CompletableFuture<Void> shutdown(PipelineStatus pipelineStatus) {
if (pipelineStatus.isEndState()) {
checkpointIdMap.remove(pipelineId);
checkpointIdMap.remove(key);
}
return CompletableFuture.completedFuture(null);
}

@Override
public long getAndIncrement() throws Exception {
Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1);
Long nextId = checkpointIdMap.compute(key, (k, v) -> v == null ? null : v + 1);
checkNotNull(nextId);
return nextId - 1;
}

@Override
public long get() {
return checkpointIdMap.get(pipelineId);
return checkpointIdMap.get(key);
}

@Override
public void setCount(long newId) throws Exception {
checkpointIdMap.put(pipelineId, newId);
checkpointIdMap.put(key, newId);
}

public static String convertLongIntToBase64(long longValue, int intValue) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
buffer.putLong(longValue);
buffer.putInt(intValue);
byte[] bytes = buffer.array();
return Base64.getEncoder().encodeToString(bytes);
}

public static long[] convertBase64ToLongInt(String encodedStr) {
byte[] decodedBytes = Base64.getDecoder().decode(encodedStr);
ByteBuffer buffer = ByteBuffer.wrap(decodedBytes);
long longValue = buffer.getLong();
int intValue = buffer.getInt();
return new long[] {longValue, intValue};
}
}