Skip to content

Commit

Permalink
[Fix][Zeta] Fix release slot resource twice (apache#7236)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and chaorongzhi committed Aug 21, 2024
1 parent 4bc630a commit e9e51dd
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ public void testJobRetryTimes() throws IOException, InterruptedException {
"Restore time 3, pipeline Job stream_fake_to_inmemory_with_error.conf"));
}

@Test
public void testNoDuplicatedReleaseSlot() throws IOException, InterruptedException {
Container.ExecResult execResult =
executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertFalse(
server.getLogs().contains("wrong target release operation with job"));
}

@Test
public void testMultiTableSinkFailedWithThrowable() throws IOException, InterruptedException {
Container.ExecResult execResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.google.common.collect.Lists;
import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
Expand All @@ -92,6 +91,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -146,6 +147,8 @@ public class JobMaster {

private Map<Integer, CheckpointPlan> checkpointPlanMap;

private final Map<Integer, List<SlotProfile>> releasedSlotWhenTaskGroupFinished;

private final IMap<Long, JobInfo> runningJobInfoIMap;

private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;
Expand Down Expand Up @@ -190,6 +193,7 @@ public JobMaster(
this.engineConfig = engineConfig;
this.metricsImap = metricsImap;
this.seaTunnelServer = seaTunnelServer;
this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
}

public synchronized void init(long initializationTimestamp, boolean restart) throws Exception {
Expand Down Expand Up @@ -464,13 +468,17 @@ public void releaseTaskGroupResource(
jobImmutableInformation.getJobId(),
Collections.singletonList(taskGroupSlotProfile))
.join();

releasedSlotWhenTaskGroupFinished
.computeIfAbsent(
pipelineLocation.getPipelineId(),
k -> new CopyOnWriteArrayList<>())
.add(taskGroupSlotProfile);
return null;
},
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
ExceptionUtil::isOperationNeedRetryException,
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(
Expand All @@ -487,6 +495,11 @@ public void releasePipelineResource(SubPlan subPlan) {
if (taskGroupLocationSlotProfileMap == null) {
return;
}
List<SlotProfile> alreadyReleased = new ArrayList<>();
if (releasedSlotWhenTaskGroupFinished.containsKey(subPlan.getPipelineId())) {
alreadyReleased.addAll(
releasedSlotWhenTaskGroupFinished.get(subPlan.getPipelineId()));
}

RetryUtils.retryWithException(
() -> {
Expand All @@ -497,10 +510,12 @@ public void releasePipelineResource(SubPlan subPlan) {
resourceManager
.releaseResources(
jobImmutableInformation.getJobId(),
Lists.newArrayList(
taskGroupLocationSlotProfileMap.values()))
taskGroupLocationSlotProfileMap.values().stream()
.filter(p -> !alreadyReleased.contains(p))
.collect(Collectors.toList()))
.join();
ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation());
releasedSlotWhenTaskGroupFinished.remove(subPlan.getPipelineId());
return null;
},
new RetryUtils.RetryMaterial(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public void run() {
SeaTunnelServer server = getService();
try {
response =
server.getTaskExecutionService().getExecutionContext(taskGroupLocation) != null;
server.getTaskExecutionService().getActiveExecutionContext(taskGroupLocation)
!= null;
} catch (TaskGroupContextNotFoundException e) {
response = false;
}
Expand Down

0 comments on commit e9e51dd

Please sign in to comment.