Skip to content

Commit

Permalink
[Hotfix][Zeta] The pipeline needs to wait for the CheckpointCoordinat…
Browse files Browse the repository at this point in the history
…or to end (#4272)
  • Loading branch information
EricJoy2048 authored Mar 6, 2023
1 parent 1e912f2 commit d2b1bf4
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class CheckpointCoordinator {

private final ExecutorService executorService;

private CompletableFuture<CheckpointCoordinatorState> checkpointCoordinatorFuture;

@SneakyThrows
public CheckpointCoordinator(
CheckpointManager manager,
Expand Down Expand Up @@ -168,6 +170,7 @@ public CheckpointCoordinator(
this.latestCompletedCheckpoint =
serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class);
}
this.checkpointCoordinatorFuture = new CompletableFuture();
}

public int getPipelineId() {
Expand Down Expand Up @@ -210,8 +213,13 @@ private void handleCoordinatorError(String message, Throwable e, CheckpointClose
}

private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) {
CheckpointException checkpointException = new CheckpointException(reason, e);
cleanPendingCheckpoint(reason);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(reason, e));
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
CheckpointCoordinatorStatus.FAILED,
ExceptionUtils.getMessage(checkpointException)));
checkpointManager.handleCheckpointError(pipelineId);
}

private void restoreTaskState(TaskLocation taskLocation) {
Expand Down Expand Up @@ -282,6 +290,7 @@ protected void readyToClose(TaskLocation taskLocation) {

protected void restoreCoordinator(boolean alreadyStarted) {
LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
checkpointCoordinatorFuture = new CompletableFuture<>();
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
shutdown = false;
if (alreadyStarted) {
Expand Down Expand Up @@ -653,6 +662,8 @@ public void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) {
latestCompletedCheckpoint = completedCheckpoint;
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(CheckpointCoordinatorStatus.FINISHED, null));
}
}

Expand All @@ -669,7 +680,8 @@ public boolean isCompleted() {
if (latestCompletedCheckpoint == null) {
return false;
}
return latestCompletedCheckpoint.getCheckpointType() == COMPLETED_POINT_TYPE;
return latestCompletedCheckpoint.getCheckpointType() == COMPLETED_POINT_TYPE
|| latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE;
}

public boolean isEndOfSavePoint() {
Expand All @@ -678,4 +690,21 @@ public boolean isEndOfSavePoint() {
}
return latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE;
}

public PassiveCompletableFuture<CheckpointCoordinatorState>
waitCheckpointCoordinatorComplete() {
return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
}

public PassiveCompletableFuture<CheckpointCoordinatorState> cancelCheckpoint() {
// checkpoint maybe already failed before all tasks complete.
if (checkpointCoordinatorFuture.isDone()) {
return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
}
cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
CheckpointCoordinatorState checkpointCoordinatorState =
new CheckpointCoordinatorState(CheckpointCoordinatorStatus.CANCELED, null);
checkpointCoordinatorFuture.complete(checkpointCoordinatorState);
return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.checkpoint;

import lombok.Getter;

@Getter
public class CheckpointCoordinatorState {

private final CheckpointCoordinatorStatus checkpointCoordinatorStatus;

private final String throwableMsg;

public CheckpointCoordinatorState(
CheckpointCoordinatorStatus checkpointCoordinatorStatus, String throwableMsg) {
this.checkpointCoordinatorStatus = checkpointCoordinatorStatus;
this.throwableMsg = throwableMsg;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.checkpoint;

public enum CheckpointCoordinatorStatus {
FINISHED,

CANCELED,

FAILED;
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) {
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
}

protected void handleCheckpointError(int pipelineId, Throwable e) {
jobMaster.handleCheckpointError(pipelineId, e);
protected void handleCheckpointError(int pipelineId) {
jobMaster.handleCheckpointError(pipelineId);
}

private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
Expand Down Expand Up @@ -211,18 +211,6 @@ public void readyToClose(TaskLocation taskLocation) {
getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
}

/**
* Called by the JobMaster. <br>
* Listen to the {@link PipelineStatus} of the {@link SubPlan}, which is used to cancel the
* running {@link PendingCheckpoint} when the SubPlan is abnormal.
*/
public CompletableFuture<Void> listenPipelineRetry(
int pipelineId, PipelineStatus pipelineStatus) {
getCheckpointCoordinator(pipelineId)
.cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
return CompletableFuture.completedFuture(null);
}

/**
* Called by the JobMaster. <br>
* Listen to the {@link PipelineStatus} of the {@link Pipeline}, which is used to shut down the
Expand Down Expand Up @@ -287,4 +275,27 @@ protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation)
jobMaster.queryTaskGroupAddress(
operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
}

/**
* Call By JobMaster If all the tasks canceled or some task failed, JobMaster will call this
* method to cancel checkpoint coordinator.
*
* @param pipelineId
* @return
*/
public PassiveCompletableFuture<CheckpointCoordinatorState> cancelCheckpoint(int pipelineId) {
return getCheckpointCoordinator(pipelineId).cancelCheckpoint();
}

/**
* Call By JobMaster If all the tasks is finished, JobMaster will call this method to wait
* checkpoint coordinator complete.
*
* @param pipelineId
* @return
*/
public PassiveCompletableFuture<CheckpointCoordinatorState> waitCheckpointCoordinatorComplete(
int pipelineId) {
return getCheckpointCoordinator(pipelineId).waitCheckpointCoordinatorComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public void run() throws Exception {
true,
exception ->
exception instanceof TaskGroupContextNotFoundException
&& !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
|| exception instanceof NullPointerException
&& !server.taskIsEnded(
taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,6 @@ public void addPipelineEndCallback(SubPlan subPlan) {
future.thenAcceptAsync(
pipelineState -> {
try {
// Notify checkpoint manager when the pipeline end, Whether the pipeline
// will be restarted or not
LOGGER.info(
String.format(
"received pipeline %s callback, state %s",
subPlan.getPipelineFullName(),
pipelineState.getPipelineStatus()));
if (jobMaster.getCheckpointManager() != null) {
jobMaster
.getCheckpointManager()
.listenPipelineRetry(
subPlan.getPipelineLocation().getPipelineId(),
pipelineState.getPipelineStatus())
.join();
}
if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
if (subPlan.canRestorePipeline()) {
subPlan.restorePipeline();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorStatus;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.master.JobMaster;
Expand Down Expand Up @@ -92,8 +94,6 @@ public class SubPlan {

private final Object restoreLock = new Object();

private Throwable checkpointThrowable;

public SubPlan(
int pipelineId,
int totalPipelineNum,
Expand Down Expand Up @@ -179,17 +179,47 @@ private void addPhysicalVertexCallBack(PassiveCompletableFuture<TaskExecutionSta
PipelineStatus pipelineStatus = null;
if (failedTaskNum.get() > 0) {
pipelineStatus = PipelineStatus.FAILED;
// we don't care the checkpoint error reason when the task is
// failed.
jobMaster
.getCheckpointManager()
.cancelCheckpoint(getPipelineId())
.join();
} else if (canceledTaskNum.get() > 0) {
if (checkpointThrowable != null) {
pipelineStatus = PipelineStatus.CANCELED;
CheckpointCoordinatorState checkpointCoordinatorState =
jobMaster
.getCheckpointManager()
.cancelCheckpoint(getPipelineId())
.join();
if (CheckpointCoordinatorStatus.FAILED.equals(
checkpointCoordinatorState
.getCheckpointCoordinatorStatus())) {
pipelineStatus = PipelineStatus.FAILED;
errorByPhysicalVertex.set(
ExceptionUtils.getMessage(checkpointThrowable));
checkpointThrowable = null;
} else {
pipelineStatus = PipelineStatus.CANCELED;
checkpointCoordinatorState.getThrowableMsg());
}
} else {
pipelineStatus = PipelineStatus.FINISHED;
CheckpointCoordinatorState checkpointCoordinatorState =
jobMaster
.getCheckpointManager()
.waitCheckpointCoordinatorComplete(getPipelineId())
.join();

if (CheckpointCoordinatorStatus.FAILED.equals(
checkpointCoordinatorState
.getCheckpointCoordinatorStatus())) {
pipelineStatus = PipelineStatus.FAILED;
errorByPhysicalVertex.set(
checkpointCoordinatorState.getThrowableMsg());
} else if (CheckpointCoordinatorStatus.CANCELED.equals(
checkpointCoordinatorState
.getCheckpointCoordinatorStatus())) {
pipelineStatus = PipelineStatus.CANCELED;
errorByPhysicalVertex.set(
checkpointCoordinatorState.getThrowableMsg());
}
}

if (!checkNeedRestore(pipelineStatus)) {
Expand Down Expand Up @@ -339,10 +369,7 @@ public synchronized void cancelPipeline() {

private void cancelCheckpointCoordinator() {
if (jobMaster.getCheckpointManager() != null) {
jobMaster
.getCheckpointManager()
.listenPipelineRetry(pipelineId, PipelineStatus.CANCELING)
.join();
jobMaster.getCheckpointManager().cancelCheckpoint(pipelineId).join();
}
}

Expand Down Expand Up @@ -524,12 +551,10 @@ public int getPipelineRestoreNum() {
return pipelineRestoreNum;
}

public void handleCheckpointError(Throwable e) {
public void handleCheckpointError() {
LOGGER.warning(
String.format(
"%s checkpoint have error, cancel the pipeline", getPipelineFullName()),
e);
this.checkpointThrowable = e;
"%s checkpoint have error, cancel the pipeline", getPipelineFullName()));
this.cancelPipeline();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,13 @@ public void run() {
}
}

public void handleCheckpointError(long pipelineId, Throwable e) {
public void handleCheckpointError(long pipelineId) {
this.physicalPlan
.getPipelineList()
.forEach(
pipeline -> {
if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
pipeline.handleCheckpointError(e);
pipeline.handleCheckpointError();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
Expand Down Expand Up @@ -136,8 +134,7 @@ public void testHandleCheckpointTimeout() throws Exception {
() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));

// call checkpoint timeout
jobMaster.handleCheckpointError(
1, new CheckpointException(CheckpointCloseReason.CHECKPOINT_EXPIRED));
jobMaster.handleCheckpointError(1);

// Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job
// status become running again
Expand Down

0 comments on commit d2b1bf4

Please sign in to comment.