Skip to content

Commit

Permalink
[Fix][Zeta] Fix task can not end cause by lock metrics failed (#7357)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Aug 13, 2024
1 parent 7b19df5 commit 6a7df83
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,8 +39,10 @@

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.map.IMap;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,11 +78,17 @@ public void testSayHello() {

@Test
public void testExecuteJob() throws Exception {
runJobFileWithAssertEndStatus(
"batch_fakesource_to_file.conf", "fake_to_file", JobStatus.FINISHED);
}

private static void runJobFileWithAssertEndStatus(
String confFile, String name, JobStatus finished)
throws ExecutionException, InterruptedException {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("batch_fakesource_to_file.conf");
String filePath = TestUtils.getResource(confFile);
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");

jobConfig.setName(name);
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) {
Expand All @@ -94,11 +105,25 @@ public void testExecuteJob() throws Exception {
() ->
Assertions.assertTrue(
objectCompletableFuture.isDone()
&& JobStatus.FINISHED.equals(
&& finished.equals(
objectCompletableFuture.get())));
}
}

@Test
public void testExecuteJobWithLockMetrics() throws Exception {
// lock metrics map
IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
try {
runJobFileWithAssertEndStatus(
"batch_fakesource_to_file.conf", "fake_to_file", JobStatus.FINISHED);
} finally {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
}
}

@Test
public void cancelJobTest() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
Expand Down Expand Up @@ -229,29 +254,9 @@ void afterClass() {

@Test
public void testLastCheckpointErrorJob() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("batch_last_checkpoint_error.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("batch_last_checkpoint_error");

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) {
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

await().atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
objectCompletableFuture.isDone()
&& JobStatus.FAILED.equals(
objectCompletableFuture.get())));
}
runJobFileWithAssertEndStatus(
"batch_last_checkpoint_error.conf",
"batch_last_checkpoint_error",
JobStatus.FAILED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,14 @@ void taskDone(Task task) {
cancellationFutures.remove(taskGroupLocation);
try {
cancelAsyncFunction(taskGroupLocation);
} catch (Throwable e) {
throw new RuntimeException(e);
} catch (Throwable t) {
logger.severe("cancel async function failed", t);
}
try {
updateMetricsContextInImap();
} catch (Throwable t) {
logger.severe("update metrics context in imap failed", t);
}
updateMetricsContextInImap();
if (ex == null) {
logger.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,14 @@ private void subPlanDone(PipelineStatus pipelineStatus) {
RetryUtils.retryWithException(
() -> {
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
try {
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
} catch (Throwable e) {
log.error(
"Remove metrics context for pipeline {} failed, with exception: {}",
pipelineFullName,
ExceptionUtils.getMessage(e));
}
notifyCheckpointManagerPipelineEnd(pipelineStatus);
jobMaster.releasePipelineResource(this);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
Expand Down Expand Up @@ -678,8 +679,13 @@ public void removeMetricsContext(

boolean lockedIMap = false;
try {
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
lockedIMap = true;
lockedIMap =
metricsImap.tryLock(
Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
if (!lockedIMap) {
LOGGER.severe("lock imap failed in update metrics");
return;
}

HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
Expand All @@ -697,6 +703,8 @@ public void removeMetricsContext(
collect.forEach(centralMap::remove);
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
}
} catch (Exception e) {
LOGGER.warning("failed to remove metrics context", e);
} finally {
if (lockedIMap) {
boolean unLockedIMap = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.seatunnel.engine.server.execution.TestTask;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -65,8 +64,6 @@ public void before() {
}

@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testCancel() {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand All @@ -92,8 +89,6 @@ public void testCancel() {
}

@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testCancelBlockTask() throws InterruptedException {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand All @@ -118,8 +113,6 @@ public void testCancelBlockTask() throws InterruptedException {
}

@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testFinish() {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand Down Expand Up @@ -150,8 +143,6 @@ public void testFinish() {

/** Test task execution time is the same as the timer timeout */
@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testCriticalCallTime() throws InterruptedException {
AtomicBoolean stopMark = new AtomicBoolean(false);
CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -189,8 +180,6 @@ public void testCriticalCallTime() throws InterruptedException {
}

@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testThrowException() throws InterruptedException {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand Down Expand Up @@ -264,8 +253,6 @@ public void testThrowException() throws InterruptedException {
}

@RepeatedTest(2)
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testDelay() throws InterruptedException {

long lowLagSleep = 10;
Expand Down

0 comments on commit 6a7df83

Please sign in to comment.