Skip to content

Commit

Permalink
fix: reactivate jobs when they could not be sent back to client
Browse files Browse the repository at this point in the history
(cherry picked from commit 8ef7c6f)
  • Loading branch information
romansmirnov authored and github-actions[bot] committed Mar 22, 2022
1 parent a73e1c5 commit 78ac46a
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 50 deletions.
4 changes: 2 additions & 2 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void start() throws IOException {
brokerClient = buildBrokerClient();

final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
submitActivateJobsActor((Consumer<ActorControl>) activateJobsHandler);
submitActorToActivateJobs((Consumer<ActorControl>) activateJobsHandler);

final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
Expand Down Expand Up @@ -186,7 +186,7 @@ private BrokerClient buildBrokerClient() {
return brokerClientFactory.apply(gatewayCfg);
}

private void submitActivateJobsActor(final Consumer<ActorControl> consumer) {
private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var actor =
Actor.newActor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ private void removeObsoleteRequestsAndUpdateMetrics() {
}

private boolean isObsolete(final InflightActivateJobsRequest request) {
return request.isTimedOut() || request.isCanceled() || request.isCompleted();
return request.isTimedOut()
|| request.isCanceled()
|| request.isCompleted()
|| request.isAborted();
}

public void removeRequest(final InflightActivateJobsRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Objects;
import org.slf4j.Logger;

public final class InflightActivateJobsRequest {
public class InflightActivateJobsRequest {

private static final Logger LOG = Loggers.GATEWAY_LOGGER;
private final long requestId;
Expand Down Expand Up @@ -84,11 +84,24 @@ public boolean isCompleted() {
return isCompleted;
}

/**
* Sends activated jobs to the respective client.
*
* @param activatedJobs to send back to the client
* @return an instance of {@link Either} indicating the following:
* <ul>
* <li>{@link Either#get() == true}: if the activated jobs have been sent back to the client
* <li>{@link Either#get() == false}: if the activated jobs couldn't be sent back to the
* client
* <li>{@link Either#getLeft() != null}: if sending back the activated jobs failed with an
* exception (note: in this case {@link Either#isRight() == false})
* </ul>
*/
public Either<Exception, Boolean> tryToSendActivatedJobs(
final ActivateJobsResponse grpcResponse) {
final ActivateJobsResponse activatedJobs) {
if (isOpen()) {
try {
responseObserver.onNext(grpcResponse);
responseObserver.onNext(activatedJobs);
return Either.right(true);
} catch (final Exception e) {
LOG.warn("Failed to send response to client.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ private void onError(
final Throwable error) {
actor.submit(
() -> {
state.removeActiveRequest(request);
request.onError(error);
state.removeActiveRequest(request);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
import io.camunda.zeebe.gateway.impl.broker.RequestDispatchStrategy;
import io.camunda.zeebe.gateway.impl.broker.RoundRobinDispatchStrategy;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerFailJobRequest;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.ActorControl;
import io.grpc.protobuf.StatusProto;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -148,19 +151,21 @@ private void handleResponseSuccess(
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);

final var jobsCount = grpcResponse.getJobsCount();
final var jobsActivated = jobsCount > 0;

if (jobsActivated) {
final var result = request.tryToSendActivatedJobs(grpcResponse);
final var responseWasSent = result.isRight() && result.get();
final var responseWasSent = result.getOrElse(false);

if (!responseWasSent) {
final var reason = createReasonMessage(result);
final var activatedJobsToReactivate = grpcResponse.getJobsList();
final var jobKeys = response.getJobKeys();
final var jobType = request.getType();
final var reason = createReasonMessage(result);

logResponseNotSent(jobType, reason);
logResponseNotSent(jobType, jobKeys, reason);
reactivateJobs(activatedJobsToReactivate, reason);
cancelActivateJobsRequest(reason, delegate);
return;
}
Expand All @@ -186,6 +191,30 @@ private String createReasonMessage(final Either<Exception, Boolean> resultValue)
return errorMessage;
}

private void reactivateJobs(final List<ActivatedJob> activateJobs, final String message) {
if (activateJobs != null) {
activateJobs.forEach(j -> tryToReactivateJob(j, message));
}
}

private void tryToReactivateJob(final ActivatedJob job, final String message) {
final var request = toFailJobRequest(job, message);
brokerClient
.sendRequestWithRetry(request)
.whenComplete(
(response, error) -> {
if (error != null) {
Loggers.GATEWAY_LOGGER.info(
"Failed to reactivate job {} due to {}", job.getKey(), error.getMessage());
}
});
}

private BrokerFailJobRequest toFailJobRequest(final ActivatedJob job, final String errorMessage) {
return new BrokerFailJobRequest(job.getKey(), job.getRetries(), 0)
.setErrorMessage(errorMessage);
}

private void cancelActivateJobsRequest(
final String reason, final ResponseObserverDelegate delegate) {
final var status = Status.newBuilder().setCode(Code.CANCELLED_VALUE).setMessage(reason).build();
Expand Down Expand Up @@ -231,9 +260,14 @@ private void logErrorResponse(final int partition, final String jobType, final T
"Failed to activate jobs for type {} from partition {}", jobType, partition, error);
}

private void logResponseNotSent(final String jobType, final String reason) {
private void logResponseNotSent(
final String jobType, final List<Long> jobKeys, final String reason) {
Loggers.GATEWAY_LOGGER.debug(
"Failed to send back activated jobs for type {}, because: {}", jobType, reason);
"Failed to send {} activated jobs for type {} (with job keys: {}) to client, because: {}",
jobKeys.size(),
jobType,
jobKeys,
reason);
}

private PartitionIdIterator partitionIdIteratorForType(
Expand Down
Loading

0 comments on commit 78ac46a

Please sign in to comment.