Skip to content

Commit

Permalink
[Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator First Befor…
Browse files Browse the repository at this point in the history
…e Cancel Task (apache#3838)

* [Improve] [SeaTunnel-Engine] Cancel CheckpointCoordinator First Before Cancel Task
  • Loading branch information
Hisoka-X authored and lhyundeadsoul committed Jan 3, 2023
1 parent e60f529 commit 871f0b8
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation, TaskE
invoke.get();
notifyStateSuccess = true;
} catch (InterruptedException e) {
logger.severe(e);
Thread.interrupted();
logger.severe("send notify task status failed", e);
} catch (ExecutionException e) {
logger.warning(ExceptionUtils.getMessage(e));
logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis",
Expand All @@ -278,7 +277,6 @@ private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation, TaskE
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
logger.severe(e);
Thread.interrupted();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,20 +306,23 @@ private TaskGroupImmutableInformation getTaskGroupImmutableInformation() {

private boolean turnToEndState(@NonNull ExecutionState endState) {
synchronized (this) {
// consistency check
ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
if (currentState.isEndState()) {
String message = String.format("Task %s is already in terminal state %s", taskFullName, currentState);
LOGGER.warning(message);
return false;
}
if (!endState.isEndState()) {
String message =
String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName,
endState);
LOGGER.warning(message);
return false;
}
// consistency check
ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
if (currentState.equals(endState)) {
return true;
}
if (currentState.isEndState()) {
String message = String.format("Task %s is already in terminal state %s", taskFullName, currentState);
LOGGER.warning(message);
return false;
}

updateStateTimestamps(endState);
runningJobStateIMap.set(taskGroupLocation, endState);
Expand Down Expand Up @@ -372,10 +375,6 @@ public boolean updateTaskState(@NonNull ExecutionState current, @NonNull Executi
}
}

public TaskGroupDefaultImpl getTaskGroup() {
return taskGroup;
}

public void cancel() {
if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED) ||
Expand All @@ -392,6 +391,7 @@ private void noticeTaskExecutionServiceCancel() {
if (!checkTaskGroupIsExecuting(taskGroupLocation)){
updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED);
taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
return;
}
int i = 0;
// In order not to generate uncontrolled tasks, We will try again until the taskFuture is completed
Expand All @@ -415,7 +415,6 @@ private void noticeTaskExecutionServiceCancel() {
}
}
}
this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.CANCELED, null));
}

private void updateStateTimestamps(@NonNull ExecutionState targetState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,14 @@ public void cancelPipeline() {
if (!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
}
cancelCheckpointCoordinator();
cancelPipelineTasks();
}

private void cancelCheckpointCoordinator() {
jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, PipelineStatus.CANCELING).join();
}

private void cancelPipelineTasks() {
List<CompletableFuture<Void>> coordinatorCancelList =
coordinatorVertexList.stream().map(this::cancelTask).filter(Objects::nonNull)
Expand Down

0 comments on commit 871f0b8

Please sign in to comment.